Why Servers Drown in Their Own Telemetry
A single modern server, when fully instrumented, can easily emit more than 10,000 metric samples per second. Multiply that by a few hundred machines in a modest production fleet and you are staring at millions of time-stamped numbers arriving every second, all of them correlated, all of them needed, and all of them completely useless if you cannot store and replay them reliably. This is where most homegrown monitoring stacks quietly fall over. The script that scrapes /proc/stat every five seconds and pushes rows directly into a time series database looks elegant in a demo, but the moment the database is down for maintenance, the collector crashes, or a network hiccup drops packets, you lose data you can never recover. And for observability, missing data is often worse than no data at all, because dashboards keep drawing lines and nobody notices the gap.
I learned this the hard way several years ago on an incident where a fleet of ingestion boxes started dropping metrics during a peak load spike. Our Grafana dashboards happily interpolated across the hole, and it took three full days before anyone realized the capacity plan for the next quarter had been built on phantom numbers. That incident convinced me of something that has guided every observability pipeline I have built since: the boundary between “things that produce data” and “things that store data” is one of the most important boundaries in a distributed system, and Apache Kafka is still the best thing we have to sit on that boundary.
This guide walks through building a production-grade Kafka time series engine end to end. We will collect multivariate metrics from a Linux server, serialize them with Avro, push them through a tuned Python producer, route them through intelligent partitioning, and feed them to downstream consumers that actually care about them. There will be working code, real Avro schemas, config you can copy, and the kind of hard-won details that only show up after you have watched things break in production.
Why Kafka for Multivariate Time Series
Before we write a single line of code, let us be honest about the question every engineer raises the moment you mention Kafka: do we actually need it? The short answer is that Kafka is not the cheapest or simplest tool in the observability toolbox, but it is almost always the right one once you outgrow a single machine and a single storage target. There are five properties that make it indispensable for multivariate time series, and each of them solves a specific failure mode that bites you the first time you try to build this stack without Kafka in the middle.
The first property is durability. Kafka persists every message to disk before acknowledging it, and with replication factor three you can tolerate two broker failures without losing a byte. Time series databases like InfluxDB or TimescaleDB are durable in their own way, but they are also stateful, tuned for query performance, and often the first thing you take down during an upgrade. If your producers write directly to the database, an upgrade window becomes a data loss window. With Kafka in the middle, producers keep writing, Kafka keeps storing, and the database catches up when it comes back.
The second is replay. Because Kafka retains data for a configurable window (hours, days, or even weeks), any consumer can reset its offset and re-read history. This is what turns an incident postmortem from “we have dashboards from before, so we can guess what happened” into “we can literally replay the exact data the monitoring system saw.” It is also how you onboard a new downstream system — point a fresh consumer at earliest and it catches up.
The third property is fan-out. Your metrics are rarely consumed by just one thing. You probably want a long-term store, a fast-access store, a stream processor for alerting, and maybe an ML training sink. Kafka lets you attach any number of independent consumer groups to the same topic without any coordination between them. Each group reads at its own pace, and a slow consumer cannot back-pressure a fast one.
Fourth is decoupling. The producer does not need to know anything about the consumer, and vice versa. You can swap out InfluxDB for TimescaleDB without touching a single line of collector code. This is the same argument that pushed us toward microservices in the first place, and it applies just as forcefully to data pipelines. If you want to see what that decoupling looks like at the storage layer, the time series database comparison guide walks through the tradeoffs between the usual sinks.
Fifth is horizontal scale. A single Kafka topic can be partitioned across dozens or hundreds of brokers, and each partition is an independent log. As your fleet grows from fifty servers to five thousand, you add partitions and brokers instead of rewriting your pipeline. I have personally watched the same Kafka cluster architecture scale from 50k to 3M messages per second without a fundamental redesign, which is not something you can say about most alternatives.
What Multivariate Time Series Actually Means
The term “multivariate time series” gets thrown around loosely, so let us pin it down. A univariate time series is a single signal indexed by time — for example, CPU utilization sampled every second. A multivariate time series is a collection of two or more signals that are sampled at the same timestamps and are correlated with each other. On a server, you almost never care about CPU in isolation. You care about CPU together with memory pressure, disk I/O wait, network throughput, and maybe temperature, because the interesting patterns live in the relationships between those signals.
Consider a classic example: a sudden spike in CPU usage. On its own, that tells you very little. But if at the same timestamp you also see memory usage climbing, disk I/O dropping to near zero, and network bytes per second flatlining, you are probably looking at a CPU-bound computation — perhaps a runaway regex or a JVM in a garbage collection storm. Contrast that with a CPU spike accompanied by high iowait, growing disk queue depth, and a drop in network throughput, which points you toward disk saturation causing downstream throttling. These diagnoses are only possible because the signals arrive together, on the same timeline, in the same record.
This has two concrete implications for how we design the engine. First, we should try to capture all signals at the same instant in a single message, not as separate messages for each metric. Second, our storage and query layer should make it cheap to align those signals on the time axis, which is exactly what purpose-built time series databases are good at. If you want to dig deeper into forecasting on this kind of data, the guide on time series forecasting models covers how models exploit the correlations we are capturing here.
Notice in the chart above how CPU and memory climb together during the middle of the window while disk I/O and network activity move in the opposite direction. That divergence is the whole point of capturing these signals together. If you store them in different Kafka topics with different timestamps and different partitioning schemes, you will spend most of your downstream query time trying to re-align them. Do not do that.
Architecture of the Engine
Our engine has four layers, and the cleanest way to think about them is as a relay race where each layer only has to hand off correctly to the next.
Layer one is collection. On each server, a small Python process samples metrics at a fixed interval (typically one second) using psutil. It bundles CPU, memory, disk, and network counters into a single record keyed by hostname and timestamp. This process runs as a systemd service and uses almost no resources — we have seen steady-state CPU of about 0.3% on a t3.medium.
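As a deployment sketch, a minimal systemd unit for the collector might look like the following. The paths, user, and memory cap are assumptions to adapt for your environment:

```ini
[Unit]
Description=Kafka metric collector
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=metrics
ExecStart=/usr/bin/python3 /opt/metric-collector/producer.py
Restart=always
RestartSec=5
# Bound memory so a runaway producer queue cannot take down the host.
MemoryMax=256M

[Install]
WantedBy=multi-user.target
```

Restart=always matters here: the collector is the one process you want back up immediately after any crash, because every second it is down is a gap in the data.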
Layer two is production. The same Python process serializes each record using an Avro schema fetched from the Schema Registry, then hands it to a confluent-kafka-python producer configured for durability and throughput. The producer batches records, compresses them with lz4, and sends them to the broker with acks=all.
Layer three is the broker. Kafka persists the records to a topic called server.metrics.v1, partitioned by hostname. Replication factor three ensures no data loss on broker failure. The topic has a retention of 72 hours, which is enough to replay into a new consumer without exploding disk usage.
Layer four is consumption. Multiple independent consumer groups read from the topic. One writes to InfluxDB for long-term storage, one runs Flink jobs for windowed aggregations and anomaly detection, and one feeds a lightweight alerting service. Each can be deployed, restarted, or replaced without touching the others. If you want Kafka running locally for development, the Docker containers guide covers the container basics you will need.
Collecting Server Metrics with psutil
The psutil library is the right tool for cross-platform metric collection in Python. It gives you CPU, memory, disk, and network stats with a consistent interface that works identically on Linux, macOS, and Windows. The only rule you need to remember is that many of its counters are cumulative — for example, psutil.net_io_counters() returns total bytes since boot, not bytes per second — so you have to take a delta between two consecutive samples to get a rate.
Here is a clean collector that captures a multivariate sample at each tick:
import socket
import time
from dataclasses import dataclass

import psutil


@dataclass
class MetricSample:
    host: str
    timestamp_ms: int
    cpu_percent: float
    cpu_user: float
    cpu_system: float
    cpu_iowait: float
    mem_percent: float
    mem_used_bytes: int
    mem_available_bytes: int
    swap_percent: float
    disk_read_bytes_per_sec: float
    disk_write_bytes_per_sec: float
    disk_read_iops: float
    disk_write_iops: float
    net_rx_bytes_per_sec: float
    net_tx_bytes_per_sec: float
    net_rx_packets_per_sec: float
    net_tx_packets_per_sec: float
    load_1m: float
    load_5m: float
    load_15m: float


class MetricCollector:
    def __init__(self, interval_seconds: float = 1.0):
        self.interval = interval_seconds
        self.host = socket.gethostname()
        self._prev_disk = psutil.disk_io_counters()
        self._prev_net = psutil.net_io_counters()
        self._prev_time = time.monotonic()
        # First CPU call is non-blocking and returns 0.0; prime it.
        psutil.cpu_percent(interval=None)
        psutil.cpu_times_percent(interval=None)

    def sample(self) -> MetricSample:
        now = time.monotonic()
        elapsed = max(now - self._prev_time, 1e-6)

        cpu_pct = psutil.cpu_percent(interval=None)
        cpu_times = psutil.cpu_times_percent(interval=None)
        vm = psutil.virtual_memory()
        sm = psutil.swap_memory()
        load = psutil.getloadavg()

        disk = psutil.disk_io_counters()
        d_read_b = (disk.read_bytes - self._prev_disk.read_bytes) / elapsed
        d_write_b = (disk.write_bytes - self._prev_disk.write_bytes) / elapsed
        d_read_iops = (disk.read_count - self._prev_disk.read_count) / elapsed
        d_write_iops = (disk.write_count - self._prev_disk.write_count) / elapsed

        net = psutil.net_io_counters()
        n_rx_b = (net.bytes_recv - self._prev_net.bytes_recv) / elapsed
        n_tx_b = (net.bytes_sent - self._prev_net.bytes_sent) / elapsed
        n_rx_p = (net.packets_recv - self._prev_net.packets_recv) / elapsed
        n_tx_p = (net.packets_sent - self._prev_net.packets_sent) / elapsed

        self._prev_disk = disk
        self._prev_net = net
        self._prev_time = now

        return MetricSample(
            host=self.host,
            timestamp_ms=int(time.time() * 1000),
            cpu_percent=cpu_pct,
            cpu_user=cpu_times.user,
            cpu_system=cpu_times.system,
            cpu_iowait=getattr(cpu_times, "iowait", 0.0),
            mem_percent=vm.percent,
            mem_used_bytes=vm.used,
            mem_available_bytes=vm.available,
            swap_percent=sm.percent,
            disk_read_bytes_per_sec=d_read_b,
            disk_write_bytes_per_sec=d_write_b,
            disk_read_iops=d_read_iops,
            disk_write_iops=d_write_iops,
            net_rx_bytes_per_sec=n_rx_b,
            net_tx_bytes_per_sec=n_tx_b,
            net_rx_packets_per_sec=n_rx_p,
            net_tx_packets_per_sec=n_tx_p,
            load_1m=load[0],
            load_5m=load[1],
            load_15m=load[2],
        )
A few details worth highlighting. We use time.monotonic() for the elapsed calculation because it is immune to wall clock adjustments — if NTP nudges the system clock backward, time.time() deltas can go negative and produce nonsense rates. We still use time.time() for the sample timestamp itself because that is what downstream consumers want to see. And we use getattr for iowait because it only exists on Linux; on macOS it silently returns zero.
On the hostname: I strongly recommend augmenting this with cloud metadata (instance ID, region, AZ) if you are on AWS, GCP, or Azure. Hostnames are fine as a partition key but they can collide across environments, and when you are triaging an incident at 3am you want to know exactly which instance emitted a weird number. The related article on managing metadata for time series signals goes into much more detail on this pattern.
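Here is one hedged way to do that on AWS using IMDSv2 with only the standard library. The 0.2 s timeout is an arbitrary choice, and the function falls back to the hostname so the collector still works off-cloud:

```python
import socket
import urllib.request


def instance_identity(timeout: float = 0.2) -> str:
    """Return the EC2 instance ID if available, else the hostname.

    Uses IMDSv2 (token-based metadata access). Any failure — not on EC2,
    IMDS disabled, timeout — falls back to the plain hostname.
    """
    try:
        token_req = urllib.request.Request(
            "http://169.254.169.254/latest/api/token",
            method="PUT",
            headers={"X-aws-ec2-metadata-token-ttl-seconds": "60"},
        )
        token = urllib.request.urlopen(token_req, timeout=timeout).read().decode()
        id_req = urllib.request.Request(
            "http://169.254.169.254/latest/meta-data/instance-id",
            headers={"X-aws-ec2-metadata-token": token},
        )
        return urllib.request.urlopen(id_req, timeout=timeout).read().decode()
    except OSError:
        return socket.gethostname()
```

Call it once at collector startup and cache the result; the instance ID does not change over the process lifetime, and you do not want a metadata round-trip on every sample.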
Designing the Avro Message Schema
Every production Kafka deployment I have seen eventually regrets the absence of a schema, usually the day someone on another team adds a new field to the producer and the downstream consumer starts throwing KeyError at 2am. Avro with a Schema Registry solves this by making the schema a first-class part of the message itself. Producers register their schema once, and every message carries a 5-byte prefix with the schema ID. Consumers use that ID to fetch the exact schema the producer used and deserialize deterministically. It is one of the most valuable things in the Kafka ecosystem, and it takes maybe fifty lines of code to set up.
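The framing itself is simple enough to sketch. The Confluent wire format is one zero “magic” byte followed by the schema ID as a 4-byte big-endian integer, then the Avro-encoded body. The serializers handle this for you, but seeing it spelled out demystifies those 5 bytes:

```python
import struct

MAGIC_BYTE = 0


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an Avro-encoded payload with the Confluent wire-format header."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def unframe(message: bytes) -> tuple[int, bytes]:
    """Split a framed message back into (schema_id, payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unknown magic byte {magic}")
    return schema_id, message[5:]


# Roundtrip with a hypothetical schema ID of 123.
msg = frame(123, b"\x02hi")
sid, payload = unframe(msg)
print(sid, payload)  # prints: 123 b'\x02hi'
```

This is also why a consumer without access to the Schema Registry cannot decode the topic: the payload is raw binary Avro, and the only pointer to its schema is that 4-byte ID.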
Here is the Avro schema for our multivariate sample. Save it as schemas/server_metric.avsc:
{
  "type": "record",
  "name": "ServerMetric",
  "namespace": "com.aicodeinvest.metrics",
  "doc": "A multivariate sample of host-level server metrics.",
  "fields": [
    {"name": "host", "type": "string", "doc": "Hostname or instance ID"},
    {"name": "timestamp_ms", "type": "long", "doc": "Unix epoch ms"},
    {"name": "cpu_percent", "type": "double"},
    {"name": "cpu_user", "type": "double"},
    {"name": "cpu_system", "type": "double"},
    {"name": "cpu_iowait", "type": "double", "default": 0.0},
    {"name": "mem_percent", "type": "double"},
    {"name": "mem_used_bytes", "type": "long"},
    {"name": "mem_available_bytes", "type": "long"},
    {"name": "swap_percent", "type": "double", "default": 0.0},
    {"name": "disk_read_bytes_per_sec", "type": "double"},
    {"name": "disk_write_bytes_per_sec", "type": "double"},
    {"name": "disk_read_iops", "type": "double"},
    {"name": "disk_write_iops", "type": "double"},
    {"name": "net_rx_bytes_per_sec", "type": "double"},
    {"name": "net_tx_bytes_per_sec", "type": "double"},
    {"name": "net_rx_packets_per_sec", "type": "double"},
    {"name": "net_tx_packets_per_sec", "type": "double"},
    {"name": "load_1m", "type": "double"},
    {"name": "load_5m", "type": "double"},
    {"name": "load_15m", "type": "double"},
    {"name": "tags", "type": {"type": "map", "values": "string"}, "default": {}}
  ]
}
Three design decisions are worth unpacking. First, every field that is not strictly required has a default. This is what makes schema evolution safe: if tomorrow we add gpu_percent with a default of zero, consumers that have upgraded to the new schema can still deserialize old messages written before the field existed, with the default filling the gap, while old consumers simply skip fields they do not recognize. The Schema Registry enforces this rule automatically when you set the compatibility mode to BACKWARD, which you should.
Second, we include a free-form tags map. Tags are where you put things like environment, region, team, cluster ID — anything that varies between deployments and that you might want to filter by downstream. Keeping them in a map instead of as top-level fields means you can add new tags without a schema change. You pay a small serialization cost, but it is negligible compared to the operational overhead of coordinating schema updates.
Third, we avoid nested records. Avro supports them, but flat schemas serialize faster, are easier to query in downstream SQL systems, and play nicer with Kafka Connect sinks. For metrics specifically, flat is almost always the right call.
A quick note on compatibility modes: BACKWARD means new consumers can read old messages, FORWARD means old consumers can read new messages, and FULL means both. For metrics, BACKWARD is usually enough, but make sure your team agrees on the mode before anyone deploys the first producer. Changing the compatibility mode on a running topic is a minor nightmare.
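To make the defaults rule concrete, here is a toy sketch of Avro-style field resolution. It is not the real Avro algorithm, which also handles type promotion, unions, and aliases, but it shows how a reader on a new schema fills a field that old records lack:

```python
def resolve(record: dict, reader_fields: list[dict]) -> dict:
    """Toy Avro-style resolution: fill fields missing from an old record
    using the reader schema's defaults; fail if no default exists."""
    out = {}
    for field in reader_fields:
        name = field["name"]
        if name in record:
            out[name] = record[name]
        elif "default" in field:
            out[name] = field["default"]
        else:
            raise ValueError(f"no value and no default for {name}")
    return out

# A new reader schema that added gpu_percent with a default...
reader_fields = [
    {"name": "host", "type": "string"},
    {"name": "cpu_percent", "type": "double"},
    {"name": "gpu_percent", "type": "double", "default": 0.0},
]
# ...can still consume an old record that predates the field.
old_record = {"host": "web-1", "cpu_percent": 73.2}
print(resolve(old_record, reader_fields))
```

A field without a default in the same position would make the new schema incompatible, which is exactly the situation the Registry's BACKWARD check rejects at registration time.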
Building the Kafka Producer
Now we put the collector and the schema together into a real producer. We will use confluent-kafka-python, which wraps the battle-tested librdkafka C library and is significantly faster than the pure-Python alternatives. If you are curious about the performance difference between Python and faster-compiled languages for this kind of work, the Python vs Rust comparison guide is a good read, but for metric producers Python is almost always fast enough if you use the right client.
import logging
import signal
import sys
import time
from dataclasses import asdict

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import (
    SerializationContext,
    MessageField,
    StringSerializer,
)

from collector import MetricCollector, MetricSample

log = logging.getLogger("kafka-metrics")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

TOPIC = "server.metrics.v1"


def load_schema(path: str) -> str:
    with open(path) as f:
        return f.read()


def to_dict(sample: MetricSample, ctx) -> dict:
    return asdict(sample)


def delivery_report(err, msg):
    if err is not None:
        log.error("delivery failed for key=%s: %s", msg.key(), err)
    # Success path is intentionally silent — we would drown in logs otherwise.


def build_producer() -> Producer:
    conf = {
        "bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
        "client.id": "metric-collector",
        # Durability
        "acks": "all",
        "enable.idempotence": True,
        "max.in.flight.requests.per.connection": 5,
        "retries": 10_000_000,
        "delivery.timeout.ms": 120_000,
        # Throughput
        "linger.ms": 20,
        "batch.size": 65_536,
        "compression.type": "lz4",
        # Memory bound
        "queue.buffering.max.messages": 100_000,
        "queue.buffering.max.kbytes": 1_048_576,
    }
    return Producer(conf)


def main():
    sr_client = SchemaRegistryClient({"url": "http://schema-registry:8081"})
    avro_serializer = AvroSerializer(
        schema_registry_client=sr_client,
        schema_str=load_schema("schemas/server_metric.avsc"),
        to_dict=to_dict,
    )
    key_serializer = StringSerializer("utf_8")
    producer = build_producer()
    collector = MetricCollector(interval_seconds=1.0)

    running = True

    def shutdown(signum, frame):
        nonlocal running
        log.info("shutdown requested, flushing producer...")
        running = False

    signal.signal(signal.SIGTERM, shutdown)
    signal.signal(signal.SIGINT, shutdown)

    next_tick = time.monotonic()
    try:
        while running:
            sample = collector.sample()
            key = key_serializer(sample.host)
            value = avro_serializer(
                sample,
                SerializationContext(TOPIC, MessageField.VALUE),
            )
            producer.produce(
                topic=TOPIC,
                key=key,
                value=value,
                timestamp=sample.timestamp_ms,
                on_delivery=delivery_report,
            )
            # Serve delivery callbacks without blocking.
            producer.poll(0)

            next_tick += collector.interval
            sleep_for = next_tick - time.monotonic()
            if sleep_for > 0:
                time.sleep(sleep_for)
            else:
                # Fell behind; log once and resync.
                log.warning("collector is behind by %.3fs", -sleep_for)
                next_tick = time.monotonic()
    finally:
        remaining = producer.flush(timeout=30)
        if remaining > 0:
            log.error("%d messages undelivered at shutdown", remaining)
            sys.exit(1)
        log.info("clean shutdown")


if __name__ == "__main__":
    main()
Let me walk through the config choices because each one is doing real work.
acks=all tells the broker to wait until all in-sync replicas have written the message before acknowledging. Combined with enable.idempotence=true, this gives you exactly-once semantics at the producer level — retries will not duplicate messages even if the network drops an ack. This is the single most important configuration for durability, and unless you are running a quick throwaway demo you should never turn it off.
linger.ms=20 tells the producer to wait up to 20 milliseconds before sending a batch, even if the batch is not full. This is a throughput-versus-latency trade. For metrics at 1Hz this adds negligible latency but can increase throughput by a factor of 5–10 because you are amortizing network and serialization overhead across many records.
batch.size=65536 sets the maximum size of a single batch. With 20ms of linger and a reasonable message rate, each batch typically fills up before the timer fires.
compression.type=lz4 is, in my experience, the best default for metrics. It compresses well on the kind of repetitive numeric data metrics produce (often 3–5x), and it is faster than both snappy and zstd at reasonable compression levels. You can benchmark on your own data to confirm, but lz4 rarely loses.
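The compression claim is easy to sanity-check. lz4 itself needs a third-party package, so this sketch uses zlib from the standard library as a stand-in (absolute ratios differ, but the shape of the effect is the same), and JSON instead of Avro purely for readability. Repetitive metric payloads, like the ones a 20 ms linger window accumulates, compress several-fold:

```python
import json
import random
import zlib

random.seed(7)

# Simulate a batch of repetitive metric records, the kind of payload
# a linger window accumulates before the producer ships it.
batch = [
    {
        "host": "web-01",
        "timestamp_ms": 1_700_000_000_000 + i * 1000,
        "cpu_percent": round(40 + random.random() * 5, 2),
        "mem_percent": round(62 + random.random() * 2, 2),
    }
    for i in range(100)
]
raw = json.dumps(batch).encode()
compressed = zlib.compress(raw)
print(f"raw={len(raw)}B compressed={len(compressed)}B "
      f"ratio={len(raw) / len(compressed):.1f}x")
```

Run it on your real record shape before trusting any ratio; the repeated field names and near-constant values are what make metric batches so compressible.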
The table below summarizes how these config choices trade off, along with common alternatives:
| Setting | Value | Tradeoff |
|---|---|---|
| acks | all | Durability over latency. Worth every millisecond. |
| enable.idempotence | true | Exactly-once producer semantics. No duplicates on retry. |
| linger.ms | 20 | Up to 20ms extra latency for 5–10x throughput. |
| compression.type | lz4 | Fastest high-ratio compression for numeric data. |
| batch.size | 65,536 | Large batches amortize network costs. |
| max.in.flight | 5 | Max allowed with idempotence. Higher values are rejected. |
Partitioning Strategy for Time Series
Choosing the wrong partition key is the most common and most painful mistake in a Kafka time series deployment. The problem is that partitioning has two competing goals: you want records from the same logical entity to land on the same partition so their order is preserved, and you want load to be spread evenly across partitions so no single partition becomes a hotspot. For time series, one instinct people have is to use the timestamp. Do not use the timestamp as a partition key. A monotonic timestamp creates a pathological pattern where every new record goes to whichever partition is currently hottest, producing a rolling hot spot that shifts across partitions over time.
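You can see the failure mode with a few lines of arithmetic. This sketch uses zlib.crc32 as a stand-in for the client's keyed partitioner (the real hash differs, but the behavior is the same): when every host samples in the same second, a one-second-granularity timestamp key collapses the whole fleet onto a single partition at a time, while a hostname key spreads the same records out:

```python
import zlib

NUM_PARTITIONS = 50
hosts = [f"host-{i:03d}" for i in range(500)]
ts = 1_700_000_000  # one-second timestamp shared by the whole fleet this tick

def partition_for(key: str, n: int = NUM_PARTITIONS) -> int:
    # crc32 stands in for the real client partitioner; any hash shows the effect
    return zlib.crc32(key.encode()) % n

# Keyed by timestamp: all 500 hosts land on the same partition this second.
ts_partitions = {partition_for(str(ts)) for _ in hosts}
# Keyed by hostname: the same 500 records spread across many partitions.
host_partitions = {partition_for(h) for h in hosts}

print(f"timestamp key -> {len(ts_partitions)} partition(s)")  # prints 1
print(f"hostname key  -> {len(host_partitions)} partition(s)")
```

Next second the timestamp hashes somewhere else, which is exactly the rolling hot spot described above.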
The partition keys that actually work for multivariate server metrics are all variations on the same idea: key by the source of the data. Here are the main options:
| Strategy | Good for | Watch out for |
|---|---|---|
| hostname | Most fleets. Preserves per-host ordering. | Imbalance if one host is much busier. |
| cluster_id + hostname | Multi-tenant setups where clusters are the billing unit. | Cluster-sized hot spots. |
| metric_family | When consumers only care about one family. | Small number of partitions — only as many as families. |
| random/sticky | Perfectly even load, no ordering needs. | Loses per-host ordering. |
| timestamp | Never. | Rolling hot spots, reprocessing nightmares. |
For almost every deployment I have worked on, partition by hostname is the right default. It preserves per-host ordering (which matters because consumers often do stateful things per host, like anomaly detection), and it spreads load evenly as long as your partition count is reasonably larger than your host count. The modern Kafka client defaults to the “sticky partitioner” for records without a key, which is a nice throughput optimization, but since we are providing a key it does not apply — our records go to hash(hostname) % partition_count.
One thing I strongly recommend: set your partition count to a round number that is comfortably larger than your current fleet and grows in fives or tens. Thirty, fifty, a hundred — not twenty-three or forty-seven. Kafka supports adding partitions to a topic, but doing so is disruptive because it changes the hash mapping for keyed records. Start with headroom.
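The disruption from adding partitions is also easy to quantify. Again using crc32 as a stand-in for the client's keyed partitioner, count how many hosts land on a different partition after growing the topic from 50 to 60 partitions:

```python
import zlib

def partition_for(key: str, n: int) -> int:
    # crc32 as a stand-in for the client's keyed partitioner
    return zlib.crc32(key.encode()) % n

hosts = [f"host-{i:03d}" for i in range(500)]
moved = sum(1 for h in hosts if partition_for(h, 50) != partition_for(h, 60))
print(f"{moved}/{len(hosts)} hosts change partition going 50 -> 60")
```

Most keys move, which means any consumer doing stateful per-host work sees its state land on the wrong instance after the resize. Starting with headroom avoids the whole problem.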
Topic Design and Retention
Should you use one topic for all metrics, or a topic per metric family? The answer for multivariate time series is almost always one topic. The whole point of capturing correlated signals together is that downstream consumers want them together. Splitting them into separate topics means every consumer has to join across topics to reconstruct a sample, which is exactly the complexity we are paying Kafka to help us avoid.
The exceptions are rare but real. If you have fundamentally different data types with different retention or sizing — for example, high-frequency metrics and low-frequency events — it is reasonable to put them in separate topics, because you probably want different retention policies for them. But within “host metrics” itself, one topic is the answer.
Here is a reasonable topic configuration for a production multivariate metrics topic, applied with kafka-topics.sh:
kafka-topics.sh --bootstrap-server kafka-1:9092 \
  --create \
  --topic server.metrics.v1 \
  --partitions 50 \
  --replication-factor 3 \
  --config retention.ms=259200000 \
  --config segment.bytes=536870912 \
  --config compression.type=producer \
  --config min.insync.replicas=2 \
  --config cleanup.policy=delete \
  --config max.message.bytes=1048576
The important knobs here: retention.ms=259200000 keeps data for three days, which is enough to reprocess into a new sink or recover from a downstream outage without filling up broker disks. segment.bytes=536870912 (512 MiB) controls when a new log segment is rolled; larger segments mean fewer files and faster startup but slower cleanup granularity. compression.type=producer tells the broker to store messages in whatever format the producer sent, which avoids pointless decompress/recompress cycles. min.insync.replicas=2 combined with acks=all on the producer is what actually gives you durability — acks=all alone is a lie if you only have one replica in sync.
Finally, cleanup.policy=delete is almost always correct for metrics. Log compaction (the other option) keeps the latest record per key, which makes sense for changelog streams but is nonsense for time series where every record is important.
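These retention knobs translate directly into broker disk. A back-of-the-envelope sizing under stated assumptions (a 500-host fleet at 1 Hz, the roughly 85-byte compressed record size reported in the benchmark section, replication factor 3, 72-hour retention):

```python
HOSTS = 500              # fleet size (assumption for this estimate)
SAMPLES_PER_SEC = 1      # 1 Hz sampling
BYTES_PER_MSG = 85       # approximate compressed record size (see benchmarks)
REPLICATION = 3
RETENTION_SECONDS = 72 * 3600

raw_bytes = HOSTS * SAMPLES_PER_SEC * BYTES_PER_MSG * RETENTION_SECONDS
total_bytes = raw_bytes * REPLICATION  # every byte lives on three brokers
print(f"{raw_bytes / 1024**3:.1f} GiB per replica, "
      f"{total_bytes / 1024**3:.1f} GiB across the cluster")
```

With these assumptions the whole topic costs on the order of 30 GiB of cluster disk, which is why three days of retention is cheap insurance against a downstream outage.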
Consumer Patterns and Downstream Sinks
Once data is in Kafka, consumers are comparatively straightforward. Here is a minimal consumer that reads multivariate samples and writes them to InfluxDB. For more on that pipeline end to end, the article on InfluxDB to Iceberg with Telegraf covers the long-term storage side in depth.
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField
from influxdb_client import InfluxDBClient, Point, WriteOptions

TOPIC = "server.metrics.v1"

sr_client = SchemaRegistryClient({"url": "http://schema-registry:8081"})
avro_deser = AvroDeserializer(schema_registry_client=sr_client)

consumer = Consumer({
    "bootstrap.servers": "kafka-1:9092",
    "group.id": "influxdb-sink",
    "auto.offset.reset": "latest",
    "enable.auto.commit": False,
    "max.poll.interval.ms": 300_000,
    "session.timeout.ms": 30_000,
})
consumer.subscribe([TOPIC])

influx = InfluxDBClient(url="http://influxdb:8086", token="...", org="aic")
write_api = influx.write_api(write_options=WriteOptions(batch_size=5_000, flush_interval=2_000))

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"consumer error: {msg.error()}")
            continue
        record = avro_deser(
            msg.value(),
            SerializationContext(msg.topic(), MessageField.VALUE),
        )
        point = (
            Point("server_metrics")
            .tag("host", record["host"])
            .field("cpu_percent", record["cpu_percent"])
            .field("mem_percent", record["mem_percent"])
            .field("disk_read_bps", record["disk_read_bytes_per_sec"])
            .field("disk_write_bps", record["disk_write_bytes_per_sec"])
            .field("net_rx_bps", record["net_rx_bytes_per_sec"])
            .field("net_tx_bps", record["net_tx_bytes_per_sec"])
            .field("load_1m", record["load_1m"])
            .time(record["timestamp_ms"], "ms")
        )
        write_api.write(bucket="metrics", record=point)
        consumer.commit(msg, asynchronous=True)
finally:
    consumer.close()
    write_api.close()
    influx.close()
A few consumer-side details that matter. We disable auto-commit because we want commits to be tied to successful writes downstream — the pattern is “write, then commit the offset you just wrote” — which gives you at-least-once semantics end to end. One caveat: the InfluxDB write API above batches internally, so a commit can land before the batch has actually flushed; if you need a strict guarantee, use synchronous writes or commit only after an explicit flush. We batch for the same reason we batch at the producer: per-record writes are slow, batches are fast.
For more sophisticated consumers — especially anything that needs windowing, joins, or complex event patterns — you graduate from “plain consumer” to a full stream processor. Flink CEP is my usual go-to; the Flink CEP pipeline guide walks through exactly the kind of pattern you would build on top of this Kafka topic.
Production Concerns
Everything above works in a demo. To run it in production, you need to sweat five more things: monitoring consumer lag, handling backpressure at the producer, handling broker failures, managing exactly-once semantics, and graceful capacity management.
Consumer lag is the single most important metric you will monitor on this pipeline. It tells you whether your consumers are keeping up with producers. The standard tool is kafka-consumer-groups.sh, but for continuous monitoring you want Kafka’s built-in JMX metrics or a tool like Burrow or Kafka Exporter feeding Prometheus. Alert on sustained lag growth, not on absolute lag values — a transient bump during a deployment is normal, but a lag that has been growing for five minutes is a problem.
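The “sustained growth, not absolute value” rule is easy to encode as a check over recent lag samples. A hedged sketch — the window size and the strictly-increasing criterion are arbitrary choices, not a standard algorithm:

```python
def lag_is_growing(samples: list[int], window: int = 5) -> bool:
    """Flag sustained growth: every sample in the trailing window is
    strictly higher than the one before it. Transient bumps that level
    off or recover do not trigger."""
    if len(samples) < window + 1:
        return False
    tail = samples[-(window + 1):]
    return all(b > a for a, b in zip(tail, tail[1:]))

# A transient bump during a deployment that recovers: no alert.
print(lag_is_growing([10, 400, 380, 20, 15, 12, 11]))   # prints False
# Five consecutive increases: the consumer is falling behind.
print(lag_is_growing([10, 12, 50, 120, 300, 700, 1500]))  # prints True
```

In practice you would feed this from per-partition lag gauges scraped into Prometheus rather than a Python list, but the alerting logic is the same.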
Backpressure at the producer shows up as a full internal queue. In confluent-kafka-python, producer.produce() will raise BufferError when the queue is full. You have two choices: block until space is available (which eventually blocks the metric collector), or drop samples (which gives you gaps but keeps the collector responsive). For metrics, I usually prefer the first option up to some bounded timeout, because dropped samples can hide incidents. Here is the pattern:
def produce_with_backpressure(producer, topic, key, value, ts):
    for attempt in range(3):
        try:
            producer.produce(
                topic=topic, key=key, value=value, timestamp=ts,
                on_delivery=delivery_report,
            )
            return
        except BufferError:
            # Internal queue is full; poll to serve callbacks and drain.
            producer.poll(0.5)
    log.error("dropping sample for %s after 3 backpressure retries", key)
Broker failures are handled automatically by the client if you have configured things correctly. With acks=all, enable.idempotence=true, and retries set to effectively infinite, a broker going down just causes the producer to hold messages in its buffer and retry until a new leader is elected. The delivery.timeout.ms setting is your ultimate deadline — messages older than that are considered failed and returned through the delivery callback.
Exactly-once semantics is overloaded terminology. The producer gives you exactly-once to the broker with idempotence. End-to-end exactly-once from producer to downstream sink requires the sink to be idempotent too — either because it is naturally idempotent (upserts, deduplication by key+timestamp) or because it participates in Kafka transactions. For metrics, you almost never need full transactions; at-least-once plus an idempotent sink (InfluxDB’s write API is one) is usually enough, because writing the same point twice just overwrites with the same value.
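To see why at-least-once delivery plus an idempotent sink is enough, here is a toy in-memory sink keyed by (host, timestamp_ms). The class and its API are illustrative, not a real library, but the upsert behavior is the same property a time series database gives you:

```python
class IdempotentSink:
    """Toy sink keyed by (host, timestamp_ms): replays overwrite in place,
    so at-least-once delivery never produces duplicate points."""

    def __init__(self):
        self._points = {}

    def write(self, record: dict) -> None:
        key = (record["host"], record["timestamp_ms"])
        self._points[key] = record  # upsert: same key replaces, never appends

    def count(self) -> int:
        return len(self._points)

sink = IdempotentSink()
sample = {"host": "web-1", "timestamp_ms": 1_700_000_000_000, "cpu_percent": 41.5}
sink.write(sample)
sink.write(sample)  # a replay after an offset rewind
print(sink.count())  # prints 1
```

Replaying a Kafka partition through this sink any number of times converges to the same state, which is the practical definition of end-to-end exactly-once for metrics.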
Benchmarks and Real Numbers
Abstract talk about throughput is unsatisfying, so let me share some numbers from a setup I benchmarked recently: a three-broker Kafka cluster on Confluent Cloud Essentials equivalent hardware, a Python producer running on a c6i.large EC2 instance, samples of roughly 350 bytes each (before compression), partition count of 50. These are not the Kafka team’s published numbers — they are what a realistic Python producer with the config in this post actually achieves.
| Configuration | Throughput (msg/s) | p50 latency | p99 latency |
|---|---|---|---|
| No batching, no compression | ~8,000 | 4 ms | 35 ms |
| linger.ms=5, snappy | ~42,000 | 7 ms | 28 ms |
| linger.ms=20, lz4 | ~95,000 | 22 ms | 48 ms |
| linger.ms=50, lz4, 128KB batches | ~140,000 | 51 ms | 92 ms |
A few observations. First, batching and compression together produce a roughly 12–17x throughput improvement over the naive config. Second, the latency cost is real but small — even at the most aggressive setting, the p99 is under 100ms, which for metrics is entirely fine. Third, a single Python producer on modest hardware can sustain tens of thousands of messages per second, which means one producer can easily handle a fleet of hundreds or thousands of hosts at 1Hz sampling. You do not need to run one producer per server if you would rather aggregate.
Compression ratios on metric data are also worth noting. Our 350-byte raw records compressed to about 85 bytes under lz4 — a 4.1x reduction — which means the network cost and broker disk cost drop proportionally. On a large fleet this is the single biggest savings in the whole pipeline.
The defaults in confluent-kafka-python are conservative. Setting linger.ms, batch.size, and compression.type is the difference between a producer that maxes out at 8k msg/s and one that cruises at 100k+ msg/s. Tune these three first, everything else second.
Frequently Asked Questions
Why Kafka instead of writing directly to InfluxDB or TimescaleDB?
Direct-to-database works until something breaks. When the database is down for maintenance, your collector crashes or backs up. When you want to add a second consumer — say, an alerting service — you either double-write from the collector (error-prone) or read back from the database (slow and fragile). Kafka puts a durable, replayable buffer between producers and consumers, which decouples the failure modes of the two sides. For a small single-sink deployment, direct writes are fine. For anything where observability matters during incidents, Kafka is worth the extra moving part.
How many messages per second can a single Python producer handle?
With the config in this post (linger.ms=20, lz4 compression, 64KB batches), a single Python producer on modest hardware comfortably handles 80k–100k messages per second. This is more than enough for a fleet of thousands of hosts at 1Hz sampling. If you need more, the usual answer is not a faster producer — it is multiple producers, one per host or one per small group of hosts, which also gives you better fault isolation.
Should I use one topic or multiple topics for different metric types?
For multivariate metrics that are correlated and consumed together, use one topic. Splitting them into separate topics forces downstream consumers to join across topics, which defeats the purpose of capturing multivariate data in the first place. Use separate topics only when the data has genuinely different retention, sizing, or consumer profiles — for example, high-frequency metrics versus low-frequency events, or metrics versus logs.
How do I handle schema evolution when adding new metrics?
Set your Schema Registry compatibility mode to BACKWARD. When adding a field, give it a default value in the Avro schema. This lets new consumers read old messages (with the default filled in) and lets old consumers safely ignore the new field. Deploy the schema change to the registry first, then deploy the producer change, then deploy the consumer change — in that order. Never remove a field without first making sure no active consumer reads it.
What partitioning key should I use for multivariate time series?
Partition by hostname (or instance ID) in almost every case. This preserves per-host ordering, which is what stateful consumers like anomaly detectors need, and it distributes load evenly as long as your partition count is comfortably larger than your host count. Never use the timestamp as a partition key — monotonic timestamps create rolling hot spots where each new batch of records lands on the same partition.
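To see why hostname keys distribute well while timestamp keys hot-spot, here is a sketch using a stable CRC32 hash in place of Kafka's default murmur2 partitioner — the hash function differs, but the behavior it demonstrates is the same:

```python
import zlib

NUM_PARTITIONS = 50

def partition_for(key: bytes) -> int:
    # Stand-in for the client's default partitioner: stable hash mod partition count.
    return zlib.crc32(key) % NUM_PARTITIONS

hosts = [f"web-{i:03d}".encode() for i in range(500)]

# Same host -> same partition, every time: per-host ordering is preserved.
assert partition_for(b"web-007") == partition_for(b"web-007")

# Many hosts -> many partitions: load spreads out across the topic.
assert len({partition_for(h) for h in hosts}) > 10

# A timestamp key, by contrast, is shared by every host sampled in the same
# tick, so each tick's entire batch of records lands on a single partition.
tick_key = b"1700000000"
assert len({partition_for(tick_key) for _ in hosts}) == 1
```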
Conclusion
Building a Kafka-based engine for multivariate time series is one of those projects that looks like overkill on day one and turns out to be foundational by month three. The core ideas are simple: collect correlated signals together, serialize them with a schema, partition by source, tune the producer for throughput, and let Kafka be the durable spine that decouples your collectors from your consumers. Everything else — the exact choice of time series database, the streaming framework you run on top, the anomaly detectors and dashboards — is a downstream decision you can change without touching the engine itself. That decoupling is the real product you are building, not any individual pipe in the diagram.
If I had to leave you with three specific things to do after reading this, they would be: set acks=all and enable.idempotence=true on every producer you ever run; partition by hostname, not timestamp; and always put your schemas in a Schema Registry with BACKWARD compatibility. Those three choices alone prevent most of the outages I have seen on observability pipelines over the years. The rest of this post is optimization and polish — nice to have, but not life-or-death.
The final thing worth saying is that this engine is a starting point, not an endpoint. Once you have multivariate metrics flowing reliably through Kafka, the interesting work begins: anomaly detection, capacity forecasting, automated remediation, correlation with business metrics. Kafka is the boring, reliable infrastructure that makes all of that possible. Build it well, leave it alone, and it will quietly run for years while you build smarter things on top.
References
- Apache Kafka Documentation — the canonical reference for broker configuration, topic design, and client semantics.
- confluent-kafka-python — the Python client used throughout this guide, built on librdkafka.
- Confluent Schema Registry — Avro schema management, compatibility modes, and evolution rules.
- Apache Avro Specification — the full schema language, including default values and schema resolution rules.
- psutil documentation — cross-platform process and system metric collection for Python.
- Kafka Producer Configuration Reference — authoritative list of every producer setting mentioned in this post.