
17.4 Streaming Input & Output

Course: Claude Code - Enterprise Development

Section: Claude Agent SDK

Video Length: 3-4 minutes

Presenter: Daniel Treasure


Opening Hook

"Large inputs and outputs can be slow. What if you could process data incrementally, giving users real-time feedback while Claude works? Streaming is the answer, and the SDK makes it simple."


Key Talking Points

What to say:

  • "Streaming reduces latency—users see results as Claude generates them, not waiting for the full response."
  • "Streaming input handles large files or continuous data without loading everything into memory."
  • "Streaming output is perfect for: progress updates, live dashboards, long analyses, chat interfaces."
  • "The SDK handles chunking, backpressure, and error events automatically."

What to show on screen:

  • Standard (non-streaming) vs. streaming code side-by-side
  • Live terminal showing streamed output appearing in real-time
  • User interface updating as chunks arrive
  • Backpressure handling and completion events

Demo Plan

[00:00 - 01:00] Streaming Basics

  1. Show non-streaming code: wait for full response
  2. Contrast with streaming code: iterate over chunks
  3. Explain: same agent/task, different consumption pattern
  4. Show timing difference: streaming is faster to first output
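The contrast in steps 1-4 can be sketched without the real SDK. Here a simulated stream stands in for the agent (`fake_stream` and `non_streaming` are illustrative names, not SDK APIs) to show how the consumption pattern changes time-to-first-output:

```python
import asyncio
import time

# Simulated agent stream: yields one word at a time with a small delay,
# standing in for an SDK stream call like agent.stream(task).
async def fake_stream(text: str, delay: float = 0.05):
    for word in text.split():
        await asyncio.sleep(delay)
        yield word + " "

async def non_streaming(text: str) -> str:
    # Non-streaming pattern: collect everything, then return at once.
    return "".join([w async for w in fake_stream(text)])

async def main():
    text = "streaming lets users read the first words almost immediately"

    # Non-streaming: nothing visible until the whole response is done.
    start = time.monotonic()
    await non_streaming(text)
    print(f"non-streaming: first output after {time.monotonic() - start:.2f}s")

    # Streaming: first word appears after one chunk's delay.
    start = time.monotonic()
    first = None
    async for word in fake_stream(text):
        if first is None:
            first = time.monotonic() - start
        print(word, end="", flush=True)
    print(f"\nstreaming: first output after {first:.2f}s")

asyncio.run(main())
```

Same total work in both cases; only the consumption pattern differs, which is the point of the side-by-side timing demo.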

[01:00 - 02:00] Streaming Output Example

  1. Create Python example: async for event in agent.stream(task):
  2. Show event types: text, tool_use, completion
  3. Print events as they arrive (show streaming in real-time)
  4. Demonstrate: accessing event data (event.text, event.tool_name)

[02:00 - 03:00] Streaming Input (Large Files)

  1. Scenario: analyze a 10MB log file
  2. Show: reading the file in chunks
  3. Pass chunks to the agent via the streaming API
  4. Demonstrate: agent processes chunks without loading the full file
  5. Show memory usage stays low

[03:00 - 03:45] Error Handling & Completion

  1. Show: subscribing to error events
  2. Demonstrate: handling incomplete streams
  3. Show: the completion event (when done)
  4. Explain: backpressure (what happens if the agent produces output faster than you consume it)


Code Examples & Commands

Streaming Output (Python):

import asyncio
from claude_code import Agent

async def stream_analysis():
    agent = Agent(
        model="claude-sonnet-4-5-20250929",
        system_prompt="You are a detailed code analyzer."
    )

    task = "Analyze this Python function for performance issues: def fibonacci(n): return n if n <= 1 else fibonacci(n-1) + fibonacci(n-2)"

    print("Streaming output:\n")

    # Stream events as they arrive
    async for event in agent.stream(task):
        if event.type == "text":
            # Print text chunks as they arrive
            print(event.text, end="", flush=True)
        elif event.type == "tool_use":
            print(f"\n[Using tool: {event.tool_name}]")
        elif event.type == "completion":
            print(f"\n[Completed with status: {event.status}]")

# Run
asyncio.run(stream_analysis())

Streaming Input (Large File):

import asyncio
from claude_code import Agent

async def analyze_large_file(file_path: str, chunk_size: int = 8192):
    agent = Agent(
        model="claude-sonnet-4-5-20250929",
        system_prompt="Analyze logs and identify errors."
    )

    # Read and analyze the file chunk-by-chunk so the full file
    # never sits in memory at once
    chunk_count = 0
    with open(file_path, "r") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            chunk_count += 1
            print(f"\n--- Chunk {chunk_count} ---")

            prompt = f"Analyze this log chunk and identify error patterns:\n{chunk}"
            async for event in agent.stream(prompt):
                if event.type == "text":
                    print(event.text, end="", flush=True)

    print(f"\nProcessed {chunk_count} chunks.")

# Usage
asyncio.run(analyze_large_file("large_logfile.txt"))

TypeScript Streaming:

import Anthropic from "@anthropic-ai/sdk";

async function streamAnalysis(): Promise<void> {
  const client = new Anthropic();

  console.log("Streaming response:\n");

  // Create stream
  const stream = await client.messages.create({
    model: "claude-sonnet-4-5-20250929",
    max_tokens: 1024,
    stream: true, // Enable streaming
    messages: [
      {
        role: "user",
        content:
          "Explain quantum computing in 3 paragraphs, slowly so I can read it.",
      },
    ],
  });

  // Process events
  for await (const event of stream) {
    if (
      event.type === "content_block_delta" &&
      event.delta.type === "text_delta"
    ) {
      process.stdout.write(event.delta.text);
    }
  }

  console.log("\n[Streaming complete]");
}

streamAnalysis().catch(console.error);

With Error Handling:

import asyncio
from claude_code import Agent

async def robust_streaming():
    agent = Agent(model="claude-sonnet-4-5-20250929")

    try:
        async for event in agent.stream("Your task"):
            if event.type == "text":
                print(event.text, end="", flush=True)
            elif event.type == "error":
                print(f"\nError: {event.message}")
                break
            elif event.type == "completion":
                print(f"\nDone: {event.status}")
    except asyncio.TimeoutError:
        print("\nStream timed out")
    except Exception as e:
        print(f"\nUnexpected error: {e}")

asyncio.run(robust_streaming())

Gotchas & Tips

Gotcha 1: Backpressure

  • If you process events slowly, the stream may pause waiting for you
  • Solution: Keep event processing fast; offload slow work to background tasks
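The "offload slow work" advice can be sketched with a queue and a background task. A simulated event stream stands in for the agent here (`fake_events` and `slow_render` are illustrative names, not SDK APIs):

```python
import asyncio

async def fake_events(n: int = 5):
    # Simulated stream events, standing in for agent.stream(...)
    for i in range(n):
        await asyncio.sleep(0.01)
        yield f"chunk-{i}"

async def slow_render(text: str) -> str:
    # Stand-in for slow work: formatting, writing to a UI, logging, etc.
    await asyncio.sleep(0.05)
    return text.upper()

async def consume() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    results: list[str] = []

    async def worker():
        while True:
            item = await queue.get()
            if item is None:  # sentinel: stream finished
                break
            results.append(await slow_render(item))

    task = asyncio.create_task(worker())

    # The consuming loop stays fast: it only enqueues, so the stream
    # is never blocked waiting on the slow rendering step.
    async for event in fake_events():
        queue.put_nowait(event)
    queue.put_nowait(None)

    await task
    return results

print(asyncio.run(consume()))
```

The stream drains at full speed while the worker catches up in the background; for CPU-bound work, `asyncio.to_thread` inside the worker is the usual next step.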

Gotcha 2: Async/Await Required

  • Streaming requires an async context (async for, await)
  • Mixing sync and async code will fail

Gotcha 3: Incomplete Streams

  • If the client disconnects or times out, the stream ends early
  • Solution: Always check the completion event or catch exceptions

Gotcha 4: Memory with Large Files

  • Even with streaming input, accumulating chunks in memory loses the benefit
  • Solution: Process chunks on the fly; don't store the entire file

Tip 1: Progress Indicators

  • Use streaming to show progress bars or live updates
  • Count events and estimate time remaining

Tip 2: Buffering

  • If output needs formatting, buffer a few chunks before printing
  • Avoids one-character-at-a-time output (looks slow)
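A minimal buffering helper, as a sketch (`buffer_chunks` is an illustrative name, not an SDK function):

```python
def buffer_chunks(chunks, min_chars: int = 40):
    """Group small text deltas into larger pieces before printing,
    so output doesn't crawl out a few characters at a time."""
    buf = ""
    for chunk in chunks:
        buf += chunk
        if len(buf) >= min_chars:
            yield buf
            buf = ""
    if buf:  # flush whatever is left at end of stream
        yield buf

# Usage: wrap the incoming text deltas before printing
for piece in buffer_chunks(["He", "llo", ", ", "wor", "ld!"], min_chars=5):
    print(piece, end="", flush=True)
```

The same wrapper works around an `async for` loop by collecting `event.text` deltas into the buffer instead.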

Tip 3: Real-time UI

  • Stream events to a WebSocket and update the frontend in real time
  • Users see Claude thinking instead of waiting for the full response

Tip 4: Cancellation

  • If the user cancels the operation, break the loop and close the stream
  • A clean shutdown prevents hanging connections


Lead-out

"Streaming gives users immediate feedback and reduces latency. Next video: permissions. Not all agent actions should be automatic—some need human approval. We'll show how to implement permission callbacks in the SDK."


Reference URLs

  • Anthropic Streaming Documentation: https://docs.anthropic.com/
  • Python asyncio: https://docs.python.org/3/library/asyncio.html
  • Node.js Streams: https://nodejs.org/en/docs/guides/backpressuring-in-streams/

Prep Reading

  • Review Python asyncio or TypeScript async/await (5 min)
  • Understand generator/iterator patterns (5 min)

Notes for Daniel

  • Live demo: Streaming is visually impressive—show it live if possible. Real-time output on screen is compelling.
  • Comparison: Show side-by-side timing: standard (wait 3 seconds) vs. streaming (see first word in 0.2 seconds).
  • Use case emphasis: Mention use cases where streaming shines: dashboards, chat interfaces, live reports.