# Security Pipeline

The `SecurityPipeline` orchestrates all three layers into a single evaluation flow. It is the main entry point for integrating AIRS into your application.
## Building a Pipeline

```python
from airs.runtime import (
    SecurityPipeline, GuardrailChain, RegexGuardrail,
    ContentPolicyGuardrail, CircuitBreaker, PACEController,
)
from airs.runtime.judge import RuleBasedJudge
from airs.runtime.pipeline import PipelineConfig

pipeline = SecurityPipeline(
    guardrails=GuardrailChain([
        RegexGuardrail(),
        ContentPolicyGuardrail(blocked_terms=["classified"]),
    ]),
    judge=RuleBasedJudge(),
    circuit_breaker=CircuitBreaker(),
    pace=PACEController(),
    config=PipelineConfig(
        input_guardrails=True,    # Run guardrails on input
        output_guardrails=True,   # Run guardrails on output
        judge_enabled=True,       # Enable judge evaluation
        pace_enabled=True,        # Let PACE control judge sampling
        block_on_review=False,    # True = block on REVIEW, False = log only
        fallback_response="Service temporarily unavailable.",
    ),
)
```
## Evaluation Flow

The pipeline does not call your AI model. It provides two methods, `evaluate_input()` and `evaluate_output()`, that you call around your own model call. This keeps AIRS model-agnostic: it works with any provider, framework, or architecture.

```python
from airs.core.models import AIRequest, AIResponse

# 1. Evaluate input
request = AIRequest(
    input_text=user_input,
    user_id="user_123",
    session_id="sess_abc",
    model="gpt-4o",
)
input_result = await pipeline.evaluate_input(request)
if not input_result.allowed:
    return error_response(input_result)

# 2. Call your model
ai_output = await your_model(request.input_text)

# 3. Evaluate output
response = AIResponse(
    request_id=request.request_id,
    output_text=ai_output,
    model="gpt-4o",
)
output_result = await pipeline.evaluate_output(request, response)
if not output_result.allowed:
    return error_response(output_result)

# 4. Return to user
return ai_output
```
## Input Evaluation

`evaluate_input()` runs these checks in order:

- **Circuit breaker**: if OPEN, immediately return blocked
- **Input guardrails**: run the guardrail chain on the input text
- If the guardrails block, record a failure on the circuit breaker and return blocked
## Output Evaluation

`evaluate_output()` runs these checks in order:

- **Output guardrails**: run the guardrail chain on the output text
- **Judge**: if PACE sampling triggers (or a guardrail flagged the output), evaluate with the judge
- **Human approval**: if the PACE state requires human approval, return pending
- If all checks pass, record a success on the circuit breaker and return allowed
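The output-side ordering can likewise be sketched as a standalone function. Everything here is a hypothetical stand-in for illustration: guards return `"block"`, `"flag"`, or `None`, and the judge returns `"block"` or `"allow"`; the real AIRS types differ.

```python
def evaluate_output_sketch(text, guardrails, judge, sampled, needs_human):
    """Illustrative ordering: guardrails, then (sampled) judge, then human gate."""
    verdicts = [guard(text) for guard in guardrails]  # 1. Output guardrail chain
    if "block" in verdicts:
        return "blocked"
    flagged = "flag" in verdicts
    if sampled or flagged:
        # 2. Judge runs when PACE sampled this request or a guard flagged it
        if judge(text) == "block":
            return "blocked"
    if needs_human:
        # 3. The PACE state may require human approval before delivery
        return "pending"
    return "allowed"  # 4. Would count as a success on the circuit breaker
```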
## Pipeline Result

Both methods return a `PipelineResult`:

```python
result = await pipeline.evaluate_output(request, response)

result.allowed           # bool - should this be delivered?
result.pace_state        # PACEState - current PACE posture
result.blocked_by        # ControlLayer | None - which layer blocked
result.layer_results     # list[LayerResult] - each layer's result
result.total_latency_ms  # float - total evaluation time

# Convenience accessors
result.guardrail_result  # LayerResult for guardrails
result.judge_result      # LayerResult for judge
```
## Handling Blocked Responses

```python
if not result.allowed:
    if result.blocked_by == ControlLayer.CIRCUIT_BREAKER:
        # System is in emergency mode
        return {"error": "Service unavailable", "retry_after": 300}
    elif result.blocked_by == ControlLayer.GUARDRAIL:
        # Known-bad pattern detected
        log.warning("Guardrail block: %s", result.guardrail_result.reason)
        return {"error": "Request could not be processed"}
    elif result.blocked_by == ControlLayer.JUDGE:
        # Policy violation detected
        log.warning("Judge escalation: %s", result.judge_result.reason)
        return {"error": "Response withheld pending review"}
    elif result.blocked_by == ControlLayer.HUMAN:
        # PACE requires human approval
        await queue_for_review(request, response)
        return {"status": "pending_review"}
```
## Configuration Options

| Option | Default | Description |
|---|---|---|
| `input_guardrails` | `True` | Run guardrails on input |
| `output_guardrails` | `True` | Run guardrails on output |
| `judge_enabled` | `True` | Enable judge evaluation |
| `pace_enabled` | `True` | Let PACE control judge behavior |
| `block_on_review` | `False` | Block when judge returns REVIEW (vs. just logging) |
| `fallback_response` | `"Service temporarily unavailable..."` | Response when circuit breaker is open |
## Choosing `block_on_review`

- `False` (default): REVIEW verdicts are logged but the response is delivered. Appropriate for most deployments; review flagged items asynchronously.
- `True`: REVIEW verdicts block the response until a human reviews it. Appropriate for HIGH/CRITICAL risk tiers where any uncertainty should halt delivery.
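For a HIGH/CRITICAL tier, the stricter posture might look like the following sketch. It reuses the constructor from this page but elides the other components; whether they are required or default sensibly is an assumption here.

```python
pipeline = SecurityPipeline(
    guardrails=GuardrailChain([RegexGuardrail()]),
    judge=RuleBasedJudge(),
    config=PipelineConfig(
        block_on_review=True,  # REVIEW now halts delivery until a human signs off
    ),
)
```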
## Callbacks

Register callbacks for blocked and escalated requests:

```python
def on_block(result: PipelineResult):
    """Called when any layer blocks a request."""
    metrics.increment("airs.blocked", tags={
        "layer": result.blocked_by.value,
        "pace_state": result.pace_state.value,
    })

def on_escalate(result: PipelineResult):
    """Called when the judge escalates or returns REVIEW."""
    send_to_review_queue(result)
    if result.judge_result.verdict == "escalate":
        alert_security_team(result)

pipeline = SecurityPipeline(
    on_block=on_block,
    on_escalate=on_escalate,
    # ... plus guardrails, judge, and config as in "Building a Pipeline"
)
```
## PACE Integration

When `pace_enabled=True`, the PACE state controls:

- **Judge sampling rate**: Primary samples 5% of requests; Alternate evaluates 100%
- **Human approval**: Contingency and Emergency require human approval for all outputs
- **Automatic escalation**: judge `ESCALATE` verdicts trigger PACE escalation

The pipeline escalates PACE automatically: when the judge returns ESCALATE, the pipeline calls `pace.escalate()`, PACE moves Primary → Alternate, and all subsequent requests receive 100% judge evaluation.
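The state-dependent behavior above can be sketched as a small lookup table. The state names and the 5%/100% rates come from this page; the 100% rate for Contingency and Emergency, and everything else, is an assumption for illustration.

```python
import random

# Judge sampling rate and human-approval gate per PACE state.
# Contingency/Emergency rates are assumed, not documented.
PACE_POLICY = {
    "primary":     {"sample_rate": 0.05, "human_approval": False},
    "alternate":   {"sample_rate": 1.00, "human_approval": False},
    "contingency": {"sample_rate": 1.00, "human_approval": True},
    "emergency":   {"sample_rate": 1.00, "human_approval": True},
}

def should_judge(state: str, rng: random.Random) -> bool:
    """Decide whether this request is sampled for judge evaluation."""
    return rng.random() < PACE_POLICY[state]["sample_rate"]

def needs_human(state: str) -> bool:
    """Whether the current PACE posture requires human approval."""
    return PACE_POLICY[state]["human_approval"]
```

This framing makes the escalation effect concrete: moving from Primary to Alternate changes nothing except the sampling rate, which jumps from 5% to every request.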
## Minimal Pipeline

For the simplest possible setup (Fast Lane):

```python
pipeline = SecurityPipeline(
    guardrails=GuardrailChain([RegexGuardrail()]),
    config=PipelineConfig(
        judge_enabled=False,  # No judge needed for LOW risk
        pace_enabled=False,   # No PACE degradation
    ),
)
```

This gives you:

- Prompt injection detection on input
- PII detection on output
- Circuit breaker monitoring
- No judge latency, no external dependencies