📊 Langfuse Tracing & Robust Ingestion

TL;DR 🚀 Observe every agent turn safely, without blocking execution or hitting request body size limits. Langfuse traces are exported via OpenTelemetry (primary), with a REST ingestion fallback (backup). Large payloads (long prompt histories, git diffs, file reads) can add up past the 4.5 MB request body limit on a self-hosted Langfuse server's batch ingestion endpoint, which the underlying Next.js server rejects with HTTP 413 "Body exceeded 4.5mb limit". We fix this with an OTel visibility buffer, payload truncation, and chunked batch flushes. 🔌


🧬 Instrumentation & Fallback Flow

flowchart TD
  subgraph Agent["🤖 Pi Coding Agent Run"]
    P[Prompt Start] -->|Turn Start| T[Turn Span]
    T -->|Call LLM| G[Generation Observation]
    T -->|Call Tool| O[Tool Span]
    T -->|Turn End| E[Turn End]
  end

  subgraph Export["📡 Trace Export & Visibility Fallback"]
    E -->|agent_end| FLUSH["forceFlush() OTel Spans"]
    FLUSH -->|Tier 1, wait up to 8s| POLL{"Trace visible in DB?<br/>(POLL_INTERVAL=500ms)"}

    POLL -->|🟢 Yes| DONE[OTel Succeeded]
    POLL -->|🔴 No| REST[REST Fallback Ingestion]
  end

  subgraph Fallback["✂️ REST Fallback Processing (pi-langfuse)"]
    REST -->|Tier 2| TRUNC["Truncate string values >200K chars<br/>(Recursively prune deep objects)"]
    TRUNC -->|Tier 3| CHUNK["Slice batch into chunks of 15 items"]
    CHUNK -->|POST| SEND["POST sequentially to /api/public/ingestion<br/>(Individual try/catches)"]
  end
  end

  classDef main fill:#5865F2,stroke:#fff,stroke-width:2px,color:#fff;
  classDef helper fill:#2F3136,stroke:#7289DA,stroke-width:1px,color:#fff;
  classDef state fill:#43B581,stroke:#fff,stroke-width:1px,color:#fff;
  class E,FLUSH main;
  class POLL,REST,TRUNC,CHUNK,SEND helper;
  class DONE state;

🚨 The HTTP 413 Body Size Limit

Self-hosted Langfuse servers run Next.js under the hood, and the ingestion endpoint enforces a request body limit of bodyParser: { sizeLimit: '4.5mb' }.

When a long-running agent session is flushed, the REST fallback bundles every observation (system prompt, conversation history, tool calls, and final answers) into a single batch JSON array. That payload can easily reach 5 to 10 MB, and the server rejects the entire batch with statusCode: 413.

/* ❌ Rejected batch: the Next.js body parser answers with a non-JSON raw body */
{
  "ok": false,
  "error": {
    "reason": "non-json",
    "statusCode": 413,
    "rawBody": "Body exceeded 4.5mb limit"
  }
}
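One way to catch this before sending is to measure the serialized UTF-8 size of the batch. The sketch below is a hypothetical pre-flight check, not part of pi-langfuse: exceedsIngestionLimit and jsonByteLength are illustrative names, the event objects are simplified stand-ins for real Langfuse ingestion events, and it assumes "4.5mb" is interpreted with 1024-based units.

```typescript
// Hypothetical pre-flight check (not part of pi-langfuse).
// Assumption: "4.5mb" is interpreted with 1024-based units.
const INGESTION_LIMIT_BYTES = 4.5 * 1024 * 1024; // 4_718_592 bytes

const encoder = new TextEncoder();

// UTF-8 size of a value's JSON serialization, i.e. roughly what goes over the wire.
function jsonByteLength(value: unknown): number {
  return encoder.encode(JSON.stringify(value)).length;
}

// Wrapping in { batch } approximates the request body; extra fields such as
// metadata add a little more, so leave some headroom in practice.
function exceedsIngestionLimit(batch: unknown[], limitBytes = INGESTION_LIMIT_BYTES): boolean {
  return jsonByteLength({ batch }) > limitBytes;
}

// Illustrative events, not the full Langfuse ingestion event schema.
const bigBatch = [{ id: "obs-1", output: "x".repeat(5_000_000) }];
console.log(exceedsIngestionLimit(bigBatch)); // true
console.log(exceedsIngestionLimit([{ id: "obs-2", output: "ok" }])); // false
```

Measuring bytes rather than string length matters because non-ASCII text (CJK, emoji) takes more than one UTF-8 byte per character.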

πŸ›‘οΈ The Three-Tier Reliability Fix

We implement three layers of defensive ingestion around pi-langfuse's REST fallback:

1️⃣ OpenTelemetry Visibility Buffer

Self-hosted Langfuse ingests traces asynchronously (ingestion queue ➔ worker ➔ DB), so spans are rarely queryable within ~1.5 s of a flush. A visibility check that gives up that early mistakes slow ingestion for a failed OTel export.

  • Fix: We raise OTEL_VISIBILITY_TIMEOUT_MS to 8_000 (8 s, polling every 500 ms) so the REST fallback only fires on genuine OTel exporter failures, which prevents duplicate trace writes.
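The buffer can be sketched as a bounded poll loop. This is a minimal sketch, not pi-langfuse's implementation: waitForTraceVisible, finalizeTrace, and the injected isTraceVisible lookup are hypothetical names, and flush stands in for something like an OpenTelemetry tracer provider's forceFlush(). Lookup errors are treated as "not visible yet", assuming a trace still in the ingestion queue produces an error (for example a 404) rather than a result.

```typescript
// Hypothetical sketch of the OTel visibility buffer (names are illustrative).
const OTEL_VISIBILITY_TIMEOUT_MS = 8_000;
const POLL_INTERVAL_MS = 500;

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Poll until the trace is queryable or the deadline passes.
async function waitForTraceVisible(
  isTraceVisible: () => Promise<boolean>,
  timeoutMs = OTEL_VISIBILITY_TIMEOUT_MS,
  intervalMs = POLL_INTERVAL_MS,
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;
  while (true) {
    try {
      if (await isTraceVisible()) return true;
    } catch {
      // Treat lookup errors as "not visible yet" while the worker catches up.
    }
    // Stop if the next poll would land after the deadline.
    if (Date.now() + intervalMs > deadline) return false;
    await sleep(intervalMs);
  }
}

// Flush OTel spans, wait for visibility, and only then fall back to REST.
async function finalizeTrace(
  flush: () => Promise<void>, // e.g. () => provider.forceFlush()
  isTraceVisible: () => Promise<boolean>,
  restFallback: () => Promise<void>,
  timeoutMs = OTEL_VISIBILITY_TIMEOUT_MS,
  intervalMs = POLL_INTERVAL_MS,
): Promise<"otel" | "rest"> {
  await flush();
  if (await waitForTraceVisible(isTraceVisible, timeoutMs, intervalMs)) return "otel";
  await restFallback();
  return "rest";
}
```

Injecting the lookup and fallback as functions keeps the timing logic independent of any particular client and lets tests use short timeouts.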

2️⃣ Recursive Payload Truncation (safeValue)

No single trace input, output, or metadata object should be large enough to choke a request. We recursively traverse each payload, truncate individual strings to a safe threshold, and cap nesting depth.

// 🚀 why: prevents massive string values (like git diffs) from exceeding limits
// note: `limit` counts UTF-16 code units (string length), not UTF-8 bytes
function truncateString(str: string, limit = 200_000): string {
  if (str.length > limit) {
    return str.slice(0, limit) + "\n\n... [Truncated by pi-langfuse due to length limits]";
  }
  return str;
}

// 🚀 what: recursively prunes objects and caps nesting depth; the depth cap also
// stops runaway recursion on very deep or circular structures
function safeValue(val: unknown, depth = 0): unknown {
  if (val === null || val === undefined) return val;
  if (typeof val === "string") return truncateString(val);

  if (Array.isArray(val)) {
    if (depth > 5) return "[Array truncated due to nesting depth]";
    return val.map((item) => safeValue(item, depth + 1));
  }
  if (typeof val === "object") {
    if (depth > 5) return "[Object truncated due to nesting depth]";
    const cleaned: Record<string, unknown> = {};
    // Object.entries avoids an `as any` cast and visits only own enumerable keys
    for (const [key, inner] of Object.entries(val)) {
      cleaned[key] = safeValue(inner, depth + 1);
    }
    return cleaned;
  }
  // numbers, booleans and other non-object values pass through unchanged
  return val;
}
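truncateString measures length in UTF-16 code units rather than bytes, so a 200_000-character string of CJK text can still serialize to roughly 600 KB of UTF-8. If a byte budget is what matters, one possible variant relies on TextEncoder.encodeInto, which writes only complete UTF-8 sequences and reports how many code units it consumed. This is an illustrative sketch; utf8Prefix and truncateStringBytes are hypothetical names, not part of pi-langfuse.

```typescript
// Hypothetical byte-budgeted variant of truncateString (not pi-langfuse code).
const utf8 = new TextEncoder();
const NOTICE = "\n\n... [Truncated by pi-langfuse due to length limits]";

// Longest prefix of `str` whose UTF-8 encoding fits in `maxBytes` bytes.
// encodeInto never writes a partial character, so `read` (UTF-16 code units
// consumed) always ends on a code point boundary: surrogate pairs stay whole.
function utf8Prefix(str: string, maxBytes: number): string {
  const { read } = utf8.encodeInto(str, new Uint8Array(maxBytes));
  return str.slice(0, read ?? 0);
}

// The notice is appended after the budget, so a truncated result can exceed
// maxBytes by the notice's own length.
function truncateStringBytes(str: string, maxBytes = 200_000): string {
  // Cheap check: one UTF-16 code unit never needs more than 3 UTF-8 bytes.
  if (str.length * 3 <= maxBytes) return str;
  // Exact check before truncating.
  if (utf8.encode(str).length <= maxBytes) return str;
  return utf8Prefix(str, maxBytes) + NOTICE;
}
```

For example, truncateStringBytes("\u6f22".repeat(100), 100) keeps 33 characters (99 bytes), because a 34th three-byte character would overflow the budget, and then appends the notice.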

3️⃣ Chunked REST Ingestion (chunkSize = 15)

Instead of sending the whole execution history in one request, we partition the batch.

// 🚀 how: slice the payload array and wrap each POST in an independent try/catch block
const chunkSize = 15;
const totalChunks = Math.ceil(batch.length / chunkSize);
for (let i = 0; i < batch.length; i += chunkSize) {
  const chunkIndex = i / chunkSize; // exact: i is always a multiple of chunkSize
  const chunk = batch.slice(i, i + chunkSize);
  try {
    // awaiting inside the loop keeps one request in flight at a time
    await rt.scoreClient.api.ingestion.batch({
      batch: chunk,
      metadata: {
        source: "pi-langfuse",
        fallback: "rest-ingestion",
        chunkIndex,
        totalChunks,
      },
    });
  } catch (e) {
    // 💡 non-blocking: one chunk failing doesn't abort the remaining chunk flushes
    console.warn(`📊 Langfuse: Failed to ingest fallback batch chunk ${chunkIndex + 1}/${totalChunks}`, e);
  }
}
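A fixed count of 15 bounds the number of events per request, not its size: 15 events that each carry a 200_000-character input and output already add up to about 6 MB of ASCII. If that matters, one option is to pack chunks greedily by serialized size while keeping 15 as an item cap. The sketch below is hypothetical (chunkByBytes is not a pi-langfuse or Langfuse SDK function); pick a maxBytes below the 4.5 MB limit to leave headroom for the { batch, metadata } envelope.

```typescript
// Hypothetical: split events into chunks under a byte budget and an item cap.
const enc = new TextEncoder();

function chunkByBytes<T>(items: T[], maxBytes: number, maxItems = 15): T[][] {
  const chunks: T[][] = [];
  let current: T[] = [];
  let currentBytes = 2; // the "[" and "]" of the chunk's JSON array

  for (const item of items) {
    const itemBytes = enc.encode(JSON.stringify(item)).length;
    const separator = current.length > 0 ? 1 : 0; // the "," between elements
    const wouldOverflow = currentBytes + separator + itemBytes > maxBytes;
    if (current.length > 0 && (wouldOverflow || current.length >= maxItems)) {
      chunks.push(current);
      current = [];
      currentBytes = 2;
    }
    // An item larger than maxBytes on its own still gets a single-item chunk:
    // it cannot be split here, so upstream truncation must keep items small.
    currentBytes += (current.length > 0 ? 1 : 0) + itemBytes;
    current.push(item);
  }
  if (current.length > 0) chunks.push(current);
  return chunks;
}
```

Each resulting chunk could then be sent with the same sequential try/catch loop shown above, with chunkIndex and totalChunks taken from the returned array.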

💡 Best Practices Checklist

  • ⏳ Deferred Flush: Always defer the final Langfuse shutdown/flush with setTimeout(shutdownRuntime, 0) on agent_end. Telemetry must never block the agent's main execution turn or add user-facing latency.
  • 🛡️ PII & Credential Masking: Never log environment variables or raw configs, and strip secret keys from function arguments before recording them as trace inputs.
  • 🏷️ Clean Trace Naming: Prefer descriptive names (code-generation, tool-execution) over generic IDs so that filtering in the UI is actually usable.
  • 🔗 Session Attribution: Set sessionId on each trace to the Discord thread ID so multi-turn conversations are grouped in the Langfuse Sessions view.
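The credential-masking rule can be approximated with a key-name heuristic. This is a hypothetical sketch (redactSecrets and its regex are assumptions, not pi-langfuse code); matching on key names misses secrets embedded in free-text values, so it complements rather than replaces keeping secrets out of arguments in the first place. The pattern matches token only as a suffix so that LLM parameters such as max_tokens are not redacted.

```typescript
// Hypothetical redaction helper: replace values whose key looks like a credential
// before the object is attached to a trace as input or metadata.
const SECRET_KEY_PATTERN = /api[_-]?key|secret|passw(?:or)?d|authorization|cookie|token$/i;

function redactSecrets(value: unknown, depth = 0): unknown {
  // Same depth cap idea as safeValue: never recurse without bound.
  if (depth > 5) return "[Redacted: nesting too deep]";
  if (Array.isArray(value)) return value.map((item) => redactSecrets(item, depth + 1));
  if (value !== null && typeof value === "object") {
    const out: Record<string, unknown> = {};
    for (const [key, inner] of Object.entries(value)) {
      // Redact by key name; recurse into everything else.
      out[key] = SECRET_KEY_PATTERN.test(key) ? "[REDACTED]" : redactSecrets(inner, depth + 1);
    }
    return out;
  }
  return value; // strings, numbers, booleans pass through unchanged
}
```

A call site would wrap tool arguments before they become trace input, for example redactSecrets(toolArgs), where toolArgs is whatever object the tool received.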