Producers & Consumers

1. What Is It?

A producer is a client that sends records to topics. A consumer is a client that reads records from topics. They are the two ends of the log: producers append, and consumers read at their own pace from any offset. Both are libraries (not separate services): your application links the Kafka client and talks directly to brokers.

The problem they solve: every other moving piece (topics, partitions, offsets) only matters because there are producers writing and consumers reading. Mastering the client semantics (batching, acks, idempotence, poll loops, commits, rebalances) is what turns a working Kafka pipeline into a correct, fast, and recoverable one. Almost every production Kafka bug lives in the client, not the broker.

QUICK CHECK

Your team is debugging a Kafka-based order processing pipeline. Records are being duplicated and offsets are being committed at the wrong time, causing consumers to re-read messages after restarts. A junior engineer suggests the bug must be inside the Kafka broker. Where is this class of bug most likely to actually reside, and why?


2. How It Works

Producer mechanics

  1. App calls producer.send(record).
  2. Client serializes the record, looks up the partition leader from cached metadata (refreshing on errors), and buffers the record in an in-memory accumulator grouped by partition.
  3. A background sender thread flushes batches to the broker when (a) the batch reaches batch.size, (b) linger.ms elapses, or (c) the buffer is full.
  4. The broker responds with success (and the assigned offset) once the configured acks policy is satisfied. The client then invokes the callback and completes the future.
  5. On a retriable error, the client retries until delivery.timeout.ms expires. With enable.idempotence=true, retries do not create duplicates within a single producer session.
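
The flush conditions in step 3 can be illustrated with a toy model. This is a minimal sketch, not the Kafka client's real accumulator: the class `BatchingSketch`, its methods, and the simplified rule "flush once buffered bytes reach the limit" are all assumptions made for illustration, and time is passed in explicitly so the behavior is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of producer batching (hypothetical; not the real Kafka accumulator). */
final class BatchingSketch {
    private final int batchSizeBytes;  // plays the role of batch.size
    private final long lingerMs;       // plays the role of linger.ms
    private final Map<Integer, Integer> openCount = new HashMap<>();
    private final Map<Integer, Integer> openBytes = new HashMap<>();
    private final Map<Integer, Long> openedAt = new HashMap<>();
    final List<String> flushed = new ArrayList<>(); // "p<partition>:<records>:<reason>"

    BatchingSketch(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    /** Buffer a record for a partition; flush that partition if its batch is full. */
    void append(int partition, byte[] record, long nowMs) {
        openCount.merge(partition, 1, Integer::sum);
        openBytes.merge(partition, record.length, Integer::sum);
        openedAt.putIfAbsent(partition, nowMs);
        if (openBytes.get(partition) >= batchSizeBytes) {
            flush(partition, "size");
        }
    }

    /** Called periodically, like the sender thread waking up. */
    void tick(long nowMs) {
        for (Integer p : new ArrayList<>(openedAt.keySet())) {
            if (nowMs - openedAt.get(p) >= lingerMs) {
                flush(p, "linger");
            }
        }
    }

    private void flush(int partition, String reason) {
        int count = openCount.remove(partition);
        openBytes.remove(partition);
        openedAt.remove(partition);
        flushed.add("p" + partition + ":" + count + ":" + reason);
    }

    public static void main(String[] args) {
        BatchingSketch acc = new BatchingSketch(100, 10);
        acc.append(0, new byte[60], 0);  // buffered: 60 bytes < 100
        acc.append(0, new byte[60], 1);  // 120 bytes >= 100: flushed by size
        acc.append(1, new byte[10], 2);  // buffered
        acc.tick(5);                     // only 3 ms old: stays buffered
        acc.tick(12);                    // 10 ms old: flushed by linger
        System.out.println(acc.flushed); // [p0:2:size, p1:1:linger]
    }
}
```

The model shows why a larger linger.ms tends to produce fuller batches: low-volume partitions wait for the timer instead of being sent one record at a time.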

Consumer mechanics

  1. App subscribes to one or more topics: consumer.subscribe(List.of("orders")).
  2. The consumer joins a consumer group; the group coordinator broker assigns it a subset of the partitions.
  3. App runs a poll loop: consumer.poll(Duration.ofMillis(500)) returns a batch of records.
  4. App processes the batch, then commits offsets — either automatically (enable.auto.commit=true) at auto.commit.interval.ms, or explicitly (consumer.commitSync() / commitAsync()).
  5. The consumer holds onto its partition assignments until it leaves the group, fails its heartbeat (session.timeout.ms), or another rebalance fires.
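
Steps 3 and 4 imply at-least-once delivery whenever the commit comes after processing: a crash between the two causes a replay, not a loss. The sketch below models that with a single in-memory partition. It is a toy under stated assumptions: `ReplaySketch`, `runOnce`, and the `crashAfter` parameter are hypothetical names, and no Kafka API is involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy single-partition model of commit-after-process (hypothetical; not the Kafka client). */
final class ReplaySketch {
    private final List<String> log;   // the partition's records, index = offset
    private int committed = 0;        // next offset to read after a restart
    final List<String> processed = new ArrayList<>(); // side effects, in order

    ReplaySketch(List<String> log) {
        this.log = log;
    }

    /**
     * Poll up to maxRecords from the committed position, process them, then commit.
     * A crashAfter value >= 0 simulates a crash after that many records,
     * before the commit is reached. Use -1 for a clean run.
     */
    void runOnce(int maxRecords, int crashAfter) {
        int end = Math.min(log.size(), committed + maxRecords);
        List<String> batch = log.subList(committed, end);
        for (int i = 0; i < batch.size(); i++) {
            if (i == crashAfter) {
                return;               // crash: the commit below never happens
            }
            processed.add(batch.get(i));
        }
        committed = end;              // stands in for commitSync() after the batch
    }

    public static void main(String[] args) {
        ReplaySketch consumer = new ReplaySketch(List.of("a", "b", "c", "d"));
        consumer.runOnce(4, 2);   // processes a and b, then crashes before committing
        consumer.runOnce(4, -1);  // restart: resumes from offset 0, reprocesses a and b
        System.out.println(consumer.processed); // [a, b, a, b, c, d]
    }
}
```

Nothing is lost, but "a" and "b" are handled twice, which is why commit-after-process is usually paired with an idempotent sink.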

Concrete producer (Java, 3.x):

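import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");               // wait for all in-sync replicas
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // retries do not create duplicates
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);             // wait up to 10 ms to fill a batch
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

// orderId, payload, and handleSendFailure are defined by the application.
// Closing the producer (via try-with-resources) flushes any buffered records.
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    // send() is asynchronous; the callback runs once the broker responds or the send fails.
    producer.send(
        new ProducerRecord<>("orders", orderId, payload),
        (metadata, ex) -> {
            if (ex != null) handleSendFailure(orderId, ex);
        }
    );
}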

Concrete consumer (Java, 3.x):

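import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "fraud-scoring");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);     // commit explicitly after processing
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // no committed offset: start at the beginning
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);         // cap on records returned per poll()

// running and handleOrder are defined by the application.
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("orders"));
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        processBatch(records);
        // Commit only after the whole batch is processed (at-least-once).
        consumer.commitSync();
    }
}

static void processBatch(ConsumerRecords<String, String> records) {
    for (ConsumerRecord<String, String> record : records) {
        handleOrder(record.key(), record.value());
    }
}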

QUICK CHECK

A Kafka consumer service processes orders and commits offsets manually with commitSync() after each batch. Midway through processing a large batch, the application crashes before reaching the commit call. What happens when the consumer restarts?


3. What Mid-Senior SWEs Actually Need to Know

Producer essentials:

  • Always set acks=all + enable.idempotence=true for any topic where you care about durability. Both are the defaults in modern clients (Kafka 3.0 and later).
  • linger.ms and batch.size control batching. A tiny linger.ms gives low latency but low throughput. Larger values amortize per-batch overhead and improve compression.
  • The producer is thread-safe and runs its own background sender thread, so share one instance across app threads. Don't create one per request.
  • send() is async by default. Use the callback or future; calling .get() on every send makes it synchronous and craters throughput.
  • Backpressure surfaces as blocking in send(), or as a BufferExhaustedException, when buffer.memory is full. Tune max.block.ms to choose between failing fast and waiting for space.

Consumer essentials:

  • The poll loop proves liveness. Heartbeats are sent from a background thread, but if you take longer than max.poll.interval.ms between polls, the consumer is considered failed and leaves the group, which triggers a rebalance. Process inside the poll loop, but do not block on external IO for minutes between polls.
  • Auto-commit is a footgun for at-least-once processing. It commits the last polled offset every interval, whether or not you have finished processing those records, so a crash mid-batch can lose messages. For correctness, set enable.auto.commit=false and commit explicitly after processing.
  • auto.offset.reset controls behavior on first read or after offset deletion: earliest replays from the beginning; latest skips to the tail. The wrong choice here causes either replay storms or silent data loss.
  • Rebalances pause all consumers in the group. Frequent rebalances (long GC, slow poll, members leaving) kill throughput. Use the cooperative sticky assignor (second in the default list after the range assignor in modern clients) to minimize disruption.
  • Common misunderstanding: "The consumer reads in real time as messages arrive." It reads when you call poll(). If your app is busy, lag grows. Lag is the operational metric, not "is the consumer running?"

Operational signals you should monitor:

  • Producer: send error rate, in-flight requests, batch size, request latency.
  • Consumer: lag (offset gap from log-end), poll interval, rebalance rate, commit latency.
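
Consumer lag as defined above is simple arithmetic once the two offsets are known. The helper below is a minimal sketch: `LagSketch` and `totalLag` are hypothetical names, the offsets are passed in as plain maps keyed by partition number, and treating a partition with no committed offset as starting from 0 is an assumption (in a real group that case depends on auto.offset.reset).

```java
import java.util.Map;

/** Hypothetical helper that sums consumer lag across partitions. */
final class LagSketch {
    /** Per-partition lag = log-end offset minus committed offset, floored at zero. */
    static long totalLag(Map<Integer, Long> logEnd, Map<Integer, Long> committed) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : logEnd.entrySet()) {
            // Assumption: a partition with no commit yet counts from offset 0.
            long c = committed.getOrDefault(e.getKey(), 0L);
            total += Math.max(0, e.getValue() - c);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, Long> logEnd = Map.of(0, 1000L, 1, 500L);
        Map<Integer, Long> committed = Map.of(0, 900L, 1, 500L);
        System.out.println(totalLag(logEnd, committed)); // 100
    }
}
```

A lag that keeps growing while the consumer is "up" is the signal that processing cannot keep pace with production.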

QUICK CHECK

A backend service uses a Kafka consumer with enable.auto.commit=true (the default). The service polls a batch of 50 messages, then processes each one by writing to a database. Halfway through the batch, the service crashes. What happens to the unprocessed messages when the consumer restarts?


4. Tradeoffs & Decisions

| If you need... | Set... | Tradeoff |
| --- | --- | --- |
| Lowest latency producer | linger.ms=0, small batch.size | Lower throughput, more broker requests |
| Highest throughput producer | linger.ms=5–50, batch.size=64KB+, compression.type=lz4 | Higher per-record latency |
| At-least-once correctness | acks=all, enable.idempotence=true, sync send + retries | Slightly higher latency vs acks=1 |
| At-least-once consumer | enable.auto.commit=false, commit after process | Manual commit logic; ensure idempotent sink |
| Fast catch-up on a large lag | Higher max.poll.records and fetch.max.bytes | Larger heap per consumer; longer per-poll processing |
| Stable consumer group (avoid rebalances) | Long max.poll.interval.ms, cooperative sticky assignor | Dead consumers detected later |

Key tradeoff for producers: latency vs throughput vs durability. The three are coupled, so changing linger.ms, acks, or batch.size moves all of them. Pick which one matters most for the topic.

Key tradeoff for consumers: batch size vs latency vs rebalance risk. Bigger batches amortize per-poll cost but let a single slow record push the batch past max.poll.interval.ms; smaller batches stay responsive but burn CPU on per-poll overhead.
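
The first two producer rows of the table can be written down as two configuration profiles. This is a sketch using plain java.util.Properties with the real config key names as strings; the class `ProducerProfiles` and the specific values chosen inside the table's ranges (16 KB as "small", 20 ms linger) are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

/** Hypothetical config profiles for the latency and throughput rows of the table. */
final class ProducerProfiles {
    /** Send as soon as possible; batches stay small. */
    static Properties lowLatency() {
        Properties p = durableBase();
        p.setProperty("linger.ms", "0");
        p.setProperty("batch.size", "16384");   // 16 KB, an assumed "small" value
        return p;
    }

    /** Wait briefly to fill larger, compressed batches. */
    static Properties highThroughput() {
        Properties p = durableBase();
        p.setProperty("linger.ms", "20");       // assumed value inside the 5-50 ms range
        p.setProperty("batch.size", "65536");   // 64 KB
        p.setProperty("compression.type", "lz4");
        return p;
    }

    /** Durability settings shared by both profiles. */
    private static Properties durableBase() {
        Properties p = new Properties();
        p.setProperty("acks", "all");
        p.setProperty("enable.idempotence", "true");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(lowLatency().getProperty("linger.ms"));      // 0
        System.out.println(highThroughput().getProperty("batch.size")); // 65536
    }
}
```

Keeping durability in a shared base makes the intent explicit: the two profiles trade latency against throughput without touching acks or idempotence.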

QUICK CHECK

A backend service publishes audit log events to Kafka. The events are non-time-sensitive, but the team wants to minimize broker load and network overhead. Which producer configuration best achieves this goal, and what is the key tradeoff?


5. Interview & System Design Cheat Sheet

  • A producer is a batching, asynchronous client. Its key knobs are acks, enable.idempotence, linger.ms, batch.size, compression.type.
  • A consumer is a stateful, polling client. Its key knobs are enable.auto.commit, auto.offset.reset, max.poll.records, max.poll.interval.ms, group ID, assignor.
  • The poll loop proves liveness: do not block on external IO between polls for longer than max.poll.interval.ms, or you trigger a rebalance.
  • Modern producer defaults (acks=all, idempotence) cover the producer side; consumer correctness comes from enable.auto.commit=false + commit-after-process + idempotent downstream writes.
  • Lag is the single most useful health metric for a consumer.
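
The "idempotent downstream writes" part of that recipe can be sketched as a sink that remembers the highest offset it has applied per partition and skips anything at or below it. This is a toy under stated assumptions: `IdempotentSinkSketch` and its methods are hypothetical, state lives in memory, and a real sink would need to persist the offset together with the side effect in one atomic write.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy idempotent sink that deduplicates by (partition, offset). Hypothetical names. */
final class IdempotentSinkSketch {
    private final Map<Integer, Long> highestApplied = new HashMap<>();
    private long total = 0;  // the side effect: a running sum of amounts

    /** Apply a record once; returns false when its offset was already applied. */
    boolean apply(int partition, long offset, long amount) {
        Long seen = highestApplied.get(partition);
        if (seen != null && offset <= seen) {
            return false;    // replayed record: skip the side effect
        }
        total += amount;
        highestApplied.put(partition, offset);
        return true;
    }

    long total() {
        return total;
    }

    public static void main(String[] args) {
        IdempotentSinkSketch sink = new IdempotentSinkSketch();
        sink.apply(0, 0, 10);
        sink.apply(0, 1, 5);
        // Simulated replay after a crash before the offset commit:
        sink.apply(0, 0, 10);  // skipped
        sink.apply(0, 1, 5);   // skipped
        sink.apply(0, 2, 7);
        System.out.println(sink.total()); // 22
    }
}
```

With a sink like this, the replays that at-least-once delivery produces become harmless.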

Common follow-ups:

  • "Why is my consumer constantly rebalancing?" Almost always: max.poll.interval.ms was exceeded because a record took too long to process. Either speed up processing, reduce max.poll.records, or move slow work off the poll loop.
  • "How do I make a producer synchronous?" Call .get() on the returned future. Don't, except in tests; it destroys throughput.
  • "How do producers know which leader to send to?" Bootstrap to any broker, fetch metadata (which returns the leader map), then target leader brokers directly. Metadata is refreshed on errors and every metadata.max.age.ms.
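
For the first follow-up, the fix of reducing max.poll.records comes down to a budget: worst-case time per record times records per poll has to stay under max.poll.interval.ms. The helper below is a back-of-the-envelope sketch; `PollBudget`, `maxSafePollRecords`, and the headroom factor are hypothetical, and the worst-case per-record time is something you would have to measure yourself.

```java
/** Hypothetical sizing helper for max.poll.records. */
final class PollBudget {
    /**
     * Largest max.poll.records such that a worst-case batch fits within
     * headroom (a fraction in (0, 1]) of max.poll.interval.ms.
     */
    static int maxSafePollRecords(long maxPollIntervalMs, long worstCaseRecordMs, double headroom) {
        if (maxPollIntervalMs <= 0 || worstCaseRecordMs <= 0 || headroom <= 0 || headroom > 1) {
            throw new IllegalArgumentException("intervals must be positive and headroom in (0, 1]");
        }
        long budgetMs = (long) (maxPollIntervalMs * headroom);
        // Always allow at least one record per poll, even if it blows the budget.
        return (int) Math.max(1, budgetMs / worstCaseRecordMs);
    }

    public static void main(String[] args) {
        // 30 s interval, 90 ms worst case per record, use half the interval.
        System.out.println(maxSafePollRecords(30_000, 90, 0.5)); // 166
    }
}
```

If the result comes out at 1 and a single record can still exceed the interval, tuning max.poll.records alone cannot help; the slow work has to leave the poll loop or the interval has to grow.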

If asked to design X, anchor on this: sketch the producer config (acks, idempotence, batching), the consumer group layout (group ID, assignment, poll cadence), and where commits happen relative to side effects. Those three sentences are usually 80% of a "design a pipeline" answer.

QUICK CHECK

A Kafka consumer in your backend pipeline is triggering frequent rebalances. Investigating the logs, you notice that processing each batch of records occasionally takes 45 seconds, but max.poll.interval.ms is set to 30 seconds. Which of the following changes would directly resolve the rebalancing issue?
