Data Warehouse Integration

Step-by-step guide for integrating Tandem Beam export APIs with Snowflake, Domo, and other data warehouses. Build automated pipelines for conversion analytics, attribution analysis, and platform performance reporting.

Prerequisites

  • Active Tandem Beam API token with export permissions
  • Advertiser UUID for the X-Advertiser-ID header
  • Access to your data warehouse (Snowflake, Domo, or BigQuery)
  • Basic SQL knowledge for creating tables and queries

Export API Overview

Tandem Beam provides three bulk export endpoints designed for data warehouse integration:

  • /api/export/events - Tracking events with conversion data
  • /api/export/sessions - User sessions with aggregated metrics
  • /api/export/platform-logs - Platform delivery status and performance

API Reference: For complete endpoint documentation including all parameters and response formats, see the Bulk Data Export APIs section in the API Reference.

Snowflake Integration

Set up automated data pipelines to load Tandem Beam export data into Snowflake using external functions and scheduled tasks.

Step 1: Create Database and Schema

-- Create dedicated database for Beam data
CREATE DATABASE IF NOT EXISTS BEAM_ANALYTICS;
USE DATABASE BEAM_ANALYTICS;

-- Create schema for raw data
CREATE SCHEMA IF NOT EXISTS RAW;

-- Create schema for transformed data
CREATE SCHEMA IF NOT EXISTS ANALYTICS;

Step 2: Create Target Tables

Create tables to store the exported data:

-- Events table
CREATE TABLE RAW.BEAM_EVENTS (
    id INTEGER,
    event_type VARCHAR(100),
    event_data VARIANT,
    tandem_tracking VARIANT,
    attribution_sources ARRAY,
    -- Attribution columns (NEW)
    attribution_source VARCHAR(50),
    attribution_type VARCHAR(30),
    attribution_priority INTEGER,
    -- UTM columns (NEW)
    utm_source VARCHAR(100),
    utm_medium VARCHAR(100),
    utm_campaign VARCHAR(255),
    created_at TIMESTAMP_NTZ,
    updated_at TIMESTAMP_NTZ,
    _loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Sessions table
CREATE TABLE RAW.BEAM_SESSIONS (
    id VARCHAR(50),
    identity_key_hash VARCHAR(64),
    first_seen TIMESTAMP_NTZ,
    last_seen TIMESTAMP_NTZ,
    event_count INTEGER,
    page_view_count INTEGER,
    conversion_count INTEGER,
    total_value DECIMAL(12,2),
    _loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

-- Platform logs table
CREATE TABLE RAW.BEAM_PLATFORM_LOGS (
    id VARCHAR(50),
    platform_id VARCHAR(50),
    platform_slug VARCHAR(50),
    advertiser_platform_id VARCHAR(50),
    tracking_event_id INTEGER,
    event_name VARCHAR(100),
    vendor_id_used VARCHAR(255),
    attribution_source VARCHAR(50),
    status_code INTEGER,
    sent_at TIMESTAMP_NTZ,
    is_success BOOLEAN,
    _loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

Attribution & UTM Columns (NEW)

Column                 Snowflake Type   Description
attribution_source     VARCHAR(50)      Credited platform/source (e.g., google-ads, meta, newsletter)
attribution_type       VARCHAR(30)      Category: paid, organic, email, referral, social, affiliate
attribution_priority   INTEGER          1 = click_id, 2 = utm, 3 = referrer, 4 = organic
utm_source             VARCHAR(100)     UTM source parameter (e.g., google, facebook)
utm_medium             VARCHAR(100)     UTM medium parameter (e.g., cpc, email)
utm_campaign           VARCHAR(255)     UTM campaign parameter (e.g., brand_2024)
Pre-Computed Attribution: These columns are computed at event creation time using a 4-level priority system. This enables fast queries without JSON parsing, ideal for large-scale analytics.

Step 3: Create Python UDF for API Calls

Create a Python UDF to fetch data from the Beam export API:

-- Create Python UDF for API calls (Snowpark)
CREATE OR REPLACE FUNCTION RAW.FETCH_BEAM_EXPORT(
    endpoint VARCHAR,
    from_date DATE,
    to_date DATE,
    cursor_val VARCHAR
)
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('requests')
HANDLER = 'fetch_data'
AS
$$
import requests
import json

def fetch_data(endpoint, from_date, to_date, cursor_val):
    api_token = 'YOUR_API_TOKEN'  # Store in a Snowflake secret instead of hardcoding
    advertiser_id = 'YOUR_ADVERTISER_UUID'

    base_url = 'https://api.tandembeam.com/api/export'
    url = f"{base_url}/{endpoint}"

    params = {
        'from': str(from_date),
        'to': str(to_date),
        'limit': 10000,
        'format': 'json'
    }

    if cursor_val:
        params['cursor'] = cursor_val

    headers = {
        'Authorization': f'Bearer {api_token}',
        'X-Advertiser-ID': advertiser_id
    }

    response = requests.get(url, params=params, headers=headers)
    return response.json()
$$;
Security Best Practice: Store the API token as a Snowflake secret rather than hardcoding it in the UDF, and grant the function outbound network access through an external access integration. See the Snowflake documentation on secrets and external access integrations.
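
The UDF also needs outbound network access before it can reach the export API. Below is a minimal sketch of the supporting objects, assuming the api.tandembeam.com host and privileges to create network rules, secrets, and external access integrations; the object names are placeholders to adapt to your environment.

-- Allow egress to the Beam API host
CREATE OR REPLACE NETWORK RULE RAW.BEAM_API_RULE
    MODE = EGRESS
    TYPE = HOST_PORT
    VALUE_LIST = ('api.tandembeam.com');

-- Store the API token as a secret instead of hardcoding it
CREATE OR REPLACE SECRET RAW.BEAM_API_TOKEN
    TYPE = GENERIC_STRING
    SECRET_STRING = 'YOUR_API_TOKEN';

-- Integration the UDF references for outbound calls
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION BEAM_API_INTEGRATION
    ALLOWED_NETWORK_RULES = (RAW.BEAM_API_RULE)
    ALLOWED_AUTHENTICATION_SECRETS = (RAW.BEAM_API_TOKEN)
    ENABLED = TRUE;

With these in place, add EXTERNAL_ACCESS_INTEGRATIONS = (BEAM_API_INTEGRATION) and SECRETS = ('api_token' = RAW.BEAM_API_TOKEN) to the CREATE FUNCTION statement above, and read the token in Python with _snowflake.get_generic_secret_string('api_token') instead of the hardcoded placeholder.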

Step 4: Create Sync Procedure

-- Stored procedure for incremental sync
CREATE OR REPLACE PROCEDURE RAW.SYNC_BEAM_EVENTS(from_date DATE, to_date DATE)
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
    cursor_val VARCHAR DEFAULT NULL;
    has_more BOOLEAN DEFAULT TRUE;
    batch_count INTEGER DEFAULT 0;
BEGIN
    -- Loop through all pages
    WHILE (has_more) DO
        -- Fetch batch from API
        LET result VARIANT := (SELECT RAW.FETCH_BEAM_EXPORT('events', :from_date, :to_date, :cursor_val));

        -- Insert batch into table (assumes attribution/UTM fields are returned as top-level keys on each record)
        INSERT INTO RAW.BEAM_EVENTS (
            id, event_type, event_data, tandem_tracking, attribution_sources,
            attribution_source, attribution_type, attribution_priority,
            utm_source, utm_medium, utm_campaign,
            created_at, updated_at
        )
        SELECT
            value:id::INTEGER,
            value:event_type::VARCHAR,
            value:event_data::VARIANT,
            value:tandem_tracking::VARIANT,
            value:attribution_sources::ARRAY,
            value:attribution_source::VARCHAR,
            value:attribution_type::VARCHAR,
            value:attribution_priority::INTEGER,
            value:utm_source::VARCHAR,
            value:utm_medium::VARCHAR,
            value:utm_campaign::VARCHAR,
            value:created_at::TIMESTAMP_NTZ,
            value:updated_at::TIMESTAMP_NTZ
        FROM TABLE(FLATTEN(input => :result:data));

        -- Update cursor and has_more
        cursor_val := result:meta:cursor::VARCHAR;
        has_more := result:meta:has_more::BOOLEAN;
        batch_count := batch_count + 1;
    END WHILE;

    RETURN 'Synced ' || batch_count || ' batches successfully';
END;
$$;
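
You can also run the procedure on demand to backfill a specific date window before enabling the scheduled task; the dates below are placeholders.

-- One-off backfill for an example date range
CALL RAW.SYNC_BEAM_EVENTS('2024-01-01'::DATE, '2024-01-31'::DATE);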

Step 5: Schedule Automated Sync

-- Create hourly sync task
CREATE OR REPLACE TASK RAW.BEAM_HOURLY_SYNC
    WAREHOUSE = COMPUTE_WH
    SCHEDULE = 'USING CRON 0 * * * * UTC'
AS
    CALL RAW.SYNC_BEAM_EVENTS(DATEADD('day', -1, CURRENT_DATE()), CURRENT_DATE());

-- Enable the task
ALTER TASK RAW.BEAM_HOURLY_SYNC RESUME;

-- Verify task is running
SHOW TASKS LIKE 'BEAM%';
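
To confirm the task is running and succeeding, you can also check recent executions; this is a quick sketch using Snowflake's INFORMATION_SCHEMA.TASK_HISTORY table function.

-- Review the most recent runs of the sync task
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'BEAM_HOURLY_SYNC'))
ORDER BY scheduled_time DESC
LIMIT 24;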

Domo Integration

Configure Domo's REST API connector to pull Tandem Beam export data for dashboards and reporting.

Step 1: Configure REST API Connector

  1. Navigate to Data > Connectors in Domo
  2. Search for "REST API (Advanced)" connector
  3. Click Get the Data

Connector Settings

URL: https://api.tandembeam.com/api/export/events
Method: GET
Authentication: Bearer Token

Step 2: Set Request Headers

Add these custom headers:

Authorization: Bearer your_api_token_here
X-Advertiser-ID: your_advertiser_uuid
Content-Type: application/json

Step 3: Set Query Parameters

Configure dynamic date parameters:

Parameter Value
from ${date.subtract(30, 'days').format('YYYY-MM-DD')}
to ${date.format('YYYY-MM-DD')}
limit 10000
format json

Step 4: Configure Response Parsing

Set the data path to extract records:

$.data[*]

This extracts all records from the data array in the response.

Step 5: Schedule Refresh

Configure the DataSet to refresh on a schedule that respects the API rate limits:

Recommended Schedule
  • Every 6 hours for most use cases
  • Hourly for real-time dashboards
  • Daily for historical reporting
Rate Limit Reminder

Export API allows 10 requests/minute per advertiser. Schedule refreshes to avoid hitting limits.

Best Practices

Scheduling Frequency

  • Events: Every 1-6 hours depending on volume
  • Sessions: Every 6-24 hours (sessions aggregate over time)
  • Platform Logs: Every 1-4 hours for monitoring

Handling Rate Limits

  • Implement exponential backoff on 429 responses
  • Use limit=10000 to minimize requests
  • Store cursor position for resume capability (see the checkpoint sketch below)
  • Stagger syncs if multiple advertisers
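
A simple way to support resumable syncs is a checkpoint table that stores the last cursor per endpoint; this is a sketch with hypothetical names, and SYNC_BEAM_EVENTS would need a small change to write to it after each batch.

-- Hypothetical checkpoint table for resumable syncs
CREATE TABLE IF NOT EXISTS RAW.BEAM_SYNC_STATE (
    endpoint    VARCHAR(50),
    last_cursor VARCHAR,
    updated_at  TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);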

Data Retention

  • Use updated_since for incremental loads
  • UPSERT by event ID to handle updates (see the MERGE sketch below)
  • Partition tables by date for query performance
  • Set retention policies to manage storage costs
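
For incremental loads, one common pattern is to land each batch in a staging table and MERGE it into the target by event ID. This is a sketch; RAW.BEAM_EVENTS_STAGE is a hypothetical staging table with the same columns as RAW.BEAM_EVENTS.

-- Upsert a freshly loaded batch by event ID
MERGE INTO RAW.BEAM_EVENTS AS target
USING RAW.BEAM_EVENTS_STAGE AS source
    ON target.id = source.id
WHEN MATCHED AND source.updated_at > target.updated_at THEN UPDATE SET
    event_type = source.event_type,
    event_data = source.event_data,
    tandem_tracking = source.tandem_tracking,
    attribution_sources = source.attribution_sources,
    updated_at = source.updated_at,
    _loaded_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN INSERT (id, event_type, event_data, tandem_tracking, attribution_sources, created_at, updated_at)
    VALUES (source.id, source.event_type, source.event_data, source.tandem_tracking, source.attribution_sources, source.created_at, source.updated_at);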

Monitoring & Alerting

  • Track total_in_range for data volume trends
  • Alert on sync job failures
  • Monitor latency between event time and load time (see the query below)
  • Compare platform log success rates
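
A simple load-latency check based on the _loaded_at column defined in the tables above (a sketch):

-- Average and worst-case delay between event creation and warehouse load, last 24 hours
SELECT
    DATE_TRUNC('hour', _loaded_at) AS load_hour,
    COUNT(*) AS events_loaded,
    AVG(DATEDIFF('minute', created_at, _loaded_at)) AS avg_lag_minutes,
    MAX(DATEDIFF('minute', created_at, _loaded_at)) AS max_lag_minutes
FROM RAW.BEAM_EVENTS
WHERE _loaded_at >= DATEADD('hour', -24, CURRENT_TIMESTAMP())
GROUP BY 1
ORDER BY 1 DESC;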

Sample Analytics Queries

Once data is loaded into your warehouse, use these queries as starting points for analytics:

Attribution Analysis

-- Conversions by attribution source
SELECT
    attribution_sources[0] as primary_source,
    event_type,
    COUNT(*) as conversion_count,
    SUM(event_data:value::DECIMAL) as total_value
FROM RAW.BEAM_EVENTS
WHERE event_type IN ('purchase', 'lead', 'sign_up')
    AND created_at >= DATEADD('day', -30, CURRENT_DATE())
GROUP BY 1, 2
ORDER BY total_value DESC;

Platform Delivery Performance

-- Platform success rates by day
SELECT
    DATE_TRUNC('day', sent_at) as day,
    platform_slug,
    COUNT(*) as total_deliveries,
    SUM(CASE WHEN is_success THEN 1 ELSE 0 END) as successful,
    ROUND(100.0 * SUM(CASE WHEN is_success THEN 1 ELSE 0 END) / COUNT(*), 2) as success_rate
FROM RAW.BEAM_PLATFORM_LOGS
WHERE sent_at >= DATEADD('day', -7, CURRENT_DATE())
GROUP BY 1, 2
ORDER BY 1 DESC, 2;

Session-Level Conversion Funnel

-- Session conversion funnel metrics
SELECT
    DATE_TRUNC('week', first_seen) as week,
    COUNT(*) as total_sessions,
    SUM(CASE WHEN event_count > 1 THEN 1 ELSE 0 END) as engaged_sessions,
    SUM(CASE WHEN conversion_count > 0 THEN 1 ELSE 0 END) as converted_sessions,
    ROUND(100.0 * SUM(CASE WHEN conversion_count > 0 THEN 1 ELSE 0 END) / COUNT(*), 2) as conversion_rate,
    AVG(total_value) as avg_order_value
FROM RAW.BEAM_SESSIONS
WHERE first_seen >= DATEADD('day', -90, CURRENT_DATE())
GROUP BY 1
ORDER BY 1 DESC;

UTM Campaign Analysis (NEW)

Conversions by UTM Source:

SELECT
    utm_source,
    utm_medium,
    COUNT(*) as total_events,
    SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as purchases,
    SUM(CASE WHEN event_type = 'lead' THEN 1 ELSE 0 END) as leads
FROM RAW.BEAM_EVENTS
WHERE utm_source IS NOT NULL
  AND created_at >= DATEADD('day', -30, CURRENT_DATE())
GROUP BY utm_source, utm_medium
ORDER BY purchases DESC;

Attribution Priority Distribution:

SELECT
    CASE attribution_priority
        WHEN 1 THEN 'Click ID'
        WHEN 2 THEN 'UTM'
        WHEN 3 THEN 'Referrer'
        WHEN 4 THEN 'Organic'
    END as attribution_method,
    COUNT(*) as event_count,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
FROM RAW.BEAM_EVENTS
WHERE created_at >= DATEADD('day', -30, CURRENT_DATE())
GROUP BY attribution_priority
ORDER BY attribution_priority;

Campaign Performance with Attribution:

SELECT
    utm_campaign,
    attribution_source,
    COUNT(DISTINCT laser_beam_session) as sessions,
    SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as conversions,
    ROUND(SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) * 100.0 /
          NULLIF(COUNT(DISTINCT laser_beam_session), 0), 2) as conversion_rate
FROM RAW.BEAM_EVENTS
WHERE utm_campaign IS NOT NULL
  AND created_at >= DATEADD('day', -30, CURRENT_DATE())
GROUP BY utm_campaign, attribution_source
HAVING conversions > 0
ORDER BY conversions DESC
LIMIT 20;

Next Steps

Ready to start integrating? Review the complete API documentation for all available parameters and response formats.