Event stream processing unplugged - part 1
Introduction
- The basics of unbounded event processing
- Functional reactive programming, introducing concepts such as map, filter, groupBy and flatMap
- Constructing directed acyclic execution graphs
- Managing event streams of heterogeneous types
- Imperative event programming, integrating application code
- Input and output
- Building, testing and deploying
- Auditing, logging and debugging event execution flow
Unplugged from what?
- Threads - No RxJava, Flow or coroutines
- Messaging infrastructure - No Kafka Streams or Amazon Kinesis
- Grid processing - No Hazelcast Jet or Akka Streams
Real world trading experience
- If you are involved with algorithmic trading you will probably come across this pattern
- Almost all banks/funds have some home grown stream processing derivative they use
- The learning curve is steep but worth it
- Very useful once understood; the danger is that every problem starts to look like a nail for the stream processing hammer.
- Hand coding listeners/events becomes untenable over time and creates a large technical debt, which forces the use of a stream processing library.
- There is usually an existing messaging and threading infrastructure that cannot be changed. Processing logic engines must integrate easily with it.
- Single threaded applications with little shared state, reading from lock free queues, are generally faster.
- Applications that are driven purely by events are deterministic, easier to reason about, and simpler to test and replay from a historic data source.
- Determining event flow in the processing engine requires tool support; without it developer productivity plummets.
Hello world event stream
To get started, here is a five-minute hello world event stream primer before we explore a more complex example.
Listen to two unbounded streams of data, extract a value from a stream when a new event is received. Apply a binary function to the two values, in this case add. If the sum is greater than 100 then log the value to the console. The function is stateless but the streams move independently and must preserve the last value received to apply as an argument to the function. The function and dependent filter are exercised if either event stream updates.
Code example
public class HelloWorld {
    public static void main(String[] args) {
        //builds the EventProcessor
        EventProcessor eventProcessor = Fluxtion.interpret(cfg -> {
            var data1Stream = subscribe(Data1.class)
                    .console("rcvd -> {}")
                    .mapToDouble(Data1::value);

            subscribe(Data2.class)
                    .console("rcvd -> {}")
                    .mapToDouble(Data2::value)
                    .map(Double::sum, data1Stream)
                    .filter(d -> d > 100)
                    .console("OUT: sum {} > 100");
        });
        //init and send events
        eventProcessor.init();
        //no output < 100
        eventProcessor.onEvent(new Data1(20.5));
        //no output < 100
        eventProcessor.onEvent(new Data2(63));
        //output > 100 - log to console
        eventProcessor.onEvent(new Data1(56.8));
    }

    public record Data1(double value) {
    }

    public record Data2(double value) {
    }
}
Execution output
rcvd -> Data1[value=20.5]
rcvd -> Data2[value=63.0]
rcvd -> Data1[value=56.8]
OUT: sum 119.8 > 100
Comparison to Java 8 streams
- The data is unbounded. Java streams process finite sets of data and a terminal operation triggers the processing. With the event processor a new event initiates a process cycle.
- The event processor is long lived and state is held in the graph, such as the last event value received. Streams are essentially stateless, transient objects that cannot be re-used.
- Streams are single pipelines with a single execution path; an event processor is a graph of nodes with an arbitrary number of execution paths.
- Streams only handle a single event type; the event processor supports an arbitrary number of input event types.
- Event processors have a lifecycle: they can be started and stopped. Streams are one shot in operation.
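The re-use and state points can be seen with plain JDK classes. The sketch below is illustrative only: `LastValueSum` is a hypothetical hand-written stand-in for a long-lived node that remembers the last value on each input, and it is not part of Fluxtion. The starting value of 0.0 for an input that has not yet updated is an assumption of the sketch.

```java
import java.util.stream.Stream;

final class StreamReuseSketch {

    // Hypothetical long-lived node: keeps the last value seen on each input.
    // Assumption: an input that has never updated contributes 0.0.
    static final class LastValueSum {
        private double left;
        private double right;

        double onLeft(double value) {
            left = value;
            return left + right;
        }

        double onRight(double value) {
            right = value;
            return left + right;
        }
    }

    // Returns true if a second terminal operation on the same stream is rejected.
    static boolean secondTerminalOperationFails() {
        Stream<Integer> stream = Stream.of(1, 2, 3);
        stream.forEach(i -> { });        // first terminal operation consumes the stream
        try {
            stream.forEach(i -> { });    // a stream cannot be traversed twice
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("stream re-use rejected: " + secondTerminalOperationFails());

        // The node outlives any single event and is exercised by either input.
        LastValueSum node = new LastValueSum();
        node.onLeft(20.5);
        node.onRight(63);
        System.out.println("sum after third event: " + node.onLeft(56.8));
    }
}
```

The stream has to be rebuilt for every pass over the data, whereas the node is created once and fed events for as long as the application runs.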
Processing graph
The hello world event processor graph as a diagram:
[Image: Fluxtion processing graph]
Realtime trade calculation
Requirements
- Publish mark to market, positions and overall profit for a set of traded instruments to an application component.
- Values are updated whenever there is a new trade, an instrument price update or a reset action.
- Batches of trades should result in a single update.
- The system supports a reset action returning all values to their initial state.
- Only publish values that have changed.
Input events from the application
- Trade
- Instrument prices
- Reset signal
Outputs published to listeners
Calculations
- One FX trade creates two position changes, one for each currency: positive for a buy, negative for a sell.
- Currency positions from each trade contribute to a cumulative position for that currency.
- The position for a currency is used to calculate the market value relative to another currency, in this case USD.
- If no currency->USD rate is available the mark to market value is NaN for that currency position.
- The sum of all currency mark to market values is the overall profit or loss of the trading position.
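The arithmetic in the list above can be worked through in plain Java. This is a minimal sketch, not the article's solution: `Leg`, `legs`, `apply` and `markToMarket` are hypothetical names, USD is assumed as the base currency, and rates are supplied directly as currency->USD values.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TradeMathSketch {

    // Hypothetical stand-in for one side of an FX trade.
    record Leg(String currency, double amount) { }

    // A buy is positive in the dealt currency and negative in the contra currency; a sell is the reverse.
    static List<Leg> legs(String pair, double dealt, double contra, boolean buy) {
        double sign = buy ? 1 : -1;
        return List.of(
                new Leg(pair.substring(0, 3), sign * dealt),
                new Leg(pair.substring(3), -sign * contra));
    }

    // Adds each leg to the cumulative position for its currency.
    static void apply(Map<String, Double> positions, List<Leg> tradeLegs) {
        for (Leg leg : tradeLegs) {
            positions.merge(leg.currency(), leg.amount(), Double::sum);
        }
    }

    // Values each position in USD; a missing currency->USD rate yields NaN.
    static Map<String, Double> markToMarket(Map<String, Double> positions, Map<String, Double> usdRates) {
        Map<String, Double> mtm = new TreeMap<>();
        positions.forEach((currency, amount) -> {
            double rate = currency.equals("USD") ? 1.0 : usdRates.getOrDefault(currency, Double.NaN);
            mtm.put(currency, amount * rate);
        });
        return mtm;
    }

    // Overall profit or loss is the sum of the mark to market values.
    static double totalProfit(Map<String, Double> mtm) {
        return mtm.values().stream().mapToDouble(Double::doubleValue).sum();
    }

    public static void main(String[] args) {
        Map<String, Double> positions = new TreeMap<>();
        apply(positions, legs("EURUSD", 250, 130, true));
        apply(positions, legs("EURUSD", 250, 130, true));
        apply(positions, legs("EURCHF", 120, 100, false));
        System.out.println(positions);   // {CHF=100.0, EUR=380.0, USD=-260.0}

        // Only a EUR rate is known, so the CHF value and the total are NaN.
        Map<String, Double> mtm = markToMarket(positions, Map.of("EUR", 1.5));
        System.out.println(mtm);         // {CHF=NaN, EUR=570.0, USD=-260.0}
        System.out.println(totalProfit(mtm));
    }
}
```

A single NaN in the map makes the total NaN, which is why the overall profit is only meaningful once every held currency has a rate.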
Solution
- Building the graph - describes and constructs an instance of the event processor that supports the business logic.
- Public service methods - a set of methods for use by the enclosing application, each method results in one or more events pushed to the graph.
- Static helper functions - stateless functions that are invoked by various nodes in the event processor
Building the graph
01 private void buildProcessor(SEPConfig config) {
02 var resetTrigger = subscribeToSignal("reset");
03 var publishTrigger = subscribeToSignal("publish");
04
05 var assetPosition = subscribe(Trade.class)
06 .flatMap(Trade::tradeLegs)
07 .groupBy(TradeLeg::id, TradeLeg::amount, Aggregates.doubleSum())
08 .resetTrigger(resetTrigger);
09
10 var assetPriceMap = subscribe(PairPrice.class)
11 .map(TradingCalculator::toCrossRate)
12 .groupBy(Trade.AssetPrice::id, Trade.AssetPrice::price)
13 .resetTrigger(resetTrigger);
14
15 var posDrivenMtmStream = assetPosition.map(GroupByStreamed::keyValue)
16 .map(TradingCalculator::markToMarketPosition, assetPriceMap.map(GroupBy::map))
17 .updateTrigger(assetPosition);
18
19 var priceDrivenMtMStream = assetPriceMap.map(GroupByStreamed::keyValue)
20 .map(TradingCalculator::markToMarketPrice, assetPosition.map(GroupBy::map))
21 .updateTrigger(assetPriceMap);
22
23 //Mark to market to sink as a map
24 var mtm = posDrivenMtmStream.merge(priceDrivenMtMStream)
25 .groupBy(KeyValue::getKey, KeyValue::getValueAsDouble)
26 .resetTrigger(resetTrigger)
27 .map(GroupBy::map)
28 .updateTrigger(publishTrigger)
29 .filter(Predicates.hasMapChanged())
30 .sink("mtm");
31
32 //Positions to sink as a map
33 assetPosition.map(GroupBy::map)
34 .updateTrigger(publishTrigger)
35 .filter(Predicates.hasMapChanged())
36 .sink("positions");
37
38 //sum of mtm is profit
39 mtm.mapToDouble(TradingCalculator::totalProfit)
40 .filter(Predicates.hasDoubleChanged())
41 .sink("profit");
42 }
line 2-3 Because data is always live we sometimes need to override when events are propagated from a node. The subscriptions connect a node to a signal event, keyed by a string. These nodes are used later in the graph to trigger publishing and resetting behaviour.
line 5-8 Publishes a position map. Subscribes to Trade objects and uses a flatMap to iterate over the trade legs. A groupBy applies a cumulative sum function to the amounts for each key and stores the results in a map. This is the whole position calculation. GroupBy is a stateful node, holding the keys and values in a map. A GroupBy node can be reset, which clears the underlying map; the reset trigger is connected to the reset signal node above.
line 10-13 Publishes a map of asset price relative to the base currency. Subscribes to PairPrice; a map function calls the static user function TradingCalculator#toCrossRate to convert each pair price into a rate against the base currency, and groupBy partitions the results into a map keyed by asset currency. The map is cleared by the same reset trigger.
line 15-17 Creates a stream of mark to market updates for an asset, triggered when the assetPosition has updated. TradingCalculator#markToMarketPosition is a stateless binary function and would trigger if either input changes, so we override the update trigger.
line 19-21 Creates a stream of mark to market updates for an asset, triggered when the assetPriceMap has updated.
line 24-30 Publishes the mark to market map to a sink when there is a change in the map. The two mark to market streams are merged into a single update stream. Each update is merged into a map, and the whole map (line 27) is published downstream to an external sink. A filter gates the update to the sink by checking for a change against the previously published version. Triggers for resetting and overriding the update are provided. Batches of trades are handled because the update is only published on the publish signal, once the batch has finished processing.
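line 33-36 Publishes the position map to a sink. The update trigger is overridden and a changed filter is applied to the output.
line 39-41 Publishes the total profit to a sink. TradingCalculator#totalProfit sums the mark to market map and a changed filter gates the output.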
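The "only publish when changed" gate described above can be pictured with a small stateful class. This is a sketch of the idea only, not how `Predicates.hasMapChanged()` is implemented; `MapChangeGate` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class ChangeGateSketch {

    // Hypothetical gate: forwards a map to the sink only if it differs from the last map forwarded.
    static final class MapChangeGate {
        private final Consumer<Map<String, Double>> sink;
        private Map<String, Double> lastPublished;   // null until the first publish

        MapChangeGate(Consumer<Map<String, Double>> sink) {
            this.sink = sink;
        }

        void publish(Map<String, Double> current) {
            if (current.equals(lastPublished)) {
                return;                               // nothing changed, stay silent
            }
            lastPublished = new HashMap<>(current);   // snapshot, because the live map keeps mutating
            sink.accept(lastPublished);
        }
    }

    public static void main(String[] args) {
        List<Map<String, Double>> published = new ArrayList<>();
        MapChangeGate gate = new MapChangeGate(published::add);

        Map<String, Double> live = new HashMap<>();
        live.put("EUR", Double.NaN);
        gate.publish(live);        // first value is forwarded
        gate.publish(live);        // unchanged, suppressed
        live.put("EUR", 570.0);
        gate.publish(live);        // changed, forwarded
        System.out.println("publish count: " + published.size());
    }
}
```

Map equality compares boxed `Double` values with `equals`, under which NaN equals NaN, so in this sketch a position that stays unpriced does not cause a republish.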
A note on input and output:
- Subscriptions feed data from the application into the event processor
- Sinks push data from a node in the event processor to an external consumer
Mapping to events
public TradingCalculator() {
    //build the graph once and initialise it before any events arrive
    streamProcessor = Fluxtion.interpret(this::buildProcessor);
    streamProcessor.init();
}

//each service method pushes an event into the graph, then signals a publish
public void processTrade(Trade trade) {
    System.out.println("\nrcvd trade -> " + trade);
    streamProcessor.onEvent(trade);
    streamProcessor.publishSignal("publish");
}

public void priceUpdate(PairPrice price) {
    System.out.println("\nrcvd price -> " + price);
    streamProcessor.onEvent(price);
    streamProcessor.publishSignal("publish");
}

public void reset() {
    System.out.println("\nreset");
    streamProcessor.publishSignal("reset");
    streamProcessor.publishSignal("publish");
}

//listener registration, one method per named sink in the graph
public void markToMarketConsumer(Consumer<Map<String, Double>> listener) {
    streamProcessor.addSink("mtm", listener);
}

public void positionsConsumer(Consumer<Map<String, Double>> listener) {
    streamProcessor.addSink("positions", listener);
}

public void profitConsumer(DoubleConsumer listener) {
    streamProcessor.addSink("profit", listener);
}
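The main program further down also calls a `processTrades` method for batches, which is not listed here. One plausible shape, given the requirement that a batch results in a single update, is to push every trade and signal a publish once at the end. The sketch below is an assumption rather than the article's code: `Processor` is a hypothetical interface standing in for the two calls the service methods above rely on, and `RecordingProcessor` exists only to make the call order visible.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchPublishSketch {

    // Hypothetical stand-in exposing only the two calls used by the service methods.
    interface Processor {
        void onEvent(Object event);
        void publishSignal(String signalName);
    }

    // Pushes every trade into the graph, then fires one publish signal for the whole batch.
    static void processTrades(Processor processor, Object... trades) {
        for (Object trade : trades) {
            processor.onEvent(trade);
        }
        processor.publishSignal("publish");
    }

    // Records calls so the ordering can be inspected without a real event processor.
    static final class RecordingProcessor implements Processor {
        final List<String> calls = new ArrayList<>();

        @Override
        public void onEvent(Object event) {
            calls.add("event:" + event);
        }

        @Override
        public void publishSignal(String signalName) {
            calls.add("signal:" + signalName);
        }
    }

    public static void main(String[] args) {
        RecordingProcessor recorder = new RecordingProcessor();
        processTrades(recorder, "trade-1", "trade-2", "trade-3");
        System.out.println(recorder.calls);
        // [event:trade-1, event:trade-2, event:trade-3, signal:publish]
    }
}
```

Because the sinks in the graph only update on the publish signal, delaying that signal until the loop completes is what collapses the batch into one downstream update.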
Helper functions
//price driven update: value the existing position at the new price, no output if there is no position
public static KeyValue<String, Double> markToMarketPrice(
        KeyValue<String, Double> assetPrice, Map<String, Double> assetPositionMap) {
    if (assetPrice == null || assetPositionMap.get(assetPrice.getKey()) == null) {
        return null;
    }
    return new KeyValue<>(assetPrice.getKey(), assetPositionMap.get(assetPrice.getKey()) * assetPrice.getValue());
}

//position driven update: value the new position at the last known price
public static KeyValue<String, Double> markToMarketPosition(
        KeyValue<String, Double> assetPosition, Map<String, Double> assetPriceMap) {
    if (assetPosition == null) {
        return null;
    }
    //the base currency needs no conversion
    if (assetPosition.getKey().equals(baseCurrency)) {
        return new KeyValue<>(assetPosition.getKey(), assetPosition.getValue());
    }
    //no prices at all yet
    if (assetPriceMap == null) {
        return new KeyValue<>(assetPosition.getKey(), Double.NaN);
    }
    //a missing price for this asset gives NaN
    return new KeyValue<>(
            assetPosition.getKey(),
            assetPriceMap.getOrDefault(assetPosition.getKey(), Double.NaN) * assetPosition.getValue());
}

public static double totalProfit(Map<String, Double> m) {
    return m.values().stream().mapToDouble(Double::doubleValue).sum();
}

//converts a pair price into a price for the non-base asset, inverting when the base currency is quoted first
public static AssetPrice toCrossRate(PairPrice pairPrice) {
    if (pairPrice.id().startsWith(baseCurrency)) {
        return (new AssetPrice(pairPrice.id().substring(3), 1.0 / pairPrice.price()));
    }
    return (new AssetPrice(pairPrice.id().substring(0, 3), pairPrice.price()));
}
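To see the rate conversion with concrete numbers, here is a standalone sketch of the same logic as toCrossRate. The `PairPrice` and `AssetPrice` records and the `BASE` constant are hypothetical stand-ins declared locally so the example compiles on its own; USD is assumed as the base currency.

```java
final class CrossRateSketch {

    static final String BASE = "USD";   // assumed base currency

    // Hypothetical local stand-ins for the article's types.
    record PairPrice(String id, double price) { }
    record AssetPrice(String id, double price) { }

    // Converts a six-letter pair price into the price of the non-base asset in the base currency.
    static AssetPrice toCrossRate(PairPrice pairPrice) {
        if (pairPrice.id().startsWith(BASE)) {
            // USDxxx quotes xxx per USD, so invert to get USD per xxx
            return new AssetPrice(pairPrice.id().substring(3), 1.0 / pairPrice.price());
        }
        // xxxUSD already quotes USD per xxx
        return new AssetPrice(pairPrice.id().substring(0, 3), pairPrice.price());
    }

    public static void main(String[] args) {
        AssetPrice eur = toCrossRate(new PairPrice("EURUSD", 1.5));
        AssetPrice jpy = toCrossRate(new PairPrice("USDJPY", 202));
        System.out.println(eur);
        System.out.println(jpy.id() + " mark to market for a -26000 position: " + jpy.price() * -26000);
    }
}
```

A short JPY position of 26000 at USDJPY 202 is worth roughly -128.71 USD, in line with the JPY entry in the batch output. Note that this logic assumes one side of the pair is the base currency; a price for a pair such as GBPJPY would be stored as if it were a GBP to USD rate.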
Running the example
Main program
public static void main(String[] args) {
    TradingCalculator tradingCalculator = new TradingCalculator();
    //add listeners for output
    tradingCalculator.markToMarketConsumer(
            m -> System.out.println("Asset mark to market\t:" + m));
    tradingCalculator.positionsConsumer(
            m -> System.out.println("Asset positions\t\t\t:" + m));
    tradingCalculator.profitConsumer(
            d -> System.out.println("Total trading profit\t:" + d));
    //send trades and rates
    tradingCalculator.processTrade(Trade.bought("EURUSD", 250d, 130d));
    tradingCalculator.processTrade(Trade.bought("EURUSD", 250d, 130d));
    tradingCalculator.processTrade(Trade.sold("EURCHF", 120d, 100d));
    tradingCalculator.priceUpdate(new PairPrice("EURUSD", 1.5));
    tradingCalculator.priceUpdate(new PairPrice("USDCHF", 1.2));
    tradingCalculator.processTrade(Trade.bought("GBPJPY", 20d, 26000d));
    tradingCalculator.priceUpdate(new PairPrice("EURUSD", 1.0));
    //reset
    tradingCalculator.reset();
    //trade batch after reset
    tradingCalculator.priceUpdate(new PairPrice("EURUSD", 1.5));
    tradingCalculator.priceUpdate(new PairPrice("GBPUSD", 1.25));
    tradingCalculator.priceUpdate(new PairPrice("USDJPY", 202));
    tradingCalculator.priceUpdate(new PairPrice("USDCHF", 1.25));
    tradingCalculator.processTrades(
            Trade.bought("EURUSD", 20d, 11d),
            Trade.bought("GBPJPY", 20d, 26000d),
            Trade.sold("EURCHF", 120d, 100d)
    );
}
Execution output
- Position calculations
- Mark to market calculations
- Total profit calculation
- Only publishing changes to sinks
- Resetting to zero state
- Handling sets of trades as a batch
rcvd trade -> Trade[dealt=TradeLeg[id=EUR, amount=250.0], contra=TradeLeg[id=USD, amount=-130.0]]
Asset mark to market :{EUR=NaN, USD=-130.0}
Total trading profit :NaN
Asset positions :{EUR=250.0, USD=-130.0}

rcvd trade -> Trade[dealt=TradeLeg[id=EUR, amount=250.0], contra=TradeLeg[id=USD, amount=-130.0]]
Asset mark to market :{EUR=NaN, USD=-260.0}
Asset positions :{EUR=500.0, USD=-260.0}

rcvd trade -> Trade[dealt=TradeLeg[id=EUR, amount=-120.0], contra=TradeLeg[id=CHF, amount=100.0]]
Asset mark to market :{CHF=NaN, EUR=NaN, USD=-260.0}
Asset positions :{CHF=100.0, EUR=380.0, USD=-260.0}

rcvd price -> PairPrice[id=EURUSD, price=1.5]
Asset mark to market :{CHF=NaN, EUR=570.0, USD=-260.0}

rcvd price -> PairPrice[id=USDCHF, price=1.2]
Asset mark to market :{CHF=83.33333333333334, EUR=570.0, USD=-260.0}
Total trading profit :393.33333333333337

rcvd trade -> Trade[dealt=TradeLeg[id=GBP, amount=20.0], contra=TradeLeg[id=JPY, amount=-26000.0]]
Asset mark to market :{CHF=83.33333333333334, JPY=NaN, EUR=570.0, GBP=NaN, USD=-260.0}
Total trading profit :NaN
Asset positions :{CHF=100.0, JPY=-26000.0, EUR=380.0, GBP=20.0, USD=-260.0}

rcvd price -> PairPrice[id=EURUSD, price=1.0]
Asset mark to market :{CHF=83.33333333333334, JPY=NaN, EUR=380.0, GBP=NaN, USD=-260.0}

reset
Asset mark to market :{}
Total trading profit :0.0
Asset positions :{}

rcvd price -> PairPrice[id=EURUSD, price=1.5]

rcvd price -> PairPrice[id=GBPUSD, price=1.25]

rcvd price -> PairPrice[id=USDJPY, price=202.0]

rcvd price -> PairPrice[id=USDCHF, price=1.25]

Trade batch - start
rcvd trade -> Trade[dealt=TradeLeg[id=EUR, amount=20.0], contra=TradeLeg[id=USD, amount=-11.0]]
rcvd trade -> Trade[dealt=TradeLeg[id=GBP, amount=20.0], contra=TradeLeg[id=JPY, amount=-26000.0]]
rcvd trade -> Trade[dealt=TradeLeg[id=EUR, amount=-120.0], contra=TradeLeg[id=CHF, amount=100.0]]
Trade batch - complete
Asset mark to market :{CHF=80.0, JPY=-128.7128712871287, EUR=-150.0, GBP=25.0, USD=-11.0}
Total trading profit :-184.7128712871287
Asset positions :{CHF=100.0, JPY=-26000.0, EUR=-100.0, GBP=20.0, USD=-11.0}

Process finished with exit code 0
Comparing to an imperative solution
The usual arguments about imperative versus functional approaches apply here. Functional solutions are not the best fit for every problem, but in this case I think the balance is in favour of the functional style.
State
Dispatch
Triggering
Control signals
Testing
Composing complexity
Complexity and functions are composed in the construction phase. This is easy to alter and update because the pattern is well understood. The imperative solution is freeform: any approach could have been formulated and built, so anyone unfamiliar with the code base needs time to understand it before fixing bugs.
Conditional logic
There are no explicit conditional statements in the functional solution. In the imperative approach the code would be littered with if/else statements, each of which has to be understood and has to behave correctly. Poor conditional logic is a rich source of bugs.
Reasoning about the logic
Because a repeatable construction pattern is followed, a new developer who knows the pattern has very little to learn before understanding the code and fixing bugs. Because the triggering and order of execution are predictable, there is no need to understand a custom event dispatch solution, which flattens the learning curve further.
Code size
The core imperative solution may be smaller in code, but the other requirements such as listener registration, filters, state management, declaring variables and exposing methods for testing will create code noise and probably make the solution larger than the functional definition.
Conclusion
I hope this article has created interest in event stream programming and demonstrated that it can be quite easy to use if we uncouple non-core concerns. Comparing the imperative approach to the functional approach shows the value of the latter in this case. As complexity rises the functional case becomes compelling.
Please download the example and play with it. I welcome any feedback on the library.