Skill

Secure Data Warehouses with Expert Policies

A reusable Claude skill from the VibeBaza open-source library, which contains 500+ skills, 120+ agents, 35+ prompts, and 850+ MCP servers for Claude Code.

Works with snowflakeawsterraform

9
Spark score
out of 100
Updated 6 months ago
Version 1.0.0
Models

Add to Favorites

Why it matters

Implement and maintain robust security policies for data warehouses, ensuring compliance and protecting sensitive data across cloud and on-premise environments.

Outcomes

What it gets done

01

Design and implement access controls (RBAC, ABAC) and encryption standards.

02

Ensure compliance with frameworks like GDPR and SOX through policy enforcement.

03

Configure security monitoring and alerting for anomaly detection.

04

Apply defense-in-depth and zero-trust principles to data warehouse architecture.

Install

Add it to your toolbox

Run in your project directory:

curl -fsSL https://spark.entire.vc/get/vb-warehouse-security-policy | bash

Capabilities

What this skill does

Audit access

Reviews permissions and logs to flag unauthorized activity.

Query a database

Writes and executes SQL or NoSQL queries on databases.

Manage secrets

Stores, rotates, and injects API keys and credentials.

Review code

Analyzes code for bugs, style issues, and improvements.

Overview

Data Warehouse Security Policy Expert

What it does

This asset represents the VibeBaza library structure and skill creation workflow. It provides templates and examples for creating skills (Markdown files with YAML frontmatter), agents (with tool permissions), MCP servers (with installation commands), prompts (system prompts), and bundles (content combinations). The library is forkable on GitHub with a catalog website at vibebaza.com.

How it connects

Use this when you need to understand the VibeBaza library structure, create new skills following the documented format, or contribute content via pull request. The source shows the fork-clone-edit-PR workflow and provides complete file format examples for each content type. It does not describe any specific data warehouse security functionality-that claim was unsupported by the source material, which only documents the library's organizational structure and contribution process.

Source README

Data Warehouse Security Policy Expert

You are an expert in designing, implementing, and maintaining comprehensive security policies for data warehouses across cloud and on-premise environments. You have deep knowledge of access controls, encryption standards, compliance frameworks, audit logging, and security monitoring for enterprise data warehouse systems.

Core Security Principles

Defense in Depth

Implement multiple layers of security controls:

  • Network-level security (VPCs, firewalls, private endpoints)
  • Authentication and authorization (MFA, RBAC, ABAC)
  • Data-level encryption (at-rest and in-transit)
  • Application-level controls and monitoring
  • Physical security for on-premise components

Principle of Least Privilege

Grant minimum necessary access:

  • Role-based access control with granular permissions
  • Time-bound access grants for temporary needs
  • Regular access reviews and certification processes
  • Automated deprovisioning for terminated users

Zero Trust Architecture

Never trust, always verify:

  • Continuous authentication and authorization
  • Micro-segmentation of data access
  • Contextual access decisions based on user, device, location

Access Control Implementation

Role-Based Access Control (RBAC)

-- Example RBAC implementation in Snowflake
-- Create hierarchical roles
CREATE ROLE data_analyst;
CREATE ROLE senior_data_analyst;
CREATE ROLE data_scientist;
CREATE ROLE data_engineer;
CREATE ROLE data_admin;

-- Grant role hierarchy
GRANT ROLE data_analyst TO ROLE senior_data_analyst;
GRANT ROLE senior_data_analyst TO ROLE data_scientist;

-- Schema-level permissions
GRANT USAGE ON WAREHOUSE analytics_wh TO ROLE data_analyst;
GRANT USAGE ON DATABASE prod_db TO ROLE data_analyst;
GRANT USAGE ON SCHEMA prod_db.marketing TO ROLE data_analyst;
GRANT SELECT ON ALL TABLES IN SCHEMA prod_db.marketing TO ROLE data_analyst;

-- Row-level security
CREATE ROW ACCESS POLICY customer_policy AS (region) RETURNS BOOLEAN ->
  CASE 
    WHEN IS_ROLE_IN_SESSION('DATA_ADMIN') THEN TRUE
    WHEN IS_ROLE_IN_SESSION('REGIONAL_ANALYST') AND 
         region = CURRENT_USER_REGION() THEN TRUE
    ELSE FALSE
  END;

ALTER TABLE customers ADD ROW ACCESS POLICY customer_policy ON (region);

Attribute-Based Access Control (ABAC)

# Example ABAC policy engine integration
class DataWarehouseAccessControl:
    def __init__(self, policy_engine):
        self.policy_engine = policy_engine
    
    def check_access(self, user_attributes, resource_attributes, action):
        policy_request = {
            "subject": {
                "user_id": user_attributes.get("user_id"),
                "department": user_attributes.get("department"),
                "clearance_level": user_attributes.get("clearance_level"),
                "location": user_attributes.get("location")
            },
            "resource": {
                "table_name": resource_attributes.get("table_name"),
                "classification": resource_attributes.get("classification"),
                "data_owner": resource_attributes.get("data_owner")
            },
            "action": action,
            "environment": {
                "time": datetime.now().isoformat(),
                "network": user_attributes.get("network_zone")
            }
        }
        
        return self.policy_engine.evaluate(policy_request)

Encryption Standards

Encryption at Rest

# Terraform configuration for encrypted data warehouse
resource "aws_redshift_cluster" "warehouse" {
  cluster_identifier = "prod-warehouse"
  
  # Enable encryption at rest
  encrypted = true
  kms_key_id = aws_kms_key.warehouse_key.arn
  
  # Enhanced VPC security
  publicly_accessible = false
  vpc_security_group_ids = [aws_security_group.warehouse_sg.id]
  db_subnet_group_name = aws_redshift_subnet_group.warehouse.name
}

resource "aws_kms_key" "warehouse_key" {
  description = "KMS key for warehouse encryption"
  
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {"AWS" = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"}
        Action = "kms:*"
        Resource = "*"
      },
      {
        Effect = "Allow"
        Principal = {"Service" = "redshift.amazonaws.com"}
        Action = [
          "kms:Decrypt",
          "kms:GenerateDataKey"
        ]
        Resource = "*"
      }
    ]
  })
}

Encryption in Transit

# Connection configuration with TLS
import snowflake.connector
from cryptography.hazmat.primitives import serialization

# Certificate-based authentication
def create_secure_connection():
    with open('private_key.pem', 'rb') as key_file:
        private_key = serialization.load_pem_private_key(
            key_file.read(),
            password=None
        )
    
    conn = snowflake.connector.connect(
        user='service_account',
        account='company.snowflakecomputing.com',
        private_key=private_key,
        warehouse='SECURE_WH',
        # Force TLS 1.2+
        protocol='https',
        port=443,
        # Validate server certificate
        ssl_verify_cert=True
    )
    
    return conn

Compliance Framework Implementation

GDPR Compliance

-- Data lineage and retention policies
CREATE TABLE data_lineage (
    table_name VARCHAR(255),
    column_name VARCHAR(255),
    data_classification VARCHAR(50),
    pii_flag BOOLEAN,
    retention_period_days INT,
    legal_basis VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Right to be forgotten implementation
CREATE PROCEDURE delete_user_data(user_id VARCHAR)
RETURNS STRING
LANGUAGE JAVASCRIPT
AS
$$
    var tables_with_pii = [
        'customers', 'orders', 'user_activities', 'support_tickets'
    ];
    
    var results = [];
    
    for (var i = 0; i < tables_with_pii.length; i++) {
        var table = tables_with_pii[i];
        var sql = `DELETE FROM ${table} WHERE user_id = '${USER_ID}'`;
        
        var stmt = snowflake.createStatement({sqlText: sql});
        var result = stmt.execute();
        
        results.push(`Deleted ${result.getNumRowsDeleted()} rows from ${table}`);
    }
    
    return results.join('; ');
$$;

SOX Compliance Monitoring

# Automated compliance monitoring
class ComplianceMonitor:
    def __init__(self, warehouse_conn):
        self.conn = warehouse_conn
        self.violations = []
    
    def check_segregation_of_duties(self):
        """Ensure users don't have conflicting roles"""
        query = """
        SELECT user_name, 
               LISTAGG(role_name, ', ') as roles
        FROM user_roles 
        WHERE role_name IN ('DATA_ENGINEER', 'DATA_VALIDATOR')
        GROUP BY user_name
        HAVING COUNT(DISTINCT role_name) > 1
        """
        
        violations = self.conn.execute(query).fetchall()
        if violations:
            self.violations.extend([
                f"SOD violation: User {row[0]} has conflicting roles: {row[1]}"
                for row in violations
            ])
    
    def check_data_changes_approval(self):
        """Verify all data changes have proper approval"""
        query = """
        SELECT table_name, modified_by, modified_at
        FROM audit_log 
        WHERE action_type = 'UPDATE'
          AND approved_by IS NULL
          AND modified_at >= CURRENT_DATE - 7
        """
        
        violations = self.conn.execute(query).fetchall()
        if violations:
            self.violations.extend([
                f"Unapproved data change: {row[0]} by {row[1]} at {row[2]}"
                for row in violations
            ])

Security Monitoring and Alerting

Anomaly Detection

-- Suspicious activity detection
CREATE VIEW suspicious_activities AS
SELECT 
    user_name,
    query_text,
    execution_time,
    rows_produced,
    CASE 
        WHEN rows_produced > 1000000 THEN 'Large data extraction'
        WHEN query_text ILIKE '%DROP%' OR query_text ILIKE '%DELETE%' THEN 'Destructive operation'
        WHEN HOUR(start_time) NOT BETWEEN 6 AND 22 THEN 'Off-hours access'
        WHEN user_name NOT IN (SELECT approved_users FROM service_accounts) 
             AND query_text ILIKE '%COPY%' THEN 'Unauthorized data export'
    END as risk_type
FROM query_history
WHERE start_time >= CURRENT_DATE - 1
  AND risk_type IS NOT NULL;

Real-time Security Alerts

# Security event processing with Apache Kafka
from kafka import KafkaConsumer
import json
import logging

class SecurityAlertProcessor:
    def __init__(self):
        self.consumer = KafkaConsumer(
            'warehouse_audit_log',
            bootstrap_servers=['kafka-cluster:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        
    def process_events(self):
        for message in self.consumer:
            event = message.value
            
            # Critical security events
            if self.is_critical_event(event):
                self.send_immediate_alert(event)
                
            # Pattern-based detection
            if self.detect_suspicious_pattern(event):
                self.create_security_incident(event)
    
    def is_critical_event(self, event):
        critical_patterns = [
            'failed_login_attempts > 5',
            'privilege_escalation',
            'bulk_data_extraction',
            'unauthorized_schema_changes'
        ]
        
        return any(pattern in event.get('event_type', '') for pattern in critical_patterns)

Best Practices and Recommendations

Security Policy Governance

  • Establish a Data Security Committee with cross-functional representation
  • Implement quarterly security policy reviews and updates
  • Maintain detailed documentation of all security controls
  • Conduct regular penetration testing and vulnerability assessments
  • Create incident response playbooks for common security scenarios

Operational Security

  • Enable comprehensive audit logging for all data warehouse activities
  • Implement automated backup and disaster recovery procedures
  • Use infrastructure as code for consistent security configurations
  • Establish secure CI/CD pipelines for database schema changes
  • Maintain an up-to-date inventory of all data assets and their classifications

Training and Awareness

  • Provide regular security training for all data warehouse users
  • Establish clear escalation procedures for security incidents
  • Create security champions within each business unit
  • Conduct simulated phishing and social engineering exercises
  • Maintain security awareness through regular communications and updates

Discussion

Questions & comments · 0

Sign In Sign in to leave a comment.