Langfuse Tracing & Robust Ingestion
TL;DR: Observe every agent turn safely without blocking execution or hitting body size limits. Langfuse traces are exported via OpenTelemetry (primary) with a REST fallback (backup). Large payloads (long prompt histories, git diffs, file reads) can add up until batch ingestion on a self-hosted, Next.js-based Langfuse server fails with HTTP 413 "Body exceeded 4.5mb limit". We fix this with an OTel visibility buffer, payload truncation, and chunked batch flushes.
Instrumentation & Fallback Flow
flowchart TD
    subgraph Agent["Pi Coding Agent Run"]
        P[Prompt Start] -->|Turn Start| T[Turn Span]
        T -->|Call LLM| G[Generation Observation]
        T -->|Call Tool| O[Tool Span]
        T -->|Turn End| E[Turn End]
    end
    subgraph Export["Trace Export & Visibility Fallback"]
        E -->|agent_end| FLUSH["forceFlush() OTel Spans"]
        FLUSH -->|Wait up to 8s| POLL{"Trace visible in DB?<br/>(POLL_INTERVAL=500ms)"}
        POLL -->|Yes| DONE[OTel Succeeded]
        POLL -->|No| REST[REST Fallback Ingestion]
    end
    subgraph Fallback["REST Fallback Processing (pi-langfuse)"]
        REST -->|Tier 1| TRUNC["Truncate String values >200KB<br/>(Recursively prune deep objects)"]
        TRUNC -->|Tier 2| CHUNK["Slice batch into chunks of 15 items"]
        CHUNK -->|Tier 3| SEND["POST sequentially to /api/public/ingestion<br/>(Individual try/catches)"]
    end
    classDef main fill:#5865F2,stroke:#fff,stroke-width:2px,color:#fff;
    classDef helper fill:#2F3136,stroke:#7289DA,stroke-width:1px,color:#fff;
    classDef state fill:#43B581,stroke:#fff,stroke-width:1px,color:#fff;
    class E,FLUSH main;
    class POLL,REST,TRUNC,CHUNK,SEND helper;
    class DONE state;
The HTTP 413 Body Size Limit
Self-hosted Langfuse servers use Next.js under the hood, and the ingestion endpoint enforces bodyParser: { sizeLimit: '4.5mb' }.
When a long-running agent session is flushed, the REST fallback bundles all observations (system prompt, conversation history, tool calls, and final answers) into a single batch JSON array. This payload easily grows to 5 MB to 10 MB, so the server rejects the entire batch with statusCode: 413.
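The arithmetic behind that rejection can be checked locally before anything is sent. The snippet below is a minimal sketch, not part of pi-langfuse: `serializedBytes`, `exceedsBodyLimit`, and the sample observations are hypothetical, and it assumes the server reads '4.5mb' as 4.5 × 1024 × 1024 bytes.

```typescript
// Assumption: "4.5mb" is interpreted as 4.5 * 1024 * 1024 bytes by the body parser.
const BODY_LIMIT_BYTES = 4.5 * 1024 * 1024;

// Hypothetical helper: UTF-8 byte length of the JSON body that would be POSTed.
function serializedBytes(payload: unknown): number {
  return new TextEncoder().encode(JSON.stringify(payload)).length;
}

// Hypothetical helper: true when the serialized payload is larger than the limit.
function exceedsBodyLimit(payload: unknown, limitBytes = BODY_LIMIT_BYTES): boolean {
  return serializedBytes(payload) > limitBytes;
}

// 30 made-up observations, each carrying 200,000 characters of text.
const sampleBatch = Array.from({ length: 30 }, (_, i) => ({
  id: `obs-${i}`,
  body: { input: "x".repeat(200_000) },
}));

console.log("full batch too large:", exceedsBodyLimit({ batch: sampleBatch }));
console.log("first 15 items too large:", exceedsBodyLimit({ batch: sampleBatch.slice(0, 15) }));
```

Measuring bytes rather than `string.length` matters here because non-ASCII text (for example in file reads) takes more than one byte per character once encoded as UTF-8.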
/* Next.js body-parser rejection raw body */
{
  "ok": false,
  "error": {
    "reason": "non-json",
    "statusCode": 413,
    "rawBody": "Body exceeded 4.5mb limit"
  }
}

The Three-Tier Reliability Fix
We implement three layers of defensive ingestion inside pi-langfuse's REST fallback:
1. OpenTelemetry Visibility Buffer
Self-hosted Langfuse ingests traces asynchronously (ingestion queue → worker → DB), so spans are rarely queryable within ~1.5s of a flush.
- Fix: We increase OTEL_VISIBILITY_TIMEOUT_MS to 8_000 ms (polling every 500 ms) so the REST fallback only fires on genuine OTel exporter failures, preventing duplicate trace writes.
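The visibility wait described above can be sketched as a bounded polling loop. This is an illustration under assumptions, not the pi-langfuse source: `waitForTraceVisible` and the injected `isTraceVisible` probe are hypothetical names, and how the probe actually queries Langfuse is left to the caller.

```typescript
const OTEL_VISIBILITY_TIMEOUT_MS = 8_000;
const POLL_INTERVAL_MS = 500;

// Hypothetical probe: resolves to true once the trace can be read back from Langfuse.
type VisibilityProbe = () => Promise<boolean>;

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Polls until the probe reports the trace as visible or the timeout would be exceeded.
// Returns true when OTel export succeeded, false when the REST fallback should run.
async function waitForTraceVisible(
  isTraceVisible: VisibilityProbe,
  timeoutMs = OTEL_VISIBILITY_TIMEOUT_MS,
  intervalMs = POLL_INTERVAL_MS,
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;
  while (true) {
    try {
      if (await isTraceVisible()) return true;
    } catch {
      // A failing probe is treated as "not visible yet" so a transient error does not end the wait.
    }
    // Stop when another sleep would overshoot the deadline.
    if (Date.now() + intervalMs > deadline) return false;
    await sleep(intervalMs);
  }
}
```

A caller would run the REST fallback only when this resolves to `false`, which is what keeps a slow but healthy ingestion queue from producing duplicate traces.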
2. Recursive Payload Truncation (safeValue)
No single trace input, output, or metadata object should choke the network. We recursively traverse payloads and truncate individual strings to a safe threshold.
// why: prevents massive string values (like git diffs) from exceeding limits
// note: the limit counts UTF-16 code units (str.length), not bytes
function truncateString(str: string, limit = 200_000): string {
  if (str.length > limit) {
    return str.slice(0, limit) + "\n\n... [Truncated by pi-langfuse due to length limits]";
  }
  return str;
}

// what: recursively prunes objects and limits nesting depth to avoid stack overflows
function safeValue(val: unknown, depth = 0): unknown {
  if (val === null || val === undefined) return val;
  if (typeof val === "string") return truncateString(val);
  if (Array.isArray(val)) {
    if (depth > 5) return "[Array truncated due to nesting depth]";
    return val.map((item) => safeValue(item, depth + 1));
  }
  if (typeof val === "object") {
    if (depth > 5) return "[Object truncated due to nesting depth]";
    const cleaned: Record<string, unknown> = {};
    for (const [key, value] of Object.entries(val as Record<string, unknown>)) {
      cleaned[key] = safeValue(value, depth + 1);
    }
    return cleaned;
  }
  // numbers, booleans, and other primitives pass through unchanged
  return val;
}

3. Chunked REST Ingestion (chunkSize = 15)
Instead of sending the whole execution history in one request, we partition the batch.
// how: slice the payload array and wrap each POST in an independent try/catch block
const chunkSize = 15;
const totalChunks = Math.ceil(batch.length / chunkSize);
for (let i = 0; i < batch.length; i += chunkSize) {
  const chunk = batch.slice(i, i + chunkSize);
  const chunkIndex = Math.floor(i / chunkSize);
  try {
    await rt.scoreClient.api.ingestion.batch({
      batch: chunk,
      metadata: {
        source: "pi-langfuse",
        fallback: "rest-ingestion",
        chunkIndex,
        totalChunks,
      },
    });
  } catch (e) {
    // non-blocking: one chunk failing doesn't abort the remaining chunk flushes
    console.warn(`Langfuse: Failed to ingest fallback batch chunk ${chunkIndex + 1}/${totalChunks}`, e);
  }
}

Best Practices Checklist
- Deferred Flush: Always defer the final Langfuse shutdown/flush using setTimeout(shutdownRuntime, 0) on agent_end. Never let telemetry block the main agent execution turn or increase user latency.
- PII & Credential Masking: Never log environment arrays or raw configs. Function arguments containing secret keys must be stripped before logging inputs.
- Clean Trace Naming: Choose descriptive names (code-generation, tool-execution) over generic IDs to ensure UI filtering is actually usable.
- Session Attribution: Bind sessionId on traces using the Discord thread ID. This groups conversational multi-turn histories in the Langfuse Sessions view.
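
The credential-masking item in the checklist can be illustrated with a small key-based redaction pass applied to tool arguments before they are recorded as trace input. This is a rough sketch under assumptions: `maskSecrets`, `SECRET_KEY_PATTERN`, and the key names it matches are hypothetical and would need tuning for the secrets your tools actually handle. It only inspects object keys, so secrets embedded inside free-form strings are not caught.

```typescript
// Hypothetical pattern: object keys that usually carry credentials.
const SECRET_KEY_PATTERN = /(api[-_]?key|secret|token|password|authorization)/i;
const REDACTED = "[REDACTED]";

// Returns a copy of `val` with the values of secret-looking keys replaced.
// The depth guard mirrors the nesting limit used for payload truncation.
function maskSecrets(val: unknown, depth = 0): unknown {
  if (val === null || typeof val !== "object") return val;
  if (depth > 5) return "[Truncated due to nesting depth]";
  if (Array.isArray(val)) return val.map((item) => maskSecrets(item, depth + 1));
  const cleaned: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(val as Record<string, unknown>)) {
    cleaned[key] = SECRET_KEY_PATTERN.test(key) ? REDACTED : maskSecrets(value, depth + 1);
  }
  return cleaned;
}

// Example: made-up tool arguments as they might be passed to a tool span.
const toolArgs = {
  path: "src/index.ts",
  apiKey: "sk-example",
  headers: { Authorization: "Bearer example" },
};
console.log(JSON.stringify(maskSecrets(toolArgs)));
```

Running the masking step before truncation keeps the order simple: secrets are removed first, then whatever remains is size-limited.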