Classify Tests by Type
Build a Bulk Test Classification Workflow
Introduction
This tutorial shows how to build an automated classification workflow that identifies and categorizes all compatible tests in your UserTesting workspace by type: survey, interaction test, or think-out-loud test (beta).
The goal is to enable your research and data teams to programmatically discover which tests are compatible with the Results API and understand what data is available for each test. This is especially valuable when working with large test libraries where manual classification is impractical.
Since the UserTesting API doesn't currently have a straightforward way of detecting test type, the script we propose infers the test type by probing specific endpoints. For each test, it checks whether certain Results API endpoints return success (HTTP 200) or fail (HTTP 404).
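The decision tree behind that probing approach can be sketched as a pure function, where each flag records whether the corresponding Results API endpoint returned HTTP 200 (this is a minimal sketch of the classification logic described later in this tutorial, not the full script):

```python
def infer_test_type(has_video: bool, has_qx_scores: bool, has_transcript: bool) -> str:
    """Map endpoint probe results to a test type.

    Each argument is True when the corresponding Results API endpoint
    (videoDownloadUrl, qxScores, transcript) returned HTTP 200.
    """
    if has_video or has_qx_scores:
        # Interaction tests produce video recordings and QX scores
        return "interaction test"
    if has_transcript:
        # Think-out-loud tests produce transcripts but no video/QX data
        return "think-out-loud test"
    # No special data available: default to survey
    return "survey"

print(infer_test_type(True, False, False))   # → interaction test
print(infer_test_type(False, False, True))   # → think-out-loud test
print(infer_test_type(False, False, False))  # → survey
```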
What you'll build
A 5-step resumable Python workflow that:
- Discovers all tests across your workspaces
- Identifies which tests are compatible with the Results API (UTZ) versus legacy (ClassicUT)
- Classifies each test by type using endpoint availability checks
- Persists results to JSON files for downstream use with resumable checkpoints
- Verifies success with built-in validation checks and data quality assertions
The workflow automatically refreshes tokens and enforces rate limits throughout, and discovers new tests added since the last run by comparing against saved results.
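The new-test discovery works by diffing freshly discovered test UUIDs against the saved results file. A minimal sketch, assuming the `state/utz_tests.json` record layout used later in this tutorial:

```python
import json
import os

def find_new_tests(discovered_uuids, state_path="state/utz_tests.json"):
    """Return only the UUIDs absent from the saved results file, so a
    re-run processes new tests instead of starting from scratch."""
    known = set()
    if os.path.exists(state_path):
        with open(state_path, "r", encoding="utf-8") as f:
            # Each saved record is an object with a "test_uuid" field
            known = {rec["test_uuid"] for rec in json.load(f)}
    return sorted(set(discovered_uuids) - known)
```

On a first run the state file does not exist yet, so every discovered UUID is treated as new.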
Target audience
- Research operations teams managing large test libraries
- Data engineers building Results API data pipelines
- Analytics engineers preparing test metadata for dashboards
- ML / AI teams sourcing data from UserTesting research
Prerequisites
- OAuth credentials (`CLIENT_ID`, `CLIENT_SECRET`) to generate and refresh access tokens (`ACCESS_TOKEN`). Go to Authorization for details.
- UserTesting workspace access as a team member
- Python 3.7+ with the `requests` and `python-dotenv` packages
- A `.env` file with your OAuth credentials and initial tokens
- A directory structure for persisting state (e.g., a `state/` folder)
High-level architecture
The workflow follows a multi-step process where data flows through discovery, identification, and classification stages:
- OAuth Token Manager ensures access tokens stay fresh for long-running jobs
- Workspace Discovery retrieves all available workspaces and tests
- UTZ Identifier checks which tests are compatible with Results API
- Type Classifier determines test type based on endpoint availability
- Persistence Layer saves results incrementally for resumable processing
graph LR;
OAuth["OAuth Token<br/>Manager"] --> Discover["Workspace<br/>Discovery"];
Discover --> Identify["UTZ vs<br/>ClassicUT"];
Identify --> Classify["Type<br/>Classifier"];
Classify --> Persist["Persist to<br/>JSON Files"];
OAuth -.->|refresh| Discover;
OAuth -.->|refresh| Identify;
OAuth -.->|refresh| Classify;
Example files
We provide example files that you can use as guides for your own implementation, or run out of the box to start classifying your workspace tests.
Quick start:
- Create copies of the example files below in your local environment.
- Configure your `.env` with OAuth credentials.
- Run `identify-and-classify-tests.py` to execute the workflow and generate classified test lists. For example: `python identify-and-classify-tests.py`
Environment variables
Create a `.env` file to store your OAuth credentials and tokens. The `generate-token.py` script will update the `ACCESS_TOKEN` and `EXPIRES_IN` fields automatically.
.env
CLIENT_ID=YOUR_CLIENT_ID
CLIENT_SECRET=YOUR_CLIENT_SECRET
ACCESS_TOKEN=YOUR_INITIAL_ACCESS_TOKEN
EXPIRES_IN=TOKEN_EXPIRY_TIMESTAMP
Generate and refresh access tokens
This script generates a new OAuth access token using your client credentials and saves it to your .env file. It also calculates the expiry time and saves it so that the main workflow can refresh tokens proactively.
generate-token.py
import argparse
import requests
import json
import os
import sys
import time
from dotenv import load_dotenv
from rich import print_json
load_dotenv() # take environment variables from .env
parser = argparse.ArgumentParser(description="Generate or refresh access token")
parser.add_argument(
"--force",
action="store_true",
help="Force refresh even if the current token has not expired",
)
args = parser.parse_args()
client_id = os.getenv("CLIENT_ID")
client_secret = os.getenv("CLIENT_SECRET")
current_access_token = os.getenv("ACCESS_TOKEN")
current_expires_in = os.getenv("EXPIRES_IN")
URL = "https://auth.usertesting.com/oauth2/aus1p3vtd8vtm4Bxv0h8/v1/token"
PAYLOAD = (
f'grant_type=client_credentials'
f'&client_id={client_id}'
f'&client_secret={client_secret}'
f'&scope=studies%3Aread'
)
HEADERS = {
'Content-Type': 'application/x-www-form-urlencoded',
}
# Check if current access token is expired
def is_token_expired():
if not current_access_token or not current_expires_in:
return True
try:
expires_in = int(current_expires_in)
# Assume EXPIRES_IN is the epoch time when the token expires
return time.time() > expires_in
except Exception:
return True
if not args.force and not is_token_expired():
print("Current access token is still valid. No update needed.")
sys.exit(0)
response = requests.post(URL, headers=HEADERS, data=PAYLOAD)
if response.headers.get('Content-Type', '').startswith('application/json'):
data = response.json()
print_json(json.dumps(data))
access_token = data.get("access_token")
expires_in = data.get("expires_in")
if access_token and expires_in:
# Calculate the absolute expiry time (epoch seconds)
expires_at = int(time.time()) + int(expires_in)
env_path = os.path.join(os.path.dirname(__file__), '.env')
with open(env_path, 'r') as file:
lines = file.readlines()
with open(env_path, 'w') as file:
for line in lines:
if line.startswith('ACCESS_TOKEN='):
file.write(f'ACCESS_TOKEN={access_token}\n')
elif line.startswith('EXPIRES_IN='):
file.write(f'EXPIRES_IN={expires_at}\n')
else:
file.write(line)
# Add EXPIRES_IN if not present
if not any(line.startswith('EXPIRES_IN=') for line in lines):
file.write(f'EXPIRES_IN={expires_at}\n')
print('ACCESS_TOKEN and EXPIRES_IN updated in .env file.')
elif access_token:
print('No expires_in found in response. ACCESS_TOKEN not updated.')
else:
print('No access_token found in response.')
else:
print(response.text)
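Note that the expiry check above treats `EXPIRES_IN` as an absolute epoch timestamp rather than a duration. That logic, extracted as a standalone sketch:

```python
import time

def is_expired(expires_at, now=None):
    """Return True when the stored expiry timestamp has passed.

    expires_at: absolute expiry time in epoch seconds (the value the
    script writes back to EXPIRES_IN), or None when nothing is stored.
    """
    if expires_at is None:
        return True  # no saved expiry -> treat as expired, force a refresh
    now = time.time() if now is None else now
    return now > expires_at

print(is_expired(None))                # → True
print(is_expired(time.time() + 3600))  # → False (valid for another hour)
```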
Full reference script
This is a comprehensive script that implements the entire workflow: discovering workspaces and tests, identifying UTZ vs ClassicUT, classifying test types, handling token refresh, enforcing rate limits, and persisting results. It includes robust error handling and is designed for long-running jobs.
identify-and-classify-tests.py
# ----------------------------
# Script: identify-and-classify-tests.py
# Purpose: Single unified script that:
# 1. Identifies which test UUIDs are "UTZ" (have sessionResults) vs "ClassicUT" (404 on sessionResults)
# 2. For UTZ tests, immediately classifies them as "interaction test", "think-out-loud test", or "survey"
# 3. Persists all results to disk for downstream use
# ----------------------------
import argparse
import json
import os
import subprocess
import time
from typing import Any, Dict, List, Optional, Set, Tuple
import requests
from dotenv import load_dotenv
# Run generate-token.py to ensure ACCESS_TOKEN is fresh
subprocess.run(["python", os.path.join(os.path.dirname(__file__), "generate-token.py")], check=True)
load_dotenv()
# ----------------------------
# Configuration
# ----------------------------
# OAuth token from environment (loaded via .env file)
ACCESS_TOKEN = os.environ.get("ACCESS_TOKEN", "ACCESS_TOKEN")
# API Endpoints
WORKSPACES_URL = "https://api.use2.usertesting.com/usertesting/api/v1/workspaces"
WORKSPACE_DETAIL_URL = "https://api.use2.usertesting.com/usertesting/api/v1/workspaces/{workspace_uuid}"
SESSION_RESULTS_URL = "https://api.use2.usertesting.com/api/v2/sessionResults/"
BASE_URL = "https://api.use2.usertesting.com/api/v2"
# Rate limiting configuration
# The /sessionResults endpoint allows 10 requests per minute
# To stay within that limit, we enforce a 6-second interval between requests
SESSION_RESULTS_MIN_INTERVAL_SECONDS = 6.0
REQUEST_INTERVAL_SECONDS = 6.0
# Persistence paths
# Results are saved to these JSON files for resumable, incremental processing
STATE_DIR = "state"
UTZ_LIST_PATH = os.path.join(STATE_DIR, "utz_tests.json") # UTZ tests (API-compatible) with test_type field
CLASSIC_LIST_PATH = os.path.join(STATE_DIR, "classicut_tests.json") # ClassicUT tests (legacy)
ERROR_LIST_PATH = os.path.join(STATE_DIR, "errors.json") # Error log
# Token management: OAuth tokens expire after ~3600 seconds (1 hour)
# We refresh tokens proactively if they expire within this threshold
REFRESH_BEFORE_EXPIRY_SECONDS = 5 * 60 # 5 minutes
# Global rate limiting tracker
# Tracks the last API call timestamp to enforce minimum intervals between requests
last_call_time = 0.0
# ----------------------------
# Helpers: file persistence
# ----------------------------
# These functions handle loading and saving JSON files for resumable processing.
# By persisting state incrementally, we can resume after interruptions.
def ensure_dirs() -> None:
"""
Create the state directory and output directories if they don't exist.
This must be called before saving any files.
"""
os.makedirs(STATE_DIR, exist_ok=True)
def load_object_list(path: str) -> List[Dict[str, Any]]:
"""Load a JSON array of objects (e.g., test records with metadata)."""
if not os.path.exists(path):
return []
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
raise ValueError(f"{path} must contain a JSON array")
return data
def save_object_list(path: str, items: List[Dict[str, Any]]) -> None:
"""Save a JSON array of objects, deduped by test_uuid."""
# Deduplicate by test_uuid (keep first occurrence)
seen = set()
deduped = []
for item in items:
if isinstance(item, dict):
test_uuid = item.get("test_uuid")
if test_uuid and test_uuid not in seen:
seen.add(test_uuid)
deduped.append(item)
with open(path, "w", encoding="utf-8") as f:
json.dump(deduped, f, indent=2)
def append_to_object_list_file(
path: str,
test_uuid: str,
workspace_uuid: str,
workspace_name: str,
) -> None:
"""
Append a test record to a list file, avoiding duplicates.
This is called during identification (Step 2) to record whether each test
is UTZ or ClassicUT. Saves immediately to support resumable processing.
Args:
path: File path (e.g., utz_tests.json or classicut_tests.json)
test_uuid: The test ID being classified
workspace_uuid: The workspace this test belongs to
workspace_name: The workspace display name
"""
items = load_object_list(path)
# Check if test_uuid already exists
if not any(item.get("test_uuid") == test_uuid for item in items):
items.append({
"test_uuid": test_uuid,
"workspace_uuid": workspace_uuid,
"workspace_name": workspace_name,
})
save_object_list(path, items)
def load_classifications() -> Dict[str, str]:
"""
Load previously saved test classifications from utz_tests.json.
This allows resuming classification runs without re-classifying tests.
Returns:
Dictionary mapping test_uuid -> test_type
(e.g., "interaction test", "think-out-loud test", "survey")
"""
classifications = {}
utz_records = load_object_list(UTZ_LIST_PATH)
for record in utz_records:
if isinstance(record, dict):
test_uuid = record.get("test_uuid")
test_type = record.get("test_type")
if test_uuid and test_type:
classifications[test_uuid] = test_type
return classifications
def update_test_classification(test_uuid: str, test_type: str) -> None:
"""
Update a specific test's classification in utz_tests.json.
This is called in Step 3 after classifying each test, making the script
interrupt-safe and allowing resumable processing.
Args:
test_uuid: The test UUID to update
test_type: The classification type ("interaction test", "think-out-loud test", or "survey")
"""
utz_records = load_object_list(UTZ_LIST_PATH)
# Find and update the test record
for record in utz_records:
if isinstance(record, dict) and record.get("test_uuid") == test_uuid:
record["test_type"] = test_type
break
# Save back to file
save_object_list(UTZ_LIST_PATH, utz_records)
def load_json_dict(path: str) -> Dict[str, Any]:
if not os.path.exists(path):
return {}
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, dict):
raise ValueError(f"{path} must contain a JSON object")
return data
def save_json_dict(path: str, obj: Dict[str, Any]) -> None:
with open(path, "w", encoding="utf-8") as f:
json.dump(obj, f, indent=2)
def safe_json(resp: requests.Response) -> Any:
try:
return resp.json()
except Exception:
return resp.text
def verify_success_checks(
utz_total: int,
classic_total: int,
errors: Dict[str, Any],
) -> None:
"""
Step 4 verification checks for output artifacts and data quality.
This validates that expected files exist, are parseable JSON, and contain
expected structures for downstream use.
"""
print("\nVerification checks:")
# 1) Output files exist
files_exist = all(
os.path.exists(path)
for path in [UTZ_LIST_PATH, CLASSIC_LIST_PATH, ERROR_LIST_PATH]
)
print(f" [{'PASS' if files_exist else 'FAIL'}] Output files exist")
# 2) Output files are valid JSON and expected container types
json_ok = True
try:
utz_records = load_object_list(UTZ_LIST_PATH)
classic_records = load_object_list(CLASSIC_LIST_PATH)
loaded_errors = load_json_dict(ERROR_LIST_PATH)
json_ok = isinstance(utz_records, list) and isinstance(classic_records, list) and isinstance(loaded_errors, dict)
except Exception as e:
json_ok = False
print(f" [FAIL] JSON parse/type validation error: {e}")
utz_records = []
if json_ok:
print(" [PASS] Output files are valid JSON")
# 3) UTZ schema and test_type values
allowed_test_types = {
"interaction test",
"think-out-loud test",
"survey",
}
required_fields = {"test_uuid", "workspace_uuid", "workspace_name"}
invalid_schema_count = 0
invalid_test_type_count = 0
classified_count = 0
for record in utz_records:
if not isinstance(record, dict):
invalid_schema_count += 1
continue
if not required_fields.issubset(record.keys()):
invalid_schema_count += 1
test_type = record.get("test_type")
if test_type is not None:
if test_type in allowed_test_types:
classified_count += 1
else:
invalid_test_type_count += 1
schema_ok = invalid_schema_count == 0 and invalid_test_type_count == 0
if schema_ok:
print(" [PASS] UTZ records include expected fields and valid test_type values")
else:
print(
" [FAIL] UTZ schema/type issues: "
f"invalid_schema={invalid_schema_count}, invalid_test_type={invalid_test_type_count}"
)
# 4) Classification coverage note
unclassified_utz = max(utz_total - classified_count, 0)
print(
" [INFO] Classification coverage: "
f"classified={classified_count}, unclassified={unclassified_utz}, utz_total={utz_total}"
)
if unclassified_utz > 0:
print(" [INFO] Some UTZ tests may be unclassified (for example, tests with no sessions yet).")
# 5) errors.json interpretation
error_count = len(errors)
if error_count == 0:
print(" [PASS] No processing errors recorded")
else:
print(f" [WARN] errors.json contains {error_count} record(s) (partial success; review recommended)")
# ----------------------------
# Token management
# ----------------------------
# These functions handle OAuth token lifecycle:
# 1. Checking if tokens are expired
# 2. Refreshing tokens before expiry
# 3. Building auth headers with fresh tokens
# This is critical for long-running scripts (multiple hours)
def get_token_expiry_time() -> Optional[float]:
"""Get token expiry epoch time from environment."""
expires_in_str = os.environ.get("EXPIRES_IN")
if expires_in_str:
try:
return float(expires_in_str)
except (ValueError, TypeError):
return None
return None
def should_refresh_token() -> bool:
"""Check if token should be refreshed (expires within REFRESH_BEFORE_EXPIRY_SECONDS)."""
expiry = get_token_expiry_time()
if not expiry:
return True
time_until_expiry = expiry - time.time()
return time_until_expiry < REFRESH_BEFORE_EXPIRY_SECONDS
def refresh_access_token() -> bool:
"""Refresh the access token by running generate-token.py."""
try:
subprocess.run(
["python", os.path.join(os.path.dirname(__file__), "generate-token.py"), "--force"],
check=True,
capture_output=True,
)
# Reload .env to get new token
load_dotenv(override=True)
print("[INFO] Access token refreshed successfully.")
return True
except subprocess.CalledProcessError as e:
print(f"[WARN] Failed to refresh token: {e}")
return False
def ensure_token_fresh() -> str:
"""Check if token needs refresh; if so, refresh. Return current token."""
if should_refresh_token():
print("[INFO] Token expiring soon, refreshing...")
refresh_access_token()
return os.environ.get("ACCESS_TOKEN", "ACCESS_TOKEN")
def get_headers() -> Dict[str, str]:
"""
Build HTTP headers for API requests, ensuring token is fresh.
This is called before every API request to refresh tokens if needed.
Returns:
Dictionary with Authorization header (Bearer token) and Content-Type
"""
token = ensure_token_fresh()
return {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
# ----------------------------
# API calls
# ----------------------------
# These functions make requests to UserTesting API endpoints.
# Each includes:
# - Rate limiting enforcement (6-second intervals)
# - 429 (Too Many Requests) handling with exponential backoff
# - Timeout protection (10-30 seconds)
# - Error collection instead of hard failures
def get_workspaces() -> List[Dict[str, Any]]:
"""
Step 1: Fetch all workspaces the user has access to.
Returns:
List of workspace objects with UUID, name, and other metadata
"""
resp = requests.get(WORKSPACES_URL, headers=get_headers(), timeout=30)
resp.raise_for_status()
data = resp.json()
if not isinstance(data, list):
raise ValueError(f"Expected list from workspaces endpoint, got: {type(data)}")
return data
def get_all_workspace_studies(workspace_uuid: str) -> List[Dict[str, Any]]:
"""
Step 1 (continued): Fetch all studies (tests) within a workspace using pagination.
The API returns up to 50 studies per request and uses cursor-based pagination.
This function automatically pages through all results to build a complete list.
Args:
workspace_uuid: The workspace to query
Returns:
List of all study/test objects in the workspace
"""
all_studies: List[Dict[str, Any]] = []
after: Optional[str] = None
page_num = 0
start_time = time.time()
seen_after_values: Set[str] = set()
while True:
url = WORKSPACE_DETAIL_URL.format(workspace_uuid=workspace_uuid)
params = {"after": after} if after else None
resp = requests.get(url, headers=get_headers(), params=params, timeout=30)
resp.raise_for_status()
data = resp.json()
studies = data.get("studies", [])
if not isinstance(studies, list):
raise ValueError(f"Expected 'studies' to be a list, got: {type(studies)}")
page_num += 1
all_studies.extend(studies)
elapsed_s = time.time() - start_time
print(
f"Workspace {workspace_uuid}: page {page_num}, "
f"+{len(studies)} studies (total {len(all_studies)}) "
f"elapsed {elapsed_s:.1f}s"
)
if not studies:
break
last_cursor = studies[-1].get("cursor")
if not last_cursor or not isinstance(last_cursor, str):
break
if last_cursor in seen_after_values:
break
seen_after_values.add(last_cursor)
after = last_cursor
return all_studies
def get_session_results(test_uuid: str, offset: int = 0, limit: int = 10) -> requests.Response:
"""
Step 2: Check if a test is UTZ or ClassicUT by querying /sessionResults.
HTTP Status Response:
- 200: UTZ test (has sessions, compatible with Results API)
- 404: ClassicUT test (legacy platform, incompatible)
Args:
test_uuid: The test ID to check
offset: Pagination offset (default 0)
limit: Max results per page (default 10)
Returns:
requests.Response object (status 200, 404, 429, etc.)
"""
params = {"testId": test_uuid, "offset": offset, "limit": limit}
return requests.get(SESSION_RESULTS_URL, headers=get_headers(), params=params, timeout=30)
def check_video_url(session_id: str) -> bool:
"""
Step 3a: Check if a session has a video recording available.
Interaction tests produce video and QX score data. If this endpoint
returns 200, the test is classified as an "interaction test".
This is called during Step 3 classification for each UTZ test with sessions.
Args:
session_id: The session ID from the test
Returns:
True if video is available (200 response), False otherwise
"""
global last_call_time
# Enforce rate limiting
now = time.time()
elapsed = now - last_call_time
if elapsed < REQUEST_INTERVAL_SECONDS:
time.sleep(REQUEST_INTERVAL_SECONDS - elapsed)
endpoint = f"{BASE_URL}/sessionResults/{session_id}/videoDownloadUrl"
try:
response = requests.get(endpoint, headers=get_headers(), timeout=10)
last_call_time = time.time()
if response.status_code == 200:
return True
elif response.status_code == 429:
# Handle rate limiting
retry_after = response.headers.get("Retry-After")
wait_s = float(retry_after) if (retry_after and retry_after.isdigit()) else 60.0
print(f"[WARN] 429 rate limit on video URL. Backing off for {wait_s}s")
time.sleep(wait_s)
# Retry once
response2 = requests.get(endpoint, headers=get_headers(), timeout=10)
last_call_time = time.time()
return response2.status_code == 200
else:
return False
except Exception as e:
print(f"[WARN] Error checking video URL for {session_id}: {e}")
return False
def check_qx_scores(test_id: str) -> bool:
"""
Step 3a (alternative): Check if a test has QX (Quality of Experience) scores.
Interaction tests produce QX scores. If this endpoint returns 200 and
check_video_url() returned False, the test is still classified as
an "interaction test" (because it has QX scores).
Args:
test_id: The test ID
Returns:
True if QX scores are available (200 response), False otherwise
"""
global last_call_time
# Enforce rate limiting
now = time.time()
elapsed = now - last_call_time
if elapsed < REQUEST_INTERVAL_SECONDS:
time.sleep(REQUEST_INTERVAL_SECONDS - elapsed)
endpoint = f"{BASE_URL}/testResults/{test_id}/qxScores"
try:
response = requests.get(endpoint, headers=get_headers(), timeout=10)
last_call_time = time.time()
if response.status_code == 200:
return True
elif response.status_code == 429:
# Handle rate limiting
retry_after = response.headers.get("Retry-After")
wait_s = float(retry_after) if (retry_after and retry_after.isdigit()) else 60.0
print(f"[WARN] 429 rate limit on QX scores. Backing off for {wait_s}s")
time.sleep(wait_s)
# Retry once
response2 = requests.get(endpoint, headers=get_headers(), timeout=10)
last_call_time = time.time()
return response2.status_code == 200
else:
return False
except Exception as e:
print(f"[WARN] Error checking QX scores for {test_id}: {e}")
return False
def check_transcript(session_id: str) -> bool:
"""
Step 3b: Check if a session has a transcript available.
Think-out-loud tests produce transcripts. If this endpoint returns 200
(and video/QX checks failed), the test is classified as a "think-out-loud test".
Args:
session_id: The session ID from the test
Returns:
True if transcript is available (200 response), False otherwise
"""
global last_call_time
# Enforce rate limiting
now = time.time()
elapsed = now - last_call_time
if elapsed < REQUEST_INTERVAL_SECONDS:
time.sleep(REQUEST_INTERVAL_SECONDS - elapsed)
endpoint = f"{BASE_URL}/sessionResults/{session_id}/transcript"
try:
response = requests.get(endpoint, headers=get_headers(), timeout=10)
last_call_time = time.time()
if response.status_code == 200:
return True
elif response.status_code == 429:
# Handle rate limiting
retry_after = response.headers.get("Retry-After")
wait_s = float(retry_after) if (retry_after and retry_after.isdigit()) else 60.0
print(f"[WARN] 429 rate limit on transcript. Backing off for {wait_s}s")
time.sleep(wait_s)
# Retry once
response2 = requests.get(endpoint, headers=get_headers(), timeout=10)
last_call_time = time.time()
return response2.status_code == 200
else:
return False
except Exception as e:
print(f"[WARN] Error checking transcript for {session_id}: {e}")
return False
def classify_test(test_id: str, session_id: str) -> str:
"""
Step 3: Classify a single UTZ test by checking endpoint availability.
This implements the classification methodology:
- Calls check_video_url, check_qx_scores, and check_transcript in order
- Returns the classification based on which endpoints return 200
Classification logic:
1. If video URL or QX scores endpoint returns 200 -> "interaction test"
2. Else if transcript endpoint returns 200 -> "think-out-loud test"
3. Else -> "survey" (default, no special data available)
Args:
test_id: The test ID (used for QX scores endpoint)
session_id: The session ID (used for video and transcript endpoints)
Returns:
One of: "interaction test", "think-out-loud test", or "survey"
"""
# Check interaction test (video or QX scores)
if check_video_url(session_id):
print(f" → interaction test (video available)")
return "interaction test"
if check_qx_scores(test_id):
print(f" → interaction test (QX scores available)")
return "interaction test"
# Check think-out-loud test (transcript)
if check_transcript(session_id):
print(f" → think-out-loud test (transcript available)")
return "think-out-loud test"
# Default to survey
print(f" → survey (no other data available)")
return "survey"
# ----------------------------
# Workspace selection
# ----------------------------
def _format_workspace_label(workspace: Dict[str, Any]) -> str:
name = workspace.get("name") or workspace.get("label") or workspace.get("title")
if name and isinstance(name, str):
return f"{workspace.get('uuid')} ({name})"
return str(workspace.get("uuid"))
def _select_workspace_uuids(
workspaces: List[Dict[str, Any]],
requested_uuid: Optional[str],
) -> List[str]:
workspace_uuids = [w.get("uuid") for w in workspaces if isinstance(w, dict) and w.get("uuid")]
if requested_uuid:
if requested_uuid in workspace_uuids:
return [requested_uuid]
raise ValueError(
f"Workspace UUID not found: {requested_uuid}. "
"Run without --workspace-uuid to see available workspaces."
)
if not workspace_uuids:
return []
print("Available workspaces:")
for idx, w in enumerate(workspaces, start=1):
if not isinstance(w, dict) or not w.get("uuid"):
continue
print(f" {idx}) {_format_workspace_label(w)}")
while True:
choice = input("Select workspace number (or press Enter for ALL): ").strip()
if choice == "" or choice == "0":
return workspace_uuids
if choice.isdigit():
idx = int(choice)
if 1 <= idx <= len(workspaces):
chosen = workspaces[idx - 1]
if isinstance(chosen, dict) and chosen.get("uuid"):
return [chosen.get("uuid")]
print("Invalid selection. Try again.")
# ----------------------------
# Main workflow
# ----------------------------
def run(workspace_uuid: Optional[str] = None) -> None:
"""
Main workflow orchestrator:
- Step 1: Discover all tests in workspaces
- Step 2: Identify UTZ vs ClassicUT tests
- Step 3: Classify UTZ tests by type
- Step 4: Persist and report results
This function is resumable: it loads existing results, processes new tests,
and re-checks previously discovered UTZ tests that are still unclassified.
This makes it safe to interrupt and re-run.
Args:
workspace_uuid: Optional workspace UUID to process single workspace.
If None, user is prompted to select.
"""
ensure_dirs()
# Load existing results to support resumable processing
utz_records = load_object_list(UTZ_LIST_PATH)
classic_records = load_object_list(CLASSIC_LIST_PATH)
classifications = load_classifications()
errors: Dict[str, Any] = load_json_dict(ERROR_LIST_PATH)
# Build a set of already-processed tests
# This prevents re-identifying tests we've already seen
utz_known: Set[str] = set(r.get("test_uuid") for r in utz_records if isinstance(r, dict) and r.get("test_uuid"))
classic_known: Set[str] = set(r.get("test_uuid") for r in classic_records if isinstance(r, dict) and r.get("test_uuid"))
already_processed = utz_known.union(classic_known)
# Build workspace UUID -> name lookup
all_workspaces = get_workspaces()
workspace_name_map = {
w.get("uuid"): (w.get("name") or w.get("label") or w.get("title") or "Unknown")
for w in all_workspaces
if isinstance(w, dict) and w.get("uuid")
}
# ===== STEP 1: Discover All Tests =====
# Fetch list of workspaces and let user choose which to process
print(f"Found {len(all_workspaces)} workspaces.")
workspace_uuids = _select_workspace_uuids(all_workspaces, workspace_uuid)
print(f"Processing {len(workspace_uuids)} workspace(s).\n")
# ===== STEP 1 (continued): Fetch study UUIDs from workspaces =====
# Use cursor-based pagination to get all tests (studies) in each workspace
study_uuids: Set[str] = set()
test_uuid_to_workspace_uuid: Dict[str, str] = {}
for wuuid in workspace_uuids:
try:
studies = get_all_workspace_studies(wuuid)
print(f"Workspace {wuuid}: {len(studies)} studies found")
for s in studies:
if not isinstance(s, dict):
continue
suuid = s.get("uuid")
if suuid and isinstance(suuid, str):
study_uuids.add(suuid)
test_uuid_to_workspace_uuid[suuid] = wuuid
except requests.HTTPError as e:
errors[f"workspace:{wuuid}"] = {
"error": str(e),
"type": "workspace_detail_http_error",
}
save_json_dict(ERROR_LIST_PATH, errors)
print(f"[WARN] Failed workspace detail for {wuuid}: {e}")
print(f"Found {len(study_uuids)} unique study UUIDs across workspaces.\n")
# ===== STEP 2: Identify UTZ vs ClassicUT Tests =====
# For each test in the processing set, query /sessionResults to determine
# current state and classify when possible.
global last_call_time
last_call_time = 0.0
# Build processing set:
# 1) new tests not yet identified as UTZ/ClassicUT
# 2) previously identified UTZ tests that still have no classification
new_to_process = sorted([u for u in study_uuids if u not in already_processed])
utz_unclassified_to_recheck = sorted(
[u for u in study_uuids if u in utz_known and u not in classifications]
)
to_process = sorted(set(new_to_process).union(utz_unclassified_to_recheck))
print(f"{len(new_to_process)} study UUIDs are new and will be processed.")
print(
f"{len(utz_unclassified_to_recheck)} previously discovered UTZ study UUIDs "
"without classification will be rechecked.\n"
)
stats = {
"interaction_test": 0,
"think_out_loud_test": 0,
"survey": 0,
"classicut": 0,
}
for idx, test_uuid in enumerate(to_process, start=1):
# Enforce rate limiting: 6 second minimum interval between API calls
now = time.time()
elapsed = now - last_call_time
if elapsed < SESSION_RESULTS_MIN_INTERVAL_SECONDS:
time.sleep(SESSION_RESULTS_MIN_INTERVAL_SECONDS - elapsed)
test_workspace_uuid = test_uuid_to_workspace_uuid.get(test_uuid, "Unknown")
ws_name = workspace_name_map.get(test_workspace_uuid, "Unknown")
# --- STEP 2: IDENTIFICATION ---
# Query /sessionResults to check if test is UTZ (200) or ClassicUT (404)
resp = get_session_results(test_uuid, offset=0, limit=10)
last_call_time = time.time()
if resp.status_code == 200:
# ✓ THIS IS A UTZ TEST (API-compatible)
session_data = resp.json()
sessions = session_data.get("sessions", [])
# Record this test as UTZ in persistent storage
append_to_object_list_file(UTZ_LIST_PATH, test_uuid, test_workspace_uuid, ws_name)
utz_known.add(test_uuid)
# --- STEP 3: CLASSIFICATION (only for UTZ tests with sessions) ---
# Classify this test by checking which endpoints return 200
if sessions:
session_id = sessions[0].get("sessionId")
if session_id and test_uuid not in classifications:
print(f"[{idx}/{len(to_process)}] UTZ 200: {test_uuid}")
classification = classify_test(test_uuid, session_id)
classifications[test_uuid] = classification
update_test_classification(test_uuid, classification)
# Update statistics
if classification == "interaction test":
stats["interaction_test"] += 1
elif classification == "think-out-loud test":
stats["think_out_loud_test"] += 1
elif classification == "survey":
stats["survey"] += 1
else:
if test_uuid in classifications:
print(f"[{idx}/{len(to_process)}] UTZ 200: {test_uuid} (already classified)")
else:
print(f"[{idx}/{len(to_process)}] UTZ 200: {test_uuid} (no sessions to classify)")
else:
print(f"[{idx}/{len(to_process)}] UTZ 200: {test_uuid} (no sessions)")
elif resp.status_code == 404:
# ✗ THIS IS A CLASSICUT TEST (legacy, not Results API compatible)
# Record this test as ClassicUT in persistent storage
append_to_object_list_file(CLASSIC_LIST_PATH, test_uuid, test_workspace_uuid, ws_name)
classic_known.add(test_uuid)
stats["classicut"] += 1
print(f"[{idx}/{len(to_process)}] ClassicUT 404: {test_uuid}")
elif resp.status_code == 401:
# Authentication error - token is missing or invalid
# This is a fatal error, stop processing
errors[test_uuid] = {
"status": resp.status_code,
"body": safe_json(resp),
"type": "auth_error",
}
save_json_dict(ERROR_LIST_PATH, errors)
        raise RuntimeError("Authentication error (401). Check ACCESS_TOKEN. Run generate-token.py to refresh.")
elif resp.status_code == 400:
# Bad request: offset or limit parameters are invalid
# Offset must be 0-10000, limit must be 1-500
errors[test_uuid] = {
"status": resp.status_code,
"body": safe_json(resp),
"type": "invalid_pagination_error",
}
save_json_dict(ERROR_LIST_PATH, errors)
        raise RuntimeError("Bad request (400). Check offset (0-10000) and limit (1-500) parameters.")
elif resp.status_code == 429:
# Rate limit hit: exceeding 10 requests per minute
# Read Retry-After header to know how long to wait, then retry once
retry_after = resp.headers.get("Retry-After")
wait_s = float(retry_after) if (retry_after and retry_after.isdigit()) else 60.0
print(f"[WARN] 429 rate limit (10 requests/minute max). Backing off for {wait_s} seconds, then retrying once.")
time.sleep(wait_s)
resp2 = get_session_results(test_uuid, offset=0, limit=10)
last_call_time = time.time()
if resp2.status_code == 200:
session_data = resp2.json()
sessions = session_data.get("sessions", [])
append_to_object_list_file(UTZ_LIST_PATH, test_uuid, test_workspace_uuid, ws_name)
utz_known.add(test_uuid)
if sessions:
session_id = sessions[0].get("sessionId")
if session_id and test_uuid not in classifications:
print(f"[{idx}/{len(to_process)}] UTZ 200 (after retry): {test_uuid}")
classification = classify_test(test_uuid, session_id)
classifications[test_uuid] = classification
update_test_classification(test_uuid, classification)
if classification == "interaction test":
stats["interaction_test"] += 1
elif classification == "think-out-loud test":
stats["think_out_loud_test"] += 1
elif classification == "survey":
stats["survey"] += 1
else:
print(f"[{idx}/{len(to_process)}] UTZ 200 (after retry): {test_uuid} (already classified/no session)")
else:
print(f"[{idx}/{len(to_process)}] UTZ 200 (after retry): {test_uuid} (no sessions)")
elif resp2.status_code == 404:
append_to_object_list_file(CLASSIC_LIST_PATH, test_uuid, test_workspace_uuid, ws_name)
classic_known.add(test_uuid)
stats["classicut"] += 1
print(f"[{idx}/{len(to_process)}] ClassicUT 404 (after retry): {test_uuid}")
else:
errors[test_uuid] = {
"status": resp2.status_code,
"body": safe_json(resp2),
"type": "rate_limited_then_failed",
}
save_json_dict(ERROR_LIST_PATH, errors)
print(f"[WARN] Still failing after 429 retry: {test_uuid} ({resp2.status_code})")
else:
# Unexpected HTTP status (5xx server error, etc.)
# Log the error but continue processing other tests
errors[test_uuid] = {
"status": resp.status_code,
"body": safe_json(resp),
"type": "unclassified_error",
}
save_json_dict(ERROR_LIST_PATH, errors)
print(f"[WARN] Unexpected status {resp.status_code} for {test_uuid}; recorded in errors.json")
# ===== STEPS 4 & 5: Persist Results =====
# Save final state and print summary
# Classifications are saved incrementally during Step 3 (embedded in utz_tests.json)
save_json_dict(ERROR_LIST_PATH, errors)
# Print summary report
print("\n" + "="*60)
print("Done. Processing complete.")
print("="*60)
utz_total = len(load_object_list(UTZ_LIST_PATH))
classic_total = len(load_object_list(CLASSIC_LIST_PATH))
print(f"UTZ total: {utz_total}")
print(f"ClassicUT total: {classic_total}")
print(f"\nClassifications:")
print(f" Interaction tests: {stats['interaction_test']}")
print(f" Think-out-loud tests: {stats['think_out_loud_test']}")
print(f" Surveys: {stats['survey']}")
print(f" ClassicUT tests: {stats['classicut']}")
print(f"\n✓ Results saved to:")
print(f" - {UTZ_LIST_PATH} (UTZ tests with test_type field)")
print(f" - {CLASSIC_LIST_PATH} (ClassicUT tests)")
if any(errors.values()):
print(f" - {ERROR_LIST_PATH} (Errors encountered)")
verify_success_checks(utz_total=utz_total, classic_total=classic_total, errors=errors)
if __name__ == "__main__":
# Entry point: parse arguments and run the main workflow
parser = argparse.ArgumentParser(description="Identify and classify UTZ vs ClassicUT tests")
parser.add_argument(
"--workspace-uuid",
help="Process only this specific workspace UUID (skips interactive selection).",
)
args = parser.parse_args()
# Run the identification and classification workflow
    run(workspace_uuid=args.workspace_uuid)
Storage file
There's no need to create this file manually; it's generated by the script. The file holds a JSON array of objects, each representing a test with its classification and workspace context. Each object has this shape:
state/utz_tests.json
{
"test_uuid": "TEST_ID",
"workspace_uuid": "WORKSPACE_ID",
"workspace_name": "WORKSPACE_NAME",
"test_type": "TEST_TYPE"
}
Steps
Step 1 — Discover all tests in workspaces
Endpoints
GET /usertesting/api/v1/workspaces
GET /usertesting/api/v1/workspaces/WORKSPACE_UUID
Why this matters
- Large organizations have multiple workspaces with hundreds or thousands of tests
- Tests must be retrieved with cursor-based pagination (50 tests per page)
- Each test UUID is needed for downstream identification and classification
Example (cURL)
List all available workspaces:
curl --location 'https://api.use2.usertesting.com/usertesting/api/v1/workspaces' \
--header 'Authorization: Bearer ACCESS_TOKEN' \
--header 'Content-Type: application/json'
Get tests in a workspace (with pagination):
curl --location 'https://api.use2.usertesting.com/usertesting/api/v1/workspaces/WORKSPACE_UUID?after=CURSOR' \
--header 'Authorization: Bearer ACCESS_TOKEN' \
--header 'Content-Type: application/json'
Best Practice: Use cursor-based pagination with the `after` query parameter. Each item in `studies` includes a `cursor` value. Use the last study's `cursor` as the `after` value for the next request. Continue fetching pages until the response returns an empty `studies` array.
Sample 200 response: tests in workspace
Each workspace contains a `studies` array with the tests in the workspace. Each test has a `uuid` (test ID), `title`, and `orderedAt` timestamp, among other metadata.
{
"studies": [
{
"cursor": "Mw",
"uuid": "32348d72-7473-4ce1-b17d-4a4c2202bd61",
"title": "Test 01",
"orderedAt": "2026-02-13T12:07:33Z",
"orderedBy": {
"name": "Blair Fraser",
"email": "[email protected]"
}
},
{
"cursor": "NA",
"uuid": "d244347f-359d-46c9-9d23-045bb0f97600",
"title": "Test 02",
"orderedAt": "2026-02-10T17:13:04Z",
"orderedBy": {
"name": "Blair Fraser",
"email": "[email protected]"
}
},
{
"cursor": "NQ",
"uuid": "7fe65409-cb96-44aa-9004-3d5f27b13465",
"title": "Test 03",
"orderedAt": "2026-02-10T15:02:52Z",
"orderedBy": {
"name": "Rosa Smith",
"email": "[email protected]"
}
}
]
}
Key fields to store
- `uuid` (test UUID / test ID)
- `title` (test title)
- `workspace_uuid` (parent workspace)
- `workspace_name` (parent workspace name)
Pagination logic
- Start with no cursor (initial request)
- Read the `cursor` value from the last item in the `studies` array
- Fetch the next page using the last study's `cursor` in the `?after=` query parameter
- Stop when `studies` is empty or the last study has no `cursor`
- Save test UUIDs immediately to support resumable processing
Code example
def get_all_workspace_studies(workspace_uuid: str) -> List[Dict[str, Any]]:
"""
Fetch all studies (tests) within a workspace using cursor-based pagination.
"""
all_studies: List[Dict[str, Any]] = []
after: Optional[str] = None
while True:
# Build query parameters with cursor if available
params = {}
if after:
params["after"] = after
# Make request
resp = requests.get(
f"{WORKSPACE_DETAIL_URL.format(workspace_uuid=workspace_uuid)}",
headers=get_headers(),
params=params,
timeout=30
)
resp.raise_for_status()
data = resp.json()
# Collect studies from this page
studies = data.get("studies", [])
all_studies.extend(studies)
# Check for next page using the last study's cursor
if not studies:
break
last_cursor = studies[-1].get("cursor")
if not last_cursor:
break
after = last_cursor
    return all_studies
Step 2 — Identify UTZ vs ClassicUT tests
Endpoint
GET /api/v2/sessionResults
Why this matters
- The Results API only works with newer test types (UTZ)
- Legacy tests (ClassicUT) return 404 and are incompatible
- You must know which tests are compatible before attempting classification or data extraction
Example (cURL)
Get all sessions for a test
curl --location 'https://api.use2.usertesting.com/api/v2/sessionResults?testId=TEST_UUID&limit=10' \
--header 'Authorization: Bearer ACCESS_TOKEN' \
--header 'Content-Type: application/json'
Key Indicator: The HTTP response status code reveals test compatibility:
- 200 OK - UTZ test (compatible with Results API)
- 404 Not Found - ClassicUT test (legacy platform, not compatible)
- 429 Too Many Requests - Rate limit exceeded; wait 6+ seconds before retrying
Do not treat other error codes as "not found." Handle them explicitly.
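For logging and debugging, this status-code mapping can be captured in a small helper. A minimal sketch (the helper name is illustrative, not part of the reference script):

```python
def interpret_identification_status(status_code: int) -> str:
    """Map a /sessionResults HTTP status code to its identification outcome."""
    outcomes = {
        200: "UTZ test (compatible with Results API)",
        404: "ClassicUT test (legacy platform, not compatible)",
        429: "Rate limit exceeded; wait 6+ seconds before retrying",
        401: "Access token missing or invalid; refresh ACCESS_TOKEN",
        400: "Invalid pagination parameters (offset 0-10000, limit 1-500)",
    }
    # Anything else (5xx, etc.) must be handled explicitly, not treated as "not found"
    return outcomes.get(status_code, f"Unexpected status {status_code}; handle explicitly")

print(interpret_identification_status(404))
```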
Response structure for UTZ tests (200 OK)
{
"sessions": [
{
"sessionId": "5bface5c-2878-428a-938f-26e977a1d976",
"testId": "32348d72-7473-4ce1-b17d-4a4c2202bd61",
"status": "completed",
"startTime": "2025-12-01T10:00:00Z",
"finishTime": "2025-12-01T10:15:00Z"
},
{
"sessionId": "5285a0ca-3d23-469f-82a7-2e83d9cf16a5",
"testId": "32348d72-7473-4ce1-b17d-4a4c2202bd61",
"status": "completed",
"startTime": "2025-12-01T11:00:00Z",
"finishTime": "2025-12-01T11:15:00Z"
}
],
"meta": {
"pagination": {
"limit": 10,
"offset": 0,
"totalCount": 2
}
}
}
Response for ClassicUT tests (404 Not Found)
{
"code": "NOT_FOUND",
"message": "Resource not found"
}
Data to extract and carry forward
- If 404, classify as ClassicUT (not compatible)
- If 200 response received, mark test as "UTZ" (compatible)
- From the 200 response, extract the first `sessionId` (you'll need it for classification)
- If no sessions are returned, log this fact and skip classification ("no sessions")
- Save immediately: Persist this information to avoid re-identifying the same test
Rate limiting strategy
- The `/sessionResults` endpoint has a limit of 10 requests per minute
- Enforce a minimum 6-second interval between requests
- For 100 tests, expect ~10 minutes for identification alone
- Use a global timestamp tracker to prevent burst requests
Code example
last_call_time = 0.0
REQUEST_INTERVAL_SECONDS = 6.0
def get_session_results(test_uuid: str) -> requests.Response:
"""
Check if a test is UTZ or ClassicUT by querying /sessionResults.
Returns:
Response object with status 200 (UTZ), 404 (ClassicUT), or 429 (rate limit)
"""
global last_call_time
# Enforce rate limiting
now = time.time()
elapsed = now - last_call_time
if elapsed < REQUEST_INTERVAL_SECONDS:
time.sleep(REQUEST_INTERVAL_SECONDS - elapsed)
last_call_time = time.time()
params = {"testId": test_uuid, "limit": 10}
return requests.get(
"https://api.use2.usertesting.com/api/v2/sessionResults",
headers=get_headers(),
params=params,
timeout=30
)
# Usage
response = get_session_results("test-uuid")
if response.status_code == 200:
data = response.json()
sessions = data.get("sessions", [])
if sessions:
session_id = sessions[0]["sessionId"]
# Save as UTZ test with sessionId
# Proceed to classification in Step 3
else:
    print("UTZ test has no sessions yet: test-uuid (skipping classification)")
# Save as UTZ test but skip classification for now
elif response.status_code == 404:
# Save as ClassicUT test (not compatible)
pass
elif response.status_code == 429:
# Rate limit exceeded; implement backoff
    time.sleep(10)
Note: The guard `data.get("sessions", [])` is critical. UTZ tests may return 200 but have an empty sessions array if the test was just created and has no participant completions yet. The reference script skips classification for these tests and rechecks them in later runs once sessions become available. This prevents classification errors and enables discovery of newly completed test sessions.
Note: In the full reference script, Step 2 rate limiting is enforced in the main processing loop, and additional retry/backoff safeguards are included.
Step 3 — Classify tests by type
Methodology
For each UTZ test identified in Step 2, check which Results API endpoints return 200 (success). Based on endpoint availability, classify the test as one of three types:
- Interaction test — Returns video and/or QXscore data
- Think-out-loud test — Returns transcript data
- Survey — Returns session results only
The classification is a three-stage process, with each stage checking specific endpoints:
3a. Check for interaction test indicators
Endpoints
GET /api/v2/sessionResults/SESSION_ID/videoDownloadUrl
GET /api/v2/testResults/TEST_ID/qxScores
Decision logic
If either endpoint returns 200, classify as "interaction test".
Interaction tests involve user interactions with a prototype or website and produce:
- Video recording URLs
- Quality of Experience scores (QXscores) measuring user sentiment and behavior
Example (cURL)
Get video download URL:
curl --location 'https://api.use2.usertesting.com/api/v2/sessionResults/SESSION_ID/videoDownloadUrl' \
--header 'Authorization: Bearer ACCESS_TOKEN'
Get QXscores:
curl --location 'https://api.use2.usertesting.com/api/v2/testResults/TEST_ID/qxScores' \
--header 'Authorization: Bearer ACCESS_TOKEN'
Optimization: Check the video URL endpoint first, and only check QXscores if the video check returns non-200. The short-circuit skips the QXscores call for tests that do have video.
Code example
def check_video_url(session_id: str) -> bool:
"""Check if a session has a video recording available."""
endpoint = f"https://api.use2.usertesting.com/api/v2/sessionResults/{session_id}/videoDownloadUrl"
try:
resp = requests.get(endpoint, headers=get_headers(), timeout=10)
return resp.status_code == 200
except Exception:
return False
def check_qx_scores(test_id: str) -> bool:
"""Check if a test has QX (Quality of Experience) scores available."""
endpoint = f"https://api.use2.usertesting.com/api/v2/testResults/{test_id}/qxScores"
try:
resp = requests.get(endpoint, headers=get_headers(), timeout=10)
return resp.status_code == 200
except Exception:
return False
# Classification logic (Step 3a)
if check_video_url(session_id) or check_qx_scores(test_id):
test_type = "interaction test"
proceed_to_next_step = False
else:
    proceed_to_next_step = True
Note: See the full reference script for timeout/retry handling.
3b. Check for think-out-loud test indicator
Endpoint
GET /api/v2/sessionResults/SESSION_ID/transcript
Decision logic
If the video and QXscore endpoints did not return 200 in Step 3a, and this endpoint returns 200, classify as "think-out-loud test".
Think-out-loud tests capture participant verbal feedback as they complete tasks. The transcript is provided in WebVTT (Web Video Text Tracks) format.
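Once a transcript is retrieved, downstream tooling usually needs it as timed text rather than raw WebVTT. A minimal parsing sketch, assuming the common `HH:MM:SS.mmm` timestamp form (real WebVTT allows more variants, such as cue identifiers and shorter timestamps):

```python
import re

def parse_webvtt(vtt_text: str) -> list:
    """Parse WebVTT transcript text into (start, end, text) cue tuples."""
    # Match a timing line, then one or more text lines, ending at a blank line or EOF
    pattern = re.compile(
        r"(\d{2}:\d{2}:\d{2}\.\d{3}) --> (\d{2}:\d{2}:\d{2}\.\d{3})\n((?:.+\n?)+?)(?:\n|\Z)"
    )
    return [(start, end, text.strip()) for start, end, text in pattern.findall(vtt_text)]

# Hypothetical transcript snippet for illustration
sample = """WEBVTT

00:00:01.000 --> 00:00:04.000
I'm looking at the homepage now.

00:00:05.000 --> 00:00:08.500
The search bar is easy to find.
"""
for cue in parse_webvtt(sample):
    print(cue)
```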
Example (cURL)
Get transcript in WebVTT format:
curl --location 'https://api.use2.usertesting.com/api/v2/sessionResults/SESSION_ID/transcript' \
--header 'Authorization: Bearer ACCESS_TOKEN'
Code example
def check_transcript(session_id: str) -> bool:
"""Check if a session has a transcript available."""
endpoint = f"https://api.use2.usertesting.com/api/v2/sessionResults/{session_id}/transcript"
try:
resp = requests.get(endpoint, headers=get_headers(), timeout=10)
return resp.status_code == 200
except Exception:
return False
# Classification logic (Step 3b)
if check_transcript(session_id):
test_type = "think-out-loud test"
else:
    test_type = "survey"
Note: See the full reference script for timeout/retry handling.
3c. Default classification
If none of the interaction or think-out-loud indicators returned 200, classify as "survey".
Surveys collect structured responses to questions without video, transcript, or QXscore data. They are fully supported by the Results API but do not produce media or interaction metrics.
Step 4 — Persist results incrementally
Storage strategy
Save classification results immediately after each test is processed. This makes your workflow interrupt-safe: if the script stops, you can resume without reprocessing completed tests.
Files to create
| File | Purpose | Example Content |
|---|---|---|
state/utz_tests.json | UTZ test metadata with classifications | [{"test_uuid": "...", "test_type": "interaction test", "workspace_uuid": "...", "workspace_name": "..."}] |
state/classicut_tests.json | Legacy tests (not compatible) | [{"test_uuid": "...", "workspace_uuid": "...", "workspace_name": "..."}] |
state/errors.json | API errors and failures for review | {"test_uuid": {"status_code": 429, "error": "Rate limit exceeded"}} |
Code example
def update_test_classification(test_uuid: str, test_type: str) -> None:
"""
Update a specific test's classification in utz_tests.json.
This is called immediately after classifying each test (interrupt-safe).
"""
# Load existing records
utz_records = load_object_list("state/utz_tests.json")
# Find and update the test record
for record in utz_records:
if record.get("test_uuid") == test_uuid:
record["test_type"] = test_type
break
# Save back to file
save_object_list("state/utz_tests.json", utz_records)
def save_object_list(path: str, items: List[Dict[str, Any]]) -> None:
"""Save JSON array, deduplicating by test_uuid."""
# Deduplicate by test_uuid (keep first occurrence)
seen = set()
deduped = []
for item in items:
test_id = item.get("test_uuid")
if test_id and test_id not in seen:
deduped.append(item)
seen.add(test_id)
with open(path, "w", encoding="utf-8") as f:
        json.dump(deduped, f, indent=2)
Note: See the full reference script for timeout/retry handling.
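The `load_object_list` and `append_to_object_list_file` helpers used throughout are not shown above. A minimal sketch (the reference script routes saves through `save_object_list` so records stay deduplicated):

```python
import json
import os
from typing import Any, Dict, List

def load_object_list(path: str) -> List[Dict[str, Any]]:
    """Load a JSON array of records; return [] if the file doesn't exist yet."""
    if not os.path.exists(path):
        return []
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    return data if isinstance(data, list) else []

def append_to_object_list_file(path: str, test_uuid: str,
                               workspace_uuid: str, workspace_name: str) -> None:
    """Append one test record and rewrite the file (interrupt-safe)."""
    records = load_object_list(path)
    records.append({
        "test_uuid": test_uuid,
        "workspace_uuid": workspace_uuid,
        "workspace_name": workspace_name,
    })
    with open(path, "w", encoding="utf-8") as f:
        json.dump(records, f, indent=2)
```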
Step 5 — Verify success
After the run completes, verify the workflow output using the summary report and the verification checks printed by the script.
Expected summary signals
- `Done. Processing complete.` appears
- Totals are printed for `UTZ total` and `ClassicUT total`
- Classification counts are printed for interaction tests, think-out-loud tests, and surveys
- Output file paths are printed under `✓ Results saved to:`
Expected verification checks
The script runs a built-in validation block and prints PASS/INFO/WARN status lines:
Verification checks:
[PASS] Output files exist
[PASS] Output files are valid JSON
[PASS] UTZ records include expected fields and valid test_type values
[INFO] Classification coverage: classified=..., unclassified=..., utz_total=...
[PASS] No processing errors recorded
How to interpret results
- If `unclassified` is greater than 0, this can be expected (for example, UTZ tests with no sessions yet)
- If `errors.json` contains records, treat the run as partial success and review those errors before downstream automation
- If any verification line shows `FAIL`, fix the data issue and rerun before using outputs in production workflows
Tip: Keep this verification step in every scheduled run so data quality checks happen automatically before dashboards, ETL, or AI pipelines consume the results.
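The validation block can be sketched as follows. This is an approximation of what the reference script's `verify_success_checks` does; here the file paths are passed explicitly for clarity, whereas the real function takes totals and reads module-level path constants:

```python
import json
import os

VALID_TYPES = {"interaction test", "think-out-loud test", "survey"}

def verify_success_checks(utz_path: str, classic_path: str, errors: dict) -> None:
    """Print PASS/INFO/WARN lines mirroring the script's built-in checks."""
    print("\nVerification checks:")
    # Check 1: output files exist
    if not (os.path.exists(utz_path) and os.path.exists(classic_path)):
        print("[FAIL] Output files exist")
        return
    print("[PASS] Output files exist")
    # Check 2: output files parse as JSON
    try:
        with open(utz_path, encoding="utf-8") as f:
            utz = json.load(f)
        with open(classic_path, encoding="utf-8") as f:
            json.load(f)
        print("[PASS] Output files are valid JSON")
    except json.JSONDecodeError:
        print("[FAIL] Output files are valid JSON")
        return
    # Check 3: UTZ records carry expected fields; test_type may be absent (unclassified)
    fields_ok = all(
        "test_uuid" in r and r.get("test_type") in VALID_TYPES | {None}
        for r in utz
    )
    status = "PASS" if fields_ok else "FAIL"
    print(f"[{status}] UTZ records include expected fields and valid test_type values")
    # Coverage and error summary
    classified = sum(1 for r in utz if r.get("test_type"))
    print(f"[INFO] Classification coverage: classified={classified}, "
          f"unclassified={len(utz) - classified}, utz_total={len(utz)}")
    if errors:
        print(f"[WARN] {len(errors)} processing errors recorded (review errors.json)")
    else:
        print("[PASS] No processing errors recorded")
```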
Rate limits and batching
Respect request throttling
The Results API endpoints enforce strict rate limits:
| Endpoint | Limit | Strategy |
|---|---|---|
/api/v2/sessionResults | 10 requests per minute | Enforce 6-second interval between requests |
/api/v2/sessionResults/{id}/videoDownloadUrl | 10 requests per minute | Share the same 6-second interval |
/api/v2/sessionResults/{id}/transcript | 10 requests per minute | Share the same 6-second interval |
/api/v2/testResults/{id}/qxScores | 10 requests per minute | Share the same 6-second interval |
Implementation
last_call_time = 0.0
REQUEST_INTERVAL_SECONDS = 6.0
def enforce_rate_limit():
"""Enforce minimum interval between API calls."""
global last_call_time
now = time.time()
elapsed = now - last_call_time
if elapsed < REQUEST_INTERVAL_SECONDS:
time.sleep(REQUEST_INTERVAL_SECONDS - elapsed)
last_call_time = time.time()
def make_api_request(url: str, params: Dict = None) -> requests.Response:
"""Make rate-limited API request."""
enforce_rate_limit()
    return requests.get(url, headers=get_headers(), params=params, timeout=30)
Exponential backoff for 429 errors
If you receive a 429 (Too Many Requests) response, implement exponential backoff:
import time
def make_request_with_backoff(func, max_retries: int = 5):
"""Retry request with exponential backoff on 429 errors."""
for attempt in range(max_retries):
resp = func()
if resp.status_code != 429:
return resp
wait_time = 2 ** attempt # 1, 2, 4, 8, 16 seconds
print(f"Rate limited. Waiting {wait_time} seconds...")
time.sleep(wait_time)
    return resp
Timeout management
- Estimate runtime: For 100 tests with classification, expect 30-60 minutes
- Use persistent sessions: Run with `nohup` or `screen` to survive terminal disconnections
- Monitor progress: Print status updates (e.g., "Processing test 45/100")
- Plan for scaling: 1000+ tests may take several hours
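These estimates follow directly from the shared 6-second interval. A rough lower-bound calculator, where the 70% UTZ fraction and three classification probes per UTZ test are illustrative assumptions, not measured values:

```python
def estimate_runtime_minutes(num_tests: int, utz_fraction: float = 0.7,
                             interval_s: float = 6.0) -> float:
    """Lower-bound runtime: one identification call per test, plus up to three
    classification probes (video, qxScores, transcript) per UTZ test."""
    identification_calls = num_tests
    classification_calls = round(num_tests * utz_fraction) * 3
    return (identification_calls + classification_calls) * interval_s / 60.0

print(f"~{estimate_runtime_minutes(100):.0f} minutes for 100 tests")  # → ~31 minutes for 100 tests
```

For 100 tests this gives roughly 31 minutes, consistent with the 30-60 minute estimate above; token refreshes and 429 backoffs only add to it.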
Resumable processing with checkpoints
Why checkpoints matter
Long-running workflows can be interrupted by network issues, token expiration, or system restarts. Saving state incrementally ensures you don't lose progress.
Another reason for checkpoints is that you may want to run the workflow multiple times as your team adds new tests to the workspace or tests that didn't have sessions previously can now be classified.
Checkpoint strategy
During Step 1 (discovery)
- Save test UUIDs immediately after fetching each page
- This prevents re-fetching pages unnecessarily
During Step 2 (identification)
- Append tests to `utz_tests.json` or `classicut_tests.json` immediately
- Track which tests have been identified to skip them on resume
During Step 3 (classification)
- Update the `test_type` field immediately after classifying each test
- Check if `test_type` is already populated before re-classifying
Resume logic
When you run the workflow multiple times, process only new tests and unclassified UTZ tests from previous interrupted runs:
# Load existing records from disk
utz_records = load_object_list("state/utz_tests.json")
classic_records = load_object_list("state/classicut_tests.json")
classifications = load_classifications()
# Build a set of already-processed tests (both UTZ and ClassicUT)
utz_known = {r.get("test_uuid") for r in utz_records if isinstance(r, dict) and r.get("test_uuid")}
classic_known = {r.get("test_uuid") for r in classic_records if isinstance(r, dict) and r.get("test_uuid")}
already_processed = utz_known.union(classic_known)
# Discover current tests in workspaces
all_current_tests = get_all_workspace_studies(workspace_uuid)
current_test_uuids = {t.get("uuid") for t in all_current_tests if isinstance(t, dict) and t.get("uuid")}
# Calculate processing sets
# 1. New tests not yet identified
new_to_process = sorted([u for u in current_test_uuids if u not in already_processed])
# 2. Previously identified UTZ tests still without classification (resumable gap)
utz_unclassified_to_recheck = sorted([
u for u in current_test_uuids
if u in utz_known and u not in classifications
])
# Combine both sets for processing
to_process = sorted(set(new_to_process).union(utz_unclassified_to_recheck))
print(f"New tests to identify: {len(new_to_process)}")
print(f"Previously discovered UTZ tests without classification: {len(utz_unclassified_to_recheck)}")
print(f"Total to process: {len(to_process)}")
# Process only these tests in the main loop
for test_uuid in to_process:
# Identify (Step 2) and classify (Step 3) this test
response = get_session_results(test_uuid)
if response.status_code == 200:
# UTZ test
sessions = response.json().get("sessions", [])
if sessions:
session_id = sessions[0]["sessionId"]
if test_uuid not in classifications:
test_type = classify_test(test_uuid, session_id)
update_test_classification(test_uuid, test_type)
else:
# UTZ tests without sessions are skipped until sessions appear
pass
elif response.status_code == 404:
# ClassicUT test
        pass
Note: The reference script skips UTZ tests that return 200 but have no sessions, and rechecks them in later runs once sessions are available.
Why this approach
- Efficiency: Avoids redundant API calls for tests already identified as UTZ or ClassicUT
- Resilience: Catches UTZ tests discovered but interrupted before classification (in `utz_unclassified_to_recheck`)
- Discovery: Automatically picks up tests added to the workspace since the last run (in `new_to_process`), and tests that were previously unclassifiable due to no sessions but now have sessions
- Incremental: Each run processes only deltas, not the entire test library
Token refresh for long-running jobs
Problem
OAuth access tokens expire after 3600 seconds (1 hour). For workflows processing hundreds of tests over 30+ minutes, tokens can expire mid-execution.
Solution
The script should check token expiration and refresh proactively:
import time
import subprocess
import os
from dotenv import load_dotenv
REFRESH_BEFORE_EXPIRY_SECONDS = 5 * 60 # Refresh 5 minutes before expiry
def get_token_expiry_time() -> Optional[float]:
"""Get token expiry epoch time from environment."""
    # EXPIRES_IN is expected to hold an absolute epoch timestamp, written when the token was generated
    expires_in_str = os.environ.get("EXPIRES_IN")
if expires_in_str:
try:
return float(expires_in_str)
except ValueError:
return None
return None
def should_refresh_token() -> bool:
"""Check if token should be refreshed."""
expiry = get_token_expiry_time()
if not expiry:
return True
time_until_expiry = expiry - time.time()
return time_until_expiry < REFRESH_BEFORE_EXPIRY_SECONDS
def refresh_access_token() -> bool:
"""Refresh the access token by running generate-token.py."""
try:
subprocess.run(
["python", "generate-token.py"],
check=True,
capture_output=True
)
load_dotenv(override=True) # Reload .env
return True
except subprocess.CalledProcessError:
return False
def get_headers() -> Dict[str, str]:
"""Build HTTP headers with fresh token."""
if should_refresh_token():
if not refresh_access_token():
print("Warning: Token refresh failed")
token = os.environ.get("ACCESS_TOKEN")
return {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}Common errors and troubleshooting
| Error | Status | Meaning | Solution |
|---|---|---|---|
| Invalid pagination parameters | 400 | Offset or limit parameters are out of range | Verify offset is 0-10000 and limit is 1-500; see OAS for /api/v2/sessionResults |
| Missing or invalid access token | 401 | Access token is missing, invalid, or expired | Run generate-token.py to refresh; verify .env file contains valid ACCESS_TOKEN |
| Resource not found | 404 | Test UUID doesn't exist, is a ClassicUT test, or resource (video/transcript) is unavailable | Verify test/resource UUID; 404 on /sessionResults indicates ClassicUT test (expected for legacy tests) |
| Rate limit exceeded | 429 | Made too many requests (10 requests per minute limit) | Reduce request frequency; use x-ratelimit-remaining header to monitor; read Retry-After header and wait before retrying |
What you can build next
Once you have a complete classification and inventory of all your tests, you can:
- Power dashboards — Visualize test type distribution, coverage, and age
- Automate data pipelines — Feed classified test UUIDs into scheduled Results API extraction jobs (see Create an Automated Pipeline)
- Drive AI workflows — Route transcripts from think-out-loud tests (beta) to LLM-based insight generation
- Enable self-service — Build an internal API that returns test type and capabilities for any given test UUID
- Integrate with metadata systems — Sync findings to your data warehouse or metadata catalog
Summary
You now have a scalable pattern to:
- Discover all tests across multiple workspaces using cursor-based pagination
- Identify compatibility by checking Results API endpoint responses
- Classify tests based on data availability (survey, interaction, think-out-loud)
- Persist state incrementally for resumable, long-running workflows
- Verify results with built-in checks for file integrity, schema validity, and classification coverage
- Manage tokens and rate limits automatically for production stability
This classification workflow forms the foundation for advanced automation, ensuring your data pipelines only process compatible tests and your teams understand what insights are available from each test in your library.