Optimize Pub/Sub Subscriber Performance
Expert skill for designing and implementing pub/sub subscribers across Google Cloud Pub/Sub, Kafka, RabbitMQ, Redis, AWS SNS/SQS, and Azure Service Bus with
Why it matters
Design, implement, and optimize robust pub/sub subscribers across multiple messaging platforms for reliable event-driven architectures.
Outcomes
What it gets done
Implement at-least-once and exactly-once delivery patterns.
Configure reliable message processing with retries and dead-letter queues.
Tune flow control and performance for high-throughput systems.
Integrate health monitoring and metrics for subscriber reliability.
Install
Add it to your toolbox
Run in your project directory:
curl -fsSL https://spark.entire.vc/get/vb-pubsub-subscriber | bash
Capabilities
What this skill does
Moves and transforms data between systems on a schedule.
Pulls structured data fields from unstructured text.
Runs build pipelines, tests, and deploys to environments.
Traces errors to their root cause and suggests fixes.
Overview
Pub/Sub Subscriber Expert
What it does
This skill helps you implement message subscribers for pub/sub systems including Google Cloud Pub/Sub, Apache Kafka, RabbitMQ, Redis Pub/Sub, AWS SNS/SQS, and Azure Service Bus. It provides code patterns for reliable message processing with acknowledgment mechanisms, retry strategies, dead letter queue handling, and performance optimization.
How it connects
Use this skill when building event-driven systems that need to consume messages from pub/sub platforms, when implementing reliable message processing with error handling and retries, when setting up flow control and performance tuning for subscribers, or when adding metrics and monitoring to track message processing rates and errors.
Source README
Pub/Sub Subscriber Expert
You are an expert in designing, implementing, and optimizing pub/sub (publish/subscribe) subscribers across various messaging platforms including Google Cloud Pub/Sub, Apache Kafka, RabbitMQ, Redis Pub/Sub, AWS SNS/SQS, and Azure Service Bus. You understand message processing patterns, error handling, scaling strategies, and performance optimization for event-driven architectures.
Core Principles
Message Processing Patterns
- At-least-once delivery: Design for idempotent message processing
- Exactly-once semantics: Use deduplication strategies when required
- Ordered processing: Implement partition-based or sequential processing when order matters
- Parallel processing: Balance throughput with resource constraints
- Dead letter queues: Handle poison messages and processing failures gracefully
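As a rough illustration of the idempotency and deduplication points above, the sketch below wraps a handler so that a given message ID is processed at most once within a time window. The class name `DeduplicatingHandler`, the `ttl_seconds` parameter, and the in-memory store are all assumptions made for this example; a real deployment would more likely keep seen IDs in a shared store such as a database or cache so that every subscriber instance sees them.

```python
import threading
import time
from typing import Callable, Dict, Optional


class DeduplicatingHandler:
    """Hypothetical wrapper: runs `handler` at most once per message ID per TTL window."""

    def __init__(self, handler: Callable[[dict], None], ttl_seconds: float = 3600.0):
        self._handler = handler
        self._ttl = ttl_seconds
        self._seen: Dict[str, float] = {}  # message ID -> time it was accepted
        self._lock = threading.Lock()

    def handle(self, message_id: str, payload: dict, now: Optional[float] = None) -> bool:
        """Return True if the handler ran, False if the message was a duplicate."""
        now = time.time() if now is None else now
        with self._lock:
            # Drop expired IDs so the in-memory store does not grow without bound.
            expired = [mid for mid, ts in self._seen.items() if now - ts > self._ttl]
            for mid in expired:
                del self._seen[mid]
            if message_id in self._seen:
                return False
            # Reserve the ID before processing so concurrent duplicates are rejected.
            self._seen[message_id] = now
        try:
            self._handler(payload)
        except Exception:
            # Processing failed: release the ID so a redelivery can be retried.
            with self._lock:
                self._seen.pop(message_id, None)
            raise
        return True
```

With at-least-once delivery, a redelivered message that was already handled returns `False` and can simply be acknowledged. The `now` argument exists only to make the window easy to test.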
Subscriber Reliability
- Always implement proper acknowledgment mechanisms
- Use exponential backoff for retries
- Set appropriate timeouts for message processing
- Implement circuit breakers for downstream dependencies
- Monitor message lag and processing rates
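The circuit-breaker item above could look something like the following minimal sketch. The names `CircuitBreaker` and `CircuitOpenError`, the thresholds, and the injectable `clock` are assumptions for illustration, and the class is not thread-safe as written; a subscriber with concurrent callbacks would need to guard the state with a lock.

```python
import time
from typing import Any, Callable, Optional


class CircuitOpenError(Exception):
    """Raised when a call is skipped because the circuit is open."""


class CircuitBreaker:
    """Hypothetical breaker: opens after N consecutive failures, retries after a timeout."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None  # None means the circuit is closed

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open; skipping downstream call")
            # Timeout elapsed: fall through and allow one trial call (half-open).
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            # A failed trial call, or too many consecutive failures, (re)opens the circuit.
            if self._opened_at is not None or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            raise
        # Success closes the circuit and resets the failure count.
        self._failures = 0
        self._opened_at = None
        return result
```

A subscriber callback would typically nack (or leave unacknowledged) any message whose processing raises `CircuitOpenError`, so the message is redelivered once the downstream dependency recovers.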
Implementation Patterns
Google Cloud Pub/Sub Subscriber
from google.cloud import pubsub_v1
from concurrent.futures import ThreadPoolExecutor
import json
import logging
import time


class PubSubSubscriber:
    def __init__(self, project_id, subscription_name, max_workers=10):
        self.subscriber = pubsub_v1.SubscriberClient()
        self.subscription_path = self.subscriber.subscription_path(
            project_id, subscription_name
        )
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def process_message(self, message):
        try:
            # Parse message data
            data = json.loads(message.data.decode('utf-8'))
            # Process message (implement your business logic)
            self.handle_business_logic(data)
            # Acknowledge successful processing
            message.ack()
            logging.info(f"Successfully processed message: {message.message_id}")
        except json.JSONDecodeError:
            logging.error(f"Invalid JSON in message: {message.message_id}")
            message.nack()
        except Exception as e:
            logging.error(f"Error processing message {message.message_id}: {e}")
            message.nack()

    def handle_business_logic(self, data):
        # Implement idempotent processing logic
        pass

    def start_consuming(self):
        flow_control = pubsub_v1.types.FlowControl(max_messages=1000)
        streaming_pull_future = self.subscriber.subscribe(
            self.subscription_path,
            callback=self.process_message,
            flow_control=flow_control
        )
        logging.info(f"Listening for messages on {self.subscription_path}...")
        try:
            streaming_pull_future.result()
        except KeyboardInterrupt:
            streaming_pull_future.cancel()
            streaming_pull_future.result()
Kafka Consumer with Error Handling
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import logging
import time
from typing import Dict, Any


class KafkaMessageProcessor:
    def __init__(self, bootstrap_servers, group_id, topics,
                 max_retries=3, retry_delay=1.0):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            # Iteration stops after 10 s without messages, which ends start_consuming()
            consumer_timeout_ms=10000
        )
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.failed_messages = []
        # In-memory placeholder for deduplication; use a durable store in production
        self.processed_ids = set()

    def process_message_with_retry(self, message) -> bool:
        """Process message with exponential backoff retry logic"""
        for attempt in range(self.max_retries + 1):
            try:
                self.process_business_logic(message.value)
                return True
            except Exception as e:
                if attempt < self.max_retries:
                    wait_time = self.retry_delay * (2 ** attempt)
                    logging.warning(
                        f"Retry {attempt + 1}/{self.max_retries} after {wait_time}s: {e}"
                    )
                    time.sleep(wait_time)
                else:
                    logging.error(f"Failed to process message after {self.max_retries} retries: {e}")
                    self.handle_dead_letter(message)
                    return False
        return False

    def process_business_logic(self, data: Dict[str, Any]):
        """Implement your message processing logic here"""
        # Ensure idempotent processing
        message_id = data.get('id')
        if self.is_already_processed(message_id):
            return
        # Process the message
        # ... your business logic ...
        # Mark as processed
        self.mark_as_processed(message_id)

    def is_already_processed(self, message_id) -> bool:
        """Placeholder check against the in-memory set of processed IDs"""
        return message_id is not None and message_id in self.processed_ids

    def mark_as_processed(self, message_id):
        """Placeholder: record the ID so redeliveries are skipped"""
        if message_id is not None:
            self.processed_ids.add(message_id)

    def handle_dead_letter(self, message):
        """Handle messages that failed all retry attempts"""
        self.failed_messages.append({
            'topic': message.topic,
            'partition': message.partition,
            'offset': message.offset,
            'value': message.value,
            'timestamp': time.time()
        })

    def start_consuming(self):
        try:
            for message in self.consumer:
                if self.process_message_with_retry(message):
                    # Commit offset only after successful processing
                    self.consumer.commit()
                else:
                    # Handle failed message (could skip or halt depending on requirements).
                    # Skipping means the next successful commit() moves past this offset,
                    # so failed_messages should be persisted if they must not be lost.
                    pass
        except KafkaError as e:
            logging.error(f"Kafka error: {e}")
        finally:
            self.consumer.close()
Configuration Best Practices
Flow Control and Performance Tuning
# Google Cloud Pub/Sub Configuration
subscriber_config:
  max_messages: 1000   # Maximum number of outstanding (unacknowledged) messages
  max_bytes: 1048576   # Maximum total size of outstanding messages (1 MiB)
  max_latency: 100     # Maximum seconds to wait before sending a batch (in the Python client this is a publisher batching option, not a subscriber flow-control field)
  max_workers: 10      # Concurrent message processors
  ack_deadline: 60     # Acknowledgment deadline in seconds

# Kafka Consumer Configuration
kafka_config:
  max_poll_records: 500
  fetch_min_bytes: 1024
  fetch_max_wait_ms: 500
  session_timeout_ms: 30000
  heartbeat_interval_ms: 3000
  max_poll_interval_ms: 300000
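The Kafka timing settings above interact with each other and with the retry logic in the consumer. The sketch below is a small sanity check under two assumptions: the commonly cited guidance that the heartbeat interval should be at most one third of the session timeout, and the rule that a consumer must finish a polled batch before `max_poll_interval_ms` elapses or it is removed from the group. The function name `check_kafka_timeouts` and the `worst_case_ms_per_message` parameter are hypothetical, introduced only for this example.

```python
from typing import Dict, List


def check_kafka_timeouts(cfg: Dict[str, int], worst_case_ms_per_message: float) -> List[str]:
    """Return human-readable warnings for risky timing combinations (empty list if none)."""
    warnings: List[str] = []

    # Heartbeats should fit several times into one session timeout.
    if cfg["heartbeat_interval_ms"] * 3 > cfg["session_timeout_ms"]:
        warnings.append(
            "heartbeat_interval_ms should be no more than one third of session_timeout_ms"
        )

    # A full batch must be processed before the next poll is due.
    worst_case_batch_ms = cfg["max_poll_records"] * worst_case_ms_per_message
    if worst_case_batch_ms >= cfg["max_poll_interval_ms"]:
        warnings.append(
            f"a full batch may take {worst_case_batch_ms:.0f} ms, which is not below "
            f"max_poll_interval_ms ({cfg['max_poll_interval_ms']} ms); "
            "lower max_poll_records or shorten per-message retries"
        )
    return warnings


kafka_config = {
    "max_poll_records": 500,
    "session_timeout_ms": 30000,
    "heartbeat_interval_ms": 3000,
    "max_poll_interval_ms": 300000,
}
```

For instance, with the retry defaults from the Kafka consumer example (`max_retries=3`, `retry_delay=1.0`), a message that fails every attempt sleeps 1 + 2 + 4 = 7 seconds. If every message in a 500-record batch did that, the batch would take about 3,500 seconds, far beyond the 300-second poll interval, and `check_kafka_timeouts(kafka_config, 7000)` flags exactly that case.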
Health Monitoring and Metrics
import time
from collections import defaultdict, deque
from threading import Lock


class SubscriberMetrics:
    def __init__(self, window_size=300):  # 5-minute window
        self.window_size = window_size
        self.lock = Lock()
        self.message_counts = deque()
        self.error_counts = deque()
        self.processing_times = deque()

    def record_message_processed(self, processing_time: float):
        current_time = time.time()
        with self.lock:
            self.message_counts.append(current_time)
            self.processing_times.append(processing_time)
            self._cleanup_old_entries(current_time)

    def record_error(self):
        current_time = time.time()
        with self.lock:
            self.error_counts.append(current_time)
            self._cleanup_old_entries(current_time)

    def get_metrics(self) -> dict:
        current_time = time.time()
        with self.lock:
            self._cleanup_old_entries(current_time)
            return {
                'messages_per_second': len(self.message_counts) / self.window_size,
                'error_rate': len(self.error_counts) / max(len(self.message_counts), 1),
                'avg_processing_time': sum(self.processing_times) / max(len(self.processing_times), 1),
                'total_messages': len(self.message_counts)
            }

    def _cleanup_old_entries(self, current_time: float):
        cutoff_time = current_time - self.window_size
        while self.message_counts and self.message_counts[0] < cutoff_time:
            self.message_counts.popleft()
        while self.error_counts and self.error_counts[0] < cutoff_time:
            self.error_counts.popleft()
        while self.processing_times and len(self.processing_times) > len(self.message_counts):
            self.processing_times.popleft()
Advanced Patterns
Batch Processing with Size and Time Windows
import asyncio
import logging
import time
from typing import List, Any, Callable


class BatchProcessor:
    def __init__(self, batch_size: int = 100, flush_interval: float = 5.0):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.batch: List[Any] = []
        self.last_flush = time.time()
        self.lock = asyncio.Lock()

    async def add_message(self, message: Any, process_func: Callable):
        async with self.lock:
            self.batch.append(message)
            # The time window is only checked when a message arrives, so a
            # partial batch waits until the next add_message() call.
            should_flush = (
                len(self.batch) >= self.batch_size or
                time.time() - self.last_flush >= self.flush_interval
            )
            if should_flush:
                await self._flush_batch(process_func)

    async def _flush_batch(self, process_func: Callable):
        # Must be called with self.lock held
        if not self.batch:
            return
        current_batch = self.batch.copy()
        self.batch.clear()
        self.last_flush = time.time()
        try:
            await process_func(current_batch)
        except Exception as e:
            logging.error(f"Batch processing failed: {e}")
            # Handle failed batch (could retry or send to DLQ)
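In `BatchProcessor` above, the time window is only evaluated when a new message arrives, so a partial batch can sit indefinitely on a quiet topic. One possible variation, sketched below, adds a background task that flushes on a timer and drains the remainder on shutdown. The class name `TimedBatcher` and its `start`/`stop` methods are assumptions for this example, and the object should be created inside a running event loop.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


class TimedBatcher:
    """Hypothetical batcher: flushes when full, on a timer, and once more on stop()."""

    def __init__(self, process_func: Callable[[List[Any]], Awaitable[None]],
                 batch_size: int = 100, flush_interval: float = 5.0):
        self._process = process_func
        self._batch_size = batch_size
        self._interval = flush_interval
        self._batch: List[Any] = []
        self._lock = asyncio.Lock()
        self._stopping = asyncio.Event()
        self._task: Optional[asyncio.Task] = None

    async def start(self) -> None:
        self._task = asyncio.create_task(self._flush_periodically())

    async def add(self, message: Any) -> None:
        async with self._lock:
            self._batch.append(message)
            if len(self._batch) >= self._batch_size:
                await self._flush_locked()

    async def stop(self) -> None:
        # Signal the timer loop instead of cancelling it, so an in-progress
        # flush is never interrupted halfway.
        self._stopping.set()
        if self._task is not None:
            await self._task
        async with self._lock:
            await self._flush_locked()

    async def _flush_periodically(self) -> None:
        while not self._stopping.is_set():
            try:
                # Wake up early if stop() is called, otherwise after the interval.
                await asyncio.wait_for(self._stopping.wait(), timeout=self._interval)
            except asyncio.TimeoutError:
                pass
            async with self._lock:
                await self._flush_locked()

    async def _flush_locked(self) -> None:
        # Caller must hold self._lock.
        if not self._batch:
            return
        current, self._batch = self._batch, []
        await self._process(current)
```

Error handling for a failed batch is left out here to keep the sketch short; in practice the same retry or dead-letter handling discussed above would wrap the call to the processing function.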
Scaling and Deployment Considerations
Horizontal Scaling Guidelines
- Monitor CPU, memory, and network utilization
- Scale based on message lag and processing latency
- Use consumer groups for automatic load distribution
- Implement graceful shutdown with message drain
- Consider using auto-scaling based on queue depth
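The "graceful shutdown with message drain" guideline above can be sketched with a worker that stops accepting new messages but finishes everything already queued. The class `DrainingWorker` and its method names are assumptions for illustration; it uses a single worker thread and an in-process queue, whereas a real subscriber would also stop pulling from the broker before draining.

```python
import logging
import queue
import threading
from typing import Any, Callable


class DrainingWorker:
    """Hypothetical worker: on shutdown, rejects new messages and drains queued ones."""

    def __init__(self, handler: Callable[[Any], None]):
        self._handler = handler
        self._inbox: "queue.Queue[Any]" = queue.Queue()
        self._stopping = threading.Event()
        self._lock = threading.Lock()  # makes "check stopping, then enqueue" atomic
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def submit(self, message: Any) -> bool:
        """Queue a message; returns False once shutdown has begun."""
        with self._lock:
            if self._stopping.is_set():
                return False
            self._inbox.put(message)
            return True

    def shutdown(self, timeout: float = 30.0) -> bool:
        """Stop intake and wait for the queue to drain; True if it drained in time."""
        with self._lock:
            self._stopping.set()
        self._thread.join(timeout)
        return not self._thread.is_alive()

    def _run(self) -> None:
        while True:
            try:
                message = self._inbox.get(timeout=0.1)
            except queue.Empty:
                # Nothing can be enqueued after stopping is set, so empty means drained.
                if self._stopping.is_set():
                    return
                continue
            try:
                self._handler(message)
            except Exception:
                logging.exception("handler failed during processing")
```

In a containerized service, a SIGTERM handler would typically set a flag that causes the main loop to call `shutdown()` with a timeout shorter than the platform's termination grace period, so that unfinished messages are redelivered instead of being cut off mid-processing.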
Production Deployment Checklist
- Implement structured logging with correlation IDs
- Set up monitoring and alerting for message lag
- Configure dead letter queues for poison messages
- Implement health check endpoints
- Set resource limits and requests in containerized environments
- Configure appropriate retention policies
- Test failure scenarios and recovery procedures
- Implement message deduplication if needed
- Set up proper authentication and authorization
- Monitor and optimize garbage collection for long-running processes
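For the first checklist item, structured logging with correlation IDs, one possible approach uses only the standard library: a context variable holds the ID of the message being processed, and a formatter emits each record as JSON with that ID attached. The names `correlation_id`, `JsonFormatter`, and `handle_with_correlation` are assumptions made for this sketch, as is the choice of the message ID as the correlation ID.

```python
import contextvars
import json
import logging
from typing import Any, Callable

# Holds the ID of the message currently being processed in this context.
correlation_id: contextvars.ContextVar = contextvars.ContextVar(
    "correlation_id", default="-"
)


class JsonFormatter(logging.Formatter):
    """Formats each log record as one JSON object that includes the correlation ID."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
        })


def handle_with_correlation(message_id: str, handler: Callable[[Any], Any],
                            payload: Any) -> Any:
    """Run `handler` with the correlation ID set, restoring the previous value afterwards."""
    token = correlation_id.set(message_id)
    try:
        return handler(payload)
    finally:
        correlation_id.reset(token)
```

Attaching `JsonFormatter` to a handler once at startup means every log line written while a message is being processed carries its ID, without passing the ID through each function call.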