Labor Day's coding project was to figure out how to connect to the GDAX cryptocurrency exchange's websocket feed, building on some work done previously to connect via the REST API. As the latter had used Netty for its HTTP client and Jackson for the JSON support I wanted to do the same for Websockets to be consistent.
GDAX
GDAX is Coinbase's cryptocurrency trading platform, which currently offers Bitcoin, Ether and Litecoin trading. In addition to its Web-based trading GUI it supports REST, Websockets and even FIX-based connectivity for its various services, and the online API documentation is decent. Their primary client platform appears to be Node.JS, with Java supported just by an unofficial client for the REST interface only, gdax-java.
The exchange operates 24x7 out of the Amazon AWS U.S. East datacenters, and for marketdata you can connect freely to their public endpoint at wss://ws-feed.gdax.com. This is typical of the cryptocurrency exchanges: most of them are Web-native and do not charge or even require registration to get access to their marketdata, though you do have to roll your own code, and not every exchange offers as rich a feed as GDAX.
GDAX has level 3 marketdata: full book snapshots plus incremental updates on Websockets with a sequence number to tie the two together. The messages themselves are also quite detailed, e.g. here is a sample trade event:
{
"type": "match",
"trade_id": 10,
"sequence": 50,
"maker_order_id": "ac928c66-ca53-498f-9c13-a110027a60e8",
"taker_order_id": "132fb6ae-456b-4654-b4e0-d681ac05cea1",
"time": "2014-11-07T08:19:27.028459Z",
"product_id": "BTC-USD",
"size": "5.23512",
"price": "400.23",
"side": "sell"
}
GDAX has a maker-taker fee model, with a 0.25% fee charged to liquidity takers (aggressive orders) and nothing charged to market makers (passive orders). When a trade happens they tell you which order was the aggressor and for side they tell you which side the market maker traded on. In this case we know that this match happened because a buyer crossed the spread and paid a fee. (Given Bitcoin is now worth more than 10x what the buyer paid back in 2014, it seems it was worth it.)
Netty
Netty's design is built around Channels and a pipeline of ChannelHandlers which perform different functions. Conceptually, the secure Websockets protocol (WSS) can be thought of a stack of:
- SSL handler
- HTTP handler
- Websocket frame handler
- your frame handler
and what's nice about Netty is the API reflects this stack. We start by defining an SSL context:
SslContext sslCtx;
try {
sslCtx = SslContextBuilder
.forClient()
.trustManager(getClass().getResourceAsStream("gdax.pem"))
.build();
} catch (SSLException e) {
throw new RuntimeException(e);
}
Note here I elected to make the trust for GDAX's SSL certificate explicit so the code can only securely connect to SSL on gdax.com.
We then need a Websocket handler:
WebSocketClientProtocolHandler wsProtocolHandler =
new WebSocketClientProtocolHandler(
WebSocketClientHandshakerFactory.newHandshaker(
new URI("wss://" + host), WebSocketVersion.V13, null,
true, new DefaultHttpHeaders()));
This is mostly boilerplate, but it's worth calling out the boolean parameter in newHandshaker()
. Here we are allowing the handshaker to negotiate extensions with the remote server, e.g. permessage-deflate -- more on this later.
And now the pipeline. We construct it using a fluent API, which just as described above builds up from the basics of a socket-based network channel to add SSL, HTTP and finally Websockets:
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(sslCtx.newHandler(ch.alloc()));
p.addLast(new HttpClientCodec());
p.addLast(new HttpObjectAggregator(8192));
p.addLast(wsProtocolHandler);
}
});
b.connect();
All we need to do now is add our own logic, which is just another handler at the end. In this case every time we get an incoming message all we do is print its text to console:
p.addLast(new SimpleChannelInboundHandler<TextWebSocketFrame>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
System.out.println(frame.text());
}
});
That's great, but it doesn't quite complete the protocol. GDAX requires that we write a subscribe message to the Websocket channel first. Initially I just connected with Bootstrap#connect()
and wrote the message, which didn't work ... except when I paused it for a period of time in the debugger. That suggested a timing-related problem, and after re-reading the Netty examples I realized that the timing difference had to do with whether the subscription message got sent before or after the Websocket client & server had finished their handshake. Thankfully there is another method we can override in our handler, and a hook that will get called post-handshake:
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent
.HANDSHAKE_COMPLETE) {
SubscribeRequest request = new SubscribeRequest(factory, products);
TextWebSocketFrame frame = request.encodeFrame();
ctx.channel().writeAndFlush(frame);
} else {
super.userEventTriggered(ctx, evt);
}
}
So all that's left is (a) a means of parsing incoming messages; and (b) a means of generating that subscribe message, which brings us to Jackson.
Streaming JSON with Jackson
The Jackson framework includes DOM-like and data binding mechanisms for JSON, but they are built on top of a simpler, streaming API that is more analogous to an XML pull parser. As we are going to process a lot of marketdata messages we probably do not want to build a DOM tree in memory or do Reflection-based mapping of fields just to be able to pick out a few values.
The starting point for Jackson streaming is com.fasterxml.jackson.core.JsonFactory
. Writing is done by creating a generator, and afterward you just generate the JSON elements with writeXxx() methods. Since the goal here is to build a TextWebSocketFrame, which takes a String, we just write to a StringWriter:
TextWebSocketFrame encodeFrame() throws IOException {
StringWriter w = new StringWriter();
JsonGenerator generator = factory.createGenerator(w);
generator.writeStartObject();
generator.writeFieldName("type");
generator.writeString("subscribe");
generator.writeFieldName("product_ids");
generator.writeStartArray();
products.forEach(product -> {
try {
generator.writeString(product.getCode());
} catch (IOException e) {
throw new RuntimeException(e);
}
});
generator.writeEndArray();
generator.writeEndObject();
generator.flush();
String txt = w.toString();
return new TextWebSocketFrame(true, 0, txt);
}
Parsing is a bit more complicated, because I wanted to differentiate parsing based on the type
attribute in the JSON message. The model is a pull parser, which each nextToken()
call followed by a getXxx() method to get different fields:
void parseFrame(JsonParser parser, Map<String, ProductId> productIdMap) throws IOException {
JsonToken token;
while ((token = parser.nextToken()) != JsonToken.END_OBJECT) {
switch (token) {
case FIELD_NAME:
String fieldName = parser.getCurrentName();
switch (fieldName) {
case "sequence":
parser.nextToken();
sequence = parser.getValueAsLong();
break;
case "product_id":
parser.nextToken();
String productCode = parser.getValueAsString();
productId = productIdMap.get(productCode);
break;
case "side":
parser.nextToken();
String sideCode = parser.getValueAsString();
if ("buy".equals(sideCode)) {
side = Side.BUY;
} else {
side = Side.SELL;
}
break;
case "size":
parser.nextToken();
size = new BigDecimal(parser.getValueAsString());
break;
case "price":
parser.nextToken();
price = new BigDecimal(parser.getValueAsString());
break;
default:
break;
}
default:
break;
}
}
}
Final note: message compression
You can see the complete example on Github, and in there you may come across a warning about io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler
. As mentioned above, you can let client & server negotiate a set of extensions, and one of the server extensions GDAX supports is permessage-deflate
, and you can get it with:
addLast(WebSocketClientCompressionHandler.class)
Unfortunately I found during testing that the version used by GDAX and Netty is not quite compatible (a very similar issue to this one from 2016), and I had to remove it.