In the previous two articles of this series on the Serenity cryptocurrency trading system we built a polling REST marketdata recorder and a Pandas-based local tick store. In this next article we are going to upgrade that system to support real-time trade capture from multiple exchanges through their websocket interfaces: Coinbase Pro and Binance, in this case. This will give us fidelity down to the individual trade print without having to poll at a rate that the exchange would (rightly) reject.
The first challenge this presents is how to batch load the ticks. Our bi-temporal tick store is optimized for bulk insert and select, not update -- in this regard it's similar to Arctic, which we looked at previously. And just like Arctic, the preferred model is to first append to a persistent queue of some kind and then, once a day, read all the ticks into the historical store.
Ideally what I wanted was something like Peter Lawrey's Chronicle Queue. It might be possible to use it directly with the help of the JPype library, but as it happens Jon Turner ported it to Python a few years ago; his pychro project, now dormant, forms a good starting point for a simpler, single-threaded implementation.
A memory-mapped journal file
Memory-mapping a file in Python is easy. Given a Path object and a size (we're going to use 64MB):
import mmap

mmap_file = mmap_path.open(mode='r+b')
mm = mmap.mmap(mmap_file.fileno(), max_size)
you get back an object which looks like a regular byte array. To make appending easy, we are going to build a simple abstraction around it with the concept of a current pointer, an advance() operator, and a leading, embedded length integer which we can use to recover the current append location. The __init__ method is straightforward except for the use of struct.unpack() to read a 32-bit integer from the first four bytes of the memory map -- note that the length is stored bitwise-inverted, presumably so that a freshly zero-filled file cannot be mistaken for a journal with zero bytes appended:
import struct


class MMap:
    def __init__(self, mm):
        self.mm = mm
        self.start_pos = 4
        self.pos = self.start_pos
        # the length lives in the first four bytes, stored bitwise-inverted
        self.len = ~struct.unpack('i', self.mm[0:self.start_pos])[0]
then we need some iterator-like methods:
    def get_pos(self):
        return self.pos

    def next_pos(self, distance):
        ret = self.pos
        self.advance(distance)
        return ret

    def next_slice(self, distance):
        ret = slice(self.pos, self.pos + distance)
        self.advance(distance)
        return ret

    def advance(self, step: int):
        self.pos += step
        self.update_length()

    def seek_end(self):
        self.pos = len(self) + self.start_pos
and a mechanism to update the length and clean up:
    def update_length(self):
        self.len = self.pos - self.start_pos
        # write back the inverted length so the append position survives restarts
        self.mm[0:4] = struct.pack('i', ~self.len)

    def close(self):
        if self.mm:
            self.mm.close()
            self.mm = None
The final operations make our wrapper look, from the outside, like a regular list or array:
    def __getitem__(self, item):
        return self.mm[item]

    def __setitem__(self, key, value):
        self.mm[key] = value

    def __len__(self):
        return self.len

    def __del__(self):
        self.close()
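Before building on top of it, here's a hypothetical smoke test of the wrapper on its own. The Journal class in journal.py handles this setup for real; the path, the pre-sizing step, and the header initialization below are my own assumptions:

import mmap
import struct
from pathlib import Path

max_size = 64 * 1024 * 1024  # 64MB
path = Path('/tmp/mmap_demo.dat')

# pre-size the file: mmap cannot map beyond the end of the file
with path.open(mode='wb') as f:
    f.truncate(max_size)

with path.open(mode='r+b') as f:
    mm = mmap.mmap(f.fileno(), max_size)

mm[0:4] = struct.pack('i', ~0)  # header: zero bytes appended, stored inverted

wrapper = MMap(mm)
wrapper[wrapper.next_slice(8)] = struct.pack('d', 42.0)  # append one double
print(len(wrapper))  # 8: bytes appended after the 4-byte header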
Using this primitive we can now build readers and writers on top. For instance, the read_double() operation can be expressed as an unpack of type 'd' applied to the next 8-byte slice of the array:
def read_double(self) -> float:
    return self._unpack_next('d', 8)

def _unpack_next(self, pattern: str, num_bytes: int):
    return struct.unpack(pattern, self.mm[self.mm.next_slice(num_bytes)])[0]
while the corresponding write operation is very similar, packing 8 bytes into the next slice:
def write_double(self, value: float):
    self._pack_next('d', 8, value)

def _pack_next(self, pattern: str, num_bytes: int, value):
    self._check_space(num_bytes)
    mm = self._get_current_mmap()
    mm[mm.next_slice(num_bytes)] = struct.pack(pattern, value)
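The longs, booleans and strings that the subscribers below rely on follow the same pattern. The real implementations are in journal.py; here is a plausible sketch, where the length-prefixed string encoding is my own assumption about the wire format:

# reader side: fixed-width types are just different struct patterns
def read_long(self) -> int:
    return self._unpack_next('q', 8)

def read_boolean(self) -> bool:
    return self._unpack_next('?', 1)

def read_string(self) -> str:
    # assumed format: 4-byte length prefix followed by UTF-8 bytes
    num_bytes = self._unpack_next('i', 4)
    return bytes(self.mm[self.mm.next_slice(num_bytes)]).decode('utf-8')

# appender side: write the length prefix, then the raw UTF-8 payload
def write_string(self, value: str):
    encoded = value.encode('utf-8')
    self._pack_next('i', 4, len(encoded))
    self._check_space(len(encoded))
    mm = self._get_current_mmap()
    mm[mm.next_slice(len(encoded))] = encoded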
You can see the rest of the code in journal.py.
Tornado Websocket client
The next piece we need is a base class which models a typical websocket-based marketdata subscription: every protocol I've seen so far starts with an initial JSON subscribe message sent on connect, after which each tick comes back as a JSON message on the socket. For this we'll use the Tornado event loop and its corresponding websocket module.
The other thing these feeds have in common is that the connections periodically drop, so we will also run a one-second keep-alive timer, again taking advantage of Tornado's event loop:
APPLICATION_JSON = 'application/json'

DEFAULT_KEEPALIVE_TIMEOUT_MILLIS = 1000
DEFAULT_CONNECT_TIMEOUT_SEC = 60
DEFAULT_REQUEST_TIMEOUT_SEC = 60


class WebsocketSubscriber(ABC):
    logger = logging.getLogger(__name__)

    def __init__(self, symbol: str, journal: Journal, loop: IOLoop,
                 keep_alive_timeout: int,
                 connect_timeout: int,
                 request_timeout: int):
        self.symbol = symbol
        self.appender = journal.create_appender()
        self.connect_timeout = connect_timeout
        self.request_timeout = request_timeout
        self._ws_connection = None
        self.loop = loop

        # noinspection PyTypeChecker
        PeriodicCallback(self._keep_alive, keep_alive_timeout).start()
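The _keep_alive callback itself lives in subscriber.py; conceptually it just schedules a reconnect whenever the connection has dropped, along these lines (a sketch which assumes _on_connection_close() clears _ws_connection):

def _keep_alive(self):
    # if the exchange dropped us, schedule a fresh connect() on the IOLoop
    if self._ws_connection is None:
        self.logger.info('reconnecting to %s', self._get_url())
        self.loop.add_callback(self.connect)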
The heart of the code is an async connect() method which starts a loop reading messages off the wire:
async def connect(self):
    url = self._get_url()
    self.logger.info("connecting to {} and subscribing to {} trades".format(url, self.symbol))
    headers = httputil.HTTPHeaders({'Content-Type': APPLICATION_JSON})
    request = httpclient.HTTPRequest(url=url,
                                     connect_timeout=self.connect_timeout,
                                     request_timeout=self.request_timeout,
                                     headers=headers)

    # noinspection PyAttributeOutsideInit
    self._ws_connection = await websocket.websocket_connect(request)
    self.send(json.dumps(self._create_subscribe_msg()))

    while True:
        msg = await self._ws_connection.read_message()
        if msg is None:
            self._on_connection_close()
            break
        self._on_message(msg)
plus a simple method to write messages back:
def send(self, data: str):
    if not self._ws_connection:
        raise RuntimeError('Websocket connection is closed.')
    self._ws_connection.write_message(data)
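The JSON parsing mentioned below is presumably a thin decode-and-dispatch hook that hands each frame to the exchange-specific subclass as a dict -- a sketch consistent with the _on_message_json() override in the Binance subscriber:

def _on_message(self, msg: str):
    # decode the raw frame and hand the dict to the subclass
    self._on_message_json(json.loads(msg))

@abstractmethod
def _on_message_json(self, msg: dict):
    pass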
These two operations plus a little JSON parsing and formatting code are enough to give us our full base class. You can see the rest in subscriber.py. Using it is simple: here's the complete Binance subscriber -- all we have to do is provide a URL, create a subscribe JSON message as a dictionary, and process the ticks by writing into our journal:
class BinanceSubscriber(WebsocketSubscriber):
    def __init__(self, symbol: str, journal: Journal, loop: IOLoop = IOLoop.instance(),
                 keep_alive_timeout: int = DEFAULT_KEEPALIVE_TIMEOUT_MILLIS,
                 connect_timeout: int = DEFAULT_CONNECT_TIMEOUT_SEC,
                 request_timeout: int = DEFAULT_REQUEST_TIMEOUT_SEC):
        super().__init__(symbol, journal, loop, keep_alive_timeout, connect_timeout, request_timeout)

    def _get_url(self):
        return 'wss://stream.binance.com:9443/stream'

    def _create_subscribe_msg(self):
        return {
            "method": "SUBSCRIBE",
            "params": ["btcusdt@trade"],
            "id": 1
        }

    def _on_message_json(self, msg):
        # combined-stream messages wrap the trade payload in a 'data' envelope
        if 'stream' in msg:
            self.appender.write_double(datetime.datetime.utcnow().timestamp())  # receipt time
            self.appender.write_long(msg['data']['E'])                          # event time (ms)
            self.appender.write_string(msg['data']['s'])                        # symbol
            self.appender.write_boolean(msg['data']['m'])                       # buyer-is-maker flag
            self.appender.write_double(float(msg['data']['q']))                 # quantity
            self.appender.write_double(float(msg['data']['p']))                 # price
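Running a recorder is then just a matter of pointing the subscriber at a journal and starting the IOLoop. A hypothetical wiring, with the journal path chosen to match the uploader below:

from pathlib import Path
from tornado.ioloop import IOLoop

journal = Journal(Path('/behemoth/journals/BINANCE_TRADES/BTC-USDT'))
subscriber = BinanceSubscriber('BTC-USDT', journal)

loop = IOLoop.instance()
loop.spawn_callback(subscriber.connect)  # schedule the async read loop
loop.start()                             # block, journaling each trade as it arrives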
Scheduling the upload with APScheduler
We covered APScheduler previously, showing how you can quickly construct a scheduler application from your own callback functions and a set of helpful classes. This is the entire main block:
if __name__ == '__main__':
    init_logging()

    scheduler = TornadoScheduler()
    scheduler.add_jobstore(MemoryJobStore())
    scheduler.add_executor(TornadoExecutor())
    scheduler.add_job(upload_coinbase_ticks_daily, CronTrigger(hour=0, minute=15, second=0, timezone='UTC'))
    scheduler.add_job(upload_binance_ticks_daily, CronTrigger(hour=0, minute=15, second=10, timezone='UTC'))
    scheduler.start()

    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        logger = logging.getLogger(__name__)
        logger.info("starting Tornado")
        IOLoop.instance().start()
    except (KeyboardInterrupt, SystemExit):
        pass
and taking Binance as an example, you can see how we use Journal and JournalReader to loop through the events written by our websocket _on_message_json() callback and insert them into the LocalTickstore developed in the last post:
def upload_binance_ticks_daily():
    func_logger = logging.getLogger(__name__)
    upload_date = datetime.datetime.utcnow().date() - datetime.timedelta(1)
    symbol = 'BTC-USDT'
    journal = Journal(Path('/behemoth/journals/BINANCE_TRADES/' + symbol))
    reader = journal.create_reader(upload_date)

    length = reader.get_length()
    records = []
    while reader.get_pos() < length:
        time = reader.read_double()
        trade_id = reader.read_long()
        product_id = reader.read_string()
        side = 'buy' if reader.read_boolean() else 'sell'
        size = reader.read_double()
        price = reader.read_double()

        record = {
            'time': datetime.datetime.fromtimestamp(time),
            'trade_id': trade_id,
            'product_id': product_id,
            'side': side,
            'size': size,
            'price': price
        }
        records.append(record)

    func_logger.info("uploading journaled Binance ticks to Behemoth for UTC date " + str(upload_date))
    df = pd.DataFrame(records)
    df.set_index('time', inplace=True)
    func_logger.info("extracted {} trade records".format(len(df)))

    tickstore = LocalTickstore(Path('/behemoth/db/BINANCE_TRADES'), 'time')
    tickstore.insert(symbol, BiTimestamp(upload_date), df)
    tickstore.close()
    func_logger.info("inserted {} records".format(len(df)))
Plotting price & trade sizes
Switching over to Jupyter, every day after UTC midnight we can access the latest trade data:
trades_tickstore = LocalTickstore(Path('/mnt/raid/data/behemoth/db/COINBASE_PRO_TRADES'), 'time')
trades = trades_tickstore.select('BTC-USD', start=dt(2015, 7, 20), end=dt(2019, 11, 30))
and then plot it -- here we use Plotly to create a nice dual-axis price and trade size plot:
from plotly.subplots import make_subplots
import plotly.graph_objects as go

# Create figure with secondary y-axis
fig = make_subplots(specs=[[{"secondary_y": True}]])

# Add traces
fig.add_trace(
    go.Scatter(x=trades.index, y=trades['price'], name="price ($)"),
    secondary_y=False,
)
fig.add_trace(
    go.Scatter(x=trades.index, y=trades['size'], name="size (BTC)"),
    secondary_y=True,
)

# Add figure title
fig.update_layout(
    title_text="BTC-USD price & volumes"
)

fig.show()
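One caveat: several years of individual trade prints is a lot of points for a browser-rendered Plotly figure, so in practice it can help to downsample with pandas first. A sketch, where the one-minute bar size is an arbitrary choice of mine:

# downsample ticks to one-minute bars before handing them to Plotly
prices = trades['price'].resample('1Min').last().dropna()
sizes = trades['size'].resample('1Min').sum()

fig = make_subplots(specs=[[{"secondary_y": True}]])
fig.add_trace(go.Scatter(x=prices.index, y=prices, name="price ($)"), secondary_y=False)
fig.add_trace(go.Scatter(x=sizes.index, y=sizes, name="size (BTC)"), secondary_y=True)
fig.show()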