Pipeline

Every request crosses the same eight stages on the way in. Each stage is described below with its Rust module, type signatures, and p50 cost.

MapEngine::handle_request in engine/core/src/engine.rs (lines 1117–1267) executes eight ordered stages for every request. Refusal at any stage produces a structured response with the same audit weight as success.

Pipeline at a glance

| # | Stage | Rust module | Failure mode |
|---|-------|-------------|--------------|
| 1 | Version Resolution | ProtocolRegistry::best_version | CoreError::UnknownProtocol / UnknownVersion |
| 2 | Context Enrichment | ContextManager::new_request_context | rarely fails; always advances |
| 3 | Rate Limiting | ProductionRateLimiter::check_rate_limit | RateLimited (event recorded) |
| 4 | Security Gating | SecurityGateway::is_operation_allowed | CapabilityDenied (event recorded) |
| 5 | Circuit Breaking | ProductionLoadBalancer::is_available | CircuitOpen (event recorded) |
| 6 | Load Balancing | ProductionLoadBalancer::select_endpoint | NoEndpointAvailable |
| 7 | Router Invocation | ProtocolRouter::invoke → ProtocolModule::invoke | ProtocolError → CoreError (mapped) |
| 8 | Result Handling | AuditPipeline::record + metric emission | non-blocking; never breaks the response |

The full pipeline runs in roughly 3–5 ms p50 on a warm cluster, plus the variable cost of stage 7 (the protocol module itself).
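
The ordering in the table means the first stage that refuses decides the response. The following is a minimal sketch of that short-circuit behaviour, not the engine's code: `Request`, its boolean flags, `Refusal`, and `run_gates` are hypothetical names standing in for the real checks. Stages 2, 7, and 8 are left out because they do not gate the request.

```rust
/// Hypothetical refusal reasons, named after the failure modes in the table.
#[derive(Debug, PartialEq, Eq)]
pub enum Refusal {
    UnknownProtocol,
    RateLimited,
    CapabilityDenied,
    CircuitOpen,
    NoEndpointAvailable,
}

/// Hypothetical request whose flags stand in for the real stage checks.
pub struct Request {
    pub protocol_known: bool,
    pub within_rate: bool,
    pub has_capability: bool,
    pub circuit_closed: bool,
    pub endpoint_available: bool,
}

fn check(ok: bool, refusal: Refusal) -> Result<(), Refusal> {
    if ok { Ok(()) } else { Err(refusal) }
}

/// Runs the gating stages in pipeline order; `?` returns the first refusal.
pub fn run_gates(req: &Request) -> Result<(), Refusal> {
    check(req.protocol_known, Refusal::UnknownProtocol)?; // stage 1
    check(req.within_rate, Refusal::RateLimited)?; // stage 3
    check(req.has_capability, Refusal::CapabilityDenied)?; // stage 4
    check(req.circuit_closed, Refusal::CircuitOpen)?; // stage 5
    check(req.endpoint_available, Refusal::NoEndpointAvailable)?; // stage 6
    Ok(())
}
```

In this sketch, a request that is both over its rate limit and missing a capability is reported as rate limited, because stage 3 runs before stage 4.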

Stages in detail

Stage 1 — Version Resolution

let input = self.protocol_registry.best_version(&input).await?;

ProtocolRegistry::best_version accepts a HandleRequestInput and resolves to the best installed version of the named protocol — exact match if a specific version was requested, otherwise the highest stable version registered. The registry caches resolutions in an Arc<RwLock<HashMap<_,_>>>.

If the protocol is not registered, CoreError::UnknownProtocol(name) is returned. If a specific version was requested but is not installed, CoreError::UnknownVersion(name, version) is returned.
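
The resolution rule can be sketched as follows. This is an illustration under stated assumptions, not the registry's implementation: `InstalledVersion`, `ResolveError`, the tuple version type, and the synchronous signature are all hypothetical, and the cache is omitted.

```rust
use std::collections::HashMap;

/// Hypothetical version record; the real registry types are not shown on this page.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InstalledVersion {
    pub version: (u32, u32, u32),
    pub stable: bool,
}

/// Hypothetical stand-ins for CoreError::UnknownProtocol / UnknownVersion.
#[derive(Debug, PartialEq, Eq)]
pub enum ResolveError {
    UnknownProtocol(String),
    UnknownVersion(String, (u32, u32, u32)),
}

/// Exact match when a version is requested, otherwise the highest stable version.
pub fn best_version(
    registry: &HashMap<String, Vec<InstalledVersion>>,
    name: &str,
    requested: Option<(u32, u32, u32)>,
) -> Result<(u32, u32, u32), ResolveError> {
    let installed = registry
        .get(name)
        .ok_or_else(|| ResolveError::UnknownProtocol(name.to_string()))?;
    match requested {
        Some(wanted) => installed
            .iter()
            .find(|v| v.version == wanted)
            .map(|v| v.version)
            .ok_or_else(|| ResolveError::UnknownVersion(name.to_string(), wanted)),
        // Assumption: a protocol with no stable version is reported as unknown.
        None => installed
            .iter()
            .filter(|v| v.stable)
            .map(|v| v.version)
            .max()
            .ok_or_else(|| ResolveError::UnknownProtocol(name.to_string())),
    }
}
```

Tuples compare lexicographically, so `max()` picks the highest major, then minor, then patch number among the stable entries.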

Stage 2 — Context Enrichment

let _ctx = self.context_manager.new_request_context(&input.tenant_id);

ContextManager attaches:

  • a unique correlation_id (UUID v4)
  • the tenant_id from the request
  • the request timestamp (RFC3339)
  • the TraceContext (W3C: traceparent, tracestate)
  • a metadata map seeded with the caller DID

This request context is separate from the InvokeContext populated by the identity middleware, which has already attached the resolved identity by the time handle_request is called.
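
For reference, a W3C traceparent value has four dash-separated lowercase-hex fields: version, trace-id (32 characters), parent-id (16 characters), and flags (2 characters). The parser below is a sketch that handles version 00 only. `TraceParent` and `parse_traceparent` are hypothetical names and do not reflect how TraceContext is actually built.

```rust
/// Hypothetical parsed form of a version-00 W3C traceparent header.
#[derive(Debug, PartialEq, Eq)]
pub struct TraceParent {
    pub trace_id: String,
    pub parent_id: String,
    pub sampled: bool,
}

/// True when `s` is exactly `len` lowercase hexadecimal characters.
fn is_lower_hex(s: &str, len: usize) -> bool {
    s.len() == len && s.bytes().all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'))
}

/// Parses `00-<trace-id>-<parent-id>-<flags>`; returns None for anything else.
pub fn parse_traceparent(header: &str) -> Option<TraceParent> {
    let parts: Vec<&str> = header.trim().split('-').collect();
    if parts.len() != 4 || parts[0] != "00" {
        return None;
    }
    let (trace_id, parent_id, flags) = (parts[1], parts[2], parts[3]);
    if !is_lower_hex(trace_id, 32) || !is_lower_hex(parent_id, 16) || !is_lower_hex(flags, 2) {
        return None;
    }
    // All-zero trace-id and parent-id values are invalid per the specification.
    if trace_id.bytes().all(|b| b == b'0') || parent_id.bytes().all(|b| b == b'0') {
        return None;
    }
    let flags = u8::from_str_radix(flags, 16).ok()?;
    Some(TraceParent {
        trace_id: trace_id.to_string(),
        parent_id: parent_id.to_string(),
        sampled: flags & 0x01 == 0x01, // lowest bit is the "sampled" flag
    })
}
```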

Stage 3 — Rate Limiting

self.rate_limiter
    .check_rate_limit(&input.tenant_id, &input.protocol, &input.operation)
    .await?;

ProductionRateLimiter uses a per-tenant token bucket with operation-specific multipliers. Buckets are persisted (typically in Redis) so that bucket state survives engine restarts. Heavy agents (MARC, MACE, MAGI) have a lower bucket capacity than infrastructure protocols (MOMENT, MOTET).

A refused request records a RateLimitExceeded event via AuditPipeline and returns CoreError::RateLimited { retry_after: Duration }.
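
A token bucket with a per-operation cost multiplier can be sketched as below. This is an in-memory illustration only: `TokenBucket`, its fields, and the explicit `refill` call are assumptions, and the persisted variant described above is not shown. The sketch also assumes a refill rate greater than zero.

```rust
use std::time::Duration;

/// Hypothetical in-memory token bucket (refill_per_sec must be > 0).
#[derive(Debug)]
pub struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
}

impl TokenBucket {
    /// Starts full, so a fresh tenant can burst up to `capacity`.
    pub fn new(capacity: f64, refill_per_sec: f64) -> Self {
        Self { capacity, tokens: capacity, refill_per_sec }
    }

    /// Adds the tokens earned over `elapsed`, capped at capacity.
    pub fn refill(&mut self, elapsed: Duration) {
        let earned = elapsed.as_secs_f64() * self.refill_per_sec;
        self.tokens = (self.tokens + earned).min(self.capacity);
    }

    /// Spends `multiplier` tokens (base cost 1.0 scaled per operation).
    /// On refusal, returns the wait needed to earn the missing tokens,
    /// which plays the role of `retry_after`.
    pub fn try_acquire(&mut self, multiplier: f64) -> Result<(), Duration> {
        if self.tokens >= multiplier {
            self.tokens -= multiplier;
            Ok(())
        } else {
            let deficit = multiplier - self.tokens;
            Err(Duration::from_secs_f64(deficit / self.refill_per_sec))
        }
    }
}
```

A heavier operation simply passes a larger multiplier, so it drains the same bucket faster than a light one.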

Stage 4 — Security Gating

let tenant_caps = self.security_gateway.tenant_capabilities(&input.tenant_id).await?;
if !self.security_gateway.is_operation_allowed(&tenant_caps, &input.protocol, &input.operation) {
    return Err(CoreError::CapabilityDenied { protocol, operation });
}

SecurityGateway::is_operation_allowed walks capabilities hierarchically:

  1. Exact match: map.{protocol}.{operation} (e.g. map.macs.auth_negotiation)
  2. Protocol wildcard: map.{protocol}.*
  3. Global wildcard: map.*.* (reserved for ops/admin tenants)

Any match grants. No match denies. A denied request records a SecurityViolation event with the requested capability.
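
The three-step walk can be sketched as a lookup of three candidate strings in the tenant's capability set. The function below is a free-standing illustration: representing capabilities as a `HashSet<String>` is an assumption, and the real method takes the gateway's own capability type.

```rust
use std::collections::HashSet;

/// Checks the exact capability, then the protocol wildcard, then the global wildcard.
/// Any hit grants; no hit denies.
pub fn is_operation_allowed(caps: &HashSet<String>, protocol: &str, operation: &str) -> bool {
    let candidates = [
        format!("map.{}.{}", protocol, operation), // 1. exact match
        format!("map.{}.*", protocol),             // 2. protocol wildcard
        "map.*.*".to_string(),                     // 3. global wildcard
    ];
    candidates.iter().any(|candidate| caps.contains(candidate))
}
```

Because any match grants, the order of the candidates affects only which lookup succeeds first, not the outcome.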

Stage 5 — Circuit Breaking

if !self.load_balancer.is_available(&input.protocol).await {
    return Err(CoreError::CircuitOpen { protocol });
}

ProductionLoadBalancer::is_available returns false when the circuit breaker guarding the protocol is Open (too many recent failures) or HalfOpen (probing). Breakers are tracked per protocol per endpoint and follow the standard CLOSED → OPEN → HALF_OPEN state machine with exponential backoff.

A blocked request records a CircuitBreakerOpen event.
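
The state machine can be sketched for a single breaker as follows. Everything here is an assumption made for illustration: the `CircuitBreaker` type, the consecutive-failure threshold, the doubling backoff, and the explicit `tick` call that replaces a real clock so the behaviour is deterministic. As in the description above, only the Closed state counts as available.

```rust
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BreakerState { Closed, Open, HalfOpen }

/// Hypothetical single breaker; time is advanced explicitly via `tick`.
#[derive(Debug)]
pub struct CircuitBreaker {
    state: BreakerState,
    consecutive_failures: u32,
    failure_threshold: u32,
    base_backoff: Duration,
    trips: u32,         // times opened since the last successful probe
    open_for: Duration, // time spent Open since the last trip
}

impl CircuitBreaker {
    pub fn new(failure_threshold: u32, base_backoff: Duration) -> Self {
        Self {
            state: BreakerState::Closed,
            consecutive_failures: 0,
            failure_threshold,
            base_backoff,
            trips: 0,
            open_for: Duration::ZERO,
        }
    }

    pub fn state(&self) -> BreakerState { self.state }

    pub fn is_available(&self) -> bool { self.state == BreakerState::Closed }

    /// Backoff doubles with each trip: base, 2x base, 4x base, ... (exponent capped).
    fn backoff(&self) -> Duration {
        self.base_backoff * 2u32.pow(self.trips.saturating_sub(1).min(16))
    }

    fn trip(&mut self) {
        self.state = BreakerState::Open;
        self.trips += 1;
        self.open_for = Duration::ZERO;
    }

    pub fn record_failure(&mut self) {
        match self.state {
            BreakerState::Closed => {
                self.consecutive_failures += 1;
                if self.consecutive_failures >= self.failure_threshold { self.trip(); }
            }
            BreakerState::HalfOpen => self.trip(), // failed probe reopens the breaker
            BreakerState::Open => {}
        }
    }

    pub fn record_success(&mut self) {
        if self.state != BreakerState::Open {
            self.state = BreakerState::Closed;
            self.consecutive_failures = 0;
            self.trips = 0;
        }
    }

    /// Advances time; an Open breaker moves to HalfOpen once the backoff has elapsed.
    pub fn tick(&mut self, elapsed: Duration) {
        if self.state == BreakerState::Open {
            self.open_for += elapsed;
            if self.open_for >= self.backoff() { self.state = BreakerState::HalfOpen; }
        }
    }
}
```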

Stage 6 — Load Balancing

let endpoint = self.load_balancer.select_endpoint(&input.protocol).await?;

For protocols with multiple healthy endpoints (typically the agent protocols), select_endpoint returns the next endpoint according to the configured strategy (round-robin, weighted least-request, or hash-by-tenant). For first-party native protocols compiled into the binary, this collapses to a singleton "local" endpoint.
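
Of the strategies named above, round-robin is the simplest to illustrate. The sketch below assumes a hypothetical `RoundRobinBalancer` holding endpoint names as strings. It is not the ProductionLoadBalancer API and ignores endpoint health.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical round-robin selector over a fixed endpoint list.
pub struct RoundRobinBalancer {
    endpoints: Vec<String>,
    next: AtomicUsize,
}

impl RoundRobinBalancer {
    pub fn new(endpoints: Vec<String>) -> Self {
        Self { endpoints, next: AtomicUsize::new(0) }
    }

    /// Returns endpoints in rotation; None models NoEndpointAvailable.
    pub fn select_endpoint(&self) -> Option<&str> {
        if self.endpoints.is_empty() {
            return None;
        }
        // fetch_add lets concurrent callers share the counter without a lock.
        let index = self.next.fetch_add(1, Ordering::Relaxed) % self.endpoints.len();
        Some(self.endpoints[index].as_str())
    }
}
```

With a single "local" entry the rotation always returns that entry, which mirrors the singleton case for protocols compiled into the binary.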

Stage 7 — Router Invocation

let response = self.router.invoke(
    &input.protocol,
    Some(&input.version),
    &input.operation,
    input.input,
    &ctx,
).await?;

ProtocolRouter::invoke looks up the registered ProtocolModule for protocol:version, caches it in an LruCache<String, Arc<dyn ProtocolModule>> (capacity 1024), and calls:

module.invoke(operation, payload, &ctx).await

The module returns Result<Response, ProtocolError>. The router maps ProtocolError to CoreError with a heuristic on the error message: keywords such as adapter, timeout, and invalid each map to a specific CoreError variant, so downstream callers can handle errors uniformly.
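
A keyword heuristic of this kind might look like the sketch below. The `MappedError` variants, the case-insensitive matching, and the precedence order (timeout before adapter before invalid) are assumptions. The actual CoreError variants and rules are not listed on this page.

```rust
/// Hypothetical target variants; the real CoreError variants may differ.
#[derive(Debug, PartialEq, Eq)]
pub enum MappedError {
    Timeout(String),
    Adapter(String),
    InvalidInput(String),
    Internal(String),
}

/// Maps a free-form protocol error message to a variant by keyword.
/// Assumption: the first matching keyword in this order wins.
pub fn map_protocol_error(message: &str) -> MappedError {
    let lower = message.to_lowercase();
    let owned = message.to_string();
    if lower.contains("timeout") {
        MappedError::Timeout(owned)
    } else if lower.contains("adapter") {
        MappedError::Adapter(owned)
    } else if lower.contains("invalid") {
        MappedError::InvalidInput(owned)
    } else {
        MappedError::Internal(owned) // no keyword matched
    }
}
```

Message-based mapping is sensitive to wording changes in the modules, so a fallback variant like `Internal` keeps unrecognized messages from being misclassified.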

This is the only stage with variable latency. The other seven add up to a few milliseconds.

Stage 8 — Result Handling

self.audit_pipeline.record(
    "ProtocolInvocation",
    &json!({
        "protocol": input.protocol,
        "operation": input.operation,
        "tenant": input.tenant_id,
        "outcome": outcome,
        "latency_ms": elapsed,
    })
);
self.metrics.emit_invocation(&input.protocol, &input.operation, outcome, elapsed);
return Ok(HandleRequestOutput { output: response.data });

AuditPipeline::record is non-blocking — it spawns the actual write as a task so it never affects the request's tail latency. The event taxonomy is ProtocolInvocation | CapabilityGrant | AuthorizationCheck | RateLimitExceeded | CircuitBreakerOpen | SecurityViolation | Error.
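
The non-blocking property can be illustrated with standard-library primitives. Note that this swaps in a channel and a background thread where the engine spawns a task, and `AuditSketch`, `start_audit`, and the string payload are hypothetical. Only the event kind names come from the taxonomy above.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Event kinds from the taxonomy above.
#[derive(Debug, Clone, Copy)]
pub enum AuditEventKind {
    ProtocolInvocation,
    CapabilityGrant,
    AuthorizationCheck,
    RateLimitExceeded,
    CircuitBreakerOpen,
    SecurityViolation,
    Error,
}

/// Hypothetical handle whose `record` only enqueues and returns immediately.
pub struct AuditSketch {
    tx: Sender<String>,
}

impl AuditSketch {
    pub fn record(&self, kind: AuditEventKind, payload: &str) {
        // A send failure is ignored so auditing never breaks the response.
        let _ = self.tx.send(format!("{:?}: {}", kind, payload));
    }
}

/// Starts a worker thread that performs the (here simulated) durable writes.
/// The worker returns everything it wrote once all senders are dropped.
pub fn start_audit() -> (AuditSketch, JoinHandle<Vec<String>>) {
    let (tx, rx) = mpsc::channel::<String>();
    let worker = thread::spawn(move || {
        let mut written = Vec::new();
        for event in rx {
            written.push(event); // stand-in for the slow write
        }
        written
    });
    (AuditSketch { tx }, worker)
}
```

The request path pays only for the enqueue. The write itself happens on the worker, off the request's critical path.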

Metrics are emitted via the bound MetricsBackend (Prometheus by default), tagged with protocol, operation, outcome, and quantized latency buckets.
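
Quantizing a latency into a bucket means reporting the smallest configured upper bound that is at least the observed value. The sketch below shows the idea. The bucket bounds in `BUCKETS_MS` are illustrative assumptions, not the engine's configured values.

```rust
/// Hypothetical bucket upper bounds in milliseconds, in ascending order.
pub const BUCKETS_MS: [f64; 8] = [1.0, 2.5, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0];

/// Returns the upper bound of the bucket containing `latency_ms`,
/// or None when the value exceeds every bound (the overflow bucket).
pub fn latency_bucket(latency_ms: f64) -> Option<f64> {
    BUCKETS_MS.iter().copied().find(|&upper| latency_ms <= upper)
}
```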

Identity middleware: before stage 1

IdentityMiddleware::enrich (in engine/identity/src/middleware.rs) runs before MapEngine::handle_request. It takes the inbound agent_did from the HTTP gateway, calls the bound IdentityResolver::resolve, and attaches the result to InvokeContext::resolved_identity so every downstream stage can read it.

Three outcomes:

pub enum ResolveOutcome {
    Resolved(ResolvedAgentIdentity),
    Partial { identity: ResolvedAgentIdentity, warnings: Vec<String> },
    Failed { did: String, error: String },
}

Failed falls back to ResolvedAgentIdentity::unresolved(did) so requests are not silently dropped — they pass into the pipeline with an unresolved identity and are typically denied at Stage 4 (Security Gating) for lacking the required capability.
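
The fallback can be sketched as a single match over the outcome. The fields of `ResolvedAgentIdentity` and the `into_identity` helper are hypothetical. Only the enum shape and the `unresolved(did)` constructor name come from the text above.

```rust
/// Hypothetical identity shape; the real struct's fields are not shown on this page.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedAgentIdentity {
    pub did: String,
    pub resolved: bool,
    pub capabilities: Vec<String>,
}

impl ResolvedAgentIdentity {
    /// An identity that carries the DID but no capabilities.
    pub fn unresolved(did: String) -> Self {
        Self { did, resolved: false, capabilities: Vec::new() }
    }
}

pub enum ResolveOutcome {
    Resolved(ResolvedAgentIdentity),
    Partial { identity: ResolvedAgentIdentity, warnings: Vec<String> },
    Failed { did: String, error: String },
}

/// Always yields an identity, plus any warnings or the failure message,
/// so the request continues into the pipeline instead of being dropped.
pub fn into_identity(outcome: ResolveOutcome) -> (ResolvedAgentIdentity, Vec<String>) {
    match outcome {
        ResolveOutcome::Resolved(identity) => (identity, Vec::new()),
        ResolveOutcome::Partial { identity, warnings } => (identity, warnings),
        ResolveOutcome::Failed { did, error } => {
            (ResolvedAgentIdentity::unresolved(did), vec![error])
        }
    }
}
```

In this sketch an unresolved identity has an empty capability list, which is why such a request would fail a capability check later on.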

Tail of the pipeline

Once stages 1–8 complete, the response flows back through the gateway, with:

  • The traceparent header propagated for downstream tracing
  • The audit-chain head hash returned in the response metadata
  • Rate-limit headers (X-RateLimit-*) for client back-off

The audit head you receive in the response metadata is verifiable against the chain via MAX::traceability_graph. This is how MIMESIS reconstructs precedent and how MOOT reconstructs cases.
