In this next edition of the series on building the Serenity cryptocurrency trading system, I'm going to focus more on the research part of the problem, specifically how to collect, query and analyze Bitcoin tick data. We will look at how to use Pandas and its built-in support for the HDF5 storage format to build a simple tick database, Behemoth; Behemoth will run on the RAID array on my desktop Linux box, but the intent is to scale it to the cloud with the Azure Storage API in the future.
Introductions are in order
Makas Tzavellas and I met when working together in Shanghai at Morgan Stanley ten years ago, and as he's recently joined the project as a collaborator on GitHub you'll be seeing mentions of him and his work in the blog going forward.
Behemoth vs. Arctic
In the last article we looked at using Man AHL's Arctic tickstore to capture one-minute snapshots of market data. Unfortunately this approach is not ideal: not only is polling inefficient, but you cannot generate proper OHLC (Open/High/Low/Close) one-minute tick bins just by observing periodically; you need to capture every trade print for that. And once the frequency goes up we have a problem: Arctic really prefers to ingest blocks of data, not a tick at a time, which means we need some way to "stage" the ticks on local disk and then use our scheduler only to do periodic uploads.
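To make the OHLC point concrete, here is a tiny sketch (not Serenity code, and the sample prints are made up) of how one-minute bins would be aggregated if we had every trade print:

import pandas as pd

# hypothetical trade prints: a timestamp-indexed frame of price and size
prints = pd.DataFrame(
    {'price': [9500.0, 9501.5, 9499.0, 9502.0], 'size': [0.1, 0.25, 0.05, 0.4]},
    index=pd.to_datetime(['2019-10-31 12:00:05', '2019-10-31 12:00:40',
                          '2019-10-31 12:01:10', '2019-10-31 12:01:55'])
)

# with every print available, proper open/high/low/close plus traded volume
# falls straight out of a one-minute resample
one_min_bins = prints['price'].resample('1Min').ohlc()
one_min_bins['volume'] = prints['size'].resample('1Min').sum()

With one-minute polling we would only ever see the last observed price in each window, which is why the real-time trade print feed matters so much.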
Makas pointed out that using Parquet or HDF5 for the on-disk store could form the basis for the entire tickstore, not just a staging area, and so I started working on code to do just that, with the idea that in the worst case, if it did not perform well, we could still use it for staging data into Arctic when we tackle the next phase, the real-time trade print feed.
Design
The quants I know are near-religious on two topics: collecting as much data as possible, and ensuring that their results are reproducible weeks or even months after initial tests. The latter point leads to a bitemporal database design, one where you never delete or update data: you append new versions that supersede the older ones, but preserve the ability to read the data "as-of" an earlier time. Behemoth thus needs to be designed around this idea from the start.
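As a toy illustration of the idea (a sketch, not the Behemoth code): each version of a record carries a validity interval, the newest version's end time is an end-of-time sentinel, and an "as-of" read simply picks whichever version's interval contains the as-of time.

import datetime

END_OF_TIME = datetime.datetime.max  # stand-in for a "latest" sentinel

def read_as_of(versions, as_of_time):
    # versions: list of (start_time, end_time, payload); nothing is ever deleted,
    # so reading "as of" an earlier time reproduces exactly what was current then
    return [payload for start, end, payload in versions if start <= as_of_time < end]

versions = [
    (datetime.datetime(2019, 10, 1), datetime.datetime(2019, 10, 15), 'v0 of the data'),
    (datetime.datetime(2019, 10, 15), END_OF_TIME, 'v1 of the data'),
]
read_as_of(versions, datetime.datetime.utcnow())       # -> ['v1 of the data']
read_as_of(versions, datetime.datetime(2019, 10, 10))  # -> ['v0 of the data']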
The other important point is partitioning of data. This is to prepare for parallel processing: we should make it easy to extract data along logical partitioning lines like symbol & date. To this end Behemoth stores data on disk in date splays with a version number on each file, e.g. $db/YYYY/MM/DD/$symbol_$version.h5. Given a list of files we can then stitch together a complete timeseries using the existing Pandas API.
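As a rough sketch of that stitching step (not the actual Serenity code): given the list of splay files for a query, reassembling a continuous timeseries is just a read-and-concatenate, since each splay is written with to_hdf under a 'ticks' key, as we'll see below.

import pandas as pd

def stitch_splays(paths) -> pd.DataFrame:
    # read each date's splay file and concatenate into a single time-indexed DataFrame
    frames = [pd.read_hdf(str(path), 'ticks') for path in paths]
    return pd.concat(frames).sort_index()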
But how can we quickly identify which files we want? We need a high-performance index. This leads to the final design point in Behemoth: we are going to use a Pandas DataFrame with a multi-level index (symbol / date) to help us quickly identify which file(s) we should load given a query and an as-of time. We will store this index using HDF5 with compression, same as the tick data.
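To give a feel for the shape of that index (an illustrative sketch only; the column names mirror the ones that appear in the code below, and the file name and key are made up), here is a single-row version of it being built and persisted the same way as the tick data:

import datetime
import pandas as pd

# one index row: (symbol, date) -> bitemporal validity interval plus version number
index_df = pd.DataFrame(
    {
        'start_time': [datetime.datetime(2019, 10, 31, 9, 0, 0)],
        'end_time': [datetime.datetime.max],
        'version': [0]
    },
    index=pd.MultiIndex.from_tuples([('BTC-USD', pd.Timestamp('2019-10-31'))],
                                    names=['symbol', 'date'])
)

# stored just like the ticks: HDF5 with blosc compression
index_df.to_hdf('behemoth_index.h5', 'index', mode='w', complevel=9, complib='blosc')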
It's worth acknowledging up front that Behemoth is a terrible general-purpose database. It lacks locking, atomic writes, a network protocol for remote access and support for concurrent access. The index choice in particular is very limiting. What we trade away in features we gain back in simplicity: the entire LocalTickstore is around 200 lines of Python, comments included.
LocalTickstore implementation
LocalTickstore implements three key operations: select, insert and delete. You can see the full code on GitHub: tickstore.py.
Insertion
Let's start by looking at the insert operation, for now ignoring the details of how the supporting class DataFrameIndex works: we just give it a function which composes the splay path according to our convention.
The last line is the most interesting. We use Pandas' support for writing HDF5 via PyTables, with blosc compression chosen for its balance of read/write performance and compression ratio.
def insert(self, symbol: str, ts: BiTimestamp, ticks: pd.DataFrame):
    self._check_closed('insert')
    as_at_date = ts.as_at()

    # compose a splay path based on YYYY/MM/DD, symbol and version and pass in as a functor
    # so it can be populated with the bitemporal version
    def create_write_path(version):
        return self.base_path.joinpath('{}/{:02d}/{:02d}/{}_{:04d}.h5'.format(as_at_date.year,
                                                                              as_at_date.month,
                                                                              as_at_date.day,
                                                                              symbol, version))

    write_path = self.index.insert(symbol, as_at_date, create_write_path)

    # do the tick write, with blosc compression
    write_path.parent.mkdir(parents=True, exist_ok=True)
    ticks.to_hdf(str(write_path), 'ticks', mode='w', append=False, complevel=9, complib='blosc')
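As a usage sketch (the bar below is made up, and the LocalTickstore constructor arguments mirror the Jupyter example further down; LocalTickstore and BiTimestamp come from the Serenity codebase):

import datetime
from pathlib import Path
import pandas as pd

# a single illustrative one-minute bar; in practice these come from Coinbase Pro
ticks = pd.DataFrame({'open': [9500.0], 'high': [9505.0], 'low': [9498.0],
                      'close': [9503.0], 'volume': [12.5]},
                     index=pd.to_datetime(['2019-10-31 12:00:00']))
ticks.index.name = 'time'

tickstore = LocalTickstore(Path('/mnt/raid/data/behemoth/db/COINBASE_PRO_ONE_MIN_BINS'), 'time')
tickstore.insert('BTC-USD', BiTimestamp(datetime.date(2019, 10, 31)), ticks)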
Digging a bit into the DataFrameIndex, the main part of the insertion logic revolves around updating the index according to our bitemporal conventions. Essentially the index is append-only: new versions are new rows tagged with end_time = BiTimestamp.latest_as_of, and inserting a new row automatically updates the previous row's end_time to the current timestamp; we'll see in a moment how this convention helps us efficiently select the version we want.
def insert(self, symbol: str, as_at_date: datetime.date, create_write_path_func) -> Path:
    # if there's at least one entry in the index for this (symbol, as_at_date),
    # increment the version and set the start/end times such that the previous
    # version is logically deleted and the next version becomes latest
    if self.df.index.isin([(symbol, as_at_date)]).any():
        all_versions = self.df.loc[[(symbol, as_at_date)]]
        start_time = datetime.datetime.utcnow()
        end_time = BiTimestamp.latest_as_of
        prev_version = all_versions['version'].iloc[-1]
        version = prev_version + 1
        all_versions.loc[(all_versions['version'] == prev_version), 'end_time'] = start_time
        self.df.update(all_versions)
    else:
        start_time = BiTimestamp.start_as_of
        end_time = BiTimestamp.latest_as_of
        version = 0

    write_path = create_write_path_func(version)

    # the new (start_time, end_time, version) row is then appended to the index --
    # elided here; see the full tickstore.py on GitHub
    return write_path
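To make the convention concrete, here is a hypothetical view of the index rows after inserting BTC-USD ticks twice for the same date (sentinel timestamps shown symbolically, layout illustrative):

# symbol     date         start_time                 end_time                   version
# BTC-USD    2019-10-31   BiTimestamp.start_as_of    <time of second insert>    0
# BTC-USD    2019-10-31   <time of second insert>    BiTimestamp.latest_as_of   1
#
# a query with as_of_time = now matches version 1; a query with an as_of_time taken
# before the second insert matches version 0, reproducing the older view of the data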
Deletion
Logical delete is done entirely in the index -- we don't touch the HDF5 tickstore -- and basically involves selecting the latest version's row and updating its end_time to the current time:
def delete(self, symbol: str, as_at_date: datetime.date):
    if self.df.index.isin([(symbol, as_at_date)]).any():
        all_versions = self.df.loc[[(symbol, as_at_date)]]
        start_time = datetime.datetime.utcnow()

        # logically delete by setting the most recent version to end now; note this means
        # that deletes don't have a version number or row, so we may want to revisit this
        prev_version = all_versions['version'].iloc[-1]
        all_versions.loc[(all_versions['version'] == prev_version), 'end_time'] = start_time
        self.df.update(all_versions)
To be honest I'm not entirely happy with this scheme: it doesn't create a version number for the delete, and it would leave no audit trail if we later added an undelete operation, but it should serve our needs for now.
Selection
Finally, let's get some data out of the Tickstore with as-of semantics. The first part of the operation has to be done against the index: extracting all matching paths for our start date / end date / as-of time query. This relies on a very cool feature in Pandas where you can create a boolean mask that acts like a where clause. Here we're looking for dates between our start and end dates (inclusive) where the as_of_time falls between start_time and end_time. This is where the bitemporal scheme kicks in: because we set the latest row's end_time to what is effectively the end of time, the latest row is always selected when we pass in the current time. Similarly, the first version's start_time is a start-of-time marker, so it matches as-of times arbitrarily far in the past.
def select(self, symbol: str, start: datetime.datetime, end: datetime.datetime,
           as_of_time: datetime.datetime) -> pd.DataFrame:
    # short circuit if symbol missing
    if symbol not in self.df.index.get_level_values('symbol'):
        return pd.DataFrame()

    # find all dates in range where as_of_time is between start_time and end_time
    symbol_data = self.df.loc[symbol]
    mask = (symbol_data.index.get_level_values('date') >= start) \
        & (symbol_data.index.get_level_values('date') <= end) \
        & (symbol_data['start_time'] <= as_of_time) \
        & (symbol_data['end_time'] >= as_of_time)
    selected = self.df.loc[symbol][mask]
    return selected
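As a usage sketch against the index-level select above (the variable name index is hypothetical and stands for a DataFrameIndex instance):

import datetime

# latest view: the current time falls inside the newest version's validity interval
latest = index.select('BTC-USD',
                      start=datetime.datetime(2019, 10, 1),
                      end=datetime.datetime(2019, 10, 31),
                      as_of_time=datetime.datetime.utcnow())

# historical view: an as-of time from a week ago returns whichever versions were
# current at that point, even if they have since been superseded
week_ago = datetime.datetime.utcnow() - datetime.timedelta(days=7)
as_of_last_week = index.select('BTC-USD',
                               start=datetime.datetime(2019, 10, 1),
                               end=datetime.datetime(2019, 10, 31),
                               as_of_time=week_ago)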
Downloading Coinbase Pro data
The final piece of the puzzle is relatively straightforward: downloading historical data from Coinbase Pro via their REST API. Once again the coinbasepro package from pip serves us well. This code handles chunking of the calls to the API, because Coinbase Pro limits the number of historical ticks you can download at one time, and progressively builds up a list of rates which we can turn into a DataFrame and write to LocalTickstore:
def download(self, symbol: str, start_date, end_date):
    # assumes: import coinbasepro as cbp; from datetime import datetime, timedelta;
    # from time import sleep; import pandas as pd
    client = cbp.PublicClient()

    # if dates are passed on the command line they will be of type string
    if type(start_date) == str:
        start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
    if type(end_date) == str:
        end_date = datetime.strptime(end_date, '%Y-%m-%d').date()

    # start date stepping
    delta = timedelta(days=1)
    while start_date <= end_date:
        all_raw_rates = []

        # load data 4 hours at a time, up until 23:59:00
        for h in range(0, 24, 4):
            start = start_date.strftime('%Y-%m-%d') + ' {:02d}:{:02d}:00.000'.format(h, 0)
            if h + 4 == 24:
                h = 23
                end_minute = 59
            else:
                h = h + 4
                end_minute = 0
            stop = start_date.strftime('%Y-%m-%d') + ' {:02d}:{:02d}:00.000'.format(h, end_minute)
            print('downloading ' + start + ' - ' + stop)
            raw_rates = client.get_product_historic_rates(symbol, start=start, stop=stop)
            all_raw_rates.extend(raw_rates)
            sleep(1)

        if len(all_raw_rates) > 0:
            # convert one day's data into pandas, and convert all the decimal typed fields
            # from the coinbasepro API into float; h5py doesn't support decimal serialization
            hist_rates = pd.DataFrame(all_raw_rates)
            hist_rates.set_index('time', inplace=True)
            hist_rates['open'] = hist_rates['open'].astype(float)
            hist_rates['high'] = hist_rates['high'].astype(float)
            hist_rates['low'] = hist_rates['low'].astype(float)
            hist_rates['close'] = hist_rates['close'].astype(float)
            hist_rates['volume'] = hist_rates['volume'].astype(float)

            # force ascending sort on time
            hist_rates.sort_index(inplace=True)

            # write HDF5 with compression
            print('writing historical rates to Tickstore')
            self.tickstore.insert(symbol, BiTimestamp(start_date), hist_rates)

        start_date += delta
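Illustrative usage only; the class name and wiring below are assumptions, since only the download method itself is shown above. Dates can be passed as strings thanks to the strptime handling at the top of the method:

# hypothetical: whatever class hosts download() is constructed with a LocalTickstore
downloader = CoinbaseHistoricRatesDownloader(tickstore)
downloader.download('BTC-USD', '2015-07-20', '2019-10-31')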
Trying it out
After downloading all the data since 2015, I pulled it into Jupyter:
from datetime import datetime as dt
from pathlib import Path

# LocalTickstore is imported from the Serenity codebase (tickstore.py, linked above)
tickstore = LocalTickstore(Path('/mnt/raid/data/behemoth/db/COINBASE_PRO_ONE_MIN_BINS'), 'time')
df = tickstore.select('BTC-USD', start=dt(2015, 7, 20), end=dt(2019, 10, 31))
This is reasonably fast, but not great: 5.5 seconds to load four years of splayed one-minute bin data. Now that we have it in memory, let's do a four-year analysis of volatility vs. time of day and trade volume. First we need to set up Jupyter with Cufflinks:
import cufflinks as cf
cf.set_config_file(theme='ggplot', sharing='public', offline=True)
and prepare hourly bins with the hour of day as well as the volatility of the closing price:
hist_1h = df.resample('1H').agg({'open': 'first',
                                 'high': 'max',
                                 'low': 'min',
                                 'close': 'last',
                                 'volume': 'sum'
                                 })
hist_1h['hour_of_day'] = hist_1h.index.hour
hist_1h['month_of_year'] = hist_1h.index.month
hist_1h['pct_change'] = hist_1h['close'].pct_change(1)
hist_1h['volatility'] = hist_1h['pct_change'].rolling('1D').std() * (252 ** 0.5)
then plot both the volatility timeseries and a bubble plot of volatility by hour of day, with bubble size given by volume:
hist_1h['volatility'].iplot()
hist_1h.iplot(kind='bubble', size='volume', x='hour_of_day', y='volatility')
The result:
Integrating CI pipelines
Makas took on setting up Microsoft Teams & Azure DevOps so we could have an environment for collaboration. As a longtime Linux developer and Mac user I have to admit I was impressed with what Microsoft has built here, especially the depth of integration with their GitHub acquisition.
They have a nice, clean Kanban-style board:
and most importantly an entirely containerized CI tool called Pipelines which integrates with GitHub, so you can very easily set it up to trigger on GitHub commits: