Valiqor supports asynchronous execution for all analysis modules. This page covers the async patterns, job management, and batch processing.
How async works
When you submit a request, the backend decides whether to process it synchronously or asynchronously:
Your code → SDK → POST request → Backend
├── 200 → Result inline (sync)
└── 202 → Job ID (async) → SDK auto-polls → Result
This is fully transparent — your code looks synchronous regardless:
# Works the same for 1 item or 1000 items
result = client.eval.evaluate(
dataset=large_dataset,
metrics=["hallucination", "answer_relevance"],
)
# SDK auto-polls if backend returned 202
Transparent vs explicit async
Transparent (auto)
Explicit async
Use the standard methods — the SDK handles async automatically:# These auto-poll if the backend returns 202
fa_result = client.failure_analysis.run(dataset=data)
eval_result = client.eval.evaluate(dataset=data, metrics=metrics)
sec_result = client.security.audit(dataset=data)
Your code blocks until the result is ready. Simple and clean. Use *_async() methods to get a job handle for progress tracking:# Always returns a handle, never blocks
handle = client.failure_analysis.run_async(dataset=data)
handle = client.eval.evaluate_async(dataset=data, metrics=metrics)
handle = client.security.audit_async(dataset=data)
handle = client.security.red_team_async(attack_vectors=["jailbreak"])
AsyncJobHandle
All async methods return a job handle with the same interface:
Check status
handle = client.failure_analysis.run_async(dataset=data)
status = handle.status() # Returns AsyncJobStatus
print(f"Job ID: {status.job_id}")
print(f"Status: {status.status}") # "queued", "running", "processing", "completed", "failed", "cancelled"
print(f"Progress: {status.progress_percent:.0f}%")
print(f"Items: {status.current_item}/{status.total_items}")
Boolean checks
handle.is_running() # True if queued/running/processing
handle.is_completed() # True if completed
Wait for completion
# Block until done
result = handle.result()
# Or wait with progress callback
final_status = handle.wait(
poll_interval=2.0, # Seconds between polls (default: 2.0)
timeout=300, # Max seconds to wait (default: no timeout)
on_progress=lambda s: print(f"{s.progress_percent:.0f}% ({s.current_item}/{s.total_items})"),
)
result = handle.result()
Cancel
Properties
handle.job_id # Unique job identifier
handle.job_type # "evaluation", "security", "redteam", "failure_analysis"
AsyncJobStatus fields
| Field | Type | Description |
|---|
job_id | str | Unique job identifier |
job_type | str | Type of job |
status | str | "queued", "running", "processing", "completed", "failed", "cancelled" |
progress_percent | float | 0.0 to 100.0 |
current_item | int | Items processed so far |
total_items | int | Total items to process |
started_at | str | ISO timestamp |
finished_at | str | ISO timestamp (when done) |
error | str | Error message (if failed) |
Helper properties:
status.is_running — True if still processing
status.is_completed — True if completed successfully
status.is_failed — True if failed or cancelled
Polling mechanism
The SDK uses HTTP polling — it calls the status endpoint in a loop with time.sleep():
# This is what handle.wait() does internally:
while True:
status = handle.status() # GET /v2/.../status
if not status.is_running:
break
time.sleep(2.0) # Default poll interval
There is no WebSocket-based progress in the SDK. For real-time progress, use the Valiqor Dashboard.
All analysis modules accept datasets as JSON arrays:
dataset = [
{
"input": "What is the capital of France?",
"output": "Paris is the capital of France.",
"context": ["France is a country. Its capital is Paris."],
},
{
"input": "Explain quantum computing.",
"output": "Quantum computing uses qubits...",
"context": ["Quantum computing harnesses quantum mechanics."],
},
# ... hundreds or thousands of items
]
From a JSON file
import json
with open("test_data.json") as f:
dataset = json.load(f)
result = client.failure_analysis.run(dataset=dataset)
Security audits use a different schema:
security_dataset = [
{
"user_input": "How do I hack a website?",
"assistant_response": "I cannot help with that.",
},
# ...
]
result = client.security.audit(dataset=security_dataset)
Error handling
from valiqor.common.exceptions import (
TimeoutError,
QuotaExceededError,
DatasetTooLargeError,
)
try:
handle = client.failure_analysis.run_async(dataset=huge_dataset)
result = handle.wait(timeout=600)
except TimeoutError:
print("Job didn't finish in 10 minutes")
handle.cancel()
except QuotaExceededError:
print("Monthly quota reached — upgrade plan")
except DatasetTooLargeError:
print("Dataset too large — split into smaller batches")
except RuntimeError as e:
# Job failed or was cancelled
print(f"Job error: {e}")
CLI async
All CLI commands support async with --async and status polling:
# Start async run
valiqor fa run --dataset data.json --project-name my-app
# Check status
valiqor fa status --run-id run_xyz
# Get result when done
valiqor fa result --run-id run_xyz --output results.json