Agent RPC
Agents in separate deployments call each other like typed functions.
The runtime ships a versioned envelope, a producer tool, a consumer
source, and a transport-agnostic RpcTransport interface every broker
plugin implements.
Introduced in v1.1. Frozen surfaces are tagged @since 1.1.0.
At a glance
 Producer daemon                               Consumer daemon
 ───────────────                               ───────────────
    tool call                                   source adapter
┌────────────────┐        ┌─────────┐        ┌──────────────────┐
│ RequestAgent   ├───────▶│ broker  ├───────▶│ agent-inbox      │
│ (sync mode)    │        │ (Kafka  │        │ (decode + route) │
└────────────────┘        │ / NATS  │        └────────┬─────────┘
         ▲                │ / …)    │                 │
         │                └────┬────┘                 ▼
         │  (response)         │             ┌──────────────────┐
         └─────────────────────┴────────────▶│ review-pr skill  │
                                             │ ctx.respond(…)   │
                                             └──────────────────┘
Every hop carries the same correlationId. tenantId is enforced on
both sides.
The envelope
Wire format frozen as AgentRpcEnvelope v1:
interface AgentRpcEnvelope {
  version: 1;
  kind: 'request' | 'response' | 'event';
  messageId: string;
  correlationId: string;
  causedBy?: string;
  from: `agent://${string}`;
  to: `agent://${string}`;
  capability: string;
  replyTo?: `kafka://…` | `nats://…` | `sqs://…` | `amqp://…` | `mqtt://…` | `memory://…`;
  deadline?: number; // ms-epoch
  tenantId?: string;
  headers?: Record<string, string>;
  payload: unknown;
  auth?: { kind: 'internal' } | { kind: 'hmac'; keyId: string; signature: string };
}
- Strict mode. Unknown top-level fields are rejected. A typo'd field fails closed on the receiver rather than being silently ignored.
- Additive evolution. v1.x adds new optional fields only. Breaking changes bump to version: 2.
- Canonical HMAC form. canonicalizeForSigning(envelope) produces the byte string signed by auth.hmac. The auth field itself is excluded; key order is deterministic.
Implemented in @declaragent/core/src/rpc/envelope.ts.
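The canonical-form rule (auth excluded, deterministic key order) can be pictured with a short sketch. The exact byte layout produced by canonicalizeForSigning is not documented here, so canonicalizeSketch below is a hypothetical stand-in that assumes sorted-key JSON.

```typescript
// Hypothetical stand-in for canonicalizeForSigning. The sorted-key JSON layout
// is an assumption, not the documented wire form.
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// Serializes a value with object keys in sorted order at every depth.
function stableStringify(value: Json): string {
  if (value === null || typeof value !== 'object') return JSON.stringify(value);
  if (Array.isArray(value)) return `[${value.map(stableStringify).join(',')}]`;
  const obj = value;
  const keys = Object.keys(obj).sort();
  return `{${keys.map((k) => `${JSON.stringify(k)}:${stableStringify(obj[k])}`).join(',')}}`;
}

// Drops the top-level auth field, then serializes the rest deterministically.
function canonicalizeSketch(envelope: { [key: string]: Json }): string {
  const rest: { [key: string]: Json } = {};
  for (const key of Object.keys(envelope)) {
    if (key !== 'auth') rest[key] = envelope[key];
  }
  return stableStringify(rest);
}
```

Two envelopes that differ only in key order or in their auth block produce the same string under this sketch, which is the property a signer and a verifier both rely on.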
Topic convention
Defaults, overridable per-agent:
| Topic | Purpose |
|---|---|
| agents.<agent-id>.requests | Durable inbox. Receives request envelopes. |
| agents.<agent-id>.responses | Ephemeral replyTo target. |
| agents.<agent-id>.events | Optional pub/sub fan-out. |
Multi-tenant extension:
agents.<agent-id>.<tenantId>.requests
agents.<agent-id>.<tenantId>.responses
Per-transport quirks (Kafka partition keys, SQS FIFO, NATS JetStream, etc.) are documented in each transport plugin's README.
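As an illustration of the naming convention, the helper below builds the default topic names. defaultTopic is a hypothetical name, not a function exported by the runtime.

```typescript
type TopicRole = 'requests' | 'responses' | 'events';

// Builds the default topic name; passing tenantId switches to the multi-tenant form.
function defaultTopic(agentId: string, role: TopicRole, tenantId?: string): string {
  if (tenantId !== undefined && role === 'events') {
    // The multi-tenant extension lists only requests and responses.
    throw new Error('no tenant-scoped events topic is documented');
  }
  const parts =
    tenantId === undefined ? ['agents', agentId, role] : ['agents', agentId, tenantId, role];
  return parts.join('.');
}
```

For example, defaultTopic('pr-reviewer', 'requests') yields agents.pr-reviewer.requests, and adding a tenantId of 'acme' yields agents.pr-reviewer.acme.requests.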
Transports
| Kind | Factory | Peer dep | Notes |
|---|---|---|---|
| memory | createMemoryTransport | none | In-process only. Used by the rpc-agents template + unit tests. |
| kafka | createKafkaTransport | kafkajs@^2 | One consumer per topic; groupId narrowed per topic so parallel subscriptions don't share rebalance coordination. |
| nats | createNatsTransport | nats@^2 | Core-NATS pub/sub (not JetStream). queueGroup opts subscriptions into shared delivery (Kafka-groupId equivalent). |
NATS transport
import { createNatsTransport } from '@declaragent/plugin-agent-rpc';

const transport = await createNatsTransport({
  servers: ['nats://nats.internal:4222'],
  clientName: 'declaragent-pr-reviewer',
  // queueGroup: 'pr-reviewer-workers', // opt-in load-balanced delivery across replicas
  // subjectPrefix: 'tenant1',          // namespace every publish/subscribe
});
- Dynamic peer dep. nats is loaded via import('nats') at construction time. If it isn't installed, the factory throws a typed error pointing the operator at npm install nats.
- Subject naming. The topic argument is used verbatim as a NATS subject, so the agents.<id>.requests convention carries over. Dots are NATS-hierarchy separators; callers supplying Kafka-style topics with slashes or colons will get a subject that NATS accepts but that wildcards cannot match segment by segment.
- Queue groups. When queueGroup is set, every subscription joins that queue, so replicas load-balance the same way Kafka consumer groups do. Omit it for full fan-out.
- No JetStream. Durable streams + consumer state belong to @declaragent/source-nats (the ingress adapter). The RPC transport is deliberately core-NATS for low latency.
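The subject-naming caveat follows from how NATS wildcards work: "*" matches exactly one dot-separated token and ">" matches one or more trailing tokens. The matcher below is a local illustration of those rules, not part of the plugin; subjectMatches is a hypothetical name.

```typescript
// Matches a subject against a pattern using NATS wildcard rules:
// "*" matches exactly one dot-separated token, ">" matches one or more trailing tokens.
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split('.');
  const s = subject.split('.');
  for (let i = 0; i < p.length; i++) {
    if (p[i] === '>') return i === p.length - 1 && s.length > i;
    if (i >= s.length) return false;
    if (p[i] !== '*' && p[i] !== s[i]) return false;
  }
  return p.length === s.length;
}
```

Under these rules agents.*.requests matches agents.pr-reviewer.requests, but it does not match agents/pr-reviewer/requests, because the slash-separated form is a single token.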
RequestAgent tool
# agent.yaml
tools:
  defaults:
    - RequestAgent
// From a skill:
const { status, response, correlationId, latencyMs, error } =
  await RequestAgent({
    to: 'agent://pr-reviewer',
    capability: 'review-pr',
    payload: { prUrl: '…' },
    timeoutMs: 60_000, // default 30_000; clamped to [1, 600_000]
    mode: 'sync',      // or 'async' | 'fire-and-forget'
  });
| Mode | Behavior |
|---|---|
| sync (default) | Publishes the request, registers a pending entry, awaits a matching response. Returns { status, … }, where status is one of 'ok', 'error', 'timeout', 'abandoned', or 'busy'. |
| async | Publishes and returns { status: 'ok', correlationId } immediately. Response lands as an agent.rpc.response event on the local bus. |
| fire-and-forget | Publishes an event-kind envelope. No replyTo, no correlation. |
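The sync-mode bookkeeping (register a pending entry, settle it on a matching response, time out otherwise) can be sketched as follows. This is a minimal model under stated assumptions, not the runtime's registry: PendingRegistrySketch and its methods are hypothetical, and time is passed in explicitly instead of using timers.

```typescript
type Settled = { status: 'ok' | 'error' | 'timeout' | 'abandoned'; payload?: unknown };
type Pending = { deadline: number; settle: (result: Settled) => void };

// Hypothetical sketch of a producer-side pending-RPC registry.
class PendingRegistrySketch {
  private readonly entries = new Map<string, Pending>();
  private readonly capacity: number;

  constructor(capacity: number) {
    this.capacity = capacity;
  }

  // Returns false when the registry is full; the caller would report 'busy'.
  register(correlationId: string, timeoutMs: number, now: number, settle: (r: Settled) => void): boolean {
    if (this.entries.size >= this.capacity) return false;
    const clamped = Math.min(Math.max(timeoutMs, 1), 600_000); // documented clamp range
    this.entries.set(correlationId, { deadline: now + clamped, settle });
    return true;
  }

  // Called when a response envelope arrives; unknown correlation ids are ignored.
  resolve(correlationId: string, result: Settled): boolean {
    const entry = this.entries.get(correlationId);
    if (!entry) return false;
    this.entries.delete(correlationId);
    entry.settle(result);
    return true;
  }

  // Settles every entry whose deadline has passed with status 'timeout'.
  sweep(now: number): void {
    this.entries.forEach((entry, id) => {
      if (entry.deadline <= now) {
        this.entries.delete(id);
        entry.settle({ status: 'timeout' });
      }
    });
  }
}
```

Keying entries by correlationId is what lets a late or duplicate response be ignored: once an entry has been settled or swept, resolve returns false.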
Permission key: RequestAgent:<to>/<capability>. Glob-matched, so
operators can scope calls:
permissions:
  allow:
    - "RequestAgent:agent://pr-reviewer/*"
  deny:
    - "RequestAgent:agent://billing-bot/*"
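A rough sketch of how a glob-matched permission key could be evaluated. The precedence rule (deny wins, no match means not allowed) and the choice to let "*" match any characters, including "/", are assumptions made for illustration; isCallAllowed and globToRegExp are hypothetical names.

```typescript
// Converts a glob with "*" wildcards into an anchored RegExp.
function globToRegExp(glob: string): RegExp {
  const escaped = glob.replace(/[.+?^${}()|[\]\\]/g, '\\$&').replace(/\*/g, '.*');
  return new RegExp(`^${escaped}$`);
}

interface PermissionRules {
  allow: string[];
  deny: string[];
}

// Assumption: deny wins over allow, and a key matching neither list is not allowed.
function isCallAllowed(rules: PermissionRules, to: string, capability: string): boolean {
  const key = `RequestAgent:${to}/${capability}`;
  const hits = (globs: string[]): boolean => globs.some((g) => globToRegExp(g).test(key));
  return !hits(rules.deny) && hits(rules.allow);
}
```

With the rules shown above, a call to agent://pr-reviewer with capability review-pr produces the key RequestAgent:agent://pr-reviewer/review-pr, which the allow glob matches.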
agent-inbox source
# event-sources.yaml
- type: agent-inbox
config:
id: inbox
agentId: pr-reviewer
# Defaults to `agents.<agentId>.requests` / `.responses`.
# requestsTopic: agents.pr-reviewer.requests
# responsesTopic: agents.pr-reviewer.responses
# eventsTopic: agents.pr-reviewer.events # optional
delivery:
mode: at-least-once
ackStrategy: after-dispatch
idempotency:
strategy: transport-natural
store: sqlite
ttlMs: 900000
limits:
concurrency: 4
maxInflight: 64
Internally, the adapter:
- Decodes + Zod-validates the envelope. Failure → DLQ.
- Verifies envelope.tenantId against the local bus scope (multi-tenant). Mismatch → audit + drop.
- Verifies envelope.auth when the agent's config requires it.
- Dispatches by envelope kind:
  - request → publish AgentEvent { target: { type: 'skill', name: capability } }.
  - response → settle the producer-side pending-RPC registry.
  - event → broadcast on the local bus.
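The tenant check and the dispatch-by-kind step can be modeled as a pure function. This is a sketch only: decoding, schema validation, and auth verification are left out, the RouteAction shape is hypothetical, and treating a missing tenantId as equal only to a missing local tenant is an assumption.

```typescript
type Kind = 'request' | 'response' | 'event';

interface EnvelopeLike {
  kind: Kind;
  capability: string;
  correlationId: string;
  tenantId?: string;
}

type RouteAction =
  | { action: 'drop'; reason: 'tenant-mismatch' }
  | { action: 'invoke-skill'; skill: string }
  | { action: 'settle-pending'; correlationId: string }
  | { action: 'broadcast' };

// Mirrors the tenant check followed by dispatch on the envelope kind.
function route(envelope: EnvelopeLike, localTenantId?: string): RouteAction {
  if (envelope.tenantId !== localTenantId) return { action: 'drop', reason: 'tenant-mismatch' };
  switch (envelope.kind) {
    case 'request':
      return { action: 'invoke-skill', skill: envelope.capability };
    case 'response':
      return { action: 'settle-pending', correlationId: envelope.correlationId };
    case 'event':
      return { action: 'broadcast' };
  }
}
```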
ctx.respond — the reply hook
When a skill is invoked via an RPC request, the engine wires
ctx.respond onto the ToolContext:
await ctx.respond?.({
  ok: true,
  data: { verdict: 'comment', findings: [/* … */], summary: '…' },
});
// or:
await ctx.respond?.({
  ok: false,
  error: { code: 'EINVAL', message: 'prUrl is required' },
});
- Default hook. Skip ctx.respond and the runtime publishes { ok: true, data: assistant.final.content } automatically on turn-end. REPL-style skills "just work" over RPC.
- Streaming. ctx.respond can be called more than once per correlationId: each call produces a successive response-kind envelope, which is useful for progress updates.
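One way to keep the two reply shapes consistent is to build them in a small helper and hand the result to ctx.respond. The helper below is a hypothetical sketch: buildReviewReply, the Reply type, and the summary text are illustrative, not part of the runtime.

```typescript
type Reply =
  | { ok: true; data: unknown }
  | { ok: false; error: { code: string; message: string } };

// Hypothetical skill-side helper: validate the request payload, then build the reply.
function buildReviewReply(payload: unknown): Reply {
  const prUrl = (payload as { prUrl?: unknown } | null)?.prUrl;
  if (typeof prUrl !== 'string' || prUrl.length === 0) {
    return { ok: false, error: { code: 'EINVAL', message: 'prUrl is required' } };
  }
  return { ok: true, data: { verdict: 'comment', findings: [], summary: `Reviewed ${prUrl}` } };
}

// Inside a skill the result would be passed on, for example:
//   await ctx.respond?.(buildReviewReply(input));
```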
Discovery — capabilities.yaml
Optional per-agent declaration of the RPC surface. Operators use it to document the API; future (v1.2) registry aggregation pulls from it.
version: 1
agent: agent://pr-reviewer
transports:
  - kind: kafka
    brokers: ["${env:KAFKA_BROKERS}"]
    topics:
      requests: agents.pr-reviewer.requests
      responses: agents.pr-reviewer.responses
capabilities:
  - name: review-pr
    description: "Review a GitHub pull request and emit structured findings."
    timeoutMs: 60000
    idempotent: true
    since: "1.1.0"
    inputSchema:
      type: object
      properties:
        prUrl: { type: string }
      required: [prUrl]
    outputSchema:
      type: object
      properties:
        verdict: { enum: [approve, request-changes, comment] }
        findings: { type: array }
        summary: { type: string }
Peer table — rpc-peers.yaml
Producer-side routing. Resolves agent://<id> to concrete transport +
topic.
version: 1
peers:
  - agent: agent://pr-reviewer
    transports:
      - kind: kafka
        brokers: ["${env:KAFKA_BROKERS}"]
        topics:
          requests: agents.pr-reviewer.requests
  - agent: agent://translator
    transports:
      - kind: nats
        servers: ["nats://nats.internal:4222"]
        subjects:
          requests: agents.translator.requests
Inspect at runtime:
declaragent rpc peers # print the effective peer table
declaragent rpc peers --verify # live-ping every peer's inbox
declaragent rpc capabilities # print this agent's capabilities
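Peer resolution can be sketched as a lookup that fails with the documented routing codes. The data shapes and the "first transport with a loaded plugin wins" rule are assumptions for illustration; resolvePeer and RpcRoutingError are hypothetical names.

```typescript
interface PeerTransport {
  kind: string;
  requestsTopic: string;
}

interface Peer {
  agent: string;
  transports: PeerTransport[];
}

class RpcRoutingError extends Error {
  code: string;
  constructor(code: string, message: string) {
    super(message);
    this.code = code;
  }
}

// Assumption: the first transport whose plugin is loaded is the one used.
function resolvePeer(peers: Peer[], to: string, loadedKinds: string[]): PeerTransport {
  const peer = peers.find((p) => p.agent === to);
  if (!peer) throw new RpcRoutingError('EAGENTRPC_NO_PEER', `${to} is not in the peer table`);
  const transport = peer.transports.find((t) => loadedKinds.indexOf(t.kind) !== -1);
  if (!transport) throw new RpcRoutingError('EAGENTRPC_NO_TRANSPORT', `no transport plugin loaded for ${to}`);
  return transport;
}
```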
Security
| Mode | When to use |
|---|---|
| auth: { kind: 'internal' } | Intra-cluster deployments that trust the bus scope + broker ACLs. Default. |
| auth: { kind: 'hmac', keyId, signature } | Shared-secret envelope integrity. Receiver canonicalizes via canonicalizeForSigning and compares SHA-256. |
| auth: { kind: 'oidc', token, keyId? } | Bearer JWT from an enterprise OIDC IdP (Okta / Azure AD / Auth0 / Dex). Verified against a cached JWKS. |
| auth: { kind: 'oauth2-client', token, scope? } | OAuth2 Client-Credentials access token. For service-to-service calls where no user is present. |
Agents that require signed envelopes declare it in agent.yaml:
rpc:
  auth:
    required: hmac
    keysRef: ${secret:vault:kv/acme/rpc-keys}
mTLS / SPIFFE are transport-layer concerns — each transport plugin
exposes a validateAuth(envelope, raw, transportCtx) hook.
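For the hmac mode, verification could look roughly like the sketch below, which uses Node's built-in crypto module. It assumes the signature is HMAC-SHA-256 over the canonical string, hex-encoded, and that secrets are looked up by keyId; none of these details are confirmed by the envelope spec, and signCanonical and verifyHmac are hypothetical names.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

// Signs a canonical string with HMAC-SHA-256. Hex encoding is an assumption.
function signCanonical(canonical: string, secret: string): string {
  return createHmac('sha256', secret).update(canonical, 'utf8').digest('hex');
}

// Looks up the secret by keyId and compares signatures in constant time.
function verifyHmac(
  canonical: string,
  auth: { kind: 'hmac'; keyId: string; signature: string },
  keys: Map<string, string>,
): boolean {
  const secret = keys.get(auth.keyId);
  if (secret === undefined) return false; // unknown keyId
  const expected = Buffer.from(signCanonical(canonical, secret), 'hex');
  const actual = Buffer.from(auth.signature, 'hex');
  // timingSafeEqual throws on length mismatch, so check lengths first.
  return expected.length === actual.length && timingSafeEqual(expected, actual);
}
```

The constant-time comparison avoids leaking how many leading bytes of a forged signature were correct.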
Authenticated RPC (OIDC / OAuth2) — since 1.2.0
Enterprise deployments usually require every inter-agent RPC to carry a verifiable bearer token rather than relying on trusted-network assumptions. Declaragent ships two providers for this:
- OIDC (@declaragent/plugin-agent-rpc → createOidcAuthProvider): validates a JWT against the signing keys in an OIDC issuer's JWKS. Supports RS{256,384,512}, PS{256,384,512}, ES{256,384,512}. alg: none is always rejected. The JWKS location is discovered via /.well-known/openid-configuration (or taken from a pinned jwksUri), and the key set is cached for 5 minutes.
- OAuth2 Client-Credentials (createOAuth2ClientAuthProvider): mints an access token via grant_type=client_credentials, caches it until expires_in - 30s, and verifies incoming tokens via the same JWKS path.
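The "cache until expires_in - 30s" rule can be sketched with an explicit clock. TokenCacheSketch is hypothetical, and the mint callback is synchronous here only to keep the example short; a real provider would call the token endpoint asynchronously.

```typescript
interface TokenResponse {
  access_token: string;
  expires_in: number; // seconds, as in an OAuth2 token response
}

// Hypothetical cache: reuses a token until 30 seconds before it expires.
class TokenCacheSketch {
  private token: string | undefined;
  private refreshAt = 0; // ms-epoch after which a new token is minted

  get(now: number, mint: () => TokenResponse): string {
    let token = this.token;
    if (token === undefined || now >= this.refreshAt) {
      const fresh = mint();
      token = fresh.access_token;
      this.token = token;
      this.refreshAt = now + (fresh.expires_in - 30) * 1000;
    }
    return token;
  }
}
```

Refreshing 30 seconds early leaves a margin so that a token handed to an outgoing call does not expire while the envelope is in flight.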
Configure a receiver's peer with an auth block in rpc-peers.yaml:
version: 1
peers:
  - agent: agent://peer-b
    transports:
      - kind: kafka
        brokers: ["kafka.internal:9092"]
        topics: { requests: agents.peer-b.requests }
    auth:
      provider: oidc
      issuer: "https://dex.example.com"
      audience: "declaragent-peer-b"
      jwksUri: "https://dex.example.com/keys" # optional; derived from issuer when omitted
      scopes: ["rpc:invoke"]                  # strict AND: every scope must be present
  - agent: agent://peer-c
    transports:
      - kind: kafka
        brokers: ["kafka.internal:9092"]
        topics: { requests: agents.peer-c.requests }
    auth:
      provider: oauth2-client
      tokenEndpoint: "https://idp.example.com/oauth2/token"
      clientId: "decl-agent-a"
      clientSecretRef: "secret://platform/decl-agent-a-client-secret"
      scopes: ["rpc:invoke"]
Rejection semantics. Envelopes that fail verification are routed to the
local DLQ under kind=auth-rejected with a typed reason code:
- missing-auth: envelope has no auth block.
- wrong-kind: auth.kind doesn't match the peer's configured provider.
- malformed-token: JWT parse failed.
- alg-none: token advertised alg: none.
- expired / not-yet-valid: outside exp / iat / nbf with a 60s clock-skew tolerance.
- wrong-issuer / wrong-audience: claim mismatch.
- insufficient-scope: one or more required scopes missing.
- idp-unreachable: JWKS fetch failed AND cache is empty / expired. Fail-closed: a token that can't be verified is never admitted.
- bad-signature: JWKS signature verification failed.
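The claim-level reason codes can be illustrated with a small checker. This is a sketch under assumptions: the order of checks, a single-string aud claim, and a space-delimited scope claim are not confirmed by the provider docs, the iat check is omitted, and checkClaims is a hypothetical name. Signature and parse failures are out of scope here.

```typescript
type ClaimRejection =
  | 'expired'
  | 'not-yet-valid'
  | 'wrong-issuer'
  | 'wrong-audience'
  | 'insufficient-scope';

// exp and nbf are in seconds since the epoch, as in standard JWT claims.
interface Claims { iss: string; aud: string; exp: number; nbf?: number; scope?: string }
interface Expected { issuer: string; audience: string; scopes: string[] }

const CLOCK_SKEW_SECONDS = 60;

// Returns a rejection reason, or null when every claim check passes.
function checkClaims(claims: Claims, expected: Expected, nowSeconds: number): ClaimRejection | null {
  if (nowSeconds > claims.exp + CLOCK_SKEW_SECONDS) return 'expired';
  if (claims.nbf !== undefined && nowSeconds < claims.nbf - CLOCK_SKEW_SECONDS) return 'not-yet-valid';
  if (claims.iss !== expected.issuer) return 'wrong-issuer';
  if (claims.aud !== expected.audience) return 'wrong-audience';
  const granted = (claims.scope ?? '').split(' ');
  // Strict AND: every required scope must be granted.
  const missing = expected.scopes.filter((s) => granted.indexOf(s) === -1);
  return missing.length > 0 ? 'insufficient-scope' : null;
}
```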
Audit. Every verify decision (accept or reject) emits an
auth_check record on the hash-chained audit log. Inspect via
declaragent audit list --kind auth_check; each row carries
peerId, provider, decision, subject (when accepted), and
reason (when rejected).
Secret handling. clientSecretRef is resolved through the
Declaragent secret pipeline (secret://, vault:, aws-sm:, …).
The raw value is never inlined into the config or logged.
Operational notes.
- JWKS is cached for 5 minutes. Graceful stale-fallback during IdP outage is a follow-up; the current policy fails closed if the cache expires while the IdP is unreachable.
- Tokens are bound to the peer, not to envelope content, so HMAC canonicalization (canonicalizeForSigning) continues to exclude the auth field. Rotating a token does not invalidate any in-flight envelopes.
- Unit coverage: packages/plugin-agent-rpc/src/auth/oidc.test.ts and oauth2-client.test.ts. Integration: packages/testkit/src/fleet-integration/rpc-auth.test.ts (gated by RPC_AUTH_INTEGRATION=1, uses dex under docker-compose).
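The fail-closed JWKS policy can be sketched as a cache with a 5-minute TTL and no stale fallback. JwksCacheSketch is hypothetical, and the fetch callback is synchronous only to keep the example self-contained.

```typescript
type Jwks = { keys: Array<{ kid: string }> };
type JwksResult = { ok: true; jwks: Jwks } | { ok: false; reason: 'idp-unreachable' };

const JWKS_TTL_MS = 5 * 60 * 1000;

// Hypothetical cache: serves a fresh entry, refetches after the TTL, and fails closed.
class JwksCacheSketch {
  private cached: { jwks: Jwks; fetchedAt: number } | undefined;

  get(now: number, fetchJwks: () => Jwks): JwksResult {
    const entry = this.cached;
    if (entry !== undefined && now - entry.fetchedAt < JWKS_TTL_MS) {
      return { ok: true, jwks: entry.jwks };
    }
    try {
      const jwks = fetchJwks();
      this.cached = { jwks, fetchedAt: now };
      return { ok: true, jwks };
    } catch {
      // No stale fallback: an expired cache plus a failed fetch rejects the token.
      return { ok: false, reason: 'idp-unreachable' };
    }
  }
}
```

An IdP outage shorter than the remaining TTL goes unnoticed; once the entry expires, every verification is rejected until a fetch succeeds.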
Error codes
See Troubleshooting → error codes for the full list. The RPC-specific codes:
- EAGENTRPC_TIMEOUT: sync-mode deadline elapsed before a response.
- EAGENTRPC_ABANDONED: daemon shutdown or connection loss.
- EAGENTRPC_BUSY: pending-RPC registry at capacity.
- EAGENTRPC_NO_PEER: agent://<id> not in rpc-peers.yaml.
- EAGENTRPC_NO_TRANSPORT: resolved transport kind has no plugin.
Related
- Cookbook → Agent RPC: walkthrough with the rpc-client + rpc-server templates.
- Reference → extensions: @declaragent/plugin-agent-rpc and per-transport plugins.
- docs/AGENT_RPC_PLAN.md in the repo: design doc.