gtat-tech-career-kickstarte.../solution/src/info/instruments_manager.py

78 lines
3.8 KiB
Python

import logging
from typing import Generator
from info.i_instruments_listener import IInstrumentsListener
from info.i_order_book_listener import IOrderBookListener
from .models import Instrument
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class InstrumentsManager(IOrderBookListener):
    """Registry of instruments that notifies a listener when one becomes complete.

    Instruments are indexed both by symbol and by order book id. An instrument
    and the "order book created" event carrying its tick size may arrive in
    either order; whichever arrives second assigns the tick size to the
    instrument and publishes it to the registered ``IInstrumentsListener``.
    """

    def __init__(self) -> None:
        """Create an empty manager with no listener attached."""
        # Registered instruments, keyed by symbol.
        self.instrument_by_symbol: dict[str, Instrument] = {}
        # The same instruments, keyed by order book id.
        self.instrument_by_order_book_id: dict[int, Instrument] = {}
        # Tick sizes received before the matching instrument was added.
        self._pending_tick_size_by_order_book_id: dict[int, float] = {}
        self.instruments_listener: IInstrumentsListener | None = None

    def set_instruments_listener(self, instruments_listener: IInstrumentsListener) -> None:
        """Attach the listener that receives completed instruments.

        Args:
            instruments_listener: Listener to notify via
                ``on_new_complete_instrument``.

        Raises:
            AssertionError: If a listener is already attached, or if any
                instrument has already been added to the manager.
        """
        # Raised explicitly (not via ``assert``) so the checks still run under
        # ``python -O``, which strips assert statements.
        if self.instruments_listener is not None:
            raise AssertionError("Instruments listener is already set")
        if self.instrument_by_symbol:
            raise AssertionError("Instruments manager must be empty when setting instruments listener")
        self.instruments_listener = instruments_listener

    def add_instrument(self, instrument: Instrument) -> None:
        """Register an instrument and publish it if its tick size is already known.

        Args:
            instrument: Instrument to register; its ``symbol`` and
                ``order_book_id`` must both be unused.

        Raises:
            ValueError: If the symbol or the order book id is already registered.
            AssertionError: Propagated from publishing when no listener is set
                or the instrument is still incomplete after receiving its
                tick size.
        """
        symbol = instrument.symbol
        if symbol in self.instrument_by_symbol:
            raise ValueError(f"Instrument {symbol} already exists")
        order_book_id = instrument.order_book_id
        if order_book_id in self.instrument_by_order_book_id:
            raise ValueError(f"Instrument with order book id {order_book_id} already exists")

        self.instrument_by_symbol[symbol] = instrument
        self.instrument_by_order_book_id[order_book_id] = instrument

        # Consume the tick size if the order book was created before the
        # instrument arrived; a single pop both reads and removes the entry.
        try:
            tick_size = self._pending_tick_size_by_order_book_id.pop(order_book_id)
        except KeyError:
            # Order book not seen yet: on_order_book_created finishes the job later.
            return

        instrument.tick_size = tick_size
        logger.debug(f"Instrument {instrument.symbol} is now complete, publishing to listeners")
        self._publish_instrument(instrument)

    def exists(self, symbol: str) -> bool:
        """Return whether an instrument with this symbol has been added."""
        return symbol in self.instrument_by_symbol

    def get_instrument_by_symbol(self, symbol: str) -> Instrument:
        """Return the instrument registered under ``symbol``.

        Raises:
            ValueError: If no instrument with that symbol is registered.
        """
        instrument = self.instrument_by_symbol.get(symbol)
        if instrument is None:
            raise ValueError(f"Instrument {symbol} not found")
        return instrument

    def get_instrument_by_order_book_id(self, order_book_id: int) -> Instrument:
        """Return the instrument registered under ``order_book_id``.

        Raises:
            ValueError: If no instrument with that order book id is registered.
        """
        instrument = self.instrument_by_order_book_id.get(order_book_id)
        if instrument is None:
            raise ValueError(f"Instrument with order book id {order_book_id} not found")
        return instrument

    def get_all_complete_instruments(self) -> Generator[Instrument, None, None]:
        """Lazily yield every registered instrument whose ``is_complete()`` is true."""
        for instrument in self.instrument_by_symbol.values():
            if instrument.is_complete():
                yield instrument

    # IOrderBookListener implementation
    def on_order_book_created(self, order_book_id: int, tick_size: float) -> None:
        """Record the tick size of a newly created order book.

        If the matching instrument is already registered, it receives the tick
        size and is published; otherwise the tick size is stored until
        ``add_instrument`` is called for that order book id.

        Args:
            order_book_id: Id of the created order book.
            tick_size: Tick size reported for that order book.

        Raises:
            AssertionError: Propagated from publishing when no listener is set
                or the instrument is still incomplete after receiving its
                tick size.
        """
        logger.debug(f"Order book {order_book_id} created with tick size {tick_size}")
        instrument = self.instrument_by_order_book_id.get(order_book_id)
        if instrument is None:
            logger.debug(f"Order book {order_book_id} created but instrument not found, storing tick size for later")
            self._pending_tick_size_by_order_book_id[order_book_id] = tick_size
            return

        instrument.tick_size = tick_size
        logger.debug(f"Instrument {instrument.symbol} is now complete, publishing to listeners")
        self._publish_instrument(instrument)

    def on_order_book_changed(self, order_book_id: int, change_timestamp: int) -> None:
        """Ignore order book changes; this manager takes no action on them."""
        pass

    def _publish_instrument(self, instrument: Instrument) -> None:
        """Hand a complete instrument to the registered listener.

        Raises:
            AssertionError: If no listener is set or ``instrument.is_complete()``
                is false.
        """
        # Explicit raises instead of ``assert``: under ``python -O`` asserts are
        # removed, which would turn a missing listener into an opaque
        # AttributeError and let incomplete instruments reach the listener.
        listener = self.instruments_listener
        if listener is None:
            raise AssertionError("Instruments listener must be set before publishing instruments")
        if not instrument.is_complete():
            raise AssertionError(f"Instrument {instrument.symbol} is not complete")
        listener.on_new_complete_instrument(instrument)