An inordinate amount of time on the Serenity project recently has been spent compiling software, mostly because BunsenLabs is based on an older Debian release, "Stretch" -- not ancient, but behind on things like compilers, Python versions, etc. First I went back and forth between Clang and gcc trying to get a modern C++ compiler set up, finally compiling my own gcc 9.2.x installation. I then spent a while going down the rabbit hole of trying to get a decent WebSocket client library in C++, trying two and evaluating a third before deciding to put C++ aside for a few weeks and make progress elsewhere. More recently I've built Python 3.7 several times, each time finding an optional library which wasn't installed on my Linux desktop.
In short, I don't feel like I've done much at all except, well, recompile everything.
The next mini-project is in some ways the polar opposite of the C++ crypto marketdata library: I wanted to see how quickly, and with how little code, I could just start capturing marketdata so I could do some analysis. The answer, thanks to the amazing libraries available in Python, is (a) very quickly; and (b) with very little code. So, with a serviceable build of Python 3.7.4 and PyCharm in hand, let's do it.
About time
I looked at several scheduling packages in Python and settled on APScheduler, the Advanced Python Scheduler. What I liked about it, especially in comparison to Apache Airflow, was that you build it up from object-oriented components, configuring just what you need. So, for instance, you can use a MongoDB backend to store jobs, but a database of some kind is not required. You can hook it up to an event loop like Tornado if you so desire. And if you want a DAG-based job executor, you can pull in Spotify's Luigi. It looks like this:
from apscheduler.executors.tornado import TornadoExecutor
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.tornado import TornadoScheduler
from tornado.ioloop import IOLoop


def on_tick():
    pass


if __name__ == '__main__':
    scheduler = TornadoScheduler()
    scheduler.add_jobstore(MemoryJobStore())
    scheduler.add_executor(TornadoExecutor())
    scheduler.add_job(on_tick, 'interval', minutes=1)
    scheduler.start()

    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        IOLoop.instance().start()
    except (KeyboardInterrupt, SystemExit):
        pass
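As an aside, if you want scheduled jobs to survive a restart you can swap the in-memory jobstore for APScheduler's MongoDB-backed one. A minimal sketch, assuming MongoDB is reachable on localhost and pymongo is installed (the database and collection names here are just placeholders):
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.schedulers.tornado import TornadoScheduler

scheduler = TornadoScheduler()
# persist job definitions in MongoDB so they survive a process restart
scheduler.add_jobstore(MongoDBJobStore(database='apscheduler', collection='jobs'))
scheduler.add_job(on_tick, 'interval', minutes=1)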
Snapping Coinbase marketdata
The coinbasepro Python package wraps the public and private API of the Coinbase Pro exchange, offering a very simple, object-based interface. For example, if you want the ticker with the last trade details:
from coinbasepro import PublicClient

symbol = 'BTC-USD'
client = PublicClient()
data = client.get_product_ticker(symbol)
To make it more extensible I wrapped it in a class, encapsulating the details of how to talk to CBP:
from coinbasepro import PublicClient

from cloudwall.serenity.mdrecorder.api import MDSnapshotClient


class CoinbaseProSnapshotClient(MDSnapshotClient):
    def __init__(self):
        super().__init__()
        self.client = PublicClient()

    def snap_last_trade(self, symbol: str) -> dict:
        return self.client.get_product_ticker(symbol)
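Used on its own the wrapper is just a couple of lines (a quick sketch; BTC-USD is simply an example product):
# snap the latest BTC-USD trade and dump the raw ticker dict
snapshotter = CoinbaseProSnapshotClient()
print(snapshotter.snap_last_trade('BTC-USD'))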
Storing ticks with Arctic
Arctic is Man Group's tickstore built on top of MongoDB, open sourced in 2015. It promises impressive performance for a free product: "Arctic can query millions of rows per second per client, achieves ~10x compression on network bandwidth, ~10x compression on disk, and scales to hundreds of millions of rows per second per MongoDB instance."
What is particularly nice though is the tight integration with Pandas DataFrames, which makes Arctic very easy to use inside Jupyter notebooks for research purposes. First you create a store:
from datetime import datetime as dt
import pandas as pd
from arctic import Arctic, TICK_STORE
# Connect to the mongo-host / cluster
store = Arctic('localhost')
Then you create a library:
lib = 'COINBASE_PRO_ONE_MIN_SNAP'
if lib not in store.list_libraries():
    store.initialize_library(lib, lib_type=TICK_STORE)
tick_lib = store[lib]
From here, you can read straight into a DataFrame, and do fun things like use Plot.ly's Cufflinks to plot it:
df = tick_lib.read('BTC-USD')
import cufflinks as cf
cf.set_config_file(theme='ggplot', sharing='public', offline=True)
df['price'].iplot()
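Because the library is a tick store, you can also restrict the read to a date range rather than pulling the full history. A sketch using Arctic's DateRange helper (the dates are placeholders):
from arctic.date import DateRange

# read just one day of ticks into a DataFrame
df = tick_lib.read('BTC-USD', date_range=DateRange(dt(2019, 9, 1), dt(2019, 9, 2)))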
But how do we get the data in? Given our snapshotter above, all we have to do is build a list of dictionaries, each with a special key "index" set to the timestamp we want to use, and then write() it to a symbol:
def on_tick():
    tick_logger = logging.getLogger(__name__)
    try:
        last = snapshotter.snap_last_trade('BTC-USD')
        rxt = datetime.datetime.now(pytz.utc)
        px = float(last['price'])
        qty = float(last['size'])
        last_row = [{'index': rxt, 'price': px, 'qty': qty}]
        tick_lib.write('BTC-USD', last_row, metadata={'source': 'CoinbasePro'})
        tick_logger.info("wrote latest trade to Arctic: {} @ {}".format(qty, px))
    except CoinbaseAPIError:
        tick_logger.info("ignoring transient error from Coinbase Pro API; will retry")
And that's it!
Note that this first post takes a very inefficient approach to using Arctic, writing a single row per write() call; the recommended usage pattern is to store blocks of many rows at a time, either via a persistent queue or via a batch job that bulk-imports ticks from flat files once or a few times a day. The next post in this series will look at improving this and switching from one-minute snaps to tick-by-tick storage of trade prints on Coinbase Pro.
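To give a flavor of what batching might look like, here is a minimal sketch that buffers rows in memory and flushes them to Arctic in blocks, reusing the snapshotter and tick_lib objects from above (the 100-row flush threshold is just a placeholder):
tick_buffer = []


def on_tick_buffered():
    last = snapshotter.snap_last_trade('BTC-USD')
    tick_buffer.append({
        'index': datetime.datetime.now(pytz.utc),
        'price': float(last['price']),
        'qty': float(last['size'])
    })
    # flush to Arctic in blocks of rows rather than one row per write()
    if len(tick_buffer) >= 100:
        tick_lib.write('BTC-USD', tick_buffer, metadata={'source': 'CoinbasePro'})
        tick_buffer.clear()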
Orchestrating with Docker Compose
Taken together, the above gives you a simple scheduler that snaps Coinbase Pro last-trade data once a minute, in about 50 lines of code including logging niceties. We can right-click and run it in PyCharm, but for a real-time scheduler we really want it running standalone. This is where Docker and Docker Compose come in: we can containerize not only our application but also the MongoDB instance that Arctic uses, and then run the whole combination with a single command.
To start, we need a way to run the Python part of the application, which we do with a Dockerfile:
FROM python:3.7-slim-buster
COPY . /app
WORKDIR /app
RUN pip install -r requirements.txt
ENV PYTHONPATH "${PYTHONPATH}:/app"
CMD ["python", "cloudwall/serenity/mdrecorder/scheduler.py"]
and a requirements.txt declaring everything we need:
APScheduler==3.6.1
arctic==1.79.2
coinbasepro==0.1.1
tornado==6.0.3
We then need to tie this all together with MongoDB. Note that we declare both a link and a depends_on relationship from serenity_mdrecorder, so Compose will create a DNS entry routing "mongodb" to the database container and start mongod before the main process boots:
version: '2'
services:
  serenity_mdrecorder:
    build: .
    container_name: "serenity_mdrecorder"
    links:
      - mongodb
    depends_on:
      - mongodb
  mongodb:
    image: mongo:latest
    container_name: "mongodb"
    volumes:
      - /mnt/raid/data/behemoth:/data/db
    ports:
      - 27017:27017
    command: mongod
The other notable customization here is the volume mapping from a local directory (/mnt/raid/data/behemoth) to /data/db, which gives the container access to the terabytes of RAID capacity in the Linux workstation. The net result runs nicely using PyCharm Pro's built-in support for Docker Compose, which neatly illustrates the tree of processes and containers it creates.
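If you prefer the command line to PyCharm, the same stack can be brought up with Compose directly (a sketch; run from the directory containing docker-compose.yml):
docker-compose up --build -d
docker-compose logs -f serenity_mdrecorder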
The code
You can see the full code on GitHub: https://github.com/cloudwall/serenity-mdrecorder.