Classify Tests by Type

Build a Bulk Test Classification Workflow

Introduction

This tutorial shows how to build an automated classification workflow that identifies and categorizes all compatible tests in your UserTesting workspace by type: survey, interaction test, or think-out-loud test (beta).

The goal is to enable your research and data teams to programmatically discover which tests are compatible with the Results API and understand what data is available for each test. This is especially valuable when working with large test libraries where manual classification is impractical.

Because the UserTesting API does not currently expose a test's type directly, the proposed script infers it by probing specific Results API endpoints. For each test, it checks whether each endpoint returns success (HTTP 200) or not found (HTTP 404).
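
The inference rule can be sketched as a pure function over the status codes returned by three probes. The function and parameter names here are illustrative and not part of the UserTesting API. The precedence (video or QX scores first, then transcript, otherwise survey) mirrors the reference script later in this tutorial.

```python
def infer_test_type(video_status: int, qx_status: int, transcript_status: int) -> str:
    """Map probe status codes to a test type (illustrative helper)."""
    # Video or QX score data indicates an interaction test.
    if video_status == 200 or qx_status == 200:
        return "interaction test"
    # A transcript without video or QX data indicates a think-out-loud test.
    if transcript_status == 200:
        return "think-out-loud test"
    # No probe succeeded: treat the test as a survey.
    return "survey"


print(infer_test_type(404, 200, 404))  # interaction test
print(infer_test_type(404, 404, 200))  # think-out-loud test
print(infer_test_type(404, 404, 404))  # survey
```

Keeping the decision separate from the HTTP calls makes the rule easy to unit test without network access.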

What you'll build

A 5-step resumable Python workflow that:

  1. Discovers all tests across your workspaces
  2. Identifies which tests are compatible with the Results API (UTZ) versus legacy (ClassicUT)
  3. Classifies each test by type using endpoint availability checks
  4. Persists results to JSON files for downstream use with resumable checkpoints
  5. Verifies success with built-in validation checks and data quality assertions

The workflow automatically refreshes tokens and enforces rate limits throughout, and discovers new tests added since the last run by comparing against saved results.
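
Finding tests added since the last run amounts to a set difference between the UUIDs just discovered and the UUIDs already saved. The sketch below assumes saved records are dictionaries with a test_uuid key, as in the reference script; the helper name find_new_tests is hypothetical.

```python
from typing import Dict, Iterable, List, Set


def find_new_tests(discovered: Iterable[str], saved_records: List[Dict[str, str]]) -> List[str]:
    """Return discovered test UUIDs that are absent from the saved records."""
    known: Set[str] = {r["test_uuid"] for r in saved_records if r.get("test_uuid")}
    # Sort for a stable processing order across runs.
    return sorted(u for u in discovered if u not in known)


saved = [{"test_uuid": "a"}, {"test_uuid": "b"}]
print(find_new_tests(["c", "a", "d"], saved))  # ['c', 'd']
```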

Target audience

  • Research operations teams managing large test libraries
  • Data engineers building Results API data pipelines
  • Analytics engineers preparing test metadata for dashboards
  • ML / AI teams sourcing data from UserTesting research

Prerequisites

  • OAuth credentials (CLIENT_ID, CLIENT_SECRET) to generate and refresh access tokens (ACCESS_TOKEN). Go to Authorization for details.
  • UserTesting workspace access as a team member
  • Python 3.7+ with the requests, python-dotenv, and rich packages (generate-token.py uses rich to pretty-print the token response)
  • A .env file with your OAuth credentials and initial tokens
  • A directory structure for persisting state (e.g., state/ folder)

High-level architecture

The workflow follows a multi-step process where data flows through discovery, identification, and classification stages:

  1. OAuth Token Manager ensures access tokens stay fresh for long-running jobs
  2. Workspace Discovery retrieves all available workspaces and tests
  3. UTZ Identifier checks which tests are compatible with Results API
  4. Type Classifier determines test type based on endpoint availability
  5. Persistence Layer saves results incrementally for resumable processing

graph LR;
    OAuth["OAuth Token<br/>Manager"] --> Discover["Workspace<br/>Discovery"];
    Discover --> Identify["UTZ vs<br/>ClassicUT"];
    Identify --> Classify["Type<br/>Classifier"];
    Classify --> Persist["Persist to<br/>JSON Files"];
    OAuth -.->|refresh| Discover;
    OAuth -.->|refresh| Identify;
    OAuth -.->|refresh| Classify;
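
For the persistence stage, one possible hardening is to write each JSON file to a temporary file and then swap it into place, so an interruption is unlikely to leave a half-written file. This is a sketch of that variant, not what the reference script does (it writes the target file directly); the helper name save_records_atomically is hypothetical.

```python
import json
import os
import tempfile
from typing import Any, Dict, List


def save_records_atomically(path: str, records: List[Dict[str, Any]]) -> None:
    """Write records, deduplicated by test_uuid, via a temp file and os.replace."""
    seen = set()
    deduped = []
    for record in records:
        key = record.get("test_uuid")
        if key and key not in seen:  # keep the first occurrence only
            seen.add(key)
            deduped.append(record)

    directory = os.path.dirname(os.path.abspath(path))
    os.makedirs(directory, exist_ok=True)
    # Create the temp file in the target directory so the final rename
    # stays on the same filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(deduped, f, indent=2)
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the temp file if writing or renaming failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

Calling save_records_atomically("state/utz_tests.json", records) after each classified test would keep the checkpoint behavior while reducing the chance of a corrupted file after an interrupted run.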

Example files

The example files below can serve as guides for your own implementation, or you can use them as-is to start classifying your workspace tests.

Quick start:

  • Create copies of the example files below in your local environment.
  • Configure your .env with OAuth credentials.
  • Run identify-and-classify-tests.py to execute the workflow and generate classified test lists. For example:
python identify-and-classify-tests.py

Environment variables

Create a .env file to store your OAuth credentials and tokens. The generate-token.py script updates the ACCESS_TOKEN and EXPIRES_IN fields automatically. Despite its name, EXPIRES_IN holds the absolute expiry time in epoch seconds, not the token lifetime.

.env
CLIENT_ID=YOUR_CLIENT_ID
CLIENT_SECRET=YOUR_CLIENT_SECRET
ACCESS_TOKEN=YOUR_INITIAL_ACCESS_TOKEN
EXPIRES_IN=TOKEN_EXPIRY_TIMESTAMP
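
Before running the workflow, it can help to confirm that the credentials are set and no longer hold the placeholder values shown above. The following is a minimal sketch; the helper name missing_credentials is hypothetical, and it assumes the variables have already been loaded into the environment (for example by load_dotenv()).

```python
import os
from typing import List, Mapping

REQUIRED_VARS = ("CLIENT_ID", "CLIENT_SECRET")
# Placeholder values copied from the sample .env above.
PLACEHOLDERS = {"YOUR_CLIENT_ID", "YOUR_CLIENT_SECRET"}


def missing_credentials(env: Mapping[str, str]) -> List[str]:
    """Return names of required variables that are unset or still placeholders."""
    return [
        name for name in REQUIRED_VARS
        if not env.get(name) or env[name] in PLACEHOLDERS
    ]


if __name__ == "__main__":
    problems = missing_credentials(os.environ)
    if problems:
        print("Missing or placeholder values: " + ", ".join(problems))
```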

Generate and refresh access tokens

This script generates a new OAuth access token using your client credentials and saves it to your .env file. It also calculates the expiry time and saves it so that the main workflow can refresh tokens proactively.

generate-token.py
import argparse
import requests
import json
import os
import sys
import time
from dotenv import load_dotenv
from rich import print_json


load_dotenv()  # take environment variables from .env

parser = argparse.ArgumentParser(description="Generate or refresh access token")
parser.add_argument(
    "--force",
    action="store_true",
    help="Force refresh even if the current token has not expired",
)
args = parser.parse_args()


client_id = os.getenv("CLIENT_ID")
client_secret = os.getenv("CLIENT_SECRET")
current_access_token = os.getenv("ACCESS_TOKEN")
current_expires_in = os.getenv("EXPIRES_IN")


URL = "https://auth.usertesting.com/oauth2/aus1p3vtd8vtm4Bxv0h8/v1/token"

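# Pass the form fields as a dict so that requests URL-encodes each value
# (a client secret containing '&' or '=' would otherwise corrupt the body).
PAYLOAD = {
    'grant_type': 'client_credentials',
    'client_id': client_id,
    'client_secret': client_secret,
    'scope': 'studies:read',
}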
HEADERS = {
    'Content-Type': 'application/x-www-form-urlencoded',
}


# Check if current access token is expired
def is_token_expired():
    if not current_access_token or not current_expires_in:
        return True
    try:
        expires_in = int(current_expires_in)
        # Assume EXPIRES_IN is the epoch time when the token expires
        return time.time() > expires_in
    except ValueError:
        return True

if not args.force and not is_token_expired():
    print("Current access token is still valid. No update needed.")
    sys.exit(0)

# Set a timeout so a stalled connection cannot hang the script indefinitely
response = requests.post(URL, headers=HEADERS, data=PAYLOAD, timeout=30)

if response.headers.get('Content-Type', '').startswith('application/json'):
    data = response.json()
    print_json(json.dumps(data))
    access_token = data.get("access_token")
    expires_in = data.get("expires_in")
    if access_token and expires_in:
        # Calculate the absolute expiry time (epoch seconds)
        expires_at = int(time.time()) + int(expires_in)
        env_path = os.path.join(os.path.dirname(__file__), '.env')
        with open(env_path, 'r') as file:
            lines = file.readlines()
        with open(env_path, 'w') as file:
            for line in lines:
                if line.startswith('ACCESS_TOKEN='):
                    file.write(f'ACCESS_TOKEN={access_token}\n')
                elif line.startswith('EXPIRES_IN='):
                    file.write(f'EXPIRES_IN={expires_at}\n')
                else:
                    file.write(line)
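            # Add ACCESS_TOKEN and EXPIRES_IN if not present
            if not any(line.startswith('ACCESS_TOKEN=') for line in lines):
                file.write(f'ACCESS_TOKEN={access_token}\n')
            if not any(line.startswith('EXPIRES_IN=') for line in lines):
                file.write(f'EXPIRES_IN={expires_at}\n')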
        print('ACCESS_TOKEN and EXPIRES_IN updated in .env file.')
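    # Exit non-zero on failure so callers using check=True can detect it
    elif access_token:
        print('No expires_in found in response. ACCESS_TOKEN not updated.')
        sys.exit(1)
    else:
        print('No access_token found in response.')
        sys.exit(1)
else:
    print(response.text)
    sys.exit(1)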
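
The expiry bookkeeping in the script above reduces to two small calculations: converting the token lifetime into an absolute epoch time, and deciding whether that time is close enough to warrant a refresh. The sketch below isolates them; the function names are hypothetical, and the 5-minute margin matches the REFRESH_BEFORE_EXPIRY_SECONDS value used in the reference script.

```python
import time
from typing import Optional

REFRESH_BEFORE_EXPIRY_SECONDS = 5 * 60  # 5-minute safety margin


def expiry_epoch(expires_in: int, now: Optional[float] = None) -> int:
    """Convert a token lifetime in seconds into an absolute epoch expiry time."""
    now = time.time() if now is None else now
    return int(now) + int(expires_in)


def needs_refresh(expires_at: float, now: Optional[float] = None) -> bool:
    """Return True when the token expires within the safety margin."""
    now = time.time() if now is None else now
    return (expires_at - now) < REFRESH_BEFORE_EXPIRY_SECONDS


issued_at = 1_700_000_000
expires_at = expiry_epoch(3600, now=issued_at)
print(expires_at)                                       # 1700003600
print(needs_refresh(expires_at, now=issued_at))         # False
print(needs_refresh(expires_at, now=issued_at + 3400))  # True
```

Passing now explicitly keeps both functions deterministic, which makes them straightforward to test.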

Full reference script

This script implements the entire workflow: discovering workspaces and tests, identifying UTZ vs ClassicUT, classifying test types, refreshing tokens, enforcing rate limits, and persisting results. Rather than aborting on a failed workspace request, it records the error in state/errors.json and continues, which suits long-running jobs.
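
The rate limiting can be pictured as a small helper that spaces calls at least a fixed interval apart, plus a parser for the Retry-After header on HTTP 429 responses. This is a sketch under assumptions: the class and function names are hypothetical, and the script below uses a module-level timestamp rather than a class. As in that script, only integer-second Retry-After values are handled (the header may also carry an HTTP date).

```python
import time
from typing import Callable, Optional


class MinIntervalLimiter:
    """Block so that consecutive calls are at least `interval` seconds apart."""

    def __init__(self, interval: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self.interval = interval
        self._clock = clock
        self._sleep = sleep
        self._last: Optional[float] = None

    def wait(self) -> float:
        """Sleep if needed and return the number of seconds slept."""
        now = self._clock()
        delay = 0.0
        if self._last is not None:
            delay = max(0.0, self.interval - (now - self._last))
        if delay:
            self._sleep(delay)
        self._last = self._clock()
        return delay


def retry_after_seconds(header_value: Optional[str], default: float = 60.0) -> float:
    """Parse an integer Retry-After header, falling back to a default wait."""
    if header_value and header_value.strip().isdigit():
        return float(header_value.strip())
    return default
```

With a 6-second interval, calling limiter.wait() before each request keeps the request rate at or below 10 per minute.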

identify-and-classify-tests.py
# ----------------------------
# Script: identify-and-classify-tests.py
# Purpose: Single unified script that:
# 1. Identifies which test UUIDs are "UTZ" (have sessionResults) vs "ClassicUT" (404 on sessionResults)
# 2. For UTZ tests, immediately classifies them as "interaction test", "think-out-loud test", or "survey"
# 3. Persists all results to disk for downstream use
# ----------------------------

import argparse
import json
import os
import subprocess
import time
from typing import Any, Dict, List, Optional, Set, Tuple
import requests
from dotenv import load_dotenv

# Run generate-token.py to ensure ACCESS_TOKEN is fresh
subprocess.run(["python", os.path.join(os.path.dirname(__file__), "generate-token.py")], check=True)

load_dotenv()


# ----------------------------
# Configuration
# ----------------------------
# OAuth token from environment (loaded via .env file)
ACCESS_TOKEN = os.environ.get("ACCESS_TOKEN", "ACCESS_TOKEN")

# API Endpoints
WORKSPACES_URL = "https://api.use2.usertesting.com/usertesting/api/v1/workspaces"
WORKSPACE_DETAIL_URL = "https://api.use2.usertesting.com/usertesting/api/v1/workspaces/{workspace_uuid}"
SESSION_RESULTS_URL = "https://api.use2.usertesting.com/api/v2/sessionResults/"
BASE_URL = "https://api.use2.usertesting.com/api/v2"

# Rate limiting configuration
# The /sessionResults endpoint allows 10 requests per minute
# To stay within that limit, we enforce a 6-second interval between requests
SESSION_RESULTS_MIN_INTERVAL_SECONDS = 6.0
REQUEST_INTERVAL_SECONDS = 6.0

# Persistence paths
# Results are saved to these JSON files for resumable, incremental processing
STATE_DIR = "state"
UTZ_LIST_PATH = os.path.join(STATE_DIR, "utz_tests.json")  # UTZ tests (API-compatible) with test_type field
CLASSIC_LIST_PATH = os.path.join(STATE_DIR, "classicut_tests.json")  # ClassicUT tests (legacy)
ERROR_LIST_PATH = os.path.join(STATE_DIR, "errors.json")  # Error log


# Token management: OAuth tokens expire after ~3600 seconds (1 hour)
# We refresh tokens proactively if they expire within this threshold
REFRESH_BEFORE_EXPIRY_SECONDS = 5 * 60  # 5 minutes

# Global rate limiting tracker
# Tracks the last API call timestamp to enforce minimum intervals between requests
last_call_time = 0.0

# ----------------------------
# Helpers: file persistence
# ----------------------------
# These functions handle loading and saving JSON files for resumable processing.
# By persisting state incrementally, we can resume after interruptions.

def ensure_dirs() -> None:
    """
    Create the state directory if it doesn't exist.
    This must be called before saving any files.
    """
    os.makedirs(STATE_DIR, exist_ok=True)

def load_object_list(path: str) -> List[Dict[str, Any]]:
    """Load a JSON array of objects (e.g., test records with metadata)."""
    if not os.path.exists(path):
        return []
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, list):
        raise ValueError(f"{path} must contain a JSON array")
    return data

def save_object_list(path: str, items: List[Dict[str, Any]]) -> None:
    """Save a JSON array of objects, deduped by test_uuid."""
    # Deduplicate by test_uuid (keep first occurrence)
    seen = set()
    deduped = []
    for item in items:
        if isinstance(item, dict):
            test_uuid = item.get("test_uuid")
            if test_uuid and test_uuid not in seen:
                seen.add(test_uuid)
                deduped.append(item)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(deduped, f, indent=2)

def append_to_object_list_file(
    path: str,
    test_uuid: str,
    workspace_uuid: str,
    workspace_name: str,
) -> None:
    """
    Append a test record to a list file, avoiding duplicates.
    This is called during identification (Step 2) to record whether each test
    is UTZ or ClassicUT. Saves immediately to support resumable processing.
    
    Args:
        path: File path (e.g., utz_tests.json or classicut_tests.json)
        test_uuid: The test ID being classified
        workspace_uuid: The workspace this test belongs to
        workspace_name: The workspace display name
    """
    items = load_object_list(path)
    # Check if test_uuid already exists
    if not any(item.get("test_uuid") == test_uuid for item in items):
        items.append({
            "test_uuid": test_uuid,
            "workspace_uuid": workspace_uuid,
            "workspace_name": workspace_name,
        })
        save_object_list(path, items)

def load_classifications() -> Dict[str, str]:
    """
    Load previously saved test classifications from utz_tests.json.
    This allows resuming classification runs without re-classifying tests.
    
    Returns:
        Dictionary mapping test_uuid -> test_type
        (e.g., "interaction test", "think-out-loud test", "survey")
    """
    classifications = {}
    utz_records = load_object_list(UTZ_LIST_PATH)
    for record in utz_records:
        if isinstance(record, dict):
            test_uuid = record.get("test_uuid")
            test_type = record.get("test_type")
            if test_uuid and test_type:
                classifications[test_uuid] = test_type
    return classifications

def update_test_classification(test_uuid: str, test_type: str) -> None:
    """
    Update a specific test's classification in utz_tests.json.
    This is called in Step 3 after classifying each test, making the script
    interrupt-safe and allowing resumable processing.
    
    Args:
        test_uuid: The test UUID to update
        test_type: The classification type ("interaction test", "think-out-loud test", or "survey")
    """
    utz_records = load_object_list(UTZ_LIST_PATH)
    
    # Find and update the test record
    for record in utz_records:
        if isinstance(record, dict) and record.get("test_uuid") == test_uuid:
            record["test_type"] = test_type
            break
    
    # Save back to file
    save_object_list(UTZ_LIST_PATH, utz_records)

def load_json_dict(path: str) -> Dict[str, Any]:
    if not os.path.exists(path):
        return {}
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, dict):
        raise ValueError(f"{path} must contain a JSON object")
    return data

def save_json_dict(path: str, obj: Dict[str, Any]) -> None:
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2)

def safe_json(resp: requests.Response) -> Any:
    try:
        return resp.json()
    except Exception:
        return resp.text

def verify_success_checks(
    utz_total: int,
    classic_total: int,
    errors: Dict[str, Any],
) -> None:
    """
    Step 4 verification checks for output artifacts and data quality.

    This validates that expected files exist, are parseable JSON, and contain
    expected structures for downstream use.
    """
    print("\nVerification checks:")

    # 1) Output files exist
    files_exist = all(
        os.path.exists(path)
        for path in [UTZ_LIST_PATH, CLASSIC_LIST_PATH, ERROR_LIST_PATH]
    )
    print(f"  [{'PASS' if files_exist else 'FAIL'}] Output files exist")

    # 2) Output files are valid JSON and expected container types
    json_ok = True
    try:
        utz_records = load_object_list(UTZ_LIST_PATH)
        classic_records = load_object_list(CLASSIC_LIST_PATH)
        loaded_errors = load_json_dict(ERROR_LIST_PATH)
        json_ok = isinstance(utz_records, list) and isinstance(classic_records, list) and isinstance(loaded_errors, dict)
    except Exception as e:
        json_ok = False
        print(f"  [FAIL] JSON parse/type validation error: {e}")
        utz_records = []

    if json_ok:
        print("  [PASS] Output files are valid JSON")

    # 3) UTZ schema and test_type values
    allowed_test_types = {
        "interaction test",
        "think-out-loud test",
        "survey",
    }
    required_fields = {"test_uuid", "workspace_uuid", "workspace_name"}

    invalid_schema_count = 0
    invalid_test_type_count = 0
    classified_count = 0

    for record in utz_records:
        if not isinstance(record, dict):
            invalid_schema_count += 1
            continue

        if not required_fields.issubset(record.keys()):
            invalid_schema_count += 1

        test_type = record.get("test_type")
        if test_type is not None:
            if test_type in allowed_test_types:
                classified_count += 1
            else:
                invalid_test_type_count += 1

    schema_ok = invalid_schema_count == 0 and invalid_test_type_count == 0
    if schema_ok:
        print("  [PASS] UTZ records include expected fields and valid test_type values")
    else:
        print(
            "  [FAIL] UTZ schema/type issues: "
            f"invalid_schema={invalid_schema_count}, invalid_test_type={invalid_test_type_count}"
        )

    # 4) Classification coverage note
    unclassified_utz = max(utz_total - classified_count, 0)
    print(
        "  [INFO] Classification coverage: "
        f"classified={classified_count}, unclassified={unclassified_utz}, utz_total={utz_total}"
    )
    if unclassified_utz > 0:
        print("  [INFO] Some UTZ tests may be unclassified (for example, tests with no sessions yet).")

    # 5) errors.json interpretation
    error_count = len(errors)
    if error_count == 0:
        print("  [PASS] No processing errors recorded")
    else:
        print(f"  [WARN] errors.json contains {error_count} record(s) (partial success; review recommended)")

# ----------------------------
# Token management
# ----------------------------
# These functions handle OAuth token lifecycle:
# 1. Checking if tokens are expired
# 2. Refreshing tokens before expiry
# 3. Building auth headers with fresh tokens
# This is critical for long-running scripts (multiple hours)

def get_token_expiry_time() -> Optional[float]:
    """Get token expiry epoch time from environment."""
    expires_in_str = os.environ.get("EXPIRES_IN")
    if expires_in_str:
        try:
            return float(expires_in_str)
        except (ValueError, TypeError):
            return None
    return None

def should_refresh_token() -> bool:
    """Check if token should be refreshed (expires within REFRESH_BEFORE_EXPIRY_SECONDS)."""
    expiry = get_token_expiry_time()
    if not expiry:
        return True
    time_until_expiry = expiry - time.time()
    return time_until_expiry < REFRESH_BEFORE_EXPIRY_SECONDS

def refresh_access_token() -> bool:
    """Refresh the access token by running generate-token.py."""
    try:
        subprocess.run(
            ["python", os.path.join(os.path.dirname(__file__), "generate-token.py"), "--force"],
            check=True,
            capture_output=True,
        )
        # Reload .env to get new token
        load_dotenv(override=True)
        print("[INFO] Access token refreshed successfully.")
        return True
    except subprocess.CalledProcessError as e:
        print(f"[WARN] Failed to refresh token: {e}")
        return False

def ensure_token_fresh() -> str:
    """Check if token needs refresh; if so, refresh. Return current token."""
    if should_refresh_token():
        print("[INFO] Token expiring soon, refreshing...")
        refresh_access_token()
    return os.environ.get("ACCESS_TOKEN", "ACCESS_TOKEN")

def get_headers() -> Dict[str, str]:
    """
    Build HTTP headers for API requests, ensuring token is fresh.
    This is called before every API request to refresh tokens if needed.
    
    Returns:
        Dictionary with Authorization header (Bearer token) and Content-Type
    """
    token = ensure_token_fresh()
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }

# ----------------------------
# API calls
# ----------------------------
# These functions make requests to UserTesting API endpoints.
# Depending on the endpoint, they provide:
# - Rate limiting enforcement (6-second intervals) for the classification probes
# - 429 (Too Many Requests) handling: wait for Retry-After (default 60 s), then retry once
# - Timeout protection (10-30 seconds)
# - Probe errors logged and treated as "not available" instead of raising

def get_workspaces() -> List[Dict[str, Any]]:
    """
    Step 1: Fetch all workspaces the user has access to.
    
    Returns:
        List of workspace objects with UUID, name, and other metadata
    """
    resp = requests.get(WORKSPACES_URL, headers=get_headers(), timeout=30)
    resp.raise_for_status()
    data = resp.json()
    if not isinstance(data, list):
        raise ValueError(f"Expected list from workspaces endpoint, got: {type(data)}")
    return data

def get_all_workspace_studies(workspace_uuid: str) -> List[Dict[str, Any]]:
    """
    Step 1 (continued): Fetch all studies (tests) within a workspace using pagination.
    
    The API returns up to 50 studies per request and uses cursor-based pagination.
    This function automatically pages through all results to build a complete list.
    
    Args:
        workspace_uuid: The workspace to query
        
    Returns:
        List of all study/test objects in the workspace
    """
    all_studies: List[Dict[str, Any]] = []
    after: Optional[str] = None
    page_num = 0
    start_time = time.time()
    seen_after_values: Set[str] = set()

    while True:
        url = WORKSPACE_DETAIL_URL.format(workspace_uuid=workspace_uuid)
        params = {"after": after} if after else None

        resp = requests.get(url, headers=get_headers(), params=params, timeout=30)
        resp.raise_for_status()
        data = resp.json()

        studies = data.get("studies", [])
        if not isinstance(studies, list):
            raise ValueError(f"Expected 'studies' to be a list, got: {type(studies)}")

        page_num += 1
        all_studies.extend(studies)
        elapsed_s = time.time() - start_time
        print(
            f"Workspace {workspace_uuid}: page {page_num}, "
            f"+{len(studies)} studies (total {len(all_studies)}) "
            f"elapsed {elapsed_s:.1f}s"
        )

        if not studies:
            break

        last_cursor = studies[-1].get("cursor")
        if not last_cursor or not isinstance(last_cursor, str):
            break

        if last_cursor in seen_after_values:
            break

        seen_after_values.add(last_cursor)
        after = last_cursor

    return all_studies

def get_session_results(test_uuid: str, offset: int = 0, limit: int = 10) -> requests.Response:
    """
    Step 2: Check if a test is UTZ or ClassicUT by querying /sessionResults.
    
    HTTP Status Response:
    - 200: UTZ test (has sessions, compatible with Results API)
    - 404: ClassicUT test (legacy platform, incompatible)
    
    Args:
        test_uuid: The test ID to check
        offset: Pagination offset (default 0)
        limit: Max results per page (default 10)
        
    Returns:
        requests.Response object (status 200, 404, 429, etc.)
    """
    params = {"testId": test_uuid, "offset": offset, "limit": limit}
    return requests.get(SESSION_RESULTS_URL, headers=get_headers(), params=params, timeout=30)

def check_video_url(session_id: str) -> bool:
    """
    Step 3a: Check if a session has a video recording available.
    
    Interaction tests produce video and QX score data. If this endpoint
    returns 200, the test is classified as an "interaction test".
    
    This is called during Step 3 classification for each UTZ test with sessions.
    
    Args:
        session_id: The session ID from the test
        
    Returns:
        True if video is available (200 response), False otherwise
    """
    global last_call_time
    
    # Enforce rate limiting
    now = time.time()
    elapsed = now - last_call_time
    if elapsed < REQUEST_INTERVAL_SECONDS:
        time.sleep(REQUEST_INTERVAL_SECONDS - elapsed)
    
    endpoint = f"{BASE_URL}/sessionResults/{session_id}/videoDownloadUrl"
    try:
        response = requests.get(endpoint, headers=get_headers(), timeout=10)
        last_call_time = time.time()
        
        if response.status_code == 200:
            return True
        elif response.status_code == 429:
            # Handle rate limiting
            retry_after = response.headers.get("Retry-After")
            wait_s = float(retry_after) if (retry_after and retry_after.isdigit()) else 60.0
            print(f"[WARN] 429 rate limit on video URL. Backing off for {wait_s}s")
            time.sleep(wait_s)
            
            # Retry once
            response2 = requests.get(endpoint, headers=get_headers(), timeout=10)
            last_call_time = time.time()
            return response2.status_code == 200
        else:
            return False
    except Exception as e:
        print(f"[WARN] Error checking video URL for {session_id}: {e}")
        return False

def check_qx_scores(test_id: str) -> bool:
    """
    Step 3a (alternative): Check if a test has QX (Quality of Experience) scores.
    
    Interaction tests produce QX scores. If this endpoint returns 200 and
    check_video_url() returned False, the test is still classified as
    an "interaction test" (because it has QX scores).
    
    Args:
        test_id: The test ID
        
    Returns:
        True if QX scores are available (200 response), False otherwise
    """
    global last_call_time
    
    # Enforce rate limiting
    now = time.time()
    elapsed = now - last_call_time
    if elapsed < REQUEST_INTERVAL_SECONDS:
        time.sleep(REQUEST_INTERVAL_SECONDS - elapsed)
    
    endpoint = f"{BASE_URL}/testResults/{test_id}/qxScores"
    try:
        response = requests.get(endpoint, headers=get_headers(), timeout=10)
        last_call_time = time.time()
        
        if response.status_code == 200:
            return True
        elif response.status_code == 429:
            # Handle rate limiting
            retry_after = response.headers.get("Retry-After")
            wait_s = float(retry_after) if (retry_after and retry_after.isdigit()) else 60.0
            print(f"[WARN] 429 rate limit on QX scores. Backing off for {wait_s}s")
            time.sleep(wait_s)
            
            # Retry once
            response2 = requests.get(endpoint, headers=get_headers(), timeout=10)
            last_call_time = time.time()
            return response2.status_code == 200
        else:
            return False
    except Exception as e:
        print(f"[WARN] Error checking QX scores for {test_id}: {e}")
        return False

def check_transcript(session_id: str) -> bool:
    """
    Step 3b: Check if a session has a transcript available.
    
    Think-out-loud tests produce transcripts. If this endpoint returns 200
    (and video/QX checks failed), the test is classified as a "think-out-loud test".
    
    Args:
        session_id: The session ID from the test
        
    Returns:
        True if transcript is available (200 response), False otherwise
    """
    global last_call_time
    
    # Enforce rate limiting
    now = time.time()
    elapsed = now - last_call_time
    if elapsed < REQUEST_INTERVAL_SECONDS:
        time.sleep(REQUEST_INTERVAL_SECONDS - elapsed)
    
    endpoint = f"{BASE_URL}/sessionResults/{session_id}/transcript"
    try:
        response = requests.get(endpoint, headers=get_headers(), timeout=10)
        last_call_time = time.time()
        
        if response.status_code == 200:
            return True
        elif response.status_code == 429:
            # Handle rate limiting
            retry_after = response.headers.get("Retry-After")
            wait_s = float(retry_after) if (retry_after and retry_after.isdigit()) else 60.0
            print(f"[WARN] 429 rate limit on transcript. Backing off for {wait_s}s")
            time.sleep(wait_s)
            
            # Retry once
            response2 = requests.get(endpoint, headers=get_headers(), timeout=10)
            last_call_time = time.time()
            return response2.status_code == 200
        else:
            return False
    except Exception as e:
        print(f"[WARN] Error checking transcript for {session_id}: {e}")
        return False

def classify_test(test_id: str, session_id: str) -> str:
    """
    Step 3: Classify a single UTZ test by checking endpoint availability.
    
    This implements the classification methodology:
    - Calls check_video_url, check_qx_scores, and check_transcript in order, stopping at the first that succeeds
    - Returns the classification based on which endpoints return 200
    
    Classification logic:
    1. If video URL or QX scores endpoint returns 200 -> "interaction test"
    2. Else if transcript endpoint returns 200 -> "think-out-loud test"
    3. Else -> "survey" (default, no special data available)
    
    Args:
        test_id: The test ID (used for QX scores endpoint)
        session_id: The session ID (used for video and transcript endpoints)
        
    Returns:
        One of: "interaction test", "think-out-loud test", or "survey"
    """
    # Check interaction test (video or QX scores)
    if check_video_url(session_id):
        print("  → interaction test (video available)")
        return "interaction test"
    
    if check_qx_scores(test_id):
        print("  → interaction test (QX scores available)")
        return "interaction test"
    
    # Check think-out-loud test (transcript)
    if check_transcript(session_id):
        print("  → think-out-loud test (transcript available)")
        return "think-out-loud test"
    
    # Default to survey
    print("  → survey (no other data available)")
    return "survey"

# ----------------------------
# Workspace selection
# ----------------------------
def _format_workspace_label(workspace: Dict[str, Any]) -> str:
    name = workspace.get("name") or workspace.get("label") or workspace.get("title")
    if name and isinstance(name, str):
        return f"{workspace.get('uuid')} ({name})"
    return str(workspace.get("uuid"))

def _select_workspace_uuids(
    workspaces: List[Dict[str, Any]],
    requested_uuid: Optional[str],
) -> List[str]:
    workspace_uuids = [w.get("uuid") for w in workspaces if isinstance(w, dict) and w.get("uuid")]

    if requested_uuid:
        if requested_uuid in workspace_uuids:
            return [requested_uuid]
        raise ValueError(
            f"Workspace UUID not found: {requested_uuid}. "
            "Run without --workspace-uuid to see available workspaces."
        )

    if not workspace_uuids:
        return []

    print("Available workspaces:")
    for idx, w in enumerate(workspaces, start=1):
        if not isinstance(w, dict) or not w.get("uuid"):
            continue
        print(f"  {idx}) {_format_workspace_label(w)}")

    while True:
        choice = input("Select workspace number (or press Enter for ALL): ").strip()
        if choice == "" or choice == "0":
            return workspace_uuids
        if choice.isdigit():
            idx = int(choice)
            if 1 <= idx <= len(workspaces):
                chosen = workspaces[idx - 1]
                if isinstance(chosen, dict) and chosen.get("uuid"):
                    return [chosen.get("uuid")]
        print("Invalid selection. Try again.")

# ----------------------------
# Main workflow
# ----------------------------
def run(workspace_uuid: Optional[str] = None) -> None:
    """
    Main workflow orchestrator:
    - Step 1: Discover all tests in workspaces
    - Step 2: Identify UTZ vs ClassicUT tests
    - Step 3: Classify UTZ tests by type
    - Step 4: Persist and report results
    
    This function is resumable: it loads existing results, processes new tests,
    and re-checks previously discovered UTZ tests that are still unclassified.
    This makes it safe to interrupt and re-run.
    
    Args:
        workspace_uuid: Optional workspace UUID to process single workspace.
                       If None, user is prompted to select.
    """
    ensure_dirs()

    # Load existing results to support resumable processing
    utz_records = load_object_list(UTZ_LIST_PATH)
    classic_records = load_object_list(CLASSIC_LIST_PATH)
    classifications = load_classifications()
    errors: Dict[str, Any] = load_json_dict(ERROR_LIST_PATH)

    # Build a set of already-processed tests
    # This prevents re-identifying tests we've already seen
    utz_known: Set[str] = set(r.get("test_uuid") for r in utz_records if isinstance(r, dict) and r.get("test_uuid"))
    classic_known: Set[str] = set(r.get("test_uuid") for r in classic_records if isinstance(r, dict) and r.get("test_uuid"))
    already_processed = utz_known.union(classic_known)
    
    # Build workspace UUID -> name lookup
    all_workspaces = get_workspaces()
    workspace_name_map = {
        w.get("uuid"): (w.get("name") or w.get("label") or w.get("title") or "Unknown")
        for w in all_workspaces
        if isinstance(w, dict) and w.get("uuid")
    }

    # ===== STEP 1: Discover All Tests =====
    # Fetch list of workspaces and let user choose which to process
    print(f"Found {len(all_workspaces)} workspaces.")
    workspace_uuids = _select_workspace_uuids(all_workspaces, workspace_uuid)
    print(f"Processing {len(workspace_uuids)} workspace(s).\n")

    # ===== STEP 1 (continued): Fetch study UUIDs from workspaces =====
    # Use cursor-based pagination to get all tests (studies) in each workspace
    study_uuids: Set[str] = set()
    test_uuid_to_workspace_uuid: Dict[str, str] = {}
    for wuuid in workspace_uuids:
        try:
            studies = get_all_workspace_studies(wuuid)
            print(f"Workspace {wuuid}: {len(studies)} studies found")
            for s in studies:
                if not isinstance(s, dict):
                    continue
                suuid = s.get("uuid")
                if suuid and isinstance(suuid, str):
                    study_uuids.add(suuid)
                    test_uuid_to_workspace_uuid[suuid] = wuuid
        except requests.HTTPError as e:
            errors[f"workspace:{wuuid}"] = {
                "error": str(e),
                "type": "workspace_detail_http_error",
            }
            save_json_dict(ERROR_LIST_PATH, errors)
            print(f"[WARN] Failed workspace detail for {wuuid}: {e}")

    print(f"Found {len(study_uuids)} unique study UUIDs across workspaces.\n")

    # ===== STEP 2: Identify UTZ vs ClassicUT Tests =====
    # For each test in the processing set, query /sessionResults to determine
    # current state and classify when possible.
    global last_call_time
    last_call_time = 0.0
    
    # Build processing set:
    # 1) new tests not yet identified as UTZ/ClassicUT
    # 2) previously identified UTZ tests that still have no classification
    new_to_process = sorted([u for u in study_uuids if u not in already_processed])
    utz_unclassified_to_recheck = sorted(
        [u for u in study_uuids if u in utz_known and u not in classifications]
    )
    to_process = sorted(set(new_to_process).union(utz_unclassified_to_recheck))

    print(f"{len(new_to_process)} study UUIDs are new and will be processed.")
    print(
        f"{len(utz_unclassified_to_recheck)} previously discovered UTZ study UUIDs "
        "without classification will be rechecked.\n"
    )

    stats = {
        "interaction_test": 0,
        "think_out_loud_test": 0,
        "survey": 0,
        "classicut": 0,
    }

    for idx, test_uuid in enumerate(to_process, start=1):
        # Enforce rate limiting: 6 second minimum interval between API calls
        now = time.time()
        elapsed = now - last_call_time
        if elapsed < SESSION_RESULTS_MIN_INTERVAL_SECONDS:
            time.sleep(SESSION_RESULTS_MIN_INTERVAL_SECONDS - elapsed)

        test_workspace_uuid = test_uuid_to_workspace_uuid.get(test_uuid, "Unknown")
        ws_name = workspace_name_map.get(test_workspace_uuid, "Unknown")

        # --- STEP 2: IDENTIFICATION ---
        # Query /sessionResults to check if test is UTZ (200) or ClassicUT (404)
        resp = get_session_results(test_uuid, offset=0, limit=10)
        last_call_time = time.time()

        if resp.status_code == 200:
            # ✓ THIS IS A UTZ TEST (API-compatible)
            session_data = resp.json()
            sessions = session_data.get("sessions", [])

            # Record this test as UTZ in persistent storage
            append_to_object_list_file(UTZ_LIST_PATH, test_uuid, test_workspace_uuid, ws_name)
            utz_known.add(test_uuid)

            # --- STEP 3: CLASSIFICATION (only for UTZ tests with sessions) ---
            # Classify this test by checking which endpoints return 200
            if sessions:
                session_id = sessions[0].get("sessionId")
                if session_id and test_uuid not in classifications:
                    print(f"[{idx}/{len(to_process)}] UTZ 200: {test_uuid}")
                    classification = classify_test(test_uuid, session_id)
                    classifications[test_uuid] = classification
                    update_test_classification(test_uuid, classification)

                    # Update statistics
                    if classification == "interaction test":
                        stats["interaction_test"] += 1
                    elif classification == "think-out-loud test":
                        stats["think_out_loud_test"] += 1
                    elif classification == "survey":
                        stats["survey"] += 1
                else:
                    if test_uuid in classifications:
                        print(f"[{idx}/{len(to_process)}] UTZ 200: {test_uuid} (already classified)")
                    else:
                        print(f"[{idx}/{len(to_process)}] UTZ 200: {test_uuid} (first session has no sessionId; skipping classification)")
            else:
                print(f"[{idx}/{len(to_process)}] UTZ 200: {test_uuid} (no sessions)")

        elif resp.status_code == 404:
            # ✗ THIS IS A CLASSICUT TEST (legacy, not Results API compatible)
            # Record this test as ClassicUT in persistent storage
            append_to_object_list_file(CLASSIC_LIST_PATH, test_uuid, test_workspace_uuid, ws_name)
            classic_known.add(test_uuid)
            stats["classicut"] += 1
            print(f"[{idx}/{len(to_process)}] ClassicUT 404: {test_uuid}")

        elif resp.status_code == 401:
            # Authentication error - token is missing or invalid
            # This is a fatal error, stop processing
            errors[test_uuid] = {
                "status": resp.status_code,
                "body": safe_json(resp),
                "type": "auth_error",
            }
            save_json_dict(ERROR_LIST_PATH, errors)
            raise RuntimeError("Authentication error (401). Check ACCESS_TOKEN. Run generate-token.py to refresh.")

        elif resp.status_code == 400:
            # Bad request: offset or limit parameters are invalid
            # Offset must be 0-10000, limit must be 1-500
            errors[test_uuid] = {
                "status": resp.status_code,
                "body": safe_json(resp),
                "type": "invalid_pagination_error",
            }
            save_json_dict(ERROR_LIST_PATH, errors)
            raise RuntimeError("Bad request (400). Check offset (0-10000) and limit (1-500) parameters.")

        elif resp.status_code == 429:
            # Rate limit hit: exceeding 10 requests per minute
            # Read Retry-After header to know how long to wait, then retry once
            retry_after = resp.headers.get("Retry-After")
            wait_s = float(retry_after) if (retry_after and retry_after.isdigit()) else 60.0
            print(f"[WARN] 429 rate limit (10 requests/minute max). Backing off for {wait_s} seconds, then retrying once.")
            time.sleep(wait_s)

            resp2 = get_session_results(test_uuid, offset=0, limit=10)
            last_call_time = time.time()

            if resp2.status_code == 200:
                session_data = resp2.json()
                sessions = session_data.get("sessions", [])

                append_to_object_list_file(UTZ_LIST_PATH, test_uuid, test_workspace_uuid, ws_name)
                utz_known.add(test_uuid)

                if sessions:
                    session_id = sessions[0].get("sessionId")
                    if session_id and test_uuid not in classifications:
                        print(f"[{idx}/{len(to_process)}] UTZ 200 (after retry): {test_uuid}")
                        classification = classify_test(test_uuid, session_id)
                        classifications[test_uuid] = classification
                        update_test_classification(test_uuid, classification)

                        if classification == "interaction test":
                            stats["interaction_test"] += 1
                        elif classification == "think-out-loud test":
                            stats["think_out_loud_test"] += 1
                        elif classification == "survey":
                            stats["survey"] += 1
                    else:
                        print(f"[{idx}/{len(to_process)}] UTZ 200 (after retry): {test_uuid} (already classified/no session)")
                else:
                    print(f"[{idx}/{len(to_process)}] UTZ 200 (after retry): {test_uuid} (no sessions)")

            elif resp2.status_code == 404:
                append_to_object_list_file(CLASSIC_LIST_PATH, test_uuid, test_workspace_uuid, ws_name)
                classic_known.add(test_uuid)
                stats["classicut"] += 1
                print(f"[{idx}/{len(to_process)}] ClassicUT 404 (after retry): {test_uuid}")

            else:
                errors[test_uuid] = {
                    "status": resp2.status_code,
                    "body": safe_json(resp2),
                    "type": "rate_limited_then_failed",
                }
                save_json_dict(ERROR_LIST_PATH, errors)
                print(f"[WARN] Still failing after 429 retry: {test_uuid} ({resp2.status_code})")

        else:
            # Unexpected HTTP status (5xx server error, etc.)
            # Log the error but continue processing other tests
            errors[test_uuid] = {
                "status": resp.status_code,
                "body": safe_json(resp),
                "type": "unclassified_error",
            }
            save_json_dict(ERROR_LIST_PATH, errors)
            print(f"[WARN] Unexpected status {resp.status_code} for {test_uuid}; recorded in errors.json")

    # ===== STEPS 4 & 5: Persist Results =====
    # Save final state and print summary
    # Classifications are saved incrementally during Step 3 (embedded in utz_tests.json)
    save_json_dict(ERROR_LIST_PATH, errors)

    # Print summary report
    print("\n" + "="*60)
    print("Done. Processing complete.")
    print("="*60)
    utz_total = len(load_object_list(UTZ_LIST_PATH))
    classic_total = len(load_object_list(CLASSIC_LIST_PATH))
    print(f"UTZ total: {utz_total}")
    print(f"ClassicUT total: {classic_total}")
    print("\nClassifications:")
    print(f"  Interaction tests: {stats['interaction_test']}")
    print(f"  Think-out-loud tests: {stats['think_out_loud_test']}")
    print(f"  Surveys: {stats['survey']}")
    print(f"  ClassicUT tests: {stats['classicut']}")
    print("\n✓ Results saved to:")
    print(f"  - {UTZ_LIST_PATH} (UTZ tests with test_type field)")
    print(f"  - {CLASSIC_LIST_PATH} (ClassicUT tests)")
    if any(errors.values()):
        print(f"  - {ERROR_LIST_PATH} (Errors encountered)")

    verify_success_checks(utz_total=utz_total, classic_total=classic_total, errors=errors)

if __name__ == "__main__":
    # Entry point: parse arguments and run the main workflow
    parser = argparse.ArgumentParser(description="Identify and classify UTZ vs ClassicUT tests")
    parser.add_argument(
        "--workspace-uuid",
        help="Process only this specific workspace UUID (skips interactive selection).",
    )
    args = parser.parse_args()
    
    # Run the identification and classification workflow
    run(workspace_uuid=args.workspace_uuid)

Storage file

You don't need to create this file manually; the script generates it. The file contains a JSON array of objects, one per test, each recording the test's classification and workspace context. Every entry has the following shape:

state/utz_tests.json
{
    "test_uuid": "TEST_ID",
    "workspace_uuid": "WORKSPACE_ID",
    "workspace_name": "WORKSPACE_NAME",
    "test_type": "TEST_TYPE"
}
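
Once the file exists, downstream code can read it directly. The following is a minimal sketch, not part of the reference script, that tallies records by test_type. The helper name count_by_type is hypothetical, and counting records without a test_type as "unclassified" is an assumption made for illustration.

```python
import json
from collections import Counter
from typing import Any, Dict, List


def count_by_type(records: List[Dict[str, Any]]) -> Counter:
    """Tally records by test_type; missing or empty values count as 'unclassified'."""
    return Counter(
        (record.get("test_type") or "unclassified")
        for record in records
        if isinstance(record, dict)
    )


# Inline sample shaped like state/utz_tests.json (values are placeholders)
sample = json.loads("""
[
  {"test_uuid": "a", "workspace_uuid": "w", "workspace_name": "W", "test_type": "survey"},
  {"test_uuid": "b", "workspace_uuid": "w", "workspace_name": "W", "test_type": "survey"},
  {"test_uuid": "c", "workspace_uuid": "w", "workspace_name": "W"}
]
""")
print(count_by_type(sample))
```

In a real run you would replace the inline sample with the parsed contents of state/utz_tests.json.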

Steps

Step 1 — Discover all tests in workspaces

Endpoints

GET /usertesting/api/v1/workspaces
GET /usertesting/api/v1/workspaces/WORKSPACE_UUID

Why this matters

  • Large organizations have multiple workspaces with hundreds or thousands of tests
  • Tests must be retrieved with cursor-based pagination (50 tests per page)
  • Each test UUID is needed for downstream identification and classification

Example (cURL)

List all available workspaces:

curl --location 'https://api.use2.usertesting.com/usertesting/api/v1/workspaces' \
  --header 'Authorization: Bearer ACCESS_TOKEN' \
  --header 'Content-Type: application/json'

Get tests in a workspace (with pagination):

curl --location 'https://api.use2.usertesting.com/usertesting/api/v1/workspaces/WORKSPACE_UUID?after=CURSOR' \
  --header 'Authorization: Bearer ACCESS_TOKEN' \
  --header 'Content-Type: application/json'
👍

Best Practice: Use cursor-based pagination with the after query parameter. Each item in studies includes a cursor value. Use the last study's cursor as the after value for the next request.

Continue fetching pages until the response returns an empty studies array.


Sample 200 response: tests in workspace

Each workspace contains a studies array with the tests in the workspace. Each test has a uuid (test ID), title, and orderedAt timestamp, among other metadata.

{
    "studies": [
        {
            "cursor": "Mw",
            "uuid": "32348d72-7473-4ce1-b17d-4a4c2202bd61",
            "title": "Test 01",
            "orderedAt": "2026-02-13T12:07:33Z",
            "orderedBy": {
                "name": "Blair Fraser",
                "email": "user@example.com"
            }
        },
        {
            "cursor": "NA",
            "uuid": "d244347f-359d-46c9-9d23-045bb0f97600",
            "title": "Test 02",
            "orderedAt": "2026-02-10T17:13:04Z",
            "orderedBy": {
                "name": "Blair Fraser",
                "email": "user@example.com"
            }
        },
        {
            "cursor": "NQ",
            "uuid": "7fe65409-cb96-44aa-9004-3d5f27b13465",
            "title": "Test 03",
            "orderedAt": "2026-02-10T15:02:52Z",
            "orderedBy": {
                "name": "Rosa Smith",
                "email": "user@example.com"
            }
        }
    ]
}

Key fields to store

  • uuid (test UUID / test ID)
  • title (test title)
  • workspace_uuid (parent workspace)
  • workspace_name (parent workspace name)
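
The fields above can be collected into one flat record per test. This is a minimal sketch, assuming study objects look like the sample response. The function name to_stored_record is hypothetical, and the reference script may shape its records differently (its storage file, for example, does not include title).

```python
from typing import Any, Dict, Optional


def to_stored_record(
    study: Dict[str, Any], workspace_uuid: str, workspace_name: str
) -> Optional[Dict[str, str]]:
    """Build a flat record from one item of the studies array.

    Returns None when the item has no usable uuid.
    """
    uuid = study.get("uuid")
    if not isinstance(uuid, str) or not uuid:
        return None
    return {
        "test_uuid": uuid,
        "title": str(study.get("title") or ""),
        "workspace_uuid": workspace_uuid,
        "workspace_name": workspace_name,
    }


record = to_stored_record(
    {"cursor": "Mw", "uuid": "32348d72-7473-4ce1-b17d-4a4c2202bd61", "title": "Test 01"},
    workspace_uuid="WORKSPACE_UUID",
    workspace_name="WORKSPACE_NAME",
)
```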

Pagination logic

  • Start with no cursor (initial request)
  • Read the cursor value from the last item in the studies array
  • Fetch next page using the last study's cursor in the ?after= query parameter
  • Stop when studies is empty or the last study has no cursor
  • Save test UUIDs immediately to support resumable processing

Code example

def get_all_workspace_studies(workspace_uuid: str) -> List[Dict[str, Any]]:
    """
    Fetch all studies (tests) within a workspace using cursor-based pagination.
    """
    all_studies: List[Dict[str, Any]] = []
    after: Optional[str] = None

    while True:
        # Build query parameters with cursor if available
        params = {}
        if after:
            params["after"] = after

        # Make request
        resp = requests.get(
            WORKSPACE_DETAIL_URL.format(workspace_uuid=workspace_uuid),
            headers=get_headers(),
            params=params,
            timeout=30
        )
        resp.raise_for_status()
        data = resp.json()

        # Collect studies from this page
        studies = data.get("studies", [])
        all_studies.extend(studies)

        # Check for next page using the last study's cursor
        if not studies:
            break

        last_cursor = studies[-1].get("cursor")
        if not last_cursor:
            break

        after = last_cursor

    return all_studies

Step 2 — Identify UTZ vs ClassicUT tests

Endpoint

GET /api/v2/sessionResults

Why this matters

  • The Results API only works with newer test types (UTZ)
  • Legacy tests (ClassicUT) return 404 and are incompatible
  • You must know which tests are compatible before attempting classification or data extraction

Example (cURL)

Get the first page of sessions for a test (up to 10):

curl --location 'https://api.use2.usertesting.com/api/v2/sessionResults?testId=TEST_UUID&limit=10' \
  --header 'Authorization: Bearer ACCESS_TOKEN' \
  --header 'Content-Type: application/json'
📘

Key Indicator: The HTTP response status code reveals test compatibility:

  • 200 OK - UTZ test (compatible with Results API)
  • 404 Not Found - ClassicUT test (legacy platform, not compatible)
  • 429 Too Many Requests - Rate limit exceeded; wait for the Retry-After period if the header is present, and keep at least 6 seconds between requests

Do not treat other error codes as "not found." Handle them explicitly.
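
The status handling described in this callout can be expressed as a small lookup. This is a sketch only: the Outcome enum and the interpret_status name are illustrative and do not appear in the reference script.

```python
from enum import Enum


class Outcome(Enum):
    UTZ = "utz"              # 200: compatible with the Results API
    CLASSICUT = "classicut"  # 404: legacy test, not compatible
    RETRY = "retry"          # 429: back off, then try again
    ERROR = "error"          # anything else: record it and review later


def interpret_status(status_code: int) -> Outcome:
    """Map a /sessionResults status code to the action described above."""
    if status_code == 200:
        return Outcome.UTZ
    if status_code == 404:
        return Outcome.CLASSICUT
    if status_code == 429:
        return Outcome.RETRY
    return Outcome.ERROR
```

Keeping ERROR as a separate outcome is what prevents a 401 or 5xx response from being mistaken for a ClassicUT test.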


Response structure for UTZ tests (200 OK)

{
  "sessions": [
    {
      "sessionId": "5bface5c-2878-428a-938f-26e977a1d976",
      "testId": "32348d72-7473-4ce1-b17d-4a4c2202bd61",
      "status": "completed",
      "startTime": "2025-12-01T10:00:00Z",
      "finishTime": "2025-12-01T10:15:00Z"
    },
    {
      "sessionId": "5285a0ca-3d23-469f-82a7-2e83d9cf16a5",
      "testId": "32348d72-7473-4ce1-b17d-4a4c2202bd61",
      "status": "completed",
      "startTime": "2025-12-01T11:00:00Z",
      "finishTime": "2025-12-01T11:15:00Z"
    }
  ],
  "meta": {
    "pagination": {
      "limit": 10,
      "offset": 0,
      "totalCount": 2
    }
  }
}

Response for ClassicUT tests (404 Not Found)

{
    "code": "NOT_FOUND",
    "message": "Resource not found"
}

Data to extract and carry forward

  • If 404, classify as ClassicUT (not compatible)
  • If 200 response received, mark test as "UTZ" (compatible)
  • From 200 response: Extract the first sessionId (you'll need this for classification)
  • If no sessions are returned, log this fact and skip classification ("no sessions")
  • Save immediately: Persist this information to avoid re-identifying the same test

Rate limiting strategy

  • The /sessionResults endpoint has a limit of 10 requests per minute
  • Enforce a minimum 6-second interval between requests
  • For 100 tests, expect ~10 minutes for identification alone
  • Use a global timestamp tracker to prevent burst requests
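
The runtime figure above follows directly from the request interval. As a rough sketch (the function name is hypothetical, and the estimate ignores network latency and any 429 back-off):

```python
def estimate_identification_seconds(num_tests: int, interval_seconds: float = 6.0) -> float:
    """Approximate time spent pacing one /sessionResults call per test."""
    return max(num_tests, 0) * interval_seconds


# 100 tests x 6 s = 600 s, or about 10 minutes
minutes = estimate_identification_seconds(100) / 60
print(f"About {minutes:.0f} minutes for identification")
```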

Code example

last_call_time = 0.0
REQUEST_INTERVAL_SECONDS = 6.0

def get_session_results(test_uuid: str) -> requests.Response:
    """
    Check if a test is UTZ or ClassicUT by querying /sessionResults.
    
    Returns:
        Response object with status 200 (UTZ), 404 (ClassicUT), or 429 (rate limit)
    """
    global last_call_time
    
    # Enforce rate limiting
    now = time.time()
    elapsed = now - last_call_time
    if elapsed < REQUEST_INTERVAL_SECONDS:
        time.sleep(REQUEST_INTERVAL_SECONDS - elapsed)
    
    last_call_time = time.time()
    
    params = {"testId": test_uuid, "limit": 10}
    return requests.get(
        "https://api.use2.usertesting.com/api/v2/sessionResults",
        headers=get_headers(),
        params=params,
        timeout=30
    )

# Usage
response = get_session_results("test-uuid")

if response.status_code == 200:
    data = response.json()
    sessions = data.get("sessions", [])
    
    if sessions:
        session_id = sessions[0]["sessionId"]
        # Save as UTZ test with sessionId
        # Proceed to classification in Step 3
    else:
        print("UTZ test has no sessions yet: test-uuid (skipping classification)")
        # Save as UTZ test but skip classification for now
    
elif response.status_code == 404:
    # Save as ClassicUT test (not compatible)
    pass
    
elif response.status_code == 429:
    # Rate limit exceeded; implement backoff
    time.sleep(10)
📘

Note: The guard data.get("sessions", []) is critical. A UTZ test can return 200 with an empty sessions array when it was just created and no participant has completed it yet. The reference script skips classification for these tests and rechecks them in later runs once sessions become available. This prevents classification errors and lets later runs pick up newly completed sessions.

📘

Note: In the full reference script, Step 2 rate limiting is enforced in the main processing loop, and additional retry/backoff safeguards are included.

Step 3 — Classify tests by type

Methodology

For each UTZ test identified in Step 2, check which Results API endpoints return 200 (success). Based on endpoint availability, classify the test as one of three types:

  1. Interaction test — Returns video and/or QXscore data
  2. Think-out-loud test — Returns transcript data
  3. Survey — Returns session results only

The classification is a three-stage process, with each stage checking specific endpoints:

3a. Check for interaction test indicators

Endpoints

GET /api/v2/sessionResults/SESSION_ID/videoDownloadUrl
GET /api/v2/testResults/TEST_ID/qxScores

Decision logic

If either endpoint returns 200, classify as "interaction test".

Interaction tests involve user interactions with a prototype or website and produce:

  • Video recording URLs
  • Quality of Experience scores (QXscores) measuring user sentiment and behavior

Example (cURL)

Get video download URL:

curl --location 'https://api.use2.usertesting.com/api/v2/sessionResults/SESSION_ID/videoDownloadUrl' \
  --header 'Authorization: Bearer ACCESS_TOKEN'

Get QXscores:

curl --location 'https://api.use2.usertesting.com/api/v2/testResults/TEST_ID/qxScores' \
  --header 'Authorization: Bearer ACCESS_TOKEN'
👍

Optimization: Check the video URL endpoint first (it's faster to fail). Only check QXscores if the video endpoint returns a non-200 status. This saves one API call for every test that has video.


Code example

def check_video_url(session_id: str) -> bool:
    """Check if a session has a video recording available."""
    endpoint = f"https://api.use2.usertesting.com/api/v2/sessionResults/{session_id}/videoDownloadUrl"
    try:
        resp = requests.get(endpoint, headers=get_headers(), timeout=10)
        return resp.status_code == 200
    except Exception:
        return False

def check_qx_scores(test_id: str) -> bool:
    """Check if a test has QX (Quality of Experience) scores available."""
    endpoint = f"https://api.use2.usertesting.com/api/v2/testResults/{test_id}/qxScores"
    try:
        resp = requests.get(endpoint, headers=get_headers(), timeout=10)
        return resp.status_code == 200
    except Exception:
        return False

# Classification logic (Step 3a)
if check_video_url(session_id) or check_qx_scores(test_id):
    test_type = "interaction test"
    proceed_to_next_step = False
else:
    proceed_to_next_step = True
📝

Note: See the full reference script for timeout/retry handling.

3b. Check for think-out-loud test indicator

Endpoint

GET /api/v2/sessionResults/SESSION_ID/transcript

Decision logic

If the video and QXscore endpoints did not return 200 in Step 3a, and this endpoint returns 200, classify as "think-out-loud test".

Think-out-loud tests capture participant verbal feedback as they complete tasks. The transcript is provided in WebVTT (Web Video Text Tracks) format.

Example (cURL)

Get transcript in WebVTT format:

curl --location 'https://api.use2.usertesting.com/api/v2/sessionResults/SESSION_ID/transcript' \
  --header 'Authorization: Bearer ACCESS_TOKEN'

Code example

def check_transcript(session_id: str) -> bool:
    """Check if a session has a transcript available."""
    endpoint = f"https://api.use2.usertesting.com/api/v2/sessionResults/{session_id}/transcript"
    try:
        resp = requests.get(endpoint, headers=get_headers(), timeout=10)
        return resp.status_code == 200
    except Exception:
        return False

# Classification logic (Step 3b)
if check_transcript(session_id):
    test_type = "think-out-loud test"
else:
    test_type = "survey"
📘

Note: See the full reference script for timeout/retry handling.


3c. Default classification

If none of the interaction or think-out-loud indicators returned 200, classify as "survey".

Surveys collect structured responses to questions without video, transcript, or QXscore data. They are fully supported by the Results API but do not produce media or interaction metrics.
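
Taken together, stages 3a to 3c form a short decision chain. The sketch below assumes the three probes are passed in as callables so that it can run without network access. The name classify_by_probes is hypothetical, and the reference script's classify_test may be structured differently.

```python
from typing import Callable


def classify_by_probes(
    test_id: str,
    session_id: str,
    has_video: Callable[[str], bool],
    has_qx_scores: Callable[[str], bool],
    has_transcript: Callable[[str], bool],
) -> str:
    """Return the test type implied by which probes succeed."""
    # 3a: video or QXscores means interaction test.
    # `or` short-circuits, so QXscores is only probed when video is unavailable.
    if has_video(session_id) or has_qx_scores(test_id):
        return "interaction test"
    # 3b: a transcript without video/QXscores means think-out-loud test.
    if has_transcript(session_id):
        return "think-out-loud test"
    # 3c: nothing else matched, so default to survey.
    return "survey"


# Offline example with stubbed probes
result = classify_by_probes(
    "TEST_ID",
    "SESSION_ID",
    has_video=lambda _sid: False,
    has_qx_scores=lambda _tid: False,
    has_transcript=lambda _sid: True,
)
```

With the helpers from 3a and 3b, a call could look like classify_by_probes(test_id, session_id, check_video_url, check_qx_scores, check_transcript).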


Step 4 — Persist results incrementally

Storage strategy

Save classification results immediately after each test is processed. This makes your workflow interrupt-safe: if the script stops, you can resume without reprocessing completed tests.

Files to create

| File | Purpose | Example Content |
| --- | --- | --- |
| state/utz_tests.json | UTZ test metadata with classifications | [{"test_uuid": "...", "test_type": "interaction test", "workspace_uuid": "...", "workspace_name": "..."}] |
| state/classicut_tests.json | Legacy tests (not compatible) | [{"test_uuid": "...", "workspace_uuid": "...", "workspace_name": "..."}] |
| state/errors.json | API errors and failures for review | {"TEST_UUID": {"status": 429, "body": "...", "type": "rate_limited_then_failed"}} |

Code example

def update_test_classification(test_uuid: str, test_type: str) -> None:
    """
    Update a specific test's classification in utz_tests.json.
    This is called immediately after classifying each test (interrupt-safe).
    """
    # Load existing records
    utz_records = load_object_list("state/utz_tests.json")
    
    # Find and update the test record
    for record in utz_records:
        if record.get("test_uuid") == test_uuid:
            record["test_type"] = test_type
            break
    
    # Save back to file
    save_object_list("state/utz_tests.json", utz_records)

def save_object_list(path: str, items: List[Dict[str, Any]]) -> None:
    """Save JSON array, deduplicating by test_uuid."""
    # Deduplicate by test_uuid (keep first occurrence)
    seen = set()
    deduped = []
    for item in items:
        test_id = item.get("test_uuid")
        if test_id and test_id not in seen:
            deduped.append(item)
            seen.add(test_id)
    
    with open(path, "w", encoding="utf-8") as f:
        json.dump(deduped, f, indent=2)
📘

Note: See the full reference script for timeout/retry handling.
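
Rewriting the whole file on every update leaves a small window in which an interruption could truncate it. One common mitigation, which is a swapped-in technique and not something the reference script is known to do, is to write to a temporary file and rename it into place. A minimal sketch, with save_json_atomic as a hypothetical helper name:

```python
import json
import os
import tempfile
from typing import Any


def save_json_atomic(path: str, data: Any) -> None:
    """Write JSON to a temp file in the target directory, then rename it over `path`."""
    directory = os.path.dirname(os.path.abspath(path))
    os.makedirs(directory, exist_ok=True)
    # The temp file must live on the same filesystem for os.replace to be atomic.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the partial temp file; the previous version of `path` is untouched.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

If the process stops before os.replace runs, the previous version of the file remains intact and the next run can resume from it.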

Step 5 — Verify success

After the run completes, verify the workflow output using the summary report and the verification checks printed by the script.

Expected summary signals

  • Done. Processing complete. appears
  • Totals are printed for UTZ total and ClassicUT total
  • Classification counts are printed for interaction tests, think-out-loud tests, and surveys
  • Output file paths are printed under ✓ Results saved to:

Expected verification checks

The script runs a built-in validation block and prints PASS/INFO/WARN status lines:

Verification checks:
  [PASS] Output files exist
  [PASS] Output files are valid JSON
  [PASS] UTZ records include expected fields and valid test_type values
  [INFO] Classification coverage: classified=..., unclassified=..., utz_total=...
  [PASS] No processing errors recorded

How to interpret results

  • If unclassified is greater than 0, this can be expected (for example, UTZ tests with no sessions yet)
  • If errors.json contains records, treat the run as partial success and review those errors before downstream automation
  • If any verification line shows FAIL, fix the data issue and rerun before using outputs in production workflows
👍

Tip: Keep this verification step in every scheduled run so data quality checks happen automatically before dashboards, ETL, or AI pipelines consume the results.
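
For teams that want a similar check in their own pipeline, the field validation can be approximated as follows. This is an illustrative sketch, not the script's verify_success_checks implementation. The required fields and allowed test_type values are taken from the storage schema and classification labels shown earlier, and find_invalid_records is a hypothetical name.

```python
from typing import Any, List

REQUIRED_FIELDS = ("test_uuid", "workspace_uuid", "workspace_name")
VALID_TEST_TYPES = {"interaction test", "think-out-loud test", "survey"}


def find_invalid_records(records: List[Any]) -> List[str]:
    """Return human-readable problems; an empty list means every record passed."""
    problems: List[str] = []
    for index, record in enumerate(records):
        if not isinstance(record, dict):
            problems.append(f"record {index}: not a JSON object")
            continue
        for field in REQUIRED_FIELDS:
            if not record.get(field):
                problems.append(f"record {index}: missing {field}")
        # test_type may be absent (unclassified), but if present it must be a known label.
        test_type = record.get("test_type")
        if test_type is not None and test_type not in VALID_TEST_TYPES:
            problems.append(f"record {index}: unexpected test_type {test_type!r}")
    return problems
```

An unclassified record passes this check on purpose, since UTZ tests without sessions are expected to lack a test_type.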


Rate limits and batching

Respect request throttling

The Results API endpoints enforce strict rate limits:

| Endpoint | Limit | Strategy |
| --- | --- | --- |
| /api/v2/sessionResults | 10 requests per minute | Enforce 6-second interval between requests |
| /api/v2/sessionResults/{id}/videoDownloadUrl | 10 requests per minute | Share the same 6-second interval |
| /api/v2/sessionResults/{id}/transcript | 10 requests per minute | Share the same 6-second interval |
| /api/v2/testResults/{id}/qxScores | 10 requests per minute | Share the same 6-second interval |

Implementation

last_call_time = 0.0
REQUEST_INTERVAL_SECONDS = 6.0

def enforce_rate_limit():
    """Enforce minimum interval between API calls."""
    global last_call_time
    now = time.time()
    elapsed = now - last_call_time
    if elapsed < REQUEST_INTERVAL_SECONDS:
        time.sleep(REQUEST_INTERVAL_SECONDS - elapsed)
    last_call_time = time.time()

def make_api_request(url: str, params: Dict = None) -> requests.Response:
    """Make rate-limited API request."""
    enforce_rate_limit()
    return requests.get(url, headers=get_headers(), params=params, timeout=30)

Exponential backoff for 429 errors

If you receive a 429 (Too Many Requests) response, implement exponential backoff:

import time

def make_request_with_backoff(func, max_retries: int = 5):
    """Retry request with exponential backoff on 429 errors."""
    resp = func()
    for attempt in range(max_retries):
        if resp.status_code != 429:
            return resp
        wait_time = 2 ** attempt  # 1, 2, 4, 8, 16 seconds
        print(f"Rate limited. Waiting {wait_time} seconds...")
        time.sleep(wait_time)
        resp = func()  # Retry after every wait, so no sleep is wasted
    return resp
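
The main workflow reads the Retry-After header before falling back to a fixed wait, and the two ideas can be combined. The sketch below is one possible approach, assuming Retry-After carries an integer number of seconds (HTTP-date values fall through to the exponential schedule). The function name backoff_seconds and the 60-second cap are assumptions.

```python
from typing import Optional


def backoff_seconds(retry_after: Optional[str], attempt: int, cap: float = 60.0) -> float:
    """Choose how long to wait after a 429 response.

    Prefers the server-provided Retry-After value (integer seconds);
    otherwise uses 2 ** attempt seconds, limited to `cap`.
    """
    if retry_after is not None and retry_after.strip().isdigit():
        return float(retry_after.strip())
    return min(float(2 ** attempt), cap)


# Example: no header on the third retry (attempt index 2)
wait = backoff_seconds(None, attempt=2)
```

The returned value would be passed to time.sleep before the next attempt, for example with resp.headers.get("Retry-After") as the first argument.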

Timeout management

  • Estimate runtime: For 100 tests with classification, expect 30-60 minutes
  • Use persistent sessions: Run with nohup or screen to survive terminal disconnections
  • Monitor progress: Print status updates (e.g., "Processing test 45/100")
  • Plan for scaling: 1000+ tests may take several hours

Resumable processing with checkpoints

Why checkpoints matter

Long-running workflows can be interrupted by network issues, token expiration, or system restarts. Saving state incrementally ensures you don't lose progress.

Checkpoints also make repeated runs cheap. You may want to rerun the workflow as your team adds new tests to the workspace, or once tests that previously had no sessions can be classified.

Checkpoint strategy

During Step 1 (discovery)

  • Save test UUIDs immediately after fetching each page
  • This prevents re-fetching pages unnecessarily

During Step 2 (identification)

  • Append tests to utz_tests.json or classicut_tests.json immediately
  • Track which tests have been identified to skip them on resume

During Step 3 (classification)

  • Update the test_type field immediately after classifying each test
  • Check if test_type is already populated before re-classifying
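
The "already populated" check can be done by building a lookup from the saved records. This is a minimal sketch, assuming records follow the utz_tests.json schema. The name classifications_from_records is hypothetical, and the reference script's load_classifications may work differently.

```python
from typing import Any, Dict, List


def classifications_from_records(records: List[Dict[str, Any]]) -> Dict[str, str]:
    """Map test_uuid -> test_type for records that already have a classification."""
    return {
        record["test_uuid"]: record["test_type"]
        for record in records
        if isinstance(record, dict)
        and record.get("test_uuid")
        and record.get("test_type")
    }


records = [
    {"test_uuid": "a", "test_type": "survey"},
    {"test_uuid": "b"},  # identified as UTZ but not yet classified
]
known = classifications_from_records(records)
needs_classification = [r["test_uuid"] for r in records if r["test_uuid"] not in known]
```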

Resume logic

When you run the workflow multiple times, process only new tests and unclassified UTZ tests from previous interrupted runs:

# Load existing records from disk
utz_records = load_object_list("state/utz_tests.json")
classic_records = load_object_list("state/classicut_tests.json")
classifications = load_classifications()

# Build a set of already-processed tests (both UTZ and ClassicUT)
utz_known = {r.get("test_uuid") for r in utz_records if isinstance(r, dict) and r.get("test_uuid")}
classic_known = {r.get("test_uuid") for r in classic_records if isinstance(r, dict) and r.get("test_uuid")}
already_processed = utz_known.union(classic_known)

# Discover current tests in workspaces
all_current_tests = get_all_workspace_studies(workspace_uuid)
current_test_uuids = {t.get("uuid") for t in all_current_tests if isinstance(t, dict) and t.get("uuid")}

# Calculate processing sets
# 1. New tests not yet identified
new_to_process = sorted([u for u in current_test_uuids if u not in already_processed])

# 2. Previously identified UTZ tests still without classification (resumable gap)
utz_unclassified_to_recheck = sorted([
    u for u in current_test_uuids 
    if u in utz_known and u not in classifications
])

# Combine both sets for processing
to_process = sorted(set(new_to_process).union(utz_unclassified_to_recheck))

print(f"New tests to identify: {len(new_to_process)}")
print(f"Previously discovered UTZ tests without classification: {len(utz_unclassified_to_recheck)}")
print(f"Total to process: {len(to_process)}")

# Process only these tests in the main loop
for test_uuid in to_process:
    # Identify (Step 2) and classify (Step 3) this test
    response = get_session_results(test_uuid)
    
    if response.status_code == 200:
        # UTZ test
        sessions = response.json().get("sessions", [])
        if sessions:
            session_id = sessions[0]["sessionId"]
            if test_uuid not in classifications:
                test_type = classify_test(test_uuid, session_id)
                update_test_classification(test_uuid, test_type)
        else:
            # UTZ tests without sessions are skipped until sessions appear
            pass
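    elif response.status_code == 404:
        # ClassicUT test (or a test UUID that no longer exists)
        pass
    else:
        # 401, 429, and other statuses are not a verdict; leave the test for a later run
        print(f"Unexpected status {response.status_code} for {test_uuid}; will retry on next run")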

Note: The reference script skips UTZ tests that return 200 but have no sessions, and rechecks them in later runs once sessions are available.

Why this approach

  • Efficiency: Avoids redundant API calls for tests already identified as UTZ or ClassicUT
  • Resilience: Catches UTZ tests that were discovered in an earlier run but whose classification was interrupted (in utz_unclassified_to_recheck)
  • Discovery: Automatically picks up tests added to the workspace since the last run (in new_to_process), as well as UTZ tests that could not be classified earlier because they had no sessions and now have them
  • Incremental: Each run processes only deltas, not the entire test library
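
The set arithmetic behind these points can be checked in isolation. The sketch below is an illustration, not part of the reference script: plan_run is a hypothetical helper name, and the short UUIDs are made-up values. It reproduces the same two sets (new tests, and UTZ tests still missing a classification) without any API calls.

```python
from typing import Dict, Iterable, List, Set


def plan_run(
    current: Iterable[str],
    utz_known: Set[str],
    classic_known: Set[str],
    classifications: Dict[str, str],
) -> List[str]:
    """Return the sorted test UUIDs that one incremental run should process."""
    current_set = set(current)
    already_processed = utz_known | classic_known
    # New tests: present in the workspace but never identified
    new_to_process = current_set - already_processed
    # Resumable gap: identified as UTZ but never classified
    recheck = {u for u in current_set if u in utz_known and u not in classifications}
    return sorted(new_to_process | recheck)


# Hypothetical UUIDs, shortened for readability
to_process = plan_run(
    current=["t1", "t2", "t3", "t4"],
    utz_known={"t1", "t2"},
    classic_known={"t3"},
    classifications={"t1": "survey"},
)
print(to_process)  # ['t2', 't4']
```

Here t1 is skipped because it is already classified, t3 is skipped because it is a known ClassicUT test, t2 is rechecked because it is UTZ but unclassified, and t4 is new.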

Token refresh for long-running jobs

Problem

OAuth access tokens expire after 3600 seconds (about 1 hour). A workflow that processes hundreds of tests can run for 30 minutes or more, so a token issued before the run can expire mid-execution.

Solution

The script should check token expiration and refresh proactively:

import time
import subprocess
import os
from typing import Dict, Optional  # used in the type hints below
from dotenv import load_dotenv

REFRESH_BEFORE_EXPIRY_SECONDS = 5 * 60  # Refresh 5 minutes before expiry

def get_token_expiry_time() -> Optional[float]:
    """Get token expiry epoch time from environment."""
    expires_in_str = os.environ.get("EXPIRES_IN")
    if expires_in_str:
        try:
            return float(expires_in_str)
        except ValueError:
            return None
    return None

def should_refresh_token() -> bool:
    """Check if token should be refreshed."""
    expiry = get_token_expiry_time()
    if not expiry:
        return True
    time_until_expiry = expiry - time.time()
    return time_until_expiry < REFRESH_BEFORE_EXPIRY_SECONDS

def refresh_access_token() -> bool:
    """Refresh the access token by running generate-token.py."""
    try:
        subprocess.run(
            ["python", "generate-token.py"],
            check=True,
            capture_output=True
        )
        load_dotenv(override=True)  # Reload .env
        return True
    except subprocess.CalledProcessError:
        return False

def get_headers() -> Dict[str, str]:
    """Build HTTP headers with fresh token."""
    if should_refresh_token():
        if not refresh_access_token():
            print("Warning: Token refresh failed")
    
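    token = os.environ.get("ACCESS_TOKEN")
    if not token:
        # Fail fast instead of sending "Bearer None"
        raise RuntimeError("ACCESS_TOKEN is not set; run generate-token.py")
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }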
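
To see how the refresh window behaves, the check can be rewritten as a pure function with the clock passed in. This is a sketch for illustration only: needs_refresh and the timestamps are hypothetical, and it assumes, as the docstring above states, that EXPIRES_IN holds an absolute epoch timestamp rather than a duration.

```python
from typing import Optional

REFRESH_BEFORE_EXPIRY_SECONDS = 5 * 60  # same 5-minute margin as above


def needs_refresh(
    expiry_epoch: Optional[float],
    now: float,
    margin: float = REFRESH_BEFORE_EXPIRY_SECONDS,
) -> bool:
    """Return True when the token is unknown, expired, or inside the margin."""
    if expiry_epoch is None:
        return True
    return expiry_epoch - now < margin


issued_at = 1_700_000_000.0   # hypothetical issue time (epoch seconds)
expiry = issued_at + 3600     # token lifetime of 3600 seconds

print(needs_refresh(expiry, issued_at + 30 * 60))  # False: 30 minutes left
print(needs_refresh(expiry, issued_at + 56 * 60))  # True: 4 minutes left
print(needs_refresh(None, issued_at))              # True: expiry unknown
```

Passing the current time as an argument keeps the check easy to test without waiting for a real token to age.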

Common errors and troubleshooting

| Error | Status | Meaning | Solution |
| --- | --- | --- | --- |
| Invalid pagination parameters | 400 | Offset or limit parameters are out of range | Verify offset is 0-10000 and limit is 1-500; see OAS for /api/v2/sessionResults |
| Missing or invalid access token | 401 | Access token is missing, invalid, or expired | Run generate-token.py to refresh; verify .env file contains valid ACCESS_TOKEN |
| Resource not found | 404 | Test UUID doesn't exist, is a ClassicUT test, or resource (video/transcript) is unavailable | Verify test/resource UUID; 404 on /sessionResults indicates ClassicUT test (expected for legacy tests) |
| Rate limit exceeded | 429 | Made too many requests (10 requests per minute limit) | Reduce request frequency; use x-ratelimit-remaining header to monitor; read Retry-After header and wait before retrying |
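
The 429 guidance (read Retry-After, wait, then retry) could be wrapped in a small helper along these lines. This is a minimal sketch under stated assumptions: call_with_backoff and parse_retry_after are hypothetical names, Retry-After is assumed to carry a number of seconds, and the 6-second fallback is simply 60 seconds divided by the 10-requests-per-minute limit.

```python
import time
from typing import Callable, Optional


def parse_retry_after(value: Optional[str], default: float = 6.0) -> float:
    """Interpret Retry-After as seconds; fall back to a default wait."""
    if value is None:
        return default
    try:
        return max(0.0, float(value))
    except ValueError:
        # Retry-After may also be an HTTP date; this sketch does not parse it
        return default


def call_with_backoff(
    send: Callable[[], object],
    max_attempts: int = 3,
    sleep: Callable[[float], None] = time.sleep,
):
    """Call send() and retry on HTTP 429, waiting as the server requests."""
    response = send()
    for _ in range(max_attempts - 1):
        if response.status_code != 429:
            break
        sleep(parse_retry_after(response.headers.get("Retry-After")))
        response = send()
    return response


# Possible usage with requests (url and get_headers come from your own script):
# response = call_with_backoff(lambda: requests.get(url, headers=get_headers()))
```

The helper takes the request as a callable so that headers are rebuilt on every attempt, which lets a token refresh happen between retries.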

What you can build next

Once you have a complete classification and inventory of all your tests, you can:

  • Power dashboards: Visualize test type distribution, coverage, and age
  • Automate data pipelines: Feed classified test UUIDs into scheduled Results API extraction jobs (see Create an Automated Pipeline)
  • Drive AI workflows: Route transcripts from think-out-loud tests (beta) to LLM-based insight generation
  • Enable self-service: Build an internal API that returns test type and capabilities for any given test UUID
  • Integrate with metadata systems: Sync findings to your data warehouse or metadata catalog

Summary

You now have a scalable pattern to:

  • Discover all tests across multiple workspaces using cursor-based pagination
  • Identify compatibility by checking Results API endpoint responses
  • Classify tests based on data availability (survey, interaction, think-out-loud)
  • Persist state incrementally for resumable, long-running workflows
  • Verify results with built-in checks for file integrity, schema validity, and classification coverage
  • Manage tokens and rate limits automatically for production stability

This classification workflow forms the foundation for advanced automation, ensuring your data pipelines only process compatible tests and your teams understand what insights are available from each test in your library.