MENU

RustでBybitのログを解凍しながらダウンロードする。

Bybitのログアーカイブ https://public.bybit.com/trading/ からログをダウンロードして解凍したい。

  • HTTPSでファイルをダウンロード
  • ダウンロードしながらストリームで解凍
  • 順次処理(ログは大きいのでできるだけ省メモリで処理したい)

Rustのライブラリとして用意するもの

  • HTTPクライアント Reqwest https://docs.rs/reqwest/latest/reqwest/
  • 解凍ライブラリ async-compressionのGzDecoder
  • CSV読み込み 安易に文字列を , でSplit(複雑になったらSerdeなど使った方が良い)

cargo.toml

ライブラリの利用をcargo.tomlで宣言する。バージョンは2022/11時点での最新版。

reqwest = {version = "0.11.12", features=["stream", "gzip"]}
futures = {version="0.3.21"}
async-compression = { version = "0.3.12", features = ["futures-io", "gzip"] }

HTTPSアクセス(Reqwest)

https://docs.rs/reqwest/latest/reqwest/index.html まずはReqwestのドキュメントをベースにwww.yahoo.co.jpにアクセスするコードを試す。

    #[tokio::test]
    async fn download_yahoo() {
        use futures::StreamExt;

        match reqwest::get("https://www.yahoo.co.jp").await {
            Ok(response) => {
                let mut stream = response.bytes_stream();

                while let Some(item) = stream.next().await {
                    print!("{:?}", item.unwrap());
                }
            },
            Err(e) => {
                panic!("Open URL Error {:?}", e);
            }
        }
    }

プリント文でHTMLが出てくる。reqwestの基本的な使い方はこれでOK.

ただ、過去約定ログは、「大きい」「GZIP圧縮されている」という条件があるのでストリームを解凍しながら処理したい。またClientにaccept: gzipを指定しても、*.gzというファイルをGETするときにはうまく動かないので改善が必要。

GZ解凍付きHTTPSアクセス

ストリーム付きで処理するには以下のようにする。エラー処理はいい加減にStringに変換してしまっている。

   #[tokio::test]
    async fn stream_ungzip_get() -> Result<(), String> {
        use async_compression::futures::bufread::GzipDecoder;
        use futures::{
            io::{self, BufReader, ErrorKind},
            prelude::*,
        };
        use reqwest::Response;

        // async で GET
        let result = reqwest::get("https://public.bybit.com/trading/BTCUSD/BTCUSD2022-11-17.csv.gz").await;

        let response: Response;
        match result {
            Ok(r) => {
                response = r;
            },
            Err(e) => {
                return Err(e.to_string());
            }
        }

        // 順次解凍しながら表示
        let reader = response
            .bytes_stream()
            .map_err(|e| io::Error::new(ErrorKind::Other, e))
            .into_async_read();
        let mut lines  = BufReader::new(GzipDecoder::new(BufReader::new(reader)));

        loop {
            let mut buf = String::new();

            match lines.read_line(&mut buf).await {
                Ok(read_size) => {
                    if read_size == 0 {
                        break;
                    }
                    print!("{}", buf);  // 表示
                },
                Err(_e) => {
                    // EOF
                    break;
                }
            } 
        }

        Ok(())
    }

これを動かすと、bybitのログが解凍して表示される。以下のようなかんじ。

1668728606,BTCUSD,Buy,1,16661.00,PlusTick,72f328d1-df58-5e22-ac52-9a6489430203,6002.040693835904,1,6.002040693835904e-05
1668728606,BTCUSD,Buy,611,16661.00,ZeroMinusTick,00fcc5e9-d36f-51ae-9906-11f83ed505a7,3.6672468639337374e+06,611,0.036672468639337374
1668728606,BTCUSD,Buy,61,16661.00,ZeroMinusTick,1e81d895-88b5-5c76-864f-13feac1e432d,366124.48232399015,61,0.0036612448232399017
1668728606,BTCUSD,Buy,433,16661.00,ZeroMinusTick,3709c1f7-12da-5f02-bed9-6eeac2829123,2.598883620430947e+06,433,0.025988836204309466
1668728606,BTCUSD,Buy,136,16661.50,PlusTick,4d4f2b17-acf2-5203-910c-8bde4d7e3d76,816253.0384419169,136,0.00816253038441917
1668728606,BTCUSD,Buy,1,16661.50,ZeroMinusTick,877369b5-b64d-5dc7-94d6-d2193a2fd85d,6001.860576778801,1,6.0018605767788016e-05
1668728606,BTCUSD,Buy,3077,16661.50,ZeroMinusTick,d5c4d55e-1dc9-50b8-9d22-4d8247cf2fe2,1.8467724994748373e+07,3077,0.18467724994748372
1668728606,BTCUSD,Buy,1,16661.50,ZeroMinusTick,1efb8db2-8777-59e3-a1d2-dd1a9dd57a03,6001.860576778801,1,6.0018605767788016e-05
1668728608,BTCUSD,Buy,2041,16661.50,ZeroMinusTick,2b974fcf-f959-502f-a020-7a0f2a1dca10,1.2249797437205534e+07,2041,0.12249797437205534
1668728610,BTCUSD,Sell,35,16661.00,MinusTick,d0561035-e9b0-5491-b6c5-755c652c2661,210071.42428425665,35,0.0021007142428425664
1668728611,BTCUSD,Buy,1839,16661.50,PlusTick,f2506a8e-8e49-570b-8bd8-26d42f387869,1.1037421600696217e+07,1839,0.11037421600696216

CSVのパース

ちょっと安易だが、文字列のsplit(",")で実装した。

ヘッは、timestamp,symbol,side,size,price,tickDirection,trdMatchID,grossValue,homeNotional,foreignNotional なので必要な位置の情報のみ取り出してプリントした。エラー処理もまずはテキトー。

    pub fn parse_log_rec(rec: &str) {
        let rec_trim = rec.trim();
        let row = rec_trim.split(",");  // カラムに分割
    
        let mut time_us: i64 = 0;
        let mut price: f64 = 0.0;
        let mut size: f64 = 0.0;
        let mut order_side: String = String::new();
        let mut id: String = String::new();
    
        // カラム毎の処理
        for (i, col) in row.enumerate() {
            match i {
                0 => {
                    /*timestamp*/
                    time_us = col.parse::<i64>().unwrap();
                    time_us *= 1_000;
                }
                1 => { /* symbol IGNORE */ }
                2 => {
                    /* side */
                    order_side = col.to_string();
                }
                3 => {
                    /* size */
                    size = col.parse::<f64>().unwrap();
                }
                4 => {
                    /* price */
                    price = col.parse::<f64>().unwrap();
                }
                5 => { /* tickDirection IGNORE */ }
                6 => {
                    /* trdMatchID */
                    id = col.to_string();
                }
                7 => { /* grossValue IGNORE */ }
                8 => { /* homeNotional IGNORE */ }
                9 => { /* foreignNotional IGNORE */ }
                _ => {
                    /* ERROR */
                    panic!("unknon record format");
                }
            }
        }

        println!("time_us: {}, order_side: {}, price: {}, size: {}, id: {}", time_us, order_side, size, price, id);
    }
目次

テストコード


    #[test]
    fn test_parse() {
        parse_log_rec("1668728606,BTCUSD,Buy,611,16661.00,ZeroMinusTick,00fcc5e9-d36f-51ae-9906-11f83ed505a7,3.6672468639337374e+06,611,0.036672468639337374");
    }

結果

running 1 test
time_us: 1668728606000, order_side: Buy, price: 611, size: 16661, id: 00fcc5e9-d36f-51ae-9906-11f83ed505a7
test exchange::test_exchange::test_parse ... ok

パースして数値は数値型へ変更することができた。

追加(async IO対応のcsv-asyncを使う)

csvだと、Async IOには対応していない。そこでAsyncIO対応のCSV-asyncを使う。

ドキュメントのページ https://docs.rs/crate/csv-async/1.2.4

まずCargo.tomlに以下を追加。

csv-async = {version="1.2.4"}

BufferReaderで行単位の処理をしているところを、csv_async::AsyncReaderに置き換えて実装。

以下具体的な実装

    #[tokio::test]
    async fn stream_ungzip_get_csv() -> Result<(), String> {
        use async_compression::futures::bufread::GzipDecoder;
        use csv_async;
        use futures::{
            io::{self, BufReader, ErrorKind},
            prelude::*,
        };
        use reqwest::Response;

        // async で GET
        let result =
            reqwest::get("https://public.bybit.com/trading/BTCUSD/BTCUSD2022-11-17.csv.gz").await;

        let response: Response;
        match result {
            Ok(r) => {
                response = r;
            }
            Err(e) => {
                return Err(e.to_string());
            }
        }

        // 順次解凍しながら表示
        let reader = response
            .bytes_stream()
            .map_err(|e| io::Error::new(ErrorKind::Other, e))
            .into_async_read();
        //let mut lines  = BufReader::new(GzipDecoder::new(BufReader::new(reader)));
        let gz_reader = GzipDecoder::new(BufReader::new(reader));
        let mut csv_reader = csv_async::AsyncReader::from_reader(gz_reader);

        csv_reader.has_headers();
        let mut records = csv_reader.records();

        while let Some(record) = records.next().await {
            let mut time_us: i64 = 0;
            let mut price: f64 = 0.0;
            let mut size: f64 = 0.0;
            let mut order_side: String = String::new();
            let mut id: String = String::new();

            let record = record.unwrap();
            time_us = record.get(0).unwrap().parse::<i64>().unwrap() * 1_000;
            order_side = record.get(2).unwrap().to_string();
            size = record.get(3).unwrap().parse::<f64>().unwrap();
            price = record.get(4).unwrap().parse::<f64>().unwrap();
            id = record.get(6).unwrap().to_string();
            println!(
                "time:{}, price:{}, size:{}, order_side:{}, id:{}",
                time_us,
                order_side,
                size,
                price,
                id,
            );
        }

        Ok(())
    }
よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

コメント

コメントする

CAPTCHA


目次