Problem: Claude Sonnet 4.5 Function Calling and Streaming Don't Work Together Out of the Box
Claude Sonnet 4.5 API function calling and streaming unlock real-time AI responses with live tool execution — but combining them trips up most developers on the first attempt. The tool_use content block arrives differently in a streamed response than in a standard completion, and mishandling the delta accumulation produces silent failures with no error message.
You'll learn:
- How to define and invoke tools (function calling) with the Claude Sonnet 4.5 API
- How to stream responses and correctly reconstruct tool_use blocks from deltas
- Production patterns: retry logic, cost-aware token tracking, and parallel tool calls
Time: 20 min | Difficulty: Intermediate
Why This Happens
Claude's API returns tool calls inside content blocks of type tool_use. In a non-streamed response that block arrives whole. In a streamed response it arrives as a sequence of content_block_start, content_block_delta, and content_block_stop events — and the input field (the JSON arguments) is chunked across multiple input_json_delta events.
Most example code handles text streaming fine. It breaks when a tool_use block appears because the accumulation logic for input_json_delta is never wired up.
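The chunking can be reproduced offline, without an API call. Below is a minimal sketch of the accumulation rule using plain dicts as stand-ins for the SDK's typed event objects; the field names mirror the real event stream, but the event values are illustrative:

```python
import json

# Hypothetical event sequence for one streamed tool_use block.
# Real SDK events are typed objects; plain dicts stand in here.
events = [
    {"type": "content_block_start", "index": 1,
     "block": {"type": "tool_use", "id": "toolu_01", "name": "get_stock_price"}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": '{"tick'}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": 'er": "NVDA"}'}},
    {"type": "content_block_stop", "index": 1},
]

buffers: dict[int, str] = {}
parsed: dict[int, dict] = {}
for ev in events:
    if ev["type"] == "content_block_delta" and ev["delta"]["type"] == "input_json_delta":
        # Each chunk is an arbitrary slice of the JSON string — unparseable alone
        buffers[ev["index"]] = buffers.get(ev["index"], "") + ev["delta"]["partial_json"]
    elif ev["type"] == "content_block_stop":
        # Only the concatenation of all chunks is valid JSON
        parsed[ev["index"]] = json.loads(buffers.get(ev["index"], "{}"))

print(parsed[1])  # → {'ticker': 'NVDA'}
```

Note that the first chunk, `{"tick`, is not valid JSON on its own; any code that calls `json.loads` per delta fails exactly the way described above.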
Symptoms:
- KeyError: 'input' when parsing streamed tool calls
- Tool arguments arrive as None or empty string
- Works perfectly in non-stream mode, breaks silently when stream=True
- Occurs in anthropic>=0.25.0 on Python 3.11+ and Node 20+
Request flow: client → Anthropic API → streamed SSE events → tool execution → second API turn → final streamed response
Setup
Step 1: Install the Anthropic SDK
# Python — use uv for reproducible installs
uv pip install "anthropic>=0.40.0" httpx tenacity  # quote the spec so the shell doesn't treat >= as redirection
# Node 22
npm install @anthropic-ai/sdk@latest
Expected output: Successfully installed anthropic-0.40.x
If it fails:
- ERROR: Could not find a version → run uv pip install --upgrade pip first
- ModuleNotFoundError: anthropic → confirm your virtual environment is active
Step 2: Configure the API Key
# Never hard-code keys — export once per shell session
export ANTHROPIC_API_KEY="sk-ant-..."
Store the key in AWS Secrets Manager (us-east-1) or a .env file loaded by python-dotenv. Anthropic API pricing starts at $3 / 1M input tokens and $15 / 1M output tokens for Claude Sonnet 4.5 as of March 2026.
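At those rates, a back-of-the-envelope cost check for a single tool-calling turn looks like the following (the token counts are illustrative; verify the rates against Anthropic's current pricing page):

```python
# Cost of one request at the rates quoted above ($3 / 1M in, $15 / 1M out)
input_tokens, output_tokens = 2_500, 400  # typical tool-calling turn (illustrative)
cost = input_tokens * 3.00 / 1_000_000 + output_tokens * 15.00 / 1_000_000
print(f"${cost:.4f}")  # → $0.0135
```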
Function Calling (Tool Use) — Non-Streaming First
Before combining tools with streaming, nail the non-streamed case. It makes debugging far easier.
Step 3: Define a Tool Schema
import anthropic
import json
client = anthropic.Anthropic() # reads ANTHROPIC_API_KEY from env
# Tool definitions follow JSON Schema — "required" is mandatory for deterministic calls
tools = [
{
"name": "get_stock_price",
"description": "Fetch the current stock price for a given ticker symbol from a US exchange.",
"input_schema": {
"type": "object",
"properties": {
"ticker": {
"type": "string",
"description": "Stock ticker, e.g. AAPL, MSFT, NVDA"
},
"exchange": {
"type": "string",
"enum": ["NYSE", "NASDAQ", "AMEX"],
"description": "US exchange where the stock is listed"
}
},
"required": ["ticker"] # exchange is optional — Claude fills it from context
}
}
]
Step 4: Send a Tool-Enabled Request
def run_tool_call(user_message: str) -> dict:
response = client.messages.create(
model="claude-sonnet-4-5", # exact model string — no aliases
max_tokens=1024,
tools=tools,
messages=[{"role": "user", "content": user_message}]
)
# stop_reason == "tool_use" means Claude wants to invoke a function
if response.stop_reason == "tool_use":
tool_block = next(b for b in response.content if b.type == "tool_use")
print(f"Tool: {tool_block.name}")
print(f"Args: {json.dumps(tool_block.input, indent=2)}")
return {"tool": tool_block.name, "args": tool_block.input, "id": tool_block.id}
# stop_reason == "end_turn" — Claude answered without needing a tool
text_block = next(b for b in response.content if b.type == "text")
return {"text": text_block.text}
result = run_tool_call("What's the current price of NVDA on NASDAQ?")
Expected output:
Tool: get_stock_price
Args: {
"ticker": "NVDA",
"exchange": "NASDAQ"
}
Step 5: Return the Tool Result and Get the Final Answer
def execute_tool(name: str, args: dict) -> str:
# Replace with real data source — this stub returns a fixed price
if name == "get_stock_price":
return json.dumps({"ticker": args["ticker"], "price_usd": 875.42, "currency": "USD"})
raise ValueError(f"Unknown tool: {name}")
def full_tool_loop(user_message: str) -> str:
messages = [{"role": "user", "content": user_message}]
while True:
response = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1024,
tools=tools,
messages=messages
)
if response.stop_reason == "end_turn":
return next(b.text for b in response.content if b.type == "text")
if response.stop_reason == "tool_use":
# Append Claude's response (including tool_use block) to history
messages.append({"role": "assistant", "content": response.content})
# Execute each tool Claude requested (may be multiple in one turn)
tool_results = []
for block in response.content:
if block.type == "tool_use":
result = execute_tool(block.name, block.input)
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id, # must match the id from Claude's block
"content": result
})
messages.append({"role": "user", "content": tool_results})
answer = full_tool_loop("What's NVDA trading at on NASDAQ right now?")
print(answer)
# → "NVIDIA (NVDA) is currently trading at $875.42 USD on NASDAQ."
Streaming — Text Only First
Step 6: Stream a Plain Text Response
def stream_text(prompt: str) -> None:
# stream=True returns a context manager; iterate over events
with client.messages.stream(
model="claude-sonnet-4-5",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True) # flush=True prevents buffering in terminals
print() # newline after stream ends
stream_text("Explain token streaming in LLM APIs in 3 sentences.")
Expected output: Words print to the terminal one chunk at a time, not all at once.
Streaming + Function Calling Together
This is where most implementations fail. The stream.text_stream helper drops non-text events — you must iterate raw events to capture tool_use blocks.
Step 7: Accumulate Tool Input Across Deltas
import anthropic
import json  # needed to parse the accumulated tool input
from collections import defaultdict
def stream_with_tools(user_message: str) -> str:
messages = [{"role": "user", "content": user_message}]
while True:
accumulated_content = [] # reconstruct full content list from events
current_tool_inputs = defaultdict(str) # index → partial JSON string
stop_reason = None
with client.messages.stream(
model="claude-sonnet-4-5",
max_tokens=1024,
tools=tools,
messages=messages
) as stream:
for event in stream:
event_type = event.type
if event_type == "content_block_start":
block = event.content_block
if block.type == "text":
accumulated_content.append({"type": "text", "text": ""})
elif block.type == "tool_use":
# Store the block skeleton — input arrives as deltas
accumulated_content.append({
"type": "tool_use",
"id": block.id,
"name": block.name,
"input": {} # filled in after stream ends
})
elif event_type == "content_block_delta":
delta = event.delta
idx = event.index # which content block this delta belongs to
if delta.type == "text_delta":
accumulated_content[idx]["text"] += delta.text
print(delta.text, end="", flush=True)
elif delta.type == "input_json_delta":
# Accumulate raw JSON string — parse ONLY after content_block_stop
current_tool_inputs[idx] += delta.partial_json
elif event_type == "content_block_stop":
idx = event.index
if accumulated_content[idx]["type"] == "tool_use":
# Now safe to parse — the JSON is complete
accumulated_content[idx]["input"] = json.loads(current_tool_inputs[idx] or "{}")  # empty buffer = tool called with no arguments
elif event_type == "message_delta":
stop_reason = event.delta.stop_reason
print() # newline after any streamed text
if stop_reason == "end_turn":
text_blocks = [b["text"] for b in accumulated_content if b["type"] == "text"]
return " ".join(text_blocks)
if stop_reason == "tool_use":
messages.append({"role": "assistant", "content": accumulated_content})
tool_results = []
for block in accumulated_content:
if block["type"] == "tool_use":
result = execute_tool(block["name"], block["input"])
tool_results.append({
"type": "tool_result",
"tool_use_id": block["id"],
"content": result
})
messages.append({"role": "user", "content": tool_results})
# Run it
answer = stream_with_tools("What is the current USD price of NVDA on NASDAQ?")
print(f"\nFinal: {answer}")
Expected output:
NVIDIA (NVDA) is currently trading at
Final: NVIDIA (NVDA) is currently trading at $875.42 USD on NASDAQ.
If it fails:
- json.JSONDecodeError → you parsed partial_json before content_block_stop; move JSON parsing inside the content_block_stop branch
- IndexError: list index out of range → event.index is 0-based; initialize accumulated_content before the stream loop
- AttributeError: 'MessageStreamEvent' has no attribute 'index' → update to anthropic>=0.35.0
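A small defensive helper covers both the premature-parse failure and an edge case the accumulator above can hit: a tool invoked with no arguments produces zero input_json_delta events, leaving the buffer empty. Parse only after content_block_stop and fall back to an empty object. This is a sketch, not part of the SDK:

```python
import json

def parse_tool_input(buffer: str) -> dict:
    """Parse an accumulated input_json_delta buffer after content_block_stop.

    An empty buffer means the tool was invoked with no arguments, so fall
    back to {} instead of raising json.JSONDecodeError on an empty string.
    """
    return json.loads(buffer or "{}")

print(parse_tool_input('{"ticker": "NVDA"}'))  # → {'ticker': 'NVDA'}
print(parse_tool_input(""))                    # → {}
```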
Production Patterns
Retry Logic for Rate Limits
Anthropic rate-limits API traffic by usage tier; entry-level tiers allow on the order of 50 requests/min. Use tenacity for exponential backoff:
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import anthropic
@retry(
retry=retry_if_exception_type(anthropic.RateLimitError),
wait=wait_exponential(multiplier=1, min=2, max=60), # 2s → 4s → 8s … up to 60s
stop=stop_after_attempt(5)
)
def resilient_tool_call(messages: list, tools: list) -> anthropic.types.Message:
return client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1024,
tools=tools,
messages=messages
)
Token Cost Tracking
def track_cost(usage: anthropic.types.Usage) -> float:
# Claude Sonnet 4.5 pricing as of March 2026 (USD)
INPUT_COST_PER_TOKEN = 3.00 / 1_000_000 # $3.00 / 1M input tokens
OUTPUT_COST_PER_TOKEN = 15.00 / 1_000_000 # $15.00 / 1M output tokens
cost = (
usage.input_tokens * INPUT_COST_PER_TOKEN +
usage.output_tokens * OUTPUT_COST_PER_TOKEN
)
print(f"Tokens: {usage.input_tokens} in / {usage.output_tokens} out — ${cost:.6f} USD")
return cost
Parallel Tool Calls
Claude Sonnet 4.5 can request multiple tools in a single turn. The loop in Step 5 and Step 7 already handles this — the for block in accumulated_content loop processes every tool_use block before sending results back.
To execute tools concurrently:
import asyncio
async def execute_tools_parallel(blocks: list) -> list:
tasks = [
asyncio.to_thread(execute_tool, b["name"], b["input"])
for b in blocks if b["type"] == "tool_use"
]
results = await asyncio.gather(*tasks)
return [
{"type": "tool_result", "tool_use_id": b["id"], "content": r}
for b, r in zip(
[b for b in blocks if b["type"] == "tool_use"], results
)
]
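To see the concurrent path end to end without an API call, here is a self-contained run that redeclares the Step 5 stub and the parallel executor, then feeds it two hypothetical tool_use blocks (the ids and tickers are illustrative):

```python
import asyncio
import json

def execute_tool(name: str, args: dict) -> str:
    # Stub from Step 5 — replace with a real data source
    return json.dumps({"ticker": args["ticker"], "price_usd": 875.42})

async def execute_tools_parallel(blocks: list) -> list:
    tool_blocks = [b for b in blocks if b["type"] == "tool_use"]
    # to_thread runs each (blocking) tool in the default thread pool;
    # gather preserves input order, so results line up with tool_blocks
    results = await asyncio.gather(*(
        asyncio.to_thread(execute_tool, b["name"], b["input"]) for b in tool_blocks
    ))
    return [
        {"type": "tool_result", "tool_use_id": b["id"], "content": r}
        for b, r in zip(tool_blocks, results)
    ]

blocks = [
    {"type": "tool_use", "id": "toolu_01", "name": "get_stock_price", "input": {"ticker": "NVDA"}},
    {"type": "tool_use", "id": "toolu_02", "name": "get_stock_price", "input": {"ticker": "AAPL"}},
]
tool_results = asyncio.run(execute_tools_parallel(blocks))
print(len(tool_results))  # → 2
```

With CPU-cheap, I/O-bound tools (HTTP calls, database lookups), this cuts the tool phase of a multi-tool turn to roughly the latency of the slowest tool.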
Node 22 Equivalent
import Anthropic from "@anthropic-ai/sdk";
const client = new Anthropic(); // reads process.env.ANTHROPIC_API_KEY
async function streamWithTools(userMessage: string): Promise<string> {
const messages: Anthropic.MessageParam[] = [
{ role: "user", content: userMessage },
];
while (true) {
const accumulatedContent: Anthropic.ContentBlock[] = [];
const toolInputBuffers: Record<number, string> = {};
let stopReason: string | null = null;
const stream = await client.messages.create({
model: "claude-sonnet-4-5",
max_tokens: 1024,
tools: tools as Anthropic.Tool[],
messages,
stream: true,
});
for await (const event of stream) {
if (event.type === "content_block_start") {
const block = event.content_block;
if (block.type === "text") {
accumulatedContent.push({ type: "text", text: "" });
} else if (block.type === "tool_use") {
accumulatedContent.push({ type: "tool_use", id: block.id, name: block.name, input: {} });
}
} else if (event.type === "content_block_delta") {
const { index, delta } = event;
if (delta.type === "text_delta") {
(accumulatedContent[index] as Anthropic.TextBlock).text += delta.text;
process.stdout.write(delta.text);
} else if (delta.type === "input_json_delta") {
toolInputBuffers[index] = (toolInputBuffers[index] ?? "") + delta.partial_json;
}
} else if (event.type === "content_block_stop") {
const block = accumulatedContent[event.index];
if (block.type === "tool_use") {
(block as Anthropic.ToolUseBlock).input = JSON.parse(toolInputBuffers[event.index] ?? "{}");
}
} else if (event.type === "message_delta") {
stopReason = event.delta.stop_reason ?? null;
}
}
process.stdout.write("\n");
if (stopReason === "end_turn") {
return accumulatedContent
.filter((b) => b.type === "text")
.map((b) => (b as Anthropic.TextBlock).text)
.join(" ");
}
if (stopReason === "tool_use") {
messages.push({ role: "assistant", content: accumulatedContent });
const toolResults: Anthropic.ToolResultBlockParam[] = accumulatedContent
.filter((b) => b.type === "tool_use")
.map((b) => {
const tb = b as Anthropic.ToolUseBlock;
return {
type: "tool_result",
tool_use_id: tb.id,
content: executeTool(tb.name, tb.input as Record<string, string>),
};
});
messages.push({ role: "user", content: toolResults });
}
}
}
Comparison: Claude Sonnet 4.5 vs GPT-4o Function Calling
| | Claude Sonnet 4.5 | GPT-4o |
|---|---|---|
| Tool call format | tool_use content block | function_call message role |
| Parallel tools | ✅ Multiple per turn | ✅ Multiple per turn |
| Streaming tool args | input_json_delta events | function_call.arguments delta |
| Forced tool use | tool_choice: {"type": "tool", "name": "..."} | tool_choice: {"function": {"name": "..."}} |
| Input token price (USD) | $3.00 / 1M | $2.50 / 1M |
| Output token price (USD) | $15.00 / 1M | $10.00 / 1M |
| Context window | 200K tokens | 128K tokens |
Choose Claude Sonnet 4.5 if: you need a 200K context window, long multi-turn tool loops, or Anthropic's Constitutional AI safety layer. Choose GPT-4o if: you're already on Azure OpenAI (e.g. East US or West US 2) and want lower output token cost.
Verification
python - <<'EOF'
import anthropic, os
client = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])
r = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=32,
messages=[{"role": "user", "content": "Reply: OK"}]
)
print(r.content[0].text, r.stop_reason)
EOF
You should see: OK end_turn
What You Learned
- tool_use blocks in streamed responses arrive as input_json_delta events; parse the JSON only after content_block_stop
- Always use the index field on delta events to route chunks to the correct content block
- The multi-turn loop (append assistant response → append tool results → repeat) is identical for streaming and non-streaming; only the accumulation step differs
- Claude Sonnet 4.5 supports 200K token context, which makes it practical for tool loops with large retrieved documents
- At $3/$15 per 1M tokens, track usage.input_tokens and usage.output_tokens on every response to control costs
Tested on anthropic 0.40.0 · Python 3.12.3 · Node 22.3 · macOS Sequoia & Ubuntu 24.04
FAQ
Q: Can I force Claude to always call a specific tool?
A: Yes — set tool_choice={"type": "tool", "name": "get_stock_price"} in the messages.create call. Claude will invoke that tool regardless of the user's message.
Q: What happens if Claude calls a tool that doesn't exist in my tools list?
A: The API validates tool calls against your schema at request time — Claude can only invoke tools you declared. If you remove a tool mid-conversation, send an updated tools list on the next turn.
Q: Does streaming increase latency for the first token?
A: No. The model generates at the same speed either way; in non-stream mode the client simply waits for the entire response before seeing any of it, so streaming surfaces the first token sooner. It pays off most when output is long (hundreds of tokens) and you want progressive UI.
Q: What is the maximum number of tools I can define per request?
A: Anthropic's current limit is 64 tool definitions per request. Tool definitions count against your input token budget — a large schema with 20+ tools can add 1,000–3,000 input tokens per call.
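Those schema tokens can be budgeted with a rough heuristic before sending a request. The 4-characters-per-token ratio below is an approximation, not Anthropic's tokenizer; treat the result as an order-of-magnitude check:

```python
import json

def estimate_tool_schema_tokens(tools: list) -> int:
    """Rough estimate of the input tokens a tools list adds per request.

    Heuristic (~4 chars/token) — the real count depends on Anthropic's
    tokenizer and how the API serializes tool definitions into the prompt.
    """
    serialized = json.dumps(tools, separators=(",", ":"))
    return len(serialized) // 4

tools = [{
    "name": "get_stock_price",
    "description": "Fetch the current stock price for a given ticker symbol.",
    "input_schema": {
        "type": "object",
        "properties": {"ticker": {"type": "string"}},
        "required": ["ticker"],
    },
}]
print(estimate_tool_schema_tokens(tools))
```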
Q: Can this run on AWS Lambda (us-east-1)?
A: Yes. Set ANTHROPIC_API_KEY as a Lambda environment variable or pull from AWS Secrets Manager. Set timeout to at least 30 seconds for streamed responses; the default 3-second Lambda timeout will cut the stream short.