Source code for o2_sdk.client

"""High-level O2 Exchange client.

Orchestrates wallet management, account lifecycle, session management,
trading, market data, and WebSocket streaming.
"""

from __future__ import annotations

import asyncio
import logging
import time
from collections.abc import AsyncIterator, Sequence

from .api import O2Api
from .config import Network, NetworkConfig, get_config
from .crypto import (
    EvmWallet,
    Signer,
    Wallet,
    generate_evm_wallet,
    generate_wallet,
    load_evm_wallet,
    load_wallet,
    raw_sign,
)
from .encoding import (
    action_to_call,
    build_actions_signing_bytes,
    build_session_signing_bytes,
    build_withdraw_signing_bytes,
)
from .errors import InvalidRequest, O2Error, SessionExpired
from .models import (
    AccountInfo,
    Action,
    ActionsResponse,
    Balance,
    BalanceUpdate,
    Bar,
    BoundedMarketOrder,
    CancelOrderAction,
    CancelOrderRequestAction,
    CreateOrderAction,
    CreateOrderRequestAction,
    DepthSnapshot,
    DepthUpdate,
    FaucetResponse,
    Id,
    LimitOrder,
    Market,
    MarketActionGroup,
    MarketActions,
    MarketsResponse,
    NonceUpdate,
    NumericInput,
    Order,
    OrderSide,
    OrderType,
    OrderUpdate,
    SessionInfo,
    SettleBalanceAction,
    SettleBalanceRequestAction,
    Trade,
    TradeUpdate,
    WithdrawResponse,
)
from .websocket import ConnectionEvent, O2WebSocket

logger = logging.getLogger("o2_sdk.client")


class MarketActionsBuilder:
    """Chainable collector of request-level actions aimed at a single market.

    Every mutator queues one request action and returns the builder itself so
    that calls can be chained; :meth:`build` packages the queued actions into
    a :class:`MarketActionGroup`.
    """

    def __init__(self, market: str | Market):
        # Stored as given (pair string/ID or Market object); nothing is
        # resolved or validated by the builder itself.
        self._market = market
        self._actions: list[
            CreateOrderRequestAction | CancelOrderRequestAction | SettleBalanceRequestAction
        ] = []

    def _queue(
        self,
        request: CreateOrderRequestAction | CancelOrderRequestAction | SettleBalanceRequestAction,
    ) -> MarketActionsBuilder:
        """Append *request* to the pending actions and return the builder."""
        self._actions.append(request)
        return self

    def settle_balance(self) -> MarketActionsBuilder:
        """Queue a settle-balance request."""
        return self._queue(SettleBalanceRequestAction())

    def cancel_order(self, order_id: str | Id) -> MarketActionsBuilder:
        """Queue a cancellation request for *order_id*."""
        return self._queue(CancelOrderRequestAction(order_id=order_id))

    def create_order(
        self,
        side: OrderSide,
        price: NumericInput,
        quantity: NumericInput,
        order_type: OrderType | LimitOrder | BoundedMarketOrder = OrderType.SPOT,
    ) -> MarketActionsBuilder:
        """Queue an order-creation request; values are stored exactly as passed."""
        request = CreateOrderRequestAction(
            side=side,
            price=price,
            quantity=quantity,
            order_type=order_type,
        )
        return self._queue(request)

    def build(self) -> MarketActionGroup:
        """Return a :class:`MarketActionGroup` holding a copy of the queued actions."""
        # Copy so that later builder calls do not mutate a group already handed out.
        queued = self._actions.copy()
        return MarketActionGroup(market=self._market, actions=queued)


def _validate_depth_precision(precision: int | str) -> None:
    """Raise :class:`InvalidRequest` if *precision* is outside 1--18.

    Shared validator for both REST ``get_depth`` and WebSocket ``stream_depth``.
    Valid range: 1--18.
    """
    try:
        p = int(precision)
    except (TypeError, ValueError) as err:
        raise InvalidRequest(
            message=f"Invalid depth precision {precision!r}. Must be an integer in range 1-18."
        ) from err
    if p < 1 or p > 18:
        raise InvalidRequest(
            message=(
                f"Invalid depth precision {p}. Valid range: 1-18 (powers of 10). "
                "Precision 0 is not supported — use get_depth() via REST for exact prices."
            ),
        )


[docs] class O2Client: """High-level client for the O2 Exchange. Orchestrates wallet management, account lifecycle, session management, trading operations, market data retrieval, and WebSocket streaming. """ def __init__( self, network: Network = Network.TESTNET, custom_config: NetworkConfig | None = None, ): self._config = custom_config or get_config(network) self._network = network self.api = O2Api(self._config) self._ws: O2WebSocket | None = None self._markets_cache: MarketsResponse | None = None self._nonce_cache: dict[str, int] = {} self._session: SessionInfo | None = None
[docs] async def close(self) -> None: """Close all connections.""" await self.api.close() if self._ws: await self._ws.disconnect()
async def __aenter__(self) -> O2Client: return self async def __aexit__(self, *exc: object) -> None: await self.close() @property def session(self) -> SessionInfo | None: """Return the currently active session, if any.""" return self._session
[docs] def set_session(self, session: SessionInfo) -> None: """Set or restore the active session used by trading calls.""" self._session = session
[docs] def clear_session(self) -> None: """Clear the active session.""" self._session = None
# ----------------------------------------------------------------------- # Wallet management # -----------------------------------------------------------------------
[docs] @staticmethod def generate_wallet() -> Wallet: """Generate a new Fuel-native wallet.""" return generate_wallet()
[docs] @staticmethod def generate_evm_wallet() -> EvmWallet: """Generate a new EVM-compatible wallet.""" return generate_evm_wallet()
[docs] @staticmethod def load_wallet(private_key_hex: str) -> Wallet: """Load a Fuel-native wallet from a private key.""" return load_wallet(private_key_hex)
[docs] @staticmethod def load_evm_wallet(private_key_hex: str) -> EvmWallet: """Load an EVM-compatible wallet from a private key.""" return load_evm_wallet(private_key_hex)
# ----------------------------------------------------------------------- # Account lifecycle (idempotent) # -----------------------------------------------------------------------
[docs] async def setup_account(self, wallet: Signer) -> AccountInfo: """Set up a trading account idempotently. 1. Check if account exists (GET /v1/accounts) 2. Create if needed (POST /v1/accounts) 3. Mint via faucet if testnet/devnet (handle cooldown gracefully) 4. Whitelist account 5. Return AccountInfo Safe to call on every bot startup. """ # Step 1: Check for existing account account = await self.api.get_account(owner=wallet.b256_address) if not account.exists: # Step 2: Create account logger.info("Creating trading account for %s", wallet.b256_address) result = await self.api.create_account(wallet.b256_address) account = await self.api.get_account(trade_account_id=result.trade_account_id) trade_account_id = account.trade_account_id if trade_account_id is None: raise O2Error(message="Account must have a trade_account_id") # Step 3: Faucet (non-mainnet only). Skip if balance is already non-zero. if self._config.faucet_url: has_balance = await self._has_any_balance(trade_account_id) if has_balance: logger.debug("Skipping faucet mint for %s (non-zero balance)", trade_account_id) else: minted = await self._retry_mint_to_contract(trade_account_id) if not minted: logger.warning("Faucet mint failed after retries (non-fatal)") # Step 4: Whitelist (required on configured networks) if self._config.whitelist_required: whitelisted = await self._retry_whitelist_account(trade_account_id) if not whitelisted: raise O2Error( message=( "Failed to whitelist account after retries. " "Account setup cannot continue on this network." ) ) return account
async def top_up_from_faucet(self, owner: Signer) -> FaucetResponse: """Mint test assets to the owner's trading account contract. This is useful for explicit testnet/devnet top-ups after account setup. """ account = await self.api.get_account(owner=owner.b256_address) if not account.trade_account_id: raise O2Error(message="Account not found. Call setup_account() first.") return await self.api.mint_to_contract(account.trade_account_id) async def _has_any_balance(self, trade_account_id: str) -> bool: try: markets = await self._get_markets_cached() balance = await self.api.get_balance( asset_id=markets.base_asset_id, contract=trade_account_id, ) return int(balance.trading_account_balance) > 0 except Exception as e: logger.debug("Balance probe failed for %s: %s", trade_account_id, e) # If balance check fails, fall back to attempting faucet mint. return False async def _retry_whitelist_account(self, trade_account_id: str) -> bool: delays = [0, 2, 5] last_error = "" for idx, delay in enumerate(delays): if delay > 0: await asyncio.sleep(delay) try: wl = await self.api.whitelist_account(trade_account_id) if wl.already_whitelisted: logger.info("Account already whitelisted") else: logger.info("Account whitelisted successfully") return True except Exception as e: last_error = str(e) if idx < len(delays) - 1: logger.warning( "whitelist attempt %d failed for %s: %s (retrying)", idx + 1, trade_account_id, last_error, ) logger.error( "whitelist failed after %d attempts for %s: %s", len(delays), trade_account_id, last_error, ) return False async def _retry_mint_to_contract(self, trade_account_id: str) -> bool: attempts = 4 last_error = "" for idx in range(attempts): if idx > 0: lower = last_error.lower() wait_secs = ( 65 if ("cooldown" in lower or "rate limit" in lower or "too many" in lower) else 5 ) await asyncio.sleep(wait_secs) try: resp = await self.api.mint_to_contract(trade_account_id) if resp.success: logger.info("Faucet mint successful") return True last_error = 
resp.error or "Unknown faucet error" except Exception as e: last_error = str(e) if idx < attempts - 1: logger.warning( "faucet attempt %d failed for %s: %s (retrying)", idx + 1, trade_account_id, last_error, ) logger.error( "faucet failed after %d attempts for %s: %s", attempts, trade_account_id, last_error, ) return False # ----------------------------------------------------------------------- # Session management # -----------------------------------------------------------------------
[docs] async def create_session( self, owner: Signer, markets: list[str | Market], expiry_days: int = 30, ) -> SessionInfo: """Create a trading session. Args: owner: A signer for the owner account (Wallet, EvmWallet, ExternalSigner, ExternalEvmSigner, or any :class:`Signer`) markets: List of market pair strings/IDs or Market objects expiry_days: Session expiry in days (default 30) Returns: SessionInfo with session keys and trading state """ logger.info("Creating session for markets=%s, expiry_days=%d", markets, expiry_days) # Resolve markets markets_resp = await self._get_markets_cached() contract_ids: list[str] = [] for m_name in markets: market = ( m_name if isinstance(m_name, Market) else self._resolve_market(markets_resp, m_name) ) if market.contract_id not in contract_ids: contract_ids.append(market.contract_id) chain_id = markets_resp.chain_id_int # Get account info and nonce account = await self.api.get_account(owner=owner.b256_address) if not account.exists: raise O2Error(message="Account not found. 
Call setup_account() first.") nonce = account.nonce logger.debug("Session nonce=%d, chain_id=%d", nonce, chain_id) # Generate session wallet session_wallet = generate_wallet() # Build signing bytes contract_id_bytes = [bytes.fromhex(c[2:]) for c in contract_ids] expiry = int(time.time()) + (expiry_days * 24 * 60 * 60) signing_bytes = build_session_signing_bytes( nonce=nonce, chain_id=chain_id, session_address=session_wallet.address_bytes, contract_ids=contract_id_bytes, expiry=expiry, ) # Sign with owner (delegates to Signer.personal_sign which handles # Fuel vs EVM message framing internally) logger.debug( "Signing session with owner.personal_sign, payload=%d bytes", len(signing_bytes) ) signature = owner.personal_sign(signing_bytes) # Submit session request session_request = { "contract_id": account.trade_account_id, "session_id": {"Address": session_wallet.b256_address}, "signature": {"Secp256k1": "0x" + signature.hex()}, "contract_ids": contract_ids, "nonce": str(nonce), "expiry": str(expiry), } resp = await self.api.create_session(owner.b256_address, session_request) # Cache the nonce (session creation increments it) if account.trade_account_id is None: raise O2Error(message="Account must have a trade_account_id") self._nonce_cache[account.trade_account_id] = nonce + 1 logger.info( "Session created: session_id=%s, account=%s", resp.session_id, resp.trade_account_id, ) session = SessionInfo( session_id=resp.session_id, trade_account_id=resp.trade_account_id, contract_ids=resp.contract_ids, session_expiry=resp.session_expiry, session_private_key=session_wallet.private_key, owner_address=owner.b256_address, nonce=nonce + 1, ) self._session = session return session
# ----------------------------------------------------------------------- # Trading # -----------------------------------------------------------------------
[docs] def actions_for(self, market: str | Market) -> MarketActionsBuilder: """Create a fluent builder for high-level market actions.""" return MarketActionsBuilder(market)
[docs] async def create_order( self, market: str | Market, side: OrderSide, price: NumericInput, quantity: NumericInput, order_type: OrderType | LimitOrder | BoundedMarketOrder = OrderType.SPOT, settle_first: bool = True, collect_orders: bool = True, session: SessionInfo | None = None, ) -> ActionsResponse: """Place an order with automatic encoding, signing, and nonce management. Args: market: Market pair, market_id/contract_id, or Market model side: OrderSide.BUY or OrderSide.SELL price: Human-readable numeric input (auto-scaled) quantity: Human-readable numeric input (auto-scaled) order_type: OrderType.SPOT, OrderType.MARKET, OrderType.FILL_OR_KILL, OrderType.POST_ONLY, LimitOrder(...), or BoundedMarketOrder(...) settle_first: If True, prepend SettleBalance action collect_orders: If True, return created order details session: Optional explicit session override. Uses active client session if omitted. Returns: ActionsResponse with tx_id and optional orders """ if isinstance(order_type, OrderType): ot_label = order_type.value elif isinstance(order_type, LimitOrder): ot_label = "Limit" else: ot_label = "BoundedMarket" logger.info( "Creating %s %s order: market=%s price=%s qty=%s", side.value, ot_label, market, price, quantity, ) session = self._require_session(session) market_obj = await self._resolve_market_like_async(market) # Scale price and quantity scaled_price = market_obj.scale_price(price) scaled_quantity = market_obj.scale_quantity(quantity) # Adjust quantity for FractionalPrice constraint scaled_quantity = market_obj.adjust_quantity(scaled_price, scaled_quantity) logger.debug( "Scaled order: price=%d qty=%d (market_id=%s)", scaled_price, scaled_quantity, market_obj.market_id, ) # Validate market_obj.validate_order(scaled_price, scaled_quantity) # Build the order type for the action action_ot: OrderType | LimitOrder | BoundedMarketOrder if isinstance(order_type, LimitOrder): action_ot = LimitOrder( price=market_obj.scale_price(order_type.price), 
timestamp=order_type.timestamp if order_type.timestamp is not None else int(time.time()), ) elif isinstance(order_type, BoundedMarketOrder): action_ot = BoundedMarketOrder( max_price=market_obj.scale_price(order_type.max_price), min_price=market_obj.scale_price(order_type.min_price), ) else: action_ot = order_type # Build actions typed_actions: list[Action] = [] if settle_first: typed_actions.append(SettleBalanceAction(to=session.trade_account_id)) typed_actions.append( CreateOrderAction( side=side, price=str(scaled_price), quantity=str(scaled_quantity), order_type=action_ot, ) ) return await self.batch_actions( actions=[MarketActions(market_id=market_obj.market_id, actions=typed_actions)], collect_orders=collect_orders, session=session, )
[docs] async def cancel_order( self, order_id: str | Id, market: str | Market | None = None, market_id: str | None = None, session: SessionInfo | None = None, ) -> ActionsResponse: """Cancel an order.""" session = self._require_session(session) logger.info("Cancelling order %s", order_id) if market_id is None: if market is None: raise ValueError("Either market or market_id must be provided") market_obj = await self._resolve_market_like_async(market) market_id = market_obj.market_id actions = [ MarketActions( market_id=market_id, actions=[CancelOrderAction(order_id=Id(str(order_id)))], ) ] return await self.batch_actions(actions=actions, session=session)
[docs] async def cancel_all_orders( self, market: str | Market, session: SessionInfo | None = None ) -> list[ActionsResponse]: """Cancel all open orders for a market. Fetches up to 200 open orders and cancels them in batches of 5. Returns a list of ActionsResponse (one per batch). """ session = self._require_session(session) logger.info("Cancelling all open orders for market=%s", market) market_obj = await self._resolve_market_like_async(market) orders_resp = await self.api.get_orders( market_id=market_obj.market_id, contract=session.trade_account_id, direction="desc", count=200, is_open=True, ) if not orders_resp.orders: logger.info("No open orders to cancel") return [] results: list[ActionsResponse] = [] # Cancel in chunks of 5 for i in range(0, len(orders_resp.orders), 5): chunk = orders_resp.orders[i : i + 5] cancel_actions: list[Action] = [CancelOrderAction(order_id=o.order_id) for o in chunk] actions = [MarketActions(market_id=market_obj.market_id, actions=cancel_actions)] resp = await self.batch_actions(actions=actions, session=session) results.append(resp) return results
[docs] async def settle_balance( self, market: str | Market, session: SessionInfo | None = None ) -> ActionsResponse: """Settle balance for a market.""" session = self._require_session(session) market_obj = await self._resolve_market_like_async(market) actions = [ MarketActions( market_id=market_obj.market_id, actions=[SettleBalanceAction(to=session.trade_account_id)], ) ] return await self.batch_actions(actions=actions, session=session)
[docs] async def batch_actions( self, actions: Sequence[MarketActions | MarketActionGroup], collect_orders: bool = False, session: SessionInfo | None = None, ) -> ActionsResponse: """Submit a batch of actions with automatic signing and nonce management. Args: actions: List of MarketActions (market-grouped typed actions) collect_orders: If True, return created order details session: Optional explicit session override. Uses active client session if omitted. """ session = self._require_session(session) # Check session expiry before submitting on-chain if session.session_expiry: try: expiry_ts = int(session.session_expiry) if time.time() >= expiry_ts: raise SessionExpired( message="Session has expired. Create a new session before submitting actions." ) except ValueError: pass # Non-numeric expiry format, skip check markets_resp = await self._get_markets_cached() # Convert typed/high-level actions to wire dicts once actions_dicts = await self._normalize_market_actions(session, actions) # Get current nonce nonce = await self._get_nonce(session.trade_account_id) logger.debug("Submitting actions with nonce=%d, actions=%s", nonce, actions_dicts) # Convert actions to calls calls: list[dict] = [] for market_group in actions_dicts: m_id = market_group["market_id"] market_info = self._get_market_info_by_id(markets_resp, m_id) for action in market_group["actions"]: call = action_to_call(action, market_info) calls.append(call) # Build signing bytes and sign with session key signing_bytes = build_actions_signing_bytes(nonce, calls) if session.session_private_key is None: raise O2Error(message="Session must have a private key") logger.debug( "Signing %d actions (%d bytes) with session key", len(calls), len(signing_bytes) ) signature = raw_sign(session.session_private_key, signing_bytes) # Submit request = { "actions": actions_dicts, "signature": {"Secp256k1": "0x" + signature.hex()}, "nonce": str(nonce), "trade_account_id": session.trade_account_id, "session_id": 
session.session_id.to_dict(), "collect_orders": collect_orders, } if session.owner_address is None: raise O2Error(message="Session must have an owner address") try: result = await self.api.submit_actions(session.owner_address, request) # Increment nonce on success self._nonce_cache[session.trade_account_id] = nonce + 1 session.nonce = nonce + 1 logger.info("Actions submitted: tx_id=%s, nonce=%d->%d", result.tx_id, nonce, nonce + 1) return result except O2Error as e: logger.warning("Actions failed (nonce=%d): %s", nonce, e) # Nonce increments even on revert, so re-fetch await self.refresh_nonce(session) raise
async def _normalize_market_actions( self, session: SessionInfo, actions: Sequence[MarketActions | MarketActionGroup], ) -> list[dict]: normalized: list[dict] = [] for group in actions: if isinstance(group, MarketActions): normalized.append(group.to_dict()) continue market = await self._resolve_market_like_async(group.market) resolved_actions: list[Action] = [] for action in group.actions: try: if isinstance(action, CreateOrderRequestAction): scaled_price = market.scale_price(action.price) scaled_quantity = market.scale_quantity(action.quantity) scaled_quantity = market.adjust_quantity(scaled_price, scaled_quantity) market.validate_order(scaled_price, scaled_quantity) order_type = action.order_type normalized_ot: OrderType | LimitOrder | BoundedMarketOrder if isinstance(order_type, LimitOrder): normalized_ot = LimitOrder( price=market.scale_price(order_type.price), timestamp=order_type.timestamp, ) elif isinstance(order_type, BoundedMarketOrder): normalized_ot = BoundedMarketOrder( max_price=market.scale_price(order_type.max_price), min_price=market.scale_price(order_type.min_price), ) else: normalized_ot = order_type resolved_actions.append( CreateOrderAction( side=action.side, price=str(scaled_price), quantity=str(scaled_quantity), order_type=normalized_ot, ) ) elif isinstance(action, CancelOrderRequestAction): resolved_actions.append( CancelOrderAction(order_id=Id(str(action.order_id))) ) elif isinstance(action, SettleBalanceRequestAction): resolved_actions.append(SettleBalanceAction(to=session.trade_account_id)) else: raise O2Error(message=f"Unsupported action type: {type(action).__name__}") except ValueError as e: raise O2Error(message=str(e)) from e normalized.append( MarketActions(market_id=market.market_id, actions=resolved_actions).to_dict() ) return normalized def _require_session(self, session: SessionInfo | None = None) -> SessionInfo: if session is not None: return session if self._session is None: raise O2Error( message=( "No active session. 
Call create_session() first, or pass " "session=... explicitly." ) ) return self._session # ----------------------------------------------------------------------- # Market data # -----------------------------------------------------------------------
[docs] async def get_markets(self) -> list[Market]: """Get all available markets.""" resp = await self._get_markets_cached() return resp.markets
[docs] async def get_market(self, symbol_pair: str) -> Market: """Get a specific market by pair symbol (e.g., "FUEL/USDC").""" resp = await self._get_markets_cached() return self._resolve_market(resp, symbol_pair)
[docs] async def get_depth( self, market: str | Market, precision: int = 1, limit: int | None = None, ) -> DepthSnapshot: """Get order book depth for a market. Args: market: Market pair string (e.g. ``"fFUEL/fUSDC"``) or :class:`Market` object. precision: Price grouping level, from ``1`` (most precise) to ``18`` (most grouped). Default ``1``. At level 1, prices are at or near their exact values. Higher levels round prices into larger buckets — useful for a visual depth chart but too coarse for trading. Same scale as :meth:`stream_depth`. limit: Maximum number of price levels per side (bids/asks). ``None`` (default) returns the full order book. Raises: InvalidRequest: If *precision* is outside the valid range 1--18. """ _validate_depth_precision(precision) precision = 10 ** int(precision) market_obj = await self._resolve_market_like_async(market) return await self.api.get_depth(market_obj.market_id, precision, limit=limit)
[docs] async def get_trades( self, market: str | Market, count: int = 50, account: AccountInfo | str | Id | None = None, start_timestamp: int | None = None, start_trade_id: str | Id | None = None, ) -> list[Trade]: """Get recent trades for a market. Args: market: Market pair string or Market object. count: Number of trades to return (max 50). account: Optional AccountInfo, trade_account_id string, or Id to filter trades for a specific account. Strings are validated as hex. start_timestamp: Cursor for pagination — timestamp of the last trade from the previous page. Must be paired with ``start_trade_id``. start_trade_id: Cursor for pagination — trade ID of the last trade from the previous page. Must be paired with ``start_timestamp``. Strings are validated as hex. """ market_obj = await self._resolve_market_like_async(market) validated_tid = ( Id(start_trade_id) if isinstance(start_trade_id, str) and not isinstance(start_trade_id, Id) else start_trade_id ) if account is not None: if isinstance(account, str) and not isinstance(account, Id): contract = Id(account) elif isinstance(account, Id): contract = account else: if account.trade_account_id is None: msg = "AccountInfo has no trade_account_id" raise ValueError(msg) contract = account.trade_account_id return await self.api.get_trades_by_account( market_obj.market_id, contract=contract, count=count, start_timestamp=start_timestamp, start_trade_id=validated_tid, ) return await self.api.get_trades( market_obj.market_id, count=count, start_timestamp=start_timestamp, start_trade_id=validated_tid, )
[docs] async def get_bars( self, market: str | Market, resolution: str, from_ts: int, to_ts: int, ) -> list[Bar]: """Get OHLCV bars for a market. Args: market: Market pair (e.g. ``"ETH/USDC"``) or :class:`Market` object. resolution: Bar resolution (e.g. ``"1m"``, ``"1h"``, ``"1d"``). from_ts: Start timestamp in **milliseconds** (not seconds). to_ts: End timestamp in **milliseconds** (not seconds). """ market_obj = await self._resolve_market_like_async(market) return await self.api.get_bars(market_obj.market_id, from_ts, to_ts, resolution)
[docs] async def get_ticker(self, market: str | Market) -> dict: """Get real-time ticker for a market.""" market_obj = await self._resolve_market_like_async(market) resp = await self.api.get_market_ticker(market_obj.market_id) return resp.data
# ----------------------------------------------------------------------- # Account data # -----------------------------------------------------------------------
[docs] async def get_balances(self, account: AccountInfo | str | Id) -> dict[str, Balance]: """Get balances keyed by asset symbol. Args: account: AccountInfo, trade_account_id string, or Id. Strings are validated as hex. """ if isinstance(account, str) and not isinstance(account, Id): trade_account_id = Id(account) elif isinstance(account, Id): trade_account_id = account else: if account.trade_account_id is None: msg = "AccountInfo has no trade_account_id" raise ValueError(msg) trade_account_id = account.trade_account_id markets_resp = await self._get_markets_cached() result: dict[str, Balance] = {} seen_assets: set[str] = set() for m in markets_resp.markets: for asset_info in (m.base, m.quote): if asset_info.asset in seen_assets: continue seen_assets.add(asset_info.asset) try: balance = await self.api.get_balance( asset_id=asset_info.asset, contract=trade_account_id, ) result[asset_info.symbol] = balance except Exception: pass return result
[docs] async def get_orders( self, market: str | Market, account: AccountInfo | str | Id, is_open: bool | None = None, count: int = 20, start_timestamp: int | None = None, start_order_id: str | Id | None = None, ) -> list[Order]: """Get orders for an account on a market. Args: market: Market pair string or Market object. account: AccountInfo, trade_account_id string, or Id. Strings are validated as hex. is_open: Filter by open/closed status. ``None`` returns all. count: Number of orders to return (max 200). start_timestamp: Cursor for pagination — timestamp of the last order from the previous page. Must be paired with ``start_order_id``. start_order_id: Cursor for pagination — order ID of the last order from the previous page. Must be paired with ``start_timestamp``. Strings are validated as hex. """ if isinstance(account, str) and not isinstance(account, Id): trade_account_id = Id(account) elif isinstance(account, Id): trade_account_id = account else: if account.trade_account_id is None: msg = "AccountInfo has no trade_account_id" raise ValueError(msg) trade_account_id = account.trade_account_id validated_oid = ( Id(start_order_id) if isinstance(start_order_id, str) and not isinstance(start_order_id, Id) else start_order_id ) market_obj = await self._resolve_market_like_async(market) resp = await self.api.get_orders( market_id=market_obj.market_id, contract=trade_account_id, direction="desc", count=count, is_open=is_open, start_timestamp=start_timestamp, start_order_id=validated_oid, ) return resp.orders
[docs] async def get_order(self, market: str | Market, order_id: str | Id) -> Order: """Get a specific order. Strings are validated as hex.""" if isinstance(order_id, str) and not isinstance(order_id, Id): order_id = Id(order_id) market_obj = await self._resolve_market_like_async(market) return await self.api.get_order(market_obj.market_id, order_id)
# ----------------------------------------------------------------------- # WebSocket streaming # ----------------------------------------------------------------------- async def _ensure_ws(self) -> O2WebSocket: if self._ws is None: self._ws = O2WebSocket(self._config) await self._ws.connect() return self._ws async def stream_lifecycle(self) -> AsyncIterator[ConnectionEvent]: """Stream WebSocket connection lifecycle events. Yields :class:`ConnectionEvent` objects whenever the connection state changes (connected, disconnected, reconnecting, reconnected, closed). Use this to detect reconnects and re-sync critical state from the REST API — messages received during the disconnect window are lost. Example:: async for event in client.stream_lifecycle(): if event.state == ConnectionState.RECONNECTED: # Re-sync balances, open orders, etc. balances = await client.get_balances(account) elif event.state == ConnectionState.CLOSED: break """ ws = await self._ensure_ws() async for event in ws.stream_lifecycle(): yield event
[docs] async def stream_depth( self, market: str | Market, precision: int = 1 ) -> AsyncIterator[DepthUpdate]: """Stream order book depth updates. Args: market: Market pair (e.g. ``"ETH/USDC"``) or :class:`Market` object. precision: Price grouping level, from ``1`` (most precise) to ``18`` (most grouped). Default ``1``. At level 1, prices are at or near their exact values. Higher levels round prices into larger buckets. Same scale as :meth:`get_depth`. Raises: InvalidRequest: If *precision* is outside the valid range 1--18. """ _validate_depth_precision(precision) wire_precision = str(10 ** int(precision)) market_obj = await self._resolve_market_like_async(market) ws = await self._ensure_ws() async for update in ws.stream_depth(market_obj.market_id, wire_precision): yield update
[docs] async def stream_orders(self, account: AccountInfo | str) -> AsyncIterator[OrderUpdate]: """Stream order updates for an account.""" trade_account_id = account if isinstance(account, str) else account.trade_account_id ws = await self._ensure_ws() identities = [{"ContractId": trade_account_id}] async for update in ws.stream_orders(identities): yield update
[docs] async def stream_trades(self, market: str | Market) -> AsyncIterator[TradeUpdate]: """Stream trade updates for a market.""" market_obj = await self._resolve_market_like_async(market) ws = await self._ensure_ws() async for update in ws.stream_trades(market_obj.market_id): yield update
[docs] async def stream_balances(self, account: AccountInfo | str) -> AsyncIterator[BalanceUpdate]: """Stream balance updates for an account.""" trade_account_id = account if isinstance(account, str) else account.trade_account_id ws = await self._ensure_ws() identities = [{"ContractId": trade_account_id}] async for update in ws.stream_balances(identities): yield update
[docs] async def stream_nonce(self, account: AccountInfo | str) -> AsyncIterator[NonceUpdate]: """Stream nonce updates for an account.""" trade_account_id = account if isinstance(account, str) else account.trade_account_id ws = await self._ensure_ws() identities = [{"ContractId": trade_account_id}] async for update in ws.stream_nonce(identities): yield update
# ----------------------------------------------------------------------- # Withdrawals # -----------------------------------------------------------------------
[docs] async def withdraw( self, owner: Signer, asset: str, amount: float, to: str | None = None, ) -> WithdrawResponse: """Withdraw funds from the trading account. Args: owner: A signer for the owner account (Wallet, EvmWallet, ExternalSigner, ExternalEvmSigner, or any :class:`Signer`) asset: Asset symbol (e.g., "USDC") or asset_id amount: Human-readable amount to withdraw to: Destination address (defaults to owner address) """ logger.info("Withdrawing %s %s", amount, asset) markets_resp = await self._get_markets_cached() account = await self.api.get_account(owner=owner.b256_address) if not account.exists: raise O2Error(message="Account not found") nonce = account.nonce destination = to or owner.b256_address # Resolve asset asset_id, decimals = self._resolve_asset(markets_resp, asset) scaled_amount = int(amount * (10**decimals)) logger.debug( "Withdraw: asset_id=%s, scaled_amount=%d, nonce=%d", asset_id, scaled_amount, nonce ) # Build withdraw signing bytes using shared encoding function signing_bytes = build_withdraw_signing_bytes( nonce=nonce, chain_id=markets_resp.chain_id_int, to_discriminant=0, # Address discriminant to_address=bytes.fromhex(destination[2:]), asset_id=bytes.fromhex(asset_id[2:]), amount=scaled_amount, ) logger.debug("Signing withdrawal, payload=%d bytes", len(signing_bytes)) signature = owner.personal_sign(bytes(signing_bytes)) withdraw_request = { "trade_account_id": account.trade_account_id, "signature": {"Secp256k1": "0x" + signature.hex()}, "nonce": str(nonce), "to": {"Address": destination}, "asset_id": asset_id, "amount": str(scaled_amount), } return await self.api.withdraw(owner.b256_address, withdraw_request)
# ----------------------------------------------------------------------- # Nonce management # -----------------------------------------------------------------------
[docs] async def get_nonce(self, trade_account_id: str) -> int: """Get the current nonce for a trading account.""" return await self._get_nonce(trade_account_id)
[docs] async def refresh_nonce(self, session: SessionInfo) -> int: """Re-fetch nonce from the API (manual resync).""" old_nonce = self._nonce_cache.get(session.trade_account_id) account = await self.api.get_account(trade_account_id=session.trade_account_id) nonce = account.nonce self._nonce_cache[session.trade_account_id] = nonce session.nonce = nonce logger.debug( "Nonce refreshed: %s -> %s (account=%s)", old_nonce, nonce, session.trade_account_id ) return nonce
async def _get_nonce(self, trade_account_id: str) -> int:
    """Return the nonce for an account, fetching it from the API on a cache miss.

    A fetched nonce is stored in ``self._nonce_cache`` so later calls for
    the same account are served from the cache.
    """
    if trade_account_id not in self._nonce_cache:
        account = await self.api.get_account(trade_account_id=trade_account_id)
        self._nonce_cache[trade_account_id] = account.nonce
    return self._nonce_cache[trade_account_id]


# -----------------------------------------------------------------------
# Internal helpers
# -----------------------------------------------------------------------


async def _get_markets_cached(self) -> MarketsResponse:
    """Return the markets response, calling the API only if none is cached yet."""
    markets = self._markets_cache
    if markets is None:
        markets = await self.api.get_markets()
        self._markets_cache = markets
    return markets


def _resolve_market(self, markets_resp: MarketsResponse, name_or_id: str) -> Market:
    """Resolve a market by pair name or hex ID.

    The first market whose ``market_id``, ``contract_id`` or ``pair``
    equals ``name_or_id`` is returned.

    Raises:
        O2Error: If no market matches.
    """
    matches = (
        market
        for market in markets_resp.markets
        if market.market_id == name_or_id
        or market.contract_id == name_or_id
        or market.pair == name_or_id
    )
    found = next(matches, None)
    if found is None:
        raise O2Error(message=f"Market not found: {name_or_id}")
    return found


async def _resolve_market_async(self, name_or_id: str) -> Market:
    """Resolve a market by pair name or ID against the cached markets response."""
    return self._resolve_market(await self._get_markets_cached(), name_or_id)


async def _resolve_market_like_async(self, market: str | Market) -> Market:
    """Return ``market`` unchanged if it is already a ``Market``, else resolve it."""
    if not isinstance(market, Market):
        return await self._resolve_market_async(market)
    return market


def _get_market_info_by_id(self, markets_resp: MarketsResponse, market_id: str) -> dict:
    """Get market info dict needed by action_to_call.

    Raises:
        O2Error: If no market has the given ``market_id``.
    """
    found = next(
        (market for market in markets_resp.markets if market.market_id == market_id),
        None,
    )
    if found is None:
        raise O2Error(message=f"Market ID not found: {market_id}")

    base, quote = found.base, found.quote
    return {
        "contract_id": found.contract_id,
        "market_id": found.market_id,
        "base": {"asset": base.asset, "decimals": base.decimals},
        "quote": {"asset": quote.asset, "decimals": quote.decimals},
        "accounts_registry_id": markets_resp.accounts_registry_id,
    }


def _resolve_asset(self, markets_resp: MarketsResponse, symbol_or_id: str) -> tuple[str, int]:
    """Resolve an asset symbol or ID to (asset_id, decimals).

    Markets are scanned in order; within each market the base asset is
    checked before the quote asset.

    Raises:
        O2Error: If no base or quote asset matches.
    """
    for market in markets_resp.markets:
        for side in (market.base, market.quote):
            if side.symbol == symbol_or_id or side.asset == symbol_or_id:
                return side.asset, side.decimals
    raise O2Error(message=f"Asset not found: {symbol_or_id}")