URL: https://dev.to/newbe36524/pi-agent-integration-message-parsing-retry-and-cancellation-nl0


Pi Agent Integration: Message Parsing, Retry, and Cancellation

When integrating a CLI-based AI agent, three questions are unavoidable: how to translate its private event stream into stable messages, who is responsible for retrying after a failure, and how to stop the process cleanly when the user clicks cancel. All three come down to "clarifying responsibilities." That sounds simple in theory, but you only realize how deep the water is once you actually build it.

Background

Recently, I've been working on an AI coding assistant project, and one of the agents to integrate is pi. It's a TUI/CLI coding agent that outputs JSON events line by line to stdout when running. Sounds simple—spawn the process, read output, parse it—but when you actually start, you realize "integrating an agent CLI" is completely different from "integrating a regular CLI."

With a regular CLI, you read stdout, get an exit code, and that's it. But agent CLIs have three particularly headache-inducing characteristics:

First, its event stream is a private protocol. turn_start, session, message_update, message_end, turn_end, agent_end—these are defined by pi itself, not any industry standard. Every upper layer that wants to consume it has to handle it separately, effectively leaking pi's internal details everywhere. It's like looking at someone from a distance—you think you see them clearly, but you're only seeing the side they want to show you.

Second, its failure semantics are ambiguous. The agent might hit network jitter, model rate limiting, or a process crash mid-run. Should it retry? At which layer? Will a retry corrupt session state that has already been half-emitted? This is an architectural decision, not something you solve by casually writing a for loop.

Third, it's long-running and interruptible. A single turn might run for tens of seconds or even minutes, and users might want to cancel at any time. When canceled, the process can't become an orphan, tool calls can't be left half-finished, and already output content can't be lost. The water here is much deeper than imagined.

To solve these pain points, we spent some time straightening out the integration path. I'll get into specifics later, but here's a spoiler: the real difficulty isn't in "spawning the process," but in "clarifying responsibilities."

About HagiCode

The solution shared in this article comes from the HagiCode project—an AI coding assistant that supports multiple models and multiple agent CLI backends. GitHub repository: HagiCode-org/site, feel free to star it. All the code and all the pitfalls mentioned below are actually running in this project. Writing this out is just leaving myself a memory.

Overall Layering

HagiCode splits AI capability integration into two layers:

  • The bottom layer is Hagicode.Libs, providing reusable provider primitives ICliProvider<TOptions>, specifically responsible for "spawning a CLI agent and normalizing its output into a shared message stream."
  • The upper layer is hagicode-core, providing project-level thin adapters IAIProvider, responsible for "translating business requests into provider parameters, consuming shared message streams, and exposing unified streaming chunks externally."

Pi's integration follows this path. The bottom layer PiProvider spawns the pi process, reads the JSON event stream, and normalizes it into shared messages; the upper layer PiCliProvider translates AIRequest into PiOptions, consumes CliMessage, and outputs AIStreamingChunk.

These three concerns (message parsing, retry, cancellation) land in three different places: PiJsonEventMapper, a seemingly odd archived proposal, and CliProcessManager. Let's take them one at a time.

Message Parsing: Converting Pi's Private Events to Shared Messages

pi outputs JSON events line by line when run with --mode json --print. This event set is private to pi and must not leak directly to upper layers; otherwise every consumer couples to pi's internals, and the whole project has to change whenever a pi upgrade changes its event structure. This kind of leakage is like wearing your thoughts on your face: tiring for others to look at, and not necessarily comfortable for you either.

We use PiJsonEventMapper as a translation layer to normalize pi's events into shared CliMessage. CliMessage is defined in HagiCode.Libs.Core/Transport/CliMessage.cs with a very simple structure—just a (Type, Content) record. The mapping relationship is roughly as follows:

pi event | shared message | purpose
session | session.started / session.resumed | session lifecycle
message_update (text type) | assistant | streaming body text (incremental)
message_update (thinking type) | assistant.thought | chain of thought
message_update (tool type) | tool.call / tool.update | tool call initiation
message_end / turn_end (toolResult) | tool.completed / tool.failed | tool result
turn_end / agent_end | terminal.completed | end of the current turn
non-zero exit / parse failure | terminal.failed | terminal failure

This table is just a quick reference, but it contains two key techniques that were figured out after stumbling into pitfalls—worth expanding on.
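Before getting into those two techniques, the basic dispatch in the table can be sketched as a single switch. This is only an illustration under stated assumptions: the article does not show pi's JSON field names, so contentKind and isResumed are hypothetical inputs, and the toolResult rows are left out for brevity.

```csharp
#nullable enable
using System;

public static class PiEventMappingSketch
{
    // Maps a pi event name to a shared message type, following the table above.
    // contentKind ("text" / "thinking" / "tool") and isResumed are hypothetical
    // parameters; the real PiJsonEventMapper reads them from pi's JSON payload.
    public static string? MapType(string piEvent, string? contentKind = null, bool isResumed = false)
        => (piEvent, contentKind) switch
        {
            ("session", _) => isResumed ? "session.resumed" : "session.started",
            ("message_update", "text") => "assistant",
            ("message_update", "thinking") => "assistant.thought",
            ("message_update", "tool") => "tool.call",
            ("turn_end", _) or ("agent_end", _) => "terminal.completed",
            _ => null, // unknown events are dropped here instead of leaking upward
        };
}
```

The point of the design is that consumers only ever see the strings on the right-hand side; if pi renames an event, only this one switch changes.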

Technique One: Converting Cumulative Snapshot to Delta

This is the easiest place to trip up. pi's message_update event doesn't carry an increment; it carries the cumulative full text. Every time a token arrives, pi resends the complete text so far.

If you forward each payload straight to the frontend, users see repeated content: the first payload is "hel", the second "hello", the third "hello,", the fourth "hello, wor", and the frontend treats them as four independent outputs. Repetition is a novelty the first time and tedious by the tenth.

The solution is prefix comparison to calculate the true delta:

// Key: pi sends a cumulative snapshot, not an increment.
// Use prefix comparison to extract the delta; otherwise the frontend sees repeated content.
if (text.StartsWith(_lastAssistantTextSnapshot, StringComparison.Ordinal))
{
    var delta = text[_lastAssistantTextSnapshot.Length..];
    _lastAssistantTextSnapshot = text;
    return delta.Length == 0 ? null : delta;
}

There's another hidden pitfall here: cross-turn prefix replay. After a tool call ends and the assistant continues speaking, pi replays the earlier text from the beginning. If you only keep a single global snapshot and compare naively, the replayed content is treated as new increments, and users see repetition after tool calls. PiProviderTests has a dedicated test case, ExecuteAsync_deduplicates_replayed_assistant_prefix_after_tool_turns, covering this scenario. In other words, the snapshots before and after a tool call have to be reconciled against each other rather than handled independently.
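One way to handle both the growing snapshot and the replayed prefix is sketched below. It is not the HagiCode implementation: the class name and the rule "a snapshot that is a prefix of what was already emitted is a replay in progress" are assumptions made for illustration.

```csharp
#nullable enable
using System;

public sealed class AssistantSnapshotReconciler
{
    // Longest assistant text already forwarded downstream.
    private string _lastSnapshot = string.Empty;

    // Returns the new text to forward, or null when there is nothing new.
    public string? Reconcile(string text)
    {
        // Case 1: the snapshot extends what we already emitted; forward the suffix.
        if (text.StartsWith(_lastSnapshot, StringComparison.Ordinal))
        {
            var delta = text[_lastSnapshot.Length..];
            _lastSnapshot = text;
            return delta.Length == 0 ? null : delta;
        }

        // Case 2 (assumed replay rule): the snapshot is a prefix of text we already
        // emitted, so pi is replaying earlier content. Emit nothing and keep the
        // longer snapshot so the replay can catch up.
        if (_lastSnapshot.StartsWith(text, StringComparison.Ordinal))
        {
            return null;
        }

        // Case 3: unrelated text, treated as a fresh message.
        _lastSnapshot = text;
        return text;
    }
}
```

With this sketch, feeding "hel", "hello", then a replayed "he", then "hello, world" yields "hel", "lo", nothing, and ", world", so the frontend never sees the replayed prefix twice.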

Technique Two: Buffer Thinking Until Turn End

Thinking output (the chain of thought) can't be forwarded as each token arrives. pi emits a burst of thinking fragments in the middle of tool calls. If they are forwarded in real time, the stream order becomes a mess: one moment it's assistant body text, the next it's thinking fragments, then a tool.call. Does that order mean anything to the consumer? Not really; it only adds confusion.

Our approach: when thinking events arrive, first stash them via BufferThinkingSnapshot, and wait until a message_end or turn_end with stopReason != "toolUse" before flushing them all at once with DrainBufferedThinkingMessages. This way, thinking fragments in the middle of tool calls don't pollute the main stream, and the complete thought process is delivered in one piece when the turn ends.
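The buffering rule can be sketched roughly as follows. The class and method names are hypothetical stand-ins for BufferThinkingSnapshot and DrainBufferedThinkingMessages, and the sketch assumes thinking payloads are cumulative snapshots like assistant text, which the article does not state explicitly.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

public sealed class ThinkingBuffer
{
    private readonly List<string> _completedBlocks = new();
    private string _currentBlock = string.Empty;

    // Store a thinking snapshot without emitting anything.
    public void Buffer(string snapshot)
    {
        if (snapshot.StartsWith(_currentBlock, StringComparison.Ordinal))
        {
            _currentBlock = snapshot; // same block, still growing
            return;
        }

        // A snapshot that does not extend the current block starts a new one.
        _completedBlocks.Add(_currentBlock);
        _currentBlock = snapshot;
    }

    // Called on message_end / turn_end. Nothing is released while the turn is
    // only pausing for a tool call (stopReason == "toolUse").
    public IReadOnlyList<string> DrainIfTurnFinished(string? stopReason)
    {
        if (stopReason == "toolUse") return Array.Empty<string>();

        if (_currentBlock.Length > 0) _completedBlocks.Add(_currentBlock);
        var drained = _completedBlocks.ToArray();
        _completedBlocks.Clear();
        _currentBlock = string.Empty;
        return drained;
    }
}
```

A caller would invoke Buffer for every thinking event and DrainIfTurnFinished at each message_end or turn_end, emitting one assistant.thought message per drained block.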

Fault Tolerance: Bad Lines Can't Crash the Stream

An agent CLI isn't the ideal system from the textbooks. It occasionally emits a non-JSON line, or a JSON object without a type field. If you throw an exception at that point, the entire stream dies and users see nothing. The real world isn't always tidy; who can guarantee every line is well-behaved?

Our strategy: a line that fails to parse never interrupts the stream; it is collected into _invalidOutputLines instead. After the process ends, Complete() appends these "bad lines" to the diagnostic text of terminal.failed. When users see an error, they can see exactly what garbage pi actually output, not a bare "parse error."
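A minimal sketch of this "collect, don't throw" strategy using System.Text.Json is shown below. TolerantLineParser and BuildDiagnostics are hypothetical names standing in for the mapper and its Complete() step; only the idea of an _invalidOutputLines list comes from the article.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Text.Json;

public sealed class TolerantLineParser
{
    private readonly List<string> _invalidOutputLines = new();

    public IReadOnlyList<string> InvalidOutputLines => _invalidOutputLines;

    // Returns true and the event type for a well-formed line; otherwise records
    // the line and returns false. It never throws on bad input.
    public bool TryParseEvent(string line, out string eventType)
    {
        eventType = string.Empty;
        try
        {
            using var doc = JsonDocument.Parse(line);
            var root = doc.RootElement;
            if (root.ValueKind == JsonValueKind.Object
                && root.TryGetProperty("type", out var type)
                && type.ValueKind == JsonValueKind.String)
            {
                eventType = type.GetString() ?? string.Empty;
                return true;
            }
        }
        catch (JsonException)
        {
            // Not JSON at all; fall through and record the line.
        }

        _invalidOutputLines.Add(line);
        return false;
    }

    // Hypothetical counterpart of Complete(): attach the bad lines to the
    // failure text so users see what was actually printed.
    public string BuildDiagnostics(string reason)
    {
        if (_invalidOutputLines.Count == 0) return reason;
        return reason + "\ninvalid output lines:\n" + string.Join("\n", _invalidOutputLines);
    }
}
```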

Retry: If Provider Layer Doesn't Do It, Who Does?

This is the easiest trap to fall into in the entire integration. Intuition says "integrating a CLI should include retry," but an archived HagiCode proposal deliberately removed all automatic retry from the provider layer. The proposal is called remove-provider-auto-retry-support.

Why No Automatic Retry

The proposal's background section is blunt. Retry logic used to be scattered across two places: one copy in Hagicode.Libs (OpenCode-style fresh-runtime replay) and another in hagicode-core (ProviderErrorAutoRetryCoordinator). Each did its own thing, which turned "whether to retry" into implicit behavior hidden inside the provider, silently changing failure timing, how sessions were continued, and how chat state flowed.

Just thinking about it is a headache: the user sends one message, the provider retries three times internally, the first two attempts fail, and the third succeeds. The upper layer has no idea what happened in between, so session state, token counts, and UI progress no longer line up. This kind of implicit behavior is slow poison for an architecture.

So the boundary was condensed into one sentence:

The provider converges on single-attempt semantics; the caller must treat the non-retried outcome as a normal single-execution result.

What Does This Mean for PiProvider

In code, it comes down to three things:

  • PiOptions has no retry-related fields: no maxAttempts, no retryDelay, no retryClassifier.
  • ExecuteAsync ends once a single pi process run completes; a failure surfaces directly as terminal.failed.
  • Classifiers that existed purely to serve automatic retry (ClaudeCodeRetryableTerminalFailureClassifier, CodexRetryableTerminalFailureClassifier, etc.) are all removed from the active path.

Note, though, that the retry capability hasn't disappeared; it has just moved up. The proposal explicitly describes "leaving a stable boundary for higher layers to uniformly take over retry later." The providerErrorAutoRetry configuration item's DTO, normalization, serialization, and round-trip through the frontend settings page are all preserved; the setting simply no longer drives provider execution. Some things aren't really discarded, just kept in a different way.

What If You Want to Retry

If you want retry on top of pi, the right place is the caller of PiCliProvider, for example your session orchestration layer (in HagiCode that is the Orleans SessionGrain; on the frontend it might be the chat orchestration layer). After receiving terminal.failed, decide for yourself whether the failure is retryable, pick the delay and attempt count, then call ExecuteAsync again.

A minimal viable pattern looks like this:

// Retry logic lives at the caller; don't push it back into PiProvider,
// or you break the "single-attempt" boundary the provider just established.
async Task<AIResponse> ExecuteWithRetryAsync(AIRequest req, int maxAttempts, CancellationToken ct)
{
    for (var attempt = 1; ; attempt++)
    {
        var response = await provider.ExecuteAsync(req, ct);

        // Return on success or once the attempt limit is reached.
        if (response.FinishReason != FinishReason.Unknown || attempt >= maxAttempts)
            return response;

        // Only retry retryable terminal failures (network, 5xx, process crash).
        // Retrying a model rejection or an auth failure is pointless, so don't.
        // IsRetryable is a hypothetical caller-defined classifier, not a HagiCode API.
        if (!IsRetryable(response))
            return response;

        // Exponential backoff: 2s, 4s, 8s, ...
        await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt)), ct);
    }
}

The classification logic that decides what counts as "retryable" no longer lives in the provider; the caller defines it. The providerErrorAutoRetry configuration (maxAttempts, retryDelay, enabled) can still be read from the frontend settings page, but what actually drives retry is your orchestration layer, not PiProvider. This point is important enough to say three times.
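Since the caller now owns the classification, a caller-side policy might look like the sketch below. Everything in it is hypothetical: the FailureKind enum, the decision about which kinds are retryable, and the capped backoff formula are illustrative choices, not part of HagiCode.

```csharp
using System;

// Hypothetical failure categories the orchestration layer derives from terminal.failed.
public enum FailureKind { Network, ServerError, ProcessCrash, AuthFailure, ModelRejected }

public static class CallerRetryPolicy
{
    // Transient failures are worth another attempt; auth and model rejections are not.
    public static bool IsRetryable(FailureKind kind) => kind switch
    {
        FailureKind.Network or FailureKind.ServerError or FailureKind.ProcessCrash => true,
        _ => false,
    };

    // Exponential backoff: baseDelay * 2^(attempt - 1), capped at maxDelay.
    public static TimeSpan BackoffDelay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));

        var exponent = Math.Min(attempt - 1, 30); // keep the multiplication well in range
        var ticks = baseDelay.Ticks * Math.Pow(2, exponent);
        return ticks >= maxDelay.Ticks ? maxDelay : TimeSpan.FromTicks((long)ticks);
    }
}
```

The values read from providerErrorAutoRetry (maxAttempts, retryDelay, enabled) would feed a policy like this at the orchestration layer, while the provider itself stays single-attempt.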

Cancellation: Token Pass-through + Three-stage Shutdown

For cancellation, PiProvider implements almost nothing itself and delegates fully to CliProcessManager. PiProvider handles only two things: passing the CancellationToken down, and cleaning up on exceptions.

Full-chain Pass-through

The chain looks like this, passing all the way down:

caller's CancellationToken
 → PiCliProvider.StreamCoreAsync(cancellationToken)
 → PiProvider.ExecuteProcessAsync([EnumeratorCancellation] cancellationToken)
 → ReadLineAsync(cancellationToken) / WaitForExitAsync(cancellationToken)
 → on exception: _processManager.StopAsync(handle, CancellationToken.None)

Note the last line: cleanup uses CancellationToken.None, not the token the user passed in. This is a detail, but extremely important.

The reason is: the user's token is already canceled. If you use this already-canceled token for cleanup, the cleanup task will be canceled immediately, and the process becomes an orphan—pi still running in the background, no one collecting it, CPU and memory occupied for nothing. So cleanup must use CancellationToken.None, ensuring cleanup actions definitely complete. It's like with people—some things need proper cleanup after they completely stop, otherwise it's just leaving a mess.
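The effect is easy to reproduce in isolation. In the sketch below, CleanupAsync is a hypothetical stand-in for a cleanup call such as StopAsync that needs a little time to finish; Task.Delay is used only to simulate that work.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CleanupTokenDemo
{
    // Returns true if the simulated cleanup ran to completion,
    // false if it was canceled before finishing.
    public static async Task<bool> CleanupAsync(CancellationToken token)
    {
        try
        {
            // Simulated cleanup work that observes the token it was given.
            await Task.Delay(TimeSpan.FromMilliseconds(20), token);
            return true;
        }
        catch (OperationCanceledException)
        {
            return false;
        }
    }
}
```

Calling CleanupAsync with a token that is already canceled returns false without doing any work, while CleanupAsync(CancellationToken.None) runs to the end. That is the whole argument for passing CancellationToken.None on the cleanup path.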

Three-stage Progressive Shutdown

CliProcessManager.StopProcessAsync is a three-stage progressive shutdown process, with time constants defined at the top of the file:

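// Patience for a graceful stop: first give the process time to wrap up on its own
private static readonly TimeSpan GracefulStopTimeout = TimeSpan.FromSeconds(2);
// Patience for the process to actually exit after a forced kill
private static readonly TimeSpan StopWaitTimeout = TimeSpan.FromSeconds(5);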

The three stages progress like this:

  1. Interrupt signal. TryInterruptAsync first writes a \u0003 (Ctrl+C character) to stdin, and on Unix additionally does kill -INT <pid>. This step is to let pi gracefully end itself—it can perceive the interrupt and wrap up what it's writing.
  2. Graceful wait. Wait at most 2 seconds to see if the process exits on its own.
  3. Force kill. If it hasn't exited, directly Process.Kill(entireProcessTree: true) to kill the entire process tree together, then wait at most 5 seconds to confirm it's really dead.

Why entireProcessTree: true? Because pi spawns child processes when it runs tools, for example local model processes routed by the provider, or bash subprocesses. Killing only the parent leaves those children running as orphans. Killing the whole tree at once is clean.

On Windows there is no kill -INT equivalent in this path, so the interrupt relies solely on the Ctrl+C character. Cross-platform behavior will differ; keep this in mind.
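The control flow of the three stages can be sketched independently of any real process. The version below is not CliProcessManager: the three delegates are hypothetical seams, which in a real implementation would map to writing \u0003 to stdin, Process.WaitForExitAsync with a timeout, and Process.Kill(entireProcessTree: true).

```csharp
using System;
using System.Threading.Tasks;

public enum StopOutcome { ExitedGracefully, Killed, StillRunning }

public static class ThreeStageStop
{
    public static async Task<StopOutcome> StopAsync(
        Func<Task> interrupt,                   // stage 1: send the interrupt
        Func<TimeSpan, Task<bool>> waitForExit, // true if the process exited within the timeout
        Action killProcessTree,                 // stage 3: force kill the whole tree
        TimeSpan gracefulStopTimeout,
        TimeSpan stopWaitTimeout)
    {
        // Stage 1: ask the process to stop. A failure here (for example stdin
        // already closed) must not prevent the later stages from running.
        try { await interrupt(); }
        catch (Exception) { /* fall through to waiting and killing */ }

        // Stage 2: give the process a short window to exit on its own.
        if (await waitForExit(gracefulStopTimeout))
            return StopOutcome.ExitedGracefully;

        // Stage 3: force kill, then confirm the process is really gone.
        killProcessTree();
        return await waitForExit(stopWaitTimeout)
            ? StopOutcome.Killed
            : StopOutcome.StillRunning;
    }
}
```

Returning an outcome instead of void is a choice made for this sketch; it lets the caller log whether a forced kill was needed, which is useful when tuning the two timeouts.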

PiProvider's Exception Cleanup

When ReadLineAsync throws inside PiProvider's ExecuteProcessAsync, the exception is captured with ExceptionDispatchInfo.Capture, the loop exits, StopAsync is called to clean up the process, and then pendingException.Throw() rethrows the original exception to the upper layer.

Why capture first and throw later? If the exception propagates immediately, StopAsync never runs, the process is never reclaimed, and it becomes an orphan. Capturing it first guarantees the process is reclaimed, and rethrowing through ExceptionDispatchInfo preserves the original OperationCanceledException (including its stack trace) for the caller, who can then conclude "the user canceled" rather than "something went wrong."
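The capture, clean up, rethrow sequence looks roughly like this when reduced to its skeleton. The readLoop and stopProcess delegates are hypothetical placeholders for the ReadLineAsync loop and the StopAsync call.

```csharp
#nullable enable
using System;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;

public static class CaptureThenCleanup
{
    public static async Task RunAsync(Func<Task> readLoop, Func<Task> stopProcess)
    {
        ExceptionDispatchInfo? pendingException = null;
        try
        {
            await readLoop();
        }
        catch (Exception ex)
        {
            // Hold on to the exception, stack trace included, instead of letting it escape now.
            pendingException = ExceptionDispatchInfo.Capture(ex);
        }

        // Cleanup always runs, whether or not the read loop failed.
        await stopProcess();

        // Rethrow the original exception with its original type and stack trace.
        pendingException?.Throw();
    }
}
```

If readLoop throws OperationCanceledException, the caller of RunAsync still observes exactly that exception type, but only after stopProcess has completed.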

Unified Contract for Startup Failures

There's another detail worth mentioning separately. Process startup failure—for example, pi executable doesn't exist, wrong permissions—PiProvider doesn't throw exceptions, but synthesizes a terminal.failed message, then yield break.

Why do this? Because if you throw exceptions, upper layer consumers have to handle two completely different semantics: one is "normal messages during streaming consumption," the other is "exceptions thrown before streaming starts." This makes the consumer's await foreach particularly hard to write.

After unifying to "always give you messages first, then end the stream," consumer logic is consistent: getting terminal.failed counts as failure, getting terminal.completed counts as success, no need for try/catch branching. This is a small but important design decision, stabilizing the contract.
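As a rough sketch of this contract, the async iterator below turns a startup exception into a single terminal.failed message. The names are illustrative, using string for Content is an assumption, and the startProcess delegate is a hypothetical stand-in for spawning pi and wrapping its output.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.ComponentModel;

// The article describes CliMessage as a (Type, Content) record; string Content is assumed here.
public sealed record CliMessage(string Type, string Content);

public static class StartupContractSketch
{
    public static async IAsyncEnumerable<CliMessage> StreamAsync(
        Func<IAsyncEnumerable<CliMessage>> startProcess)
    {
        IAsyncEnumerable<CliMessage>? stream = null;
        string? startupError = null;

        // C# does not allow yield inside a try block that has a catch clause,
        // so the failure is recorded here and reported after the block.
        try
        {
            stream = startProcess();
        }
        catch (Exception ex) when (ex is Win32Exception or InvalidOperationException)
        {
            // Process.Start throws Win32Exception when the executable cannot be launched.
            startupError = ex.Message;
        }

        if (stream is null)
        {
            yield return new CliMessage("terminal.failed", $"failed to start pi: {startupError}");
            yield break;
        }

        await foreach (var message in stream)
            yield return message;
    }
}
```

A consumer's await foreach then needs no try/catch around it for startup problems: a missing executable simply shows up as the first and only message.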

Practice: Correct Way to Consume Streams

Referencing PiScenarioMessageReader (libs console test scenario) and PiCliProvider.StreamCoreAsync (core thin adapter) in HagiCode, consumers look roughly like this:

await foreach (var message in provider.ExecuteAsync(options, prompt, cancellationToken))
{
    // 1. Short-circuit on failure; don't process any further messages
    if (NormalizedAcpCliAdapter.TryGetFailureMessage(message.Content, out var failure))
    {
        yield return new AIStreamingChunk { Type = StreamingChunkType.Error, ErrorMessage = failure };
        yield break; // stream ends after terminal.failed
    }

    // 2. Assistant text is a cumulative snapshot; compute the delta once more here
    if (message.Type == "assistant" && TryGetText(message.Content, out var text))
    {
        var delta = ReconcileSnapshot(text); // prefix comparison
        if (!string.IsNullOrEmpty(delta)) yield return Chunk(delta);
    }

    // 3. terminal.completed is the only reliable "end" signal
    if (message.Type == "terminal.completed") break;
}

Common Pitfalls Quick Reference

Organizing all the pitfalls encountered along the way into a table, for future reference:

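Symptom | Cause | Fix
Frontend shows repeated assistant text | Cumulative snapshots were not converted to deltas | Do prefix comparison in ReconcileAssistantTextSnapshot
Process keeps running after cancel | Cleanup used the already-canceled token | Use CancellationToken.None for cleanup
Retry has no effect | Retry was written into PiProvider, but the provider has single-attempt semantics | Move it up to the caller's orchestration layer
pi error details are lost | The diagnostic fields of terminal.failed were not read | Pass text / invalid_output_lines / stderr through in full
Thinking fragments arrive in the middle of tool calls | thinking events were forwarded directly | Buffer until the turn ends, then DrainBufferedThinkingMessages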

How to Verify

The libs layer uses StubCliProcessManager to mock processes, with unit tests covering pure logic such as argument construction, event normalization, incremental deduplication, and failure pass-through. The real CLI path is opt-in via the HAGICODE_REAL_CLI_TESTS environment variable and runs round-trip scenarios against real models. In the core layer, PiCliProviderTests verifies the thin adapter's AIStreamingChunk projection and session binding.

# Run the Pi-related unit tests in the Hagicode.Libs repository
dotnet test --filter "FullyQualifiedName~PiProviderTests"

# Run the real CLI integration tests (requires pi installed locally)
HAGICODE_REAL_CLI_TESTS=1 dotnet test --filter "FullyQualifiedName~PiProviderTests.RealCli"

Summary

Putting these three things together, the mental model for integrating pi actually boils down to one sentence: let each layer do only its own thing.

  • Message parsing is handed to PiJsonEventMapper: private events are normalized into shared CliMessage, cumulative snapshots converted to deltas, thinking buffered until turn end.
  • Retry is handed to the caller: provider does single attempts, whoever wants retry does it at the upper layer themselves, configuration is preserved but no longer drives provider.
  • Cancellation is handed to CliProcessManager: CancellationToken is passed through the full chain, cleanup uses CancellationToken.None, three-stage progressive shutdown (interrupt signal → graceful wait → force kill entire process tree).

Once these boundaries are clearly drawn, integrating a new agent CLI becomes almost assembly-line work: you write a new XxxProvider and XxxJsonEventMapper, and cross-cutting logic such as retry, cancellation, message contracts, and error handling is all reused. This is also the fundamental reason HagiCode can support multiple agent CLI backends at once (claude code, codex, pi, gemini cli, etc.) without getting messy.

Let me say that most important boundary one more time: don't add retry at the provider layer. Once you understand this, integrating agent CLI is more than halfway done...

Returning to the theme of message parsing, retry, and cancellation: what deserves repeated confirmation isn't any scattered technique, but whether the constraints, implementation boundaries, and engineering trade-offs have been seen clearly. Once the judgment calls in this article are distilled into a stable checklist, you can make reliable decisions faster when similar problems come up.

Original Article & License

This article was created with AI assistance and reviewed by the author before publication.