After experimenting with Polygon.io I decided to try out Sharadar, which is delivered via the Quandl API. It had a very good reputation as a survivorship-bias-free dataset, and Quandl offered an $89/month package deal for a decade of Sharadar fundamentals, equity prices, holdings data, corporate actions and more. The Sharadar Core US Equities Bundle is now integrated with Serenity, including a complete Postgresql database schema for Sharadar's data.
Database design
The database design was done with the brilliant pgModeler database design tool. It's an almost-fully normalized representation of the CSV files available in the bundle. You can reasonably question whether this is the best approach: a direct translation of the content would be less space-efficient but faster on bulk queries.
Sqlalchemy
When I started this project I began by writing my own simple object-database layer with psycopg2, but the deeper I waded into the code the more it felt like a mistake. I wiped out the code and started over with sqlalchemy's object-relational mapper (ORM), which I had not used before this project.
My goal was to build an object model on top of Sharadar's schema and use it both for bulk data loading from CSV and for querying. Again, this decision can reasonably be questioned. Extract-Transform-Load (ETL) code based off an object model is never going to be as fast as bulk data import, and the queries will similarly be less efficient than going to raw SQL. But for a hugely complex domain schema, providing a hierarchy of nice objects on top is tempting: it gives you code-completion in the IDE and covers up most of the details of how the joins work.
Connecting to TimescaleDB
Sqlalchemy performs database operations through a Session object, which encapsulates transaction logic among other things, so to get started we need to create a session. Serenity uses the TimescaleDB distribution of Postgresql, and as it's just a set of plugins on top of vanilla Postgresql you can connect to it using the same protocol:
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session

def create_sharadar_session(hostname: str = os.getenv('TIMESCALEDB_NODEPORT_SERVICE_HOST', 'localhost'),
                            port: int = os.getenv('TIMESCALEDB_NODEPORT_SERVICE_PORT', '5432'),
                            username: str = os.getenv('POSTGRES_SHARADAR_USER', 'sharadar'),
                            password: str = os.getenv('POSTGRES_SHARADAR_PASSWORD', None)) -> Session:
    # standard Postgresql connection URL; TimescaleDB requires nothing special
    engine = create_engine(f'postgresql://{username}:{password}@{hostname}:{port}/')
    session = sessionmaker(bind=engine)
    return session()
so in Jupyter you can connect to localhost with just:
from serenity.equity.sharadar_api import create_sharadar_session
session = create_sharadar_session(password='********')
Mapping an entity
Sqlalchemy maps database entities to Python classes and columns & relationships to attributes. You start by defining a Base class, which should be shared across all entities in the schema:
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
You then create a class. Let's start with a simple one, Indicator -- a metadata table supplied by Sharadar with column descriptions and types for all tables:
from sqlalchemy import Column, Integer, String, Boolean, ForeignKey
from sqlalchemy.orm import relationship

class Indicator(Base):
    __tablename__ = 'indicators'

    indicator_id = Column(Integer, primary_key=True)
    table_name = Column(String(32))
    indicator = Column(String(32))
    is_filter = Column(Boolean)
    is_primary_key = Column(Boolean)
    title = Column(String(256))
    description = Column(String(2048))

    unit_type_id = Column(Integer, ForeignKey('unit_type.unit_type_id'))
    unit_type = relationship('UnitType', lazy='joined')
The table name is bound via the special class attribute __tablename__, and each column is assigned to a Column() object with its type and (where applicable) field length. The one exception is unit_type, which is an example of a relationship field: we'll look at those next.
Closer look: FOREIGN KEY relationships
If you want to create a many-to-one relationship you need to specify a few things:
- the foreign key column type
- the foreign key target, in $table.$fk_column_name form
- the name of the mapped class
- the lazy-loading strategy (optional)
unit_type_id = Column(Integer, ForeignKey('unit_type.unit_type_id'))
unit_type = relationship('UnitType', lazy='joined')
The above specifies an FK in indicators.unit_type_id which links up to unit_type.unit_type_id. Additionally, it specifies a "joined" type loading strategy, which means when you query for an Indicator entity it will automatically join the UnitType entity.
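For reference, the other side of that relationship is just another mapped class. A minimal sketch of what the UnitType entity might look like -- only the primary key is dictated by the foreign key above; the remaining column is illustrative:

class UnitType(Base):
    __tablename__ = 'unit_type'

    unit_type_id = Column(Integer, primary_key=True)  # referenced by indicators.unit_type_id
    unit_type_code = Column(String(32))               # illustrative: e.g. currency, ratio, percent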
Simple queries
Let's add a finder method to Indicator which can look up an Indicator given a table_name and indicator. Note WHERE clause filters are expressed with simple Python syntax:
@classmethod
def find_by_name(cls, session: Session, table_name: str, indicator: str):
    return session.query(Indicator).filter(Indicator.table_name == table_name,
                                           Indicator.indicator == indicator).one_or_none()
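Usage from Jupyter is then a one-liner; the table and indicator values below are illustrative:

# returns the matching Indicator entity, or None if there is no match
revenue_meta = Indicator.find_by_name(session, table_name='SF1', indicator='REVENUE')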
Queries with complex joins
Let's get a bit fancier now and look at how joins work for a major table, Fundamentals.
f = session.query(Fundamentals) \
    .join(Ticker) \
    .join(DimensionType) \
    .filter(Ticker.ticker == ticker,
            DimensionType.dimension_type_code == dimension,
            Fundamentals.calendar_date == as_of_date).one_or_none()
As before, the WHERE clause is expressed with filters based on simple Python operators, but this time we explicitly join the Ticker and DimensionType entities and define filters on those entities as well. This is important, and a common error: sqlalchemy does not support chaining of properties to define queries, i.e. you cannot write something like Fundamentals.ticker.ticker == ticker.
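For the curious: if the Fundamentals entity declares a relationship attribute pointing at Ticker (the same pattern as Indicator.unit_type above), sqlalchemy's relationship comparators offer an alternative to the explicit join. A rough sketch, assuming such a Fundamentals.ticker relationship exists in the model:

# EXISTS-style filter via the relationship comparator; equivalent in intent to
# the explicit .join(Ticker) above, assuming Fundamentals.ticker is a
# relationship() attribute rather than a plain column
f = session.query(Fundamentals) \
    .filter(Fundamentals.ticker.has(Ticker.ticker == ticker),
            Fundamentals.calendar_date == as_of_date) \
    .one_or_none()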
Mapping custom types: Money
Sqlalchemy supports Postgresql's MONEY column type, but by default it maps the values as strings. This is far from ideal: for our object model we really want to be able to construct ratios and other combinations of fundamental data points, and it would be nice to build this in. We can accomplish this by using a combination of the Python money package and sqlalchemy's custom type definition mechanism:
from money import Money
from sqlalchemy import TypeDecorator
from sqlalchemy.dialects.postgresql import MONEY

class USD(TypeDecorator):
    impl = MONEY

    @property
    def python_type(self):
        return Money

    def load_dialect_impl(self, dialect):
        return dialect.type_descriptor(MONEY())

    def process_bind_param(self, value, dialect):
        # Python -> database: MONEY accepts a plain string
        if value is None:
            return value
        else:
            return str(value)

    def process_literal_param(self, value, dialect):
        raise NotImplementedError()

    def process_result_value(self, value, dialect):
        # database -> Python: strip formatting and wrap in a Money object
        if value is None:
            return value
        else:
            return Money(value.replace(',', '').replace('$', ''), 'USD')
Note: given that SEC filings for U.S. stocks report GAAP figures in USD, I am coercing the values to a USD type and presuming a particular MONEY string format. Creating a mapping mechanism that works for international stocks is left for later.
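With the custom type defined, it plugs into an entity like any other column type. Here is a rough sketch of how the EquityPrice entity used in the next section might declare its price columns; the table name, surrogate key and foreign key details are assumptions, but the attribute names match what we query below:

from sqlalchemy import Date

class EquityPrice(Base):
    __tablename__ = 'equity_price'  # assumed table name

    equity_price_id = Column(Integer, primary_key=True)          # assumed surrogate key
    ticker_id = Column(Integer, ForeignKey('ticker.ticker_id'))  # assumed FK to the ticker table
    ticker = relationship('Ticker', lazy='joined')

    date = Column(Date)
    open_px = Column('open', USD)   # attribute open_px maps to DB column "open"
    high_px = Column('high', USD)
    low_px = Column('low', USD)
    close_px = Column('close', USD)
    volume = Column(Integer)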
Creating timeseries from entities
For equity prices we want to pipe the output of our object query into a Pandas DataFrame. To do this we can use a neat trick: use a list comprehension to create a list of dicts out of each object in the resultset. Note here we convert the Money-typed open/high/low/close fields to a Decimal amount and then to Pandas numeric types:
import pandas as pd

def get_equity_prices(session: Session, ticker: str) -> pd.DataFrame:
    results = session.query(EquityPrice) \
        .join(Ticker) \
        .filter(Ticker.ticker == ticker).all()
    df = pd.DataFrame([{
        'date': result.date,
        'open': pd.to_numeric(result.open_px.amount),
        'high': pd.to_numeric(result.high_px.amount),
        'low': pd.to_numeric(result.low_px.amount),
        'close': pd.to_numeric(result.close_px.amount),
        'volume': result.volume
    } for result in results])
    if df.empty:
        raise KeyError(f'no data for ticker: {ticker}')
    else:
        df.set_index('date', inplace=True)
        return df
You can then use the method to plot Tesla closing prices in Jupyter:
get_equity_prices(session, ticker='TSLA')[['close']].plot()
Connecting to Quandl
We now have a rich object model that can represent all the Sharadar entities. The final step is to write a bulk loader. For this we'll use Quandl's export table API and Pandas read_csv to read the CSV file inside the downloaded ZIP file -- a lesser-known feature of read_csv is that it handles ZIP-format CSV automatically, unzipping on-the-fly, so you can just pass it the ZIP file:
import quandl

# assumes quandl.ApiConfig.api_key has been set with your Quandl API key
load_path = 'sharadar_tickers.zip'
quandl.export_table('SHARADAR/TICKERS', filename=load_path)
df = pd.read_csv(load_path)
for index, row in df.iterrows():
    # ... create entities ...
    ticker_entity = Ticker(table_name=table_name, ticker=ticker, ...)
    session.add(ticker_entity)
session.commit()
As shown above, creating a new entity is as simple as constructing an object with the field-by-field constructor generated by the declarative Base and then adding it to the session. On commit() it gets flushed to the database. For larger tables you may choose to commit incrementally:
if row_count % 1000 == 0:
    session.commit()
row_count += 1
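Putting the two fragments together, the loader loop for a larger table might look roughly like this -- the SHARADAR/SEP table code is illustrative and the entity construction is elided:

load_path = 'sharadar_prices.zip'
quandl.export_table('SHARADAR/SEP', filename=load_path)  # illustrative: Sharadar equity prices export
df = pd.read_csv(load_path)

row_count = 0
for index, row in df.iterrows():
    # ... map the CSV row onto an entity and session.add() it ...
    if row_count % 1000 == 0:
        session.commit()
    row_count += 1
session.commit()  # flush whatever remains in the final partial batch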
Next steps
The first iteration of the code bulk loads each table using Quandl's export table API, but to make full use of the data subscription we need a robust ETL pipeline which can incrementally load new data every day. In the next article in the series we'll look at applying Spotify's Luigi in combination with Kubernetes to create a daily batch. Finally, in Part 3 we'll revisit the Serenity backtester and show how to integrate Sharadar's data into simple equity trading strategies, leveraging Quantopian's pyfolio for performance analysis.
Happy coding!