Build an Automated Research Data Pipeline (Warehouse Sync)
Introduction
This tutorial shows how to build an automated pipeline that pulls UserTesting Results API data and loads it into a data warehouse (such as Snowflake, BigQuery, or Redshift) on a scheduled basis.
The goal is to enable nightly or periodic syncs of session-level research data so analytics, BI, and AI teams can work with UserTesting insights alongside product and business metrics.
What you’ll build
A repeatable pipeline that:
- Uses a known `testId` to list all completed sessions
- Pages through session results safely
- Retrieves structured session details
- Retrieves session transcripts
- Loads the data into warehouse-friendly tables
Target audience
- Data engineers
- Analytics engineers
- Research operations teams
- ML / AI teams preparing data for downstream analysis
Prerequisites
- A valid access token (`ACCESS_TOKEN`). Go to Authorization for details.
- A known test ID, referred to as `TEST_ID` or `testId`.
- A scheduler (cron, Airflow, dbt Cloud, GitHub Actions, etc.)
- A destination warehouse (Snowflake, BigQuery, Redshift, etc.)
High-level architecture
- Scheduler triggers the job (nightly or on demand)
- Extractor calls the Results API
- Transformer normalizes JSON into tables
- Loader writes data to the warehouse
```mermaid
graph LR;
Scheduler --> ResultsAPI;
ResultsAPI --> JSON;
JSON --> Normalize;
Normalize --> WarehouseTables;
```
Steps
Step 1 — List sessions for a test (pagination aware)
Endpoint
`GET /api/v2/sessionResults`
Why this matters
- Tests can have many sessions
- Pagination must be handled correctly for full exports
Example (curl)
```
curl --location 'https://api.use2.usertesting.com/api/v2/sessionResults?testId=TEST_ID&limit=25&offset=50' \
--header 'Authorization: Bearer ACCESS_TOKEN' \
--header 'Content-Type: application/json'
```
Best Practice: Always use the pagination query parameters (`limit` and `offset`) along with the appropriate logic to ensure all sessions are retrieved.
- Use the `limit`, `offset`, and `totalCount` values in the `meta.pagination` property of the response to calculate the number of page iterations needed to collect all sessions in the test.
- By default, `limit` is set to `25` and `offset` to `0`.
- In the response, sessions are sorted in descending order, i.e., from newest to oldest.
Key fields to store
- `testId`
- `sessionId`
- `status`
- `startTime`
- `finishTime`
- `audienceId` (if present)
Pagination logic
- Retrieve `meta.pagination.totalCount` to determine the total number of sessions.
- Increment the `offset` by the `limit` value in each request until the `offset` reaches or exceeds the `totalCount`, ensuring all sessions are retrieved (see the sketch after this list).
- Save a checkpoint if running in incremental mode to track progress and avoid reprocessing.
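A minimal pagination sketch in Python using the `requests` library. It follows the `limit`/`offset`/`totalCount` logic above; the `data` key used for the session list is an assumption about the response shape, so adjust it to the actual payload.

```python
import requests

BASE_URL = "https://api.use2.usertesting.com/api/v2"
HEADERS = {"Authorization": "Bearer ACCESS_TOKEN"}  # replace with your real token

def list_all_sessions(test_id: str, limit: int = 25) -> list:
    """Collect every session for a test by walking limit/offset pages."""
    sessions, offset = [], 0
    while True:
        resp = requests.get(
            f"{BASE_URL}/sessionResults",
            headers=HEADERS,
            params={"testId": test_id, "limit": limit, "offset": offset},
        )
        resp.raise_for_status()
        body = resp.json()
        # "data" is an assumed key for the session list; adjust to the actual response shape
        sessions.extend(body.get("data", []))
        total = body["meta"]["pagination"]["totalCount"]
        offset += limit
        if offset >= total:
            return sessions
```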
Step 2 — Retrieve session details
Endpoint
`GET /api/v2/sessionResults/SESSION_ID`
Purpose
This endpoint provides the core structured research data, including:
- Participant demographics
- Task types and responses
- IDs needed for joins
Example (curl)
```
curl --location 'https://api.use2.usertesting.com/api/v2/sessionResults/SESSION_ID' \
--header 'Authorization: Bearer ACCESS_TOKEN' \
--header 'Content-Type: application/json'
```
Recommended warehouse tables
- session_id
- test_id
- audience_id
- test_plan_id
- participant_id
- demographic_code
- label
- value
- type
- task_id
- task_type
- task_response (JSON / VARIANT)
Take into account that `sessionId` and `participantId` share the same value. This can simplify gathering the participant's demographic data.
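As an illustration, here is a hedged sketch of how a session-details payload could be flattened into the columns above. The `demographics` and `taskResults` keys (and their nested field names) are placeholders rather than the confirmed response schema; the `sessionId`/`participantId` equivalence noted above is used to fill `participant_id`.

```python
import json

def normalize_session(details: dict) -> dict:
    """Flatten one session-details payload into warehouse-friendly row sets.
    The demographics/taskResults keys are illustrative; map them to the real schema."""
    session_id = details.get("sessionId")
    session_row = {
        "session_id": session_id,
        "test_id": details.get("testId"),
        "audience_id": details.get("audienceId"),
        "test_plan_id": details.get("testPlanId"),
        "participant_id": session_id,  # sessionId and participantId share the same value
    }
    demographic_rows = [
        {"session_id": session_id, **demo}
        for demo in details.get("demographics", [])
    ]
    task_rows = [
        {
            "session_id": session_id,
            "task_id": task.get("taskId"),
            "task_type": task.get("taskType"),
            "task_response": json.dumps(task),  # keep the full response as JSON / VARIANT
        }
        for task in details.get("taskResults", [])
    ]
    return {"sessions": [session_row], "demographics": demographic_rows, "task_results": task_rows}
```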
Step 3 — Retrieve transcripts (text-based insights)
For sessions that have video recordings, such as those based on interaction tests.
Endpoint
`GET /api/v2/sessionResults/SESSION_ID/transcript`
Response format
- text/vtt (WebVTT subtitle format)
Example (curl)
```
curl --location 'https://api.use2.usertesting.com/api/v2/sessionResults/SESSION_ID/transcript' \
--header 'Authorization: Bearer ACCESS_TOKEN'
```
Warehouse strategy
- Store raw VTT text as-is
- Optionally:
- Strip timestamps
- Store a derived plain_text column
- Chunk text for LLM usage
Recommended table
- session_id
- transcript_vtt (TEXT)
- transcript_plain_text (TEXT, optional)
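If you choose to derive the optional transcript_plain_text column, a simple sketch like the following strips the WebVTT header, cue numbers, and timestamp lines, keeping only the spoken text:

```python
import re

def vtt_to_plain_text(vtt: str) -> str:
    """Strip the WEBVTT header, numeric cue identifiers, and timestamp lines,
    keeping only the caption text."""
    kept = []
    for line in vtt.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("WEBVTT"):
            continue
        if "-->" in stripped:  # timestamp line, e.g. 00:00:01.000 --> 00:00:04.000
            continue
        if re.fullmatch(r"\d+", stripped):  # numeric cue identifier
            continue
        kept.append(stripped)
    return " ".join(kept)
```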
Step 4 — (Optional) Video handling strategy
For sessions with recorded videos, such as those based on interaction tests.
Endpoint
`GET /api/v2/sessionResults/SESSION_ID/videoDownloadUrl`
Request sample
```
curl --location 'https://api.use2.usertesting.com/api/v2/sessionResults/SESSION_ID/videoDownloadUrl' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer ACCESS_TOKEN'
```
Important behavior
- URLs are pre-signed
- URLs expire after one hour
Best practice for pipelines
- Do not store the video URL itself
- Either:
- Download the video immediately into object storage (S3, GCS), as sketched at the end of this step
- Or skip video ingestion for nightly syncs
Recommended table (metadata only)
- session_id
- video_storage_path
- downloaded_at
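A hedged sketch of the "download immediately" approach, assuming S3 via boto3. The bucket name and key layout are hypothetical, and the pre-signed URL must be fetched and consumed within the same run because it expires after one hour.

```python
import boto3
import requests

s3 = boto3.client("s3")
BUCKET = "my-research-videos"  # hypothetical bucket name

def archive_session_video(session_id: str, download_url: str) -> str:
    """Stream the expiring pre-signed URL straight into object storage and
    return the storage path to record in the metadata table."""
    key = f"usertesting/videos/{session_id}.mp4"
    with requests.get(download_url, stream=True) as resp:
        resp.raise_for_status()
        s3.upload_fileobj(resp.raw, BUCKET, key)  # stream without buffering the full file
    return f"s3://{BUCKET}/{key}"
```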
Rate limits and batching
Take into consideration:
- Endpoints allow 10 requests per minute
Use:
- Request batching
- Exponential backoff on rate limit errors (HTTP 429), as sketched below
- Parallelism with limits (e.g., 3–5 concurrent requests)
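One possible implementation of the `with_backoff` helper used in the pseudo-code later in this guide, retrying on HTTP 429 with exponential backoff. This is a sketch: it assumes the wrapped callable returns a `requests.Response`, and retry counts and delays should be tuned to your scheduler's tolerances.

```python
import time

def with_backoff(request_fn, max_retries: int = 5):
    """Call request_fn (a zero-argument callable returning a requests.Response)
    and retry on HTTP 429 with exponential backoff."""
    delay = 1
    for _ in range(max_retries):
        resp = request_fn()
        if resp.status_code != 429:
            resp.raise_for_status()
            return resp
        # honor Retry-After when the API sends it, otherwise back off exponentially
        wait = int(resp.headers.get("Retry-After", delay))
        time.sleep(wait)
        delay *= 2
    raise RuntimeError("Rate limit retries exhausted")
```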
Incremental vs full syncs
Full sync (simplest)
- Re-pull all sessions for a test on a nightly basis
- Overwrite or upsert warehouse tables
Incremental sync (recommended at scale)
- Track:
  - session `finishTime`
  - last successful sync timestamp
- Only process newly completed sessions
Example pseudo-code
```
checkpoint = read_checkpoint(test_id)  # e.g., last_finish_time or last_run_at

for page in paginate_sessions(test_id, status="completed", page_size=100):
    for s in page.sessions:
        if checkpoint and s.finishTime <= checkpoint:
            continue  # incremental mode

        details = with_backoff(lambda: get_session_details(s.sessionId))
        transcript = with_backoff(lambda: get_transcript(s.sessionId))  # may be optional / not always present

        upsert_sessions(details)
        upsert_participants(details)
        upsert_demographics(details)
        upsert_task_results(details)
        upsert_transcripts(s.sessionId, transcript)

        # optional: if ingesting video
        # video_url = with_backoff(lambda: get_video_download_url(s.sessionId))
        # download_immediately(video_url)  # URL expires quickly
        # upsert_session_videos_metadata(...)

update_checkpoint(test_id, max_finish_time_processed)
```
Code overview: The pipeline reads a saved checkpoint (such as the last processed finishTime), then pages through all completed sessions for a test. For each session, it skips records that were already processed, retrieves structured session details and transcripts with retry and backoff handling, and upserts the normalized data into warehouse tables. An optional step shows how video downloads could be handled immediately due to expiring URLs. Finally, the pipeline updates the checkpoint so future runs only process newly completed sessions.
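The `read_checkpoint` and `update_checkpoint` calls are left abstract above. A minimal sketch that keeps the checkpoint in a local JSON file (a warehouse table or any other state store works equally well) might look like this:

```python
import json
from pathlib import Path

CHECKPOINT_FILE = Path("checkpoints.json")  # hypothetical local state store

def read_checkpoint(test_id: str):
    """Return the last processed finishTime for a test, or None on the first run."""
    if not CHECKPOINT_FILE.exists():
        return None
    return json.loads(CHECKPOINT_FILE.read_text()).get(test_id)

def update_checkpoint(test_id: str, finish_time: str) -> None:
    """Persist the newest finishTime processed so the next run skips older sessions."""
    state = json.loads(CHECKPOINT_FILE.read_text()) if CHECKPOINT_FILE.exists() else {}
    state[test_id] = finish_time
    CHECKPOINT_FILE.write_text(json.dumps(state, indent=2))
```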
Common errors and troubleshooting
| Error | Meaning | Fix |
|---|---|---|
| 401 | Invalid or missing access token | Verify authorization header |
| 404 | Test or session not found | Validate IDs |
| 429 | Rate limit exceeded | Add backoff + batching |
What you can build next
Once this pipeline is running, you can:
- Power BI dashboards with UX metrics
- Feed transcripts into LLM workflows
- Join UserTesting data with product analytics
- Trigger alerts when certain task patterns appear
Summary
You now have a scalable pattern to:
- Extract UserTesting research data programmatically
- Normalize it for analytics and AI
- Keep insights continuously in sync with your organization’s data ecosystem
This pipeline forms the backbone for advanced dashboards, automation, and AI-powered insight generation.