Data Modeling for Streams

There is a lot of information on stream processing and batch processing, but surprisingly little on how to model streams in your data. This post goes through a few common patterns for modeling streams and the tradeoffs between them.

What is a stream?

Streams are often defined in terms of a table, where a table is the state of some entity at a given time and a stream is a view of how that data changes over time. I find this definition a little confusing because the term “table” is overloaded with concepts like a database table. In this definition a “table” is something more general: any time we materialize the results of a stream, we create a table, whether that's in memory or persisted in some database. Without context, you might think this means that database tables are fundamentally different from streams, but that is not true. Database tables are just an implementation detail, and we can implement a stream with database tables (like in this post) if we prefer.

Instead, I like to view a stream as a sequence of immutable events. This definition is much more general, but I find it more natural than the one described above. Viewed this way, this entire post is really about modeling your application with immutable data.

Stream as a history

This pattern first defines some entity's current state as a table and then adds a second table that records how that state changes over time. The latter table is our stream.

For example, let's consider an advertising platform where advertisers set daily budgets for their campaigns and are charged per click on each ad.
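The schemas below are a rough sketch of how this could look in Postgres-flavored SQL; the exact columns are assumptions on my part, not a prescribed design. The idea is that every write to campaigns also appends a row to campaign_histories capturing the new state.

CREATE TABLE campaigns (
    id            BIGSERIAL PRIMARY KEY,
    advertiser_id BIGINT NOT NULL,
    daily_budget  NUMERIC(12, 2) NOT NULL,
    status        TEXT NOT NULL,                       -- e.g. 'active', 'paused'
    updated_at    TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE campaign_histories (
    id           BIGSERIAL PRIMARY KEY,
    campaign_id  BIGINT NOT NULL REFERENCES campaigns (id),
    daily_budget NUMERIC(12, 2) NOT NULL,              -- snapshot of the campaign at this point in time
    status       TEXT NOT NULL,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW()
);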

The campaigns table stores the current state of the advertiser's campaign at any given time, while the campaign_histories table stores each change the advertiser made to that campaign over time. This pattern becomes quite useful for offline batch processing, for example determining when to disable an advertiser's campaign.

If a user clicks on an ad, we can emit the campaign_history_id corresponding to the current state of the campaign with each click event. This allows us to see what the advertiser's daily budget was at the time each event was emitted. Our offline service can then periodically record how much budget is left so the ad server knows whether or not to serve an ad to the user.
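As a sketch of how the offline process might use this, assume clicks land in an ad_clicks table (a hypothetical name) that records the campaign_history_id it was emitted with and the cost of the click. The spend per campaign for the current day could then be computed and compared against the campaign's current daily_budget:

SELECT
    ch.campaign_id,
    SUM(c.cost_per_click) AS spend_today
FROM ad_clicks AS c
-- join back to the history row to know which campaign (and budget) was live at click time
JOIN campaign_histories AS ch ON ch.id = c.campaign_history_id
WHERE c.clicked_at >= DATE_TRUNC('day', NOW())
GROUP BY ch.campaign_id;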

This example shows a case where the stream is essential for the offline process but is not particularly useful for the main service that creates this data (the CMS in which the advertiser manages their campaign). The CMS can model its data in whatever way is convenient for its own access patterns while still enabling offline processes to perform the operations they need. The history is also useful for other things, such as giving developers debug information or the ability to reverse certain operations.

Stream as offsets

We can also model our stream events as individual offsets to the current state of the base entity. In CQRS and event-sourcing models, this becomes quite natural. For example, consider managing a user's cart in an e-commerce store. We could model the cart with some table that stores its current state at any given time. But for analytics purposes, we also want to store each action the user took so we can analyze their behavior. This is where this model is advantageous.

We can model this with two tables.
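A rough sketch of the two tables, with columns that are assumptions on my part:

CREATE TABLE shopping_carts (
    id         BIGSERIAL PRIMARY KEY,
    user_id    BIGINT NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE add_to_carts (
    id               BIGSERIAL PRIMARY KEY,
    shopping_cart_id BIGINT NOT NULL REFERENCES shopping_carts (id),
    product_id       BIGINT NOT NULL,
    quantity         INT NOT NULL,            -- negative when an item is removed
    item_cost        NUMERIC(12, 2) NOT NULL, -- unit price at the time of the action
    created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW()
);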

The shopping_carts table stores the current state of the cart at any given time, while the add_to_carts table stores each action the user took. If a user adds an item to the cart, we create an add_to_carts record with that product_id and the desired quantity. If a user removes an item from the cart, we create an add_to_carts record with that product_id and a negative quantity to denote that the item was removed.

To determine the current state of the cart, we can accumulate the add_to_carts records for that shopping_cart_id. For example, we can perform the following query:

SELECT SUM(item_cost * quantity) AS cart_subtotal
FROM add_to_carts
WHERE shopping_cart_id = 1;

Assuming users perform only a few actions before placing their order, determining the current state of the cart is quite inexpensive because we are aggregating over a very small number of records. This is sufficient for the main e-commerce application to perform the operations it needs. But if we prefer, we can denormalize this information onto the base shopping_carts table to avoid performing this computation at read time.
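If we go the denormalized route, one sketch (assuming we add a subtotal column to shopping_carts, which is not in the schema above) is to update it in the same database transaction that records the event:

BEGIN;

-- record the stream event
INSERT INTO add_to_carts (shopping_cart_id, product_id, quantity, item_cost)
VALUES (1, 42, 2, 19.99);

-- keep the denormalized subtotal in sync with the event we just wrote
UPDATE shopping_carts
SET subtotal = subtotal + (2 * 19.99)
WHERE id = 1;

COMMIT;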

Offline processes can now analyze add_to_cart behavior in tandem with other engagement data. For example, a brand might want to know what quantity of an item a user adds to their cart immediately after viewing an ad for it. They can then compare this with organic sales to determine whether the ad influenced the user to increase their order quantity. This is where storing events as offsets is useful: the actual amount by which something changed matters to some process.

The disadvantage of this pattern over the histories pattern is that we need to perform computation whenever we determine the state of some entity at a specific point in time. In cases like the one above this is reasonable, because the aggregate query never runs over a large number of records. Other use cases have different needs, and this is where a hybrid approach can be useful.

Hybrid

Consider a financial system where a user has an account and a list of transactions.
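A sketch of the two tables, again with columns that are assumptions on my part:

CREATE TABLE accounts (
    id      BIGSERIAL PRIMARY KEY,
    user_id BIGINT NOT NULL,
    balance NUMERIC(14, 2) NOT NULL DEFAULT 0
);

CREATE TABLE transactions (
    id            BIGSERIAL PRIMARY KEY,
    account_id    BIGINT NOT NULL REFERENCES accounts (id),
    amount        NUMERIC(14, 2) NOT NULL,  -- the offset: how much the balance changed
    balance_after NUMERIC(14, 2) NOT NULL,  -- the history: the balance after this transaction
    created_at    TIMESTAMPTZ NOT NULL DEFAULT NOW()
);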

The transactions table is now our stream: it stores the amount by which the account's balance changes with each transaction, as well as the new account balance after the transaction is applied. This combines properties of both patterns above to get the best of both worlds, at the cost of storing more data. When working with small amounts of data this tradeoff doesn't mean much, but it matters a lot more when you need to process GBs or TBs of data. More data means higher computation costs, both from moving the data around and from computing the results we want.

One particular thing to note with this pattern is that to make processing our stream deterministic, we require some ordering of our transactions. In the shopping cart example, our operations are commutative and associative, so the order in which we process the events doesn't matter. The subtotal of the cart will be the same no matter the order in which we sum the add-to-cart events. But in that example we don't really care what the subtotal of the cart was at a specific point in time; we only care about the subtotal when the user places their order. In our financial system, we would like to know what the user's account balance was at any given time.

This means that each event needs to be tied to the state of the account after applying that transaction. To enforce this, we need to apply some natural ordering to the user's transactions. In practice this is quite simple: we can use DateTimeUTC as our main ordering key and break any ties with transaction_id. This gives us a deterministic ordering. The downside is that without order-independence we are not free to arbitrarily parallelize operations on our stream. Luckily, for our specific example we don't have a real need for that anyway.
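As a sketch against the transactions table above, a deterministic running balance can be recomputed with a window function ordered by the timestamp and tie-broken by the transaction id; it should agree with the stored balance_after values:

SELECT
    id,
    created_at,
    amount,
    -- created_at plays the role of DateTimeUTC; id breaks ties
    SUM(amount) OVER (ORDER BY created_at, id) AS running_balance
FROM transactions
WHERE account_id = 1
ORDER BY created_at, id;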

Stream without a base entity

Thus far our patterns have defined a stream as a list of changes to some base entity over time, while also storing the current state of that base entity at any given time. Sometimes we don't need a base entity at all and can simply define the stream on its own. The entity might lend itself naturally to the properties of a stream.

For example, consider something like Facebook Messenger, where you cannot edit messages once they are sent. The messages themselves are immutable, so we can model them as a stream of messages.
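A sketch of such a table (the columns are illustrative assumptions); the only write it ever sees is an INSERT:

CREATE TABLE messages (
    id              BIGSERIAL PRIMARY KEY,
    conversation_id BIGINT NOT NULL,
    sender_id       BIGINT NOT NULL,
    body            TEXT NOT NULL,
    sent_at         TIMESTAMPTZ NOT NULL DEFAULT NOW()
);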

Here we only have one table representing our entity. The entity is immutable by nature, which allows it to be modeled as a stream from the get-go.
