
Data Warehouse Integration

Connect GA4 data to the rest of your data warehouse ecosystem using Snowflake, Databricks, and modern data stack tools.

Architecture Overview

Modern Data Stack

┌─────────────────────────────────────────────────────────┐
│                    Data Sources                          │
├─────────────────────────────────────────────────────────┤
│  GA4  │  Google Ads  │  Meta  │  CRM  │  Backend DB    │
└───┬───┴──────┬───────┴───┬────┴───┬───┴───────┬────────┘
    │          │           │        │           │
    ▼          ▼           ▼        ▼           ▼
┌─────────────────────────────────────────────────────────┐
│                    Ingestion Layer                       │
│              (Fivetran, Airbyte, Stitch)                │
└───────────────────────────┬─────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────┐
│                    Data Warehouse                        │
│            (Snowflake, Databricks, BigQuery)            │
└───────────────────────────┬─────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────┐
│                 Transformation Layer                     │
│                        (dbt)                             │
└───────────────────────────┬─────────────────────────────┘
                            │
    ┌───────────────────────┼───────────────────────┐
    ▼                       ▼                       ▼
┌────────┐            ┌──────────┐           ┌──────────┐
│   BI   │            │ Reverse  │           │    ML    │
│ Tools  │            │   ETL    │           │ Platform │
└────────┘            └──────────┘           └──────────┘

Snowflake Integration

Getting GA4 Data into Snowflake

Option 1: Via BigQuery Export + External Table

-- GA4's native export lands raw events in BigQuery; export those tables to
-- Cloud Storage (see Option 3) and query the files in place from Snowflake

-- Create external table in Snowflake over the exported files
CREATE OR REPLACE EXTERNAL TABLE ga4_events
  WITH LOCATION = @bigquery_stage/analytics_XXXXX/
  PATTERN = '.*events_.*'
  FILE_FORMAT = (TYPE = PARQUET);
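
Because no column list is defined, Snowflake exposes each Parquet record through the external table's single VALUE variant column. A quick sanity check (field names assume the standard GA4 export schema):

-- Peek at a few rows via the VALUE variant column
SELECT
  value:event_name::STRING AS event_name,
  value:user_pseudo_id::STRING AS user_pseudo_id
FROM ga4_events
LIMIT 10;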

Option 2: Via Fivetran

  1. Connect Fivetran to GA4 using its Google Analytics 4 connector
  2. Select tables: events, users
  3. Configure sync schedule
  4. Data lands in Snowflake

Option 3: Direct Export via Cloud Storage

-- BigQuery: Export to GCS
EXPORT DATA OPTIONS (
  uri = 'gs://bucket/ga4_events/*.parquet',
  format = 'PARQUET'
) AS
SELECT * FROM `project.analytics_XXXXXX.events_*`
WHERE _TABLE_SUFFIX >= '20240101';

-- Snowflake: Create stage and load
CREATE OR REPLACE STAGE ga4_stage
  URL = 'gcs://bucket/ga4_events/'
  STORAGE_INTEGRATION = gcs_int;

COPY INTO ga4_events
FROM @ga4_stage
FILE_FORMAT = (TYPE = PARQUET)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
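
The stage above references a storage integration named gcs_int, which must exist first. A minimal sketch of creating it (the bucket path is a placeholder):

-- One-time setup: integration that lets Snowflake read the GCS bucket
CREATE STORAGE INTEGRATION gcs_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://bucket/ga4_events/');

Run DESC STORAGE INTEGRATION gcs_int to find the Google service account Snowflake uses, then grant that account read access on the bucket.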

Snowflake GA4 Schema

-- Create optimized GA4 table
CREATE OR REPLACE TABLE analytics.ga4_events (
  event_date DATE,
  event_timestamp TIMESTAMP_NTZ,
  event_name STRING,
  user_pseudo_id STRING,
  user_id STRING,
  device_category STRING,
  geo_country STRING,
  traffic_source_source STRING,
  traffic_source_medium STRING,
  event_params VARIANT,  -- JSON column for flexibility
  user_properties VARIANT,
  ecommerce VARIANT
)
CLUSTER BY (event_date, event_name);
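
If event_params is loaded in GA4's raw shape (an array of key/value structs) rather than flattened into a single object, individual parameters can be pulled out with LATERAL FLATTEN. A sketch, assuming that raw shape:

-- Extract one parameter from the raw GA4 key/value array
SELECT
  e.event_name,
  p.value:value:string_value::STRING AS page_location
FROM analytics.ga4_events e,
     LATERAL FLATTEN(input => e.event_params) p
WHERE p.value:key::STRING = 'page_location';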

Query Examples

-- Session analysis in Snowflake
WITH sessions AS (
  SELECT
    user_pseudo_id,
    event_params:ga_session_id::NUMBER as session_id,  -- assumes event_params flattened to an object keyed by param name
    MIN(event_timestamp) as session_start,
    MAX(event_timestamp) as session_end,
    traffic_source_source as source,
    traffic_source_medium as medium,
    COUNT(*) as events
  FROM analytics.ga4_events
  WHERE event_date >= DATEADD(day, -30, CURRENT_DATE())
  GROUP BY 1, 2, 5, 6
)
SELECT
  source,
  medium,
  COUNT(*) as sessions,
  AVG(events) as avg_events,
  AVG(DATEDIFF(second, session_start, session_end)) as avg_duration
FROM sessions
GROUP BY 1, 2
ORDER BY sessions DESC;
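
A companion example, daily purchase revenue, assuming the ecommerce VARIANT column keeps GA4's purchase payload (including purchase_revenue):

-- Daily purchasers and revenue
SELECT
  event_date,
  COUNT(DISTINCT user_pseudo_id) AS purchasers,
  SUM(ecommerce:purchase_revenue::FLOAT) AS revenue
FROM analytics.ga4_events
WHERE event_name = 'purchase'
GROUP BY event_date
ORDER BY event_date;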

Databricks Integration

Setting Up Databricks

Unity Catalog

# Create catalog and schema (spark.sql executes one statement per call)
spark.sql("CREATE CATALOG IF NOT EXISTS analytics")
spark.sql("CREATE SCHEMA IF NOT EXISTS analytics.ga4")

Load GA4 Data

# Read from BigQuery using the Spark-BigQuery connector.
# Wildcard (sharded) GA4 tables are read via a query, which requires
# viewsEnabled plus a materialization dataset for the query results.
ga4_df = (spark.read
  .format("bigquery")
  .option("viewsEnabled", "true")
  .option("materializationDataset", "tmp_dataset")  # placeholder dataset
  .option("query", """
      SELECT * FROM `project.analytics_XXXXXX.events_*`
      WHERE _TABLE_SUFFIX >= '20240101'
  """)
  .load())

# Write to Delta Lake
ga4_df.write \
  .format("delta") \
  .mode("overwrite") \
  .saveAsTable("analytics.ga4.events")

Optimize for Analytics

# Optimize Delta table
spark.sql("""
  OPTIMIZE analytics.ga4.events
  ZORDER BY (event_date, event_name)
""")

# Enable Change Data Feed for incremental processing
spark.sql("""
  ALTER TABLE analytics.ga4.events
  SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

Machine Learning in Databricks

from databricks import feature_store
from sklearn.model_selection import train_test_split
import mlflow

# Create features
features_df = spark.sql("""
  SELECT
    user_pseudo_id,
    COUNT(*) as total_events,
    SUM(CASE WHEN event_name = 'purchase' THEN 1 ELSE 0 END) as purchases,
    AVG(event_params.engagement_time_msec) as avg_engagement  -- assumes event_params was flattened to a struct/map
  FROM analytics.ga4.events
  GROUP BY user_pseudo_id
""")

# Store in Feature Store
fs = feature_store.FeatureStoreClient()
fs.create_table(
  name="analytics.ga4.user_features",
  primary_keys=["user_pseudo_id"],
  df=features_df
)

# Train model with MLflow tracking
with mlflow.start_run():
  # ... training code
  mlflow.sklearn.log_model(model, "churn_model")
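
The stored features can later be read back as a Spark DataFrame for scoring or for joining into training sets:

# Read the feature table back from the Feature Store
user_features = fs.read_table(name="analytics.ga4.user_features")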

dbt for Transformation

Project Structure

dbt_project/
├── dbt_project.yml
├── models/
│   ├── staging/
│   │   └── ga4/
│   │       ├── stg_ga4_events.sql
│   │       └── stg_ga4_users.sql
│   ├── intermediate/
│   │   └── int_sessions.sql
│   └── marts/
│       ├── dim_users.sql
│       └── fct_conversions.sql
└── macros/
    └── extract_event_param.sql

Staging Model

-- models/staging/ga4/stg_ga4_events.sql

{{
  config(
    materialized='incremental',
    unique_key='event_id',
    partition_by={
      "field": "event_date",
      "data_type": "date"
    }
  )
}}

WITH source AS (
  SELECT * FROM {{ source('ga4', 'events') }}
  {% if is_incremental() %}
  -- source event_date is a 'YYYYMMDD' string; compare against the latest loaded date
  WHERE event_date >= (SELECT FORMAT_DATE('%Y%m%d', MAX(event_date)) FROM {{ this }})
  {% endif %}
),

renamed AS (
  SELECT
    {{ dbt_utils.generate_surrogate_key(['user_pseudo_id', 'event_timestamp', 'event_name']) }} as event_id,
    PARSE_DATE('%Y%m%d', event_date) as event_date,
    TIMESTAMP_MICROS(event_timestamp) as event_timestamp,
    event_name,
    user_pseudo_id,
    user_id,
    {{ extract_event_param('page_location', 'string') }} as page_location,
    {{ extract_event_param('ga_session_id', 'int') }} as session_id,
    {{ extract_event_param('engagement_time_msec', 'int') }} as engagement_time_msec,
    traffic_source.source as traffic_source,
    traffic_source.medium as traffic_medium,
    traffic_source.campaign as traffic_campaign
  FROM source
)

SELECT * FROM renamed
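
The staging model leans on the extract_event_param macro listed under macros/. A minimal sketch, assuming BigQuery's array-of-structs event_params and that only string and int parameters are needed:

-- macros/extract_event_param.sql (illustrative sketch)
{% macro extract_event_param(param_name, param_type='string') %}
  (SELECT ep.value.{{ param_type }}_value
   FROM UNNEST(event_params) AS ep
   WHERE ep.key = '{{ param_name }}')
{% endmacro %}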

Session Model

-- models/intermediate/int_sessions.sql

{{
  config(
    materialized='incremental',
    unique_key='session_key'
  )
}}

WITH events AS (
  SELECT * FROM {{ ref('stg_ga4_events') }}
  {% if is_incremental() %}
  WHERE event_date >= (SELECT MAX(session_date) FROM {{ this }})
  {% endif %}
),

sessions AS (
  SELECT
    {{ dbt_utils.generate_surrogate_key(['user_pseudo_id', 'session_id']) }} as session_key,
    user_pseudo_id,
    user_id,
    session_id,
    MIN(event_timestamp) as session_start,
    MAX(event_timestamp) as session_end,
    MIN(event_date) as session_date,
    -- first-touch values per session; ARRAY_AGG keeps these plain aggregates
    -- (window functions can't be mixed into this GROUP BY)
    ARRAY_AGG(traffic_source IGNORE NULLS ORDER BY event_timestamp LIMIT 1)[SAFE_OFFSET(0)] as source,
    ARRAY_AGG(traffic_medium IGNORE NULLS ORDER BY event_timestamp LIMIT 1)[SAFE_OFFSET(0)] as medium,
    ARRAY_AGG(page_location IGNORE NULLS ORDER BY event_timestamp LIMIT 1)[SAFE_OFFSET(0)] as landing_page,
    COUNT(*) as event_count,
    SUM(engagement_time_msec) / 1000 as engagement_seconds,
    MAX(CASE WHEN event_name = 'purchase' THEN 1 ELSE 0 END) as converted
  FROM events
  GROUP BY user_pseudo_id, user_id, session_id
)

SELECT * FROM sessions

Metrics Layer

# models/marts/schema.yml

metrics:
  - name: conversion_rate
    label: Conversion Rate
    model: ref('fct_conversions')
    description: "Percentage of sessions that convert"

    calculation_method: average
    expression: converted

    dimensions:
      - source
      - medium
      - device_category

    filters:
      - field: is_valid_session
        operator: '='
        value: 'true'

Reverse ETL

Use Cases

| Destination | Data | Purpose |
|-------------|------|---------|
| CRM | LTV predictions | Sales prioritization |
| Email | Churn scores | Retention campaigns |
| Ad platforms | Audiences | Targeting |
| Support | User history | Context |

Tools

| Tool | Strength |
|------|----------|
| Census | Enterprise focus, dbt integration |
| Hightouch | User-friendly, fast setup |
| Polytomic | API flexibility |
| RudderStack | Open-source option |

Census Example

-- Create sync model in warehouse
CREATE VIEW analytics.high_value_users AS
SELECT
  user_id as email,
  predicted_ltv,
  churn_probability,
  segment
FROM analytics.user_predictions
WHERE predicted_ltv > 500
  AND churn_probability < 0.3

Configure sync:

  1. Source: analytics.high_value_users
  2. Destination: Klaviyo
  3. Mapping: email → Email
  4. Schedule: Daily

Best Practices

Data Freshness

| Layer | Freshness | Method |
|-------|-----------|--------|
| Ingestion | 1 hour | Streaming or micro-batch |
| Staging | 1 hour | Incremental models |
| Marts | 4 hours | Scheduled dbt runs |
| Reverse ETL | Daily | Scheduled syncs |
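
dbt can monitor the ingestion-layer target with source freshness checks. A sketch, assuming the ga4 source used by the staging model and treating event_timestamp as a proxy for load time:

# models/staging/ga4/sources.yml (illustrative sketch)
version: 2

sources:
  - name: ga4
    tables:
      - name: events
        loaded_at_field: event_timestamp
        freshness:
          warn_after: {count: 2, period: hour}
          error_after: {count: 6, period: hour}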

Cost Management

| Optimization | Typical impact |
|--------------|----------------|
| Incremental models | -70% compute |
| Clustering/partitioning | -80% data scanned |
| Aggregate tables | -90% query volume |
| Auto-suspend | -50% idle cost |
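
Auto-suspend, for example, is a one-line setting on a Snowflake virtual warehouse (the warehouse name is a placeholder):

-- Suspend the warehouse after 60 seconds of inactivity
ALTER WAREHOUSE analytics_wh SET
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;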

Data Quality

-- dbt test example
-- tests/assert_no_null_user_id_on_purchases.sql
SELECT *
FROM {{ ref('fct_conversions') }}
WHERE event_name = 'purchase'
  AND user_id IS NULL
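
Singular tests like the one above pair well with generic tests declared in a schema file. A minimal sketch, using column names from the staging model:

# models/staging/ga4/schema.yml (illustrative sketch)
version: 2

models:
  - name: stg_ga4_events
    columns:
      - name: event_id
        tests:
          - unique
          - not_null
      - name: user_pseudo_id
        tests:
          - not_null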
