Pipeline
Every request crosses the same eight stages on the way in. Each stage with its Rust module, type signatures, and p50 cost.
MapEngine::handle_request in engine/core/src/engine.rs (lines 1117–1267) executes eight ordered stages for every request. Refusal at any stage produces a structured response with the same audit weight as success.
Pipeline at a glance
| # | Stage | Rust module | Failure mode |
|---|---|---|---|
| 1 | Version Resolution | ProtocolRegistry::best_version | CoreError::UnknownProtocol / UnknownVersion |
| 2 | Context Enrichment | ContextManager::new_request_context | rarely fails; always advances |
| 3 | Rate Limiting | ProductionRateLimiter::check_rate_limit | RateLimited (event recorded) |
| 4 | Security Gating | SecurityGateway::is_operation_allowed | CapabilityDenied (event recorded) |
| 5 | Circuit Breaking | ProductionLoadBalancer::is_available | CircuitOpen (event recorded) |
| 6 | Load Balancing | ProductionLoadBalancer::select_endpoint | NoEndpointAvailable |
| 7 | Router Invocation | ProtocolRouter::invoke → ProtocolModule::invoke | ProtocolError → CoreError (mapped) |
| 8 | Result Handling | AuditPipeline::record + metric emission | non-blocking; never breaks the response |
The full pipeline runs in roughly 3–5ms p50 on a warm cluster, plus the variable cost of stage 7 (the protocol module itself).
Stages in detail
Stage 1 — Version Resolution
let input = self.protocol_registry.best_version(&input).await?;ProtocolRegistry::best_version accepts a HandleRequestInput and resolves to the best installed version of the named protocol — exact match if a specific version was requested, otherwise the highest stable version registered. The registry caches resolutions in an Arc<RwLock<HashMap<_,_>>>.
If the protocol is not registered, CoreError::UnknownProtocol(name) is returned. If a specific version was requested but is not installed, CoreError::UnknownVersion(name, version).
Stage 2 — Context Enrichment
let _ctx = self.context_manager.new_request_context(&input.tenant_id);ContextManager attaches:
- a unique
correlation_id(UUID v4) - the
tenant_idfrom the request - the request timestamp (RFC3339)
- the
TraceContext(W3C:traceparent,tracestate) - a metadata map seeded with the caller DID
This is separate from InvokeContext populated by the identity middleware, which already attached the resolved identity before handle_request was called.
Stage 3 — Rate Limiting
self.rate_limiter
.check_rate_limit(&input.tenant_id, &input.protocol, &input.operation)
.await?;ProductionRateLimiter uses a per-tenant token bucket with operation-specific multipliers. Buckets are persisted (typically Redis) so rates survive engine restarts. Heavy agents (MARC, MACE, MAGI) have lower bucket capacity than infrastructure protocols (MOMENT, MOTET).
A refused request records a RateLimitExceeded event via AuditPipeline and returns CoreError::RateLimited { retry_after: Duration }.
Stage 4 — Security Gating
let tenant_caps = self.security_gateway.tenant_capabilities(&input.tenant_id).await?;
if !self.security_gateway.is_operation_allowed(&tenant_caps, &input.protocol, &input.operation) {
return Err(CoreError::CapabilityDenied { protocol, operation });
}SecurityGateway::is_operation_allowed walks capabilities hierarchically:
- Exact match:
map.{protocol}.{operation}(e.g.map.macs.auth_negotiation) - Protocol wildcard:
map.{protocol}.* - Global wildcard:
map.*.*(reserved for ops/admin tenants)
Any match grants. No match denies. A denied request records a SecurityViolation event with the requested capability.
Stage 5 — Circuit Breaking
if !self.load_balancer.is_available(&input.protocol).await {
return Err(CoreError::CircuitOpen { protocol });
}ProductionLoadBalancer::is_available returns false when the per-protocol circuit breaker is Open (too many recent failures) or HalfOpen (probing). The breaker is per-protocol-per-endpoint and uses the standard CLOSED → OPEN → HALF_OPEN state machine with exponential backoff.
A blocked request records a CircuitBreakerOpen event.
Stage 6 — Load Balancing
let endpoint = self.load_balancer.select_endpoint(&input.protocol).await?;For protocols with multiple healthy endpoints (typically the agent protocols), select_endpoint returns the next endpoint according to the configured strategy (round-robin, weighted least-request, or hash-by-tenant). For first-party native protocols compiled into the binary, this collapses to a singleton "local" endpoint.
Stage 7 — Router Invocation
let response = self.router.invoke(
&input.protocol,
Some(&input.version),
&input.operation,
input.input,
&ctx,
).await?;ProtocolRouter::invoke looks up the registered ProtocolModule for protocol:version, caches it in an LruCache<String, Arc<dyn ProtocolModule>> (capacity 1024), and calls:
module.invoke(operation, payload, &ctx).awaitThe module returns Result<Response, ProtocolError>. The router maps ProtocolError to CoreError via heuristic on the error message — keywords like adapter, timeout, invalid route to specific CoreError variants for downstream callers to handle uniformly.
This is the only stage with variable latency. The other seven add up to a few milliseconds.
Stage 8 — Result Handling
self.audit_pipeline.record(
"ProtocolInvocation",
&json!({
"protocol": input.protocol,
"operation": input.operation,
"tenant": input.tenant_id,
"outcome": outcome,
"latency_ms": elapsed,
})
);
self.metrics.emit_invocation(&input.protocol, &input.operation, outcome, elapsed);
return Ok(HandleRequestOutput { output: response.data });AuditPipeline::record is non-blocking — it spawns the actual write as a task so it never affects the request's tail latency. The event taxonomy is ProtocolInvocation | CapabilityGrant | AuthorizationCheck | RateLimitExceeded | CircuitBreakerOpen | SecurityViolation | Error.
Metrics are emitted via the bound MetricsBackend (Prometheus by default), tagged with protocol, operation, outcome, and quantized latency buckets.
Identity middleware: before stage 1
IdentityMiddleware::enrich (in engine/identity/src/middleware.rs) runs before MapEngine::handle_request. It takes the inbound agent_did from the HTTP gateway, calls the bound IdentityResolver::resolve, and attaches the result to InvokeContext::resolved_identity so every downstream stage can read it.
Three outcomes:
pub enum ResolveOutcome {
Resolved(ResolvedAgentIdentity),
Partial { identity: ResolvedAgentIdentity, warnings: Vec<String> },
Failed { did: String, error: String },
}Failed falls back to ResolvedAgentIdentity::unresolved(did) so requests are not silently dropped — they pass into the pipeline with an unresolved identity and are typically denied at Stage 4 (Security Gating) for lacking the required capability.
Tail of the pipeline
Once stages 1–8 complete, the response flows back through the gateway, with:
- The
traceparentheader propagated for downstream tracing - The audit-chain head hash returned in the response metadata
- Rate-limit headers (
X-RateLimit-*) for client back-off
The audit head you receive in the response metadata is verifiable against the chain via MAX::traceability_graph. This is how MIMESIS reconstructs precedent and how MOOT reconstructs cases.