Module ws_exchange

§WebSocket Exchange Trait

This module defines the WsExchange trait for real-time data streaming via WebSocket connections, enabling live market data and account updates.

§Overview

The WsExchange trait complements the REST-based Exchange trait by providing real-time streaming capabilities:

  • Connection Management: Connect, disconnect, and monitor WebSocket state
  • Public Data Streams: Real-time ticker, order book, trades, and OHLCV updates
  • Private Data Streams: Live balance, order, and trade updates (requires authentication)
  • Subscription Management: Subscribe/unsubscribe to specific channels

§Architecture

┌─────────────────────────────────────────────────────────────┐
│                     WsExchange Trait                        │
├─────────────────────────────────────────────────────────────┤
│  Connection Management                                      │
│  ├── ws_connect(), ws_disconnect()                          │
│  └── ws_is_connected(), ws_state()                          │
├─────────────────────────────────────────────────────────────┤
│  Public Data Streams                                        │
│  ├── watch_ticker(), watch_tickers()                        │
│  ├── watch_order_book(), watch_trades()                     │
│  └── watch_ohlcv()                                          │
├─────────────────────────────────────────────────────────────┤
│  Private Data Streams (Authenticated)                       │
│  ├── watch_balance()                                        │
│  ├── watch_orders()                                         │
│  └── watch_my_trades()                                      │
├─────────────────────────────────────────────────────────────┤
│  Subscription Management                                    │
│  └── subscribe(), unsubscribe(), subscriptions()            │
└─────────────────────────────────────────────────────────────┘

§Key Types

  • WsExchange: The WebSocket streaming trait
  • MessageStream<T>: A pinned, boxed async stream yielding Result<T> items
  • FullExchange: Combined trait for exchanges supporting both REST and WebSocket

§Usage Examples

§Watching Real-Time Ticker Updates

use ccxt_core::ws_exchange::WsExchange;
use futures::StreamExt;

async fn watch_ticker(exchange: &dyn WsExchange, symbol: &str) {
    // Connect to WebSocket
    exchange.ws_connect().await.unwrap();
     
    // Watch ticker updates
    let mut stream = exchange.watch_ticker(symbol).await.unwrap();
     
    while let Some(ticker) = stream.next().await {
        match ticker {
            Ok(t) => println!("Price: {:?}", t.last),
            Err(e) => eprintln!("Error: {}", e),
        }
    }
}

§Watching Order Book Depth

use ccxt_core::ws_exchange::WsExchange;
use futures::StreamExt;

async fn watch_orderbook(exchange: &dyn WsExchange, symbol: &str) {
    exchange.ws_connect().await.unwrap();
     
    let mut stream = exchange.watch_order_book(symbol, Some(10)).await.unwrap();
     
    while let Some(result) = stream.next().await {
        if let Ok(orderbook) = result {
            if let (Some(best_bid), Some(best_ask)) = (
                orderbook.bids.first(),
                orderbook.asks.first()
            ) {
                println!("Best bid: {}, best ask: {}", best_bid.price, best_ask.price);
            }
        }
    }
}

§Monitoring Connection State

use ccxt_core::ws_exchange::WsExchange;
use ccxt_core::ws_client::WsConnectionState;

async fn ensure_connected(exchange: &dyn WsExchange) -> ccxt_core::Result<()> {
    if !exchange.ws_is_connected() {
        println!("Connecting to WebSocket...");
        exchange.ws_connect().await?;
    }
     
    match exchange.ws_state() {
        WsConnectionState::Connected => println!("Connected!"),
        WsConnectionState::Connecting => println!("Still connecting..."),
        WsConnectionState::Disconnected => println!("Disconnected"),
        WsConnectionState::Reconnecting => println!("Reconnecting..."),
        WsConnectionState::Error => println!("Error state"),
    }
     
    Ok(())
}

§Using FullExchange for REST + WebSocket

use ccxt_core::ws_exchange::FullExchange;
use futures::StreamExt;

async fn hybrid_trading(exchange: &dyn FullExchange, symbol: &str) {
    // Use REST API to get initial state
    let ticker = exchange.fetch_ticker(symbol).await.unwrap();
    println!("Initial price: {:?}", ticker.last);
     
    // Switch to WebSocket for real-time updates
    exchange.ws_connect().await.unwrap();
    let mut stream = exchange.watch_ticker(symbol).await.unwrap();
     
    while let Some(Ok(update)) = stream.next().await {
        println!("Live price: {:?}", update.last);
    }
}

§Watching Private Account Updates

use ccxt_core::ws_exchange::WsExchange;
use futures::StreamExt;

async fn watch_account(exchange: &dyn WsExchange) {
    exchange.ws_connect().await.unwrap();
     
    // Watch balance updates (requires authentication)
    let mut balance_stream = exchange.watch_balance().await.unwrap();
     
    // Watch order updates
    let mut order_stream = exchange.watch_orders(None).await.unwrap();
     
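    // Handle updates in a loop; a single select! would return after one event
    loop {
        tokio::select! {
            update = balance_stream.next() => match update {
                Some(Ok(balance)) => println!("Balance update: {:?}", balance),
                Some(Err(e)) => eprintln!("Balance stream error: {}", e),
                None => break,
            },
            update = order_stream.next() => match update {
                Some(Ok(order)) => println!("Order update: {} - {:?}", order.id, order.status),
                Some(Err(e)) => eprintln!("Order stream error: {}", e),
                None => break,
            },
        }
    }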
}

§MessageStream Type

The MessageStream<T> type alias represents an async stream of results:

pub type MessageStream<T> = Pin<Box<dyn Stream<Item = Result<T>> + Send>>;

This type is:

  • Pinned: The stream has a stable address, so it can be polled directly
  • Boxed: Allows for dynamic dispatch and type erasure
  • Send: Can be sent across thread boundaries
  • Yields Results: Each item is Result<T> to handle stream errors
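The shape of this alias can be sketched with the standard library alone. The block below is an illustration under assumptions, not the crate's code: `Stream` is a local stand-in for `futures::Stream`, `Result<T>` uses `String` as a placeholder error type, and `Replay`, `replay`, `drain`, and `NoopWake` are hypothetical names. It shows a concrete stream being type-erased into the alias and then polled through the pin.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for `futures::Stream`, so the sketch needs only std.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Placeholder for the crate's `Result<T>`; the real error type is an assumption here.
type Result<T> = std::result::Result<T, String>;

// Same shape as the alias above, built on the local `Stream` trait.
type MessageStream<T> = Pin<Box<dyn Stream<Item = Result<T>> + Send>>;

// Hypothetical source that replays queued items and is always ready.
struct Replay(VecDeque<Result<f64>>);

impl Stream for Replay {
    type Item = Result<f64>;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `Replay` is `Unpin`, so taking a plain mutable reference is allowed.
        Poll::Ready(self.get_mut().0.pop_front())
    }
}

// Box and pin the concrete stream; the return type erases `Replay`.
fn replay(items: Vec<Result<f64>>) -> MessageStream<f64> {
    Box::pin(Replay(VecDeque::from(items)))
}

// Waker that does nothing; enough for a stream that never returns `Pending`.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Poll the stream to completion, collecting values and counting errors.
fn drain(mut stream: MessageStream<f64>) -> (Vec<f64>, usize) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut prices = Vec::new();
    let mut errors = 0;
    loop {
        match stream.as_mut().poll_next(&mut cx) {
            Poll::Ready(Some(Ok(price))) => prices.push(price),
            Poll::Ready(Some(Err(_))) => errors += 1,
            // A real executor would park on `Pending`; this sketch just stops.
            Poll::Ready(None) | Poll::Pending => break,
        }
    }
    (prices, errors)
}
```

In application code the same loop is normally written as `while let Some(item) = stream.next().await` with `futures::StreamExt`, as in the usage examples above.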

§Connection States

WebSocket connections can be in one of several states, each a variant of WsConnectionState:

  • Disconnected: Not connected to the server
  • Connecting: Connection in progress
  • Connected: Active connection ready for use
  • Reconnecting: Automatic reconnection in progress
  • Error: The connection has failed and is in an error state
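One way to act on these states is to map each one to a decision before starting a stream. The sketch below is hypothetical: `ConnState` is a local stand-in for `ccxt_core::ws_client::WsConnectionState` (its `Error` variant is taken from the Monitoring Connection State example), and `Action` and `next_action` are illustrative names that are not part of the crate.

```rust
// Local stand-in mirroring the variants of `WsConnectionState`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnState {
    Disconnected,
    Connecting,
    Connected,
    Reconnecting,
    Error,
}

// Hypothetical decision a caller might take for each state.
#[derive(Debug, PartialEq, Eq)]
enum Action {
    Connect, // call ws_connect() before subscribing
    Wait,    // a connection attempt is already in flight
    Proceed, // safe to start watching streams
}

fn next_action(state: ConnState) -> Action {
    match state {
        ConnState::Connected => Action::Proceed,
        ConnState::Connecting | ConnState::Reconnecting => Action::Wait,
        ConnState::Disconnected | ConnState::Error => Action::Connect,
    }
}
```

Matching exhaustively, without a wildcard arm, means the compiler flags this function if a new state is ever added to the enum.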

§Error Handling

WebSocket operations can fail with:

  • WebSocket: Connection or protocol errors
  • Authentication: Invalid credentials for private streams
  • Network: Network connectivity issues
  • Timeout: Connection or subscription timeout
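These categories often call for different reactions: transient failures are worth retrying, while bad credentials are not. The sketch below assumes a local `WsFailure` enum named after the four categories above; it is not the crate's error type, and `retry_delay` and its backoff constants are hypothetical.

```rust
use std::time::Duration;

// Local stand-in for the error categories listed above.
#[derive(Debug)]
enum WsFailure {
    WebSocket(String),
    Authentication(String),
    Network(String),
    Timeout(String),
}

// Returns how long to wait before retrying, or None if retrying cannot help.
fn retry_delay(err: &WsFailure, attempt: u32) -> Option<Duration> {
    match err {
        // Invalid credentials will fail again; surface the error instead.
        WsFailure::Authentication(_) => None,
        WsFailure::WebSocket(_) | WsFailure::Network(_) | WsFailure::Timeout(_) => {
            // Exponential backoff: 500 ms, 1 s, 2 s, ... capped at 30 s.
            let millis = 500u64.saturating_mul(1u64 << attempt.min(16));
            Some(Duration::from_millis(millis.min(30_000)))
        }
    }
}
```

A reconnect loop could call `retry_delay` after each failed `ws_connect()` and sleep for the returned duration before trying again.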

§Thread Safety

Like the Exchange trait, WsExchange requires Send + Sync bounds, ensuring compatibility with async runtimes and multi-threaded applications.
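What the bounds buy in practice is that a trait object can be placed in an `Arc` and shared across threads or spawned tasks. The sketch below uses a hypothetical `Feed` trait with the same `Send + Sync` bounds as a stand-in for `WsExchange`; `Dummy` and `names_from_threads` are illustrative names, and plain threads are used so that the block depends only on the standard library.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for a trait with the same bounds as WsExchange.
trait Feed: Send + Sync {
    fn name(&self) -> String;
}

struct Dummy;

impl Feed for Dummy {
    fn name(&self) -> String {
        "dummy".to_string()
    }
}

// Shares one trait object across several worker threads.
fn names_from_threads(feed: Arc<dyn Feed>, workers: usize) -> Vec<String> {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Each worker gets its own reference-counted handle.
            let feed = Arc::clone(&feed);
            // Compiles only because `dyn Feed` is Send + Sync.
            thread::spawn(move || feed.name())
        })
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker panicked"))
        .collect()
}
```

Without the `Send + Sync` supertraits on `Feed`, the `thread::spawn` call would be rejected at compile time; the same reasoning applies to spawning tasks on a multi-threaded async runtime.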

§Traits

  • FullExchange: Combined trait for exchanges that support both REST and WebSocket
  • WsExchange: WebSocket exchange trait for real-time data streaming

§Type Aliases

  • MessageStream: WebSocket message stream type