State
States are persistent objects that facilitate the collection and retrieval of data generated by streams of records. With states, dataflows can build tables, compute aggregates, join datasets, perform anomaly detections, etc.
In technical terms, states are materialized following CQRS architecture, where a builder service reads a stream of records and creates a materialized view for one or more readers. The builder service is separate from the reader services.
State Types
States are key/value
types, defined as the keyword keyed-state
, where:
- key is the unique identifier for each object in the state. The key must be a
string
. - value can be primitive or arrow-row.
You should choose between primitive and arrow-row based on the data types and the operations you intend to perform on the state. Use primitive values to store nested objects for key-based look-ups, and arrow-row values to join datasets or perform SQL operations.
Primitive Value
The primitive
values allows you to define objects using primitive types definitions. You may think of primitive states as key/value stores, such as MongoDB. The advantages of this format is that it allows hierarchical objects. The following example shows a state object where the value is a 32 bit unsigned integer.
states:
count-per-word:
type: keyed-state
properties:
key:
type: string
value:
type: primitive
properties:
value: i32
Note: In this beta release, the system only supports i32
, with full support for hierachical objects coming soon.
Arrow-Row Value
The arrow-row
values are defined using arrow types definitions. The arrow-row states are stored in arrow dataframe format. You may think of an arrow state as databases, such as Postgres. The advantage of this format is that is accessible by third-party libraries such as Polars. Anarrow-row
state is defined as follows:
states:
temperature:
type: keyed-state
properties:
key:
type: string
value:
type: arrow-row
properties:
sensor:
type: string
temperature:
type: f32
Check out the types section for addition information on arrow types.
State Definition & Usage
The state objects are defined in the states
section of an sdf-package.yaml file, or inside a service
section in the dataflow.yaml file. States defined in a package file are imported in a dataflow file.
Inline Definitions (dataflow.yaml)
Inline states are defined in a dataflow.yaml file. We'll use an example to show how to use define and use states in a dataflow file.
In most cases, a state processing operation has 3-steps
in 2 different services:
Step 1: Define State
In this example, we define a count-words
service and a count-per-word
state. As this service defines the state, it is also responsible for updating it.
services:
count-words:
states:
count-per-word:
type: keyed-state
properties:
key:
type: string
value:
type: arrow-row
properties:
count:
type: i32
Note, that state type could have been defined in the types
section.
Step 2: Update State
Service responsible for updating the state object must have an update-state
operator. This is where the state is updated.
update-state:
run: |
fn increment_word_count(word: String) -> Result<()> {
let mut count = count_per_word();
count.value += 1;
count.update()?;
Ok(())
}
In this example we look-up the previous count, increment it and update it.
Except for window processing where the service concludes with a flush
operation, the update_state
operation is the last operation of the service.
Step 3: Lookup State
Next we'll define a service that listens to a data stream of words and returns their count. This service is a separate flow triggered by another data stream.
In this example, we define a service lookup-word
that receives words from a data stream, looks-up each word in count-per-word
state, and converts the result into a json
value.
lookup-word:
states:
count-per-word:
from: count-words.count-per-word
transforms:
- operator: map
run: |
run: |
fn query_word_count(word: String) -> Result<WordCount> {
let df = count_per_word();
let count = df.sql(&format!("select * from `count-per-word` where _key = '{}'", word))?;
let rows = count.rows()?;
let columns = count.schema(["_key","count"])?;
match &columns[..] {
[k,v] => {
if rows.next() {
let c = rows.i32(&v)?;
return Ok(WordCount{word, count: c})
} else {
return Ok(WordCount{word, count: 0})
}
},
_ => panic!("unexpected schema"),
}
}
When a service accesses a state, it needs to define the target location, which in our example is count-words
. Use an operator map
to match the word
from the stream with a value from count_per_word
. If the value exists, the result is the word and associated count, otherwise zero.
The look-up sends the result to the next stage of data data pipeline.
States & Packages
State can be defined in a package sdf-package.yaml file. The apporach allows you to implement and test states independently from the dataflow.yaml file.
We'll use an example to show how to implement, test, and import a state from a package file.