Bybitのログアーカイブ https://public.bybit.com/trading/ からログをダウンロードして解凍したい。
- HTTPSでファイルをダウンロード
- ダウンロードしながらストリームで解凍
- 順次処理(ログは大きいのでできるだけ省メモリで処理したい)
- HTTPクライアント Reqwest https://docs.rs/reqwest/latest/reqwest/
- 解凍ライブラリ async-compressionのGzDecoder
- CSV読み込み 安易に文字列を
,
でSplit(複雑になったらSerdeなど使った方が良い)
ライブラリの利用をcargo.toml
で宣言する。バージョンは2022/11時点での最新版。
reqwest = {version = "0.11.12", features=["stream", "gzip"]}
futures = {version="0.3.21"}
async-compression = { version = "0.3.12", features = ["futures-io", "gzip"] }
https://docs.rs/reqwest/latest/reqwest/index.html まずはReqwestのドキュメントをベースにwww.yahoo.co.jp
にアクセスするコードを試す。
#[tokio::test]
async fn download_yahoo() {
use futures::StreamExt;
match reqwest::get("https://www.yahoo.co.jp").await {
Ok(response) => {
let mut stream = response.bytes_stream();
while let Some(item) = stream.next().await {
print!("{:?}", item.unwrap());
}
},
Err(e) => {
panic!("Open URL Error {:?}", e);
}
}
}
プリント文でHTMLが出てくる。reqwest
の基本的な使い方はこれでOK.
ただ、過去約定ログは、「大きい」「GZIP圧縮されている」という条件があるのでストリームを解凍しながら処理したい。またClientにaccept: gzip
を指定しても、*.gzというファイルをGETするときにはうまく動かないので改善が必要。
ストリーム付きで処理するには以下のようにする。エラー処理はいい加減にStringに変換してしまっている。
#[tokio::test]
async fn stream_ungzip_get() -> Result<(), String> {
use async_compression::futures::bufread::GzipDecoder;
use futures::{
io::{self, BufReader, ErrorKind},
prelude::*,
};
use reqwest::Response;
// async で GET
let result = reqwest::get("https://public.bybit.com/trading/BTCUSD/BTCUSD2022-11-17.csv.gz").await;
let response: Response;
match result {
Ok(r) => {
response = r;
},
Err(e) => {
return Err(e.to_string());
}
}
// 順次解凍しながら表示
let reader = response
.bytes_stream()
.map_err(|e| io::Error::new(ErrorKind::Other, e))
.into_async_read();
let mut lines = BufReader::new(GzipDecoder::new(BufReader::new(reader)));
loop {
let mut buf = String::new();
match lines.read_line(&mut buf).await {
Ok(read_size) => {
if read_size == 0 {
break;
}
print!("{}", buf); // 表示
},
Err(_e) => {
// EOF
break;
}
}
}
Ok(())
}
これを動かすと、bybitのログが解凍して表示される。以下のようなかんじ。
1668728606,BTCUSD,Buy,1,16661.00,PlusTick,72f328d1-df58-5e22-ac52-9a6489430203,6002.040693835904,1,6.002040693835904e-05
1668728606,BTCUSD,Buy,611,16661.00,ZeroMinusTick,00fcc5e9-d36f-51ae-9906-11f83ed505a7,3.6672468639337374e+06,611,0.036672468639337374
1668728606,BTCUSD,Buy,61,16661.00,ZeroMinusTick,1e81d895-88b5-5c76-864f-13feac1e432d,366124.48232399015,61,0.0036612448232399017
1668728606,BTCUSD,Buy,433,16661.00,ZeroMinusTick,3709c1f7-12da-5f02-bed9-6eeac2829123,2.598883620430947e+06,433,0.025988836204309466
1668728606,BTCUSD,Buy,136,16661.50,PlusTick,4d4f2b17-acf2-5203-910c-8bde4d7e3d76,816253.0384419169,136,0.00816253038441917
1668728606,BTCUSD,Buy,1,16661.50,ZeroMinusTick,877369b5-b64d-5dc7-94d6-d2193a2fd85d,6001.860576778801,1,6.0018605767788016e-05
1668728606,BTCUSD,Buy,3077,16661.50,ZeroMinusTick,d5c4d55e-1dc9-50b8-9d22-4d8247cf2fe2,1.8467724994748373e+07,3077,0.18467724994748372
1668728606,BTCUSD,Buy,1,16661.50,ZeroMinusTick,1efb8db2-8777-59e3-a1d2-dd1a9dd57a03,6001.860576778801,1,6.0018605767788016e-05
1668728608,BTCUSD,Buy,2041,16661.50,ZeroMinusTick,2b974fcf-f959-502f-a020-7a0f2a1dca10,1.2249797437205534e+07,2041,0.12249797437205534
1668728610,BTCUSD,Sell,35,16661.00,MinusTick,d0561035-e9b0-5491-b6c5-755c652c2661,210071.42428425665,35,0.0021007142428425664
1668728611,BTCUSD,Buy,1839,16661.50,PlusTick,f2506a8e-8e49-570b-8bd8-26d42f387869,1.1037421600696217e+07,1839,0.11037421600696216
ちょっと安易だが、文字列のsplit(",")
で実装した。
ヘッは、timestamp,symbol,side,size,price,tickDirection,trdMatchID,grossValue,homeNotional,foreignNotional
なので必要な位置の情報のみ取り出してプリントした。エラー処理もまずはテキトー。
pub fn parse_log_rec(rec: &str) {
let rec_trim = rec.trim();
let row = rec_trim.split(","); // カラムに分割
let mut time_us: i64 = 0;
let mut price: f64 = 0.0;
let mut size: f64 = 0.0;
let mut order_side: String = String::new();
let mut id: String = String::new();
// カラム毎の処理
for (i, col) in row.enumerate() {
match i {
0 => {
/*timestamp*/
time_us = col.parse::<i64>().unwrap();
time_us *= 1_000;
}
1 => { /* symbol IGNORE */ }
2 => {
/* side */
order_side = col.to_string();
}
3 => {
/* size */
size = col.parse::<f64>().unwrap();
}
4 => {
/* price */
price = col.parse::<f64>().unwrap();
}
5 => { /* tickDirection IGNORE */ }
6 => {
/* trdMatchID */
id = col.to_string();
}
7 => { /* grossValue IGNORE */ }
8 => { /* homeNotional IGNORE */ }
9 => { /* foreignNotional IGNORE */ }
_ => {
/* ERROR */
panic!("unknon record format");
}
}
}
println!("time_us: {}, order_side: {}, price: {}, size: {}, id: {}", time_us, order_side, size, price, id);
}
テストコード
#[test]
fn test_parse() {
parse_log_rec("1668728606,BTCUSD,Buy,611,16661.00,ZeroMinusTick,00fcc5e9-d36f-51ae-9906-11f83ed505a7,3.6672468639337374e+06,611,0.036672468639337374");
}
結果
running 1 test
time_us: 1668728606000, order_side: Buy, price: 611, size: 16661, id: 00fcc5e9-d36f-51ae-9906-11f83ed505a7
test exchange::test_exchange::test_parse ... ok
パースして数値は数値型へ変更することができた。
csvだと、Async IOには対応していない。そこでAsyncIO対応のCSV-asyncを使う。
ドキュメントのページ https://docs.rs/crate/csv-async/1.2.4
まずCargo.toml
に以下を追加。
csv-async = {version="1.2.4"}
BufferReader
で行単位の処理をしているところを、csv_async::AsyncReader
に置き換えて実装。
以下具体的な実装
#[tokio::test]
async fn stream_ungzip_get_csv() -> Result<(), String> {
use async_compression::futures::bufread::GzipDecoder;
use csv_async;
use futures::{
io::{self, BufReader, ErrorKind},
prelude::*,
};
use reqwest::Response;
// async で GET
let result =
reqwest::get("https://public.bybit.com/trading/BTCUSD/BTCUSD2022-11-17.csv.gz").await;
let response: Response;
match result {
Ok(r) => {
response = r;
}
Err(e) => {
return Err(e.to_string());
}
}
// 順次解凍しながら表示
let reader = response
.bytes_stream()
.map_err(|e| io::Error::new(ErrorKind::Other, e))
.into_async_read();
//let mut lines = BufReader::new(GzipDecoder::new(BufReader::new(reader)));
let gz_reader = GzipDecoder::new(BufReader::new(reader));
let mut csv_reader = csv_async::AsyncReader::from_reader(gz_reader);
csv_reader.has_headers();
let mut records = csv_reader.records();
while let Some(record) = records.next().await {
let mut time_us: i64 = 0;
let mut price: f64 = 0.0;
let mut size: f64 = 0.0;
let mut order_side: String = String::new();
let mut id: String = String::new();
let record = record.unwrap();
time_us = record.get(0).unwrap().parse::<i64>().unwrap() * 1_000;
order_side = record.get(2).unwrap().to_string();
size = record.get(3).unwrap().parse::<f64>().unwrap();
price = record.get(4).unwrap().parse::<f64>().unwrap();
id = record.get(6).unwrap().to_string();
println!(
"time:{}, price:{}, size:{}, order_side:{}, id:{}",
time_us,
order_side,
size,
price,
id,
);
}
Ok(())
}
コメント