Producers & Consumers
1. What Is It?
A producer is a client that sends records to topics. A consumer is a client that reads records from topics. They are the two ends of the log: producers append, and consumers read at their own pace from any offset. Both are libraries, not separate services: your application links the Kafka client and talks directly to brokers.
The problem they solve: every other moving piece (topics, partitions, brokers, offsets) only matters because there are producers writing and consumers reading. Mastering the client semantics (batching, acks, idempotence, poll loops, commits, rebalances) is what turns a working Kafka pipeline into a correct, fast, and recoverable one. Almost every production Kafka bug lives in the client, not the broker.
2. How It Works
Producer mechanics
- The app calls `producer.send(record)`.
- The client serializes the key and value and picks a partition (from the key's hash, or via the configured partitioner). It looks up that partition's leader in cached metadata, refreshing the metadata on errors, and buffers the record in an in-memory accumulator grouped by partition.
- A background sender thread ships batches to the leader broker when (a) a batch reaches `batch.size`, (b) `linger.ms` elapses, or (c) the buffer is full.
- The broker responds with success (and the record's offset) once the configured `acks` policy is satisfied. The producer then invokes the callback or completes the future.
- On a retriable error, the client retries until `delivery.timeout.ms` expires. With `enable.idempotence=true`, retries do not create duplicates within a single producer session.
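The three flush triggers can be sketched as a toy simulation. This is not Kafka's `RecordAccumulator`. It assumes a single partition and fixed-size records, and it treats a batch as full when the next record would not fit. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Kafka's RecordAccumulator) of when a producer batch is sent. */
final class BatchingModel {
    /** One sent batch: when it left and how many records it carried. */
    static final class Batch {
        final long sentAtMs;
        final int records;
        Batch(long sentAtMs, int records) { this.sentAtMs = sentAtMs; this.records = records; }
    }

    /**
     * arrivalsMs must be sorted ascending; every record goes to the same partition.
     * A batch is sent when the next record would not fit in batchSizeBytes,
     * or when lingerMs has elapsed since the batch's first record arrived.
     */
    static List<Batch> simulate(long[] arrivalsMs, int recordBytes, int batchSizeBytes, long lingerMs) {
        List<Batch> sent = new ArrayList<>();
        long batchStart = -1;   // arrival time of the first record in the open batch
        int count = 0;
        int bytes = 0;
        for (long t : arrivalsMs) {
            // linger expired before this record arrived: the open batch left at its deadline
            if (count > 0 && t - batchStart >= lingerMs) {
                sent.add(new Batch(batchStart + lingerMs, count));
                count = 0;
                bytes = 0;
            }
            // batch is full: send it now and start a new batch with this record
            if (count > 0 && bytes + recordBytes > batchSizeBytes) {
                sent.add(new Batch(t, count));
                count = 0;
                bytes = 0;
            }
            if (count == 0) batchStart = t;
            count++;
            bytes += recordBytes;
        }
        // the last open batch waits out its linger before leaving
        if (count > 0) sent.add(new Batch(batchStart + lingerMs, count));
        return sent;
    }
}
```

In this model, `linger.ms=0` sends every record on its own. A 10 ms linger lets ten records arriving 1 ms apart share one request. The model ignores the batching that still happens under load while the sender is busy, so real clients batch somewhat even at `linger.ms=0`.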
Consumer mechanics
- The consumer subscribes to one or more topics: `consumer.subscribe(List.of("orders"))`.
- The consumer joins a consumer group. The group coordinator (a broker) manages membership, and each member is assigned a subset of the subscribed topics' partitions.
- The app runs a poll loop: `consumer.poll(Duration.ofMillis(500))` returns a batch of records.
- The app processes the batch, then commits offsets. Commits happen either automatically (`enable.auto.commit=true`, every `auto.commit.interval.ms`) or explicitly (`consumer.commitSync()` / `commitAsync()`).
- The consumer keeps its partition assignment until one of these happens:
  - it leaves the group;
  - it misses heartbeats for longer than `session.timeout.ms`;
  - it exceeds `max.poll.interval.ms` between polls;
  - another rebalance fires.
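The "each member gets a subset of the partitions" step can be illustrated with range-style arithmetic for a single topic:

1. Sort the members.
2. Give each member floor(P / C) partitions.
3. Give the first P mod C members one extra partition.

This sketch follows that idea but is not Kafka's `RangeAssignor` class, and the member IDs are hypothetical. In the classic group protocol, one member acting as group leader computes the assignment.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative range-style assignment of one topic's partitions (not Kafka's RangeAssignor). */
final class RangeAssignmentSketch {
    static Map<String, List<Integer>> assign(List<String> memberIds, int numPartitions) {
        Map<String, List<Integer>> out = new LinkedHashMap<>();
        if (memberIds.isEmpty()) return out;               // nothing to assign to
        List<String> members = new ArrayList<>(memberIds);
        Collections.sort(members);                         // deterministic order
        int base = numPartitions / members.size();         // every member gets at least this many
        int extra = numPartitions % members.size();        // the first `extra` members get one more
        int next = 0;
        for (int i = 0; i < members.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int j = 0; j < count; j++) parts.add(next++);
            out.put(members.get(i), parts);                // surplus members get an empty list
        }
        return out;
    }
}
```

For example, 7 partitions over members `c-1..c-3` split 3/2/2, while 2 partitions over 4 members leave two members with nothing. Adding consumers beyond the partition count buys no extra parallelism.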
Concrete producer (Java, 3.x):
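```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");               // wait for all in-sync replicas
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // retries do not duplicate records
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);             // wait up to 10 ms to fill a batch
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");   // compress whole batches

// In a real service, create one producer and share it; close() flushes pending sends.
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(
        new ProducerRecord<>("orders", orderId, payload),   // orderId, payload: app-supplied
        (metadata, ex) -> {                                 // runs on the producer's I/O thread
            if (ex != null) handleSendFailure(orderId, ex); // app-defined error handling
        });
}
```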
Concrete consumer (Java, 3.x):
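```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "fraud-scoring");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);    // commit manually after processing
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // no committed offset: start at the oldest record
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);        // cap records returned per poll()

try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("orders"));
    while (running) {                                  // app-controlled volatile shutdown flag
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        processBatch(records);
        consumer.commitSync();                         // commit only after the whole batch succeeded
    }
}

static void processBatch(ConsumerRecords<String, String> records) {
    for (ConsumerRecord<String, String> record : records) {
        handleOrder(record.key(), record.value());     // app-defined side effect
    }
}
```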
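A toy single-partition model traces what happens if the loop above crashes after `processBatch` starts but before `commitSync()` runs. No Kafka client is involved, and the class, its parameters and the crash point are hypothetical. The committed offset only advances after a whole batch is handled. The restarted consumer therefore re-polls from the last commit and handles part of the batch a second time.

```java
/** Toy single-partition model of poll -> process -> commitSync, with one crash. */
final class CommitAfterProcessModel {
    /**
     * Handles offsets [0, logSize) in polls of maxPollRecords. The first time the batch
     * starting at crashBatchStart is processed, the app dies after handling crashAfter
     * records, before committing. Returns how many times each offset was handled.
     */
    static int[] run(int logSize, int maxPollRecords, int crashBatchStart, int crashAfter) {
        int[] handled = new int[logSize];
        int committed = 0;          // committed offset: where a restarted consumer resumes
        int position = committed;   // in-memory fetch position of the live consumer
        boolean crashPending = true;
        while (position < logSize) {
            int end = Math.min(position + maxPollRecords, logSize);   // poll() returns [position, end)
            boolean crashed = false;
            for (int off = position; off < end; off++) {
                if (crashPending && position == crashBatchStart && off - position == crashAfter) {
                    crashed = true;   // process dies mid-batch, before commitSync()
                    break;
                }
                handled[off]++;       // the side effect, e.g. a database write
            }
            if (crashed) {
                crashPending = false;
                position = committed; // new process resumes from the committed offset
                continue;
            }
            position = end;
            committed = end;          // commitSync() after the whole batch is processed
        }
        return handled;
    }
}
```

For example, `run(10, 4, 4, 2)` handles offsets 4 and 5 twice and every other offset once. That is at-least-once delivery (duplicates, but no gaps), which is why the sink downstream should be idempotent.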
3. What Mid-Senior SWEs Actually Need to Know
Producer essentials:
- Always set `acks=all` + `enable.idempotence=true` for any producer where you care about durability. Both are the defaults in modern clients (Kafka 3.0+).
- `linger.ms` and `batch.size` control batching. A tiny `linger.ms` gives low latency but lower throughput. Larger values amortize per-batch overhead and make compression more effective.
- The producer is thread-safe and multi-threaded internally, so share one instance across app threads. Don't create one per request.
- `send()` is async. Use the callback or the returned future. Calling `.get()` on every send makes it synchronous and collapses throughput.
- Backpressure shows up as `send()` blocking when `buffer.memory` is full. Once `max.block.ms` elapses, the send fails with a `BufferExhaustedException`. Tune `max.block.ms` low to fail fast, or high to wait longer for buffer space.
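Backpressure from a full `buffer.memory` behaves like a bounded queue whose enqueue waits at most `max.block.ms`. The sketch below uses `java.util.concurrent.ArrayBlockingQueue` only as an analogy. Kafka's real buffer is a byte-sized memory pool rather than a record count, and the class here is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Analogy (not Kafka's buffer pool): a bounded buffer where enqueue waits up to maxBlockMs. */
final class BoundedSendBuffer {
    private final BlockingQueue<String> buffer;
    private final long maxBlockMs;

    BoundedSendBuffer(int capacityRecords, long maxBlockMs) {
        this.buffer = new ArrayBlockingQueue<>(capacityRecords);
        this.maxBlockMs = maxBlockMs;
    }

    /** Like send(): waits up to maxBlockMs for space, then gives up (the real producer fails the send). */
    boolean send(String record) {
        try {
            return buffer.offer(record, maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt for the caller
            return false;
        }
    }

    /** Stands in for the sender thread draining one record to the network. */
    String drainOne() {
        return buffer.poll();
    }
}
```

In this picture, a lower `max.block.ms` tells callers sooner that the producer cannot keep up. A higher value absorbs short bursts, at the cost of stalled application threads.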
Consumer essentials:
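- The poll loop is your liveness signal. Heartbeats are sent from a background thread. However, if more than `max.poll.interval.ms` passes between `poll()` calls, the consumer is considered stuck and leaves the group, which triggers a rebalance. Process inside the poll loop, and do not block on external IO for minutes between polls.
- Auto-commit commits on a timer, not when your processing finishes. The Java client commits the positions returned by earlier `poll()` calls, so it is only safe if every record is fully processed before the next `poll()`.
  - If records are handed off to other threads, committed offsets can run ahead of processing, and a crash loses messages.
  - A crash between commits replays records.
  - For correctness, set `enable.auto.commit=false` and commit explicitly after processing.
- `auto.offset.reset` controls where reading starts in two cases: the group has no committed offset (first read), or the committed offset is no longer valid (e.g., deleted by retention). `earliest` replays from the oldest retained record; `latest` skips to the tail. The wrong choice causes either replay storms or silent data loss.
- Rebalances disrupt the group. With the eager protocol (e.g., the range assignor), every member stops consuming until the rebalance completes. Frequent rebalances (long GC pauses, slow polls, members leaving) kill throughput.
  - Use the cooperative sticky assignor, which moves only the partitions that change owner.
  - Modern clients list it second in the default `partition.assignment.strategy`, after the range assignor, so the range assignor is what runs until you remove it from the list.
- Common misunderstanding: "The consumer reads in real time as messages arrive." Your code only sees records when it calls `poll()`. If your app is busy, lag grows. Lag is the operational metric, not "is the consumer running?"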
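The `auto.offset.reset` rule can be written as a small decision function. This models the documented behavior of the three classic values (`earliest`, `latest`, `none`) and is not client code. The method name, and the use of -1 for "no committed offset", are assumptions.

```java
/** Models where a consumer starts reading one partition (not Kafka client code). */
final class OffsetResetSketch {
    /**
     * committed: the group's committed offset, or -1 if there is none.
     * logStart: earliest retained offset; logEnd: next offset to be written.
     */
    static long startOffset(long committed, long logStart, long logEnd, String autoOffsetReset) {
        boolean valid = committed >= 0 && committed >= logStart && committed <= logEnd;
        if (valid) return committed;                    // normal restart: resume where the group left off
        switch (autoOffsetReset) {
            case "earliest": return logStart;          // replay everything still retained
            case "latest":   return logEnd;            // skip ahead to new records only
            case "none":     throw new IllegalStateException("no valid committed offset"); // the real client also fails here
            default:         throw new IllegalArgumentException("unknown policy: " + autoOffsetReset);
        }
    }
}
```

`startOffset(5, 10, 100, "earliest")` returns 10. The committed offset 5 fell behind retention, so the reset policy, not the commit, decides where reading starts.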
Operational signals you should monitor:
- Producer: send error rate, in-flight requests, batch size, request latency.
- Consumer: lag (offset gap from log-end), poll interval, rebalance rate, commit latency.
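Consumer lag for a partition is its log-end offset minus the group's committed offset, and group lag is the sum across partitions. The sketch below is hypothetical helper code, and the offsets in the example are illustrative inputs, not readings from a cluster. It also assumes that a partition with no commit yet counts from offset 0.

```java
import java.util.Map;

/** Lag arithmetic (illustrative; real tools read these offsets from the cluster). */
final class LagSketch {
    /** Lag for one partition: log-end offset minus committed offset, never negative. */
    static long partitionLag(long logEndOffset, long committedOffset) {
        return Math.max(0, logEndOffset - committedOffset);
    }

    /** Total group lag; map keys are partition numbers. */
    static long totalLag(Map<Integer, Long> logEnd, Map<Integer, Long> committed) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : logEnd.entrySet()) {
            long c = committed.getOrDefault(e.getKey(), 0L);   // assumption: no commit yet = from offset 0
            total += partitionLag(e.getValue(), c);
        }
        return total;
    }
}
```

Watch the trend rather than a single reading. Steadily growing lag means the consumers are falling behind even though they are "running".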
A backend service uses a Kafka consumer with enable.auto.commit=true (the default). The service polls a batch of 50 messages, then processes each one by writing to a database. Halfway through the batch, the service crashes. What happens to the unprocessed messages when the consumer restarts?
4. Tradeoffs & Decisions
| If you need... | Set... | Tradeoff |
|---|---|---|
| Lowest latency producer | linger.ms=0, small batch.size | Lower throughput, more broker requests |
| Highest throughput producer | linger.ms=5–50, batch.size=64KB+, compression.type=lz4 | Higher per-record latency |
| At-least-once producer | acks=all, enable.idempotence=true, retries, check every send result in the callback | Slightly higher latency vs acks=1 |
| At-least-once consumer | enable.auto.commit=false, commit after process | Manual commit logic; ensure idempotent sink |
| Fast catch-up on a large lag | Higher max.poll.records and fetch.max.bytes | Larger heap per consumer; longer per-poll processing |
| Stable consumer group (avoid rebalances) | Long max.poll.interval.ms, cooperative sticky assignor | Stuck consumers detected later (crashed ones are still caught by session.timeout.ms) |
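The producer rows of the table can be captured as reusable `Properties` profiles. The keys are the standard producer config names. The specific values (10 ms, 64 KB) are illustrative picks from the ranges above, not recommendations, and the helper class is hypothetical. Serializers still need to be set, or passed to the `KafkaProducer` constructor.

```java
import java.util.Properties;

/** Hypothetical producer config profiles mirroring the table rows. */
final class ProducerProfiles {
    static Properties lowLatency(String bootstrap) {
        Properties p = durableBase(bootstrap);
        p.put("linger.ms", "0");            // send as soon as possible; batch.size stays at its default
        return p;
    }

    static Properties highThroughput(String bootstrap) {
        Properties p = durableBase(bootstrap);
        p.put("linger.ms", "10");           // wait briefly so batches fill
        p.put("batch.size", "65536");       // 64 KB batches
        p.put("compression.type", "lz4");   // compression is per batch, so fuller batches compress better
        return p;
    }

    /** Durability settings shared by both profiles. */
    private static Properties durableBase(String bootstrap) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrap);
        p.put("acks", "all");
        p.put("enable.idempotence", "true");
        return p;
    }
}
```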
Key tradeoff for producers: latency vs throughput vs durability. All three move together when you change linger.ms, acks, and batch.size, so decide which one matters most for the workload.
Key tradeoff for consumers: batch size vs latency vs rebalance risk. Bigger batches amortize per-poll cost, but a few slow records can push the gap between polls past max.poll.interval.ms. Smaller batches stay responsive but burn CPU on per-poll overhead.
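The consumer-side trade reduces to arithmetic. The worst-case time between polls is roughly `max.poll.records` times the slowest per-record processing time, and it must stay under `max.poll.interval.ms`. The sketch below is hypothetical helper code. It assumes per-record processing is the only work between polls, and the safety factor is an arbitrary illustrative choice.

```java
/** Poll-loop budget arithmetic (illustrative, not a Kafka API). */
final class PollBudget {
    /** True if processing a full poll of maxPollRecords could exceed maxPollIntervalMs. */
    static boolean risksRebalance(int maxPollRecords, long worstMsPerRecord, long maxPollIntervalMs) {
        return (long) maxPollRecords * worstMsPerRecord > maxPollIntervalMs;
    }

    /** Largest max.poll.records whose full batch fits in safety * maxPollIntervalMs (at least 1). */
    static int safeMaxPollRecords(long worstMsPerRecord, long maxPollIntervalMs, double safety) {
        long budgetMs = (long) (maxPollIntervalMs * safety);
        return (int) Math.max(1, budgetMs / worstMsPerRecord);
    }
}
```

With the client defaults (`max.poll.records=500`, `max.poll.interval.ms=300000`, i.e. 5 minutes), a full batch is at risk once the worst case per record exceeds 600 ms. At 1 s per record and half the interval as headroom, `safeMaxPollRecords(1000, 300_000, 0.5)` returns 150.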
5. Interview & System Design Cheat Sheet
- A producer is a batching, asynchronous client. Its key knobs are `acks`, `enable.idempotence`, `linger.ms`, `batch.size`, and `compression.type`.
- A consumer is a stateful, polling client. Its key knobs are `enable.auto.commit`, `auto.offset.reset`, `max.poll.records`, `max.poll.interval.ms`, the group ID, and the assignor.
- The poll loop is the liveness signal. Do not block on external IO between polls for longer than `max.poll.interval.ms`, or you trigger a rebalance.
- The default, auto-commit, commits on a timer rather than after your processing. Correctness comes from `enable.auto.commit=false` + commit-after-process + idempotent downstream writes.
- Lag is the single most useful health metric for a consumer.
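"Idempotent downstream writes" can be as simple as keying each write by a stable ID, so that a replayed record overwrites instead of duplicating. This in-memory sketch is hypothetical. It uses (topic, partition, offset) as the key, which absorbs consumer replays; a real sink would use a database upsert or a unique constraint. If duplicates can also arise upstream, a business key such as the order ID is the safer choice.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for an idempotent sink (a real one would upsert into a database). */
final class IdempotentSink {
    private final Map<String, String> rows = new HashMap<>();
    private int duplicatesSkipped = 0;

    /** Stores value under the record's log position; a replay of the same record is a no-op. */
    void write(String topic, int partition, long offset, String value) {
        String id = topic + "-" + partition + "@" + offset;   // unique per record in the log
        if (rows.putIfAbsent(id, value) != null) duplicatesSkipped++;
    }

    int rowCount() { return rows.size(); }
    int duplicatesSkipped() { return duplicatesSkipped; }
}
```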
Common follow-ups:
- "Why is my consumer constantly rebalancing?" Almost always because `max.poll.interval.ms` was exceeded: a batch took too long to process. The fixes:
  - speed up processing;
  - reduce `max.poll.records`;
  - raise `max.poll.interval.ms`;
  - move slow work off the poll loop.
- "How do I make a synchronous send?" Call `.get()` on the returned future. Don't, except in tests: it destroys throughput.
- "How do producers know which leader to send to?" Bootstrap to any broker and fetch metadata, which includes the leader of every partition. Then send directly to the leader brokers. Metadata is refreshed on errors and at least every `metadata.max.age.ms`.
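"Directly target leader brokers" means the client groups ready data by the broker that leads each partition, rather than sending everything through one node. The toy version below uses a hand-written leader map, with made-up broker IDs and partition numbers. In the real client, that map comes from a metadata response, and the class here is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

/** Counts records per leader broker, given each record's partition (illustrative only). */
final class LeaderRouting {
    /** leaderOf[p] is the broker ID leading partition p. */
    static Map<Integer, Integer> recordsPerBroker(int[] leaderOf, int[] recordPartitions) {
        Map<Integer, Integer> perBroker = new TreeMap<>();
        for (int p : recordPartitions) {
            perBroker.merge(leaderOf[p], 1, Integer::sum);   // these records travel to that leader
        }
        return perBroker;
    }
}
```

Leadership can move, for example after a broker restart. When it does, sends to the old leader fail with a retriable error, the client refreshes metadata, and the same grouping runs against the new map.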
If asked to design X, anchor on three things:

- the producer config (acks, idempotence, batching);
- the consumer group layout (group ID, partition assignment, poll cadence);
- where commits happen relative to side effects.

Those three points are usually 80% of a "design a pipeline" answer.