Data Warehouse Integration
Step-by-step guide for integrating Tandem Beam export APIs with Snowflake, Domo, and other data warehouses. Build automated pipelines for conversion analytics, attribution analysis, and platform performance reporting.
Prerequisites
- Active Tandem Beam API token with export permissions
- Advertiser UUID for the X-Advertiser-ID header
- Access to your data warehouse (Snowflake, Domo, or BigQuery)
- Basic SQL knowledge for creating tables and queries
Export API Overview
Tandem Beam provides three bulk export endpoints designed for data warehouse integration:
/api/export/events
Tracking events with conversion data
/api/export/sessions
User sessions with aggregated metrics
/api/export/platform-logs
Platform delivery status and performance
API Reference: For complete endpoint documentation including all parameters and response formats, see the Bulk Data Export APIs section in the API Reference.
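As a rough illustration, a request to any of the three endpoints could be assembled as follows. The base URL, the Authorization and X-Advertiser-ID headers, and the from, to, limit, format, and cursor parameters are the ones used elsewhere in this guide; the helper name build_export_request is hypothetical, and the function only builds the request without sending it.

```python
from datetime import date
from typing import Optional

BASE_URL = "https://api.tandembeam.com/api/export"
ENDPOINTS = {"events", "sessions", "platform-logs"}


def build_export_request(endpoint: str, api_token: str, advertiser_id: str,
                         from_date: date, to_date: date,
                         cursor: Optional[str] = None, limit: int = 10000) -> dict:
    """Return the URL, headers, and query parameters for one export page."""
    if endpoint not in ENDPOINTS:
        raise ValueError(f"unknown export endpoint: {endpoint}")
    params = {
        "from": from_date.isoformat(),
        "to": to_date.isoformat(),
        "limit": limit,
        "format": "json",
    }
    # The cursor is only sent when continuing from a previous page.
    if cursor:
        params["cursor"] = cursor
    headers = {
        "Authorization": f"Bearer {api_token}",
        "X-Advertiser-ID": advertiser_id,
    }
    return {"url": f"{BASE_URL}/{endpoint}", "params": params, "headers": headers}
```

The returned dictionary can be passed to whichever HTTP client your pipeline already uses.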
Snowflake Integration
Set up automated data pipelines to load Tandem Beam export data into Snowflake using a Python UDF, a stored procedure, and a scheduled task.
Create Database and Schema
-- Create dedicated database for Beam data
CREATE DATABASE IF NOT EXISTS BEAM_ANALYTICS;
USE DATABASE BEAM_ANALYTICS;
-- Create schema for raw data
CREATE SCHEMA IF NOT EXISTS RAW;
-- Create schema for transformed data
CREATE SCHEMA IF NOT EXISTS ANALYTICS;
Create Target Tables
Create tables to store the exported data:
-- Events table
CREATE TABLE RAW.BEAM_EVENTS (
id INTEGER,
event_type VARCHAR(100),
event_data VARIANT,
tandem_tracking VARIANT,
attribution_sources ARRAY,
-- Attribution columns (NEW)
attribution_source VARCHAR(50),
attribution_type VARCHAR(30),
attribution_priority INTEGER,
-- UTM columns (NEW)
utm_source VARCHAR(100),
utm_medium VARCHAR(100),
utm_campaign VARCHAR(255),
created_at TIMESTAMP_NTZ,
updated_at TIMESTAMP_NTZ,
_loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Sessions table
CREATE TABLE RAW.BEAM_SESSIONS (
id VARCHAR(50),
identity_key_hash VARCHAR(64),
first_seen TIMESTAMP_NTZ,
last_seen TIMESTAMP_NTZ,
event_count INTEGER,
page_view_count INTEGER,
conversion_count INTEGER,
total_value DECIMAL(12,2),
_loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Platform logs table
CREATE TABLE RAW.BEAM_PLATFORM_LOGS (
id VARCHAR(50),
platform_id VARCHAR(50),
platform_slug VARCHAR(50),
advertiser_platform_id VARCHAR(50),
tracking_event_id INTEGER,
event_name VARCHAR(100),
vendor_id_used VARCHAR(255),
attribution_source VARCHAR(50),
status_code INTEGER,
sent_at TIMESTAMP_NTZ,
is_success BOOLEAN,
_loaded_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
Attribution & UTM Columns
| Column | Snowflake Type | Description |
|---|---|---|
| attribution_source | VARCHAR(50) | Credited platform/source (e.g., google-ads, meta, newsletter) |
| attribution_type | VARCHAR(30) | Category: paid, organic, email, referral, social, affiliate |
| attribution_priority | INTEGER | 1=click_id, 2=utm, 3=referrer, 4=organic |
| utm_source | VARCHAR(100) | UTM source parameter (e.g., google, facebook) |
| utm_medium | VARCHAR(100) | UTM medium parameter (e.g., cpc, email) |
| utm_campaign | VARCHAR(255) | UTM campaign parameter (e.g., brand_2024) |
Create Python UDF for API Calls
Create a Python UDF to fetch data from the Beam export API:
-- Create Python UDF for API calls (Snowpark)
-- Note: outbound HTTP requests from a UDF also require an external access
-- integration that allows api.tandembeam.com (EXTERNAL_ACCESS_INTEGRATIONS).
CREATE OR REPLACE FUNCTION RAW.FETCH_BEAM_EXPORT(
endpoint VARCHAR,
from_date DATE,
to_date DATE,
cursor_val VARCHAR
)
RETURNS VARIANT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
PACKAGES = ('requests')
HANDLER = 'fetch_data'
AS
$$
import requests

def fetch_data(endpoint, from_date, to_date, cursor_val):
    api_token = 'YOUR_API_TOKEN'  # Store in Snowflake secrets
    advertiser_id = 'YOUR_ADVERTISER_UUID'
    base_url = 'https://api.tandembeam.com/api/export'
    url = f"{base_url}/{endpoint}"
    params = {
        'from': str(from_date),
        'to': str(to_date),
        'limit': 10000,
        'format': 'json'
    }
    # Only send the cursor when continuing from a previous page
    if cursor_val:
        params['cursor'] = cursor_val
    headers = {
        'Authorization': f'Bearer {api_token}',
        'X-Advertiser-ID': advertiser_id
    }
    response = requests.get(url, params=params, headers=headers, timeout=60)
    # Fail loudly on 4xx/5xx instead of returning an error body as data
    response.raise_for_status()
    return response.json()
$$;
Create Sync Procedure
-- Stored procedure for incremental sync
CREATE OR REPLACE PROCEDURE RAW.SYNC_BEAM_EVENTS(from_date DATE, to_date DATE)
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
cursor_val VARCHAR DEFAULT NULL;
has_more BOOLEAN DEFAULT TRUE;
batch_count INTEGER DEFAULT 0;
BEGIN
-- Loop through all pages
WHILE (has_more) DO
-- Fetch batch from API
LET result VARIANT := (SELECT RAW.FETCH_BEAM_EXPORT('events', :from_date, :to_date, :cursor_val));
-- Insert data into table
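INSERT INTO RAW.BEAM_EVENTS (id, event_type, event_data, tandem_tracking, attribution_sources,
attribution_source, attribution_type, attribution_priority,
utm_source, utm_medium, utm_campaign, created_at, updated_at)
SELECT
value:id::INTEGER,
value:event_type::VARCHAR,
value:event_data::VARIANT,
value:tandem_tracking::VARIANT,
value:attribution_sources::ARRAY,
-- Attribution and UTM fields (assumes the export uses the same field names as the table columns)
value:attribution_source::VARCHAR,
value:attribution_type::VARCHAR,
value:attribution_priority::INTEGER,
value:utm_source::VARCHAR,
value:utm_medium::VARCHAR,
value:utm_campaign::VARCHAR,
value:created_at::TIMESTAMP_NTZ,
value:updated_at::TIMESTAMP_NTZ
FROM TABLE(FLATTEN(input => :result:data));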
-- Update cursor and has_more
cursor_val := result:meta:cursor::VARCHAR;
has_more := result:meta:has_more::BOOLEAN;
batch_count := batch_count + 1;
END WHILE;
RETURN 'Synced ' || batch_count || ' batches successfully';
END;
$$;
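The pagination loop in the procedure above can also be sketched outside Snowflake. This minimal version assumes each response carries a data array plus meta.cursor and meta.has_more, as the procedure does; fetch_page is a hypothetical callable that takes a cursor (or None) and returns the parsed response, and max_pages is an added safety guard rather than anything the API requires.

```python
from typing import Callable, Iterator, Optional


def iter_export_records(fetch_page: Callable[[Optional[str]], dict],
                        max_pages: int = 1000) -> Iterator[dict]:
    """Yield every record, following meta.cursor until meta.has_more is false."""
    cursor = None
    for _ in range(max_pages):
        page = fetch_page(cursor)
        yield from page.get("data", [])
        meta = page.get("meta", {})
        # A missing or false has_more ends the loop, mirroring the WHILE condition.
        if not meta.get("has_more"):
            return
        cursor = meta.get("cursor")
    raise RuntimeError("max_pages reached before has_more became false")
```

Injecting fetch_page keeps the loop independent of the HTTP client, so it can be exercised with canned responses before it is pointed at the live API.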
Schedule Automated Sync
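-- Create hourly sync task
-- Note: the procedure uses plain INSERT, so overlapping date windows load duplicate rows;
-- deduplicate or MERGE by id (see Best Practices)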
CREATE OR REPLACE TASK RAW.BEAM_HOURLY_SYNC
WAREHOUSE = COMPUTE_WH
SCHEDULE = 'USING CRON 0 * * * * UTC'
AS
CALL RAW.SYNC_BEAM_EVENTS(DATEADD('day', -1, CURRENT_DATE()), CURRENT_DATE());
-- Enable the task
ALTER TASK RAW.BEAM_HOURLY_SYNC RESUME;
-- Verify task is running
SHOW TASKS LIKE 'BEAM%';
Domo Integration
Configure Domo's REST API connector to pull Tandem Beam export data for dashboards and reporting.
Configure REST API Connector
- Navigate to Data > Connectors in Domo
- Search for "REST API (Advanced)" connector
- Click Get the Data
Connector Settings
- URL: https://api.tandembeam.com/api/export/events
- HTTP method: GET
- Authentication: Bearer Token
Set Request Headers
Add these custom headers:
Authorization: Bearer your_api_token_here
X-Advertiser-ID: your_advertiser_uuid
Content-Type: application/json
Set Query Parameters
Configure dynamic date parameters:
| Parameter | Value |
|---|---|
| from | ${date.subtract(30, 'days').format('YYYY-MM-DD')} |
| to | ${date.format('YYYY-MM-DD')} |
| limit | 10000 |
| format | json |
Configure Response Parsing
Set the data path to extract records:
$.data[*]
This extracts all records from the data array in the response.
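Outside Domo, the same extraction can be sketched in Python. This assumes the response body is a JSON object with a top-level data array, as the $.data[*] path implies; extract_records is a hypothetical helper name.

```python
import json
from typing import List


def extract_records(response_text: str) -> List[dict]:
    """Return the list stored under the top-level "data" key (the $.data[*] path)."""
    payload = json.loads(response_text)
    records = payload.get("data", [])
    if not isinstance(records, list):
        raise ValueError('expected "data" to be a JSON array')
    return records
```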
Schedule Refresh
Configure the DataSet to refresh on a schedule that respects the API rate limits:
Recommended Schedule
- Every 6 hours for most use cases
- Hourly for near-real-time dashboards
- Daily for historical reporting
Rate Limit Reminder
The Export API allows 10 requests per minute per advertiser. Schedule refreshes to stay under this limit.
Best Practices
Scheduling Frequency
- Events: Every 1-6 hours depending on volume
- Sessions: Every 6-24 hours (sessions aggregate over time)
- Platform Logs: Every 1-4 hours for monitoring
Handling Rate Limits
- Implement exponential backoff on 429 responses
- Use limit=10000 to minimize requests
- Store cursor position for resume capability
- Stagger syncs if multiple advertisers
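Exponential backoff on 429 responses might look like the sketch below. The 6-second base delay is an assumption derived from the 10 requests/minute limit (one request every 6 seconds); send is a hypothetical callable that performs one request and returns the status code with the parsed body, and sleep is injectable so the logic can be tested without waiting.

```python
import time
from typing import Callable, Tuple


def call_with_backoff(send: Callable[[], Tuple[int, dict]],
                      max_attempts: int = 5,
                      base_delay: float = 6.0,
                      sleep: Callable[[float], None] = time.sleep) -> dict:
    """Call send() until it stops returning HTTP 429, doubling the wait each time."""
    for attempt in range(max_attempts):
        status, body = send()
        if status != 429:
            return body
        # Wait base_delay, then 2x, 4x, ... before the next attempt.
        if attempt < max_attempts - 1:
            sleep(base_delay * (2 ** attempt))
    raise RuntimeError(f"still rate limited after {max_attempts} attempts")
```

Only 429 is retried here; other error statuses are returned to the caller, which should decide whether they are worth retrying.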
Data Retention
- Use updated_since for incremental loads
- UPSERT by event ID to handle updates
- Partition tables by date for query performance
- Set retention policies to manage storage costs
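The UPSERT-by-ID idea can be illustrated in a few lines of Python. This is a sketch under the assumption that every record has id and updated_at fields (as in the events table) and that updated_at values are ISO-8601 strings in a single format and time zone; upsert_by_id is a hypothetical helper.

```python
from typing import Dict, Iterable


def upsert_by_id(existing: Dict[int, dict], incoming: Iterable[dict]) -> Dict[int, dict]:
    """Merge incoming records into existing, keeping the newest updated_at per id."""
    merged = dict(existing)
    for record in incoming:
        current = merged.get(record["id"])
        # ISO-8601 timestamps in the same format and time zone compare correctly as strings.
        if current is None or record["updated_at"] >= current["updated_at"]:
            merged[record["id"]] = record
    return merged
```

Inside Snowflake the same effect is usually achieved with a MERGE statement keyed on id, which avoids the duplicate rows that repeated INSERTs over overlapping date windows would create.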
Monitoring & Alerting
- Track total_in_range for data volume trends
- Alert on sync job failures
- Monitor latency between event time and load time
- Compare platform log success rates
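A latency check between event time and load time could be sketched as follows. It assumes you have already queried pairs of created_at and _loaded_at values from the warehouse as datetime objects; the function names and the alert threshold are illustrative.

```python
from datetime import datetime, timedelta
from typing import Iterable, Optional, Tuple


def max_load_latency(rows: Iterable[Tuple[datetime, datetime]]) -> Optional[timedelta]:
    """Return the largest gap between created_at and _loaded_at, or None if no rows."""
    gaps = [loaded_at - created_at for created_at, loaded_at in rows]
    return max(gaps) if gaps else None


def should_alert(latency: Optional[timedelta], threshold: timedelta) -> bool:
    """Alert only when a latency was measured and it exceeds the threshold."""
    return latency is not None and latency > threshold
```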
Sample Analytics Queries
Once data is loaded into your warehouse, use these queries as starting points for analytics:
Attribution Analysis
-- Conversions by attribution source
SELECT
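attribution_sources[0]::VARCHAR as primary_source,
event_type,
COUNT(*) as conversion_count,
-- DECIMAL without precision is NUMBER(38,0) in Snowflake and would round away cents
SUM(event_data:value::DECIMAL(12,2)) as total_value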
FROM RAW.BEAM_EVENTS
WHERE event_type IN ('purchase', 'lead', 'sign_up')
AND created_at >= DATEADD('day', -30, CURRENT_DATE())
GROUP BY 1, 2
ORDER BY total_value DESC;
Platform Delivery Performance
-- Platform success rates by day
SELECT
DATE_TRUNC('day', sent_at) as day,
platform_slug,
COUNT(*) as total_deliveries,
SUM(CASE WHEN is_success THEN 1 ELSE 0 END) as successful,
ROUND(100.0 * SUM(CASE WHEN is_success THEN 1 ELSE 0 END) / COUNT(*), 2) as success_rate
FROM RAW.BEAM_PLATFORM_LOGS
WHERE sent_at >= DATEADD('day', -7, CURRENT_DATE())
GROUP BY 1, 2
ORDER BY 1 DESC, 2;
Session-Level Conversion Funnel
-- Session conversion funnel metrics
SELECT
DATE_TRUNC('week', first_seen) as week,
COUNT(*) as total_sessions,
SUM(CASE WHEN event_count > 1 THEN 1 ELSE 0 END) as engaged_sessions,
SUM(CASE WHEN conversion_count > 0 THEN 1 ELSE 0 END) as converted_sessions,
ROUND(100.0 * SUM(CASE WHEN conversion_count > 0 THEN 1 ELSE 0 END) / COUNT(*), 2) as conversion_rate,
AVG(total_value) as avg_session_value
FROM RAW.BEAM_SESSIONS
WHERE first_seen >= DATEADD('day', -90, CURRENT_DATE())
GROUP BY 1
ORDER BY 1 DESC;
UTM Campaign Analysis
Conversions by UTM Source:
SELECT
utm_source,
utm_medium,
COUNT(*) as total_events,
SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as purchases,
SUM(CASE WHEN event_type = 'lead' THEN 1 ELSE 0 END) as leads
FROM RAW.BEAM_EVENTS
WHERE utm_source IS NOT NULL
AND created_at >= DATEADD('day', -30, CURRENT_DATE())
GROUP BY utm_source, utm_medium
ORDER BY purchases DESC;
Attribution Priority Distribution:
SELECT
CASE attribution_priority
WHEN 1 THEN 'Click ID'
WHEN 2 THEN 'UTM'
WHEN 3 THEN 'Referrer'
WHEN 4 THEN 'Organic'
END as attribution_method,
COUNT(*) as event_count,
ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as percentage
FROM RAW.BEAM_EVENTS
WHERE created_at >= DATEADD('day', -30, CURRENT_DATE())
GROUP BY attribution_priority
ORDER BY attribution_priority;
Campaign Performance with Attribution:
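-- Note: laser_beam_session is not a column of RAW.BEAM_EVENTS as created above.
-- Add it to the table and sync procedure, or substitute your own session identifier.
SELECT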
utm_campaign,
attribution_source,
COUNT(DISTINCT laser_beam_session) as sessions,
SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as conversions,
ROUND(SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) * 100.0 /
NULLIF(COUNT(DISTINCT laser_beam_session), 0), 2) as conversion_rate
FROM RAW.BEAM_EVENTS
WHERE utm_campaign IS NOT NULL
AND created_at >= DATEADD('day', -30, CURRENT_DATE())
GROUP BY utm_campaign, attribution_source
HAVING conversions > 0
ORDER BY conversions DESC
LIMIT 20;
Next Steps
Ready to start integrating? Review the complete API documentation for all available parameters and response formats.