Overview
ValiqorFAClient is the core of Valiqor — it finds why your AI app fails, not just that it fails. It analyzes AI inputs and outputs to detect failure patterns, categorize them into a taxonomy of buckets, and provide root cause analysis with severity scoring.
Two modes of operation:
- Dataset mode — pass existing inputs/outputs directly as a list of dicts
- Trace mode — pass a trace_id from a traced execution
from valiqor import ValiqorClient
client = ValiqorClient(api_key="your-api-key")
fa = client.failure_analysis
Or standalone:
from valiqor.failure_analysis import ValiqorFAClient
fa = ValiqorFAClient(api_key="your-api-key")
Supports context manager protocol: with ValiqorFAClient(...) as fa:
Constructor
ValiqorFAClient(
api_key: Optional[str] = None,
project_name: Optional[str] = None,
base_url: Optional[str] = None,
timeout: int = 300,
openai_api_key: Optional[str] = None,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| api_key | Optional[str] | None | Valiqor API key. |
| project_name | Optional[str] | None | Default project name. |
| base_url | Optional[str] | None | Backend URL override. |
| timeout | int | 300 | Request timeout in seconds. |
| openai_api_key | Optional[str] | None | OpenAI key for LLM judge calls. |
Core Methods
run()
Run failure analysis. Auto-polls if the backend returns an async response.
def run(
self,
trace_id: Optional[str] = None,
dataset: Optional[List[Dict[str, Any]]] = None,
project_name: Optional[str] = None,
feature_kind: Optional[str] = None,
run_eval: bool = True,
run_security: bool = True,
run_scan: bool = True,
mandatory_eval_metrics: Optional[List[str]] = None,
mandatory_security_categories: Optional[List[str]] = None,
subcategories: Optional[List[str]] = None,
buckets: Optional[List[str]] = None,
openai_api_key: Optional[str] = None,
) -> FARunResult
| Parameter | Type | Default | Description |
|---|---|---|---|
| trace_id | Optional[str] | None | Trace ID for trace mode. Mutually exclusive with dataset. |
| dataset | Optional[List[Dict]] | None | Items with input, output, and optionally context, tool_calls. |
| project_name | Optional[str] | None | Project name override for this run. |
| feature_kind | Optional[str] | None | One of "rag", "agent", "agentic_rag", "generic_llm". |
| run_eval | bool | True | Run evaluation metrics as part of FA. |
| run_security | bool | True | Run security checks as part of FA. |
| run_scan | bool | True | Run code scanning as part of FA. |
| mandatory_eval_metrics | Optional[List[str]] | None | Specific eval metrics to always include. |
| mandatory_security_categories | Optional[List[str]] | None | Specific security categories to always check. |
| subcategories | Optional[List[str]] | None | Filter to specific failure subcategories. |
| buckets | Optional[List[str]] | None | Filter to specific failure buckets. |
| openai_api_key | Optional[str] | None | OpenAI key override for LLM judge calls. |
Returns: FARunResult
result = client.failure_analysis.run(
dataset=[
{
"input": "What is the capital of France?",
"output": "The capital of France is London.",
"context": "Paris is the capital of France."
}
],
feature_kind="rag"
)
print(f"Failures: {result.summary.total_failures_detected}")
print(f"Primary failure: {result.summary.primary_failure_name}")
for tag in result.failure_tags:
print(f" [{tag.decision}] {tag.subcategory_name} (severity: {tag.severity})")
run_async()
Always returns an AsyncJobHandle, even if the backend responds synchronously.
def run_async(
self,
trace_id: Optional[str] = None,
dataset: Optional[List[Dict[str, Any]]] = None,
project_name: Optional[str] = None,
feature_kind: Optional[str] = None,
run_eval: bool = True,
run_security: bool = True,
run_scan: bool = True,
mandatory_eval_metrics: Optional[List[str]] = None,
mandatory_security_categories: Optional[List[str]] = None,
subcategories: Optional[List[str]] = None,
buckets: Optional[List[str]] = None,
openai_api_key: Optional[str] = None,
) -> AsyncJobHandle
Returns: AsyncJobHandle
handle = client.failure_analysis.run_async(dataset=large_dataset)
handle.wait(on_progress=lambda s: print(f"{s.progress_percent}%"))
result = handle.result()
playground()
Lightweight single-item failure analysis for quick testing.
def playground(
self,
input: str,
output: str,
context: Optional[str] = None,
tool_calls: Optional[List[Dict[str, Any]]] = None,
) -> FAPlaygroundResult
Rate limited: 10 calls/day, 2 calls/minute.
result = client.failure_analysis.playground(
input="What is 2+2?",
output="2+2 is 5",
context="Basic arithmetic"
)
print(f"Checks passed: {result.checks_passed}")
print(f"Remaining today: {result.playground_limit_per_day - result.playground_runs_today}")
Result Retrieval
get_run()
Get a completed FA result by run ID.
def get_run(self, run_id: str) -> FARunResult
get_run_inputs()
Get original input items with per-item failure statistics.
def get_run_inputs(self, run_id: str) -> List[FARunInput]
inputs = client.failure_analysis.get_run_inputs("run-id-123")
for item in inputs:
if item.has_failures:
print(f"Item {item.item_index}: {item.failure_count} failures, max severity {item.max_severity}")
get_tags()
Get failure tags for a run, with optional filtering.
def get_tags(
self,
run_id: str,
decision: Optional[str] = None,
bucket_id: Optional[str] = None,
min_severity: Optional[float] = None,
) -> List[FATag]
| Parameter | Type | Description |
|---|---|---|
| decision | Optional[str] | Filter by "pass", "fail", or "unsure". |
| bucket_id | Optional[str] | Filter to a specific failure bucket. |
| min_severity | Optional[float] | Minimum severity threshold (0.0–5.0). |
update_tag()
Update a failure tag’s review status or link it to an external issue tracker.
def update_tag(
self,
tag_id: str,
is_reviewed: Optional[bool] = None,
issue_url: Optional[str] = None,
) -> Dict[str, Any]
| Parameter | Type | Description |
|---|---|---|
| tag_id | str | The failure tag ID (required). |
| is_reviewed | Optional[bool] | Set review status — True to mark reviewed, False to unmark. |
| issue_url | Optional[str] | URL to an external issue (empty string to unlink). |
Returns: Dict with tag_id, is_reviewed, reviewed_at, issue_url, message.
# Mark a failure as reviewed
client.failure_analysis.update_tag("tag-id-123", is_reviewed=True)
# Link a failure to a GitHub issue
client.failure_analysis.update_tag(
"tag-id-123",
issue_url="https://github.com/org/repo/issues/42"
)
# Mark reviewed and link issue in one call
client.failure_analysis.update_tag(
"tag-id-123",
is_reviewed=True,
issue_url="https://github.com/org/repo/issues/42"
)
At least one of is_reviewed or issue_url must be provided.
Run History
list_runs()
List FA runs with pagination and filtering.
def list_runs(
self,
project_name: Optional[str] = None,
status: Optional[str] = None,
page: int = 1,
page_size: int = 20,
) -> FARunListPage
get_run_count()
def get_run_count(self, project_name: Optional[str] = None) -> int
Job Management
poll_status()
Check the status of an async FA run.
def poll_status(self, run_id: str) -> FAJobStatus
cancel_run()
Cancel a running FA job.
def cancel_run(self, run_id: str) -> Dict[str, str]
Taxonomy
get_taxonomy()
Get the full failure taxonomy — all buckets with their subcategories.
def get_taxonomy(self, project_name: Optional[str] = None) -> List[FABucket]
taxonomy = client.failure_analysis.get_taxonomy()
for bucket in taxonomy:
print(f"\n{bucket.bucket_name}: {bucket.description}")
for sub in bucket.subcategories:
print(f" - {sub.subcategory_name} ({sub.detection_approach})")
get_subcategories()
Get subcategories with optional filtering.
def get_subcategories(
self,
project_name: Optional[str] = None,
bucket_id: Optional[str] = None,
detection_approach: Optional[str] = None,
applies_to: Optional[str] = None,
) -> List[FASubcategory]
| Parameter | Type | Description |
|---|---|---|
| bucket_id | Optional[str] | Filter to a specific bucket. |
| detection_approach | Optional[str] | "deterministic", "llm_judge", or "hybrid". |
| applies_to | Optional[str] | Filter by applicable feature kind. |
get_bucket()
Get details for a specific failure bucket.
def get_bucket(self, bucket_id: str) -> FABucket
Analytics
get_insights()
Get aggregated failure insights for a project over a time period.
def get_insights(
self,
project_name: Optional[str] = None,
days: int = 30,
) -> FAInsightsSummary
insights = client.failure_analysis.get_insights(days=7)
print(f"Failure rate: {insights.overall_failure_rate:.1%}")
print(f"Avg severity: {insights.average_severity}")
for failure in insights.top_recurring_failures:
print(f" {failure.subcategory_name}: {failure.occurrence_count} occurrences")
get_trends()
Get failure trends over time.
def get_trends(
self,
project_name: Optional[str] = None,
days: int = 30,
) -> FATrends
get_security_insights()
Get security-specific insights from FA runs.
def get_security_insights(
self,
project_name: Optional[str] = None,
days: int = 30,
) -> FASecurityInsightsSummary
get_projects()
List all projects that have FA runs.
def get_projects(self) -> List[str]