Source code for o2_sdk.api

"""REST API client for the O2 Exchange.

Typed wrappers for every endpoint from the O2 API. All methods return
typed response objects and raise O2Error on failures.
"""

from __future__ import annotations

import asyncio
import logging
import time
from typing import Any

import aiohttp

from .config import NetworkConfig
from .errors import O2Error, RateLimitExceeded, raise_for_error
from .models import (
    AccountCreateResponse,
    AccountInfo,
    ActionsResponse,
    AggregatedAsset,
    Balance,
    Bar,
    DepthSnapshot,
    FaucetResponse,
    MarketsResponse,
    MarketSummary,
    MarketTicker,
    Order,
    OrdersResponse,
    ReferralInfo,
    SessionResponse,
    Trade,
    WhitelistResponse,
    WithdrawResponse,
)

logger = logging.getLogger("o2_sdk.api")


[docs] class O2Api: """Low-level REST API client for the O2 Exchange.""" def __init__(self, config: NetworkConfig, session: aiohttp.ClientSession | None = None): self._config = config self._session = session self._owns_session = session is None async def _ensure_session(self) -> aiohttp.ClientSession: if self._session is None or self._session.closed: self._session = aiohttp.ClientSession() self._owns_session = True return self._session
[docs] async def close(self) -> None: if self._owns_session and self._session and not self._session.closed: await self._session.close()
async def _request( self, method: str, path: str, *, json: dict | None = None, params: dict | None = None, headers: dict | None = None, base_url: str | None = None, max_retries: int = 3, ) -> Any: session = await self._ensure_session() url = (base_url or self._config.api_base) + path hdrs = {"Content-Type": "application/json"} if headers: hdrs.update(headers) for attempt in range(max_retries): if params: logger.debug("%s %s params=%s", method, path, params) elif json is not None: logger.debug("%s %s body=%s", method, path, json) else: logger.debug("%s %s", method, path) t0 = time.monotonic() try: async with session.request( method, url, json=json, params=params, headers=hdrs ) as resp: data = await resp.json(content_type=None) elapsed_ms = (time.monotonic() - t0) * 1000 # Rate limit: check both code 1003 and HTTP 429 is_rate_limited = ( isinstance(data, dict) and data.get("code") == 1003 ) or resp.status == 429 if is_rate_limited: if attempt < max_retries - 1: wait = 2 ** (attempt + 1) logger.warning( "Rate limited on %s %s, retrying in %ds (attempt %d/%d)", method, path, wait, attempt + 1, max_retries, ) await asyncio.sleep(wait) continue raise RateLimitExceeded( message=( data.get("message", "Rate limit exceeded") if isinstance(data, dict) else "Rate limit exceeded" ), code=1003, ) if resp.status >= 400 and isinstance(data, dict): code = data.get("code") message = data.get("message") or data.get("error") or f"HTTP {resp.status}" logger.debug( "%s %s -> %d error (code=%s) %.0fms: %s", method, path, resp.status, code, elapsed_ms, message, ) if code is not None: from .errors import ERROR_CODE_MAP error_cls = ERROR_CODE_MAP.get(code, O2Error) # Augment revert messages even on code-based errors — # the backend sometimes returns code=1000 (InternalError) # with the revert info buried in the message field. reason = data.get("reason") receipts = data.get("receipts") if "Revert" in message or "revert" in message or "Panic" in message: from .onchain_revert import augment_revert_reason message = augment_revert_reason(message, reason, receipts) raise error_cls(message=message, code=code) if ("message" in data or "error" in data) and "tx_id" not in data: raise_for_error(data) else: logger.debug("%s %s -> %d %.0fms", method, path, resp.status, elapsed_ms) return data except aiohttp.ClientError as e: elapsed_ms = (time.monotonic() - t0) * 1000 if attempt < max_retries - 1: logger.debug( "%s %s -> network error, retrying (attempt %d/%d, %.0fms): %s", method, path, attempt + 1, max_retries, elapsed_ms, e, ) await asyncio.sleep(2**attempt) continue raise O2Error(message=str(e)) from e # ----------------------------------------------------------------------- # Market Data # -----------------------------------------------------------------------
[docs] async def get_markets(self) -> MarketsResponse: data = await self._request("GET", "/v1/markets") return MarketsResponse.from_dict(data)
[docs] async def get_market_summary(self, market_id: str) -> MarketSummary: data = await self._request("GET", "/v1/markets/summary", params={"market_id": market_id}) return MarketSummary.from_dict(data)
[docs] async def get_market_ticker(self, market_id: str) -> MarketTicker: data = await self._request("GET", "/v1/markets/ticker", params={"market_id": market_id}) return MarketTicker.from_dict(data)
[docs] async def get_depth( self, market_id: str, precision: int = 10, limit: int | None = None, ) -> DepthSnapshot: params: dict[str, Any] = {"market_id": market_id, "precision": precision} if limit is not None: params["limit"] = limit data = await self._request("GET", "/v1/depth", params=params) snapshot = DepthSnapshot.from_dict(data) # Client-side truncation: even if the backend doesn't support the # limit parameter yet, we honour it here so callers always get the # expected number of levels. if limit is not None: snapshot = DepthSnapshot( bids=snapshot.bids[:limit], asks=snapshot.asks[:limit], market_id=snapshot.market_id, ) return snapshot
[docs] async def get_trades( self, market_id: str, direction: str = "desc", count: int = 50, start_timestamp: int | None = None, start_trade_id: str | None = None, contract: str | None = None, ) -> list[Trade]: params: dict[str, Any] = { "market_id": market_id, "direction": direction, "count": count, } if start_timestamp is not None: params["start_timestamp"] = start_timestamp if start_trade_id is not None: params["start_trade_id"] = start_trade_id if contract is not None: params["contract"] = contract data = await self._request("GET", "/v1/trades", params=params) if isinstance(data, list): return [Trade.from_dict(t) for t in data] return [Trade.from_dict(t) for t in data.get("trades", [])]
[docs] async def get_trades_by_account( self, market_id: str, contract: str, direction: str = "desc", count: int = 50, start_timestamp: int | None = None, start_trade_id: str | None = None, ) -> list[Trade]: params: dict[str, Any] = { "market_id": market_id, "contract": contract, "direction": direction, "count": count, } if start_timestamp is not None: params["start_timestamp"] = start_timestamp if start_trade_id is not None: params["start_trade_id"] = start_trade_id data = await self._request("GET", "/v1/trades_by_account", params=params) if isinstance(data, list): return [Trade.from_dict(t) for t in data] return [Trade.from_dict(t) for t in data.get("trades", [])]
_VALID_RESOLUTIONS = frozenset( { "1s", "1m", "2m", "3m", "5m", "15m", "30m", "1h", "2h", "4h", "6h", "8h", "12h", "1d", "3d", "1w", "1M", "3M", } )
[docs] async def get_bars( self, market_id: str, from_ts: int, to_ts: int, resolution: str = "1h", ) -> list[Bar]: """Fetch OHLCV candle bars. Args: market_id: Market identifier. from_ts: Start timestamp in **milliseconds** (not seconds). to_ts: End timestamp in **milliseconds** (not seconds). resolution: Bar resolution. Valid values: ``1s``, ``1m``, ``2m``, ``3m``, ``5m``, ``15m``, ``30m``, ``1h``, ``2h``, ``4h``, ``6h``, ``8h``, ``12h``, ``1d``, ``3d``, ``1w``, ``1M``, ``3M``. Raises: ValueError: If *resolution* is not one of the valid values. """ if resolution not in self._VALID_RESOLUTIONS: raise ValueError( f"Invalid bar resolution {resolution!r}. " f"Valid values: {sorted(self._VALID_RESOLUTIONS)}" ) params: dict[str, Any] = { "market_id": market_id, "from": from_ts, "to": to_ts, "resolution": resolution, } data = await self._request("GET", "/v1/bars", params=params) if isinstance(data, list): return [Bar.from_dict(b) for b in data] return [Bar.from_dict(b) for b in data.get("bars", [])]
# ----------------------------------------------------------------------- # Account & Balance # -----------------------------------------------------------------------
[docs] async def create_account(self, owner_address: str) -> AccountCreateResponse: data = await self._request( "POST", "/v1/accounts", json={"identity": {"Address": owner_address}}, ) return AccountCreateResponse.from_dict(data)
[docs] async def get_account( self, owner: str | None = None, trade_account_id: str | None = None, ) -> AccountInfo: params: dict[str, str] = {} if owner: params["owner"] = owner elif trade_account_id: params["trade_account_id"] = trade_account_id data = await self._request("GET", "/v1/accounts", params=params) return AccountInfo.from_dict(data)
[docs] async def get_balance( self, asset_id: str, contract: str | None = None, address: str | None = None, ) -> Balance: params: dict[str, str] = {"asset_id": asset_id} if contract: params["contract"] = contract elif address: params["address"] = address data = await self._request("GET", "/v1/balance", params=params) return Balance.from_dict(data)
# ----------------------------------------------------------------------- # Orders # -----------------------------------------------------------------------
[docs] async def get_orders( self, market_id: str, contract: str | None = None, account: str | None = None, direction: str = "desc", count: int = 20, is_open: bool | None = None, start_timestamp: int | None = None, start_order_id: str | None = None, ) -> OrdersResponse: params: dict[str, Any] = { "market_id": market_id, "direction": direction, "count": count, } if contract: params["contract"] = contract elif account: params["account"] = account if is_open is not None: params["is_open"] = str(is_open).lower() if start_timestamp is not None: params["start_timestamp"] = start_timestamp if start_order_id is not None: params["start_order_id"] = start_order_id data = await self._request("GET", "/v1/orders", params=params) return OrdersResponse.from_dict(data)
[docs] async def get_order(self, market_id: str, order_id: str) -> Order: data = await self._request( "GET", "/v1/order", params={"market_id": market_id, "order_id": order_id} ) # API wraps order in an "order" key order_data = data.get("order", data) if isinstance(data, dict) else data return Order.from_dict(order_data)
# ----------------------------------------------------------------------- # Session Management # -----------------------------------------------------------------------
[docs] async def create_session(self, owner_id: str, session_request: dict) -> SessionResponse: data = await self._request( "PUT", "/v1/session", json=session_request, headers={"O2-Owner-Id": owner_id}, ) return SessionResponse.from_dict(data)
[docs] async def submit_actions(self, owner_id: str, actions_request: dict) -> ActionsResponse: data = await self._request( "POST", "/v1/session/actions", json=actions_request, headers={"O2-Owner-Id": owner_id}, ) result = ActionsResponse.from_dict(data) if not result.success: raise_for_error(data) return result
# ----------------------------------------------------------------------- # Account Operations # -----------------------------------------------------------------------
[docs] async def withdraw(self, owner_id: str, withdraw_request: dict) -> WithdrawResponse: data = await self._request( "POST", "/v1/accounts/withdraw", json=withdraw_request, headers={"O2-Owner-Id": owner_id}, ) return WithdrawResponse.from_dict(data)
# ----------------------------------------------------------------------- # Analytics # -----------------------------------------------------------------------
[docs] async def whitelist_account(self, trade_account_id: str) -> WhitelistResponse: data = await self._request( "POST", "/analytics/v1/whitelist", json={"tradeAccount": trade_account_id}, ) return WhitelistResponse.from_dict(data)
[docs] async def get_referral_info(self, code: str) -> ReferralInfo: data = await self._request("GET", "/analytics/v1/referral/code-info", params={"code": code}) return ReferralInfo.from_dict(data)
# ----------------------------------------------------------------------- # Aggregated endpoints # -----------------------------------------------------------------------
[docs] async def get_aggregated_assets(self) -> list[AggregatedAsset]: data = await self._request("GET", "/v1/aggregated/assets") if isinstance(data, list): return [AggregatedAsset.from_dict(a) for a in data] return [AggregatedAsset.from_dict(a) for a in data.get("assets", [])]
[docs] async def get_aggregated_orderbook( self, market_pair: str, depth: int = 500, level: int = 2 ) -> dict: return dict( await self._request( "GET", "/v1/aggregated/orderbook", params={"market_pair": market_pair, "depth": depth, "level": level}, ) )
[docs] async def get_aggregated_summary(self) -> list[dict]: data = await self._request("GET", "/v1/aggregated/summary") return data if isinstance(data, list) else data.get("summary", [])
[docs] async def get_aggregated_ticker(self) -> list[dict]: data = await self._request("GET", "/v1/aggregated/ticker") return data if isinstance(data, list) else data.get("ticker", [])
[docs] async def get_aggregated_trades(self, market_pair: str) -> list[Trade]: data = await self._request( "GET", "/v1/aggregated/trades", params={"market_pair": market_pair} ) items = data if isinstance(data, list) else data.get("trades", []) return [Trade.from_dict(t) for t in items]
# ----------------------------------------------------------------------- # Faucet (testnet/devnet/sandbox only) # -----------------------------------------------------------------------
[docs] async def mint_to_address(self, address: str) -> FaucetResponse: if not self._config.faucet_url: raise O2Error(message="Faucet not available on this network") data = await self._request( "POST", "", json={"address": address}, base_url=self._config.faucet_url, ) return FaucetResponse.from_dict(data)
[docs] async def mint_to_contract(self, contract_id: str) -> FaucetResponse: if not self._config.faucet_url: raise O2Error(message="Faucet not available on this network") data = await self._request( "POST", "", json={"contract": contract_id}, base_url=self._config.faucet_url, ) return FaucetResponse.from_dict(data)