Expand description
§WebSocket Streams Guide
This guide covers real-time data streaming using the O2 Rust SDK.
The SDK provides WebSocket streaming through TypedStream<T>, which
implements tokio_stream::Stream and can be consumed with
while let Some(item) = stream.next().await.
See also:
O2Clientstreaming methods,O2WebSocketfor standalone usage.Current backend behavior:
unsubscribe_ordersis connection-global (not identity-filtered), so unsubscribing order updates removes all order subscriptions on that socket.
§Overview
All streaming methods:
- Return a
TypedStream<T>that yieldsResult<T, O2Error>items. - Share a single WebSocket connection managed by the
O2Client. - Support automatic reconnection with exponential backoff.
- Re-subscribe to channels on reconnect.
Stream items carry data and terminal errors:
Ok(update)— a normal data messageErr(O2Error::WebSocketDisconnected(_))— permanent connection loss
Lifecycle/reconnect events are delivered separately via
O2Client::subscribe_ws_lifecycle
or O2WebSocket::subscribe_lifecycle.
§Order Book Depth
Stream real-time order book updates:
use tokio_stream::StreamExt;
let market = client.get_market("fFUEL/fUSDC").await?;
let mut stream = client.stream_depth(&market.market_id, 1).await?;
while let Some(Ok(update)) = stream.next().await {
// First message is a full snapshot (action = "subscribe_depth")
// Subsequent messages are incremental updates (action = "subscribe_depth_update")
let is_snapshot = update.action == "subscribe_depth";
if is_snapshot {
if let Some(ref view) = update.view {
println!("Snapshot: {} bids, {} asks", view.bids.len(), view.asks.len());
}
} else if let Some(ref changes) = update.changes {
if let Some(bid) = changes.bids.first() {
println!("Best bid: {}", bid.price);
}
if let Some(ask) = changes.asks.first() {
println!("Best ask: {}", ask.price);
}
}
}The precision parameter controls price aggregation, matching the
REST O2Client::get_depth endpoint.
§Order Updates
Monitor your orders in real time:
use o2_sdk::Identity;
use tokio_stream::StreamExt;
let identity = Identity::ContractId(session.trade_account_id.to_string());
let mut stream = client.stream_orders(&[identity]).await?;
while let Some(Ok(update)) = stream.next().await {
for order in &update.orders {
let status = if order.close { "CLOSED" } else { "OPEN" };
let filled = format!(
"{}/{}",
order.quantity_fill.unwrap_or(0),
order.quantity,
);
println!(
"[{}] {:?} {}: {}",
status,
order.side,
order.order_id,
filled,
);
if order.cancel {
println!(" Canceled");
}
}
}§Trade Feed
Stream all trades for a market:
use tokio_stream::StreamExt;
let market = client.get_market("fFUEL/fUSDC").await?;
let mut stream = client.stream_trades(&market.market_id).await?;
while let Some(Ok(update)) = stream.next().await {
for trade in &update.trades {
println!(
"{:?} {} @ {}",
trade.side,
trade.quantity,
trade.price,
);
}
}§Balance Updates
Monitor balance changes in real time:
use o2_sdk::Identity;
use tokio_stream::StreamExt;
let identity = Identity::ContractId(session.trade_account_id.to_string());
let mut stream = client.stream_balances(&[identity]).await?;
while let Some(Ok(update)) = stream.next().await {
for entry in &update.balance {
println!(
"Balance: {} (locked: {}, unlocked: {})",
entry.trading_account_balance,
entry.total_locked,
entry.total_unlocked,
);
}
}§Nonce Monitoring
Useful for detecting nonce changes from other sessions or external transactions:
use o2_sdk::Identity;
use tokio_stream::StreamExt;
let identity = Identity::ContractId(session.trade_account_id.to_string());
let mut stream = client.stream_nonce(&[identity]).await?;
while let Some(Ok(update)) = stream.next().await {
println!(
"Nonce changed: {} (account={})",
update.nonce,
update.contract_id,
);
}§Running Multiple Streams
Use tokio::join! or tokio::spawn to run multiple streams concurrently:
use o2_sdk::Identity;
use tokio_stream::StreamExt;
let market = client.get_market("fFUEL/fUSDC").await?;
let identity = Identity::ContractId(session.trade_account_id.to_string());
let mut depth_stream = client.stream_depth(&market.market_id, 1).await?;
let mut order_stream = client.stream_orders(&[identity.clone()]).await?;
let mut trade_stream = client.stream_trades(&market.market_id).await?;
let depth_task = tokio::spawn(async move {
while let Some(Ok(update)) = depth_stream.next().await {
if let Some(ref changes) = update.changes {
if let Some(bid) = changes.bids.first() {
println!("Best bid: {}", bid.price);
}
}
}
});
let order_task = tokio::spawn(async move {
while let Some(Ok(update)) = order_stream.next().await {
for order in &update.orders {
let status = if order.close { "closed" } else { "open" };
println!("Order {}: {}", order.order_id, status);
}
}
});
let trade_task = tokio::spawn(async move {
while let Some(Ok(update)) = trade_stream.next().await {
for trade in &update.trades {
println!(
"Trade: {} @ {}",
trade.quantity,
trade.price,
);
}
}
});
// Run all streams concurrently
tokio::join!(depth_task, order_task, trade_task);Note: All streams share a single WebSocket connection, managed internally by the
O2WebSocketclient withinO2Client.
§Handling Reconnections
For non-snapshot streams, monitor lifecycle events and refresh state on reconnect:
use o2_sdk::WsLifecycleEvent;
use tokio_stream::StreamExt;
let market = client.get_market("fFUEL/fUSDC").await?;
let mut stream = client.stream_depth(&market.market_id, 1).await?;
let mut lifecycle = client.subscribe_ws_lifecycle().await?;
loop {
tokio::select! {
Some(result) = stream.next() => {
match result {
Ok(update) => {
// Process the depth update
}
Err(o2_sdk::O2Error::WebSocketDisconnected(msg)) => {
println!("Permanently disconnected: {}", msg);
break;
}
Err(e) => {
println!("Stream error: {}", e);
}
}
}
Ok(evt) = lifecycle.recv() => {
if let WsLifecycleEvent::Reconnected { .. } = evt {
// Connection was re-established and subscriptions restored.
// Re-fetch state if your strategy requires a fresh snapshot.
println!("Reconnected — refreshing local state");
}
}
}
}§Configuration
Customize reconnection behavior via WsConfig:
use o2_sdk::{O2WebSocket, WsConfig};
use std::time::Duration;
let config = WsConfig {
base_delay: Duration::from_secs(1), // Base reconnect delay
max_delay: Duration::from_secs(60), // Maximum reconnect delay
max_attempts: 10, // Max reconnect attempts (0 = infinite)
ping_interval: Duration::from_secs(30), // Heartbeat interval
pong_timeout: Duration::from_secs(60), // Pong timeout before reconnect
};
let ws = O2WebSocket::connect_with_config("wss://ws.o2.app", config).await?;Reconnection uses exponential backoff to avoid thundering herd effects.
§Graceful Shutdown
Always disconnect the WebSocket when done to cleanly release resources:
client.disconnect_ws().await?;For standalone O2WebSocket instances:
ws.disconnect().await?;