Previously I built a local tickstore (covered in the blog post "Building Behemoth") based on HDF5, pandas and RAID storage. While the RAID storage gives me a degree of confidence, depending on a single Linux server sitting on the floor next to my desk is not ideal. Furthermore, it would be even better if researchers didn't have to care about whether they have the data locally or not: if we can build a caching data layer on top of Azure, the synchronization can happen automatically inside a Jupyter notebook.
The AzureBlobTickstore aims to solve this problem using a combination of the diskcache LRU caching package and Microsoft's Azure Blob Storage SDK for Python. (Note that a similar approach would work for S3 buckets; that's left as an exercise for the reader.)
Getting started
To connect to Azure we're going to need to create a storage account and get its connection string from the Azure portal. I created an account called "cloudwall" -- the screenshot below shows how to get to the Access Keys, which is where you'll find the connection string you need to initialize your blob service connection.
You can then connect to the storage API:
def __init__(self, connect_str: str, db_name: str,
             cache_dir: Path = Path('/var/tmp/abs_lru_cache'),
             timestamp_column: str = 'date'):
    self.storage = BlobServiceClient.from_connection_string(connect_str)
    self.container_name = db_name.replace('_', '-').lower()
    self.container_client = self.storage.get_container_client(self.container_name)
We'll need two objects: a blob service client and a container client. The service client is also what we use to create a new container for blobs:
# try and create the container
try:
    self.storage.create_container(self.container_name)
except ResourceExistsError:
    self.logger.info(f'container {self.container_name} already exists')
One caveat: container names are fairly restricted. You need to construct a name with numbers, letters and single dashes only, and it must be lowercase.
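For example, the db_name used later in this post gets normalized by the constructor like so:

# same normalization as in __init__ above: underscores become dashes, everything lowercased
db_name = 'PHEMEX_TRADES'
container_name = db_name.replace('_', '-').lower()  # 'phemex-trades'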
Creating the cache
You can create a diskcache Cache with a one-liner:
self.cache = Cache(str(self.cache_dir))
Once you've done this you have a fast, local disk cache which supports get() and set() of byte arrays.
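For example, a quick sketch of the two calls we rely on (the key shown here is hypothetical; the real keys are the blob paths described in the next section):

from diskcache import Cache

cache = Cache('/var/tmp/abs_lru_cache')
cache.set('btcusd/2020/04/20/ticks', b'...parquet bytes...')  # hypothetical key and payload
data = cache.get('btcusd/2020/04/20/ticks')                   # returns None on a cache miss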
Inserting data to Azure (and the cache)
As a refresher on the Tickstore abstraction, this class provides a bi-temporal interface to a tick store that is partitioned by year/month/day on disk or, in this case, in blob storage. The ticks are represented by a pandas DataFrame, so our first task is to convert those ticks into a portable format. Here's how you do that with PyArrow:
# encode DataFrame as parquet byte stream
table = pyarrow.Table.from_pandas(df=ticks)
buf = pyarrow.BufferOutputStream()
pyarrow.parquet.write_table(table, buf)
# update cache
data_bytes = bytes(buf.getvalue())
self.cache.set(blob_path, data_bytes)
What we've done here is build an in-memory Arrow table and then use PyArrow's native streaming mechanism to write it into a buffer as Parquet. We can then take those bytes and inject them into our LRU cache. Next up we want to do the same for Azure:
# upload to Azure
self.logger.info(f'uploading to Azure: {blob_path}')
blob_client = self.storage.get_blob_client(container=self.container_name,
                                           blob=str(blob_path))
try:
    blob_client.upload_blob(io.BytesIO(data_bytes), overwrite=False)
except ResourceExistsError:
    self.logger.info(f'skipping upload of existing blob: {blob_path}')
Note the use of BytesIO instead of a file so we can read from an array of bytes, and the optimization where we try to create the blob and ignore ResourceExistsError -- this is faster than scanning the container with the list_blobs() operation to determine whether the blob_path is already there. Once we've uploaded we can use Microsoft's very nice Storage Explorer to browse our blobs. See how it has interpreted the slash-delimited names as a folder hierarchy:
Fast selects
The Tickstore lets you select a datetime range of data. Under the hood it queries a pandas MultiIndex to quickly list the paths (blobs) containing the ticks that match the select criteria. We already implemented most of the hard parts in the previous article on Behemoth, so here we'll focus on the part specific to the Azure caching implementation:
def read(self, logical_path: str) -> pd.DataFrame:
    logical_prefix = f'azure:{self.container_name}'
    key = logical_path[len(logical_prefix) + 1:]
    cached_data = self.cache.get(key)
    if cached_data is not None:
        self.logger.info(f'reading ticks from cache: {key}')
        return pd.read_parquet(io.BytesIO(cached_data))
    else:
        blob_client = self.storage.get_blob_client(container=self.container_name,
                                                   blob=key)
        tick_blob = blob_client.download_blob()
        tick_data = tick_blob.readall()
        self.cache.set(key, tick_data)
        return pd.read_parquet(io.BytesIO(tick_data))
Again note we're using BytesIO, this time with the read_parquet() operation, and once more we're using a blob client to handle the download. Anything fetched from Azure is written into the local cache, so the next read of the same path never leaves the machine.
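To put read() in context, here's a rough sketch (not the actual Serenity implementation) of how select() might tie things together: look up the blob paths for the requested symbol and date range, call read() for each one, and concatenate the results. The _query_index() helper is hypothetical.

# hypothetical sketch only: the real path index lives in the Behemoth Tickstore code;
# assumes the pandas (pd) and datetime imports used elsewhere in this post
def select(self, symbol: str, start: datetime.datetime, end: datetime.datetime) -> pd.DataFrame:
    paths = self._query_index(symbol, start, end)  # assumed helper over the MultiIndex
    frames = [self.read(path) for path in paths]
    return pd.concat(frames) if frames else pd.DataFrame()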
Doing some analysis
Now that we have a means of loading an existing Tickstore into the cloud, let's load it into Jupyter! Since this is bleeding-edge Serenity code and not yet on PyPI, you'll first need to update the PYTHONPATH to point to your local shadow:
import os
import sys
sys.path.insert(0, os.path.abspath('/Users/kdowney/dev/shadows/serenity/src'))
Then install the necessary dependencies in your Anaconda environment (or whatever virtual environment you are using for Jupyter):
pip install azure-storage-blob
pip install diskcache
pip install pyarrow
and you should be able to connect:
from serenity.tickstore.tickstore import AzureBlobTickstore
connect_str = '****'
db = 'PHEMEX_TRADES'
cloud_tickstore = AzureBlobTickstore(connect_str, db, timestamp_column='time')
and select:
import datetime

df = cloud_tickstore.select('BTCUSD', datetime.datetime(2020, 4, 20),
                            datetime.datetime(2020, 8, 1))
df.head()
Now we're cooking with gas, folks. Let's do some basic time series analysis, starting by converting our 1.3 million row DataFrame into 5 minute bins in OHLC (open/high/low/close) format. As the data has some N/A values we'll forward-fill (ffill) as well, focusing on the close price, which is what we'll be analyzing:
phemex_5m = df['price'].resample('5Min').ohlc()
phemex_5m.close = phemex_5m.close.fillna(method="ffill")
phemex_5m.close.isna().sum()
We can get some descriptive statistics, and plot it as well:
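In code, that cell looks something like the following (the statistics and chart appear as screenshots in the original post):

import matplotlib.pyplot as plt

# descriptive statistics for the 5-minute close series
print(phemex_5m.close.describe())

# simple line plot of the close price
phemex_5m.close.plot(figsize=(12, 6), title='Phemex BTCUSD 5-minute close')
plt.show()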
We can construct a probability plot -- clearly Bitcoin prices are not normally distributed!
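A sketch of that cell, assuming scipy is available in your environment:

import matplotlib.pyplot as plt
import scipy.stats

# probability (Q-Q) plot of the 5-minute close against a normal distribution
fig, ax = plt.subplots(figsize=(8, 6))
scipy.stats.probplot(phemex_5m.close.dropna(), dist='norm', plot=ax)
plt.show()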
And we can look at the ACF (Autocorrelation Function) and PACF (Partial Autocorrelation Function) to get a sense of to what degree Bitcoin prices are correlated to the prices that come before. The ACF shows influence on current price going back up to 40 lags, so we'll focus in on PACF instead, which as expected shows that a 5 minute bin's price is highly correlated to the previous 5 minute bin closing price, with weak correlation two and five lags back:
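And a sketch of the corresponding cell, assuming statsmodels is installed:

import matplotlib.pyplot as plt
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

# autocorrelation and partial autocorrelation of the 5-minute close, out to 40 lags
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))
plot_acf(phemex_5m.close.dropna(), lags=40, ax=ax1)
plot_pacf(phemex_5m.close.dropna(), lags=40, ax=ax2)
plt.show()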