For over a year I have been thinking about better programming models for functional reactive programming, or FRP. It's a natural paradigm for financial software, which often deals with processing rich streams of data, and so the foundation of Serenity will necessarily include it. The problem is the API: while I agree with and admire many elements of the Reactive Manifesto and the RX libraries that sprang from it -- rxpy included -- I do not find the API intuitive for business logic. I have experience with another model at work, but it too has limits. What has come out of that thought process is the start of a new library, Tau, which tries to model time in code using the principles of FRP, in a Pythonic way.
Terminology
As its building blocks Tau borrows from the original literature on FRP. An event is something that happens at a moment in time, while a signal is a time-varying value; each of these has a corresponding class definition in Tau. The system also requires a means of modeling relationships between events and signals. Reactive programming generally solves this through introduction of a graph or pipeline concept plus a propagation model. A push mechanism, like Tau's, propagates events by forcing re-evaluations through the graph according to a dependency order. A pull model triggers evaluations as data is requested, again evaluating according to a dependency order. A simple formula evaluation is a good example of the latter:
c = a + b
In a pull model, computing c "pulls" the latest values from a and b, potentially cascading through their dependencies to derive values of a and b, with -- as an optimization -- some concept of a "dirty" or "invalid" flag used to decide whether a re-calculation is truly required at the time of the pull operation.
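To make the pull mechanics concrete, here is a minimal sketch of pull evaluation with dirty flags in plain Python. The PullCell class is purely illustrative and not part of Tau's API:

```python
class PullCell:
    """A pull-evaluated node with a dirty flag. Illustrative only."""

    def __init__(self, compute, deps=()):
        self.compute = compute  # function of the dependency values
        self.deps = list(deps)  # upstream cells
        self.dirty = True       # does the cached value need recomputation?
        self.cached = None

    def invalidate(self):
        self.dirty = True

    def get(self):
        # Pull: recompute only if dirty, cascading through dependencies.
        if self.dirty:
            self.cached = self.compute(*(d.get() for d in self.deps))
            self.dirty = False
        return self.cached


a = PullCell(lambda: 1)
b = PullCell(lambda: 2)
c = PullCell(lambda x, y: x + y, deps=[a, b])
print(c.get())  # pulls a and b, then computes 1 + 2
```

Marking a cell invalid and pulling again forces the cascade to re-run; untouched cells serve their cached value.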
Reactive implementations also differ based on whether the dependencies are implicit -- as they are in the simple formula above -- or explicit. Tau goes with an explicit graph of dependencies, represented by an object called a Network. You can have many Networks running in multiple threads; it's just a simple object for tracking dependencies, and there is no need to relate Networks unless you want to create an "edge" linking one to another.
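An explicit dependency graph with push propagation can be sketched in a few lines of plain Python. The MiniNetwork class below is illustrative only, not how Tau's Network is actually implemented:

```python
from collections import defaultdict


class MiniNetwork:
    """An explicit dependency graph with push propagation. Illustrative only."""

    def __init__(self):
        self.edges = defaultdict(list)  # node -> list of dependents

    def connect(self, source, target):
        # Explicitly draw the dependency edge: target depends on source.
        self.edges[source].append(target)

    def activate(self, node):
        # Push: evaluate the node, then force its dependents to re-evaluate.
        node()
        for dependent in self.edges[node]:
            self.activate(dependent)


log = []
net = MiniNetwork()
source = lambda: log.append("signal")
sink = lambda: log.append("hello")
net.connect(source, sink)
net.activate(source)
print(log)  # ['signal', 'hello']
```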
Finally, reactive systems generally need a scheduler. For this piece Tau reuses a battle-tested and full-featured scheduler for Python, APScheduler.
Hello, world
For the kinds of applications we'd want to build with Tau -- whether it's a metrics analyzer, an IoT device data collector or a financial application -- what we really care about are time-varying values. In Tau this is called a Signal, which adds a get_value() method to Event. Let's take our hello, world application and make some modifications. We will give SayHello a reference to a Signal which holds the name to say hello to, and we will use the Network method connect(Event, Event) to draw a dependency line from the signal to SayHello. This tells Tau that it should evaluate (or "activate") SayHello every time the signal evaluates. For convenience we'll use a concrete subclass of Signal called MutableSignal, which lets us set the value programmatically, but in theory this signal could come from a websocket, a TCP channel, a file that you're tailing, or anything else.
from apscheduler.schedulers.blocking import BlockingScheduler
from tau.core import Event, Signal, MutableSignal, Network, NetworkScheduler


class SayHello(Event):
    def __init__(self, name: Signal):
        super().__init__()
        self.name = name

    def on_activate(self) -> bool:
        print(f"Hello, {self.name.get_value()}!")
        return True


network = Network()
signal = MutableSignal()
network.connect(signal, SayHello(signal))

scheduler = BlockingScheduler()
network_scheduler = NetworkScheduler(scheduler, network)
network_scheduler.schedule_update(signal, "world")
scheduler.start()
To schedule the action we will use NetworkScheduler's schedule_update call, which takes a MutableSignal, a value, and an optional trigger (ImmediateTrigger is the default). This will in turn trigger SayHello, which can then use the value to print.
Extending Tau
The above is everything you need to know about Tau to get started, but just as with RX, the true power of a reactive framework comes from the "operators" or reactive building blocks supplied with the library. As Tau is very immature right now there is nothing like the 100+ operators available with rxpy, for instance, but it does have a hierarchy of types that you can use to build your own, and wherever possible it follows the RX naming and documentation. Let's take one of them, and rewrite hello_world.py, which is a bit verbose.
from apscheduler.schedulers.blocking import BlockingScheduler
from tau.core import MutableSignal, Network, NetworkScheduler
from tau.event import Lambda
network = Network()
signal = MutableSignal()
Lambda(network, [signal], lambda x: print(f"Hello, {x[0].get_value()}!"))
scheduler = BlockingScheduler()
network_scheduler = NetworkScheduler(scheduler, network)
network_scheduler.schedule_update(signal, "world")
scheduler.start()
The key piece is introducing the Lambda class:
Lambda(network, [signal], lambda x: print(f"Hello, {x[0].get_value()}!"))
which takes any function accepting a List and returning a boolean and "casts" it into an Event that runs the function with the given parameters (typically Signals) and returns True or False depending on whether you want to propagate. Note that the return False can be elided: a Python function without an explicit return yields None, which is falsy and so treated as False.
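The propagation rule can be illustrated without Tau at all: a node whose function returns a falsy value (False, or an implicit None) halts the push before it reaches dependents. This is a plain-Python sketch, not Tau's actual code:

```python
def propagate(node, dependents):
    # A falsy return (False or an implicit None) stops the push here.
    if node():
        for dep in dependents:
            dep()


fired = []
gate = lambda: fired.append("gate") or False   # evaluates, then blocks
sink = lambda: fired.append("sink")
propagate(gate, [sink])
print(fired)  # ['gate'] -- sink never ran

passed = []
passer = lambda: passed.append("pass") or True  # evaluates, then propagates
propagate(passer, [lambda: passed.append("sink")])
print(passed)  # ['pass', 'sink']
```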
Can we make this even simpler? Yes! NetworkScheduler by default constructs a Network, and there's a special Signal called From which takes one or more values and schedules updates for itself with those values.
from apscheduler.schedulers.blocking import BlockingScheduler
from tau.core import NetworkScheduler
from tau.event import Lambda
from tau.signal import From
scheduler = BlockingScheduler()
network_scheduler = NetworkScheduler(scheduler)
signal = From(network_scheduler, ["world"])
action = lambda x: print(f"Hello, {x[0].get_value()}!")
Lambda(network_scheduler.get_network(), [signal], action)
scheduler.start()
Finally, we can use some syntactic sugar that's particularly useful for tests or examples: a context manager that encapsulates a BlockingScheduler, automatically starting it and then shutting it down after executing the Network set-up code inside the with block:
from tau.event import Lambda
from tau.signal import From
from tau.testing import TestSchedulerContextManager
with TestSchedulerContextManager() as scheduler:
    signal = From(scheduler, ["world"])
    action = lambda x: print(f"Hello, {x[0].get_value()}!")
    Lambda(scheduler.get_network(), [signal], action)
Reactive math
Now that we have the basics down, we can start constructing more complex graphs that do interesting work. For instance, a common RX / reactive example takes a list of values, sums them and prints the cumulative totals. In Tau that looks like this:
from tau.event import ForEach
from tau.signal import From
from tau.math import RunningSum
from tau.testing import TestSchedulerContextManager
with TestSchedulerContextManager() as scheduler:
    network = scheduler.get_network()
    values = From(scheduler, [0.0, 3.2, 2.1, 2.9, 8.3, 5.7])
    total = RunningSum(network, values)
    ForEach(network, total, lambda x: print(f'{x:.2f}'))
Here we're passing a list to From and using the built-in tau.math.RunningSum to do the total. That's nice and all, but how about generating running descriptive statistics: min, max, mean and standard deviation? We can do this with a slightly more complex graph:
from tau.event import Lambda
from tau.signal import From
from tau.math import Max, Mean, Min, Stddev
from tau.testing import TestSchedulerContextManager
with TestSchedulerContextManager() as scheduler:
    network = scheduler.get_network()
    values = From(scheduler, [0.0, 3.2, 2.1, 2.9, 8.3, 5.7])

    max_value = Max(network, values)
    min_value = Min(network, values)
    avg = Mean(network, values)
    stddev = Stddev(network, values)

    def print_stats(params):
        print(f"min = {min_value.get_value()}; max = {max_value.get_value()}; "
              f"avg = {avg.get_value():.2f}; stddev = {stddev.get_value():.2f}")

    Lambda(network, [min_value, max_value, avg, stddev], print_stats)
The Network evaluation logic ensures that connected nodes are evaluated in topological sort order over the graph, so we can be assured that when the Lambda at the bottom of the graph fires, all of its ancestor nodes have already been computed and their latest values are available.
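The topological ordering guarantee can be sketched with Kahn's algorithm on the statistics graph above; this is illustrative of the property, not Tau's actual implementation:

```python
from collections import deque


def topo_order(nodes, edges):
    """Kahn's algorithm: repeatedly emit nodes with no unevaluated parents."""
    indegree = {n: 0 for n in nodes}
    for src, dst in edges:
        indegree[dst] += 1
    queue = deque(n for n in nodes if indegree[n] == 0)
    order = []
    while queue:
        n = queue.popleft()
        order.append(n)
        for src, dst in edges:
            if src == n:
                indegree[dst] -= 1
                if indegree[dst] == 0:
                    queue.append(dst)
    return order


# Edges mirror the descriptive-statistics graph from the example.
nodes = ["values", "min", "max", "mean", "stddev", "print"]
edges = [("values", "min"), ("values", "max"), ("values", "mean"),
         ("values", "stddev"), ("min", "print"), ("max", "print"),
         ("mean", "print"), ("stddev", "print")]
order = topo_order(nodes, edges)
print(order)  # "values" first, "print" last
```

Because "print" depends on all four statistics, it always sorts after them, which is exactly why the Lambda sees fully computed ancestors.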
Reactive buffering
With the ability to schedule dynamically and represent data flows as graphs, we can now get fancier. This example runs for 30 seconds, using an Interval operator to create an infinite series of integers timed one second apart; several Do operators to print out values; and finally two different types of Buffer operators to (1) buffer every N=2 elements; and (2) buffer every T=5 seconds. The result is a pair of streams with lists of integers which you can then process in batches.
from datetime import timedelta
from tau.event import Do
from tau.signal import Interval, BufferWithCount, BufferWithTime
from tau.testing import TestSchedulerContextManager
with TestSchedulerContextManager(shutdown_delay=30) as scheduler:
    network = scheduler.get_network()

    values = Interval(scheduler)
    Do(network, values, lambda: print(f"input values: {values.get_value()}"))

    buffer1 = BufferWithCount(network, values, count=2)
    Do(network, buffer1, lambda: print(f"buffer1 values: {buffer1.get_value()}"))

    buffer2 = BufferWithTime(network, values, timedelta(seconds=5),
                             scheduler=scheduler.get_native_scheduler())
    Do(network, buffer2, lambda: print(f"buffer2 values: {buffer2.get_value()}"))
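The count-based buffering behavior can be sketched in plain Python with a generator; this illustrates the idea of emitting a batch every N elements and is not Tau's implementation:

```python
def buffer_with_count(stream, count):
    """Collect values from stream and yield a list every `count` elements."""
    batch = []
    for value in stream:
        batch.append(value)
        if len(batch) == count:
            yield batch
            batch = []  # start the next batch


batches = list(buffer_with_count(range(6), 2))
print(batches)  # [[0, 1], [2, 3], [4, 5]]
```

A time-based buffer works the same way, except the flush is driven by a timer firing every T seconds rather than by the element count.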
Map / reduce and filtering
Tau supports the Map and Scan operators defined by RX as well. For example, let's say you have a stream of floating point numbers which you want to round to integers and then accumulate as a running sum. In Tau you can do this with a couple of lines:
from tau.event import Do
from tau.signal import From, Map, Scan
from tau.testing import TestSchedulerContextManager
with TestSchedulerContextManager() as scheduler:
    network = scheduler.get_network()
    values = From(scheduler, [0.0, 3.2, 2.1, 2.9, 8.3, 5.7])
    mapper = Map(network, values, lambda x: round(x))
    accumulator = Scan(network, mapper)
    Do(network, accumulator, lambda: print(f"{accumulator.get_value()}"))
Note Tau does not currently support the Reduce operator because all series in Tau are infinite: you cannot "close" a Signal, for instance, and so the distinction between Scan (a running accumulation) and a reduce (a sum of all values at the end) does not make sense.
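On a finite list the distinction is visible in the Python standard library: itertools.accumulate behaves like Scan (one output per input) while functools.reduce behaves like Reduce (a single value once the sequence ends), which is exactly the operation that has no meaning on a stream that never ends:

```python
import operator
from functools import reduce
from itertools import accumulate

values = [0, 3, 2, 3, 8, 6]

# Scan: a running total emitted for every element.
running = list(accumulate(values, operator.add))
print(running)  # [0, 3, 5, 8, 16, 22]

# Reduce: a single total, available only after the final element.
final = reduce(operator.add, values)
print(final)  # 22
```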
You can similarly apply a predicate rather than a mapping, and filter values. Here's how you'd print out all positive values from a stream, using the Filter operator:
from tau.event import Do
from tau.signal import From, Filter
from tau.testing import TestSchedulerContextManager
with TestSchedulerContextManager() as scheduler:
    network = scheduler.get_network()
    values = From(scheduler, [0.0, -3.2, 2.1, -2.9, 8.3, -5.7])
    filt = Filter(network, values, lambda x: x >= 0.0)
    Do(network, filt, lambda: print(f"{filt.get_value()}"))