Router
How the engine looks up a protocol module, caches it, and maps protocol errors to engine errors.
ProtocolRouter lives in engine/core/src/router.rs (177 lines). It owns the runtime registry of Arc<dyn ProtocolModule> instances and dispatches stage 7 of the pipeline.
Signature
    impl ProtocolRouter {
        pub async fn invoke(
            &self,
            protocol: &str,
            version: Option<&str>,
            operation: &str,
            input: serde_json::Value,
            ctx: &InvokeContext,
        ) -> CoreResult<serde_json::Value>;
    }

It returns the unwrapped Response::data. Response::metadata is recorded to the audit pipeline before this returns.
Lookup
    // Resolve an explicit version exactly; otherwise ask the registry for the best one.
    let resolved_version = match version {
        Some(v) => self.registry.exact(protocol, v).await?,
        None => self.registry.best_version(protocol).await?,
    };
    // Cache key is "protocol:version"; the closure runs only on a cache miss.
    let module = self.module_cache
        .get_or_insert(format!("{}:{}", protocol, resolved_version), || {
            self.registry.module(protocol, &resolved_version)
        });
    module.invoke(operation, input, ctx).await

The cache is LruCache<String, Arc<dyn ProtocolModule>> with capacity NonZeroUsize::new(1024).unwrap(). Modules are reference-counted, so eviction is safe: each outstanding invocation holds its own Arc clone, which keeps the module alive after the cache drops its entry.
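The eviction-safety argument can be illustrated with a minimal, std-only sketch. Everything here is hypothetical: `Module`, `TinyLru`, and `evict_while_in_use` are stand-ins invented for the example, and the hand-rolled LRU only approximates what the router's `LruCache` does.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

/// Hypothetical stand-in for a loaded protocol module.
pub struct Module {
    pub name: String,
}

/// Tiny LRU keyed by "protocol:version". Illustrative only.
pub struct TinyLru {
    capacity: usize,
    order: VecDeque<String>, // front = least recently used
    entries: HashMap<String, Arc<Module>>,
}

impl TinyLru {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { capacity, order: VecDeque::new(), entries: HashMap::new() }
    }

    /// Returns the cached module, calling `load` only on a miss.
    pub fn get_or_insert(&mut self, key: &str, load: impl FnOnce() -> Module) -> Arc<Module> {
        if let Some(module) = self.entries.get(key).cloned() {
            // Hit: move the key to the most-recently-used position.
            self.order.retain(|k| k.as_str() != key);
            self.order.push_back(key.to_string());
            return module;
        }
        // Miss: evict the least recently used entry if the cache is full.
        if self.entries.len() == self.capacity {
            if let Some(oldest) = self.order.pop_front() {
                self.entries.remove(&oldest);
            }
        }
        let module = Arc::new(load());
        self.entries.insert(key.to_string(), Arc::clone(&module));
        self.order.push_back(key.to_string());
        module
    }

    pub fn contains(&self, key: &str) -> bool {
        self.entries.contains_key(key)
    }
}

/// Evicts an entry while a caller still holds it. Returns whether the key is
/// still cached, the name read through the held handle, and the strong count.
pub fn evict_while_in_use() -> (bool, String, usize) {
    let mut cache = TinyLru::new(1);
    let held = cache.get_or_insert("http:1.0.0", || Module { name: "http".into() });
    // With capacity 1, inserting a second key evicts "http:1.0.0".
    let _other = cache.get_or_insert("grpc:1.0.0", || Module { name: "grpc".into() });
    (cache.contains("http:1.0.0"), held.name.clone(), Arc::strong_count(&held))
}
```

In this sketch the evicted module remains readable through `held`, and its strong count drops to one because the cache no longer owns a reference. The module is freed only when the last in-flight holder drops its clone.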
Error mapping
ProtocolModule::invoke returns Result<Response, ProtocolError>. The router maps the protocol-level error into the engine-level CoreError using simple substring heuristics on the error message:
    fn map_error(err: ProtocolError) -> CoreError {
        let msg = err.to_string();
        let lower = msg.to_lowercase();
        // Checks run top to bottom; the first matching substring wins.
        if lower.contains("adapter") { return CoreError::Internal(msg); }
        if lower.contains("timeout") { return CoreError::Timeout(std::time::Duration::from_secs(30)); }
        if lower.contains("invalid") { return CoreError::ProtocolError(err); }
        if lower.contains("missing capability") { return CoreError::CapabilityDenied { /* ... */ }; }
        CoreError::ProtocolError(err)
    }

This is intentionally simple. Protocols are expected to return structured ProtocolError variants and leave classification to the router, since not every protocol cares about every error class. Two consequences follow from the code: the order of the checks matters when a message contains more than one keyword, and a timeout is always reported as 30 seconds regardless of the deadline that actually elapsed.
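The order dependence of the substring checks is easiest to see in a self-contained sketch. The `CoreError` enum below is a simplified, hypothetical stand-in (the real variants carry different payloads), and `classify` takes the message directly rather than a `ProtocolError`.

```rust
use std::time::Duration;

/// Hypothetical, simplified stand-in for the engine's error type.
#[derive(Debug, PartialEq)]
pub enum CoreError {
    Internal(String),
    Timeout(Duration),
    CapabilityDenied(String),
    Protocol(String),
}

/// Classifies a protocol error message using the same check order as the
/// router's `map_error`: the first matching substring wins.
pub fn classify(msg: &str) -> CoreError {
    let lower = msg.to_lowercase();
    if lower.contains("adapter") {
        return CoreError::Internal(msg.to_string());
    }
    if lower.contains("timeout") {
        // Mirrors the fixed 30-second value used by the router.
        return CoreError::Timeout(Duration::from_secs(30));
    }
    if lower.contains("invalid") {
        return CoreError::Protocol(msg.to_string());
    }
    if lower.contains("missing capability") {
        return CoreError::CapabilityDenied(msg.to_string());
    }
    // Anything unrecognised stays a protocol-level error.
    CoreError::Protocol(msg.to_string())
}
```

Under this ordering, a message such as "adapter timeout" is classified as `Internal` rather than `Timeout`, and "invalid request: missing capability" stays a protocol error rather than becoming `CapabilityDenied`. Protocol authors who need a specific class should avoid mixing keywords in one message.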
Best-version discovery
ProtocolRegistry::best_version selects the highest stable version registered. Pre-release versions (v1.2.3-rc1) are skipped unless a tenant has the map.{protocol}.pre-release capability. If multiple versions are deployed (canary rollout), the registry routes a percentage of traffic by tenant hash.
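A rough sketch of both rules, assuming versions look like `vMAJOR.MINOR.PATCH` with an optional `-pre` suffix. The names `Version`, `parse_version`, `best_version`, and `routes_to_canary` are hypothetical. The `allow_pre_release` flag stands in for the tenant capability check, and the 0..100 bucketing is one plausible reading of "a percentage of traffic by tenant hash", not the registry's confirmed algorithm.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Parsed "vMAJOR.MINOR.PATCH[-pre]" version. Hypothetical helper type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Version {
    pub major: u64,
    pub minor: u64,
    pub patch: u64,
    pub pre: Option<String>,
}

/// Parses a version string; returns None if it is not three numeric parts.
pub fn parse_version(s: &str) -> Option<Version> {
    let s = s.strip_prefix('v').unwrap_or(s);
    let (core, pre) = match s.split_once('-') {
        Some((core, pre)) => (core, Some(pre.to_string())),
        None => (s, None),
    };
    let mut parts = core.split('.');
    let major = parts.next()?.parse().ok()?;
    let minor = parts.next()?.parse().ok()?;
    let patch = parts.next()?.parse().ok()?;
    if parts.next().is_some() {
        return None; // more than three numeric components
    }
    Some(Version { major, minor, patch, pre })
}

/// Highest registered version. Pre-releases are candidates only when
/// `allow_pre_release` is true (assumed to mirror the tenant capability).
pub fn best_version(registered: &[&str], allow_pre_release: bool) -> Option<Version> {
    registered
        .iter()
        .filter_map(|s| parse_version(s))
        .filter(|v| allow_pre_release || v.pre.is_none())
        // A stable release outranks a pre-release with the same numbers.
        .max_by_key(|v| (v.major, v.minor, v.patch, v.pre.is_none()))
}

/// Maps a tenant to a bucket in 0..100 and sends buckets below
/// `canary_percent` to the canary version. `DefaultHasher` is used only to
/// keep the sketch std-only; its output is not guaranteed stable across Rust
/// releases, so a real deployment would want a fixed hash function.
pub fn routes_to_canary(tenant_id: &str, canary_percent: u64) -> bool {
    let mut hasher = DefaultHasher::new();
    tenant_id.hash(&mut hasher);
    hasher.finish() % 100 < canary_percent
}
```

Hashing the tenant identifier, rather than picking randomly per request, keeps each tenant pinned to one side of the rollout for as long as the percentage is unchanged.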
Concurrency
ProtocolRouter is Clone + Send + Sync. The engine holds a single Arc<ProtocolRouter> and reuses it across all worker tasks. The LRU module cache is wrapped in an RwLock, and the write lock is taken only on a miss. The version registry is held in a DashMap, so version lookups never take the cache lock.
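The write-on-miss pattern, together with the "no double-init" property that the tests check, can be sketched with std types alone. This is an assumption-laden illustration: `ModuleCache`, `Module`, and `concurrent_load_count` are hypothetical, and a plain `HashMap` replaces the LRU because an LRU that updates recency on every hit would need mutable access even on the read path.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical stand-in for a loaded protocol module.
pub struct Module {
    pub key: String,
}

/// Cache that takes the read lock on a hit and the write lock on a miss.
#[derive(Default)]
pub struct ModuleCache {
    modules: RwLock<HashMap<String, Arc<Module>>>,
    loads: AtomicUsize, // counts how many times a module was initialised
}

impl ModuleCache {
    pub fn get_or_load(&self, key: &str) -> Arc<Module> {
        {
            // Fast path: shared read lock, no initialisation.
            let modules = self.modules.read().unwrap();
            if let Some(module) = modules.get(key) {
                return Arc::clone(module);
            }
        } // read lock released here
        // Slow path: `entry` re-checks under the write lock, so a thread that
        // lost the race reuses the module the winner inserted.
        let mut modules = self.modules.write().unwrap();
        let module = modules.entry(key.to_string()).or_insert_with(|| {
            self.loads.fetch_add(1, Ordering::SeqCst);
            Arc::new(Module { key: key.to_string() })
        });
        Arc::clone(module)
    }

    pub fn load_count(&self) -> usize {
        self.loads.load(Ordering::SeqCst)
    }
}

/// Requests the same key from `threads` threads and returns how many times
/// the module was actually initialised.
pub fn concurrent_load_count(threads: usize) -> usize {
    let cache = Arc::new(ModuleCache::default());
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || cache.get_or_load("http:1.0.0"))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    cache.load_count()
}
```

The second check under the write lock is what prevents double initialisation: several threads may miss on the read path at once, but only the first to acquire the write lock runs the loader.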
Tests
engine/core/src/tests.rs covers:
- module cache hit/miss semantics
- best-version selection under canary configuration
- error mapping from each ProtocolError variant
- concurrent invocation against the same module (no double-init)