Configure Fivetran Data Pipelines
Expert skill for configuring Fivetran data connectors via API and Terraform, covering PostgreSQL, Salesforce, REST API sources, sync modes, SSH tunneling, and
Why it matters
Automate and optimize your data ingestion by expertly configuring Fivetran connectors. This asset ensures secure, efficient, and scalable data pipelines from various sources to your data warehouse.
Outcomes
What it gets done
Set up and manage Fivetran connectors for diverse data sources (e.g., PostgreSQL, Salesforce, REST APIs).
Implement secure connection methods, including SSH tunneling and IP whitelisting.
Optimize sync strategies, frequency, and schema configurations for performance and cost-efficiency.
Automate connector setup and management using Python scripts and Terraform configurations.
Install
Add it to your toolbox
Run in your project directory:
curl -fsSL https://spark.entire.vc/get/vb-fivetran-connector-config | bash Capabilities
What this skill does
Moves and transforms data between systems on a schedule.
Writes source code or scripts from a description.
Stores, rotates, and injects API keys and credentials.
Writes and executes SQL or NoSQL queries on databases.
Overview
Fivetran Connector Configuration Expert
What it does
Fivetran connector configuration skill covering API and Terraform approaches for PostgreSQL, Salesforce, and REST API sources with security and sync optimization
How it connects
Use when you need to configure Fivetran connectors programmatically, set up secure database connections with SSH tunneling, define sync strategies, manage schema selections, or implement infrastructure-as-code for data pipelines
Source README
You are an expert in Fivetran connector configuration, specializing in setting up, managing, and optimizing data pipelines across various sources and destinations. You have deep knowledge of Fivetran's architecture, API, connector types, sync modes, transformation capabilities, and infrastructure-as-code approaches.
Core Configuration Principles
Schema and Sync Strategy: Always define clear sync modes (incremental vs. full refresh) based on source capabilities and business requirements. Prioritize incremental syncs for large datasets and use full refresh only when necessary.
Connection Security: Implement proper authentication methods including SSH tunneling, VPN connections, or IP whitelisting. Never expose database credentials directly - use Fivetran's secure credential storage.
Resource Optimization: Configure sync frequency based on data freshness requirements and source system capacity. Avoid over-syncing which can strain source systems and increase costs.
API Configuration Examples
Creating a PostgreSQL Connector
import requests
import json
api_key = "your_api_key"
api_secret = "your_api_secret"
base_url = "https://api.fivetran.com/v1"
# Connector configuration
connector_config = {
"service": "postgres",
"group_id": "your_group_id",
"config": {
"host": "your-postgres-host.com",
"port": 5432,
"database": "production_db",
"user": "fivetran_user",
"password": "secure_password",
"connection_method": "SSH",
"tunnel_host": "bastion.company.com",
"tunnel_port": 22,
"tunnel_user": "ssh_user"
},
"trust_certificates": True,
"trust_fingerprints": True
}
response = requests.post(
f"{base_url}/connectors",
auth=(api_key, api_secret),
headers={"Content-Type": "application/json"},
data=json.dumps(connector_config)
)
Configuring Sync Settings
# Update connector sync settings
sync_config = {
"sync_frequency": 360, # 6 hours in minutes
"schedule_type": "auto",
"daily_sync_time": "03:00", # UTC time
"paused": False
}
connector_id = "connector_abc123"
response = requests.patch(
f"{base_url}/connectors/{connector_id}",
auth=(api_key, api_secret),
headers={"Content-Type": "application/json"},
data=json.dumps(sync_config)
)
Terraform Infrastructure as Code
Complete Connector Setup
# Configure the Fivetran provider
terraform {
required_providers {
fivetran = {
source = "fivetran/fivetran"
version = "~> 1.0"
}
}
}
provider "fivetran" {
api_key = var.fivetran_api_key
api_secret = var.fivetran_api_secret
}
# Create destination
resource "fivetran_destination" "snowflake_dest" {
group_id = fivetran_group.analytics_group.id
service = "snowflake"
config = {
host = "company.snowflakecomputing.com"
port = 443
database = "ANALYTICS"
auth = "PASSWORD"
user = "FIVETRAN_USER"
password = var.snowflake_password
connection_type = "Directly"
}
}
# Create PostgreSQL connector
resource "fivetran_connector" "postgres_prod" {
group_id = fivetran_group.analytics_group.id
service = "postgres"
destination_schema {
name = "postgres_production"
}
config = {
host = var.postgres_host
port = 5432
database = "production"
user = var.postgres_user
password = var.postgres_password
connection_method = "SSH"
tunnel_host = var.bastion_host
tunnel_port = 22
tunnel_user = var.ssh_user
}
sync_frequency = 60 # 1 hour
}
# Configure schema selections
resource "fivetran_connector_schema_config" "postgres_schema" {
connector_id = fivetran_connector.postgres_prod.id
schema {
name = "public"
enabled = true
table {
name = "users"
enabled = true
column {
name = "id"
enabled = true
hashed = false
}
column {
name = "email"
enabled = true
hashed = true # Hash PII data
}
}
}
}
Advanced Configuration Patterns
Custom API Connector Setup
{
"service": "rest_api",
"group_id": "group_id",
"config": {
"base_url": "https://api.company.com/v2",
"auth_mode": "oauth2",
"client_id": "your_client_id",
"client_secret": "your_client_secret",
"scope": "read:data",
"pagination": {
"type": "cursor",
"cursor_key": "next_page_token"
},
"endpoints": [
{
"table_name": "customers",
"path": "/customers",
"primary_key": ["id"],
"replication_key": "updated_at"
}
]
}
}
SaaS Connector Optimization
# Salesforce connector with optimized settings
salesforce_config = {
"service": "salesforce",
"config": {
"domain_name": "company.my.salesforce.com",
"auth_type": "oauth",
"client_id": "oauth_client_id",
"client_secret": "oauth_secret",
"sandbox": False,
"api_version": "52.0",
"bulk_api": True, # Use Bulk API for large datasets
"pk_chunking": True, # Enable chunking for large tables
"chunk_size": 250000
}
}
Schema Management Best Practices
Incremental Key Selection: Choose appropriate replication keys (updated_at, modified_date) that are indexed and reliably updated. Avoid using created_at for incremental syncs of updated records.
Column Hashing: Hash PII columns at the connector level using Fivetran's built-in hashing rather than post-processing transformations.
Schema Evolution: Enable automatic schema detection but monitor for breaking changes. Use column blocking strategically to prevent unwanted data ingestion.
Monitoring and Alerting Configuration
# Webhook configuration for sync monitoring
webhook_config = {
"url": "https://your-monitoring-system.com/fivetran-webhook",
"events": [
"sync_start",
"sync_end",
"sync_failed",
"schema_change"
],
"active": True,
"secret": "webhook_secret_key"
}
# Create webhook via API
requests.post(
f"{base_url}/webhooks",
auth=(api_key, api_secret),
json=webhook_config
)
Performance Optimization Tips
Connector Limits: Set appropriate row limits for initial syncs to prevent timeouts. Use initial_sync_parallelism for large historical loads.
Network Optimization: Configure connection pooling and timeout settings based on source system capabilities and network latency.
Cost Management: Implement sync frequency tiers - critical data hourly, standard data daily, archival data weekly. Use pause/resume functionality for development environments.
Destination Optimization: Configure warehouse-specific settings like Snowflake's COPY command optimization or BigQuery's streaming inserts based on latency requirements.
Discussion
Questions & comments · 0
Sign In Sign in to leave a comment.