Skip to content

Telemetry & Audit

Structured security event emission for SOC integration, compliance, and observability.

The Problem

Without structured telemetry, security events end up as unstructured log strings scattered across services. This makes it impossible to:

  • Correlate events across a multi-agent chain
  • Build dashboards for guardrail hit rates or judge decisions
  • Feed a SIEM with machine-readable security data
  • Trace a blocked request back to its root cause

Event Schema

Every security-relevant action emits an AISecurityEvent, a typed, self-contained record:

from airs.telemetry import AISecurityEvent, EventType

event = AISecurityEvent(
    event_type=EventType.GUARDRAIL_INPUT,
    correlation_id="abc123",           # ties all events in one request
    request_id="req_456",
    user_id="user_1",
    agent_chain=["orchestrator", "retriever"],
    delegation_depth=1,
    allowed=False,
    verdict="block",
    reason="Prompt injection pattern detected",
    layer="guardrail",
    pace_state="primary",
)

# Machine-readable output
print(event.to_json())
print(event.to_dict())

Event Types

Type When It's Emitted
guardrail.input Input guardrail evaluates a request
guardrail.output Output guardrail evaluates a response
judge.decision Model-as-Judge makes a verdict
tool.policy Tool policy engine allows/denies a tool call
delegation Agent-to-agent delegation is checked
pace.transition PACE state changes (e.g. Primary → Alternate)
circuit_breaker.state_change Circuit breaker opens, closes, or half-opens
pipeline.input Full pipeline input evaluation completes
pipeline.output Full pipeline output evaluation completes

Event Fields

Every event includes:

Field Purpose
event_id Unique ID for this event
event_type One of the types above
timestamp When it happened
correlation_id Ties events across an entire agent chain
request_id Ties events to a specific AIRequest
user_id The originating user
agent_chain List of agent IDs that touched this request
delegation_depth How deep in the delegation chain
allowed Whether the action was permitted
verdict Layer-specific verdict (pass/block/flag/escalate)
reason Human-readable explanation
confidence Judge confidence score (0.0–1.0)
layer Which control layer (guardrail/judge/circuit_breaker)
tool_name For tool policy events
pace_state Current PACE state
metadata Arbitrary key-value pairs

Audit Sinks

Events are sent to audit sinks, pluggable destinations. Register one or more:

Log Sink (structured JSON to Python logging)

from airs.telemetry import LogAuditSink
from airs.telemetry.events import register_sink

register_sink(LogAuditSink(logger_name="airs.audit"))

Output (one JSON line per event):

{"event_id":"a1b2c3","event_type":"pipeline.input","allowed":true,"user_id":"user_1","agent_chain":["orch"],...}

Callback Sink (custom handler)

from airs.telemetry import CallbackAuditSink
from airs.telemetry.events import register_sink

def send_to_siem(event):
    # Send to Splunk, Elastic, Sentinel, etc.
    requests.post(SIEM_URL, json=event.to_dict())

register_sink(CallbackAuditSink(send_to_siem))

Buffer Sink (in-memory, for testing or batch export)

from airs.telemetry.audit import BufferAuditSink
from airs.telemetry.events import register_sink

sink = BufferAuditSink(max_size=10_000)
register_sink(sink)

# After running your pipeline...
for event in sink.events:
    print(event.event_type, event.allowed, event.reason)

sink.clear()

Custom Sink

Implement the AuditSink interface:

from airs.telemetry import AuditSink, AISecurityEvent

class MyDatabaseSink(AuditSink):
    def handle(self, event: AISecurityEvent) -> None:
        db.insert("security_events", event.to_dict())

Automatic Pipeline Telemetry

The SecurityPipeline emits pipeline.input and pipeline.output events automatically. If an AgentContext is attached to the request, agent chain info is included:

from airs.agents import AgentIdentity, AgentContext
from airs.core.models import AIRequest
from airs.runtime import SecurityPipeline
from airs.telemetry.audit import BufferAuditSink
from airs.telemetry.events import register_sink

# Capture events
sink = BufferAuditSink()
register_sink(sink)

# Build request with agent context
ctx = AgentContext(
    user_id="user_1",
    origin_agent=AgentIdentity(agent_id="orch"),
)
request = AIRequest(input_text="Hello", agent_context=ctx)

# Pipeline emits events automatically
pipeline = SecurityPipeline()
await pipeline.evaluate_input(request)

event = sink.events[0]
print(event.user_id)         # "user_1"
print(event.agent_chain)     # ["orch"]
print(event.correlation_id)  # matches ctx.correlation_id

SOC Integration Pattern

For production, pipe events to your SIEM:

import logging
from airs.telemetry import LogAuditSink
from airs.telemetry.events import register_sink

# Configure a dedicated logger that ships to your log aggregator
audit_logger = logging.getLogger("airs.audit")
audit_logger.setLevel(logging.INFO)
handler = logging.handlers.SysLogHandler(address="/dev/log")
audit_logger.addHandler(handler)

# Register the sink - all pipeline events now go to syslog
register_sink(LogAuditSink(logger_name="airs.audit"))

From there, your SIEM can parse the JSON events and build:

  • Dashboards: guardrail block rates, judge escalation trends, PACE state over time
  • Alerts: circuit breaker trips, delegation depth anomalies, denied tool calls
  • Audit trails: full request trace from user → agent chain → decision → action