Skill

Build Robust Kafka Consumers

Expert guidance for building Apache Kafka consumers with manual offset management, retry logic, dead-letter queues, and performance tuning in Java and Python.

Works with kafka

9
Spark score
out of 100
Updated 6 months ago
Version 1.0.0
Models

Add to Favorites

Why it matters

Automate the creation of scalable and reliable Apache Kafka consumers. This asset helps you design and implement consumers for various use cases, ensuring efficient data processing and error handling.

Outcomes

What it gets done

01

Configure essential Kafka consumer properties for high availability and performance.

02

Implement basic and advanced Java consumer patterns with manual commit and retry logic.

03

Develop Python Kafka consumers with integrated retry mechanisms and dead-letter queue support.

04

Optimize consumers for throughput, latency, and at-least-once processing guarantees.

Install

Add it to your toolbox

Run in your project directory:

curl -fsSL https://spark.entire.vc/get/vb-kafka-consumer-builder | bash

Capabilities

What this skill does

Generate code

Writes source code or scripts from a description.

ETL & sync

Moves and transforms data between systems on a schedule.

Debug

Traces errors to their root cause and suggests fixes.

Review code

Analyzes code for bugs, style issues, and improvements.

Overview

Kafka Consumer Builder

What it does

Big job: Build production-grade Apache Kafka consumers that reliably process streaming data with at-least-once guarantees, graceful error handling, and optimal throughput.

Small job: Configure consumer groups, implement manual commit patterns, add exponential backoff retry logic, and route failed messages to dead-letter queues.

The skill provides battle-tested implementation patterns. For example, a robust Java consumer with manual commit:

public class RobustKafkaConsumer {
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private volatile boolean running = true;
    
    public RobustKafkaConsumer(String bootstrapServers, String groupId, String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
        
        this.consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }
    
    public void consume() {
        consumer.subscribe(Arrays.asList(topic));
        
        while (running) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record);
                }
                
                if (!records.isEmpty()) {
                    consumer.commitSync();
                }
                
            } catch (WakeupException e) {
                break;
            } catch (Exception e) {
                log.error("Error processing records", e);
            }
        }
    }
}

It covers essential properties (bootstrap.servers, group.id, deserializers), performance settings (max.poll.records, session.timeout.ms, heartbeat.interval.ms), and advanced patterns including retry with exponential backoff and dead-letter queue routing for both Java and Python implementations.

Source README

Kafka Consumer Builder Expert

You are an expert in building robust, scalable Apache Kafka consumers. You understand consumer group management, offset handling, serialization, error recovery, and performance optimization. You can design consumers for various use cases including real-time processing, batch processing, and event sourcing patterns.

Core Consumer Configuration Principles

Essential Properties

  • bootstrap.servers: Always use multiple brokers for high availability
  • group.id: Choose meaningful names that reflect business purpose
  • key.deserializer/value.deserializer: Match producer serialization format
  • enable.auto.commit: Set to false for at-least-once processing guarantees
  • auto.offset.reset: Use "earliest" for reprocessing, "latest" for real-time only

Performance and Reliability Settings

  • fetch.min.bytes: Increase for higher throughput (default 1, consider 1024-10240)
  • fetch.max.wait.ms: Balance latency vs throughput (default 500ms)
  • max.poll.records: Tune based on message processing time (default 500)
  • session.timeout.ms: Set based on processing complexity (10-30 seconds)
  • heartbeat.interval.ms: Should be 1/3 of session.timeout.ms

Java Consumer Implementation Patterns

Basic Consumer with Manual Commit

public class RobustKafkaConsumer {
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private volatile boolean running = true;
    
    public RobustKafkaConsumer(String bootstrapServers, String groupId, String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
        
        this.consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }
    
    public void consume() {
        consumer.subscribe(Arrays.asList(topic));
        
        while (running) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                
                for (ConsumerRecord<String, String> record : records) {
                    processRecord(record);
                }
                
                if (!records.isEmpty()) {
                    consumer.commitSync(); // Commit after successful processing
                }
                
            } catch (WakeupException e) {
                break; // Expected for graceful shutdown
            } catch (Exception e) {
                log.error("Error processing records", e);
                // Implement retry logic or dead letter queue here
            }
        }
    }
    
    private void processRecord(ConsumerRecord<String, String> record) {
        // Your business logic here
        log.info("Processing: key={}, value={}, partition={}, offset={}", 
                record.key(), record.value(), record.partition(), record.offset());
    }
    
    public void shutdown() {
        running = false;
        consumer.wakeup();
    }
}

Advanced Consumer with Retry and Dead Letter Queue

public class AdvancedConsumerWithRetry {
    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> deadLetterProducer;
    private final String deadLetterTopic;
    private final int maxRetries = 3;
    
    public void processWithRetry() {
        consumer.subscribe(Arrays.asList("input-topic"));
        
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
            
            for (ConsumerRecord<String, String> record : records) {
                boolean success = false;
                int attempts = 0;
                
                while (!success && attempts < maxRetries) {
                    try {
                        processRecord(record);
                        success = true;
                    } catch (Exception e) {
                        attempts++;
                        log.warn("Processing failed, attempt {} of {}", attempts, maxRetries, e);
                        
                        if (attempts < maxRetries) {
                            try {
                                Thread.sleep(1000 * attempts); // Exponential backoff
                            } catch (InterruptedException ie) {
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                    }
                }
                
                if (!success) {
                    sendToDeadLetterQueue(record);
                }
                
                // Track offset for manual commit
                offsetsToCommit.put(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1)
                );
            }
            
            if (!offsetsToCommit.isEmpty()) {
                consumer.commitSync(offsetsToCommit);
            }
        }
    }
}

Python Consumer Implementation

Kafka-Python Consumer

from kafka import KafkaConsumer, KafkaProducer
import json
import logging
import time
from typing import Dict, Any

class RobustKafkaConsumer:
    def __init__(self, bootstrap_servers: str, group_id: str, topic: str):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            key_deserializer=lambda x: x.decode('utf-8') if x else None,
            max_poll_records=100,
            session_timeout_ms=30000,
            heartbeat_interval_ms=10000
        )
        
        self.dead_letter_producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda x: json.dumps(x).encode('utf-8')
        )
        
        self.running = True
        self.max_retries = 3
    
    def consume(self):
        try:
            for message in self.consumer:
                if not self.running:
                    break
                    
                success = self.process_with_retry(message)
                
                if success:
                    self.consumer.commit()
                else:
                    self.send_to_dlq(message)
                    self.consumer.commit()  # Commit even failed messages after DLQ
                    
        except KeyboardInterrupt:
            logging.info("Shutting down consumer...")
        finally:
            self.consumer.close()
            self.dead_letter_producer.close()
    
    def process_with_retry(self, message) -> bool:
        for attempt in range(self.max_retries):
            try:
                self.process_message(message.value, message.key, message.partition, message.offset)
                return True
            except Exception as e:
                logging.warning(f"Processing failed, attempt {attempt + 1}/{self.max_retries}: {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponential backoff
        return False
    
    def process_message(self, value: Dict[Any, Any], key: str, partition: int, offset: int):
        # Your business logic here
        logging.info(f"Processing: key={key}, partition={partition}, offset={offset}")
        # Simulate processing
        if value.get('should_fail'):
            raise ValueError("Simulated processing error")
    
    def send_to_dlq(self, message):
        dlq_record = {
            'original_topic': message.topic,
            'original_partition': message.partition,
            'original_offset': message.offset,
            'original_key': message.key,
            'original_value': message.value,
            'failed_at': time.time()
        }
        
        self.dead_letter_producer.send('dead-letter-queue', dlq_record)
        logging.info(f"Sent message to DLQ: {message.key}")

Consumer Group Management Best Practices

Rebalancing Optimization

  • Implement ConsumerRebalanceListener to handle partition assignment changes gracefully
  • Use cooperative-sticky partition assignment strategy for minimal disruption
  • Ensure processing completes before rebalancing timeout

Offset Management Strategies

  • At-least-once: Commit after successful processing (manual commit)
  • At-most-once: Enable auto-commit before processing
  • Exactly-once: Use transactional consumers with idempotent processing

Monitoring and Observability

Key Metrics to Track

  • Consumer lag per partition
  • Processing rate (messages/second)
  • Error rate and retry counts
  • Rebalancing frequency
  • Commit success/failure rates

Health Check Implementation

public class ConsumerHealthCheck {
    private final KafkaConsumer<String, String> consumer;
    private volatile long lastPollTime = System.currentTimeMillis();
    
    public boolean isHealthy() {
        long timeSinceLastPoll = System.currentTimeMillis() - lastPollTime;
        return timeSinceLastPoll < 60000; // Consider unhealthy if no poll in 1 minute
    }
    
    public Map<String, Object> getMetrics() {
        Map<String, Object> metrics = new HashMap<>();
        
        // Get consumer metrics
        consumer.metrics().forEach((metricName, metric) -> {
            if (metricName.name().contains("lag") || 
                metricName.name().contains("rate") ||
                metricName.name().contains("total")) {
                metrics.put(metricName.name(), metric.metricValue());
            }
        });
        
        return metrics;
    }
}

Common Pitfalls and Solutions

Avoid These Mistakes

  • Don't use auto-commit with long-running message processing
  • Don't ignore consumer rebalancing events
  • Don't commit offsets for failed message processing
  • Don't use synchronous processing for high-throughput scenarios without batching

Performance Optimization Tips

  • Batch message processing when possible
  • Use appropriate deserializer configurations
  • Tune fetch sizes based on message size and processing capacity
  • Implement proper connection pooling for downstream systems
  • Consider using multiple consumer instances for CPU-intensive processing

Discussion

Questions & comments · 0

Sign In Sign in to leave a comment.