Problem: Robots Need Natural Language Control
You want to control a robot with plain English commands like "move forward 2 meters" instead of writing ROS2 code or using complex UIs, but connecting LLMs to robot systems safely is unclear.
You'll learn:
- How to parse natural language into robot commands
- Safe command validation before execution
- Real-time feedback from robot to chat interface
- Error handling for ambiguous or unsafe requests
Time: 45 min | Level: Advanced
Why This Matters
Traditional robot control requires knowing ROS topics, message formats, and coordinate systems. LLMs can translate natural language into these low-level commands, but naive implementations risk dangerous actions from misinterpreted prompts.
Common challenges:
- LLM generates syntactically correct but physically dangerous commands
- No feedback loop between robot state and chat
- Handling ambiguous requests ("move left" - how far?)
- Latency between command and execution
Prerequisites
# Check you have these installed
python --version # 3.11+
node --version # 22+
ros2 --version # Humble or newer
# Install core dependencies
pip install langchain langchain-anthropic rclpy --break-system-packages
npm install -g bun # Faster than npm for frontend
Solution
Step 1: Set Up LangChain with Safety Tools
# robot_agent.py
from langchain_anthropic import ChatAnthropic
from langchain.agents import Tool, AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
import rclpy
from geometry_msgs.msg import Twist

class RobotController:
    def __init__(self):
        rclpy.init()
        self.node = rclpy.create_node('langchain_controller')
        self.cmd_pub = self.node.create_publisher(Twist, '/cmd_vel', 10)
        # Safety limits - prevents dangerous commands
        self.max_linear = 0.5   # m/s
        self.max_angular = 1.0  # rad/s

    def validate_and_execute(self, linear: float, angular: float, duration: float) -> str:
        """Execute command only if within safety bounds"""
        if abs(linear) > self.max_linear:
            return f"Error: Speed {linear} exceeds max {self.max_linear} m/s"
        if abs(angular) > self.max_angular:
            return f"Error: Angular speed {angular} exceeds max {self.max_angular} rad/s"
        if duration > 10.0:
            return "Error: Duration limited to 10 seconds for safety"
        # Execute command
        msg = Twist()
        msg.linear.x = linear
        msg.angular.z = angular
        start_time = self.node.get_clock().now()
        while (self.node.get_clock().now() - start_time).nanoseconds < duration * 1e9:
            self.cmd_pub.publish(msg)
            rclpy.spin_once(self.node, timeout_sec=0.01)
        # Stop robot
        msg.linear.x = 0.0
        msg.angular.z = 0.0
        self.cmd_pub.publish(msg)
        return f"Executed: {linear} m/s for {duration}s"
# Initialize controller
robot = RobotController()

def move_forward(distance: str) -> str:
    """Move robot forward by specified distance in meters"""
    try:
        dist = float(distance)
        duration = abs(dist) / 0.3  # Fixed speed 0.3 m/s
        speed = 0.3 if dist > 0 else -0.3
        return robot.validate_and_execute(speed, 0.0, duration)
    except ValueError:
        return f"Error: '{distance}' is not a valid number"

def turn_robot(angle: str) -> str:
    """Turn robot by specified angle in degrees"""
    try:
        degrees = float(angle)
        radians = degrees * 3.14159 / 180
        duration = abs(radians) / 0.5  # Fixed rotation speed 0.5 rad/s
        angular = 0.5 if radians > 0 else -0.5
        return robot.validate_and_execute(0.0, angular, duration)
    except ValueError:
        return f"Error: '{angle}' is not a valid number"

def stop_robot(_: str) -> str:
    """Emergency stop"""
    return robot.validate_and_execute(0.0, 0.0, 0.0)
Why this works: Each tool validates inputs before execution. The LLM can't publish directly to ROS topics - it must go through these bounded functions, so a misparsed number results in an error message rather than an unsafe motion.
Step 2: Create the LangChain Agent
# Configure Claude with deterministic output
llm = ChatAnthropic(
    model="claude-sonnet-4-20250514",
    temperature=0,  # Deterministic for safety
    api_key="your-api-key"  # Better: read from the ANTHROPIC_API_KEY env var
)

# Define available tools
tools = [
    Tool(
        name="move_forward",
        func=move_forward,
        description="Move robot forward (positive) or backward (negative) by distance in meters. Example: move_forward('2.5')"
    ),
    Tool(
        name="turn_robot",
        func=turn_robot,
        description="Turn robot left (positive) or right (negative) by angle in degrees. Example: turn_robot('90')"
    ),
    Tool(
        name="stop_robot",
        func=stop_robot,
        description="Emergency stop. Call this if user says stop, halt, or freeze."
    )
]

# System prompt with safety instructions. create_react_agent requires the
# {tools}, {tool_names}, and {agent_scratchpad} variables and fills the first
# two in for you; the prompt must also spell out the ReAct format.
template = """You are a robot control assistant. Convert user commands into safe robot actions.

Available tools:
{tools}

SAFETY RULES:
1. Always confirm ambiguous commands ("How far should I move?")
2. Refuse dangerous requests (speeds over limits, near obstacles)
3. Break complex commands into steps
4. Use stop_robot if user seems panicked

Use the following format:
Question: the user request
Thought: what to do next
Action: one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (Thought/Action/Action Input/Observation can repeat)
Thought: I now know the final answer
Final Answer: a summary for the user

Question: {input}
Thought: {agent_scratchpad}"""

prompt = PromptTemplate.from_template(template)

# Create ReAct agent
agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=3,  # Prevent infinite loops
    handle_parsing_errors=True
)
Expected: The agent can now interpret commands and call tools. Test with agent_executor.invoke({"input": "move forward 1 meter"})
If it fails:
- Error "API key not found": set the ANTHROPIC_API_KEY environment variable
- Timeout errors: check the ROS2 daemon is running with ros2 daemon status
Step 3: Build React Chat Interface
// app/page.tsx
'use client';

import { useState } from 'react';

interface Message {
  role: 'user' | 'assistant' | 'system';
  content: string;
  timestamp: Date;
}

export default function RobotChat() {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState('');
  const [isExecuting, setIsExecuting] = useState(false);

  const sendCommand = async () => {
    if (!input.trim() || isExecuting) return;

    const userMsg: Message = {
      role: 'user',
      content: input,
      timestamp: new Date()
    };
    setMessages(prev => [...prev, userMsg]);
    setInput('');
    setIsExecuting(true);

    try {
      // Call Python backend
      const response = await fetch('http://localhost:8000/execute', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ command: input })
      });
      const data = await response.json();
      setMessages(prev => [...prev, {
        role: 'assistant',
        content: data.result,
        timestamp: new Date()
      }]);
    } catch (error) {
      // `error` is typed `unknown`, so narrow before reading .message
      const message = error instanceof Error ? error.message : String(error);
      setMessages(prev => [...prev, {
        role: 'system',
        content: `Error: ${message}`,
        timestamp: new Date()
      }]);
    } finally {
      setIsExecuting(false);
    }
  };

  return (
    <div className="flex flex-col h-screen bg-gray-900 text-white">
      {/* Header */}
      <div className="p-4 border-b border-gray-700">
        <h1 className="text-xl font-bold">Robot Control Interface</h1>
        <div className="flex gap-4 mt-2 text-sm">
          <span className={isExecuting ? "text-yellow-400" : "text-green-400"}>
            ● {isExecuting ? "Executing..." : "Ready"}
          </span>
        </div>
      </div>

      {/* Messages */}
      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {messages.map((msg, i) => (
          <div
            key={i}
            className={`flex ${msg.role === 'user' ? 'justify-end' : 'justify-start'}`}
          >
            <div
              className={`max-w-md px-4 py-2 rounded-lg ${
                msg.role === 'user'
                  ? 'bg-blue-600'
                  : msg.role === 'system'
                  ? 'bg-red-600'
                  : 'bg-gray-700'
              }`}
            >
              <div className="text-xs opacity-70 mb-1">
                {msg.role.toUpperCase()} • {msg.timestamp.toLocaleTimeString()}
              </div>
              <div className="whitespace-pre-wrap">{msg.content}</div>
            </div>
          </div>
        ))}
      </div>

      {/* Input */}
      <div className="p-4 border-t border-gray-700">
        <div className="flex gap-2">
          <input
            type="text"
            value={input}
            onChange={(e) => setInput(e.target.value)}
            onKeyDown={(e) => e.key === 'Enter' && sendCommand()}
            placeholder="Tell the robot what to do..."
            className="flex-1 px-4 py-2 bg-gray-800 rounded-lg focus:outline-none focus:ring-2 focus:ring-blue-500"
            disabled={isExecuting}
          />
          <button
            onClick={sendCommand}
            disabled={isExecuting || !input.trim()}
            className="px-6 py-2 bg-blue-600 rounded-lg hover:bg-blue-700 disabled:opacity-50 disabled:cursor-not-allowed"
          >
            Send
          </button>
        </div>
        <div className="mt-2 text-xs text-gray-400">
          Try: "move forward 2 meters" or "turn left 90 degrees"
        </div>
      </div>
    </div>
  );
}
Step 4: Create FastAPI Backend Bridge
# server.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import asyncio

from robot_agent import agent_executor, robot  # agent and controller from Steps 1-2

app = FastAPI()

# Enable CORS for React frontend
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_methods=["*"],
    allow_headers=["*"]
)

class Command(BaseModel):
    command: str

@app.post("/execute")
async def execute_command(cmd: Command):
    try:
        # Run the blocking agent in a background thread so the event loop stays free
        result = await asyncio.to_thread(
            agent_executor.invoke,
            {"input": cmd.command}
        )
        return {"result": result["output"], "success": True}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    return {"status": "operational", "ros_connected": robot.node is not None}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Why this architecture: FastAPI handles HTTP, LangChain processes language, ROS2 controls hardware. Each layer has clear responsibilities.
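The `asyncio.to_thread` call is what keeps the chat interface responsive: `agent_executor.invoke` blocks for the full LLM round-trip, and running it on the event loop would stall every other request. A minimal sketch of the pattern, using a stand-in function (`blocking_agent` below is hypothetical, not the real agent):

```python
import asyncio
import time

def blocking_agent(command: str) -> str:
    # Stand-in for agent_executor.invoke: blocks its thread for a while.
    time.sleep(0.1)
    return f"Executed: {command}"

async def main():
    # to_thread runs each blocking call in a worker thread, so the event
    # loop can overlap them and keep serving other HTTP requests.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_agent, "move forward 1m"),
        asyncio.to_thread(blocking_agent, "turn left 90"),
    )
    print(results)  # prints ['Executed: move forward 1m', 'Executed: turn left 90']

asyncio.run(main())
```

The two calls complete in roughly one sleep interval instead of two, which is exactly why the `/execute` handler wraps the agent the same way.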
Verification
Test the Complete System
# Terminal 1: Start ROS2 (or simulator)
ros2 launch turtlebot3_gazebo empty_world.launch.py
# Terminal 2: Start backend
python server.py
# Terminal 3: Start frontend
cd app && bun run dev
You should see:
- Backend logs show "Application startup complete"
- Frontend at http://localhost:3000 shows the chat interface
- Test command: "move forward 0.5 meters then turn right 45 degrees"
Expected behavior: Robot moves forward, pauses, then turns. Chat shows execution feedback at each step.
If it fails:
- "Connection refused": check all three terminals are running
- Robot doesn't move: verify topic names with ros2 topic list
- LLM doesn't call tools: check API key and model name
Production Hardening
Add Obstacle Detection
from sensor_msgs.msg import LaserScan

class RobotController:
    def __init__(self):
        # ... existing code ...
        self.min_distance = float('inf')
        self.scan_sub = self.node.create_subscription(
            LaserScan,
            '/scan',
            self.scan_callback,
            10
        )

    def scan_callback(self, msg: LaserScan):
        self.min_distance = min(msg.ranges)

    def validate_and_execute(self, linear, angular, duration):
        # Add obstacle check
        if linear > 0 and self.min_distance < 0.5:
            return "Error: Obstacle detected within 0.5m. Command blocked."
        # ... rest of validation ...
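One caveat: real `LaserScan` messages report invalid returns as `inf` or `nan`, so a bare `min(msg.ranges)` can yield a useless value. A sketch of a more robust reading (the `min_valid_range` helper and its default bounds are assumptions for illustration; real bounds come from `msg.range_min`/`msg.range_max`):

```python
import math

def min_valid_range(ranges, range_min=0.1, range_max=10.0):
    """Smallest laser reading, ignoring inf/nan and out-of-spec values."""
    valid = [r for r in ranges
             if math.isfinite(r) and range_min <= r <= range_max]
    return min(valid) if valid else float('inf')

# Invalid readings (inf, nan) are skipped; the true nearest obstacle wins.
print(min_valid_range([float('inf'), 0.4, float('nan'), 3.2]))  # 0.4
```

Returning `inf` when no reading is valid keeps the obstacle check permissive rather than blocking every command on a dropped scan; choose the opposite (fail closed) if your safety case demands it.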
Add Command History
# Store last 50 commands for context
from collections import deque
from datetime import datetime

command_history = deque(maxlen=50)

@app.post("/execute")
async def execute_command(cmd: Command):
    command_history.append({
        "command": cmd.command,
        "timestamp": datetime.now().isoformat()
    })
    # Include history in agent context
    context = "\n".join([f"- {h['command']}" for h in command_history])
    result = await asyncio.to_thread(
        agent_executor.invoke,
        {"input": f"Recent commands:\n{context}\n\nNew command: {cmd.command}"}
    )
    # ... rest of handler ...
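The `deque(maxlen=50)` is doing the bookkeeping for you: once the cap is reached, each append silently evicts the oldest entry, so the prompt context stays bounded without any trimming code. A quick illustration with a smaller cap:

```python
from collections import deque

history = deque(maxlen=3)  # small cap for illustration; the tutorial uses 50
for i in range(5):
    history.append(f"cmd {i}")

# Oldest entries were evicted automatically once the cap was hit.
print(list(history))  # prints ['cmd 2', 'cmd 3', 'cmd 4']
```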
What You Learned
- LangChain agents bridge natural language and robot APIs safely
- Tool-based architecture prevents direct LLM control of hardware
- React provides real-time feedback better than terminal interfaces
- Validation layers are critical - LLMs can generate dangerous commands
Limitations:
- 200-500ms latency per command (LLM inference time)
- Struggles with spatial reasoning ("move to the door")
- Requires explicit tools for each robot capability
Security Considerations
NEVER expose this publicly without:
- Authentication (JWT tokens, API keys)
- Rate limiting (max 10 commands/minute)
- Command logging for audit trails
- Emergency stop accessible outside software
- Geofencing or workspace boundaries
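Rate limiting in particular is cheap to add in-process before reaching for middleware. A minimal fixed-window sketch (the `RateLimiter` class is a hypothetical illustration, not from the original; production deployments would usually use a proxy or a library instead):

```python
import time

class RateLimiter:
    """Fixed-window limiter: allow at most max_calls per window seconds."""
    def __init__(self, max_calls: int = 10, window: float = 60.0):
        self.max_calls = max_calls
        self.window = window
        self.calls: list[float] = []

    def allow(self) -> bool:
        now = time.monotonic()
        # Drop timestamps that have aged out of the window.
        self.calls = [t for t in self.calls if now - t < self.window]
        if len(self.calls) >= self.max_calls:
            return False
        self.calls.append(now)
        return True

limiter = RateLimiter(max_calls=3, window=60.0)
print([limiter.allow() for _ in range(5)])  # prints [True, True, True, False, False]
```

In the FastAPI handler you would call `limiter.allow()` at the top of `/execute` and return HTTP 429 when it comes back `False`.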
Tested with ROS2 Humble, LangChain 0.1.x, Claude Sonnet 4, Ubuntu 24.04