Build a ChatGPT for Robotics Interface in 45 Minutes

Create a natural language control system for robots using LangChain, React, and ROS2 with real-time command execution and safety checks.

Problem: Robots Need Natural Language Control

You want to control a robot with plain English commands like "move forward 2 meters" instead of writing ROS2 code or using complex UIs, but safely wiring an LLM into a robot control stack is far from obvious.

You'll learn:

  • How to parse natural language into robot commands
  • Safe command validation before execution
  • Real-time feedback from robot to chat interface
  • Error handling for ambiguous or unsafe requests

Time: 45 min | Level: Advanced


Why This Matters

Traditional robot control requires knowing ROS topics, message formats, and coordinate systems. LLMs can translate natural language into these low-level commands, but naive implementations risk dangerous actions from misinterpreted prompts.

Common challenges:

  • LLM generates syntactically correct but physically dangerous commands
  • No feedback loop between robot state and chat
  • Handling ambiguous requests ("move left" - how far?)
  • Latency between command and execution
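Whatever the interface, a request like "move forward 2 meters" must eventually compile down to a velocity and a duration. A minimal sketch of that translation (the function name and fixed speed here are illustrative, not part of any ROS API):

```python
def plan_forward(distance_m: float, speed_mps: float = 0.3):
    """Convert a distance into the (linear, angular, duration) triple
    that a velocity controller actually consumes."""
    duration = abs(distance_m) / speed_mps
    linear = speed_mps if distance_m >= 0 else -speed_mps
    return (linear, 0.0, duration)

print(plan_forward(2.0))  # (0.3, 0.0, ~6.67 s)
```

The rest of this tutorial builds the safety and feedback layers around exactly this kind of low-level triple.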

Prerequisites

# Check you have these installed
python --version  # 3.11+
node --version    # 22+
ros2 --version    # Humble or newer

# Install core dependencies
pip install langchain langchain-anthropic rclpy --break-system-packages
npm install -g bun  # Faster than npm for frontend

Solution

Step 1: Set Up LangChain with Safety Tools

# robot_agent.py
from langchain_anthropic import ChatAnthropic
from langchain.agents import Tool, AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
import rclpy
from geometry_msgs.msg import Twist

class RobotController:
    def __init__(self):
        rclpy.init()
        self.node = rclpy.create_node('langchain_controller')
        self.cmd_pub = self.node.create_publisher(Twist, '/cmd_vel', 10)
        
        # Safety limits - prevents dangerous commands
        self.max_linear = 0.5  # m/s
        self.max_angular = 1.0  # rad/s
    
    def validate_and_execute(self, linear: float, angular: float, duration: float) -> str:
        """Execute command only if within safety bounds"""
        
        if abs(linear) > self.max_linear:
            return f"Error: Speed {linear} exceeds max {self.max_linear} m/s"
        
        if abs(angular) > self.max_angular:
            return f"Error: Angular speed {angular} exceeds max {self.max_angular} rad/s"
        
        if duration > 10.0:
            return "Error: Duration limited to 10 seconds for safety"
        
        # Execute command
        msg = Twist()
        msg.linear.x = linear
        msg.angular.z = angular
        
        start_time = self.node.get_clock().now()
        while (self.node.get_clock().now() - start_time).nanoseconds < duration * 1e9:
            self.cmd_pub.publish(msg)
            rclpy.spin_once(self.node, timeout_sec=0.01)
        
        # Stop robot
        msg.linear.x = 0.0
        msg.angular.z = 0.0
        self.cmd_pub.publish(msg)
        
        return f"Executed: {linear} m/s for {duration}s"

# Initialize controller
robot = RobotController()

def move_forward(distance: str) -> str:
    """Move robot forward by specified distance in meters"""
    try:
        dist = float(distance)
        duration = abs(dist) / 0.3  # Fixed speed 0.3 m/s
        speed = 0.3 if dist > 0 else -0.3
        return robot.validate_and_execute(speed, 0.0, duration)
    except ValueError:
        return f"Error: '{distance}' is not a valid number"

def turn_robot(angle: str) -> str:
    """Turn robot by specified angle in degrees"""
    try:
        degrees = float(angle)
        radians = degrees * 3.14159 / 180
        duration = abs(radians) / 0.5  # Fixed rotation speed
        angular = 0.5 if radians > 0 else -0.5
        return robot.validate_and_execute(0.0, angular, duration)
    except ValueError:
        return f"Error: '{angle}' is not a valid number"

def stop_robot(_: str) -> str:
    """Emergency stop"""
    return robot.validate_and_execute(0.0, 0.0, 0.0)

Why this works: Each tool validates inputs before execution. The LLM can't directly publish to ROS topics - it must use these bounded functions.
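Because the bounds check is plain Python, it can be unit-tested without a robot. A sketch of the same logic with the ROS publishing stubbed out (constants mirror the limits above; this is not the tutorial's exact class):

```python
MAX_LINEAR = 0.5     # m/s, same limit as RobotController
MAX_ANGULAR = 1.0    # rad/s
MAX_DURATION = 10.0  # s

def check_command(linear: float, angular: float, duration: float):
    """Return an error string if the command is out of bounds, else None."""
    if abs(linear) > MAX_LINEAR:
        return f"Error: Speed {linear} exceeds max {MAX_LINEAR} m/s"
    if abs(angular) > MAX_ANGULAR:
        return f"Error: Angular speed {angular} exceeds max {MAX_ANGULAR} rad/s"
    if duration > MAX_DURATION:
        return "Error: Duration limited to 10 seconds for safety"
    return None

assert check_command(0.3, 0.0, 5.0) is None             # within limits
assert check_command(2.0, 0.0, 1.0).startswith("Error")  # too fast
```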


Step 2: Create the LangChain Agent

# Configure Claude with function calling
# Reads the API key from the ANTHROPIC_API_KEY environment variable
llm = ChatAnthropic(
    model="claude-sonnet-4-20250514",
    temperature=0,  # Deterministic for safety
)

# Define available tools
tools = [
    Tool(
        name="move_forward",
        func=move_forward,
        description="Move robot forward (positive) or backward (negative) by distance in meters. Example: move_forward('2.5')"
    ),
    Tool(
        name="turn_robot",
        func=turn_robot,
        description="Turn robot left (positive) or right (negative) by angle in degrees. Example: turn_robot('90')"
    ),
    Tool(
        name="stop_robot",
        func=stop_robot,
        description="Emergency stop. Call this if user says stop, halt, or freeze."
    )
]

# System prompt with safety instructions, in the ReAct format
# ({tools} and {tool_names} are filled in by create_react_agent)
template = """You are a robot control assistant. Convert user commands into safe robot actions.

Available tools:
{tools}

SAFETY RULES:
1. Always confirm ambiguous commands ("How far should I move?")
2. Refuse dangerous requests (speeds over limits, near obstacles)
3. Break complex commands into steps
4. Use stop_robot if user seems panicked

Use the following format:

Thought: reason about what to do next
Action: the tool to use, one of [{tool_names}]
Action Input: the input to the tool
Observation: the result of the tool call
... (Thought/Action/Action Input/Observation can repeat)
Thought: I now know the final answer
Final Answer: the response to the user

User request: {input}
Thought:{agent_scratchpad}"""

prompt = PromptTemplate.from_template(template)

# Create ReAct agent
agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    max_iterations=3,  # Prevent infinite loops
    handle_parsing_errors=True
)

Expected: Agent can now interpret commands and call tools. Test with agent_executor.invoke({"input": "move forward 1 meter"})

If it fails:

  • Error: "API key not found": Set ANTHROPIC_API_KEY environment variable
  • Timeout errors: Check ROS2 daemon is running with ros2 daemon status
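A common silent failure is a prompt-variable mismatch: create_react_agent requires the template to contain the {tools}, {tool_names}, and {agent_scratchpad} placeholders and raises a ValueError otherwise. A quick stdlib-only sanity check of any template string before handing it to LangChain:

```python
import re

# Hypothetical minimal template for illustration
template = ("Available tools: {tools}\n"
            "Action: one of [{tool_names}]\n"
            "User request: {input}\n"
            "Thought:{agent_scratchpad}")

# Extract every {placeholder} and compare against what the agent needs
placeholders = set(re.findall(r"\{(\w+)\}", template))
required = {"tools", "tool_names", "agent_scratchpad"}
missing = required - placeholders
assert not missing, f"Template missing variables: {missing}"
```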

Step 3: Build React Chat Interface

// app/page.tsx
'use client';
import { useState } from 'react';

interface Message {
  role: 'user' | 'assistant' | 'system';
  content: string;
  timestamp: Date;
}

export default function RobotChat() {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState('');
  const [isExecuting, setIsExecuting] = useState(false);

  const sendCommand = async () => {
    if (!input.trim() || isExecuting) return;

    const userMsg: Message = {
      role: 'user',
      content: input,
      timestamp: new Date()
    };
    
    setMessages(prev => [...prev, userMsg]);
    setInput('');
    setIsExecuting(true);

    try {
      // Call Python backend
      const response = await fetch('http://localhost:8000/execute', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ command: input })
      });

      const data = await response.json();
      
      setMessages(prev => [...prev, {
        role: 'assistant',
        content: data.result,
        timestamp: new Date()
      }]);
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      setMessages(prev => [...prev, {
        role: 'system',
        content: `Error: ${message}`,
        timestamp: new Date()
      }]);
    } finally {
      setIsExecuting(false);
    }
  };

  return (
    <div className="flex flex-col h-screen bg-gray-900 text-white">
      {/* Header */}
      <div className="p-4 border-b border-gray-700">
        <h1 className="text-xl font-bold">Robot Control Interface</h1>
        <div className="flex gap-4 mt-2 text-sm">
          <span className={isExecuting ? "text-yellow-400" : "text-green-400"}>
             {isExecuting ? "Executing..." : "Ready"}
          </span>
        </div>
      </div>

      {/* Messages */}
      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {messages.map((msg, i) => (
          <div
            key={i}
            className={`flex ${msg.role === 'user' ? 'justify-end' : 'justify-start'}`}
          >
            <div
              className={`max-w-md px-4 py-2 rounded-lg ${
                msg.role === 'user'
                  ? 'bg-blue-600'
                  : msg.role === 'system'
                  ? 'bg-red-600'
                  : 'bg-gray-700'
              }`}
            >
              <div className="text-xs opacity-70 mb-1">
                {msg.role.toUpperCase()} · {msg.timestamp.toLocaleTimeString()}
              </div>
              <div className="whitespace-pre-wrap">{msg.content}</div>
            </div>
          </div>
        ))}
      </div>

      {/* Input */}
      <div className="p-4 border-t border-gray-700">
        <div className="flex gap-2">
          <input
            type="text"
            value={input}
            onChange={(e) => setInput(e.target.value)}
            onKeyDown={(e) => e.key === 'Enter' && sendCommand()}
            placeholder="Tell the robot what to do..."
            className="flex-1 px-4 py-2 bg-gray-800 rounded-lg focus:outline-none focus:ring-2 focus:ring-blue-500"
            disabled={isExecuting}
          />
          <button
            onClick={sendCommand}
            disabled={isExecuting || !input.trim()}
            className="px-6 py-2 bg-blue-600 rounded-lg hover:bg-blue-700 disabled:opacity-50 disabled:cursor-not-allowed"
          >
            Send
          </button>
        </div>
        <div className="mt-2 text-xs text-gray-400">
          Try: "move forward 2 meters" or "turn left 90 degrees"
        </div>
      </div>
    </div>
  );
}

Step 4: Create FastAPI Backend Bridge

# server.py
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import asyncio

# agent_executor and robot come from the Step 1-2 code (robot_agent.py)
from robot_agent import agent_executor, robot

app = FastAPI()

# Enable CORS for React frontend
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_methods=["*"],
    allow_headers=["*"]
)

class Command(BaseModel):
    command: str

@app.post("/execute")
async def execute_command(cmd: Command):
    try:
        # Run agent in background thread to avoid blocking
        result = await asyncio.to_thread(
            agent_executor.invoke,
            {"input": cmd.command}
        )
        return {"result": result["output"], "success": True}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    return {"status": "operational", "ros_connected": robot.node is not None}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Why this architecture: FastAPI handles HTTP, LangChain processes language, ROS2 controls hardware. Each layer has clear responsibilities.
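You can also test the bridge without the React frontend by hitting /execute directly. A minimal stdlib client (the request is built and inspected here; actually sending it requires server.py to be running):

```python
import json
import urllib.request

def build_execute_request(command: str, base: str = "http://localhost:8000"):
    """Build the same POST /execute request the React frontend sends."""
    body = json.dumps({"command": command}).encode("utf-8")
    return urllib.request.Request(
        f"{base}/execute",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_execute_request("move forward 1 meter")
# With server.py running:
# result = json.loads(urllib.request.urlopen(req).read())
```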


Verification

Test the Complete System

# Terminal 1: Start ROS2 (or simulator)
ros2 launch turtlebot3_gazebo empty_world.launch.py

# Terminal 2: Start backend
python server.py

# Terminal 3: Start frontend
cd app && bun run dev

You should see:

  1. Backend logs show "Application startup complete"
  2. Frontend at http://localhost:3000 shows chat interface
  3. Test command: "move forward 0.5 meters then turn right 45 degrees"

Expected behavior: Robot moves forward, pauses, then turns. Chat shows execution feedback at each step.

If it fails:

  • "Connection refused": Check all three terminals are running
  • Robot doesn't move: Verify topic names with ros2 topic list
  • LLM doesn't call tools: Check API key and model name

Production Hardening

Add Obstacle Detection

from sensor_msgs.msg import LaserScan

class RobotController:
    def __init__(self):
        # ... existing code ...
        self.min_distance = float('inf')
        self.scan_sub = self.node.create_subscription(
            LaserScan,
            '/scan',
            self.scan_callback,
            10
        )
    
    def scan_callback(self, msg: LaserScan):
        # Raw ranges include inf (no return) and sub-minimum readings;
        # keep only values inside the sensor's valid window
        valid = [r for r in msg.ranges if msg.range_min < r < msg.range_max]
        self.min_distance = min(valid, default=float('inf'))
    
    def validate_and_execute(self, linear, angular, duration):
        # Add obstacle check
        if linear > 0 and self.min_distance < 0.5:
            return "Error: Obstacle detected within 0.5m. Command blocked."
        
        # ... rest of validation ...
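Real LaserScan messages routinely contain inf (no return), 0.0, and NaN entries, so a raw min() either reports a phantom obstacle at distance zero or never fires. The filtering as a standalone function (the window limits are typical TurtleBot3 LDS values, used here only as illustrative defaults):

```python
def min_valid_range(ranges, range_min=0.12, range_max=3.5):
    """Smallest plausible reading. inf, 0.0 and NaN all fail the
    window comparison and are discarded."""
    valid = [r for r in ranges if range_min < r < range_max]
    return min(valid, default=float("inf"))

print(min_valid_range([float("inf"), 0.0, 1.2, 0.4]))  # prints 0.4
```

Returning inf when no reading is valid means "no obstacle seen", which keeps the forward-motion check above conservative but usable.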

Add Command History

# Store last 50 commands for context
from collections import deque
from datetime import datetime

command_history = deque(maxlen=50)

@app.post("/execute")
async def execute_command(cmd: Command):
    # Build context from prior commands before recording the new one,
    # so the new command isn't duplicated in its own context
    context = "\n".join(f"- {h['command']}" for h in command_history)
    command_history.append({
        "command": cmd.command,
        "timestamp": datetime.now().isoformat()
    })
    
    result = await asyncio.to_thread(
        agent_executor.invoke,
        {"input": f"Recent commands:\n{context}\n\nNew command: {cmd.command}"}
    )
    # ... rest of handler ...
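deque(maxlen=N) silently drops the oldest entry once full, which is exactly the rolling-context behavior you want here:

```python
from collections import deque

history = deque(maxlen=3)  # small maxlen to make the eviction visible
for cmd in ["move forward 1", "turn left 90", "stop", "move forward 2"]:
    history.append(cmd)

print(list(history))  # ['turn left 90', 'stop', 'move forward 2']
```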

What You Learned

  • LangChain agents bridge natural language and robot APIs safely
  • Tool-based architecture prevents direct LLM control of hardware
  • React provides real-time feedback better than terminal interfaces
  • Validation layers are critical - LLMs can generate dangerous commands

Limitations:

  • 200-500ms latency per command (LLM inference time)
  • Struggles with spatial reasoning ("move to the door")
  • Requires explicit tools for each robot capability

Security Considerations

NEVER expose this publicly without:

  1. Authentication (JWT tokens, API keys)
  2. Rate limiting (max 10 commands/minute)
  3. Command logging for audit trails
  4. Emergency stop accessible outside software
  5. Geofencing or workspace boundaries
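Item 2 can be a few lines of in-process state. A minimal sliding-window limiter (a sketch only; use a shared store such as Redis if you run multiple server workers):

```python
import time
from collections import deque

class RateLimiter:
    """Allow at most `limit` commands per `window` seconds."""
    def __init__(self, limit: int = 10, window: float = 60.0):
        self.limit, self.window = limit, window
        self.timestamps = deque()

    def allow(self, now: float = None) -> bool:
        now = time.monotonic() if now is None else now
        # Drop timestamps that have aged out of the window
        while self.timestamps and now - self.timestamps[0] > self.window:
            self.timestamps.popleft()
        if len(self.timestamps) >= self.limit:
            return False
        self.timestamps.append(now)
        return True

rl = RateLimiter(limit=2, window=60.0)
print(rl.allow(0.0), rl.allow(1.0), rl.allow(2.0))  # True True False
```

In the FastAPI handler, a `rl.allow()` returning False would map to an HTTP 429 response before the agent is ever invoked.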

Tested with ROS2 Humble, LangChain 0.1.x, Claude Sonnet 4, Ubuntu 24.04