Apache Flink: Architecture, State Management, and Why Everyone Uses RocksDB
A deep dive into Apache Flink's architecture — how it processes unbounded streams, manages state with RocksDB, checkpoints for fault tolerance, and why Kafka Streams and Apache Samza converged on the exact same design.
Apache Flink processes billions of events per day at Alibaba (4.5 billion events during 11.11 — a single 24-hour shopping event), Netflix, Uber, and LinkedIn. It is not just a streaming framework. It is a stateful computation engine — and understanding the "stateful" part is what separates a surface-level understanding from one that lets you actually design and operate these systems.
This post covers how Flink works: its core architecture, how it manages state, why it uses RocksDB as the local storage layer, how checkpointing achieves fault tolerance, and why Kafka Streams and Apache Samza independently converged on the same RocksDB-based design.
What Flink Is
Apache Flink is a distributed framework for processing unbounded data streams in real time. Three things define it:
Unbounded: the input never ends. Unlike batch processing, which has a defined start and finish, a stream is an infinite sequence of events. Flink processes each event as it arrives rather than waiting for a complete dataset.
Stateful: Flink operators can remember information across events. A deduplication operator remembers which IDs it has seen. A windowed aggregation remembers a running count. Without state, a stream processor can only filter and transform individual events — useful, but limited.
Fault-tolerant: Flink can recover from crashes without losing processed state or reprocessing events from the beginning. This is the hard part, and it is what RocksDB and checkpointing solve.
Three APIs
Flink exposes three levels of abstraction:
| API | Abstraction | When to use |
|---|---|---|
| DataStream API | Individual events and operators | Full control, complex stateful logic |
| Table API / SQL | Relational queries over streams | Analytics, aggregations, joins |
| CEP (Complex Event Processing) | Pattern matching across event sequences | Fraud detection, anomaly detection |
Core Architecture
Flink has two types of processes: the JobManager and the TaskManagers.
┌─────────────────────────────────────────────────────────┐
│ JobManager │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌───────────────┐ │
│ │ Scheduler │ │ Checkpoint │ │ ResourceMgr │ │
│ │ (DAG→tasks) │ │ Coordinator │ │ (slots) │ │
│ └──────────────┘ └──────────────┘ └───────────────┘ │
└─────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ TaskManager 1│ │ TaskManager 2│ │ TaskManager 3│
│ │ │ │ │ │
│ [slot][slot]│ │ [slot][slot]│ │ [slot][slot]│
│ [slot][slot]│ │ [slot][slot]│ │ [slot][slot]│
│ │ │ │ │ │
│ RocksDB │ │ RocksDB │ │ RocksDB │
│ (local disk)│ │ (local disk)│ │ (local disk)│
└──────────────┘ └──────────────┘ └──────────────┘
│ │ │
└───────────────────┴───────────────────┘
│
▼
┌─────────────────┐
│ Checkpoint │
│ Storage │
│ (S3 / HDFS) │
└─────────────────┘
JobManager
The single master process. Its responsibilities:
- Scheduling: Receives a job (a DAG of operators), breaks it into parallel tasks, and assigns tasks to TaskManager slots
- Checkpoint coordination: Triggers snapshots of distributed state at configurable intervals
- Failure recovery: When a TaskManager crashes, reassigns its tasks and restores state from the last checkpoint
- High availability: In production, JobManager runs in active/standby mode — ZooKeeper or Kubernetes coordinates the failover
TaskManagers
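Worker processes, typically one per machine or container. Each TaskManager has a fixed number of task slots. By default Flink lets operators share slots, so a single slot can hold one parallel instance of each operator in the job, i.e. a complete slice of the pipeline.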
Parallelism is the key scaling concept: if a job is configured with parallelism 8, Flink runs 8 parallel instances of each operator, spread across available slots. For stateful operators keyed by ad_id, Flink consistently routes all events with the same ad_id to the same parallel instance — this is what makes per-key state correct.
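The following is a minimal sketch of how this routing can stay deterministic. It is modeled loosely on Flink's key-group scheme: hash the key into one of `maxParallelism` key groups, then map each group to a subtask index so that every subtask owns a contiguous range of groups. The class and method names are hypothetical. The bit mixer is a stand-in for the murmur hash that Flink actually applies to `key.hashCode()`.

```java
import java.util.List;

/** Hypothetical sketch of Flink-style key routing: key -> key group -> subtask index. */
final class KeyRouting {
    private KeyRouting() {}

    /** Stand-in bit mixer (assumption): Flink applies a murmur hash to key.hashCode(). */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Assigns a key to one of maxParallelism key groups. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    /** Maps a key group to a subtask; each subtask owns a contiguous range of groups. */
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return subtaskForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }

    public static void main(String[] args) {
        // Parallelism 8, max parallelism (number of key groups) 128.
        for (String adId : List.of("ad-1", "ad-2", "ad-3", "ad-1")) {
            System.out.println(adId + " -> subtask " + subtaskFor(adId, 128, 8));
        }
    }
}
```

The subtask depends only on the key and the two parallelism settings. As a result, every event for `ad-1` reaches the same instance, which is the property that per-key state relies on.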
The Flink Job Graph
A Flink job is a directed acyclic graph (DAG) of operators connected by streams.
Source (Kafka)
│ stream of AdClickEvent
▼
Operator: Deduplication
│ stream of AdClickEvent (deduplicated)
▼
Operator: Enrichment
│ stream of EnrichedClickEvent
▼
Operator: Window Aggregation (1-min tumbling)
│ stream of AggregatedClickMetric
▼
Sink (ClickHouse)
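An edge is a network shuffle wherever the partitioning changes, for example at every keyBy, where events are hash-routed to the parallel instance that owns their key. It is a local forward when the upstream and downstream operators have the same parallelism and nothing needs repartitioning; Flink can then chain them into one task with no network hop.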
Event Time vs Processing Time
Flink distinguishes between when an event happened and when the system processed it:
- Event time: the timestamp embedded in the event itself (when the click occurred)
- Processing time: when the Flink operator receives the event
Event-time processing produces correct results for out-of-order and delayed events but requires watermarks. Processing-time is simpler, but its results depend on when events happen to arrive, so it is wrong for out-of-order events.
Watermarks are progress markers injected into the stream. A watermark with value T asserts that no more events with event_time ≤ T are expected. When the watermark passes a window's end, Flink fires that window's aggregation. An event whose timestamp is already behind the watermark is late data. If it falls within a configurable allowed lateness, it is still added to its window, and the window fires again. Beyond that, it is dropped or routed to a side output.
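Stream timeline (event_time), with watermark = max_event_time - 30s and allowed lateness = 30s:
[10:00:05] [10:00:12] [10:00:31] [10:00:58] [10:01:02]
↑ after 10:01:02 is seen, watermark = 10:00:32 (max_event_time - 30s)
Window 10:00–10:01 fires when the watermark passes 10:01:00 (first event at or after 10:01:30)
Late event [10:00:58] arrives after an event at 10:01:34 → watermark is 10:01:04 → past the window end but within 30s allowed lateness (until 10:01:30) → included, window fires again
Late event [09:59:55] arrives after an event at 10:01:40 → watermark is 10:01:10 → its window (09:59–10:00) expired at 10:00:30 → side output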
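The timeline above can be reproduced with a small simulation. It assumes a bounded-out-of-orderness watermark (max event time minus 30s), 1-minute tumbling windows and 30s of allowed lateness. `LatenessSim` is a hypothetical class, and times are whole seconds. It ignores details such as periodic watermark emission and the 1 ms offsets Flink applies at window boundaries.

```java
/** Hypothetical simulation of 1-minute tumbling event-time windows with a
 *  bounded-out-of-orderness watermark and allowed lateness. Times are seconds. */
final class LatenessSim {
    enum Verdict { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    static final long WINDOW = 60;            // tumbling window size
    static final long OUT_OF_ORDERNESS = 30;  // watermark = max_event_time - 30s
    static final long ALLOWED_LATENESS = 30;  // window state kept 30s past window end

    private long maxEventTime = Long.MIN_VALUE;

    static long t(int h, int m, int s) { return h * 3600L + m * 60L + s; }

    static String fmt(long sec) {
        return String.format("%02d:%02d:%02d", sec / 3600, sec / 60 % 60, sec % 60);
    }

    long watermark() {
        return maxEventTime == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTime - OUT_OF_ORDERNESS;
    }

    /** Classifies an event against the current watermark, then advances the watermark. */
    Verdict onEvent(long eventTime) {
        long windowEnd = Math.floorDiv(eventTime, WINDOW) * WINDOW + WINDOW;
        long wm = watermark();
        Verdict v;
        if (wm < windowEnd) {
            v = Verdict.ON_TIME;                  // window has not fired yet
        } else if (wm < windowEnd + ALLOWED_LATENESS) {
            v = Verdict.LATE_BUT_ACCEPTED;        // window fires again with the update
        } else {
            v = Verdict.SIDE_OUTPUT;              // window state already discarded
        }
        maxEventTime = Math.max(maxEventTime, eventTime);
        return v;
    }

    public static void main(String[] args) {
        LatenessSim sim = new LatenessSim();
        long[] events = { t(10, 0, 5), t(10, 0, 12), t(10, 0, 31), t(10, 0, 58), t(10, 1, 2),
                          t(10, 1, 34), t(10, 0, 58), t(10, 1, 40), t(9, 59, 55) };
        for (long e : events) {
            System.out.printf("event %s -> %s (watermark now %s)%n",
                    fmt(e), sim.onEvent(e), fmt(sim.watermark()));
        }
    }
}
```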
State in Flink
State is what makes Flink more than a transformation pipeline. Two types:
Keyed State
State that is scoped to a key. Every event is associated with a key (e.g. ad_id). Flink routes all events with the same key to the same parallel operator instance. That instance holds state only for its assigned keys.
State primitives available per key:
| Primitive | Stores | Use case |
|---|---|---|
| ValueState<T> | A single value | Current count, last seen timestamp |
| ListState<T> | An ordered list of values | Event buffer for joins |
| MapState<K, V> | A key-value map | Per-country click counts |
| ReducingState<T> | A single value, continuously reduced | Running sum |
| AggregatingState<IN, OUT> | A single aggregated value with custom logic | HyperLogLog accumulator |
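To illustrate what "scoped to a key" means, the sketch below uses a hypothetical `KeyedValueStateSketch` whose `value()`, `update()` and `clear()` mirror the method names of Flink's `ValueState`. The runtime sets the current key before each event, so the operator code never passes the key to the state explicitly.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical keyed-state stand-in: reads and writes are implicitly scoped to the
 *  current key, which the runtime sets before each event. */
final class KeyedValueStateSketch<K, T> {
    private final Map<K, T> valuesByKey = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) { this.currentKey = key; }

    // Names mirror Flink's ValueState methods: value(), update(), clear().
    T value() { return valuesByKey.get(currentKey); }
    void update(T value) { valuesByKey.put(currentKey, value); }
    void clear() { valuesByKey.remove(currentKey); }
}

/** Per-ad click counter written against the keyed state above. */
final class ClickCounter {
    private final KeyedValueStateSketch<String, Long> count = new KeyedValueStateSketch<>();

    long onClick(String adId) {
        count.setCurrentKey(adId);              // done by the runtime after keyBy(ad_id)
        Long current = count.value();           // null the first time this key is seen
        long next = (current == null ? 0L : current) + 1;
        count.update(next);
        return next;
    }

    public static void main(String[] args) {
        ClickCounter counter = new ClickCounter();
        for (String adId : new String[] {"ad-1", "ad-2", "ad-1"}) {
            System.out.println(adId + " count = " + counter.onClick(adId));
        }
    }
}
```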
Operator State
State scoped to an entire operator instance, not a key. Used mainly by source operators to track consumption position.
Example: Kafka source offset tracking
Each Kafka source operator instance consumes a subset of Kafka partitions. Its operator state stores the current offset for each assigned partition. On checkpoint, these offsets are snapshotted. On recovery, the source resumes from the checkpointed offsets — not from the beginning of the topic.
Operator State schema (Kafka source):
{
"partition_offsets": {
"raw-clicks-0": 8294710,
"raw-clicks-1": 8301042,
"raw-clicks-2": 8288934,
"raw-clicks-3": 8305201
}
}
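A minimal sketch of that snapshot/restore cycle follows. It uses a hypothetical `OffsetTrackingSource` rather than Flink's real Kafka connector, and its operator state is just the map of next offsets for each assigned partition.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical source operator whose operator state is the next offset to read
 *  for each assigned partition (not Flink's actual Kafka connector API). */
final class OffsetTrackingSource {
    private final Map<String, Long> nextOffset = new HashMap<>();

    OffsetTrackingSource(List<String> assignedPartitions) {
        for (String p : assignedPartitions) nextOffset.put(p, 0L);
    }

    /** Simulates emitting n records from one partition. */
    void consume(String partition, int n) {
        nextOffset.merge(partition, (long) n, Long::sum);
    }

    long position(String partition) { return nextOffset.get(partition); }

    /** On checkpoint: an immutable copy of the offsets is the operator-state snapshot. */
    Map<String, Long> snapshot() { return Map.copyOf(nextOffset); }

    /** On recovery: resume from the checkpointed offsets instead of offset 0. */
    static OffsetTrackingSource restore(Map<String, Long> checkpointed) {
        OffsetTrackingSource source = new OffsetTrackingSource(List.copyOf(checkpointed.keySet()));
        source.nextOffset.putAll(checkpointed);
        return source;
    }

    public static void main(String[] args) {
        OffsetTrackingSource source = new OffsetTrackingSource(List.of("raw-clicks-0", "raw-clicks-1"));
        source.consume("raw-clicks-0", 100);
        Map<String, Long> checkpoint = source.snapshot();
        source.consume("raw-clicks-0", 25);     // read after the checkpoint, then the task crashes
        OffsetTrackingSource recovered = restore(checkpoint);
        System.out.println("resume raw-clicks-0 at " + recovered.position("raw-clicks-0"));
    }
}
```

After recovery, the 25 records read after the checkpoint are simply read again from Kafka. This replay is what keeps the restored state and the source position consistent.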
RocksDB: The State Backend
This is the engine that makes large-scale stateful processing possible.
The Problem Without RocksDB
Flink needs to store keyed state somewhere. The naive approach: a Java HashMap in the TaskManager's JVM heap.
Problems with heap state:
- Bounded by RAM. A fraud detection job tracking 50 million active user sessions cannot fit in heap memory
- GC pressure. Large heaps mean long garbage collection pauses — pauses that stall the stream processing pipeline
- Not durable. A JVM crash loses all in-memory state. Recovering requires reprocessing from the beginning of the Kafka topic, which could mean hours of catch-up
What RocksDB Provides
RocksDB is an embedded key-value store developed at Facebook. "Embedded" means it runs inside the TaskManager process — there is no separate server, no network hop for state reads.
It is built on an LSM tree (Log-Structured Merge-tree):
Write path:
Event → MemTable (in-memory write buffer)
│ when full (default 64MB)
▼
L0 SST file (on disk, immutable)
│ background compaction
▼
L1 → L2 → ... SST files (sorted, merged)
Read path:
Key lookup → check MemTable → check L0 → check L1 → ... (Bloom filter skips most levels)
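Here is a toy version of the write and read paths above, with in-memory sorted maps standing in for on-disk SST files. `ToyLsm` is hypothetical and leaves out the write-ahead log, tombstones, levels and Bloom filters. Its `compact()` is a crude stand-in for RocksDB's background compaction.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy LSM tree: a sorted MemTable that is frozen into immutable sorted runs
 *  ("SST files") when full. No WAL, tombstones, levels or Bloom filters. */
final class ToyLsm {
    private final int memtableLimit;
    private TreeMap<String, String> memtable = new TreeMap<>();
    private final List<NavigableMap<String, String>> ssts = new ArrayList<>(); // newest first

    ToyLsm(int memtableLimit) { this.memtableLimit = memtableLimit; }

    /** Write path: insert into the MemTable, flush when it reaches the size limit. */
    void put(String key, String value) {
        memtable.put(key, value);
        if (memtable.size() >= memtableLimit) flush();
    }

    /** Freezes the MemTable into a new immutable sorted run (like an L0 SST file). */
    void flush() {
        if (memtable.isEmpty()) return;
        ssts.add(0, Collections.unmodifiableNavigableMap(memtable));
        memtable = new TreeMap<>();
    }

    /** Read path: MemTable first, then runs from newest to oldest; first hit wins. */
    String get(String key) {
        String value = memtable.get(key);
        if (value != null) return value;
        for (NavigableMap<String, String> sst : ssts) {
            value = sst.get(key);
            if (value != null) return value;
        }
        return null;
    }

    /** Crude compaction: merge all runs into one, letting newer values overwrite older ones. */
    void compact() {
        TreeMap<String, String> merged = new TreeMap<>();
        for (int i = ssts.size() - 1; i >= 0; i--) merged.putAll(ssts.get(i)); // oldest first
        ssts.clear();
        if (!merged.isEmpty()) ssts.add(Collections.unmodifiableNavigableMap(merged));
    }

    int sstCount() { return ssts.size(); }

    public static void main(String[] args) {
        ToyLsm db = new ToyLsm(2);
        db.put("ad-1", "1");
        db.put("ad-2", "5");   // MemTable full: flushed to a run
        db.put("ad-1", "2");   // newer value shadows the flushed one
        System.out.println(db.get("ad-1") + " from " + db.sstCount() + " run(s)");
    }
}
```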
Why this works for Flink:
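- Recently written keys sit in the MemTable, and frequently read data blocks stay in RocksDB's block cache, so hot keys are served at memory speed
- Cold keys live in SST files on disk, so state size is bounded by local disk capacity rather than RAM
- Writes are sequential (insert into the in-memory MemTable, later flush it as a whole sorted SST file), which is fast even on spinning disks
- Bloom filters on each SST file make point reads cheap: if a key is not in a file, the filter almost always says so without the file being read (false positives are possible, false negatives are not)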
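This minimal Bloom filter sketch shows the property the read path relies on: a negative answer is always correct, so a lookup can skip an SST file whose filter says "absent". `BloomSketch` is hypothetical. It derives its bit positions by double hashing from `String.hashCode()`, which is far weaker than the hash functions RocksDB uses.

```java
import java.util.BitSet;

/** Minimal Bloom filter: k bit positions per key via double hashing. A false answer
 *  means "definitely absent"; a true answer means "maybe present". */
final class BloomSketch {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    BloomSketch(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    /** i-th bit position for a key: h1 + i * h2 (weak hashes, for illustration only). */
    private int position(String key, int i) {
        int h1 = key.hashCode();
        int h2 = Integer.rotateLeft(h1 * 0x9E3779B9, 15) | 1;   // second hash, forced odd
        return Math.floorMod(h1 + i * h2, numBits);
    }

    /** Called once per key when an SST file is written. */
    void add(String key) {
        for (int i = 0; i < numHashes; i++) bits.set(position(key, i));
    }

    /** If any of the key's bits is unset, the key was never added: skip this file. */
    boolean mightContain(String key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(position(key, i))) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        BloomSketch filter = new BloomSketch(1024, 3);
        for (String k : new String[] {"ad-1", "ad-2", "ad-3"}) filter.add(k);
        System.out.println("ad-2 maybe present: " + filter.mightContain("ad-2"));
        System.out.println("ad-999 maybe present: " + filter.mightContain("ad-999"));
    }
}
```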
How Flink Maps State to RocksDB
Each parallel instance of a stateful operator gets its own RocksDB instance in its TaskManager's local working directory. Flink stores every named state (e.g. "click_count") in its own RocksDB column family and encodes each entry as a key-value pair with a specific layout:
RocksDB key structure (within the state's column family):
┌──────────────────┬──────────────────┬───────────────┐
│ key_group_id │ serialised_key │ namespace │
│ (1 or 2 bytes) │ (variable) │ (window ID) │
└──────────────────┴──────────────────┴───────────────┘
RocksDB value:
┌──────────────────────────────────────────────┐
│ serialised_state_value │
└──────────────────────────────────────────────┘
- key_group_id: Flink divides the key space into key groups (as many as the job's max parallelism) and assigns contiguous ranges of groups to parallel instances. When you rescale (change the parallelism), whole groups are redistributed. This is what enables rescaling without full state reinitialization. The prefix is 1 byte when max parallelism is at most 128 and 2 bytes otherwise.
- namespace: For windowed state, this identifies the window (its start and end time), so the same ad_id key has separate state entries for each active window. For MapState, the user map key is appended after the namespace.
- state_name: Not part of the key. It selects the column family, identifying which named state variable the entry belongs to (e.g. "click_count", "device_map").
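The sketch below shows how such a composite key might be encoded. It uses simplified serialization in place of the job's real type serializers: a one-byte length prefix plus UTF-8 for the key, and a big-endian `long` window end as the namespace. In Flink, the key-group prefix takes 1 byte when max parallelism is at most 128 and 2 bytes otherwise, and the sketch follows that rule. `CompositeKeySketch` is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical composite-key encoder: [key group][serialized key][namespace]. */
final class CompositeKeySketch {
    /** Key-group prefix width: 1 byte for up to 128 key groups, otherwise 2. */
    static int prefixBytes(int maxParallelism) {
        return maxParallelism > 128 ? 2 : 1;
    }

    static byte[] encode(int keyGroup, int maxParallelism, String key, long windowEnd) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (prefixBytes(maxParallelism) == 2) out.write(keyGroup >>> 8); // high byte
        out.write(keyGroup);                                            // low byte
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        out.write(k.length);                 // simplified length prefix (assumes < 256 bytes)
        out.write(k, 0, k.length);
        for (int shift = 56; shift >= 0; shift -= 8) {
            out.write((int) (windowEnd >>> shift));                     // namespace, big-endian
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] a = encode(3, 128, "ad-1", 60_000L);
        byte[] b = encode(4, 128, "ad-0", 0L);
        // Lower key groups sort first in RocksDB's byte-wise key order.
        System.out.println(Arrays.compareUnsigned(a, b) < 0);
    }
}
```

The key group comes first, so all entries of one key group are contiguous in RocksDB's sorted key order. After rescaling, an instance can therefore read exactly the key-group ranges it now owns.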
Checkpointing: Fault Tolerance Without Reprocessing
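Checkpointing is how Flink achieves exactly-once state semantics despite failures: after recovery, each event's effect on operator state is counted exactly once. End-to-end exactly-once output additionally needs a replayable source, such as Kafka, and a transactional or idempotent sink.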
How It Works: Chandy-Lamport Distributed Snapshots
Flink's checkpointing is based on the Chandy-Lamport algorithm for consistent distributed snapshots. The key insight: by injecting a special marker (checkpoint barrier) into the data stream, you can capture a consistent snapshot of distributed state without stopping the pipeline.
Step 1: JobManager triggers checkpoint N
Step 2: Each source operator records its current Kafka offsets and injects checkpoint barrier N into its output stream
Step 3: Barrier flows through the operator graph alongside data
Source ──[data][data][BARRIER N][data]──→ Operator A
│
when BARRIER N has arrived on every input:
1. snapshot own state (uploaded to S3 asynchronously)
2. forward BARRIER downstream
│
▼
Operator B
│
same: snapshot → forward
│
▼
Sink
Step 4: When all operators have acknowledged, JobManager marks checkpoint N complete
Step 5: On failure → restore all operators from checkpoint N state + replay Kafka from checkpoint N offsets
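The diagram shows a single chain. An operator with several inputs must first align barriers. Once barrier N arrives on one input, the operator holds back that input's later records until barrier N has arrived on all inputs, and only then takes its snapshot. The hypothetical `AligningSumOperator` below sketches this for two inputs, using a running sum as its state. Forwarding the barrier and uploading the snapshot are reduced to a comment and a list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical two-input operator with aligned checkpoint barriers.
 *  State: a running sum of all records it has processed. */
final class AligningSumOperator {
    private long sum = 0;
    private final boolean[] barrierReceived = new boolean[2];
    private final List<Deque<Long>> heldBack = List.of(new ArrayDeque<Long>(), new ArrayDeque<Long>());
    private final List<Long> snapshots = new ArrayList<>(); // stands in for checkpoint storage

    /** A data record arriving on input 0 or 1. */
    void onRecord(int input, long value) {
        if (barrierReceived[input]) {
            heldBack.get(input).addLast(value);  // belongs after the snapshot: hold it back
        } else {
            sum += value;                        // belongs before the snapshot: process now
        }
    }

    /** Checkpoint barrier N arriving on input 0 or 1. */
    void onBarrier(int input) {
        barrierReceived[input] = true;
        if (barrierReceived[0] && barrierReceived[1]) {
            snapshots.add(sum);                  // 1. snapshot own state
            // 2. barrier N would be forwarded downstream here
            barrierReceived[0] = false;
            barrierReceived[1] = false;
            for (Deque<Long> queue : heldBack) { // 3. release held-back records
                while (!queue.isEmpty()) sum += queue.pollFirst();
            }
        }
    }

    long sum() { return sum; }

    List<Long> snapshots() { return List.copyOf(snapshots); }

    public static void main(String[] args) {
        AligningSumOperator op = new AligningSumOperator();
        op.onRecord(0, 1);
        op.onBarrier(0);
        op.onRecord(0, 100);   // arrives after the barrier on input 0: excluded from snapshot
        op.onRecord(1, 2);
        op.onRecord(1, 3);
        op.onBarrier(1);
        System.out.println("snapshot=" + op.snapshots() + " sum=" + op.sum());
    }
}
```

Without the hold-back, the snapshot would include the record 100 from input 0 while the checkpointed source offsets predate it. After recovery that record would be replayed and counted twice.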
Incremental Checkpoints with RocksDB
A naive checkpoint snapshots the entire RocksDB state to S3. For large state (hundreds of GB), this is expensive and slow.
RocksDB's SST files are immutable once written. Flink exploits this: an incremental checkpoint uploads only the SST files created since the last checkpoint and references the files that earlier checkpoints already uploaded.
Checkpoint 1: upload SST files A, B, C (full snapshot)
Checkpoint 2: SST file D is new → upload only D
Checkpoint 3: compaction merged C + D → E → upload E; C and D are no longer part of the live state and are deleted once no retained checkpoint references them
Checkpoint 4: no new files → upload nothing (only metadata updated)
For a 100GB state store with low churn, incremental checkpoints may upload only a few MB per checkpoint interval. This makes 30-second checkpoint intervals practical even at large scale.
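This minimal sketch shows the bookkeeping behind checkpoints 1 to 4. It uses a hypothetical `IncrementalCheckpointer` that reference-counts uploaded files and retains only the latest completed checkpoint, matching Flink's default of one retained checkpoint. It uploads only files not already in shared storage, and it deletes a file once no retained checkpoint uses it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical bookkeeping for incremental checkpoints over immutable SST files. */
final class IncrementalCheckpointer {
    private final int maxRetained;                                  // Flink retains 1 by default
    private final Map<String, Integer> refCounts = new HashMap<>(); // file -> retained checkpoints using it
    private final Map<Integer, Set<String>> filesByCheckpoint = new HashMap<>();
    private final Deque<Integer> retained = new ArrayDeque<>();     // oldest first
    private final Set<String> deleted = new HashSet<>();

    IncrementalCheckpointer(int maxRetained) { this.maxRetained = maxRetained; }

    /** Completes a checkpoint of the live SST files; returns the files that must be uploaded. */
    Set<String> checkpoint(int id, Set<String> liveSstFiles) {
        Set<String> toUpload = new HashSet<>();
        for (String file : liveSstFiles) {
            if (!refCounts.containsKey(file)) toUpload.add(file);   // not yet in shared storage
            refCounts.merge(file, 1, Integer::sum);
        }
        filesByCheckpoint.put(id, Set.copyOf(liveSstFiles));
        retained.addLast(id);
        if (retained.size() > maxRetained) discard(retained.pollFirst());
        return toUpload;
    }

    /** Drops an old checkpoint; a file is deleted once no retained checkpoint references it. */
    private void discard(int id) {
        for (String file : filesByCheckpoint.remove(id)) {
            if (refCounts.merge(file, -1, Integer::sum) == 0) {
                refCounts.remove(file);
                deleted.add(file);
            }
        }
    }

    Set<String> deletedFiles() { return Set.copyOf(deleted); }

    public static void main(String[] args) {
        IncrementalCheckpointer cp = new IncrementalCheckpointer(1);
        System.out.println("cp1 uploads " + cp.checkpoint(1, Set.of("A", "B", "C")));
        System.out.println("cp2 uploads " + cp.checkpoint(2, Set.of("A", "B", "C", "D")));
        System.out.println("cp3 uploads " + cp.checkpoint(3, Set.of("A", "B", "E"))); // C + D compacted into E
        System.out.println("cp4 uploads " + cp.checkpoint(4, Set.of("A", "B", "E")));
        System.out.println("deleted " + cp.deletedFiles());
    }
}
```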
Checkpoint Configuration
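Here is a minimal sketch of how these choices could be expressed in `flink-conf.yaml`. It uses Flink 1.x key names (newer releases renamed several checkpointing keys), and the S3 bucket is hypothetical.

```yaml
# RocksDB state backend with incremental checkpoints (Flink 1.x key names)
state.backend: rocksdb
state.backend.incremental: true                           # upload only new SST files
state.checkpoints.dir: s3://my-bucket/flink/checkpoints   # hypothetical bucket
state.checkpoints.num-retained: 1                         # completed checkpoints to keep

execution.checkpointing.interval: 30s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.min-pause: 10s    # gap between the end of one checkpoint and the next start
execution.checkpointing.timeout: 10min    # abort checkpoints that take longer than this
```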
Kafka Streams: The Same Pattern
Kafka Streams is an embedded stream processing library: it runs inside your application process, not on a separate cluster. Despite this architectural difference, it made the same core state management decision as Flink and keeps state in a local RocksDB instance. Unlike Flink, it makes that state durable with a Kafka changelog topic rather than with checkpoint snapshots.
State Store Architecture
Kafka Streams Application (JVM process, application.id = <app-id>)
Stream Thread 1
├─ Task 0_0 (consumes partition 0)
│ └─ StateStore: RocksDB instance at /tmp/kafka-streams/<app-id>/0_0/rocksdb/store-name/
│ backed by Kafka topic: <app-id>-store-name-changelog (partition 0)
│
└─ Task 0_1 (consumes partition 1)
└─ StateStore: RocksDB instance at /tmp/kafka-streams/<app-id>/0_1/rocksdb/store-name/
backed by Kafka topic: <app-id>-store-name-changelog (partition 1)
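The durability model can be sketched in a few lines. In this sketch, a `List` stands in for one changelog partition and a `HashMap` for the RocksDB instance. `ChangeloggedStore` is hypothetical: every write goes to both, a `null` value acts as a delete (tombstone), and recovery replays the log in order. The `compact` helper imitates Kafka log compaction by keeping the latest record per key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical changelog-backed store: a HashMap stands in for RocksDB and a List
 *  for one changelog-topic partition. A null value is a tombstone (delete). */
final class ChangeloggedStore {
    static final class Change {
        final String key;
        final Long value; // null = tombstone
        Change(String key, Long value) { this.key = key; this.value = value; }
    }

    private final Map<String, Long> local = new HashMap<>();
    private final List<Change> changelog;

    ChangeloggedStore(List<Change> changelog) { this.changelog = changelog; }

    /** Every write is applied locally and appended to the changelog. */
    void put(String key, Long value) {
        apply(local, key, value);
        changelog.add(new Change(key, value));
    }

    Long get(String key) { return local.get(key); }

    private static void apply(Map<String, Long> store, String key, Long value) {
        if (value == null) store.remove(key); else store.put(key, value);
    }

    /** Recovery after losing the local disk: replay the changelog from the start. */
    static ChangeloggedStore restore(List<Change> changelog) {
        ChangeloggedStore store = new ChangeloggedStore(changelog);
        for (Change c : changelog) apply(store.local, c.key, c.value);
        return store;
    }

    /** Imitates log compaction: latest record per key, in log order (tombstones kept). */
    static List<Change> compact(List<Change> log) {
        Map<String, Change> latest = new LinkedHashMap<>();
        for (Change c : log) {
            latest.remove(c.key);   // re-insert so the entry moves to its latest position
            latest.put(c.key, c);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Change> log = new ArrayList<>();
        ChangeloggedStore store = new ChangeloggedStore(log);
        store.put("ad-1", 1L);
        store.put("ad-1", 2L);
        store.put("ad-2", 5L);
        store.put("ad-2", null);               // delete
        ChangeloggedStore recovered = restore(compact(log));
        System.out.println("ad-1=" + recovered.get("ad-1") + ", ad-2=" + recovered.get("ad-2")
                + ", replayed " + compact(log).size() + " of " + log.size() + " records");
    }
}
```

Compaction bounds replay by the number of live keys rather than by the number of writes. Even so, recovery still has to read every retained record back through RocksDB.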
Durability: Changelog Topics vs Checkpoints
The key difference between Flink and Kafka Streams is how they make RocksDB state durable:
| Aspect | Apache Flink | Kafka Streams |
|---|---|---|
| Durability mechanism | Checkpoint snapshots to S3/HDFS | Write-ahead log to Kafka changelog topic |
| Recovery method | Restore RocksDB from S3 snapshot | Replay changelog topic into new RocksDB |
| Recovery speed (large state) | Fast: binary restore from snapshot | Slow: must replay the changelog topic (after compaction, the latest record per key) into RocksDB |
| Recovery speed (small state) | Similar | Similar |
| External dependency | S3/HDFS for checkpoints | Kafka (already present) |
| State isolation | Flink cluster manages it | Kafka partitions manage it |
Kafka Streams advantage: No external storage dependency — durability is built on Kafka, which is already in your stack. Simpler operational model.
Flink advantage: Much faster recovery for large state. Replaying a 100GB changelog topic can take hours, whereas restoring a 100GB RocksDB snapshot from S3 typically takes minutes because the SST files are downloaded in parallel.
Kafka Streams State Store Types
Persistent stores (RocksDB-backed):
- KeyValueStore: single-key lookups and range scans
- WindowStore: windowed key lookups, keyed by (key, window_start)
- SessionStore: session-windowed state
In-memory stores (for small state; no RocksDB, but still changelog-backed by default):
- KeyValueStore (in-memory variant)
- WindowStore (in-memory variant)
- SessionStore (in-memory variant)
Kafka Streams Topology (equivalent to Flink job graph)
Source: KStream from topic "raw-clicks"
│
▼
Processor: Deduplicate
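StateStore: KeyValueStore<click_id, Long> (RocksDB; KeyValueStore has no built-in TTL, so the 24h expiry needs a punctuator or a WindowStore with 24h retention)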
Logic: if store.get(click_id) != null → suppress
else → store.put(click_id, timestamp); forward
│
▼
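Processor: Windowed Count (input first re-keyed by ad_id through a repartition topic)
StateStore: WindowStore<String, Long> (RocksDB, 1-min windows)
Logic: count = store.fetch(ad_id, windowStart) // null if no count yet
store.put(ad_id, (count == null ? 0 : count) + 1, windowStart)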
│
▼
Sink: KStream to topic "aggregated-clicks"
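The same two processors can be sketched in plain Java, with `HashMap`s in place of the RocksDB-backed stores and epoch-millisecond timestamps. `ClickTopologySketch` is hypothetical. It treats a `click_id` as a duplicate only within 24 hours and never purges old IDs, which a real store would need to do.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical plain-Java version of the dedup + windowed-count topology.
 *  HashMaps stand in for the RocksDB-backed stores; timestamps are epoch millis. */
final class ClickTopologySketch {
    static final long WINDOW_MS = 60_000L;                  // 1-minute windows
    static final long DEDUP_RETENTION_MS = 24 * 3_600_000L; // click IDs remembered for 24h

    private final Map<String, Long> seenClicks = new HashMap<>();   // click_id -> first-seen time
    private final Map<String, Long> windowCounts = new HashMap<>(); // "ad_id@windowStart" -> count

    /** Returns true if the click was counted, false if it was suppressed as a duplicate. */
    boolean process(String clickId, String adId, long timestamp) {
        Long firstSeen = seenClicks.get(clickId);
        if (firstSeen != null && timestamp - firstSeen < DEDUP_RETENTION_MS) {
            return false;                                        // Deduplicate: suppress
        }
        seenClicks.put(clickId, timestamp);                      // never purged in this sketch

        long windowStart = timestamp - Math.floorMod(timestamp, WINDOW_MS);
        windowCounts.merge(adId + "@" + windowStart, 1L, Long::sum); // Windowed Count
        return true;
    }

    long count(String adId, long windowStart) {
        return windowCounts.getOrDefault(adId + "@" + windowStart, 0L);
    }

    public static void main(String[] args) {
        ClickTopologySketch topology = new ClickTopologySketch();
        topology.process("c1", "ad-1", 5_000);
        topology.process("c1", "ad-1", 7_000);   // duplicate click_id: suppressed
        topology.process("c2", "ad-1", 59_999);
        System.out.println("ad-1 in window 0: " + topology.count("ad-1", 0));
    }
}
```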
Apache Samza: The Pioneer
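Before Flink became dominant, LinkedIn built Apache Samza, and Samza introduced the "local RocksDB + Kafka changelog" pattern. Kafka Streams later adopted it almost unchanged. Flink adopted the local-RocksDB half but makes state durable with checkpoints rather than a changelog.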
Samza's contribution: proving that embedded, local state with a durable log-based backup was the right model for stream processing at scale. Before Samza, the dominant approach was remote state in Redis or Cassandra — with the latency and operational complexity that entailed.
Samza's architecture maps closely to Kafka Streams:
Samza Container (JVM process, similar to Kafka Streams thread)
└─ Stream Task (one per Kafka partition)
└─ LocalTable: RocksDB instance (local disk)
backed by: Kafka changelog topic
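The difference: Samza is primarily a deployed framework (like Flink) rather than an embedded library. You typically run Samza containers on YARN or Kubernetes, separately from your application, although Samza also offers a standalone mode that embeds it in an application. Kafka Streams always runs inside your application.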
Why This Pattern Won
Three alternatives and why they were rejected:
Remote State (Redis, Cassandra)
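Every state read and write crosses a network. At 50,000 events/second per task, that is at least 50,000 network round-trips per second, and twice that if each event does a read-modify-write. Redis latency is ~1ms; Cassandra ~5ms. Issued one at a time, 1ms round trips cap a task at about 1,000 events/second. Your event processing throughput therefore becomes bounded by state-store latency and by how many requests you can keep in flight, not by your compute.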
For simple transformations this is acceptable. For anything stateful at high throughput — windowed aggregations, deduplication, joins — it is not.
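Little's law makes the bottleneck concrete. Assume one state round trip per event at 50,000 events/second. The number of requests that must be in flight is then throughput times round-trip latency:

$$
N_{\text{in flight}} = \lambda W:\qquad 50{,}000\ \text{s}^{-1} \times 1\ \text{ms} = 50\ \text{(Redis)},\qquad 50{,}000\ \text{s}^{-1} \times 5\ \text{ms} = 250\ \text{(Cassandra)}
$$

Equivalently, with only one request outstanding at a time, a task tops out at $1/W$: about 1,000 events/second at 1ms and 200 events/second at 5ms.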
Heap-Only State (Java HashMap)
Fast, but:
- Bounded by JVM heap size (typically 4–32GB, with GC pressure above that)
- Not durable — a crash loses everything
- GC pauses cause processing stalls
Works for small state in development or low-scale production. Fails for large, long-running state.
HDFS / S3 for Every Write
Durable, but write latency is 20–200ms per operation. Unusable for per-event state updates.
RocksDB (LSM-tree, local disk)
- In-process — no network hop
- MemTable and block cache give memory-speed access to hot keys
- SST files on disk hold cold keys, so state is bounded by disk capacity rather than RAM
- Background compaction keeps read performance bounded
- Immutable SST files enable efficient incremental checkpointing
- Proven at scale: used by Flink, Kafka Streams, Samza, TiKV, MyRocks (the MySQL RocksDB storage engine at Meta), and CockroachDB until it replaced RocksDB with its own RocksDB-inspired engine, Pebble
The same properties that make RocksDB right for stream processing state also make it right for embedded databases in distributed systems more broadly. It is not a coincidence that the major stream processing frameworks converged on it.
Flink Use Cases by Pattern
| Use Case | Flink Pattern | State Type | Notes |
|---|---|---|---|
| Ad click aggregation | Tumbling window, keyed by ad_id | AggregatingState per window | Kafka → Flink → ClickHouse |
| Fraud detection | CEP pattern matching, session windows | MapState (user transaction history) | < 200ms latency requirement |
| Real-time ML features | Sliding window statistics | ValueState (running mean/variance) | Point-in-time correct features |
| Database CDC processing | Flink CDC source, upsert sink | No custom state needed | Debezium → Flink → DWH |
| IoT alerting | Threshold + complex event detection | ValueState (last N readings) | Millions of device streams |
| Sessionisation | Session window, keyed by user_id | ListState (events in session) | E-commerce clickstream |
