# Agent Engine — AI Agent Instructions

A programmable observability investigation engine. Instead of fixed endpoints, you write
small pipeline programs from composable operators that execute server-side across metrics
and logs backends.

**Base URL:** `http://<host>:8090` (or `https://app.oliverdb.ai` for the hosted demo)
**Content-Type:** `application/json` on all requests
**Required:** `tenant_id` on all write/execute requests

**This doc is also served live, version-locked to the running binary.** A fresh
agent landing on a new cluster can bootstrap before fetching credentials:

| Endpoint | Auth | Returns |
|---|---|---|
| `GET /help` | public | `text/markdown` — this document, embedded in the binary at compile time via `include_str!()` so it can never drift from the running code |
| `GET /help/operators` | public | `application/json` — the same `parser::operator_catalog()` the executor uses at runtime; single source of truth for operator names + parameter shapes |
| `GET /help/version` | public | `application/json` — `{service, version, operator_count, auth, tenants_url}`. The `auth` block is **live runtime state** — see "Discover live auth state first" below |
| `GET /help/tenants` | public | `application/json` — live tenant list aggregated from logs-writer + metrics-writer, with per-tenant partition / record / sample counts. Cached 60s. Use this to discover which `tenant_id` values have data without guessing |
| `GET /help/ingest` | public | `text/markdown` — integrator-facing ingest reference. How to POST logs/metrics into the cluster, mapping CRUD, conventional field-name detection, OTel + custom shapes. This document covers READING data — `/help/ingest` covers WRITING it. |

### Discover live auth state first

The `/help/version` response carries the **current** auth posture so
agents don't have to assume from the docs (which describe the design,
not the live config). Always check it before composing your first
authenticated call:

```bash
curl -s "$BASE/help/version" | jq '.auth'
# {
#   "disabled": true,                       <-- AUTH_DISABLED is on right now
#   "anonymous_routes": [                   <-- these accept no Authorization header
#     "POST /v1/agent/execute",
#     "POST /v1/agent/execute_batch",
#     "POST /v1/agent/validate_program",
#     "POST /v1/agent/explain_plan"
#   ],
#   "admin_routes_require": "Authorization: Bearer <BOOTSTRAP_ROOT_KEY>",
#   "dashboard_root_requires": "Authorization: Basic or Bearer (validate_strict ignores AUTH_DISABLED)",
#   "public_routes": ["GET /health", "GET /help", ...]
# }
```

If `auth.disabled == true`, you can skip the bearer header on the
four routes listed in `anonymous_routes`. Admin routes still require
a real root-key bearer regardless of the flag — the dev escape hatch
deliberately doesn't unlock `/v1/admin/*`.
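
A minimal sketch of how a client might act on that block, assuming the `auth` object has the shape shown above. The helper name `needs_bearer` and the `/v1/admin/keys` path are hypothetical, and treating every route outside the two lists as bearer-protected is an assumption drawn from this document, not from the server code.

```python
def needs_bearer(auth: dict, method: str, path: str) -> bool:
    """Decide whether a request should carry `Authorization: Bearer ...`."""
    route = f"{method.upper()} {path}"
    if route in auth.get("public_routes", []):
        return False
    # Admin routes need the root key even when auth is disabled.
    if path.startswith("/v1/admin/"):
        return True
    if auth.get("disabled") and route in auth.get("anonymous_routes", []):
        return False
    return True


# Trimmed sample of the `.auth` block shown above.
auth = {
    "disabled": True,
    "anonymous_routes": ["POST /v1/agent/execute", "POST /v1/agent/explain_plan"],
    "public_routes": ["GET /health", "GET /help"],
}
print(needs_bearer(auth, "POST", "/v1/agent/execute"))  # False
print(needs_bearer(auth, "POST", "/v1/admin/keys"))     # True (hypothetical admin path)
print(needs_bearer(auth, "GET", "/help"))               # False
```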

### Discover live tenant list

Don't guess `tenant_id`. The writers know which tenants have data
(they keep a per-tenant partition map for routing) and `/help/tenants`
fans out to them, merges by tenant_id, and returns the union with
per-source stats. Cached 60s; safe to poll cheaply.

```bash
curl -s "$BASE/help/tenants" | jq '.tenants[] | {id, has_metrics: (.metrics != null), has_logs: (.logs != null)}'
# Raw response shape (before the jq projection):
# {
#   "tenants": [
#     {
#       "id": "demo",
#       "metrics": { "partition_count": 10, "sample_count": 19211721 },
#       "logs":    { "partition_count": 4,  "record_count": 8456 }
#     }
#   ],
#   "total_count": 1,
#   "sources": ["logs-writer", "metrics-writer"],
#   "cached_at": "...",
#   "cache_ttl_sec": 60
# }
```

Tenants only show up here if they have **active in-memory state** on
at least one writer. A tenant that has only S3-flushed historical data
(no current ingest) won't appear — use the dashboard's tenant picker
or query `agent.scan` directly if you suspect that case.
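
As a rough illustration, the same selection can be done in Python on the parsed response. The function name `tenants_with` and the second sample tenant are hypothetical; the sketch treats both a missing key and a `null` value as "no live state for that source", mirroring the `!= null` test in the jq filter above.

```python
def tenants_with(response: dict, source: str) -> list:
    """IDs of tenants reporting live state for `source` ("metrics" or "logs")."""
    return [t["id"] for t in response.get("tenants", []) if t.get(source) is not None]


sample = {
    "tenants": [
        {"id": "demo",
         "metrics": {"partition_count": 10, "sample_count": 19211721},
         "logs": {"partition_count": 4, "record_count": 8456}},
        # Hypothetical tenant with metrics only.
        {"id": "metrics-only", "metrics": {"partition_count": 1, "sample_count": 5}},
    ]
}
print(tenants_with(sample, "metrics"))  # ['demo', 'metrics-only']
print(tenants_with(sample, "logs"))     # ['demo']
```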

---

## Auth

### Public routes (no credentials required)

`/health`, `/healthz`, `/readyz`, `/help`, `/help/operators`,
`/help/version`, `/help/tenants`, `/help/ingest`, and `/share/v/:token`
(visualization share links, where the HMAC signature in the URL *is* the
authn). The root path `/` redirects agents (any client whose `Accept`
header does not include `text/html`) to `/help` with a `307`; that is the
discovery entry point.

### Bearer token

Every other endpoint requires `Authorization: Bearer <your-key>`. A
request without a valid bearer returns `401 Unauthorized`. Later examples
in this document omit the header for brevity; **add it to every request**:

```bash
curl -s -X POST "$BASE/v1/agent/execute" \
  -H "Authorization: Bearer $OLIVER_KEY" \
  -H 'Content-Type: application/json' \
  -d '{ ... }'
```
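
The same request can be assembled with Python's standard library. This is a sketch only: the base URL, key value, and helper name `build_execute_request` are placeholders, and the request is built but not sent so the example runs offline.

```python
import json
import urllib.request


def build_execute_request(base: str, key: str, body: dict) -> urllib.request.Request:
    """Build (but do not send) a POST to /v1/agent/execute."""
    return urllib.request.Request(
        url=f"{base}/v1/agent/execute",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


body = {
    "tenant_id": "demo",
    "program": {"steps": [
        {"op": "logs.raw", "params": {"severity": "ERROR", "last": "15m"}},
    ]},
}
req = build_execute_request("http://localhost:8090", "example-key", body)
print(req.get_method(), req.full_url)
# Passing `req` to urllib.request.urlopen would perform the call.
```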

### Routes you can call

| Method | Path | Purpose |
|---|---|---|
| POST | `/v1/agent/execute` | Run an agent program |
| POST | `/v1/agent/execute_batch` | Run multiple programs in parallel |
| POST | `/v1/agent/validate_program` | Syntax-check a program without running |
| POST | `/v1/agent/explain_plan` | Show plan stages without running |

That's the entire surface beyond the public routes. Everything else you
might want (reading any pipeline, writing investigation context, querying
code-index) happens **inside a program** via the Operator Catalog further
down. Any other non-public route returns `403 Forbidden`.

---

## Core API

### POST /v1/agent/execute

Run a pipeline program.

**Request:**

```json
{
  "tenant_id": "tenant-00001",
  "program": {
    "steps": [
      {"op": "logs.raw", "params": {"body_contains": "error timeout", "severity": "ERROR", "last": "15m"}},
      {"op": "where", "params": {"expr": "severity == 'ERROR'"}},
      {"op": "group_by", "params": {"by": "service_name"}},
      {"op": "top_n", "params": {"by": "count", "n": 5}}
    ]
  },
  "timeout_ms": 30000,
  "max_signals": 10000
}
```

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| `tenant_id` | string | yes | — | Tenant identifier |
| `program` | object | no | — | Pipeline program (required if `template` is not set) |
| `template` | string | no | — | Template ID to resolve (alternative to inline program) |
| `params` | object | no | `{}` | Parameters for template resolution (key-value pairs) |
| `timeout_ms` | u64 | no | 30000 | Max wall-clock time in ms |
| `max_signals` | usize | no | 10000 | Hard cap on signal count per stage |

**Template-based execution** — instead of writing an inline program, reference a template:

```json
{
  "tenant_id": "tenant-00001",
  "template": "my-error-scan",
  "params": { "service": "payments-api", "window": "2h" }
}
```

Templates are programs with `{{placeholder}}` parameters resolved at execution time. See [Templates](#templates).

**Response:**

```json
{
  "signals": [ ... ],
  "signal_count": 42,
  "stages": [
    {"op": "logs.raw", "input_count": 0, "output_count": 87, "wall_time_ms": 120},
    {"op": "where", "input_count": 87, "output_count": 42, "wall_time_ms": 1}
  ],
  "wall_time_ms": 125
}
```

If the pipeline hit a limit, `exceeded_limit` will be set to `"timeout"`, `"max_signals"`, or `"error: <message>"`.
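
A small sketch of how a caller might interpret that response, assuming `exceeded_limit` is absent or `null` on a clean run (the document does not say which). The function name `describe_result` is hypothetical.

```python
def describe_result(resp: dict) -> tuple:
    """Return (status, slowest_op) for an execute response."""
    limit = resp.get("exceeded_limit")
    if not limit:
        status = "complete"
    elif limit.startswith("error:"):
        status = "failed: " + limit[len("error:"):].strip()
    else:
        status = "truncated by " + limit  # "timeout" or "max_signals"
    stages = resp.get("stages", [])
    slowest = max(stages, key=lambda s: s["wall_time_ms"])["op"] if stages else None
    return status, slowest


resp = {
    "signal_count": 42,
    "stages": [
        {"op": "logs.raw", "input_count": 0, "output_count": 87, "wall_time_ms": 120},
        {"op": "where", "input_count": 87, "output_count": 42, "wall_time_ms": 1},
    ],
    "wall_time_ms": 125,
}
print(describe_result(resp))  # ('complete', 'logs.raw')
```

A truncated result still carries signals, so it is usually worth inspecting `signals` even when the status is not `complete`.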

---

### POST /v1/agent/execute_batch

Run N programs in parallel and return one result per program in the
same order. Useful when you want to compare several candidate programs
or fan out to multiple tenants in one round-trip. Inline programs only
— template resolution isn't supported in the batch path; resolve
templates client-side and submit the resulting programs here.

**Request:**

```json
{
  "tenant_id": "tenant-00001",
  "programs": [
    {"steps": [{"op": "metrics.timeseries", "params": {"query": "avg:cpu.usage{*}", "last": "5m"}}]},
    {"steps": [{"op": "logs.raw", "params": {"severity": "ERROR", "last": "5m", "limit": 100}}]},
    {"steps": [{"op": "logs.aggregate", "params": {"type": "count_by", "field": "service_name", "last": "5m"}}]}
  ],
  "timeout_ms": 30000,
  "max_signals": 10000
}
```

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| `tenant_id` | string | yes | — | Applied to every program in the batch unless the program overrides |
| `programs` | array | yes | — | Non-empty list of inline programs. Max 20 steps each |
| `timeout_ms` | u64 | no | 30000 | Wall-clock cap applied **per program** (not per batch) |
| `max_signals` | usize | no | 10000 | Signal cap applied **per program** |
| `trigger` | string | no | `"batch"` | Audit label for the activity feed |

**Response** — one entry per input program, in input order:

```json
{
  "entries": [
    {"index": 0, "result": {"signals": [...], "signal_count": 12, "stages": [...], "wall_time_ms": 38}},
    {"index": 1, "result": {"signals": [...], "signal_count":  3, "stages": [...], "wall_time_ms": 21}},
    {"index": 2, "error": "parse error: Unknown operator 'logz.raw' at step 0"}
  ],
  "total_wall_time_ms": 52
}
```

| Field | Description |
|---|---|
| `entries[].index` | Position in the input `programs` array (preserved even though programs run concurrently) |
| `entries[].result` | Same shape as `/v1/agent/execute`'s response. Set when parse + execute completed (may still carry `exceeded_limit` if the program hit timeout/max_signals at runtime) |
| `entries[].error` | Set instead of `result` when the program failed before/during parse (unknown op, bad first step, etc.) |
| `total_wall_time_ms` | Wall-clock time for the whole batch. Programs run concurrently inside the server, so this roughly tracks the slowest program rather than the sum of the per-program times |
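
Because each entry carries either `result` or `error`, a client typically separates the two while keeping the input index. A minimal sketch, with the hypothetical helper name `split_batch`:

```python
def split_batch(response: dict) -> tuple:
    """Return ({index: result}, {index: error}) for an execute_batch response."""
    results, errors = {}, {}
    for entry in response["entries"]:
        if "error" in entry:
            errors[entry["index"]] = entry["error"]
        else:
            results[entry["index"]] = entry["result"]
    return results, errors


response = {
    "entries": [
        {"index": 0, "result": {"signals": [], "signal_count": 12, "wall_time_ms": 38}},
        {"index": 1, "result": {"signals": [], "signal_count": 3, "wall_time_ms": 21}},
        {"index": 2, "error": "parse error: Unknown operator 'logz.raw' at step 0"},
    ],
    "total_wall_time_ms": 52,
}
results, errors = split_batch(response)
print(sorted(results))  # [0, 1]
print(errors[2])
```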

---

### POST /v1/agent/validate_program

Pre-flight a program — parse it, optionally run it once with a small
limit, and report any errors. The endpoint runs through the same
`parse_program()` the production execute path uses, so a passing
validation guarantees `/v1/agent/execute` will accept the same payload.

Useful for LLM agents that compose programs and want to verify the
shape before spending tokens on a real run.

**Request:**

```json
{
  "tenant_id": "tenant-00001",
  "program": {
    "steps": [
      {"op": "logs.raw", "params": {"severity": "ERROR", "last": "15m", "limit": 50}},
      {"op": "group_by", "params": {"by": "service_name"}}
    ]
  },
  "parse_only": false,
  "timeout_ms": 5000,
  "expected_min_signals": 1
}
```

| Field | Type | Required | Default | Description |
|-------|------|----------|---------|-------------|
| `tenant_id` | string | yes | — | Tenant for the dry-run |
| `program` | object | yes | — | Inline program (templates not supported here) |
| `parse_only` | bool | no | `false` | Skip the dry-run; only return parse_ok/parse_errors |
| `timeout_ms` | u64 | no | 5000 | Per-program timeout for the dry-run (capped by server config) |
| `expected_min_signals` | usize | no | 0 | If `signal_count < expected_min_signals`, `exec_ok` is reported `false` |

**Response (parse failure):**

```json
{
  "parse_ok": false,
  "parse_errors": ["Step 1: unknown op 'logz.raw'", "Step 2: 'where' requires 'expr' parameter"]
}
```

**Response (parse + dry-run success):**

```json
{
  "parse_ok": true,
  "exec_ok": true,
  "signal_count": 4,
  "wall_time_ms": 38,
  "stage_summary": [
    "logs.raw: 0->50 (32ms)",
    "group_by: 50->4 (6ms)"
  ]
}
```

`exec_error` populates only when the dry-run failed at runtime (e.g.
upstream service unavailable). `exec_ok=false` with no `exec_error`
means the program parsed but `signal_count` was below
`expected_min_signals`.
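
The cases above can be folded into one verdict. This is a sketch under one assumption not stated in this document: that `exec_ok` is simply absent when `parse_only` is `true`. The verdict labels and the function name are hypothetical.

```python
def validation_verdict(resp: dict) -> str:
    """Map a validate_program response onto a single verdict label."""
    if not resp.get("parse_ok"):
        return "parse_failed"
    if "exec_ok" not in resp:  # assumed shape of a parse_only response
        return "parsed"
    if resp["exec_ok"]:
        return "ok"
    if resp.get("exec_error"):
        return "runtime_error"
    return "too_few_signals"  # parsed and ran, but below expected_min_signals


print(validation_verdict({"parse_ok": False, "parse_errors": ["Step 1: unknown op 'logz.raw'"]}))
print(validation_verdict({"parse_ok": True, "exec_ok": True, "signal_count": 4}))
print(validation_verdict({"parse_ok": True, "exec_ok": False, "signal_count": 0}))
```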

---

### POST /v1/agent/explain_plan

Show what each stage would do without executing it. The request body is the same as for `/v1/agent/execute`.

**Response:**

```json
{
  "stages": [
    {"op": "logs.raw", "estimated_signals": 100, "data_sources": ["logs-query"]},
    {"op": "where", "estimated_signals": 100}
  ],
  "total_data_sources": ["logs-query"],
  "estimated_wall_time_ms": 100
}
```

---

## Templates

Parameterized program templates with `{{placeholder}}` syntax. Reference
a template by ID in `/v1/agent/execute` instead of writing the program
inline. The substitution fields are supplied in the top-level `params`
object (NOT `template_params` — serde silently drops the wrong field
name and runs the template with placeholder defaults, which often
produces zero signals):

```json
{
  "tenant_id": "tenant-00001",
  "template": "my-error-scan",
  "params": {"service": "payments", "window": "2h"}
}
```

There are no built-in templates. Create custom templates out-of-band via the
template CRUD API; their IDs are prefixed `tpl-`. Reference one only after it
exists for your tenant — an unknown ID resolves to nothing.
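
For paths that only accept inline programs, placeholders have to be resolved client-side. The sketch below shows one way to do that. It is not the server's resolver: the placeholder grammar (`{{name}}` with word characters only) and the choice to raise on a missing parameter, rather than fall back to a default, are assumptions.

```python
import re

PLACEHOLDER = re.compile(r"\{\{(\w+)\}\}")


def resolve(node, params: dict):
    """Recursively replace {{name}} in every string of a program."""
    if isinstance(node, str):
        # A missing name raises KeyError instead of silently using a default.
        return PLACEHOLDER.sub(lambda m: str(params[m.group(1)]), node)
    if isinstance(node, list):
        return [resolve(item, params) for item in node]
    if isinstance(node, dict):
        return {key: resolve(value, params) for key, value in node.items()}
    return node


template = {"steps": [
    {"op": "logs.raw",
     "params": {"service": "{{service}}", "severity": "ERROR", "last": "{{window}}"}},
]}
program = resolve(template, {"service": "payments", "window": "2h"})
print(program["steps"][0]["params"])
# {'service': 'payments', 'severity': 'ERROR', 'last': '2h'}
```

Failing loudly on a missing parameter avoids the zero-signal surprise described above.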

### Parameter Types

| Type | Description | Example |
|------|-------------|---------|
| `string` | Any string value | `"payments"` |
| `number` | Numeric value | `"3.0"` |
| `duration` | Time duration | `"1h"`, `"30m"`, `"7d"` |
| `bool` | Boolean | `"true"` |

---

## Pipeline Language

A **program** is a JSON object with a `steps` array. Each step has an `op` (operator name) and `params` (operator-specific parameters).

### Rules

1. Programs must have **1-20 steps**
2. **Step 0 must be a source operator** (fetches data from a backend)
3. Source operators **cannot appear after step 0** (except `recall`)
4. `recall` is allowed at any position (replaces pipeline signals from a stashed snapshot)
5. **Filter at source for rare values, not downstream.** Source operators apply `severity` / `service` / `body_contains` / `body_regex` / `attributes` filters *before* `limit`. A composition like `logs.raw {limit:1000} → where severity=="ERROR"` will frequently return 0 hits because the writer caps at 1000 most-recent records (which are mostly INFO/DEBUG at typical rates) before `where` ever runs. Push the `severity:"ERROR"` parameter onto the source instead.
6. **`where` operates on `signal.fields` (flat map), NOT `signal.payload`.** Aggregation results carry their `count` / `group_key` in `payload`. To filter on those, insert a `values` step first to flatten payload into top-level fields. See [Field promotion](#field-promotion) below.

### Execution Model

Operators execute **sequentially in batch mode**. Each stage receives the previous stage's output signals as input. The pipeline halts early on timeout, max_signals exceeded, or error.

### Duration Format

Many operators accept time windows. Supported suffixes:

| Suffix | Meaning | Examples |
|--------|---------|---------|
| `m` | minutes | `5m`, `15m`, `30m` |
| `h` | hours | `1h`, `6h`, `24h` |
| `d` | days | `1d`, `7d` |
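
A client that needs to compare windows can convert them to seconds. This sketch accepts only the three suffixes in the table; whether the server accepts anything else (seconds, compound values such as `1h30m`) is not documented here, so the strict rejection is an assumption.

```python
import re

UNIT_SECONDS = {"m": 60, "h": 3600, "d": 86400}


def duration_seconds(text: str) -> int:
    """Convert '15m' / '6h' / '7d' to seconds; raise ValueError otherwise."""
    match = re.fullmatch(r"(\d+)([mhd])", text)
    if match is None:
        raise ValueError(f"unsupported duration: {text!r}")
    return int(match.group(1)) * UNIT_SECONDS[match.group(2)]


print(duration_seconds("15m"))  # 900
print(duration_seconds("24h"))  # 86400
print(duration_seconds("7d"))   # 604800
```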

### Signal shape

Every signal has three layers, and `where` only sees the last one (`fields`):

```
signal {
  source, timestamp, tenant_id, name        ← top-level (NOT visible to where)
  payload: { type, body, count, group_key, … }   ← typed payload (NOT visible to where)
  fields: { severity, service_name, … }     ← FLAT MAP — this is what where reads
}
```

#### Field promotion

Each source operator decides which payload keys it copies into `fields`. As a rule:

- `logs.raw` promotes `body, severity, service_name, template_id, trace_id, span_id, host_id, attr.*, tenant_id, source, name`. **Not** `severity_number` (still in payload).
- `metrics.*` promotes `metric_name, last_value, sample_count, mean, std, labels.*, tenant_id`.
- `logs.aggregate` promotes only the *grouping field* (e.g. `service_name` for a `count_by` on service). `count` and `group_key` stay in payload.
- `logs.facets` promotes `field, value` derived from the facet key.

To filter or sort on a payload field, insert `values` (flattens `payload.*` into `fields.*`) — then the field is visible to subsequent `where`/`sort`/`top_n`.

```json
// Filter aggregate results by count: NOT THIS — `where` gets count=undefined → false
[{"op":"logs.aggregate","params":{"type":"count_by","field":"service_name"}},
 {"op":"where","params":{"expr":"count > 5000"}}]

// Filter aggregate results by count: THIS WORKS
[{"op":"logs.aggregate","params":{"type":"count_by","field":"service_name"}},
 {"op":"values","params":{}},
 {"op":"where","params":{"expr":"count > 5000"}}]
```
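
The difference between the two programs can be modelled in a few lines of Python. This is a toy model, not the engine's code: letting an existing `fields` entry win when a payload key collides with it is an assumption, and `count_above` stands in for a `where count > N` step.

```python
def values(signal: dict) -> dict:
    """Toy model of the `values` step: copy payload keys into fields."""
    merged = dict(signal.get("fields", {}))
    for key, value in signal.get("payload", {}).items():
        merged.setdefault(key, value)  # assumption: existing fields win
    return {**signal, "fields": merged}


def count_above(signal: dict, threshold: float) -> bool:
    """Toy model of `where count > threshold`: a missing field is False."""
    value = signal.get("fields", {}).get("count")
    return value is not None and value > threshold


aggregate = {
    "name": "game-engine",
    "payload": {"count": 20179, "group_key": "game-engine"},
    "fields": {"service_name": "game-engine"},
}
print(count_above(aggregate, 5000))          # False: count is payload-only
print(count_above(values(aggregate), 5000))  # True: count was flattened first
```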

### Aggregation signal shapes

`logs.aggregate` emits one signal per group/bucket. The `name` field encodes the bucket key:

| `type` | `signal.name` | Example |
|---|---|---|
| `count` | `"log_count"` | one signal, `payload.count = total` |
| `count_by` | the grouping value | `"game-engine"` with `payload.count = 20179` |
| `count_by_time` | `"log_count:<RFC3339-bucket>"` | `"log_count:2026-05-03T01:47:00Z"` |
| `count_by_time_and_field` | `"<RFC3339-bucket>:<field-value>"` | `"2026-05-03T01:47:00Z:debug"` (colon-separated composite) |

For `count_by_time_and_field`, both pieces are also exposed in `fields.bucket` and `fields.<your-grouping-field>` if you need to filter by either independently — no parsing required.

**Severity case:** `severity` values in `count_by` / `count_by_time_and_field` results are normalized to lowercase (`debug`, `info`, `warn`, `error`) — write `where severity == "debug"`, not `"DEBUG"`. Other categorical fields (`service_name`, `host_id`) preserve their as-ingested case.
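
Since both parts of the composite key are available in `fields`, a client can pivot `count_by_time_and_field` results without splitting `signal.name`. The sketch assumes `count` sits in `payload` for this aggregation type as it does for `count_by`, and the sample signals are invented for illustration.

```python
from collections import defaultdict


def pivot(signals: list, field: str) -> dict:
    """Build {bucket: {field_value: count}} from aggregate signals."""
    table = defaultdict(dict)
    for signal in signals:
        bucket = signal["fields"]["bucket"]
        table[bucket][signal["fields"][field]] = signal["payload"]["count"]
    return dict(table)


signals = [
    {"name": "2026-05-03T01:47:00Z:debug",
     "fields": {"bucket": "2026-05-03T01:47:00Z", "severity": "debug"},
     "payload": {"count": 12}},
    {"name": "2026-05-03T01:47:00Z:error",
     "fields": {"bucket": "2026-05-03T01:47:00Z", "severity": "error"},
     "payload": {"count": 3}},
]
print(pivot(signals, "severity"))
# {'2026-05-03T01:47:00Z': {'debug': 12, 'error': 3}}
```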

---

## Operator Catalog

> **76 operators** (39 sources + 37 transforms/sinks). This catalog is
> autogenerated from `parser::operator_catalog()` in
> `agent-engine/src/parser.rs`, which is the **single source of truth**
> the executor uses at runtime. To regenerate after adding ops:
>
> ```bash
> BASE=http://localhost:8090 OLIVER_KEY=$KEY \
>   ./agent-engine/scripts/gen_operator_catalog.sh
> ```

### Sources (step 0 only)

Sources fetch data from a backend; they accept no input. Step 0 of every program **must** be a source.

#### `fork`

Run N sub-programs in parallel from one program and merge their outputs into a single signal stream. Use `fork` when you have independent queries that don't feed into each other — e.g. one investigation looking at metrics + logs + recent deploys simultaneously, in one tool call, in one LLM turn.

Each branch is itself a program (`{steps: […]}`); the first step of each branch must be its own source. Output signals carry `fields.branch_idx` (and `fields.branch_label` if you named the branch) so downstream `where` / `group_by` / `sort` can distinguish them.

Branches that hit a runtime error don't fail the fork — they emit a single marker signal with `fields.is_error_marker = true` and `fields.branch_error = "..."`. The other branches' results land normally.

**Constraints**
- `fork` is itself a source — must be step 0 of the outer program
- Max **64 branches** per fork
- Output is a mixed-type stream — most downstream ops handle this, but if a transform requires a specific `SignalType`, filter first via `where signal_type == "..."` or `where branch_idx == N`

**Params:** `branches (required — array of {label?, steps[…]})`

**Example — 3 parallel investigation lenses on `checkout-api`:**

```json
{"op": "fork", "params": {
  "branches": [
    {"label": "metrics", "steps": [
      {"op": "metrics.timeseries", "params": {"query": "avg:flat.request.duration{service:checkout-api} by {http.method}", "last": "1h"}}
    ]},
    {"label": "logs", "steps": [
      {"op": "logs.aggregate", "params": {"type": "count_by", "field": "severity", "service": "checkout-api", "last": "1h"}}
    ]},
    {"label": "deploys", "steps": [
      {"op": "agent.scan", "params": {"last": "1h", "agent_id": "deploy-watcher"}}
    ]}
  ]
}}
```

After this `fork`, the next step in the outer program receives all three branches' signals as one mixed-type stream. To process them separately, filter by `branch_label`:

```json
{"op": "where", "params": {"expr": "branch_label == 'logs'"}}
```

For the cross-branch synthesis pattern (look at all results together), use `agent.write_context` or `summarize` as the next step.
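
On the client side, a fork result can be regrouped by branch while pulling out the error markers described above. A minimal sketch with a hypothetical helper name; it keys on `branch_label` when present and falls back to `branch_idx`.

```python
from collections import defaultdict


def split_branches(signals: list) -> tuple:
    """Return ({branch: [signals]}, {branch: error message})."""
    ok, failed = defaultdict(list), {}
    for signal in signals:
        fields = signal["fields"]
        branch = fields.get("branch_label", fields.get("branch_idx"))
        if fields.get("is_error_marker"):
            failed[branch] = fields.get("branch_error", "")
        else:
            ok[branch].append(signal)
    return dict(ok), failed


signals = [
    {"fields": {"branch_idx": 0, "branch_label": "metrics", "metric_name": "flat.request.duration"}},
    {"fields": {"branch_idx": 1, "branch_label": "logs", "severity": "error"}},
    {"fields": {"branch_idx": 2, "branch_label": "deploys",
                "is_error_marker": True, "branch_error": "upstream unavailable"}},
]
ok, failed = split_branches(signals)
print(sorted(ok))  # ['logs', 'metrics']
print(failed)      # {'deploys': 'upstream unavailable'}
```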

#### `metrics.list`
Discover available metric names

**Params:** `prefix?, limit?`

#### `metrics.labels`
Discover distinct values of a given label on a given metric. Backs onto
`/v1/metrics/labels` in metrics-query — walks the memory + SSD tiers (S3
not scanned because label introspection there would mean fetching every
parquet sidecar). Use this before composing `metrics.timeseries` filters
so the values aren't guessed.

Returns one signal per distinct value, with `metric`, `label`, and
`value` promoted to `fields` so downstream `where`/`sort`/`take` apply
without a `values` step.

**Params:** `metric (required), label (required), limit?`

Example — list distinct HTTP methods on a metric:
```json
{"op":"metrics.labels","params":{"metric":"flat.request.duration","label":"http.method"}}
```

#### `metrics.timeseries`
Time-bucketed metric values via DogQL (tenant_id auto-injected, step auto-computed)

**Params:** `query (required), last?, step? (auto-computed from time range), raw? (bool, default false)`

##### Read freshness — default rollups vs `raw`

By default this reads the **per-minute `.agg` rollups** (the pushdown path), which is fast but means the **current, un-rolled minute can lag up to ~1m** — a metric you ingested seconds ago may not appear until its minute closes and rolls.

Set **`raw: true`** to force the raw-sample path instead: it reads the **memory + accumulator + SSD + S3** tiers and unions them, so just-ingested samples in the current minute are visible **immediately**. Trade-off: it scans raw rows instead of precomputed buckets, so it's slower / heavier — use it when freshness matters more than cost (e.g. confirming an ingest just landed), not for wide historical windows.

```json
{"op": "metrics.timeseries", "params": {"query": "avg:checkout.latency_ms{*}", "last": "5m", "raw": true}}
```

##### DogQL cheat sheet

The exact filter syntax this implementation accepts. Bitmap-indexed; selective filters scan only matching rows, not the full series.

| Form | Syntax | Example |
|---|---|---|
| Aggregator (required) | `<agg>:` | `avg:`, `sum:`, `min:`, `max:`, `count:` |
| Equality | `{key:value}` | `{service:checkout-api}` |
| Slash values (URL paths) | `{key:/path/with/slashes}` | `{endpoint:/api/checkout}` |
| Quoted values (spaces, commas, braces) | `{key:"value"}` | `{label:"a, b"}` |
| Negation (equivalent forms) | `{key:!val}`, `{!key:val}`, `{-key:val}`, `{NOT key:val}` | `{NOT method:GET}` |
| Wildcard (prefix only) | `{key:prefix*}` | `{endpoint:/api/*}` |
| OR / IN | `{key:(v1 OR v2)}` — parens, **`OR` keyword (not pipe)** | `{method:(GET OR POST)}` |
| Multi-filter AND | `{k1:v1, k2:v2}` — comma-separated | `{method:GET, kind:http}` |
| `group_by` | `... by {label1, label2}` | `avg:cpu{*} by {host}` |
| Modifiers | `.rollup(agg, secs)`, `.rate()`, `.abs()` | `sum:errors{*}.rate()` |

**Won't work — common mistakes** (parse error or zero results):

- SQL `IN`: `{method IN (GET,POST)}` → use `{method:(GET OR POST)}`
- Brackets: `{method IN [GET,POST]}` → use `(... OR ...)`
- Pipe-OR: `{method:GET|POST}` → parses as the literal value `GET|POST`
- Suffix/contains wildcards: `{method:*ET}`, `{method:*ET*}` → only trailing `*` works
- Negated IN: `{!method:(GET OR POST)}` → no `NotIn` op exists; combine multiple NotEqs (`{!method:GET, !method:POST}`) for the same effect
- SQL keywords `WHERE`, `AS`, `LIMIT` — use program steps instead
- Spaces around the colon: `{method : POST}` → drop the spaces
- Single-quoted values: `{method:'POST'}` → use double quotes

**OR across different labels** (`method=GET OR path=/admin`) isn't a single-query construct — fan out into N parallel `metrics.timeseries` queries client-side and merge.
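
Building the query string programmatically avoids most of the mistakes listed above. The helper below is a sketch covering only equality, same-label OR, multi-filter AND, and `by {...}`. The name `dogql` is hypothetical, values inside an OR group are emitted bare because the cheat sheet does not say whether quoting is accepted there, and escaping of embedded double quotes is not handled.

```python
def _quote(value: str) -> str:
    # Quote values containing spaces, commas, or braces, per the cheat sheet.
    return f'"{value}"' if any(ch in value for ch in " ,{}") else value


def dogql(agg: str, metric: str, filters: dict = None, by: tuple = ()) -> str:
    """Assemble '<agg>:<metric>{filters} by {labels}'."""
    parts = []
    for key, value in (filters or {}).items():
        if isinstance(value, (list, tuple)):
            rendered = "(" + " OR ".join(value) + ")"  # same-label OR
        else:
            rendered = _quote(value)
        parts.append(f"{key}:{rendered}")  # no spaces around the colon
    scope = ", ".join(parts) if parts else "*"
    query = f"{agg}:{metric}{{{scope}}}"
    if by:
        query += " by {" + ", ".join(by) + "}"
    return query


print(dogql("avg", "flat.request.duration",
            {"service": "checkout-api", "method": ["GET", "POST"]},
            by=("http.method",)))
# avg:flat.request.duration{service:checkout-api, method:(GET OR POST)} by {http.method}
```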

#### `logs.raw`
Raw log records with filters. Routes to `logs-query`, scans every
tier (memory → SSD → S3 if enabled), and uses the full set of
sidecar indexes (Tantivy for body FTS, term for severity/service,
sparse for trace_id/span_id, bloom for file-skip). **This is the
canonical FTS path** — pass body text to `body_contains` for
indexed full-text matching across all tiers.

For regex matching, use `body_regex` (case-insensitive by default;
opt into case-sensitivity with `body_regex_case_sensitive: true`).
ANDed with `body_contains` if both are set. Guardrails: pattern
≤ 512 bytes; no leading `.*` / `.+`; at least one of
`service` / `host_id` / `attributes` / `severity` required; time
window ≤ 24h. Returns 400 with a `regex_*` error code on violation.

**Params:** `severity?, service?, body_contains?, body_regex?, body_regex_case_sensitive?, trace_id?, host_id?, attributes?, source?, last?, limit?`
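
The `body_regex` guardrails can be pre-checked locally to avoid a round-trip that ends in a 400. The sketch mirrors the four documented limits. The messages are illustrative, not the server's `regex_*` codes, and the window check is skipped when `last` is absent because the default window is not documented here.

```python
import re

UNIT_SECONDS = {"m": 60, "h": 3600, "d": 86400}


def regex_guardrail_errors(params: dict) -> list:
    """Return guardrail violations for a logs.raw params object."""
    pattern = params.get("body_regex")
    if pattern is None:
        return []
    errors = []
    if len(pattern.encode("utf-8")) > 512:
        errors.append("pattern exceeds 512 bytes")
    if pattern.startswith((".*", ".+")):
        errors.append("pattern starts with .* or .+")
    if not any(params.get(k) for k in ("service", "host_id", "attributes", "severity")):
        errors.append("needs one of service / host_id / attributes / severity")
    window = re.fullmatch(r"(\d+)([mhd])", params.get("last", ""))
    if window and int(window.group(1)) * UNIT_SECONDS[window.group(2)] > 24 * 3600:
        errors.append("time window exceeds 24h")
    return errors


print(regex_guardrail_errors(
    {"body_regex": "time(out|d out)", "service": "checkout-api", "last": "6h"}))  # []
print(regex_guardrail_errors({"body_regex": ".*timeout", "last": "2d"}))
```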

#### `agent.scan`
Read existing .aiagent index files from S3 with optional filtering

**Params:** `last?, bucket?, query?, agent_id?, risk_level?, has_anomalies?, metric?, service?, min_z_score?`

#### `agent.diff`
Compare anomalies between two time windows — emits new, resolved, escalated, improved

**Params:** `window_a (required), window_b (required), bucket?`

#### `agent.timeline`
Emit one signal per time bucket for a metric or service

**Params:** `metric?, service? (at least one required), last?, bucket?`

#### `agent.gc`
Garbage-collect old .aiagent files

**Params:** `max_age?, bucket?`

#### `logs.aggregate`
Distributed log aggregation across all tiers (count, count_by, count_by_time, count_by_time_and_field). Accepts the full logs-query filter set — same shapes as logs.raw, including `body_regex` with identical guardrails.

**Params:** `type? (count|count_by|count_by_time|count_by_time_and_field), field?, interval_secs?, last?, severity? (string|string[]), service? (alias service_name; string|string[]), body_contains?, body_regex?, body_regex_case_sensitive?, trace_id?, host_id? (string|string[]), attributes? ({k:v}), source? (string|string[])`

#### `logs.facets`
Efficient field value distributions from index-only scans

**Params:** `fields? (default: [severity, service_name]), last?, severity?, service?, limit?`

#### `code.search`
Lexical (ripgrep) search across the indexed workspace

**Params:** `query (required), workspace?, language?, limit?, case_insensitive?, fixed_string?, whole_word?, context? (lines, 0-10)`

#### `code.define`
Symbol definition lookup by name — returns CodeSymbol signals

**Params:** `name (required), workspace?, kind?`

#### `code.symbols`
List symbols in the index with filters

**Params:** `workspace?, kind?, name? (prefix), language?, limit?`

#### `code.callers`
Upstream call chain — who calls this symbol (bounded recursive walk). Pass symbol_id when chaining from another code.* operator (no name ambiguity); symbol_name for ad-hoc lookup.

**Params:** `symbol_id OR symbol_name (one required), workspace?, max_hops?`

#### `code.callees`
Downstream call chain — what this symbol calls. Pass symbol_id when chaining (precise); symbol_name for ad-hoc lookup.

**Params:** `symbol_id OR symbol_name (one required), workspace?, max_hops?`

#### `code.refs`
Edge references pointing at a symbol_id

**Params:** `symbol_id (required), edge_type?, limit?`

#### `code.deps`
File-level import graph — upstream or downstream dependencies of a file

**Params:** `file (required), workspace?, direction? (downstream|upstream)`

#### `code.symbol_at_line`
Enclosing symbol at (file, line). Maps a log/stacktrace location back to its function.

**Params:** `path (required), line (required), workspace?`

#### `code.sg`
Structural pattern search (ast-grep-style): patterns are written in the target language with $X / $$$ARGS metavariables. Returns matches with bindings + enclosing symbol.

**Params:** `language (required), pattern (required), workspace?, files? (rel_paths), limit?`

#### `code.cfg`
Per-function control-flow graph. Returns blocks (with kind/lines/statements) and successor edges (with condition labels). One signal per call, with the CFG attached in payload context. symbol_id is precise (chain-friendly); path+(line|symbol_name) is the ergonomic ad-hoc form.

**Params:** `symbol_id OR path+(line|symbol_name) (one required), workspace?, language?`

#### `code.dead`
Dead-code detection — unreachable basic blocks (intra-procedural CFG walk) and unused private/internal functions (call-graph holes). Each finding becomes one signal with kind=unreachable_block|unused_function.

**Params:** `workspace?, kinds? (subset of [unreachable_block, unused_function]), limit?, extra_skip_names? (project-specific patterns to skip)`

#### `code.effects`
Side-effect inference: returns direct + transitive effects (io.network, io.disk, io.process, blocking, nondeterminism, panic, unsafe, eval, crypto, unknown). Each effect carries a 'via' call chain. has_unknown flags soundness gaps from unresolved calls. extra_sinks lets callers add project-specific effect-bearing functions at query time.

**Params:** `symbol_name? OR symbol_id? (one required), workspace?, extra_sinks? ({tag:[name]}), max_depth?`

#### `code.callsites`
Find every call site by qualified name (e.g. 'os.Open'), prefix ('crypto/'), or bare last-identifier ('Open'). Surfaces external-API usage that code.refs can't see (refs requires a symbol_id; stdlib/3rd-party calls don't have one indexed). Filters: language, file_prefix, resolved (resolved=in-workspace, unresolved=external).

**Params:** `dst_qualname OR dst_qualname_prefix OR dst_name (one required), workspace?, language?, file_prefix?, resolved? (resolved|unresolved), edge_type? (default 'calls'), limit?`

---

### Filters

#### `where`
Expression-based filter

**Params:** `expr (required) — e.g. "z_score > 3.0 and event_type == 'Spike'"`

#### `take`
Keep first N signals

**Params:** `n (required)`

#### `skip`
Skip first N signals

**Params:** `n (required)`

#### `dedup`
Deduplicate by field

**Params:** `by (required)`

---

### Ordering

#### `sort`
Sort signals by field

**Params:** `by (required), order? (asc|desc)`

---

### Aggregation

#### `group_by`
Group + aggregate (count, sum, avg, min, max)

**Params:** `by (required), field?`

#### `top_n`
Top/bottom N by numeric field

**Params:** `by (required), n?`

#### `stats`
Compute count, mean, stddev, min, max, p95

**Params:** `field (required)`

---

### Temporal

#### `trend`
Linear regression: slope, direction, pct_change, volatility

**Params:** `min_slope?`

---

### Correlation

#### `correlate`
Temporal co-incidence join within a time window

**Params:** `within_secs?, source?, match?, max_per_signal?`

---

### Enrichment

#### `annotate`
Add boolean field based on threshold comparison

**Params:** `field (required), threshold (required), as (required), op?`

#### `classify`
Assign tier labels based on numeric thresholds

**Params:** `field (required), as?, preset?, tiers?`

#### `set`
Inject static fields into all signals

**Params:** `fields (required) — object of key-value pairs`

---

### Feature extraction

#### `features`
Extract trend_slope, volatility, entropy, autocorrelation, seasonality

**Params:** `min_entropy?`

#### `rates`
Extract rate-change metrics and z-scores

**Params:** `min_z?`

#### `values`
Flatten payload fields into signal fields map

**Params:** `(none)`

#### `similar`
Jaccard similarity between signal names

**Params:** `max?, min_similarity?`

---

### State (stash / recall)

#### `stash`
Save current signals to a named buffer (passthrough)

**Params:** `label (required)`

#### `recall`
Replace pipeline with previously stashed signals

**Params:** `label (required)`

---

### Join

#### `join`
Inner join with a stashed signal set

**Params:** `right (required — stash label), on (required — field names)`

#### `anti_join`
Keep signals with no match in stashed set

**Params:** `right (required), on (required)`

---

### LLM

#### `summarize`
Send signals to LLM for natural-language summary

**Params:** `prompt?, system_prompt?, model?, max_tokens?`

---

### Output / sinks

#### `webhook`
POST signals to an external URL

**Params:** `url (required), headers?, include_payload?`

#### `alert`
Fire structured alert if signal count meets threshold

**Params:** `url (required), title?, severity?, min_signals?, headers?`

---

### Agent index analysis

#### `agent.expand`
Expand AgentIndex signals into per-entry signals for a specific section

**Params:** `section (required — anomalies, causal_edges, correlations, hypotheses, program_history)`

#### `agent.summary`
Collapse all input AgentIndex signals into one compact digest

**Params:** `max_entries?`

#### `agent.topics`
Collect distinct topic strings across all AgentIndex sections, sorted by mention count

**Params:** `min_mentions?`

---

### Agent persistence

#### `agent.write`
Write signals into .aiagent index files on S3

**Params:** `bucket?`

#### `agent.write_context`
Write summary/context/trace to agent index

**Params:** `bucket_start (required), bucket?, summary?, context?, decision_trace?`

#### `agent.compact`
Roll up fine-grained indexes into coarser buckets

**Params:** `target_bucket?, delete_source?`

**Note:** `agent.write` automatically records `program_history` (steps, params, signal counts, timing, trigger) into the `.aiagent` index, so agents can scan it to see which queries past investigations used.

---

## Expression Language

Used by the `where` operator's `expr` parameter.

### Comparison Operators

| Operator | Description | Types |
|----------|-------------|-------|
| `==` | Equal | string, number, bool |
| `!=` | Not equal | string, number, bool |
| `>` | Greater than | number |
| `<` | Less than | number |
| `>=` | Greater or equal | number |
| `<=` | Less or equal | number |
| `contains` | Substring match | string |
| `matches` | Regex match | string |

### Logical Operators

| Operator | Precedence |
|----------|------------|
| `or` | lowest |
| `and` | middle |
| `not` | highest (unary) |
| `(...)` | grouping |

### Value Literals

- Strings: `'Spike'` or `"ERROR"` (single or double quoted)
- Numbers: `3.0`, `-1`, `100`
- Booleans: `true`, `false`

### Field Access

- Simple: `event_type`, `z_score`, `severity`
- Dotted: `labels.endpoint`, `edge.strength`

### Behavior

- **Reads from `signal.fields` only** — payload is invisible. See [Signal shape](#signal-shape) and [Field promotion](#field-promotion). To filter on a payload key (e.g. `count` from an aggregate, or `severity_number` from a log), insert `values` first to flatten payload → fields.
- A missing field makes the comparison evaluate to `false`, so a typo or a payload-only field silently filters everything out.
- A string compared to a number is parsed as `f64`; the comparison is `false` if parsing fails.
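
The rules above can be modeled with a small sketch. It is not the engine's evaluator: `lookup` and `greater_than` are hypothetical helpers, the dotted-path resolution order is an assumption, and Python's `float()` stands in for the `f64` parse (the two accept slightly different inputs).

```python
def lookup(fields: dict, path: str):
    # Resolve a simple or dotted name against signal.fields only.
    # Assumption: try the literal key first, then walk nested maps.
    if path in fields:
        return fields[path]
    node = fields
    for part in path.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node

def greater_than(fields: dict, path: str, threshold: float) -> bool:
    value = lookup(fields, path)
    if value is None:
        return False  # missing field: the comparison is false
    try:
        return float(value) > threshold  # strings are parsed as numbers
    except (TypeError, ValueError):
        return False  # unparseable value: the comparison is false

parsed = greater_than({"z_score": "3.5"}, "z_score", 3.0)
unparseable = greater_than({"z_score": "high"}, "z_score", 3.0)
missing = greater_than({"name": "cpu_user"}, "z_score", 3.0)
```

The `missing` case is the one to watch for: a filter on a field that only exists in the payload behaves exactly like a filter on a misspelled name.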

### Examples

```
z_score > 3.0
event_type == 'Spike' and z_score > 3.0
severity != "DEBUG"
metric_name contains "error"
name matches "cpu_.*"
(event_type == 'Spike' or event_type == 'Drop') and z_score > 2.0
not severity == 'DEBUG'
labels.endpoint contains 'checkout'
has_anomalies == true
```
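
The grouped example above depends on the precedence table: without parentheses, `and` binds tighter than `or`. Python's own `not` / `and` / `or` have the same relative ordering, so plain booleans can stand in for already-evaluated comparisons in a quick check. The variable names are illustrative only.

```python
# Stand-ins for evaluated comparisons such as event_type == 'Spike'.
is_spike, is_drop, high_z = True, False, False

# Parsed as: is_spike or (is_drop and high_z)
ungrouped = is_spike or is_drop and high_z

# Parentheses force `or` to be evaluated first.
grouped = (is_spike or is_drop) and high_z

# Parsed as: (not is_drop) and is_spike
negated = not is_drop and is_spike
```

With these inputs the ungrouped form passes a low-z spike, while the grouped form rejects it.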

---

## Cookbook

### 1. Quick Error Scan

Find which services have the most errors in the last 15 minutes.

```json
{
  "tenant_id": "tenant-00001",
  "program": {
    "steps": [
      {"op": "logs.raw", "params": {"body_contains": "error", "severity": "ERROR", "last": "15m", "limit": 500}},
      {"op": "group_by", "params": {"by": "service_name"}},
      {"op": "top_n", "params": {"by": "count", "n": 10}}
    ]
  }
}
```
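
A minimal sketch of wrapping these steps in a request to `/execute`, using only the Python standard library. The helper name `build_execute_request`, the `localhost` host, and the POST method are assumptions; authentication headers are omitted and depend on the live auth state reported by `/help/version`.

```python
import json
import urllib.request

def build_execute_request(base_url: str, tenant_id: str, steps: list[dict]) -> urllib.request.Request:
    # Wrap the steps in the {"tenant_id", "program": {"steps": [...]}} envelope.
    body = {"tenant_id": tenant_id, "program": {"steps": steps}}
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/execute",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",  # assumed; adjust if the server expects otherwise
    )

steps = [
    {"op": "logs.raw", "params": {"body_contains": "error", "severity": "ERROR", "last": "15m", "limit": 500}},
    {"op": "group_by", "params": {"by": "service_name"}},
    {"op": "top_n", "params": {"by": "count", "n": 10}},
]
request = build_execute_request("http://localhost:8090", "tenant-00001", steps)
# To send it: urllib.request.urlopen(request, timeout=30)
```

Building the request separately from sending it keeps the payload easy to inspect or log before any network call is made.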

### 2. Capacity Planning

Identify metrics with rising trends.

```json
{
  "tenant_id": "tenant-00001",
  "program": {
    "steps": [
      {"op": "metrics.timeseries", "params": {"query": "avg:system.cpu.user{*}", "last": "24h"}},
      {"op": "trend", "params": {}},
      {"op": "where", "params": {"expr": "trend_direction == 'rising'"}},
      {"op": "features", "params": {}},
      {"op": "sort", "params": {"by": "trend_slope", "order": "desc"}}
    ]
  }
}
```

### 3. Investigation with Persistence

Query a metric, then persist a finding via `agent.write_context` as the last
step (writes to the S3 agent index — no separate HTTP call needed).

```json
{
  "tenant_id": "tenant-00001",
  "program": {
    "steps": [
      {"op": "metrics.timeseries", "params": {"query": "avg:checkout.response.p99_ms{} by {region}", "last": "1h"}},
      {"op": "where", "params": {"expr": "last_value > 500"}},
      {"op": "agent.write_context", "params": {
        "bucket_start": 1740355200,
        "summary": "Checkout p99 elevated in ewr",
        "context": {
          "root_cause": "Missing index on payments.transactions",
          "affected_services": "payments-api, web-gateway"
        },
        "decision_trace": [
          "checkout.response.p99_ms > 500ms in ewr",
          "Correlated with payments-api ERROR logs",
          "Root cause: missing index on payments.transactions"
        ]
      }}
    ]
  }
}
```
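
Before sending a write like this, a client can check the payload against the values listed under Limits & Defaults. The sketch below is a hypothetical client-side pre-check, not server behavior: it assumes the KB limits are measured in UTF-8 bytes and that context values are strings.

```python
def size(text: str) -> int:
    # Assumption: KB limits are counted in UTF-8 bytes.
    return len(text.encode("utf-8"))

def check_limits(payload: dict) -> list[str]:
    """Return limit violations; an empty list means none were found."""
    problems = []
    if len(payload.get("tenant_id", "")) > 128:
        problems.append("tenant_id exceeds 128 chars")
    steps = payload.get("program", {}).get("steps", [])
    if len(steps) > 20:
        problems.append("more than 20 steps")
    for step in steps:
        if step.get("op") != "agent.write_context":
            continue
        params = step.get("params", {})
        if size(params.get("summary", "")) > 8 * 1024:
            problems.append("summary exceeds 8 KB")
        context = params.get("context", {})
        if len(context) > 50:
            problems.append("context has more than 50 entries")
        for k, v in context.items():
            if len(k) > 128:
                problems.append(f"context key too long: {k[:16]}")
            if size(str(v)) > 4 * 1024:
                problems.append(f"context value too long: {k[:16]}")
        trace = params.get("decision_trace", [])
        if len(trace) > 100:
            problems.append("decision_trace has more than 100 entries")
        if any(size(str(entry)) > 2 * 1024 for entry in trace):
            problems.append("decision_trace entry exceeds 2 KB")
    return problems

ok_payload = {
    "tenant_id": "tenant-00001",
    "program": {"steps": [{"op": "agent.write_context", "params": {
        "bucket_start": 1740355200,
        "summary": "Checkout p99 elevated in ewr",
        "context": {"root_cause": "Missing index on payments.transactions"},
        "decision_trace": ["checkout.response.p99_ms > 500ms in ewr"],
    }}]},
}
violations = check_limits(ok_payload)
```

The server remains the authority on these limits; a pre-check like this only saves a round trip when a payload is clearly oversized.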

### 4. Incident Report from Previous Investigations

Read stored investigations (the agent index) and send a summary alert.

```json
{
  "tenant_id": "tenant-00001",
  "program": {
    "steps": [
      {"op": "agent.scan", "params": {"last": "24h", "bucket": "5m"}},
      {"op": "where", "params": {"expr": "has_anomalies == true"}},
      {"op": "sort", "params": {"by": "max_z_score", "order": "desc"}},
      {"op": "summarize", "params": {"prompt": "Create an incident report from these investigations"}},
      {"op": "webhook", "params": {"url": "https://hooks.slack.com/..."}}
    ]
  }
}
```

### 5. Diff: What's New Since Last Check

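Snapshot a metric window with `stash`, then filter and rank it. Here `recall` restores the same signals that `stash` just saved, so the pair acts as a checkpoint; to keep only signals absent from an earlier snapshot, `anti_join` against that stash label instead.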

```json
{
  "tenant_id": "tenant-00001",
  "program": {
    "steps": [
      {"op": "metrics.timeseries", "params": {"query": "avg:llm.api.timeout.count{} by {model_family}", "last": "1h"}},
      {"op": "stash", "params": {"label": "current"}},
      {"op": "recall", "params": {"label": "current"}},
      {"op": "where", "params": {"expr": "last_value > 0"}},
      {"op": "dedup", "params": {"by": "metric_name"}},
      {"op": "sort", "params": {"by": "last_value", "order": "desc"}},
      {"op": "take", "params": {"n": 20}}
    ]
  }
}
```

---

## Error Handling

All error responses include structured feedback:

```json
{
  "error": "Invalid bucket: '10s'",
  "hint": "bucket must be one of: 5m, 1h, 1d",
  "schema": { "...field descriptions..." },
  "example": { "...valid payload you can copy..." }
}
```

Parse errors on `/execute` and `/explain_plan` also include an `operators` field with the full catalog of valid operators and their params.

**Strategy:** If you get an error, read the `hint` field and adjust your request accordingly. The `example` field contains a valid payload you can use as a starting point.
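
One way to apply this strategy in client code is sketched below. The helper name `plan_retry` and the sample `example` payload are hypothetical; only the `error`, `hint`, and `example` field names come from the response shape shown above.

```python
def plan_retry(error_body: dict, original: dict) -> tuple[str, dict]:
    # Pull out the hint, then pick a starting payload for the next attempt.
    hint = error_body.get("hint", "")
    example = error_body.get("example")
    if isinstance(example, dict) and example:
        # Start from the server's valid example, but keep our own tenant_id.
        candidate = dict(example)
        if "tenant_id" in original:
            candidate["tenant_id"] = original["tenant_id"]
    else:
        # No usable example: fall back to the original payload for manual edits.
        candidate = dict(original)
    return hint, candidate

# Hypothetical error body; the real `example` contents come from the server.
error_body = {
    "error": "Invalid bucket: '10s'",
    "hint": "bucket must be one of: 5m, 1h, 1d",
    "example": {"tenant_id": "demo", "program": {"steps": []}},
}
original = {"tenant_id": "tenant-00001", "program": {"steps": [{"op": "agent.scan", "params": {"bucket": "10s"}}]}}
hint, candidate = plan_retry(error_body, original)
```

The candidate is a starting point to adapt using the hint, not a payload to resend unchanged.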

---

## Limits & Defaults

| Setting | Default | Max | Description |
|---------|---------|-----|-------------|
| `timeout_ms` | 30000 | server-configured | Pipeline timeout |
| `max_signals` | 10000 | server-configured | Signal cap per stage |
| `steps` | — | 20 | Max pipeline steps |
| `tenant_id` | — | 128 chars | Tenant identifier |
| `summary` | — | 8 KB | Context write summary |
| `context` map | — | 50 entries | Context key-value pairs |
| `context` keys | — | 128 chars | Context key length |
| `context` values | — | 4 KB each | Context value length |
| `decision_trace` | — | 100 entries | Decision trace steps |
| `decision_trace` entries | — | 2 KB each | Per-step length |
