Skill

Stream and Ingest Data with Azure Event Hubs

TypeScript SDK for Azure Event Hubs enabling high-throughput event streaming, real-time data ingestion, and checkpointed consumption with partition control.

Works with azuretypescriptnpm

91
Spark score
out of 100
Updated 3 months ago
Version 1.0.0

Add to Favorites

Why it matters

Leverage Azure Event Hubs SDK for TypeScript to build high-throughput event streaming and real-time data ingestion pipelines. This asset facilitates sending and receiving events, managing checkpoints, and integrating with Azure services.

Outcomes

What it gets done

01

Send events to Azure Event Hubs, with options for batching and specific partitions.

02

Receive events from Azure Event Hubs, supporting simple and checkpointed consumption.

03

Configure event processing with options for start position, batch size, and wait times.

04

Implement robust error handling and best practices for reliable event streaming.

Install

Add it to your toolbox

Run in your project directory:

curl -fsSL https://spark.entire.vc/get/ag-azure-eventhub-ts | bash

Capabilities

What this skill does

Extract

Pulls structured data fields from unstructured text.

ETL & sync

Moves and transforms data between systems on a schedule.

Query a database

Writes and executes SQL or NoSQL queries on databases.

Deploy / CI

Runs build pipelines, tests, and deploys to environments.

Overview

Azure Event Hubs SDK for TypeScript

What it does

This skill provides the Azure Event Hubs SDK for TypeScript, enabling developers to send and receive events through Azure Event Hubs. It covers producer and consumer client setup, batch event sending, checkpointing with Azure Blob Storage, partition targeting, and error handling patterns.

How it connects

Use this skill when you need to integrate TypeScript applications with Azure Event Hubs for event streaming scenarios-sending telemetry data, processing logs, building event-driven architectures, or consuming events with checkpointing for reliable processing resumption.

Source README

Azure Event Hubs SDK for TypeScript

High-throughput event streaming and real-time data ingestion.

Installation

npm install @azure/event-hubs @azure/identity

For checkpointing with consumer groups:

npm install @azure/eventhubs-checkpointstore-blob @azure/storage-blob

Environment Variables

EVENTHUB_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=my-eventhub
STORAGE_ACCOUNT_NAME=<storage-account>
STORAGE_CONTAINER_NAME=checkpoints

Authentication

import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";

const fullyQualifiedNamespace = process.env.EVENTHUB_NAMESPACE!;
const eventHubName = process.env.EVENTHUB_NAME!;
const credential = new DefaultAzureCredential();

// Producer
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);

// Consumer
const consumer = new EventHubConsumerClient(
  "$Default", // Consumer group
  fullyQualifiedNamespace,
  eventHubName,
  credential
);

Core Workflow

Send Events

const producer = new EventHubProducerClient(namespace, eventHubName, credential);

// Create batch and add events
const batch = await producer.createBatch();
batch.tryAdd({ body: { temperature: 72.5, deviceId: "sensor-1" } });
batch.tryAdd({ body: { temperature: 68.2, deviceId: "sensor-2" } });

await producer.sendBatch(batch);
await producer.close();

Send to Specific Partition

// By partition ID
const batch = await producer.createBatch({ partitionId: "0" });

// By partition key (consistent hashing)
const batch = await producer.createBatch({ partitionKey: "device-123" });

Receive Events (Simple)

const consumer = new EventHubConsumerClient("$Default", namespace, eventHubName, credential);

const subscription = consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Partition: ${context.partitionId}, Body: ${JSON.stringify(event.body)}`);
    }
  },
  processError: async (err, context) => {
    console.error(`Error on partition ${context.partitionId}: ${err.message}`);
  },
});

// Stop after some time
setTimeout(async () => {
  await subscription.close();
  await consumer.close();
}, 60000);

Receive with Checkpointing (Production)

import { EventHubConsumerClient } from "@azure/event-hubs";
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";

const containerClient = new ContainerClient(
  `https://${storageAccount}.blob.core.windows.net/${containerName}`,
  credential
);

const checkpointStore = new BlobCheckpointStore(containerClient);

const consumer = new EventHubConsumerClient(
  "$Default",
  namespace,
  eventHubName,
  credential,
  checkpointStore
);

const subscription = consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Processing: ${JSON.stringify(event.body)}`);
    }
    // Checkpoint after processing batch
    if (events.length > 0) {
      await context.updateCheckpoint(events[events.length - 1]);
    }
  },
  processError: async (err, context) => {
    console.error(`Error: ${err.message}`);
  },
});

Receive from Specific Position

const subscription = consumer.subscribe({
  processEvents: async (events, context) => { /* ... */ },
  processError: async (err, context) => { /* ... */ },
}, {
  startPosition: {
    // Start from beginning
    "0": { offset: "@earliest" },
    // Start from end (new events only)
    "1": { offset: "@latest" },
    // Start from specific offset
    "2": { offset: "12345" },
    // Start from specific time
    "3": { enqueuedOn: new Date("2024-01-01") },
  },
});

Event Hub Properties

// Get hub info
const hubProperties = await producer.getEventHubProperties();
console.log(`Partitions: ${hubProperties.partitionIds}`);

// Get partition info
const partitionProperties = await producer.getPartitionProperties("0");
console.log(`Last sequence: ${partitionProperties.lastEnqueuedSequenceNumber}`);

Batch Processing Options

const subscription = consumer.subscribe(
  {
    processEvents: async (events, context) => { /* ... */ },
    processError: async (err, context) => { /* ... */ },
  },
  {
    maxBatchSize: 100,           // Max events per batch
    maxWaitTimeInSeconds: 30,    // Max wait for batch
  }
);

Key Types

import {
  EventHubProducerClient,
  EventHubConsumerClient,
  EventData,
  ReceivedEventData,
  PartitionContext,
  Subscription,
  SubscriptionEventHandlers,
  CreateBatchOptions,
  EventPosition,
} from "@azure/event-hubs";

import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";

Event Properties

// Send with properties
const batch = await producer.createBatch();
batch.tryAdd({
  body: { data: "payload" },
  properties: {
    eventType: "telemetry",
    deviceId: "sensor-1",
  },
  contentType: "application/json",
  correlationId: "request-123",
});

// Access in receiver
consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Type: ${event.properties?.eventType}`);
      console.log(`Sequence: ${event.sequenceNumber}`);
      console.log(`Enqueued: ${event.enqueuedTimeUtc}`);
      console.log(`Offset: ${event.offset}`);
    }
  },
});

Error Handling

consumer.subscribe({
  processEvents: async (events, context) => {
    try {
      for (const event of events) {
        await processEvent(event);
      }
      await context.updateCheckpoint(events[events.length - 1]);
    } catch (error) {
      // Don't checkpoint on error - events will be reprocessed
      console.error("Processing failed:", error);
    }
  },
  processError: async (err, context) => {
    if (err.name === "MessagingError") {
      // Transient error - SDK will retry
      console.warn("Transient error:", err.message);
    } else {
      // Fatal error
      console.error("Fatal error:", err);
    }
  },
});

Best Practices

  1. Use checkpointing - Always checkpoint in production for exactly-once processing
  2. Batch sends - Use createBatch() for efficient sending
  3. Partition keys - Use partition keys to ensure ordering for related events
  4. Consumer groups - Use separate consumer groups for different processing pipelines
  5. Handle errors gracefully - Don't checkpoint on processing failures
  6. Close clients - Always close producer/consumer when done
  7. Monitor lag - Track lastEnqueuedSequenceNumber vs processed sequence

When to Use

This skill is applicable to execute the workflow or actions described in the overview.

Limitations

  • Use this skill only when the task clearly matches the scope described above.
  • Do not treat the output as a substitute for environment-specific validation, testing, or expert review.
  • Stop and ask for clarification if required inputs, permissions, safety boundaries, or success criteria are missing.

Discussion

Questions & comments · 0

Sign In Sign in to leave a comment.