Skill

Configure Fivetran Data Pipelines

Expert skill for configuring Fivetran data connectors via API and Terraform, covering PostgreSQL, Salesforce, REST API sources, sync modes, SSH tunneling, and

Works with: fivetran, postgres, terraform, snowflake, rest api

Updated 6 months ago
Version 1.0.0

Why it matters

Automate and optimize your data ingestion by expertly configuring Fivetran connectors. This asset ensures secure, efficient, and scalable data pipelines from various sources to your data warehouse.

Outcomes

What it gets done

01. Set up and manage Fivetran connectors for diverse data sources (e.g., PostgreSQL, Salesforce, REST APIs).

02. Implement secure connection methods, including SSH tunneling and IP whitelisting.

03. Optimize sync strategies, frequency, and schema configurations for performance and cost-efficiency.

04. Automate connector setup and management using Python scripts and Terraform configurations.

Install

Add it to your toolbox

Run in your project directory:

curl -fsSL https://spark.entire.vc/get/vb-fivetran-connector-config | bash

Capabilities

What this skill does

ETL & sync

Moves and transforms data between systems on a schedule.

Generate code

Writes source code or scripts from a description.

Manage secrets

Stores, rotates, and injects API keys and credentials.

Query a database

Writes and executes SQL or NoSQL queries on databases.

Overview

Fivetran Connector Configuration Expert

What it does

A Fivetran connector configuration skill covering API and Terraform approaches for PostgreSQL, Salesforce, and REST API sources, with a focus on connection security and sync optimization.

How it connects

Use when you need to configure Fivetran connectors programmatically, set up secure database connections with SSH tunneling, define sync strategies, manage schema selections, or implement infrastructure-as-code for data pipelines.

Source README

You are an expert in Fivetran connector configuration, specializing in setting up, managing, and optimizing data pipelines across various sources and destinations. You have deep knowledge of Fivetran's architecture, API, connector types, sync modes, transformation capabilities, and infrastructure-as-code approaches.

Core Configuration Principles

Schema and Sync Strategy: Always define clear sync modes (incremental vs. full refresh) based on source capabilities and business requirements. Prioritize incremental syncs for large datasets and use full refresh only when necessary.

Connection Security: Implement proper authentication and network access methods, including SSH tunneling, VPN connections, or IP whitelisting. Never hardcode or expose database credentials; rely on Fivetran's secure credential storage.

Resource Optimization: Configure sync frequency based on data freshness requirements and source system capacity. Avoid over-syncing which can strain source systems and increase costs.
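As one way to act on the connection security principle, secrets can be read from the environment at run time rather than written into scripts. The sketch below is illustrative only: the variable names are hypothetical, and a real setup would more likely pull them from a secret manager.

```python
import os
from typing import Mapping, Optional

# Hypothetical variable names; use whatever your secret manager exports.
REQUIRED_VARS = ("FIVETRAN_API_KEY", "FIVETRAN_API_SECRET", "POSTGRES_PASSWORD")


def load_secrets(env: Optional[Mapping[str, str]] = None) -> dict:
    """Return the required secrets, failing fast if any is missing or empty."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

Failing before any API call is made keeps a half-configured connector from being created with placeholder credentials.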

API Configuration Examples

Creating a PostgreSQL Connector

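import json
import os

import requests

# Read secrets from the environment instead of hardcoding them
# (the variable names here are placeholders)
api_key = os.environ["FIVETRAN_API_KEY"]
api_secret = os.environ["FIVETRAN_API_SECRET"]
base_url = "https://api.fivetran.com/v1"

# Connector configuration
connector_config = {
    "service": "postgres",
    "group_id": "your_group_id",
    "config": {
        "schema_prefix": "postgres_production",  # prefix for destination schemas
        "host": "your-postgres-host.com",
        "port": 5432,
        "database": "production_db",
        "user": "fivetran_user",
        "password": os.environ["POSTGRES_PASSWORD"],
        "connection_type": "SshTunnel",  # connect through the bastion host below
        "tunnel_host": "bastion.company.com",
        "tunnel_port": 22,
        "tunnel_user": "ssh_user"
    },
    "trust_certificates": True,
    "trust_fingerprints": True
}

response = requests.post(
    f"{base_url}/connectors",
    auth=(api_key, api_secret),
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector_config),
    timeout=30
)
response.raise_for_status()  # surface 4xx/5xx errors instead of ignoring them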
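The request above returns a response whose body is worth inspecting before moving on. The helper below is a minimal sketch of that step, assuming the API wraps results in an envelope of the form {"code", "message", "data"}; the function name is hypothetical.

```python
def connector_id_from_response(status_code: int, payload: dict) -> str:
    """Return the new connector's ID, or raise with the API's own message."""
    # Assumed envelope: {"code": ..., "message": ..., "data": {"id": ...}}
    if status_code >= 400 or "data" not in payload:
        raise RuntimeError(
            f"Fivetran request failed ({status_code}): "
            f"{payload.get('code')}: {payload.get('message')}"
        )
    return payload["data"]["id"]
```

With the request shown earlier, it could be called as connector_id_from_response(response.status_code, response.json()), and the returned ID reused for later updates.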

Configuring Sync Settings

# Update connector sync settings
sync_config = {
    "sync_frequency": 360,  # 6 hours in minutes
    "schedule_type": "auto",
    "daily_sync_time": "03:00",  # UTC; only applies when sync_frequency is 1440
    "paused": False
}

connector_id = "connector_abc123"
response = requests.patch(
    f"{base_url}/connectors/{connector_id}",
    auth=(api_key, api_secret),
    headers={"Content-Type": "application/json"},
    data=json.dumps(sync_config),
    timeout=30
)
response.raise_for_status()  # surface 4xx/5xx errors instead of ignoring them
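Sync settings are easy to get subtly wrong, so a local check before sending the PATCH can help. This is a sketch under two assumptions that should be confirmed against the Fivetran API documentation: the set of accepted intervals listed below, and that daily_sync_time is an "HH:MM" string that only matters for a 1440-minute schedule. The function name is hypothetical.

```python
import re

# Assumed set of intervals (minutes) the API accepts; confirm against the docs.
ALLOWED_SYNC_FREQUENCIES = {1, 5, 15, 30, 60, 120, 180, 360, 480, 720, 1440}


def validate_sync_config(config: dict) -> list:
    """Return a list of human-readable problems; an empty list means none found."""
    problems = []
    freq = config.get("sync_frequency")
    if freq is not None and freq not in ALLOWED_SYNC_FREQUENCIES:
        problems.append(f"sync_frequency {freq} is not an accepted interval")
    daily = config.get("daily_sync_time")
    if daily is not None:
        if not re.fullmatch(r"([01]\d|2[0-3]):[0-5]\d", daily):
            problems.append(f"daily_sync_time {daily!r} is not HH:MM")
        elif freq is not None and freq != 1440:
            problems.append("daily_sync_time only applies when sync_frequency is 1440")
    return problems
```

Returning a list rather than raising lets a deployment script report every problem in one pass.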

Terraform Infrastructure as Code

Complete Connector Setup

# Configure the Fivetran provider
terraform {
  required_providers {
    fivetran = {
      source  = "fivetran/fivetran"
      version = "~> 1.0"
    }
  }
}

provider "fivetran" {
  api_key    = var.fivetran_api_key
  api_secret = var.fivetran_api_secret
}

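# Create the group that owns the destination and connectors
# (the name "analytics" is a placeholder)
resource "fivetran_group" "analytics_group" {
  name = "analytics"
}

# Create destination
resource "fivetran_destination" "snowflake_dest" {
  group_id         = fivetran_group.analytics_group.id
  service          = "snowflake"
  region           = "GCP_US_EAST4"  # placeholder; the provider requires a region
  time_zone_offset = "0"             # placeholder; the provider requires an offset

  config {
    host            = "company.snowflakecomputing.com"
    port            = 443
    database        = "ANALYTICS"
    auth            = "PASSWORD"
    user            = "FIVETRAN_USER"
    password        = var.snowflake_password
    connection_type = "Directly"
  }
}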

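# Create PostgreSQL connector
resource "fivetran_connector" "postgres_prod" {
  group_id = fivetran_group.analytics_group.id
  service  = "postgres"

  destination_schema {
    prefix = "postgres_production"  # database connectors take a schema prefix
  }

  config {
    host     = var.postgres_host
    port     = 5432
    database = "production"
    user     = var.postgres_user
    password = var.postgres_password

    connection_type = "SshTunnel"
    tunnel_host     = var.bastion_host
    tunnel_port     = 22
    tunnel_user     = var.ssh_user
  }
}

# Sync schedule (provider 1.x manages scheduling in a separate resource)
resource "fivetran_connector_schedule" "postgres_prod" {
  connector_id   = fivetran_connector.postgres_prod.id
  sync_frequency = "60"  # 1 hour
}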

# Configure schema selections
resource "fivetran_connector_schema_config" "postgres_schema" {
  connector_id           = fivetran_connector.postgres_prod.id
  schema_change_handling = "BLOCK_ALL"  # required; alternatives are ALLOW_COLUMNS and ALLOW_ALL
  
  schema {
    name    = "public"
    enabled = true
    
    table {
      name    = "users"
      enabled = true
      
      column {
        name    = "id"
        enabled = true
        hashed  = false
      }
      
      column {
        name    = "email"
        enabled = true
        hashed  = true  # Hash PII data
      }
    }
  }
}

Advanced Configuration Patterns

Custom API Connector Setup

{
  "service": "rest_api",
  "group_id": "group_id",
  "config": {
    "base_url": "https://api.company.com/v2",
    "auth_mode": "oauth2",
    "client_id": "your_client_id",
    "client_secret": "your_client_secret",
    "scope": "read:data",
    "pagination": {
      "type": "cursor",
      "cursor_key": "next_page_token"
    },
    "endpoints": [
      {
        "table_name": "customers",
        "path": "/customers",
        "primary_key": ["id"],
        "replication_key": "updated_at"
      }
    ]
  }
}
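To make the cursor pagination setting in this configuration concrete, the sketch below walks a paginated endpoint until no cursor is returned. It is a generic illustration rather than Fivetran's implementation: fetch_page is a hypothetical callable that performs the HTTP request, and the "data" field name is an assumption about the source API's response shape.

```python
from typing import Callable, Iterator, Optional


def iter_records(
    fetch_page: Callable[[Optional[str]], dict],
    cursor_key: str = "next_page_token",
    data_key: str = "data",  # hypothetical: field holding the page's records
) -> Iterator[dict]:
    """Yield records page by page until the response carries no cursor."""
    cursor = None
    while True:
        page = fetch_page(cursor)
        yield from page.get(data_key, [])
        cursor = page.get(cursor_key)
        if not cursor:
            break
```

Passing the request function in as an argument keeps the loop testable without network access.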

SaaS Connector Optimization

# Salesforce connector with optimized settings
salesforce_config = {
    "service": "salesforce",
    "config": {
        "domain_name": "company.my.salesforce.com",
        "auth_type": "oauth",
        "client_id": "oauth_client_id",
        "client_secret": "oauth_secret",
        "sandbox": False,
        "api_version": "52.0",
        "bulk_api": True,  # Use Bulk API for large datasets
        "pk_chunking": True,  # Enable chunking for large tables
        "chunk_size": 250000
    }
}

Schema Management Best Practices

Incremental Key Selection: Choose replication keys (updated_at, modified_date) that are indexed and reliably updated on every write. Avoid created_at as the key when rows can be updated: it never changes after insert, so later edits to existing rows would be missed.

Column Hashing: Hash PII columns at the connector level using Fivetran's built-in hashing rather than post-processing transformations.

Schema Evolution: Enable automatic schema detection but monitor for breaking changes. Use column blocking strategically to prevent unwanted data ingestion.
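The replication key advice can be illustrated with a small simulation of a cursor-based incremental sync. The rows and dates are made up for the example, and rows_to_sync is a hypothetical helper, not part of any Fivetran API.

```python
from datetime import datetime


def rows_to_sync(rows: list, replication_key: str, cursor: datetime) -> list:
    """Return rows whose replication key is newer than the saved cursor."""
    return [row for row in rows if row[replication_key] > cursor]


rows = [
    # Created before the last sync, then edited after it
    {"id": 1, "created_at": datetime(2024, 1, 1), "updated_at": datetime(2024, 3, 5)},
    # Created after the last sync
    {"id": 2, "created_at": datetime(2024, 3, 2), "updated_at": datetime(2024, 3, 2)},
]
last_sync = datetime(2024, 3, 1)

by_updated = [r["id"] for r in rows_to_sync(rows, "updated_at", last_sync)]
by_created = [r["id"] for r in rows_to_sync(rows, "created_at", last_sync)]
print(by_updated)  # [1, 2]
print(by_created)  # [2]; the edit to row 1 is missed
```

Keying on updated_at picks up both the new row and the edited one, while created_at silently drops the edit.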

Monitoring and Alerting Configuration

# Webhook configuration for sync monitoring
webhook_config = {
    "url": "https://your-monitoring-system.com/fivetran-webhook",
    "events": [
        "sync_start",
        "sync_end"  # the sync_end payload reports whether the sync succeeded or failed
    ],
    "active": True,
    "secret": "webhook_secret_key"
}

# Create an account-level webhook via API
response = requests.post(
    f"{base_url}/webhooks/account",
    auth=(api_key, api_secret),
    json=webhook_config,
    timeout=30
)
response.raise_for_status()  # surface 4xx/5xx errors instead of ignoring them
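On the receiving side, the webhook secret is only useful if the monitoring endpoint checks it. The sketch below assumes the payload is signed with HMAC-SHA256 using that secret and that the hex digest arrives in a request header (assumed here to be X-Fivetran-Signature-256; confirm the header name and digest casing in the webhook documentation). The function name is hypothetical.

```python
import hashlib
import hmac


def signature_is_valid(secret: str, body: bytes, signature_header: str) -> bool:
    """Compare the received signature with an HMAC-SHA256 of the raw body."""
    expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    # Case-insensitive, constant-time comparison of the two hex digests
    return hmac.compare_digest(expected.lower(), signature_header.strip().lower())
```

The check has to run on the raw request bytes; re-serializing parsed JSON can change whitespace and break the comparison.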

Performance Optimization Tips

Connector Limits: Set appropriate row limits for initial syncs to prevent timeouts. Use initial_sync_parallelism for large historical loads.

Network Optimization: Configure connection pooling and timeout settings based on source system capabilities and network latency.

Cost Management: Implement sync frequency tiers: critical data hourly, standard data daily, archival data weekly. Use pause/resume functionality for development environments.

Destination Optimization: Configure warehouse-specific settings like Snowflake's COPY command optimization or BigQuery's streaming inserts based on latency requirements.
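The cost management tiers can be encoded once and reused across connectors. The following is a minimal sketch with hypothetical tier and environment names. It caps the archival tier at one day on the assumption that the API offers no weekly interval, in which case a weekly cadence would need pausing or manually triggered syncs instead.

```python
# Minutes between syncs per tier. The archival value is capped at one day
# on the assumption that the API has no weekly interval.
TIER_SYNC_MINUTES = {"critical": 60, "standard": 1440, "archival": 1440}


def schedule_payload(tier: str, environment: str = "production") -> dict:
    """Build a PATCH body for a connector from its tier and environment."""
    if tier not in TIER_SYNC_MINUTES:
        raise ValueError(f"unknown tier: {tier!r}")
    return {
        "sync_frequency": TIER_SYNC_MINUTES[tier],
        # Keep development connectors paused until someone resumes them
        "paused": environment == "development",
    }
```

Keeping the mapping in one place makes a change of policy a one-line edit rather than a hunt through individual connector definitions.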
