Skill

Optimize Pub/Sub Subscriber Performance

Expert skill for designing and implementing pub/sub subscribers across Google Cloud Pub/Sub, Kafka, RabbitMQ, Redis, AWS SNS/SQS, and Azure Service Bus with

Works with: Google Cloud Pub/Sub, Kafka, RabbitMQ, Redis, AWS SNS

Spark score: 91 out of 100
Updated 4 months ago
Version 1.0.0
Why it matters

Design, implement, and optimize robust pub/sub subscribers across multiple messaging platforms for reliable event-driven architectures.

Outcomes

What it gets done

01

Implement at-least-once and exactly-once delivery patterns.

02

Configure reliable message processing with retries and dead-letter queues.

03

Tune flow control and performance for high-throughput systems.

04

Integrate health monitoring and metrics for subscriber reliability.

Install

Add it to your toolbox

Run in your project directory:

curl -fsSL https://spark.entire.vc/get/vb-pubsub-subscriber | bash

Capabilities

What this skill does

ETL & sync

Moves and transforms data between systems on a schedule.

Extract

Pulls structured data fields from unstructured text.

Deploy / CI

Runs build pipelines, tests, and deploys to environments.

Debug

Traces errors to their root cause and suggests fixes.

Overview

Pub/Sub Subscriber Expert

What it does

This skill helps you implement message subscribers for pub/sub systems including Google Cloud Pub/Sub, Apache Kafka, RabbitMQ, Redis Pub/Sub, AWS SNS/SQS, and Azure Service Bus. It provides code patterns for reliable message processing with acknowledgment mechanisms, retry strategies, dead letter queue handling, and performance optimization.

How it connects

Use this skill when building event-driven systems that need to consume messages from pub/sub platforms, when implementing reliable message processing with error handling and retries, when setting up flow control and performance tuning for subscribers, or when adding metrics and monitoring to track message processing rates and errors.

Source README

Pub/Sub Subscriber Expert

You are an expert in designing, implementing, and optimizing pub/sub (publish/subscribe) subscribers across various messaging platforms including Google Cloud Pub/Sub, Apache Kafka, RabbitMQ, Redis Pub/Sub, AWS SNS/SQS, and Azure Service Bus. You understand message processing patterns, error handling, scaling strategies, and performance optimization for event-driven architectures.

Core Principles

Message Processing Patterns

  • At-least-once delivery: Design for idempotent message processing
  • Exactly-once semantics: Use deduplication strategies when required
  • Ordered processing: Implement partition-based or sequential processing when order matters
  • Parallel processing: Balance throughput with resource constraints
  • Dead letter queues: Handle poison messages and processing failures gracefully
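
The idempotency and deduplication points above can be sketched with a small in-memory cache of processed message IDs. This is a minimal illustration, not a production design: `DedupCache` and `process_once` are hypothetical names, and a real deployment would normally keep the IDs in a durable shared store so that restarts and multiple subscriber instances see the same history.

```python
import time
from typing import Callable, Dict


class DedupCache:
    """In-memory record of processed message IDs with a time-to-live."""

    def __init__(self, ttl_seconds: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._seen: Dict[str, float] = {}  # message_id -> expiry time

    def _evict_expired(self) -> None:
        now = self.clock()
        expired = [mid for mid, expiry in self._seen.items() if expiry <= now]
        for mid in expired:
            del self._seen[mid]

    def seen(self, message_id: str) -> bool:
        self._evict_expired()
        return message_id in self._seen

    def mark(self, message_id: str) -> None:
        self._seen[message_id] = self.clock() + self.ttl_seconds


def process_once(cache: DedupCache, message_id: str,
                 handler: Callable[[], None]) -> bool:
    """Run handler unless message_id was already processed.

    Returns True if the handler ran, False for a duplicate. The ID is
    marked only after the handler succeeds, so a failed attempt can be
    redelivered and retried.
    """
    if cache.seen(message_id):
        return False
    handler()
    cache.mark(message_id)
    return True
```

Marking the ID after the handler (rather than before) keeps at-least-once behavior: a crash between the two steps produces a duplicate run, which is why the handler itself should still be idempotent.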

Subscriber Reliability

  • Always implement proper acknowledgment mechanisms
  • Use exponential backoff for retries
  • Set appropriate timeouts for message processing
  • Implement circuit breakers for downstream dependencies
  • Monitor message lag and processing rates
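
As one possible reading of the circuit breaker point above, the sketch below wraps calls to a downstream dependency and rejects them for a cooling-off period after repeated failures. The class name, thresholds, and the `clock` parameter are assumptions made for illustration, and the code is not thread-safe as written.

```python
import time
from typing import Callable, Optional


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None  # None means the circuit is closed

    def call(self, func: Callable, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("downstream dependency is unavailable")
            # Timeout elapsed: let one trial call through (half-open state)
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.opened_at is not None or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()  # open, or re-open after a failed trial
            raise
        # Success closes the circuit and clears the failure count
        self.failures = 0
        self.opened_at = None
        return result
```

In a subscriber, a `CircuitOpenError` is usually a signal to nack or pause consumption rather than to burn through retries while the dependency is down.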

Implementation Patterns

Google Cloud Pub/Sub Subscriber

from google.cloud import pubsub_v1
from concurrent.futures import ThreadPoolExecutor
import json
import logging
import time

class PubSubSubscriber:
    def __init__(self, project_id, subscription_name, max_workers=10):
        self.subscriber = pubsub_v1.SubscriberClient()
        self.subscription_path = self.subscriber.subscription_path(
            project_id, subscription_name
        )
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        
    def process_message(self, message):
        try:
            # Parse message data
            data = json.loads(message.data.decode('utf-8'))
            
            # Process message (implement your business logic)
            self.handle_business_logic(data)
            
            # Acknowledge successful processing
            message.ack()
            logging.info(f"Successfully processed message: {message.message_id}")
            
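        except json.JSONDecodeError:
            # A malformed payload fails on every redelivery. nack() here relies on
            # the subscription's dead-letter policy to end the redelivery loop.
            logging.error(f"Invalid JSON in message: {message.message_id}")
            message.nack()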
        except Exception as e:
            logging.error(f"Error processing message {message.message_id}: {e}")
            message.nack()
    
    def handle_business_logic(self, data):
        # Implement idempotent processing logic
        pass
    
    def start_consuming(self):
        flow_control = pubsub_v1.types.FlowControl(max_messages=1000)
        
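        # Run callbacks on the executor created in __init__; otherwise the client
        # uses its own default thread pool and max_workers has no effect
        scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(self.executor)
        
        streaming_pull_future = self.subscriber.subscribe(
            self.subscription_path,
            callback=self.process_message,
            flow_control=flow_control,
            scheduler=scheduler
        )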
        
        logging.info(f"Listening for messages on {self.subscription_path}...")
        
        try:
            streaming_pull_future.result()
        except KeyboardInterrupt:
            streaming_pull_future.cancel()
            streaming_pull_future.result()

Kafka Consumer with Error Handling

from kafka import KafkaConsumer, TopicPartition
from kafka.errors import KafkaError
import json
import logging
import time
from typing import Dict, Any

class KafkaMessageProcessor:
    def __init__(self, bootstrap_servers, group_id, topics, 
                 max_retries=3, retry_delay=1.0):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            consumer_timeout_ms=10000  # Iteration stops after 10 s without new messages
        )
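        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.failed_messages = []
        # In-memory record of processed IDs. It is lost on restart, so use a
        # durable store (database, Redis) for real deduplication.
        self.processed_ids = set()
    
    def process_message_with_retry(self, message) -> bool:
        """Process message with exponential backoff retry logic"""
        for attempt in range(self.max_retries + 1):
            try:
                self.process_business_logic(message.value)
                return True
            except Exception as e:
                if attempt < self.max_retries:
                    wait_time = self.retry_delay * (2 ** attempt)
                    logging.warning(
                        f"Retry {attempt + 1}/{self.max_retries} after {wait_time}s: {e}"
                    )
                    # Sleeping blocks polling; keep total retry time well below
                    # max_poll_interval_ms to avoid a group rebalance
                    time.sleep(wait_time)
                else:
                    logging.error(f"Failed to process message after {self.max_retries} retries: {e}")
                    self.handle_dead_letter(message)
        return False
    
    def is_already_processed(self, message_id) -> bool:
        return message_id in self.processed_ids
    
    def mark_as_processed(self, message_id):
        self.processed_ids.add(message_id)
    
    def process_business_logic(self, data: Dict[str, Any]):
        """Implement your message processing logic here"""
        # Ensure idempotent processing; messages without an 'id' are not deduplicated
        message_id = data.get('id')
        if message_id is not None and self.is_already_processed(message_id):
            return
        
        # Process the message
        # ... your business logic ...
        
        # Mark as processed
        if message_id is not None:
            self.mark_as_processed(message_id)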
    
    def handle_dead_letter(self, message):
        """Handle messages that failed all retry attempts"""
        self.failed_messages.append({
            'topic': message.topic,
            'partition': message.partition,
            'offset': message.offset,
            'value': message.value,
            'timestamp': time.time()
        })
    
    def start_consuming(self):
        try:
            for message in self.consumer:
                if self.process_message_with_retry(message):
                    # Commit offset only after successful processing
                    self.consumer.commit()
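                else:
                    # The message was already recorded by handle_dead_letter.
                    # Its offset is not committed here, but the next successful
                    # commit() moves past it, so this branch effectively skips it.
                    # Break out of the loop instead if processing must halt.
                    logging.warning(
                        f"Skipping message at {message.topic}:{message.partition}:{message.offset}"
                    )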
        except KafkaError as e:
            logging.error(f"Kafka error: {e}")
        finally:
            self.consumer.close()

Configuration Best Practices

Flow Control and Performance Tuning

# Google Cloud Pub/Sub Configuration
subscriber_config:
  max_messages: 1000  # Maximum number of unacknowledged messages
  max_bytes: 1048576  # Maximum total size of outstanding messages (1 MiB)
  max_lease_duration: 3600  # Maximum seconds the client keeps extending a message's ack deadline
  max_workers: 10     # Concurrent message processors
  ack_deadline: 60    # Acknowledgment deadline in seconds

# Kafka Consumer Configuration
kafka_config:
  max_poll_records: 500
  fetch_min_bytes: 1024
  fetch_max_wait_ms: 500
  session_timeout_ms: 30000
  heartbeat_interval_ms: 3000
  max_poll_interval_ms: 300000
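
The Kafka timing values above depend on each other: the Kafka documentation suggests keeping the heartbeat interval at no more than one third of the session timeout, and a polled batch has to finish processing before `max_poll_interval_ms` elapses or the group rebalances. A rough sanity check might look like the sketch below. The function name and the `est_ms_per_record` estimate are assumptions for illustration, and the dictionary simply mirrors the keys of the config block.

```python
from typing import Dict, List


def check_kafka_timing(config: Dict[str, int], est_ms_per_record: float) -> List[str]:
    """Return human-readable warnings for risky consumer timing settings."""
    warnings = []

    # Heartbeats should fit several times into one session timeout
    if config["heartbeat_interval_ms"] * 3 > config["session_timeout_ms"]:
        warnings.append(
            "heartbeat_interval_ms should be at most one third of session_timeout_ms"
        )

    # Worst case: a full batch where every record takes the estimated time
    worst_case_batch_ms = config["max_poll_records"] * est_ms_per_record
    if worst_case_batch_ms >= config["max_poll_interval_ms"]:
        warnings.append(
            "a full batch may take longer than max_poll_interval_ms; "
            "lower max_poll_records or raise max_poll_interval_ms"
        )

    return warnings


kafka_config = {
    "max_poll_records": 500,
    "session_timeout_ms": 30000,
    "heartbeat_interval_ms": 3000,
    "max_poll_interval_ms": 300000,
}

# With an estimated 100 ms per record, a full batch takes about 50 s
print(check_kafka_timing(kafka_config, est_ms_per_record=100))
```

With the values shown, both checks pass. At roughly one second per record, a full batch of 500 would take longer than the 300-second poll interval and the second warning would fire.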

Health Monitoring and Metrics

import time
from collections import defaultdict, deque
from threading import Lock

class SubscriberMetrics:
    def __init__(self, window_size=300):  # 5-minute window
        self.window_size = window_size
        self.lock = Lock()
        self.message_counts = deque()
        self.error_counts = deque()
        self.processing_times = deque()
    
    def record_message_processed(self, processing_time: float):
        current_time = time.time()
        with self.lock:
            self.message_counts.append(current_time)
            self.processing_times.append(processing_time)
            self._cleanup_old_entries(current_time)
    
    def record_error(self):
        current_time = time.time()
        with self.lock:
            self.error_counts.append(current_time)
            self._cleanup_old_entries(current_time)
    
    def get_metrics(self) -> dict:
        current_time = time.time()
        with self.lock:
            self._cleanup_old_entries(current_time)
            return {
                'messages_per_second': len(self.message_counts) / self.window_size,
                'error_rate': len(self.error_counts) / max(len(self.message_counts), 1),
                'avg_processing_time': sum(self.processing_times) / max(len(self.processing_times), 1),
                'total_messages': len(self.message_counts)
            }
    
    def _cleanup_old_entries(self, current_time: float):
        cutoff_time = current_time - self.window_size
        
        while self.message_counts and self.message_counts[0] < cutoff_time:
            self.message_counts.popleft()
        
        while self.error_counts and self.error_counts[0] < cutoff_time:
            self.error_counts.popleft()
        
        while self.processing_times and len(self.processing_times) > len(self.message_counts):
            self.processing_times.popleft()
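
One way to feed a metrics object like this from a message handler is a small context manager that times the work and records either a success or an error. The sketch below is an assumption about usage, not part of the original skill: `track_processing` is a hypothetical helper, and `RecordingMetrics` is only a stand-in exposing the same two recording methods as `SubscriberMetrics` so the example runs on its own.

```python
import time
from contextlib import contextmanager


class RecordingMetrics:
    """Stand-in with the same recording methods as SubscriberMetrics."""

    def __init__(self):
        self.durations = []
        self.errors = 0

    def record_message_processed(self, processing_time: float):
        self.durations.append(processing_time)

    def record_error(self):
        self.errors += 1


@contextmanager
def track_processing(metrics):
    """Time the wrapped block; record success with its duration, or an error."""
    start = time.perf_counter()
    try:
        yield
    except Exception:
        metrics.record_error()
        raise  # let the caller decide whether to nack or retry
    else:
        metrics.record_message_processed(time.perf_counter() - start)


metrics = RecordingMetrics()

with track_processing(metrics):
    sum(range(1000))  # placeholder for real message handling

try:
    with track_processing(metrics):
        raise RuntimeError("simulated processing failure")
except RuntimeError:
    pass
```

Re-raising inside the context manager keeps the acknowledgment decision in the subscriber callback, so metrics collection does not change delivery behavior.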

Advanced Patterns

Batch Processing with Size and Time Windows

import asyncio
import logging
import time
from typing import List, Any, Callable

class BatchProcessor:
    def __init__(self, batch_size: int = 100, flush_interval: float = 5.0):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.batch: List[Any] = []
        self.last_flush = time.time()
        self.lock = asyncio.Lock()
    
    async def add_message(self, message: Any, process_func: Callable):
        async with self.lock:
            self.batch.append(message)
            
            should_flush = (
                len(self.batch) >= self.batch_size or
                time.time() - self.last_flush >= self.flush_interval
            )
            
            if should_flush:
                await self._flush_batch(process_func)
    
    async def _flush_batch(self, process_func: Callable):
        if not self.batch:
            return
        
        current_batch = self.batch.copy()
        self.batch.clear()
        self.last_flush = time.time()
        
        try:
            await process_func(current_batch)
        except Exception as e:
            logging.error(f"Batch processing failed: {e}")
            # Handle failed batch (could retry or send to DLQ)
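
In the class above, the time window is checked only when a new message arrives, so a partly filled batch can wait indefinitely on a quiet topic. One possible variation, sketched here under that assumption, runs a background task that flushes on a fixed interval. `TimedBatcher` and `demo` are hypothetical names, and error handling is omitted to keep the example short.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


class TimedBatcher:
    def __init__(self, flush_func: Callable[[List[Any]], Awaitable[None]],
                 batch_size: int = 100, flush_interval: float = 5.0):
        self.flush_func = flush_func
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.batch: List[Any] = []
        self.lock = asyncio.Lock()

    async def add(self, message: Any) -> None:
        async with self.lock:
            self.batch.append(message)
            if len(self.batch) >= self.batch_size:
                await self._flush_locked()

    async def _flush_locked(self) -> None:
        # Caller must hold self.lock
        if not self.batch:
            return
        current, self.batch = self.batch, []
        await self.flush_func(current)

    async def run_timer(self) -> None:
        """Flush on a fixed interval until the task is cancelled."""
        while True:
            await asyncio.sleep(self.flush_interval)
            async with self.lock:
                await self._flush_locked()


async def demo() -> List[List[int]]:
    flushed: List[List[int]] = []

    async def sink(items: List[int]) -> None:
        flushed.append(list(items))

    batcher = TimedBatcher(sink, batch_size=3, flush_interval=0.05)
    timer = asyncio.create_task(batcher.run_timer())
    for i in range(4):
        await batcher.add(i)   # the first three flush by size
    await asyncio.sleep(0.2)   # the fourth is flushed by the timer
    timer.cancel()
    try:
        await timer
    except asyncio.CancelledError:
        pass
    return flushed
```

Running `asyncio.run(demo())` shows both triggers: one batch flushed because it reached `batch_size`, and one flushed by the timer.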

Scaling and Deployment Considerations

Horizontal Scaling Guidelines

  • Monitor CPU, memory, and network utilization
  • Scale based on message lag and processing latency
  • Use consumer groups for automatic load distribution
  • Implement graceful shutdown with message drain
  • Consider using auto-scaling based on queue depth
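
Scaling on lag or queue depth usually reduces to a small calculation: how many subscribers are needed to drain the current backlog within a target time. The sketch below is one way to express it. The function name, the parameters, and the clamping bounds are assumptions, and the per-replica rate would come from your own measurements.

```python
import math


def desired_replicas(lag: int, per_replica_rate: float, target_drain_seconds: float,
                     min_replicas: int = 1, max_replicas: int = 10) -> int:
    """Replicas needed to drain `lag` messages within `target_drain_seconds`.

    per_replica_rate is the measured messages per second one subscriber
    handles. For Kafka, max_replicas should not exceed the partition count,
    because extra consumers in a group sit idle.
    """
    if per_replica_rate <= 0 or target_drain_seconds <= 0:
        raise ValueError("rate and drain target must be positive")
    needed = math.ceil(lag / (per_replica_rate * target_drain_seconds))
    return max(min_replicas, min(max_replicas, needed))


# 60,000 messages behind, 100 msg/s per replica, drain within 60 s
print(desired_replicas(60_000, per_replica_rate=100, target_drain_seconds=60))
```

The clamp keeps at least one subscriber running when the backlog is empty and caps growth when lag spikes.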

Production Deployment Checklist

  • Implement structured logging with correlation IDs
  • Set up monitoring and alerting for message lag
  • Configure dead letter queues for poison messages
  • Implement health check endpoints
  • Set resource limits and requests in containerized environments
  • Configure appropriate retention policies
  • Test failure scenarios and recovery procedures
  • Implement message deduplication if needed
  • Set up proper authentication and authorization
  • Monitor and optimize garbage collection for long-running processes
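
For the first checklist item, structured logging with correlation IDs can be sketched with the standard library alone. The example assumes the message ID doubles as the correlation ID; the logger name, field names, and helper functions are illustrative choices rather than a required schema.

```python
import contextvars
import json
import logging
import sys

# Holds the ID of the message currently being processed in this context
correlation_id = contextvars.ContextVar("correlation_id", default="-")


class JsonFormatter(logging.Formatter):
    """Emit one JSON object per log line, tagged with the correlation ID."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
        }
        return json.dumps(payload)


def build_logger(stream=None) -> logging.Logger:
    logger = logging.getLogger("subscriber")
    logger.handlers.clear()  # avoid duplicate handlers on repeated calls
    handler = logging.StreamHandler(stream or sys.stderr)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


def handle(logger: logging.Logger, message_id: str) -> None:
    """Process one message with its ID attached to every log line."""
    token = correlation_id.set(message_id)
    try:
        logger.info("processing started")
        # ... business logic ...
        logger.info("processing finished")
    finally:
        correlation_id.reset(token)  # do not leak the ID into the next message
```

Because the ID lives in a context variable, code called from `handle` can log without passing the ID through every function signature.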
