A Kafka producer that lands 100,000 messages per second is completely worthless if the consumer behind it falls behind and never catches up. I’ve watched a team celebrate hitting a new producer throughput record at 2 a.m., only to get paged at 6 a.m. because their downstream consumer group had accumulated forty million unprocessed messages overnight, the retention window was about to evict the oldest ones, and nobody had set up lag alerts. The producer was perfect. The consumer was a disaster. The data, by morning, was gone.
This is the uncomfortable reality of Kafka in production: producers are mostly stateless and forgiving, but consumers are where the actual distributed systems problems live. Consumers have to track what they’ve read, coordinate with peers, survive rebalances without losing work, handle deserialization failures, decide what “done” means, and do all of this while keeping pace with a firehose that never slows down. Get it wrong and you either drop messages, reprocess them endlessly, or fall so far behind that your “real-time” pipeline becomes a batch job with extra steps.
This post is the consumer-side companion to the Kafka producer guide for multivariate time-series ingestion, which covered Avro schemas, partitioning strategy, and producer configuration for collecting server metrics. If you’ve already read that, you have a topic full of Avro-encoded records sitting on a broker, waiting for someone to pick them up. This post is about that someone. We’ll cover consumer groups, the rebalance protocol, offset commits, the three delivery guarantees, how the polling loop actually works, Schema Registry deserialization, dead letter queues, lag monitoring, and a full working Python implementation using confluent-kafka-python.
Why Consumers Are the Hard Part of Kafka
When you write a Kafka producer, the client and broker do most of the hard work for you. You hand the client a record; it picks a partition, the broker replicates the write, and you get back an acknowledgment with the committed offset. If the producer crashes mid-batch, the client library retries idempotently, and when the process comes back, it doesn’t need to remember anything beyond its own configuration. Producers are almost pure functions: data in, acknowledgment out.
Consumers are not pure functions. A consumer has to answer, every single second, a question the producer never has to: “where was I?” That state lives in the __consumer_offsets internal topic, but the consumer has to decide when to write to it, what to write, and what to do if its understanding of “where I was” disagrees with the broker’s. It also has to share work with its peers — and those peers might join, leave, crash, or lag at any moment. When they do, the group rebalances, partitions get yanked out from under running code, and whatever in-memory state your handler accumulated has to be either committed, flushed, or safely abandoned.
Add deserialization to the mix and it gets worse. Your producer wrote Avro bytes with a Schema Registry ID prefix. Your consumer has to decode those bytes, match the schema, and handle the case where the producer used a new schema version that the consumer has never seen. Now add error handling: what do you do with a record your code just can’t process? Retry forever and block the partition? Skip and lose it? Route it somewhere for a human to look at?
And finally, the thing that kills more Kafka deployments than all the above combined: lag. Your consumer group is “working” — no errors, no crashes, CPU looks fine — but you’re processing 8,000 messages per second and the producers are writing 12,000. You’re falling behind four thousand messages every second. If nobody notices for a day, that’s 345 million messages of backlog, and you won’t catch up without either throwing more consumers at it or letting retention delete what you haven’t read yet. Silent lag is the number one cause of Kafka data loss in practice, and it’s purely a consumer-side problem.
The rest of this post is about how to get each of these concerns right, one at a time, with code that works.
How Consumer Groups and Partition Assignment Work
The consumer group is the unit of parallelism in Kafka. When you start a consumer, you give it a group.id. Every consumer with the same group ID forms a single logical subscriber, and Kafka guarantees that each partition of the subscribed topics is delivered to exactly one member of that group at a time. Two consumers in the same group will never see the same partition. Two consumers in different groups will both receive every message, independently — that’s how you fan out to multiple downstream systems from a single topic.
Inside a group, there’s always one broker designated as the group coordinator. The coordinator’s job is to track group membership, handle joins and leaves, run the rebalance protocol, and persist committed offsets. When your consumer calls subscribe() and starts polling, it sends a JoinGroup request to the coordinator, which either admits it into an existing group or starts a new one. One consumer in the group is elected as the group leader, and it’s the leader (not the coordinator) that actually computes the partition assignment. It runs the configured partition.assignment.strategy locally and sends the result back to the coordinator, which then distributes it to all members.
This design has one consequence that surprises newcomers and causes many production outages: you cannot have more working consumers than partitions in a group. If your topic has six partitions and you start eight consumers in the same group, two of them will sit idle, consuming nothing. They’re not broken — they joined the group, they got zero partitions, and they’ll wait to take over if someone else dies. This is why partition count is the hard ceiling on consumer parallelism, and why the producer-side decision about num.partitions is so consequential downstream.
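The ceiling arithmetic is trivial but worth encoding in deployment checks. A minimal sketch; the AdminClient lookup in the comment is one way to fetch the live partition count with confluent-kafka-python (names in the comment are illustrative):

```python
def idle_consumers(partition_count: int, group_size: int) -> int:
    """Members of a single-topic group beyond the partition count
    get zero partitions assigned and sit idle as hot standbys."""
    # In real code, fetch partition_count via confluent_kafka.admin.AdminClient:
    #   len(admin.list_topics(topic).topics[topic].partitions)
    return max(0, group_size - partition_count)
```

Wiring this into a startup warning ("8 consumers, 6 partitions, 2 idle") turns a silent capacity ceiling into a visible one.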
The assignment strategy — partition.assignment.strategy — controls how the group leader divides partitions among members. Kafka ships with four built-in strategies, and the difference between them matters a lot if you’re running a group with dozens of consumers or if rebalances are frequent.
| Strategy | Behavior | Rebalance Cost | When to Use |
|---|---|---|---|
| Range | Per-topic contiguous ranges. Default for historical compatibility. | Stop-the-world | Legacy workloads, or when you specifically want co-partitioning across topics for joins. |
| RoundRobin | Distributes evenly across all subscribed partitions. | Stop-the-world | Stateless processing where balance matters more than locality. |
| Sticky | Balanced, but preserves as much of the prior assignment as possible. | Stop-the-world (reduced churn) | Warm caches, expensive state rebuild, or large groups. |
| CooperativeSticky | Sticky plus incremental/cooperative rebalancing. | Non-stop; only moved partitions pause | Recommended default for new deployments. Safer scaling and rolling restarts. |
The Rebalance Protocol: Eager vs Cooperative
A rebalance is the process by which a consumer group redistributes partitions among its members. Rebalances happen for a handful of reasons: a consumer joins the group, a consumer leaves cleanly, a consumer dies (its session times out), the subscribed topic’s partition count changes, or you manually trigger it. From a correctness standpoint, rebalances are the single most dangerous event in a consumer’s life. From a latency standpoint, they’re often your worst-case latency outlier.
Originally, Kafka used eager rebalancing, also called “stop-the-world.” When a rebalance is triggered, every member of the group revokes all of its partitions, sends a JoinGroup, waits for the leader to compute the new assignment, and then receives its new set. During that window — which can stretch from hundreds of milliseconds to tens of seconds in unhealthy clusters — nobody is processing anything. If you have a group with 200 consumers and one of them is a little slow to respond to JoinGroup, the other 199 are idle. Worse, once the rebalance completes, some of them get the same partitions back, so the revoke-and-reassign was pure overhead.
Cooperative rebalancing, introduced in KIP-429 and stable since Kafka 2.4, fixes this. Instead of revoking all partitions at once, the protocol runs in two phases. In the first phase, every member reports its current ownership. The leader computes the new assignment and identifies only the partitions that actually need to move — from consumer X to consumer Y. Then only those partitions are revoked. The consumers that aren’t losing anything keep processing the whole time. A second phase then assigns the moved partitions to their new owners. The total rebalance time may actually be longer end-to-end, but the observable pause on each individual partition drops dramatically.
To enable cooperative rebalancing, set partition.assignment.strategy to cooperative-sticky. You can run a mixed group temporarily during migration by listing both strategies — Kafka will negotiate down to the common one — but the goal is to end up with everyone on the cooperative strategy.
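In confluent-kafka-python the switch is a single configuration key. A minimal fragment; the broker address and group name are placeholders:

```python
# Consumer config fragment for cooperative rebalancing.
conf = {
    "bootstrap.servers": "broker:9092",   # placeholder address
    "group.id": "metrics-consumer",
    # librdkafka accepts a comma-separated list of strategies; for a new
    # deployment, name only the cooperative one.
    "partition.assignment.strategy": "cooperative-sticky",
}
```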
A classic failure mode here is the rebalance loop: a consumer keeps getting kicked out of the group for exceeding max.poll.interval.ms because your processing loop stalled. Each kick-and-rejoin triggers a full group rebalance. You see this as periodic latency spikes and endless “Group is rebalancing” log lines. The fix is almost never increasing the timeout — it’s fixing the slow handler or reducing max.poll.records.
There’s a second, subtler consequence of rebalances: your in-memory state becomes invalid the moment a partition is revoked. If you’ve been accumulating per-partition buffers, counts, or dedupe caches, you need to flush or commit them before the partition leaves. The on_revoke callback is where that happens, and getting it right is one of the most common sources of data-loss bugs in Kafka consumers.
Offset Management and Delivery Semantics
Every message in a Kafka partition has a monotonic offset — 0, 1, 2, 3, and so on. A consumer’s job is to read from a starting offset, process the records, and periodically tell the broker “I’ve processed up to offset N on partition P.” That commit is stored in the internal __consumer_offsets topic, keyed by (group, topic, partition). When a consumer restarts or a rebalance moves a partition to a new owner, the new owner reads that committed offset and resumes from there.
The key decision is when to commit. Kafka exposes two modes:
- Auto-commit (enable.auto.commit=true): the client library commits offsets in the background every auto.commit.interval.ms (default 5 seconds). It commits whatever was returned by the most recent poll(), regardless of whether your code actually finished processing those records. Simple, but dangerous: if your process crashes after the offset was committed but before your handler completed, those records are lost. If it crashes before the next commit, you reprocess the last five seconds.
- Manual commit (enable.auto.commit=false): you call commit() explicitly, either synchronously or asynchronously. You decide when “done” means done. This is the only mode you should use in production if correctness matters.
Out of that single decision grows the entire “delivery semantics” conversation, which is really a conversation about what order you put your commits in relative to your side effects.
At-most-once means you commit the offset before processing the record. If your code crashes between the commit and the side effect, the record is lost forever. The broker thinks you handled it, and your next poll will skip past. You get zero duplicates, but you accept that some records will silently disappear. People choose this rarely, and when they do, it’s usually for high-volume metrics where a few dropped samples are tolerable and duplicates would blow up some downstream counter.
At-least-once means you process first, then commit. If you crash between processing and committing, the record will be re-delivered on restart and processed again. This is the default for nearly every pipeline. The cost is that your handler has to be idempotent, or you need a downstream sink that can absorb duplicates — an upsert into a keyed table, a dedupe window, a content hash. For the server-metrics pipeline in the companion producer post, an InfluxDB sink is naturally idempotent because writes with the same timestamp+tags+field overwrite.
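The at-least-once ordering can be made concrete in a few lines. A minimal sketch, assuming an already-configured confluent-kafka Consumer with enable.auto.commit=false; `handle` stands in for your idempotent business logic, and the `running` hook exists only so the loop can be stopped. It commits synchronously per record for clarity — the full consumer later in this post batches commits instead:

```python
def consume_at_least_once(consumer, handle, running=lambda: True):
    """Process FIRST, commit AFTER: a crash between the two re-delivers
    the in-flight record on restart. Duplicates are possible; loss is not."""
    while running():
        msg = consumer.poll(1.0)
        if msg is None:
            continue                  # nothing arrived within the timeout
        if msg.error():
            continue                  # real code: inspect / log the error
        handle(msg)                                       # side effect first...
        consumer.commit(message=msg, asynchronous=False)  # ...then commit
```

Swapping the last two lines turns this into at-most-once: the commit lands first, and a crash inside `handle` silently drops the record.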
Exactly-once is the holy grail and it actually works in Kafka — but only under specific conditions. For Kafka-to-Kafka pipelines, the producer-consumer transaction API lets you atomically commit both the output records and the input offsets as a single transaction. Any consumer downstream reads with isolation.level=read_committed and only sees records from committed transactions. For Kafka-to-external-system pipelines, exactly-once requires either an idempotent sink (so at-least-once is effectively exactly-once) or a two-phase commit protocol between Kafka and the sink, which almost nobody implements by hand — they use Kafka Connect with a transactional sink, or Apache Flink with its own checkpoint-and-commit machinery.
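For the Kafka-to-Kafka case, the shape of the transactional loop in confluent-kafka-python looks roughly like this. A sketch, not a drop-in: it assumes a Producer created with transactional.id set and init_transactions() already called, `transform` is your record-mapping logic, and `tp_factory` is an injection point for confluent_kafka.TopicPartition (parameterized here only so the logic is testable without a broker):

```python
def process_batch_transactionally(consumer, producer, msgs, transform,
                                  out_topic, tp_factory):
    """Atomically write output records AND the input offsets that produced
    them. Downstream readers with isolation.level=read_committed see either
    all of this batch's effects or none of them."""
    producer.begin_transaction()
    try:
        for msg in msgs:
            key, value = transform(msg)
            producer.produce(out_topic, key=key, value=value)
        # Commit the *input* offsets inside the same transaction.
        # The committed offset is next-to-read: last processed + 1.
        next_offsets = {}
        for msg in msgs:
            next_offsets[(msg.topic(), msg.partition())] = msg.offset() + 1
        producer.send_offsets_to_transaction(
            [tp_factory(t, p, o) for (t, p), o in next_offsets.items()],
            consumer.consumer_group_metadata(),
        )
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()  # all produced records become invisible
        raise
```

Note that the consumer itself must not commit offsets here — the transactional producer owns the offset commit, which is exactly what makes the read-process-write cycle atomic.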
Inside the Polling Loop
The beating heart of any Kafka consumer is the polling loop. Every call to consumer.poll(timeout) does three jobs: it fetches records from the broker, it sends heartbeats to the group coordinator, and it runs rebalance callbacks if the group state changed. If you don’t call poll() often enough, the coordinator assumes your consumer is dead and kicks it out of the group.
There are three timeouts that govern this dance, and their interaction is where most consumer bugs come from:
| Config | Default | What It Controls |
|---|---|---|
| session.timeout.ms | 45000 (45s) | Max time the coordinator will wait for a heartbeat before declaring the consumer dead and triggering a rebalance. |
| heartbeat.interval.ms | 3000 (3s) | How often the background heartbeat thread pings the coordinator. Must be well below the session timeout. |
| max.poll.interval.ms | 300000 (5 min) | Max time between two consecutive poll() calls. If you exceed this, the consumer is kicked from the group even if heartbeats are still flowing. |
| max.poll.records | 500 | Maximum records returned per poll() call. Combined with max.poll.interval.ms, this caps how long you can spend processing one batch. |
| fetch.min.bytes | 1 | Minimum bytes a broker should accumulate before responding. Larger values improve throughput at the cost of latency. |
| fetch.max.wait.ms | 500 | How long a broker will wait to accumulate fetch.min.bytes before responding anyway. |
Since Kafka 0.10.1, heartbeats are sent from a background thread independent of poll(), which is why max.poll.interval.ms exists as a separate guardrail. Without it, a consumer could wedge inside a slow handler for an hour, never poll, never process anything, but still send heartbeats and keep its partitions locked. The max.poll.interval.ms check catches exactly that case: if you don’t call poll() frequently enough, you’re out of the group regardless of how chatty your heartbeat thread is.
The intuitive mental model is: “poll often, process quickly, commit explicitly.” If your handler is slow, reduce max.poll.records so each batch is smaller, or move heavy work off the polling thread and onto a worker pool — with a bounded queue so you still call poll() frequently. Never, ever increase max.poll.interval.ms as a first resort, because you’re just making your detect-dead-consumer latency worse without fixing the underlying problem.
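The worker-pool pattern with a bounded queue can be sketched with the standard library alone. The bounded queue is the point: when workers fall behind, submit() blocks the poll loop instead of buffering unboundedly. One deliberate simplification, flagged in the docstring: this ignores offset ordering, and real code must only commit offsets for records the workers have actually finished.

```python
import queue
import threading

def start_worker_pool(handle, num_workers=4, maxsize=1000):
    """Bounded hand-off between the polling thread and a worker pool.
    Simplification: no offset tracking -- do not commit offsets for
    records that are still sitting in this queue."""
    q = queue.Queue(maxsize=maxsize)

    def worker():
        while True:
            item = q.get()
            if item is None:          # sentinel: shut this worker down
                q.task_done()
                return
            try:
                handle(item)
            finally:
                q.task_done()

    threads = [threading.Thread(target=worker, daemon=True)
               for _ in range(num_workers)]
    for t in threads:
        t.start()

    def submit(msg):
        q.put(msg)                    # blocks when full: backpressure

    def shutdown():
        q.join()                      # wait for in-flight work to drain
        for _ in threads:
            q.put(None)
        for t in threads:
            t.join()

    return submit, shutdown
```

The poll loop calls submit(msg) for each record and keeps polling; because submit blocks once maxsize is reached, a slow sink slows the poll loop down gradually instead of letting memory grow without bound.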
A Full Production-Ready Python Consumer
Here’s a full working consumer using confluent-kafka-python, which wraps the battle-tested librdkafka C library and is the right choice for any serious Python workload. It connects to the broker, uses Schema Registry for Avro deserialization (matching the companion producer), processes messages manually, commits offsets after successful processing, routes failures to a DLQ topic, and shuts down gracefully on SIGTERM. It also registers a rebalance listener so we can flush state on revoke.
First, a minimal set of config values. These live in environment variables so the same binary runs in dev and prod.
```python
# consumer_config.py
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class ConsumerConfig:
    bootstrap_servers: str
    schema_registry_url: str
    group_id: str
    topic: str
    dlq_topic: str
    auto_offset_reset: str = "earliest"

    @classmethod
    def from_env(cls) -> "ConsumerConfig":
        return cls(
            bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"],
            schema_registry_url=os.environ["SCHEMA_REGISTRY_URL"],
            group_id=os.environ.get("KAFKA_GROUP_ID", "metrics-consumer"),
            topic=os.environ.get("KAFKA_TOPIC", "server-metrics"),
            dlq_topic=os.environ.get("KAFKA_DLQ_TOPIC", "server-metrics-dlq"),
            auto_offset_reset=os.environ.get("AUTO_OFFSET_RESET", "earliest"),
        )
```
Now the main consumer. Read this top to bottom — the structure is the production template you want to clone for any new consumer.
```python
# metrics_consumer.py
import logging
import signal
import sys
import time
from typing import Any

from confluent_kafka import (
    Consumer, Producer, KafkaError, KafkaException, TopicPartition,
)
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

from consumer_config import ConsumerConfig

log = logging.getLogger("metrics_consumer")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)


class PoisonPillError(Exception):
    """Record cannot be processed and should be routed to the DLQ."""


class RetriableError(Exception):
    """Transient failure — do not commit, retry on next poll."""


class MetricsConsumer:
    def __init__(self, cfg: ConsumerConfig):
        self.cfg = cfg
        self._running = True
        self._last_commit_ts = 0.0  # monotonic time of the last periodic commit

        self.consumer = Consumer({
            "bootstrap.servers": cfg.bootstrap_servers,
            "group.id": cfg.group_id,
            "auto.offset.reset": cfg.auto_offset_reset,
            # Correctness: manual commit after successful processing.
            "enable.auto.commit": False,
            # Required for store_offsets(): we decide which offsets become
            # eligible for commit, not the client library.
            "enable.auto.offset.store": False,
            # Cooperative rebalancing: safer scaling, less stop-the-world.
            "partition.assignment.strategy": "cooperative-sticky",
            # Session timeouts tuned for a well-behaved handler.
            "session.timeout.ms": 45000,
            "heartbeat.interval.ms": 3000,
            "max.poll.interval.ms": 300000,
            # Throughput / latency tuning.
            "fetch.min.bytes": 1024 * 64,   # 64 KB
            "fetch.wait.max.ms": 250,       # librdkafka's name for fetch.max.wait.ms
            "max.partition.fetch.bytes": 1024 * 1024,  # 1 MB
            # Only see committed transactional records if the producer uses txns.
            "isolation.level": "read_committed",
            # Give the consumer an identifiable client id for lag tooling and logs.
            "client.id": f"{cfg.group_id}-{int(time.time())}",
        })

        # Schema Registry wiring. The producer in the companion post
        # wrote Avro with a magic byte + schema ID prefix; this decodes it.
        sr_client = SchemaRegistryClient({"url": cfg.schema_registry_url})
        self.deserializer = AvroDeserializer(
            schema_registry_client=sr_client,
            # schema_str=None lets the deserializer fetch by ID from each message.
        )

        # DLQ producer. Stateless from our point of view; just a sink.
        self.dlq_producer = Producer({
            "bootstrap.servers": cfg.bootstrap_servers,
            "enable.idempotence": True,
            "acks": "all",
            "compression.type": "zstd",
            "linger.ms": 20,
        })

        signal.signal(signal.SIGTERM, self._on_signal)
        signal.signal(signal.SIGINT, self._on_signal)

    def _on_signal(self, signum, frame):
        log.info("received signal %s, shutting down", signum)
        self._running = False

    def _on_assign(self, consumer, partitions):
        log.info("assigned partitions: %s",
                 [(p.topic, p.partition) for p in partitions])
        # If you kept local state keyed by partition, restore it here.

    def _on_revoke(self, consumer, partitions):
        log.info("revoked partitions: %s",
                 [(p.topic, p.partition) for p in partitions])
        # Last chance to flush in-memory state before partitions move away.
        try:
            consumer.commit(asynchronous=False)
        except KafkaException as e:
            log.warning("final commit on revoke failed: %s", e)

    def _on_lost(self, consumer, partitions):
        # Triggered when the consumer has lost ownership without a clean revoke
        # (e.g. session timeout). Do NOT commit — the offsets are no longer ours.
        log.warning("partitions lost: %s",
                    [(p.topic, p.partition) for p in partitions])

    def run(self) -> None:
        self.consumer.subscribe(
            [self.cfg.topic],
            on_assign=self._on_assign,
            on_revoke=self._on_revoke,
            on_lost=self._on_lost,
        )
        try:
            while self._running:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    self._handle_kafka_error(msg.error())
                    continue
                try:
                    payload = self._deserialize(msg)
                    self._handle_record(payload, msg)
                    # Store offset; commit below will use it.
                    # store_offsets + periodic commit keeps throughput high
                    # compared to committing after every single record.
                    self.consumer.store_offsets(message=msg)
                except PoisonPillError as e:
                    log.error("poison pill on %s[%d]@%d: %s",
                              msg.topic(), msg.partition(), msg.offset(), e)
                    self._route_to_dlq(msg, reason=str(e))
                    # Advance past the bad record so we don't block the partition.
                    self.consumer.store_offsets(message=msg)
                except RetriableError as e:
                    log.warning("retriable error, will replay: %s", e)
                    # Rewind so the next poll re-delivers this record; without
                    # the seek, poll() would simply move on to the next one.
                    self.consumer.seek(
                        TopicPartition(msg.topic(), msg.partition(), msg.offset()))
                    time.sleep(1.0)
                # Commit roughly every second in batches for throughput.
                self._maybe_commit()
        finally:
            self._shutdown()

    def _deserialize(self, msg) -> dict[str, Any]:
        try:
            ctx = SerializationContext(msg.topic(), MessageField.VALUE)
            value = self.deserializer(msg.value(), ctx)
            if value is None:
                raise PoisonPillError("deserialized to None")
            return value
        except PoisonPillError:
            raise
        except Exception as e:
            raise PoisonPillError(f"deserialization failed: {e}") from e

    def _handle_record(self, payload: dict[str, Any], msg) -> None:
        # ---- YOUR BUSINESS LOGIC LIVES HERE ----
        # Must be idempotent (at-least-once semantics).
        # Example: upsert into InfluxDB / TimescaleDB / Iceberg by (host, timestamp).
        host = payload.get("host")
        ts = payload.get("timestamp")
        cpu = payload.get("cpu_percent")
        if not host or ts is None:
            raise PoisonPillError("missing required fields host/timestamp")
        log.debug("ingest host=%s ts=%s cpu=%s", host, ts, cpu)

    def _maybe_commit(self) -> None:
        now = time.monotonic()
        if now - self._last_commit_ts >= 1.0:
            self._last_commit_ts = now
            try:
                self.consumer.commit(asynchronous=True)
            except KafkaException as e:
                # _NO_OFFSET just means nothing new was stored since last commit.
                log.warning("async commit failed: %s", e)

    def _handle_kafka_error(self, err) -> None:
        if err.code() == KafkaError._PARTITION_EOF:
            return  # benign
        log.error("kafka error: %s", err)
        if not err.retriable():
            raise KafkaException(err)

    def _route_to_dlq(self, msg, reason: str) -> None:
        headers = [
            ("original_topic", msg.topic().encode()),
            ("original_partition", str(msg.partition()).encode()),
            ("original_offset", str(msg.offset()).encode()),
            ("error_reason", reason.encode()),
            ("failed_at", str(int(time.time() * 1000)).encode()),
        ]
        self.dlq_producer.produce(
            topic=self.cfg.dlq_topic,
            key=msg.key(),
            value=msg.value(),  # preserve raw bytes for forensic replay
            headers=headers,
        )
        self.dlq_producer.poll(0)  # serve delivery callbacks without blocking

    def _shutdown(self) -> None:
        log.info("flushing DLQ producer")
        self.dlq_producer.flush(10)
        log.info("committing final offsets")
        try:
            self.consumer.commit(asynchronous=False)
        except KafkaException as e:
            log.warning("final commit failed: %s", e)
        self.consumer.close()
        log.info("consumer closed cleanly")


def main() -> int:
    cfg = ConsumerConfig.from_env()
    MetricsConsumer(cfg).run()
    return 0


if __name__ == "__main__":
    sys.exit(main())
```
Several things in this code are load-bearing and worth highlighting explicitly.
We use store_offsets plus periodic commit rather than committing after each message. store_offsets just updates the client’s in-memory notion of “what should be committed next,” and then commit sends that snapshot to the broker. (In confluent-kafka-python this requires enable.auto.offset.store=false; with the default setting, librdkafka stores offsets itself on delivery and rejects manual store_offsets calls.) Committing after every single record is a latency disaster at high throughput; committing every ~1 second batches the work and still limits worst-case replay to roughly one second of records.
The on_revoke callback calls commit(asynchronous=False). This is the last synchronous commit before the partition is yanked. If you skip this, any records you processed since the last periodic commit will replay after the rebalance — not a correctness bug under at-least-once, but a big waste. The on_lost callback deliberately does not commit, because by the time we get there, someone else may already own those partitions and our commit would be wrong.
Poison pills advance the offset; retriables do not. This is the distinction between “this record will never work, skip it and log” and “this record might work next time, don’t touch the offset.” Blurring these leads to infinite replay loops.
Error Handling and Dead Letter Queues
Every running consumer eventually meets a message it cannot process. It might be a bug in the producer, an Avro schema incompatibility, a field that’s technically valid but semantically wrong, or a downstream service that’s rejecting writes for reasons unrelated to the record. How you handle that record decides whether your pipeline keeps moving or grinds to a halt.
There are four broad strategies, and a healthy consumer uses at least three of them at different points:
- Skip. Log the record, advance the offset, move on. Appropriate when the record is genuinely unprocessable and loss is acceptable — bad telemetry, corrupted log lines, etc.
- Retry with backoff. Don’t commit, sleep, and let the next poll re-deliver. Appropriate for transient failures: a downstream HTTP timeout, a temporary DB connection drop, a rate limit. Cap the retries so you don’t block the partition forever.
- Route to a DLQ topic. Produce the raw bytes, headers, and failure metadata to a separate “dead letter” topic, then advance the offset. A human (or a scheduled job) can inspect the DLQ later, fix the bug, and optionally replay. This is the right default for almost all poison-pill cases in production.
- Circuit break. If your error rate exceeds a threshold, pause consumption entirely and page someone. Keeps you from dumping millions of messages into a DLQ because a downstream service is completely down.
The DLQ pattern deserves a little more attention because it’s often implemented wrong. A good DLQ record preserves the original raw bytes of the value (so you can still deserialize it with whatever schema was current at produce time), includes headers with the original topic/partition/offset, the error reason, and a timestamp. Never try to re-serialize a poison pill “prettier” for the DLQ; you’ll lose the exact evidence you need to diagnose it. The snippet above does this correctly by passing msg.value() straight through.
DLQ topics should have their own retention — longer than the main topic, because you need time to look at failures — and their own monitoring. A DLQ that silently grows is almost as bad as a consumer that silently lags. Alert on DLQ production rate, not just the main consumer lag.
Consumer Lag Monitoring
Consumer lag is the difference, per partition, between the latest offset produced and the latest offset committed by a consumer group. If lag is zero, you’re caught up. If lag is positive and growing, you’re falling behind. If lag is positive and stable at a small value, you’re steady-state and healthy. If lag is positive and huge, you’re about to have a very bad day.
The simplest way to see lag is from the command line:
```bash
# Show lag for a group
kafka-consumer-groups.sh \
  --bootstrap-server broker:9092 \
  --describe \
  --group metrics-consumer

# Output (truncated):
# GROUP             TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# metrics-consumer  server-metrics  0          1047329         1048210         881
# metrics-consumer  server-metrics  1          1046118         1047002         884
# metrics-consumer  server-metrics  2          1045884         1053991         8107

# Reset a group to the beginning of a topic
# (the group must have no active members for a reset to succeed)
kafka-consumer-groups.sh --bootstrap-server broker:9092 \
  --group metrics-consumer --topic server-metrics \
  --reset-offsets --to-earliest --execute

# Reset to a specific timestamp (replay everything from that point on)
kafka-consumer-groups.sh --bootstrap-server broker:9092 \
  --group metrics-consumer --topic server-metrics \
  --reset-offsets --to-datetime 2026-04-12T13:00:00.000 --execute
```
For production, you want lag exported as a metric and alerted on. Two widely used tools for this are LinkedIn’s Burrow, which has a smart sliding-window evaluator that classifies groups as OK/WARN/ERR based on whether they’re stuck or falling behind, and Kafka Lag Exporter, which exposes lag as Prometheus metrics (kafka_consumergroup_group_lag and kafka_consumergroup_group_lag_seconds).
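If you’d rather compute lag in your own tooling, the per-partition arithmetic is simple. A sketch; in confluent-kafka-python the inputs come from consumer.get_watermark_offsets(tp) (low/high watermarks) and consumer.committed([tp]), and the sentinel below mirrors librdkafka’s OFFSET_INVALID:

```python
OFFSET_INVALID = -1001  # librdkafka's "no offset committed yet" sentinel

def partition_lag(committed_offset: int, high_watermark: int) -> int:
    """Lag for one partition: log-end offset minus committed offset.
    If nothing has been committed yet, the whole partition counts as lag."""
    if committed_offset < 0:          # OFFSET_INVALID and friends
        return high_watermark
    return max(0, high_watermark - committed_offset)
```

Applied to partition 0 from the CLI output above: partition_lag(1047329, 1048210) gives the same 881 the tool reports.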
Alerting on raw lag count is usually wrong — a burst of produces can spike lag without indicating a real problem. Alerting on lag in seconds (how old is the oldest record I haven’t read?) is much better, because it directly corresponds to the SLA your consumers are trying to meet.
| Lag in Seconds | Severity | Action |
|---|---|---|
| < 10s | Healthy | Normal operation. |
| 10s – 60s | Warning | Check for a produce burst or transient downstream slowdown. |
| 1 min – 5 min | Page secondary | Sustained drift. Investigate handler latency and downstream health. |
| > 5 min | Page on-call | Consumer is behind SLA. Start horizontal scaling or investigate rebalance loops. |
| > retention window | Data loss imminent | Records will be deleted before you read them. All-hands incident. |
Note that “no lag alert ever fired” is itself a red flag. It usually means your thresholds are too generous and you’re missing real regressions. Test your lag alerts regularly by artificially slowing a consumer in staging and confirming the pages fire.
Scaling, Stateful Processing, and Beyond
Horizontal scaling of stateless consumers is Kafka’s happy path: add more consumer instances to the same group, and the next rebalance redistributes partitions. With cooperative-sticky assignment, the only partitions that pause are the ones that actually move. You can scale up (and down) with minimal disruption. The ceiling is the partition count: you cannot get more parallelism than you have partitions in the subscribed topics combined. If you’re at the ceiling, your only options are to increase partition count (which requires planning — see the producer post for why partition keys and counts are hard to change later) or to make each consumer faster.
Making each consumer faster usually means one of three things: batch downstream writes, move heavy work off the polling thread onto a worker pool, or tune fetch.min.bytes and max.poll.records to trade latency for throughput. For a sink like a time-series pipeline that lands data in InfluxDB or Iceberg, batched writes are almost always the biggest single win — flushing 500 records per HTTP round trip instead of one gives you a 50–100x throughput improvement without touching Kafka at all.
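The batching pattern is simple enough to sketch generically. A minimal buffer that flushes on either a size or an age threshold; `flush_fn` stands in for your real sink write (one InfluxDB HTTP call for the whole batch, say), and the injectable `clock` exists only for testability:

```python
import time

class BatchingSink:
    """Buffer records; flush when `max_records` accumulate or the oldest
    buffered record is `max_age_s` old, whichever comes first."""

    def __init__(self, flush_fn, max_records=500, max_age_s=1.0,
                 clock=time.monotonic):
        self.flush_fn = flush_fn
        self.max_records = max_records
        self.max_age_s = max_age_s
        self.clock = clock
        self.buf = []
        self.oldest = None            # arrival time of the oldest buffered record

    def add(self, record):
        if self.oldest is None:
            self.oldest = self.clock()
        self.buf.append(record)
        if (len(self.buf) >= self.max_records
                or self.clock() - self.oldest >= self.max_age_s):
            self.flush()

    def flush(self):
        if self.buf:
            self.flush_fn(self.buf)   # one round trip for the whole batch
            self.buf = []
            self.oldest = None
```

Under at-least-once semantics, store/commit offsets only after flush_fn returns successfully; committing before the flush turns a crash into silent loss of the buffered records.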
Stateless consumers cover maybe 80% of use cases. For the remaining 20%, where you need to do joins, windowed aggregations, sessionization, or anything that depends on state accumulated across records, a plain consumer is not the right tool. You can technically make it work by keeping state in RocksDB or Redis and reconciling on rebalance, but you’ll rebuild Kafka Streams badly. Use Apache Flink for complex event processing, or Kafka Streams if you’re on the JVM. Both handle partition-local state, checkpointing, and exactly-once semantics for you — things you really don’t want to hand-roll.
Another common question: do you need to write consumer code at all? If the goal is to land Kafka messages in an external system — Postgres, S3, Elasticsearch, Snowflake — check whether Kafka Connect already has a sink connector for it. Kafka Connect runs as a separate cluster of workers, handles rebalancing and exactly-once for you (with compatible sinks), and replaces dozens of hand-written consumers with a few lines of JSON config. The break-even point for hand-rolled Python is when your business logic genuinely needs to do something Connect cannot — custom enrichment, calling a model, routing based on content, or anything with downstream dependencies Connect can’t express.
Frequently Asked Questions
Should I use enable.auto.commit=true or manual commits in production?
Manual commits, almost always. Auto-commit is convenient for prototypes and toy examples, but it decouples “offset committed” from “record actually processed,” which means a crash at the wrong moment silently drops records. Set enable.auto.commit=false, process your batch, call store_offsets, and periodically commit. The small amount of extra code is what buys you “no silent data loss.”
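The process-then-store-then-commit cadence can be sketched as a loop around any object exposing the confluent-kafka-python surface (`poll`, `store_offsets`, `commit`, `close`). This assumes `enable.auto.commit=false` and `enable.auto.offset.store=false` in the consumer config; `handle` and the commit interval are placeholders:

```python
def consume_loop(consumer, handle, commit_every=100):
    """At-least-once poll/process/store/commit cadence.

    Offsets are stored only after `handle` succeeds, so a crash mid-batch
    re-delivers unprocessed records instead of silently dropping them.
    Assumes enable.auto.commit=false and enable.auto.offset.store=false.
    """
    processed = 0
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                break  # a real loop would `continue`; break keeps the sketch finite
            if msg.error():
                raise RuntimeError(msg.error())
            handle(msg.value())          # process first...
            consumer.store_offsets(msg)  # ...then mark the offset as safe
            processed += 1
            if processed % commit_every == 0:
                consumer.commit(asynchronous=True)  # flush stored offsets
    finally:
        consumer.commit(asynchronous=False)  # final synchronous flush
        consumer.close()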
What’s the difference between eager and cooperative rebalancing?
Eager rebalancing revokes every partition from every consumer at the start of a rebalance, so the entire group goes idle until the new assignment is computed and applied — this is the classic “stop-the-world” behavior. Cooperative rebalancing (KIP-429, available since Kafka 2.4) only revokes partitions that actually need to move, letting everyone else keep processing. Under cooperative, a normal scale-up from 5 to 6 consumers briefly pauses only the partitions being handed to the newcomer, instead of pausing all five existing consumers completely. Set partition.assignment.strategy=cooperative-sticky for any new deployment.
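In confluent-kafka-python this is a one-line config change. The broker address and group id below are placeholders; the strategy key is the part that matters:

```python
# Minimal consumer config enabling cooperative rebalancing.
# bootstrap.servers and group.id are placeholders for your deployment.
conf = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "metrics-sink",
    "partition.assignment.strategy": "cooperative-sticky",
    # Under cooperative assignment, an on_revoke callback receives only
    # the partitions that are actually moving, not the whole assignment —
    # rebalance callbacks written for eager semantics may need updating.
}
```

One caveat worth knowing: switching a live group from eager to cooperative requires a rolling migration, since all members of a group must agree on a common protocol.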
Can I have more consumers than partitions for more throughput?
No. Extra consumers in the same group beyond the partition count will be idle. Kafka’s parallelism ceiling in a single consumer group is the number of partitions subscribed. If you need more parallel throughput, you have to either increase partition count on the topic or make each consumer do more work per unit time (batching downstream writes usually helps most). You can have extra consumers as hot standbys, but they won’t process anything until someone else dies or leaves.
How do I achieve exactly-once semantics with a Python consumer?
In the strict Kafka-to-Kafka sense, exactly-once in Python requires using the transactional producer API alongside your consumer, with isolation.level=read_committed on downstream consumers. The confluent-kafka-python library supports this, but the surface is narrower and harder to get right than in Java. In practice, most Python consumers achieve “effective” exactly-once by running at-least-once and relying on an idempotent sink: upserting by a natural key, deduping by a hash in a dedupe table, or writing to a store like TimescaleDB with upserts so duplicate rows become overwrites. For true end-to-end EOS across heterogeneous systems, Flink or Kafka Streams is a better foundation than a hand-rolled Python consumer.
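The idempotent-sink half of that story is easy to demonstrate. Here SQLite stands in for any store with upserts, and the (host, ts) natural key and column names are illustrative — the point is that a replayed record overwrites instead of duplicating:

```python
import sqlite3

# An idempotent sink sketch: at-least-once delivery plus upsert-by-natural-key
# gives "effective" exactly-once. SQLite is a stand-in for your real store.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE metrics (
        host TEXT    NOT NULL,
        ts   INTEGER NOT NULL,   -- epoch seconds; (host, ts) is the natural key
        cpu  REAL    NOT NULL,
        PRIMARY KEY (host, ts)
    )
""")

def sink(record):
    """Upsert one record: a re-delivered duplicate replaces, not appends."""
    conn.execute(
        """INSERT INTO metrics (host, ts, cpu) VALUES (:host, :ts, :cpu)
           ON CONFLICT (host, ts) DO UPDATE SET cpu = excluded.cpu""",
        record,
    )

# A crash between processing and commit replays this record on restart:
sink({"host": "web-1", "ts": 1700000000, "cpu": 0.42})
sink({"host": "web-1", "ts": 1700000000, "cpu": 0.42})  # duplicate, harmless
```

Because the duplicate collapses into the existing row, you can run plain at-least-once delivery and let the sink absorb the replays.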
When should I use Kafka Streams or Flink instead of a plain consumer?
Use a stream processing framework when your logic needs state that spans multiple records — joining two streams, computing a 5-minute moving average, sessionizing events into user sessions, deduping with a rolling window, or emitting an alert when pattern X is followed by pattern Y within Z seconds. A plain consumer can do these, but you’ll end up writing your own checkpointing, rebalance-aware state restoration, and failure recovery, and it’ll be worse than the ones those frameworks already ship. Stick with a plain consumer when you’re doing stateless per-record transforms or simple sinks, and reach for Flink or Streams the moment you notice “I wish I had a windowed aggregation here.”
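To make the “state that spans multiple records” distinction concrete, here is a 5-minute tumbling-window average in pure Python. The record shape and window size are illustrative; the logic itself is trivial — what a framework buys you is that this state survives rebalances and crashes, which this sketch does not:

```python
from collections import defaultdict

WINDOW_S = 300  # 5-minute tumbling windows

def window_key(ts):
    """Bucket an epoch-seconds timestamp into its window start."""
    return ts - (ts % WINDOW_S)

def aggregate(records):
    """Average `value` per (host, window). In a plain consumer this dict
    lives in process memory — lose the process, lose the window. Kafka
    Streams and Flink checkpoint exactly this kind of state for you."""
    sums = defaultdict(lambda: [0.0, 0])
    for r in records:
        acc = sums[(r["host"], window_key(r["ts"]))]
        acc[0] += r["value"]
        acc[1] += 1
    return {k: total / n for k, (total, n) in sums.items()}
```

Writing this is the easy part; making it restore correctly after a rebalance moves the partition to another consumer is the part that justifies a framework.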
Related Reading
- Apache Kafka as a Multivariate Time-Series Engine — the companion producer post covering Avro schemas, partitioning, and producer configuration.
- Complex Event Processing with Apache Flink — when a plain consumer is not enough and you need stateful stream processing.
- InfluxDB to AWS Iceberg Data Pipeline — a real example of a downstream sink that consumes from Kafka-style pipelines.
- Best Databases for Time-Series Data — choosing the right sink for consumed records.
- Docker Containers Explained — the easiest way to run a local Kafka broker for development.
- Clean Code Principles — keeping a long-running consumer readable and maintainable.
- Python vs Rust — language trade-offs for very high-throughput consumers.
Conclusion
If you take one thing away from all of this, take this: a Kafka consumer is not a loop that reads messages — it’s a small stateful distributed system that happens to call poll(). Every interesting production failure you’ll hit comes from forgetting that. The rebalance you didn’t handle, the offset you committed too early, the poison pill that blocked a partition for three hours, the silent lag that ate your retention window, the heartbeat that stopped firing because your handler was stuck in a synchronous HTTP call. None of these are Kafka bugs. They’re consumer design bugs, and almost all of them have the same fix: manual commits, cooperative rebalancing, an explicit DLQ, a fast handler, and lag alerts that fire before you lose data.
The code in this post is close to what a real production consumer should look like. The structure — config from env, manual commits with store_offsets, cooperative rebalancing, explicit poison-pill vs retriable exceptions, DLQ with header metadata, graceful shutdown on SIGTERM, rebalance callbacks — is the same whether you’re consuming server metrics, financial events, user activity logs, or IoT sensor data. The handler body changes. The scaffolding stays.
If you’re coming to this from the producer side already, you now have both halves of the pipeline: a producer that ships Avro-encoded server metrics with a thoughtful partition key, and a consumer that reads them safely, handles failures without losing data, and scales horizontally without rebalance storms. What you do with those metrics after the consumer hands them to your handler — land them in a time-series database, aggregate them into windows, feed them to a FastAPI service that serves real-time dashboards, or pipe them into a stream processor — is up to you. But the hardest part, the part that will wake you up at 3 a.m. if you get it wrong, is done.
References
- Apache Kafka Documentation — Consumer API
- confluent-kafka-python — Python Client Documentation
- KIP-429: Kafka Consumer Incremental Rebalance Protocol
- Confluent — Exactly-Once Semantics in Apache Kafka
- LinkedIn Burrow — Kafka Consumer Lag Monitoring
- Kafka Lag Exporter — Prometheus Metrics for Consumer Lag