§WebSocket Exchange Trait
This module defines the WsExchange trait for real-time data streaming
via WebSocket connections, enabling live market data and account updates.
§Overview
The WsExchange trait complements the REST-based Exchange
trait by providing real-time streaming capabilities:
- Connection Management: Connect, disconnect, and monitor WebSocket state
- Public Data Streams: Real-time ticker, order book, trades, and OHLCV updates
- Private Data Streams: Live balance, order, and trade updates (requires authentication)
- Subscription Management: Subscribe/unsubscribe to specific channels
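As a rough illustration of the subscription-management responsibility, the sketch below tracks active channels in memory. `SubscriptionRegistry` and the `ticker:BTC/USDT` channel naming are hypothetical and not part of the crate; the trait's own `subscribe()`, `unsubscribe()`, and `subscriptions()` methods presumably also exchange messages with the server, which this sketch leaves out.

```rust
use std::collections::BTreeSet;

/// Hypothetical in-memory registry of active channel names.
#[derive(Debug, Default)]
pub struct SubscriptionRegistry {
    channels: BTreeSet<String>,
}

impl SubscriptionRegistry {
    /// Records a channel; returns `false` if it was already subscribed.
    pub fn subscribe(&mut self, channel: &str) -> bool {
        self.channels.insert(channel.to_string())
    }

    /// Removes a channel; returns `false` if it was not subscribed.
    pub fn unsubscribe(&mut self, channel: &str) -> bool {
        self.channels.remove(channel)
    }

    /// Lists the active channels in sorted order.
    pub fn subscriptions(&self) -> Vec<String> {
        self.channels.iter().cloned().collect()
    }
}
```

Returning a `bool` from `subscribe` lets a caller skip sending a duplicate subscription request for a channel that is already active.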
§Architecture
┌─────────────────────────────────────────────────────────────┐
│                      WsExchange Trait                       │
├─────────────────────────────────────────────────────────────┤
│  Connection Management                                      │
│  ├── ws_connect(), ws_disconnect()                          │
│  └── ws_is_connected(), ws_state()                          │
├─────────────────────────────────────────────────────────────┤
│  Public Data Streams                                        │
│  ├── watch_ticker(), watch_tickers()                        │
│  ├── watch_order_book(), watch_trades()                     │
│  └── watch_ohlcv()                                          │
├─────────────────────────────────────────────────────────────┤
│  Private Data Streams (Authenticated)                       │
│  ├── watch_balance()                                        │
│  ├── watch_orders()                                         │
│  └── watch_my_trades()                                      │
├─────────────────────────────────────────────────────────────┤
│  Subscription Management                                    │
│  └── subscribe(), unsubscribe(), subscriptions()            │
└─────────────────────────────────────────────────────────────┘
§Key Types
- WsExchange: The WebSocket streaming trait
- MessageStream<T>: A pinned, boxed async stream yielding Result<T> items
- FullExchange: Combined trait for exchanges supporting both REST and WebSocket
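One common way to express a combined trait such as FullExchange is a supertrait with a blanket implementation. The sketch below shows that pattern with hypothetical stand-ins (`RestApi`, `WsApi`, `FullApi`, `DemoExchange`); it is an assumption about the general technique, not a description of how the crate defines FullExchange.

```rust
/// Hypothetical stand-in for the REST trait.
pub trait RestApi {
    fn fetch_price(&self, symbol: &str) -> f64;
}

/// Hypothetical stand-in for the WebSocket trait.
pub trait WsApi {
    fn is_streaming(&self) -> bool;
}

/// Combined trait: satisfied by anything implementing both halves.
pub trait FullApi: RestApi + WsApi {}

/// Blanket impl, so implementors never write `impl FullApi` by hand.
impl<T: RestApi + WsApi> FullApi for T {}

/// Hypothetical exchange that implements both halves.
pub struct DemoExchange;

impl RestApi for DemoExchange {
    fn fetch_price(&self, _symbol: &str) -> f64 {
        100.0
    }
}

impl WsApi for DemoExchange {
    fn is_streaming(&self) -> bool {
        true
    }
}

/// Accepts any exchange through the combined trait object.
pub fn describe(exchange: &dyn FullApi, symbol: &str) -> String {
    format!(
        "{} @ {} (streaming: {})",
        symbol,
        exchange.fetch_price(symbol),
        exchange.is_streaming()
    )
}
```

With this shape, a function that needs both APIs takes a single `&dyn FullApi` argument instead of two separate trait objects.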
§Usage Examples
§Watching Real-Time Ticker Updates
use ccxt_core::ws_exchange::WsExchange;
use futures::StreamExt;

async fn watch_ticker(exchange: &dyn WsExchange, symbol: &str) {
    // Connect to WebSocket
    exchange.ws_connect().await.unwrap();
    // Watch ticker updates
    let mut stream = exchange.watch_ticker(symbol).await.unwrap();
    while let Some(ticker) = stream.next().await {
        match ticker {
            Ok(t) => println!("Price: {:?}", t.last),
            Err(e) => eprintln!("Error: {}", e),
        }
    }
}
§Watching Order Book Depth
use ccxt_core::ws_exchange::WsExchange;
use futures::StreamExt;

async fn watch_orderbook(exchange: &dyn WsExchange, symbol: &str) {
    exchange.ws_connect().await.unwrap();
    // Request the top 10 levels of the book
    let mut stream = exchange.watch_order_book(symbol, Some(10)).await.unwrap();
    while let Some(result) = stream.next().await {
        if let Ok(orderbook) = result {
            if let (Some(best_bid), Some(best_ask)) = (
                orderbook.bids.first(),
                orderbook.asks.first(),
            ) {
                // Print both sides of the top of the book
                println!("Best bid: {}, best ask: {}", best_bid.price, best_ask.price);
            }
        }
    }
}
§Monitoring Connection State
use ccxt_core::ws_exchange::WsExchange;
use ccxt_core::ws_client::WsConnectionState;

async fn ensure_connected(exchange: &dyn WsExchange) -> ccxt_core::Result<()> {
    if !exchange.ws_is_connected() {
        println!("Connecting to WebSocket...");
        exchange.ws_connect().await?;
    }
    match exchange.ws_state() {
        WsConnectionState::Connected => println!("Connected!"),
        WsConnectionState::Connecting => println!("Still connecting..."),
        WsConnectionState::Disconnected => println!("Disconnected"),
        WsConnectionState::Reconnecting => println!("Reconnecting..."),
        WsConnectionState::Error => println!("Error state"),
    }
    Ok(())
}
§Using FullExchange for REST + WebSocket
use ccxt_core::ws_exchange::FullExchange;
use futures::StreamExt;

async fn hybrid_trading(exchange: &dyn FullExchange, symbol: &str) {
    // Use REST API to get initial state
    let ticker = exchange.fetch_ticker(symbol).await.unwrap();
    println!("Initial price: {:?}", ticker.last);
    // Switch to WebSocket for real-time updates
    exchange.ws_connect().await.unwrap();
    let mut stream = exchange.watch_ticker(symbol).await.unwrap();
    // Note: this loop stops at the first error or when the stream ends
    while let Some(Ok(update)) = stream.next().await {
        println!("Live price: {:?}", update.last);
    }
}
§Watching Private Account Updates
use ccxt_core::ws_exchange::WsExchange;
use futures::StreamExt;

async fn watch_account(exchange: &dyn WsExchange) {
    exchange.ws_connect().await.unwrap();
    // Watch balance updates (requires authentication)
    let mut balance_stream = exchange.watch_balance().await.unwrap();
    // Watch order updates
    let mut order_stream = exchange.watch_orders(None).await.unwrap();
    // Loop so that every update is handled, not only the first one
    loop {
        tokio::select! {
            Some(Ok(balance)) = balance_stream.next() => {
                println!("Balance update: {:?}", balance);
            }
            Some(Ok(order)) = order_stream.next() => {
                println!("Order update: {} - {:?}", order.id, order.status);
            }
            // Reached when neither pattern matches in one pass,
            // e.g. both streams have ended
            else => break,
        }
    }
}
§MessageStream Type
The MessageStream<T> type alias represents an async stream of results:
pub type MessageStream<T> = Pin<Box<dyn Stream<Item = Result<T>> + Send>>;
This type is:
- Pinned: Required for async iteration
- Boxed: Allows for dynamic dispatch and type erasure
- Send: Can be sent across thread boundaries
- Yields Results: Each item is Result<T> to handle stream errors
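To make the shape of the alias concrete, here is a minimal sketch that builds and drains a pinned, boxed stream using only the standard library. The local `Stream` trait mirrors the `poll_next` signature of `futures::Stream`; `QueueStream`, `from_items`, `drain`, and the `String` error type are hypothetical stand-ins, since the crate's own `Result` and error types are not shown here.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in for `futures::Stream`, so the sketch needs only `std`.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Same shape as the alias above, with `String` as a placeholder error type.
pub type MessageStream<T> = Pin<Box<dyn Stream<Item = Result<T, String>> + Send>>;

/// Hypothetical stream backed by a queue of already-available items.
struct QueueStream<T> {
    items: VecDeque<Result<T, String>>,
}

impl<T: Unpin> Stream for QueueStream<T> {
    type Item = Result<T, String>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Every item is ready immediately; `None` signals the end of the stream.
        Poll::Ready(self.items.pop_front())
    }
}

/// Erases the concrete stream type behind the `MessageStream` alias.
pub fn from_items<T: Unpin + Send + 'static>(items: Vec<Result<T, String>>) -> MessageStream<T> {
    Box::pin(QueueStream { items: items.into() })
}

/// Waker that does nothing; enough for streams that never return `Pending`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the stream until it ends and collects every item.
pub fn drain<T>(mut stream: MessageStream<T>) -> Vec<Result<T, String>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match stream.as_mut().poll_next(&mut cx) {
            Poll::Ready(Some(item)) => out.push(item),
            Poll::Ready(None) => break,
            // A real executor would park here until the waker fires.
            Poll::Pending => break,
        }
    }
    out
}
```

Because the concrete stream type is erased, each exchange could return a different implementation from its `watch_*` methods while callers handle a single type.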
§Connection States
WebSocket connections can be in one of several states:
- Disconnected: Not connected to the server
- Connecting: Connection in progress
- Connected: Active connection ready for use
- Reconnecting: Automatic reconnection in progress
- Error: The connection is in an error state
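The states can be pictured as a small state machine. The sketch below uses a hypothetical `ConnState` enum with the same variant names matched in the connection-state usage example, plus a hypothetical `ConnEvent` enum; the transition rules are assumptions chosen for illustration and may differ from what WsClient actually does.

```rust
/// Hypothetical mirror of the connection states.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnState {
    Disconnected,
    Connecting,
    Connected,
    Reconnecting,
    Error,
}

/// Hypothetical events that drive the state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnEvent {
    ConnectRequested,
    HandshakeSucceeded,
    ConnectionLost,
    RetriesExhausted,
    DisconnectRequested,
}

/// Returns the state that follows `state` after `event` (assumed rules).
pub fn next_state(state: ConnState, event: ConnEvent) -> ConnState {
    use ConnEvent::*;
    use ConnState::*;
    match (state, event) {
        // An explicit disconnect always wins.
        (_, DisconnectRequested) => Disconnected,
        (Disconnected | Error, ConnectRequested) => Connecting,
        (Connecting | Reconnecting, HandshakeSucceeded) => Connected,
        // A dropped connection triggers automatic reconnection.
        (Connected, ConnectionLost) => Reconnecting,
        (Connecting | Reconnecting, RetriesExhausted) => Error,
        // Any other combination leaves the state unchanged.
        (unchanged, _) => unchanged,
    }
}
```

Keeping the transitions in one pure function makes them easy to test without opening a socket.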
§Error Handling
WebSocket operations can fail with:
- WebSocket: Connection or protocol errors
- Authentication: Invalid credentials for private streams
- Network: Network connectivity issues
- Timeout: Connection or subscription timeout
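A caller typically wants to decide which failures are worth retrying. The sketch below models the four categories with a hypothetical `StreamError` enum; it is not the crate's error type, and the retry policy (retry only Network and Timeout) is an assumption.

```rust
use std::fmt;

/// Hypothetical error type mirroring the four categories listed above.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamError {
    WebSocket(String),
    Authentication(String),
    Network(String),
    Timeout(String),
}

impl fmt::Display for StreamError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StreamError::WebSocket(msg) => write!(f, "websocket error: {}", msg),
            StreamError::Authentication(msg) => write!(f, "authentication error: {}", msg),
            StreamError::Network(msg) => write!(f, "network error: {}", msg),
            StreamError::Timeout(msg) => write!(f, "timeout: {}", msg),
        }
    }
}

impl std::error::Error for StreamError {}

impl StreamError {
    /// Assumed policy: only transient transport failures are retried.
    pub fn is_retryable(&self) -> bool {
        matches!(self, StreamError::Network(_) | StreamError::Timeout(_))
    }
}
```

Under this policy an authentication failure surfaces immediately, since retrying with the same invalid credentials would not help.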
§Thread Safety
Like the Exchange trait, WsExchange requires Send + Sync bounds,
ensuring compatibility with async runtimes and multi-threaded applications.
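The practical effect of `Send + Sync` supertrait bounds is that a trait object can be shared across threads behind an `Arc`. The sketch below demonstrates this with a hypothetical `PriceSource` trait and `FixedPrices` implementation (prices are made-up values in cents); it uses OS threads from `std` rather than an async runtime to stay self-contained.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical trait with the same `Send + Sync` supertrait bounds.
pub trait PriceSource: Send + Sync {
    fn last_price(&self, symbol: &str) -> Option<u64>;
}

/// Hypothetical implementation returning fixed prices in cents.
pub struct FixedPrices;

impl PriceSource for FixedPrices {
    fn last_price(&self, symbol: &str) -> Option<u64> {
        match symbol {
            "BTC/USDT" => Some(6_000_000),
            "ETH/USDT" => Some(300_000),
            _ => None,
        }
    }
}

/// Queries each symbol on its own thread through one shared trait object.
pub fn query_in_parallel(source: Arc<dyn PriceSource>, symbols: &[&str]) -> Vec<Option<u64>> {
    let handles: Vec<_> = symbols
        .iter()
        .map(|s| {
            let source = Arc::clone(&source);
            let symbol = s.to_string();
            // `thread::spawn` needs a `Send` closure; `Arc<dyn PriceSource>`
            // is `Send` only because the trait requires `Send + Sync`.
            thread::spawn(move || source.last_price(&symbol))
        })
        .collect();
    // Join in spawn order so results line up with the input symbols.
    handles
        .into_iter()
        .map(|h| h.join().expect("worker thread panicked"))
        .collect()
}
```

Removing `Send + Sync` from the trait definition would make the `thread::spawn` call fail to compile, which is the guarantee the bounds provide.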
§See Also
- crate::exchange::Exchange: REST API trait
- crate::ws_client::WsClient: Low-level WebSocket client
- crate::ws_client::WsConnectionState: Connection state enum
§Traits
- FullExchange: Combined trait for exchanges that support both REST and WebSocket
- WsExchange: WebSocket exchange trait for real-time data streaming
§Type Aliases
- MessageStream: WebSocket message stream type