Skip to content

Snowflake Integration

Semantica features a native integration with Snowflake, the powerful cloud data warehouse that enables scalable data storage and analytics for enterprise workloads.

Overview

Snowflake is integrated into Semantica's ingest module via the SnowflakeIngestor. This allows you to seamlessly extract structured data from Snowflake tables and queries into semantic structures that can be indexed, searched, and analyzed within the Semantica framework.


📖 Integration Documentation

The SnowflakeIngestor provides a high-level interface for Snowflake data ingestion. It supports:

  • Multiple Authentication Methods: Password, key-pair, OAuth, and SSO authentication.
  • Advanced Querying: Custom SQL queries with parameterization and batching.
  • Schema Introspection: Automatic table schema discovery and metadata extraction.
  • Document Export: Convert Snowflake data to Semantica document format.

Basic Usage

from semantica.ingest import SnowflakeIngestor

# Initialize with environment variables
ingestor = SnowflakeIngestor()

# Ingest a table
data = ingestor.ingest_table("CUSTOMERS")

# Access the structured data
print(f"Retrieved {data.row_count} rows")
print(f"Columns: {data.columns}")

For more details, see the Ingest Reference.


🧑🏽‍🍳 Integration Example

We provide a detailed cookbook and clear code examples to help you get started quickly.

Snowflake Clear Code Example

from semantica.ingest import SnowflakeIngestor
import os
from dotenv import load_dotenv

# 1. Load environment variables
load_dotenv()

# 2. Initialize the Snowflake Ingestor
ingestor = SnowflakeIngestor(
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
    database=os.getenv("SNOWFLAKE_DATABASE"),
    schema=os.getenv("SNOWFLAKE_SCHEMA")
)

# 3. Ingest a table with filters
data = ingestor.ingest_table(
    "CUSTOMERS",
    where="COUNTRY = 'USA' AND CREATED_DATE > '2024-01-01'",
    order_by="CREATED_DATE DESC",
    limit=10000
)

# 4. Access the structured data
print(f"--- Customer Data ---")
print(f"Retrieved {data.row_count} customers")
print(f"Columns: {data.columns}")

# 5. Iterate through rows
for row in data.data[:5]:  # Print first 5 rows
    print(f"Customer: {row['NAME']} ({row['EMAIL']})")

# 6. Export as documents for Semantica processing
documents = ingestor.export_as_documents(
    data,
    id_field="CUSTOMER_ID",
    text_fields=["NAME", "EMAIL", "NOTES"]
)

print(f"Created {len(documents)} documents for processing")

See more in our Code Examples.


💻 GitHub Source

The integration is open-source and available on GitHub. You can explore the implementation, contribute improvements, or report issues.


📦 PyPI & Installation

Snowflake connector is an optional dependency for Semantica. You can install it along with Semantica or as a separate requirement.

Install via Semantica

# Install with Snowflake support
pip install semantica[db-snowflake]

# Or install with all database connectors
pip install semantica[db-all]

Install Snowflake connector manually

If you are working in a custom environment:

pip install snowflake-connector-python

For full installation details, see the Installation Guide.


🔐 Authentication Methods

Snowflake integration supports multiple authentication methods for different security requirements:

Password Authentication

ingestor = SnowflakeIngestor(
    account="myaccount",
    user="myuser",
    password="mypassword",
    warehouse="COMPUTE_WH"
)
ingestor = SnowflakeIngestor(
    account="myaccount",
    user="myuser",
    private_key_path="/path/to/rsa_key.p8",
    warehouse="COMPUTE_WH"
)

OAuth Authentication

ingestor = SnowflakeIngestor(
    account="myaccount",
    user="myuser",
    authenticator="oauth",
    token="your_oauth_token",
    warehouse="COMPUTE_WH"
)

SSO Authentication

ingestor = SnowflakeIngestor(
    account="myaccount",
    user="myuser",
    authenticator="externalbrowser",
    warehouse="COMPUTE_WH"
)

🚀 Advanced Features

Schema Introspection

# Get table schema
schema = ingestor.get_table_schema("CUSTOMERS")
for column in schema["columns"]:
    print(f"{column['name']}: {column['type']}")

Custom Queries

# Execute custom SQL
data = ingestor.ingest_query("""
    SELECT 
        CUSTOMER_ID,
        SUM(AMOUNT) AS TOTAL_AMOUNT
    FROM SALES
    WHERE DATE >= '2024-01-01'
    GROUP BY CUSTOMER_ID
""")

Batch Processing

# Handle large result sets
data = ingestor.ingest_query(
    "SELECT * FROM LARGE_TABLE",
    batch_size=5000
)

📊 Best Practices

Use Environment Variables

import os
from dotenv import load_dotenv

load_dotenv()
ingestor = SnowflakeIngestor()  # Reads from environment

Use Key-Pair Authentication for Production

ingestor = SnowflakeIngestor(
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    user=os.getenv("SNOWFLAKE_USER"),
    private_key_path=os.getenv("SNOWFLAKE_PRIVATE_KEY_PATH"),
    warehouse="COMPUTE_WH"
)

Paginate Large Results

PAGE_SIZE = 10000
for page in range(total_pages):
    data = ingestor.ingest_table(
        "LARGE_TABLE",
        limit=PAGE_SIZE,
        offset=page * PAGE_SIZE
    )
    process_batch(data)

🔍 Troubleshooting

Connection Issues

# Test connection
connector = SnowflakeConnector(
    account="myaccount",
    user="myuser",
    password="mypassword"
)

if not connector.test_connection():
    print("Connection failed - check credentials")

Performance Optimization

# Use appropriate warehouse size
ingestor = SnowflakeIngestor(
    account="myaccount",
    user="myuser",
    password="mypassword",
    warehouse="LARGE_WH"  # For heavy workloads
)

📚 See Also