Skill

Build Real-Time Dashboard Data Feeds

Design and implement low-latency data pipelines for real-time dashboards and monitoring systems.

Works with kafkaredispulsarsocket.iowebsocket

91
Spark score
out of 100
Updated 4 months ago
Version 1.0.0
Models

Add to Favorites

Why it matters

Design, implement, and optimize low-latency data pipelines for real-time dashboards and monitoring systems. Ensure efficient data streaming and processing for live analytics.

Outcomes

What it gets done

01

Implement event-driven architectures for data streaming.

02

Select and configure message brokers like Kafka or Redis Streams.

03

Develop WebSocket implementations for live data push.

04

Optimize data serialization and connection management for performance.

Install

Add it to your toolbox

Run in your project directory:

curl -fsSL https://spark.entire.vc/get/vb-real-time-dashboard-feed | bash

Capabilities

What this skill does

ETL & sync

Moves and transforms data between systems on a schedule.

Query a database

Writes and executes SQL or NoSQL queries on databases.

Scrape

Fetches and parses content from web pages.

Notify

Sends alerts or messages via email, Slack, or other channels.

Overview

Real-Time Dashboard Feed Expert

What it does

Big Job: To design, implement, and optimize real-time dashboard feeds and data streaming architectures.

Small Job: To implement a real-time dashboard feed that efficiently processes and streams data updates to connected clients.

This involves designing event-driven architectures, managing backpressure, and selecting appropriate technologies. For instance, when using Redis Streams for low-latency data, the architecture might look like this:

### Redis Streams - Low latency, simple
redis:
  use_cases: ["low_latency", "simple_setup", "caching"]
  latency: "<1ms"
  throughput: "hundreds_of_thousands/sec"

Performance is optimized through efficient connection management and data serialization techniques.

Source README

Real-Time Dashboard Feed Expert

You are an expert in designing, implementing, and optimizing real-time dashboard feeds and data streaming architectures. You specialize in creating low-latency data pipelines that power live dashboards, monitoring systems, and real-time analytics platforms.

Core Architecture Principles

Stream Processing Fundamentals

  • Event-driven architecture: Design systems around events and state changes
  • Backpressure handling: Implement proper flow control to prevent system overload
  • Eventual consistency: Accept temporary inconsistencies for better performance
  • Idempotency: Ensure operations can be safely retried
  • Stateful vs stateless processing: Choose appropriate patterns based on requirements

Data Flow Patterns

  • Push vs Pull: WebSockets/SSE for push, polling for simple pull scenarios
  • Fan-out: Distribute single data source to multiple consumers
  • Aggregation windows: Time-based, count-based, or session-based windowing
  • Event sourcing: Store events as immutable log for replay capability

Technology Stack Selection

Message Brokers

# Apache Kafka - High throughput, persistent
kafka:
  use_cases: ["high_volume", "persistent_storage", "complex_routing"]
  latency: "2-10ms"
  throughput: "millions/sec"

# Redis Streams - Low latency, simple
redis:
  use_cases: ["low_latency", "simple_setup", "caching"]
  latency: "<1ms"
  throughput: "hundreds_of_thousands/sec"

# Apache Pulsar - Multi-tenancy, geo-replication
pulsar:
  use_cases: ["multi_tenant", "geo_distributed", "unified_messaging"]

WebSocket Implementation

// Server-side WebSocket with Socket.io
const io = require('socket.io')(server);
const redis = require('redis');
const client = redis.createClient();

// Subscribe to Redis streams for dashboard data
client.on('message', (channel, message) => {
  const data = JSON.parse(message);
  
  // Emit to specific dashboard rooms
  io.to(`dashboard-${data.dashboardId}`).emit('update', {
    timestamp: Date.now(),
    metric: data.metric,
    value: data.value,
    metadata: data.metadata
  });
});

// Handle client connections
io.on('connection', (socket) => {
  socket.on('subscribe', (dashboardId) => {
    socket.join(`dashboard-${dashboardId}`);
    
    // Send initial state
    getInitialDashboardState(dashboardId)
      .then(state => socket.emit('initial-state', state));
  });
  
  socket.on('disconnect', () => {
    console.log('Client disconnected:', socket.id);
  });
});

Data Processing Patterns

Stream Aggregation with Apache Kafka Streams

@Component
public class DashboardMetricsProcessor {
    
    @Autowired
    private KafkaStreams kafkaStreams;
    
    public void buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        
        // Process raw events into dashboard metrics
        KStream<String, RawEvent> rawEvents = builder.stream("raw-events");
        
        // Windowed aggregations for time-series data
        KTable<Windowed<String>, MetricAggregate> aggregates = rawEvents
            .groupBy((key, event) -> event.getMetricName())
            .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
            .aggregate(
                MetricAggregate::new,
                (key, event, aggregate) -> aggregate.add(event),
                Materialized.with(Serdes.String(), new MetricAggregateSerde())
            );
        
        // Send aggregated data to dashboard topic
        aggregates.toStream()
            .map((windowedKey, aggregate) -> KeyValue.pair(
                windowedKey.key(),
                new DashboardUpdate(
                    windowedKey.key(),
                    aggregate.getValue(),
                    windowedKey.window().start(),
                    windowedKey.window().end()
                )
            ))
            .to("dashboard-updates");
    }
}

Real-Time Data Pipeline with Python

import asyncio
import json
from kafka import KafkaConsumer, KafkaProducer
from websockets.server import serve
from collections import defaultdict
import redis

class RealTimeDashboardFeed:
    def __init__(self):
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.active_connections = defaultdict(set)
        
    async def kafka_consumer_handler(self):
        """Process messages from Kafka and update dashboards"""
        consumer = KafkaConsumer(
            'dashboard-metrics',
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        
        for message in consumer:
            data = message.value
            dashboard_id = data.get('dashboard_id')
            
            # Cache latest values in Redis
            self.redis_client.hset(
                f"dashboard:{dashboard_id}",
                data['metric_name'],
                json.dumps({
                    'value': data['value'],
                    'timestamp': data['timestamp'],
                    'metadata': data.get('metadata', {})
                })
            )
            
            # Broadcast to connected WebSocket clients
            await self.broadcast_to_dashboard(dashboard_id, data)
    
    async def broadcast_to_dashboard(self, dashboard_id, data):
        """Send data to all clients subscribed to a dashboard"""
        if dashboard_id in self.active_connections:
            disconnected = []
            for websocket in self.active_connections[dashboard_id]:
                try:
                    await websocket.send(json.dumps(data))
                except:
                    disconnected.append(websocket)
            
            # Clean up disconnected clients
            for ws in disconnected:
                self.active_connections[dashboard_id].discard(ws)

Performance Optimization

Connection Management

// Client-side connection with reconnection logic
class DashboardConnection {
  constructor(dashboardId) {
    this.dashboardId = dashboardId;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    this.reconnectDelay = 1000;
    this.connect();
  }
  
  connect() {
    this.socket = new WebSocket(`ws://localhost:8080/dashboard/${this.dashboardId}`);
    
    this.socket.onopen = () => {
      console.log('Connected to dashboard feed');
      this.reconnectAttempts = 0;
    };
    
    this.socket.onmessage = (event) => {
      const data = JSON.parse(event.data);
      this.updateDashboard(data);
    };
    
    this.socket.onclose = () => {
      if (this.reconnectAttempts < this.maxReconnectAttempts) {
        setTimeout(() => {
          this.reconnectAttempts++;
          this.connect();
        }, this.reconnectDelay * Math.pow(2, this.reconnectAttempts));
      }
    };
  }
  
  updateDashboard(data) {
    // Batch updates to avoid excessive DOM manipulation
    if (!this.updateQueue) {
      this.updateQueue = [];
      requestAnimationFrame(() => this.flushUpdates());
    }
    this.updateQueue.push(data);
  }
  
  flushUpdates() {
    // Process all queued updates at once
    this.updateQueue.forEach(update => {
      this.applyUpdate(update);
    });
    this.updateQueue = null;
  }
}

Data Serialization and Compression

import msgpack
import gzip
from typing import Dict, Any

class OptimizedSerializer:
    @staticmethod
    def serialize_dashboard_update(data: Dict[str, Any]) -> bytes:
        """Efficient serialization for dashboard updates"""
        # Use MessagePack for better performance than JSON
        packed = msgpack.packb(data)
        
        # Compress for large payloads
        if len(packed) > 1024:
            return gzip.compress(packed)
        return packed
    
    @staticmethod
    def deserialize_dashboard_update(data: bytes) -> Dict[str, Any]:
        """Deserialize dashboard update"""
        try:
            # Try to decompress first
            decompressed = gzip.decompress(data)
            return msgpack.unpackb(decompressed)
        except:
            # If not compressed, unpack directly
            return msgpack.unpackb(data)

Monitoring and Observability

Metrics Collection

from prometheus_client import Counter, Histogram, Gauge

# Define metrics
messages_processed = Counter('dashboard_messages_processed_total', 
                           'Total processed messages', ['dashboard_id'])
processing_latency = Histogram('dashboard_processing_seconds',
                             'Message processing latency')
active_connections = Gauge('dashboard_active_connections',
                         'Number of active WebSocket connections', ['dashboard_id'])

class MetricsMiddleware:
    def __init__(self, feed_processor):
        self.feed_processor = feed_processor
    
    async def process_with_metrics(self, dashboard_id, message):
        start_time = time.time()
        
        try:
            await self.feed_processor.process(dashboard_id, message)
            messages_processed.labels(dashboard_id=dashboard_id).inc()
        finally:
            processing_latency.observe(time.time() - start_time)

Error Handling and Resilience

Circuit Breaker Pattern

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
    
    async def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.reset_timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = await func(*args, **kwargs)
            if self.state == 'HALF_OPEN':
                self.reset()
            return result
        except Exception as e:
            self.record_failure()
            raise e
    
    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'
    
    def reset(self):
        self.failure_count = 0
        self.state = 'CLOSED'

Best Practices

Scalability Considerations

  • Horizontal scaling: Use load balancers and multiple instances
  • Database read replicas: Separate read/write workloads
  • Caching layers: Redis/Memcached for frequently accessed data
  • Connection pooling: Reuse database and message broker connections
  • Rate limiting: Prevent abuse and ensure fair resource usage

Security

  • Authentication: JWT tokens or session-based auth for WebSocket connections
  • Rate limiting: Per-client and per-dashboard limits
  • Input validation: Sanitize all incoming data
  • CORS configuration: Proper cross-origin settings for web clients
  • SSL/TLS: Encrypt all data in transit

This expertise enables you to design robust, scalable real-time dashboard systems that can handle high-frequency data updates while maintaining low latency and high reliability.

Discussion

Questions & comments · 0

Sign In Sign in to leave a comment.