In the last post on Serenity we looked at the Tau functional reactive programming library. In this post we will move beyond simple math processing with Tau and start connecting it to I/O, in particular the Websocket channels commonly used for distributing marketdata from cryptocurrency exchanges. To do this we'll need to make some big changes to Tau to support asyncio.
Async Tau: Hello, world!
With the latest version of Tau I've removed the dependency on APScheduler and integrated it with Python's built-in asyncio framework. As such, Hello, world! looks a tiny bit different:
import asyncio

from tau.core import NetworkScheduler
from tau.event import Do
from tau.signal import From


async def main():
    scheduler = NetworkScheduler()
    signal = From(scheduler, ["world"])
    Do(scheduler.get_network(), signal, lambda: print(f"Hello, {signal.get_value()}!"))

asyncio.run(main())
Most notably, you bootstrap Tau from an async method -- this ensures the event loop is up before any events fire. Be sure to check out the latest code for the details.
Linking Tau & websockets
The websockets library provides an asyncio-based API for creating Websocket clients & servers, so it's a natural fit for the new model in Tau. Let's subscribe to BTC/USD trade data on the Phemex exchange, which we covered previously in the Goddess of Fame post. To demonstrate how Tau interacts with it, we'll post the messages received on the socket onto a MutableSignal and then simply print them with a Do operator:
import asyncio
import json

import websockets

from tau.core import NetworkScheduler, MutableSignal
from tau.event import Do


async def subscribe_trades(message_callback):
    uri = "wss://phemex.com/ws"
    async with websockets.connect(uri) as websocket:
        subscribe_msg = {
            'id': 1,
            'method': 'trade.subscribe',
            'params': ['BTCUSD']
        }
        await websocket.send(json.dumps(subscribe_msg))
        while True:
            scheduler.schedule_update(message_callback, await websocket.recv())

messages = MutableSignal()
scheduler = NetworkScheduler()
Do(scheduler.get_network(), messages, lambda: print(f"{messages.get_value()}"))

asyncio.get_event_loop().run_until_complete(subscribe_trades(messages))
At this point you are probably asking yourself: why not just call print(await websocket.recv()) inside the while loop? Though there are plenty of ways to crack this problem, this is the essence of why Tau exists: in the code above we've completely separated the business logic (printing) from the infrastructure code for an async WSS client by creating an abstraction for the "what to do with the real-time events?" code. And we've done it in a way that supports functionally reactive pipelines and graphs, allowing arbitrarily sophisticated code outside of the main event processing loop.
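For instance, here is a minimal sketch of swapping out the business logic without touching subscribe_trades() at all -- the file journaling below is purely illustrative and not Serenity's actual recorder logic; we keep the same messages signal and simply attach a different Do node in place of the print:

def journal_message():
    # illustrative only: append each raw message to a local file
    with open('phemex_trades.jsonl', 'a') as journal:
        journal.write(f"{messages.get_value()}\n")

Do(scheduler.get_network(), messages, journal_message)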
Phemex sends us several types of messages on the stream, but let's say we only care about the type=incremental messages. Selecting just those can be done without touching the Websocket code, using the built-in Map and Filter operators:
scheduler = NetworkScheduler()
network = scheduler.get_network()

json_messages = Map(network, messages, lambda x: json.loads(x))
incr_messages = Filter(network, json_messages,
                       lambda x: x.get('type', None) == 'incremental')
Do(network, incr_messages, lambda: print(f"{incr_messages.get_value()}"))
And you can have an arbitrarily complex Network -- a whole graph of business logic. For example, you could connect a logger, a marketdata recorder and a routine that generates one-minute bins to the incr_messages object, and Tau would ensure that all three nodes get called on every valid trade message (a toy sketch follows below). We can now rewrite Serenity's marketdata recorder, which we previously covered in the Streaming Marketdata Recorder post, with Tau -- see phemex.py.
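Here is that rough sketch of a fan-out; the logger and the trade counter are illustrative stand-ins for real downstream nodes such as a recorder or a one-minute binning routine:

import logging

logger = logging.getLogger('phemex')
trade_count = 0

def count_trade():
    # stand-in for a real recorder / binning node
    global trade_count
    trade_count += 1

# both nodes are attached to the same signal; Tau fires each of them
# whenever incr_messages updates
Do(network, incr_messages, lambda: logger.info(incr_messages.get_value()))
Do(network, incr_messages, count_trade)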
Building a Docker image
In the previous iteration of the Serenity MD Recorder there were multiple Dockerfiles, one per recorder, with a single docker-compose.yaml tying them together. With the switch to Kubernetes, we will instead build and push a single Docker image containing the serenity-mdrecorder code. This will be pushed to kdowney1974/serenity-mdrecorder on Docker Hub.
cd serenity/serenity-mdrecorder
docker build -t serenity-mdrecorder .
docker tag serenity-mdrecorder kdowney1974/serenity-mdrecorder:v1
docker push kdowney1974/serenity-mdrecorder:v1
You can test it out by logging into the container -- it will start to journal trades in the container's transient filesystem:
docker run -it kdowney1974/serenity-mdrecorder:v1 bash
root@62dde895c160:/app# python cloudwall/serenity/mdrecorder/phemex.py
2020-04-07 01:25:31,384 - cloudwall.serenity.mdrecorder.journal - INFO - initializing journal file at /behemoth/journals/PHEMEX_TRADES/BTCUSD/20200407/journal.dat
2020-04-07 01:25:31,450 - __main__ - INFO - journaling ticks to /behemoth/journals/PHEMEX_TRADES/BTCUSD
Obviously that's not what we want. What we want is a mapping to real, persistent storage as well as a mechanism to ensure the recorder is always running. This brings us back to Kubernetes.
Deploying with microk8s on Ubuntu
After hitting some limitations with Kubernetes-in-Docker (kind, covered in the blog post "Inception") as well as Docker Compose, I decided to switch to Canonical's microk8s on Ubuntu Server 18.04 for the Serenity market data recorder, and go all-in with Kubernetes.
Installation is done via snap:
# install core package from "stable" channel
sudo snap install microk8s --classic
# enable some add-ons
sudo microk8s.enable dashboard
sudo microk8s.enable dns
sudo microk8s.enable helm
and then you can use the microk8s.kubectl command just like the regular kubectl.
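For example, the standard sub-commands work unchanged, and if you'd rather type plain kubectl you should be able to register a snap alias for it:

# confirm the single-node cluster is up
sudo microk8s.kubectl get nodes

# optional: alias microk8s.kubectl to plain kubectl
sudo snap alias microk8s.kubectl kubectl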
Deploying our first Tau-based recorder
Returning to Serenity, we should now have a good foundation to deploy our first marketdata recorder, using the new Tau-based Phemex recorder as an example. The first thing we need is a storage mapping for Behemoth so we have persistent storage:
kind: PersistentVolume
apiVersion: v1
metadata:
  name: behemoth-pv-volume
  labels:
    type: local
    app: behemoth
spec:
  storageClassName: manual
  capacity:
    storage: 50Gi
  accessModes:
    - ReadWriteMany
  hostPath:
    path: "/mnt/raid/data/behemoth"
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: behemoth-pv-claim
  labels:
    app: behemoth
spec:
  storageClassName: manual
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 50Gi
and then we can create a Deployment that references both the image we already built and the storage objects:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: phemex-recorder
  labels:
    app: phemex-recorder
spec:
  replicas: 1
  selector:
    matchLabels:
      app: phemex-recorder
  template:
    metadata:
      labels:
        app: phemex-recorder
    spec:
      containers:
      - name: phemex-recorder
        image: kdowney1974/serenity-mdrecorder:v1
        command: ["python"]
        args: ["cloudwall/serenity/mdrecorder/phemex.py"]
        volumeMounts:
        - mountPath: /behemoth
          name: behemoth
      volumes:
      - name: behemoth
        persistentVolumeClaim:
          claimName: behemoth-pv-claim
The key thing is that we can do the same for coinbase.py, scheduler.py and binance.py, the other existing recorder processes, just by referencing the same storage and changing the args we pass to the Python interpreter. This lets a single Dockerfile provide code for multiple microservices.
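For example, a coinbase-recorder Deployment could look roughly like the sketch below -- only the name, labels and script path change (assuming coinbase.py sits alongside phemex.py in the image):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: coinbase-recorder
  labels:
    app: coinbase-recorder
spec:
  replicas: 1
  selector:
    matchLabels:
      app: coinbase-recorder
  template:
    metadata:
      labels:
        app: coinbase-recorder
    spec:
      containers:
      - name: coinbase-recorder
        image: kdowney1974/serenity-mdrecorder:v1
        command: ["python"]
        # only the script path differs from the phemex-recorder Deployment
        args: ["cloudwall/serenity/mdrecorder/coinbase.py"]
        volumeMounts:
        - mountPath: /behemoth
          name: behemoth
      volumes:
      - name: behemoth
        persistentVolumeClaim:
          claimName: behemoth-pv-claim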
Once it's all installed with microk8s.kubectl apply, you can list the pods and see it all running:
kdowney@charger:~/dev/shadows/serenity$ sudo microk8s.kubectl get pods
NAME                                 READY   STATUS    RESTARTS   AGE
binance-recorder-99579bc77-jf2fw     1/1     Running   0          28m
coinbase-recorder-6ff59745c9-n2nsc   1/1     Running   0          28m
phemex-recorder-56d9d4f6d8-swcjq     1/1     Running   0          48m
scheduler-c4cd65cf4-mnsc5            1/1     Running   0          30m
timescaledb-7f55d47c8-24ngc          1/1     Running   9          24d