Data Warehouse Integration
Connect GA4 data to the rest of your warehouse ecosystem using Snowflake, Databricks, and modern data stack tooling.
Architecture Overview
Modern Data Stack
┌─────────────────────────────────────────────────────────┐
│                       Data Sources                      │
├───────────┬────────────┬──────┬───────┬─────────────────┤
│    GA4    │ Google Ads │ Meta │  CRM  │   Backend DB    │
└─────┬─────┴─────┬──────┴──┬───┴───┬───┴────────┬────────┘
      │           │         │       │            │
      ▼           ▼         ▼       ▼            ▼
┌─────────────────────────────────────────────────────────┐
│                     Ingestion Layer                     │
│               (Fivetran, Airbyte, Stitch)               │
└───────────────────────────┬─────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────┐
│                      Data Warehouse                     │
│            (Snowflake, Databricks, BigQuery)            │
└───────────────────────────┬─────────────────────────────┘
                            │
                            ▼
┌─────────────────────────────────────────────────────────┐
│                   Transformation Layer                  │
│                          (dbt)                          │
└───────────────────────────┬─────────────────────────────┘
                            │
     ┌──────────────────────┼──────────────────────┐
     ▼                      ▼                      ▼
 ┌────────┐            ┌──────────┐           ┌──────────┐
 │   BI   │            │ Reverse  │           │    ML    │
 │ Tools  │            │   ETL    │           │ Platform │
 └────────┘            └──────────┘           └──────────┘
Snowflake Integration
Getting GA4 Data into Snowflake
Option 1: Via BigQuery + External Stage
-- Export the GA4 BigQuery tables to GCS (see Option 3), then point a
-- Snowflake external table at the exported files through a GCS stage.
-- Assumes a stage named bigquery_stage already exists over the bucket.
CREATE OR REPLACE EXTERNAL TABLE ga4_events
WITH LOCATION = @bigquery_stage/analytics_XXXXX/
PATTERN = '.*events_.*'
FILE_FORMAT = (TYPE = PARQUET);
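External tables read the stage lazily and expose each Parquet row as a single VARIANT column named VALUE; a quick sanity check after new files land (the event_name field is an assumption about the export's column layout):

ALTER EXTERNAL TABLE ga4_events REFRESH;

-- Each row arrives as a VARIANT named VALUE; cast fields as needed
SELECT
    value:event_name::STRING AS event_name,
    COUNT(*)                 AS events
FROM ga4_events
GROUP BY 1
ORDER BY events DESC
LIMIT 10;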
Option 2: Via Fivetran
- Connect Fivetran to GA4 using its connector (built on the Google Analytics APIs)
- Select the tables to replicate (e.g., events, users)
- Configure the sync schedule
- Data lands in Snowflake automatically; verify freshness with a query like the sketch below
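The destination schema and table names depend on how the connector is configured, so treat the names below as placeholders; _fivetran_synced is the system column Fivetran adds to every replicated table:

-- Hypothetical freshness check on the landed data
SELECT MAX(_fivetran_synced) AS last_sync
FROM google_analytics_4.events;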
Option 3: Direct Export via Cloud Storage
-- BigQuery: export to GCS as Parquet
EXPORT DATA OPTIONS (
    uri = 'gs://bucket/ga4_events/*.parquet',
    format = 'PARQUET'
) AS
SELECT * FROM `project.analytics_XXXXXX.events_*`
WHERE _TABLE_SUFFIX >= '20240101';

-- Snowflake: create a stage over the bucket and load
CREATE OR REPLACE STAGE ga4_stage
URL = 'gcs://bucket/ga4_events/'
STORAGE_INTEGRATION = gcs_int;

COPY INTO ga4_events
FROM @ga4_stage
FILE_FORMAT = (TYPE = PARQUET)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;  -- map Parquet columns to table columns by name
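To make this load continuous rather than a one-off COPY, the same statement can sit behind Snowpipe; a sketch, assuming a GCS notification integration (named GCS_NOTIFY_INT here for illustration) has been set up separately:

-- Hypothetical Snowpipe for continuous ingestion; auto-ingest requires a
-- GCS Pub/Sub notification integration configured beforehand
CREATE OR REPLACE PIPE ga4_pipe
    AUTO_INGEST = TRUE
    INTEGRATION = 'GCS_NOTIFY_INT'
AS
COPY INTO ga4_events
FROM @ga4_stage
FILE_FORMAT = (TYPE = PARQUET)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;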
Snowflake GA4 Schema
-- Create an optimized GA4 table
CREATE OR REPLACE TABLE analytics.ga4_events (
    event_date            DATE,
    event_timestamp       TIMESTAMP_NTZ,
    event_name            STRING,
    user_pseudo_id        STRING,
    user_id               STRING,
    device_category       STRING,
    geo_country           STRING,
    traffic_source_source STRING,
    traffic_source_medium STRING,
    event_params          VARIANT,  -- JSON column for flexibility
    user_properties       VARIANT,
    ecommerce             VARIANT
)
CLUSTER BY (event_date, event_name);
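Populating the typed columns from a raw load is a one-time transform; a sketch, assuming the raw rows sit in a hypothetical ga4_events_raw table with a single VARIANT column v mirroring the BigQuery export layout:

-- Hypothetical transform from a raw VARIANT landing table into the typed schema
INSERT INTO analytics.ga4_events
SELECT
    TO_DATE(v:event_date::STRING, 'YYYYMMDD'),
    TO_TIMESTAMP_NTZ(v:event_timestamp::NUMBER, 6),  -- export timestamps are microseconds
    v:event_name::STRING,
    v:user_pseudo_id::STRING,
    v:user_id::STRING,
    v:device:category::STRING,
    v:geo:country::STRING,
    v:traffic_source:source::STRING,
    v:traffic_source:medium::STRING,
    v:event_params,
    v:user_properties,
    v:ecommerce
FROM analytics.ga4_events_raw;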
Query Examples
-- Session analysis in Snowflake
-- Assumes event_params was flattened to a plain JSON object (key -> value)
-- during load, rather than GA4's raw key/value array
WITH sessions AS (
    SELECT
        user_pseudo_id,
        event_params:ga_session_id::NUMBER AS session_id,
        MIN(event_timestamp) AS session_start,
        MAX(event_timestamp) AS session_end,
        traffic_source_source AS source,
        traffic_source_medium AS medium,
        COUNT(*) AS events
    FROM analytics.ga4_events
    WHERE event_date >= DATEADD(day, -30, CURRENT_DATE())
    GROUP BY user_pseudo_id, session_id, source, medium
)
SELECT
    source,
    medium,
    COUNT(*) AS sessions,
    AVG(events) AS avg_events,
    AVG(DATEDIFF(second, session_start, session_end)) AS avg_duration_seconds
FROM sessions
GROUP BY source, medium
ORDER BY sessions DESC;
Databricks Integration
Setting Up Databricks
Unity Catalog
# Create catalog and schema (spark.sql accepts one statement per call)
spark.sql("CREATE CATALOG IF NOT EXISTS analytics")
spark.sql("CREATE SCHEMA IF NOT EXISTS analytics.ga4")
Load GA4 Data
# Read from BigQuery using the spark-bigquery connector
# (spark is predefined in Databricks notebooks).
# Note: wildcard tables are not reliably supported through the "table"
# option; if this fails, use .option("query", ...) together with a
# materializationDataset instead.
ga4_df = spark.read \
    .format("bigquery") \
    .option("table", "project.analytics_XXXXXX.events_*") \
    .option("filter", "_TABLE_SUFFIX >= '20240101'") \
    .load()

# Write to Delta Lake as a managed table
ga4_df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("analytics.ga4.events")
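For daily loads you would typically merge rather than overwrite; a sketch in Spark SQL, assuming the newly exported days are available in a temp view named ga4_new_events with the same schema:

-- Hypothetical incremental load: upsert newly exported days into the Delta table
MERGE INTO analytics.ga4.events AS t
USING ga4_new_events AS s
    ON  t.user_pseudo_id  = s.user_pseudo_id
    AND t.event_timestamp = s.event_timestamp
    AND t.event_name      = s.event_name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;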
Optimize for Analytics
# Optimize Delta table
spark.sql("""
OPTIMIZE analytics.ga4.events
ZORDER BY (event_date, event_name)
""")
# Enable Change Data Feed for incremental processing
spark.sql("""
ALTER TABLE analytics.ga4.events
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
Machine Learning in Databricks
from databricks import feature_store
from sklearn.model_selection import train_test_split
import mlflow

# Create features.
# Assumes event_params was flattened to a struct during load; the raw
# GA4 export stores it as an array of key/value records instead.
features_df = spark.sql("""
    SELECT
        user_pseudo_id,
        COUNT(*) AS total_events,
        SUM(CASE WHEN event_name = 'purchase' THEN 1 ELSE 0 END) AS purchases,
        AVG(event_params.engagement_time_msec) AS avg_engagement
    FROM analytics.ga4.events
    GROUP BY user_pseudo_id
""")

# Store in the Feature Store
fs = feature_store.FeatureStoreClient()
fs.create_table(
    name="analytics.ga4.user_features",
    primary_keys=["user_pseudo_id"],
    df=features_df,
)

# Train a model with MLflow tracking
with mlflow.start_run():
    # ... training code
    mlflow.sklearn.log_model(model, "churn_model")
dbt for Transformation
Project Structure
dbt_project/
├── dbt_project.yml
├── models/
│   ├── staging/
│   │   └── ga4/
│   │       ├── stg_ga4_events.sql
│   │       └── stg_ga4_users.sql
│   ├── intermediate/
│   │   └── int_sessions.sql
│   └── marts/
│       ├── dim_users.sql
│       └── fct_conversions.sql
└── macros/
    └── extract_event_param.sql
Staging Model
-- models/staging/ga4/stg_ga4_events.sql
{{
    config(
        materialized='incremental',
        unique_key='event_id',
        partition_by={
            "field": "event_date",
            "data_type": "date"
        }
    )
}}

WITH source AS (
    SELECT * FROM {{ source('ga4', 'events') }}
    {% if is_incremental() %}
    -- the raw export stores event_date as a 'YYYYMMDD' string, so parse before comparing
    WHERE PARSE_DATE('%Y%m%d', event_date) >= (SELECT MAX(event_date) FROM {{ this }})
    {% endif %}
),

renamed AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['user_pseudo_id', 'event_timestamp', 'event_name']) }} AS event_id,
        PARSE_DATE('%Y%m%d', event_date) AS event_date,
        TIMESTAMP_MICROS(event_timestamp) AS event_timestamp,
        event_name,
        user_pseudo_id,
        user_id,
        {{ extract_event_param('page_location', 'string') }} AS page_location,
        {{ extract_event_param('ga_session_id', 'int') }} AS session_id,
        {{ extract_event_param('engagement_time_msec', 'int') }} AS engagement_time_msec,
        traffic_source.source AS traffic_source,
        traffic_source.medium AS traffic_medium,
        traffic_source.campaign AS traffic_campaign
    FROM source
)

SELECT * FROM renamed
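The staging model leans on the extract_event_param macro listed in the project tree, which is not shown in the original; a minimal sketch, assuming BigQuery's GA4 export layout where event_params is an array of key/value structs:

-- macros/extract_event_param.sql (minimal sketch)
{% macro extract_event_param(param_name, param_type='string') %}
    (
        -- pull the typed value ('string' -> string_value, 'int' -> int_value, ...)
        SELECT ep.value.{{ param_type }}_value
        FROM UNNEST(event_params) AS ep
        WHERE ep.key = '{{ param_name }}'
    )
{% endmacro %}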
Session Model
-- models/intermediate/int_sessions.sql
{{
    config(
        materialized='incremental',
        unique_key='session_key'
    )
}}

WITH events AS (
    SELECT * FROM {{ ref('stg_ga4_events') }}
    {% if is_incremental() %}
    WHERE event_date >= (SELECT MAX(session_date) FROM {{ this }})
    {% endif %}
),

sessions AS (
    SELECT
        {{ dbt_utils.generate_surrogate_key(['user_pseudo_id', 'session_id']) }} AS session_key,
        user_pseudo_id,
        user_id,
        session_id,
        MIN(event_timestamp) AS session_start,
        MAX(event_timestamp) AS session_end,
        MIN(event_date) AS session_date,
        -- first-touch attributes: window functions can't be mixed with GROUP BY,
        -- so take the earliest value per session with an ordered ARRAY_AGG
        ARRAY_AGG(traffic_source IGNORE NULLS ORDER BY event_timestamp LIMIT 1)[SAFE_OFFSET(0)] AS source,
        ARRAY_AGG(traffic_medium IGNORE NULLS ORDER BY event_timestamp LIMIT 1)[SAFE_OFFSET(0)] AS medium,
        ARRAY_AGG(page_location IGNORE NULLS ORDER BY event_timestamp LIMIT 1)[SAFE_OFFSET(0)] AS landing_page,
        COUNT(*) AS event_count,
        SUM(engagement_time_msec) / 1000 AS engagement_seconds,
        MAX(CASE WHEN event_name = 'purchase' THEN 1 ELSE 0 END) AS converted
    FROM events
    GROUP BY user_pseudo_id, user_id, session_id
)

SELECT * FROM sessions
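fct_conversions is referenced by the metrics layer and the data-quality test below but never shown; a minimal sketch consistent with those references (the exact column list is an assumption, and fields like device_category or is_valid_session would be added the same way):

-- models/marts/fct_conversions.sql (minimal sketch)
SELECT
    e.event_id,
    e.event_date,
    e.event_name,
    e.user_pseudo_id,
    e.user_id,
    s.source,
    s.medium,
    s.converted
FROM {{ ref('stg_ga4_events') }} AS e
LEFT JOIN {{ ref('int_sessions') }} AS s
    ON  e.user_pseudo_id = s.user_pseudo_id
    AND e.session_id = s.session_id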
Metrics Layer
# models/marts/schema.yml
metrics:
  - name: conversion_rate
    label: Conversion Rate
    model: ref('fct_conversions')
    description: "Percentage of sessions that convert"
    calculation_method: derived
    expression: "SUM(converted) / COUNT(*)"
    dimensions:
      - source
      - medium
      - device_category
    filters:
      - field: is_valid_session
        operator: '='
        value: 'true'
Reverse ETL
Use Cases
| Destination | Data | Purpose |
|-------------|------|---------|
| CRM | LTV predictions | Sales prioritization |
| Email | Churn scores | Retention campaigns |
| Ad platforms | Audiences | Targeting |
| Support | User history | Context |
Tools
| Tool | Strength |
|------|----------|
| Census | Enterprise, dbt integration |
| Hightouch | User-friendly, fast setup |
| Polytomic | API flexibility |
| Rudderstack | Open-source option |
Census Example
-- Create a sync model in the warehouse
-- (assumes user_id holds the user's email address)
CREATE VIEW analytics.high_value_users AS
SELECT
    user_id AS email,
    predicted_ltv,
    churn_probability,
    segment
FROM analytics.user_predictions
WHERE predicted_ltv > 500
  AND churn_probability < 0.3;
Configure sync:
- Source: analytics.high_value_users
- Destination: Klaviyo
- Mapping: email → Email
- Schedule: Daily
Best Practices
Data Freshness
| Layer | Freshness | Method |
|-------|-----------|--------|
| Ingestion | 1 hour | Streaming or micro-batch |
| Staging | 1 hour | Incremental models |
| Marts | 4 hours | Scheduled dbt runs |
| Reverse ETL | Daily | Scheduled syncs |
Cost Management
| Optimization | Typical impact |
|--------------|----------------|
| Incremental models | ~70% less compute |
| Clustering/partitioning | ~80% less data scanned |
| Aggregate tables | ~90% fewer ad-hoc queries |
| Auto-suspend | ~50% less idle cost |
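Auto-suspend in particular is a one-line warehouse setting in Snowflake (the warehouse name here is illustrative):

-- Suspend the warehouse after 60 seconds of inactivity to cut idle spend
ALTER WAREHOUSE analytics_wh SET AUTO_SUSPEND = 60;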
Data Quality
-- tests/assert_no_null_user_id_on_purchases.sql
-- dbt marks a singular test as failed if the query returns any rows
SELECT *
FROM {{ ref('fct_conversions') }}
WHERE event_name = 'purchase'
  AND user_id IS NULL
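A companion freshness check works the same way; the two-day threshold below is an arbitrary example:

-- tests/assert_events_are_fresh.sql (hypothetical)
-- Fails if the newest staged event is more than 2 days old
SELECT MAX(event_date) AS latest_event_date
FROM {{ ref('stg_ga4_events') }}
HAVING MAX(event_date) < DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)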