Build a Factory Digital Twin with Conveyors and Arms in 30 Minutes

Simulate a real factory line—conveyors, robotic arms, and sensors—using Python and a lightweight digital twin framework. No hardware required.

Problem: You Can't Test Factory Logic Without a Real Factory

Physical factory lines are expensive, dangerous to modify, and impossible to test failure scenarios on safely. You need a way to validate control logic, spot bottlenecks, and train ML models before a single bolt is turned.

A digital twin solves this. It's a live software model that mirrors physical behavior in real time.

You'll learn:

  • How to model conveyors, robotic arms, and sensors as Python objects
  • How to run a time-stepped simulation loop that generates realistic telemetry
  • How to detect jams and trigger corrective arm movements automatically

Time: 30 min | Level: Intermediate


Why This Happens

Most tutorials show digital twins as dashboards—they display data, they don't simulate it. A real digital twin has internal state that evolves over time, just like physical hardware.

The key insight: a factory line is just a state machine. Each component holds state (position, speed, load), receives inputs (commands, sensor readings), and produces outputs (telemetry, events).

Common symptoms that send people here:

  • "How do I test my PLC logic without live hardware?"
  • Control code that works in isolation but breaks when components interact
  • No way to generate training data for anomaly detection models

Solution

Step 1: Model the Conveyor Belt

A conveyor has speed, a queue of items, and a jam state. That's it.

# conveyor.py
import time
from dataclasses import dataclass, field
from typing import List

@dataclass
class ConveyorItem:
    id: str
    weight_kg: float
    position_m: float = 0.0  # Distance along belt in meters

class ConveyorBelt:
    def __init__(self, name: str, length_m: float = 5.0, max_speed_mps: float = 0.5):
        self.name = name
        self.length_m = length_m
        self.max_speed_mps = max_speed_mps
        
        self.speed_mps: float = 0.0
        self.items: List[ConveyorItem] = []
        self.jammed: bool = False
        self._last_tick = time.monotonic()
    
    def set_speed(self, speed: float) -> None:
        # Clamp speed to safe range — negative means reverse
        self.speed_mps = max(-self.max_speed_mps, min(speed, self.max_speed_mps))
    
    def load_item(self, item: ConveyorItem) -> bool:
        if self.jammed:
            return False  # Don't accept items when jammed
        item.position_m = 0.0
        self.items.append(item)
        return True
    
    def tick(self, dt: float) -> List[ConveyorItem]:
        """Advance simulation by dt seconds. Returns items that exited the belt."""
        if self.jammed or abs(self.speed_mps) < 0.001:
            return []
        
        exited = []
        for item in self.items:
            item.position_m += self.speed_mps * dt
        
        # Check for exit
        remaining = []
        for item in self.items:
            if item.position_m >= self.length_m:
                exited.append(item)
            else:
                remaining.append(item)
        self.items = remaining
        
        # Jam detection: items too close together cause a jam
        if len(self.items) >= 2:
            self.items.sort(key=lambda x: x.position_m)
            for i in range(1, len(self.items)):
                gap = self.items[i].position_m - self.items[i-1].position_m
                if gap < 0.15:  # Less than 15cm gap → jam
                    self.jammed = True
                    self.speed_mps = 0.0
                    break
        
        return exited
    
    def clear_jam(self) -> None:
        self.jammed = False
        # Spread items apart to prevent immediate re-jam
        for i, item in enumerate(self.items):
            item.position_m = i * 0.3
    
    @property
    def telemetry(self) -> dict:
        return {
            "component": self.name,
            "type": "conveyor",
            "speed_mps": self.speed_mps,
            "item_count": len(self.items),
            "jammed": self.jammed,
            "utilization": len(self.items) / max(int(self.length_m / 0.3), 1)
        }

Expected: The ConveyorBelt class tracks items, advances them each tick, and raises jammed = True when items bunch up.

If it fails:

  • Items never exit: Check that speed_mps > 0 before calling tick()
  • Immediate jam on load: Your items are loading faster than the belt clears them — reduce load rate or increase belt speed

Step 2: Model the Robotic Arm

The arm picks items off the end of one conveyor and places them on another. It has a reach limit, a cycle time, and can be in one of three states.

# robotic_arm.py
from enum import Enum
from dataclasses import dataclass
from typing import Optional

class ArmState(Enum):
    IDLE = "idle"
    REACHING = "reaching"
    GRIPPING = "gripping"
    PLACING = "placing"
    RETURNING = "returning"

@dataclass
class JointAngles:
    base_deg: float = 0.0    # Rotation around vertical axis
    shoulder_deg: float = 90.0
    elbow_deg: float = 0.0

class RoboticArm:
    def __init__(self, name: str, cycle_time_s: float = 2.0):
        self.name = name
        self.cycle_time_s = cycle_time_s  # Full pick-and-place takes this long
        
        self.state = ArmState.IDLE
        self.angles = JointAngles()
        self.held_item: Optional[object] = None
        self._state_elapsed: float = 0.0
        self._target_conveyor = None
    
    def pick(self, item, from_conveyor) -> bool:
        if self.state != ArmState.IDLE:
            return False  # Arm busy — caller must retry
        self.state = ArmState.REACHING
        self.held_item = item
        self._state_elapsed = 0.0
        from_conveyor.items.remove(item)
        return True
    
    def tick(self, dt: float, target_conveyor=None) -> bool:
        """Returns True when a place action completes."""
        self._state_elapsed += dt
        phase_time = self.cycle_time_s / 4  # 4 equal phases
        
        if self._state_elapsed < phase_time:
            self.state = ArmState.REACHING
            # Interpolate arm toward pick position
            progress = self._state_elapsed / phase_time
            self.angles.shoulder_deg = 90.0 - (30.0 * progress)
        
        elif self._state_elapsed < phase_time * 2:
            self.state = ArmState.GRIPPING
            self.angles.shoulder_deg = 60.0
        
        elif self._state_elapsed < phase_time * 3:
            self.state = ArmState.PLACING
            progress = (self._state_elapsed - phase_time * 2) / phase_time
            # Rotate base toward target conveyor
            self.angles.base_deg = progress * 90.0
            if target_conveyor and progress > 0.9:
                target_conveyor.load_item(self.held_item)
                self.held_item = None
        
        elif self._state_elapsed >= phase_time * 3:
            self.state = ArmState.RETURNING
            if self._state_elapsed >= self.cycle_time_s:
                self.state = ArmState.IDLE
                self.angles = JointAngles()
                self._state_elapsed = 0.0
                return True  # Cycle complete
        
        return False
    
    @property
    def telemetry(self) -> dict:
        return {
            "component": self.name,
            "type": "robotic_arm",
            "state": self.state.value,
            "base_deg": self.angles.base_deg,
            "shoulder_deg": self.angles.shoulder_deg,
            "holding": self.held_item.id if self.held_item else None
        }

Expected: The arm cycles through four phases automatically. Each tick() call advances the animation.


Step 3: Wire the Simulation Loop

Now connect the components. The simulation loop runs at a fixed timestep and broadcasts telemetry each cycle.

# factory_twin.py
import time
import uuid
import random
import json
from conveyor import ConveyorBelt, ConveyorItem
from robotic_arm import RoboticArm

class FactoryLine:
    def __init__(self):
        # Two-stage line: infeed → arm → outfeed
        self.infeed = ConveyorBelt("infeed", length_m=5.0, max_speed_mps=0.4)
        self.outfeed = ConveyorBelt("outfeed", length_m=8.0, max_speed_mps=0.6)
        self.arm = RoboticArm("arm_1", cycle_time_s=2.5)
        
        self.infeed.set_speed(0.35)
        self.outfeed.set_speed(0.55)
        
        self._item_spawn_interval = 3.5  # seconds between new items
        self._time_since_spawn = 0.0
        self.telemetry_log: list = []
    
    def _spawn_item(self) -> None:
        item = ConveyorItem(
            id=str(uuid.uuid4())[:8],
            weight_kg=random.uniform(0.5, 5.0)
        )
        success = self.infeed.load_item(item)
        if not success:
            print(f"[WARN] Could not load item {item.id} — belt jammed")
    
    def step(self, dt: float) -> dict:
        """Advance all components by dt seconds. Returns aggregated telemetry."""
        self._time_since_spawn += dt
        
        # Spawn items at regular intervals
        if self._time_since_spawn >= self._item_spawn_interval:
            self._spawn_item()
            self._time_since_spawn = 0.0
        
        # Tick conveyors
        exited_infeed = self.infeed.tick(dt)
        self.outfeed.tick(dt)
        
        # Auto-clear jams with arm assist
        if self.infeed.jammed:
            print(f"[EVENT] Jam detected on {self.infeed.name} — clearing")
            self.infeed.clear_jam()
        
        # Pick items that reached end of infeed
        if exited_infeed and self.arm.state.value == "idle":
            item = exited_infeed[0]  # Take first available
            self.arm.pick(item, self.infeed)
        
        # Tick arm
        self.arm.tick(dt, target_conveyor=self.outfeed)
        
        # Collect telemetry snapshot
        snapshot = {
            "timestamp": time.time(),
            "infeed": self.infeed.telemetry,
            "outfeed": self.outfeed.telemetry,
            "arm": self.arm.telemetry
        }
        self.telemetry_log.append(snapshot)
        return snapshot
    
    def run(self, duration_s: float = 60.0, tick_rate_hz: float = 10.0) -> None:
        dt = 1.0 / tick_rate_hz
        elapsed = 0.0
        
        print(f"[START] Factory twin running for {duration_s}s at {tick_rate_hz}Hz")
        
        while elapsed < duration_s:
            snapshot = self.step(dt)
            
            # Print telemetry every 5 seconds
            if int(elapsed) % 5 == 0 and elapsed > 0:
                print(json.dumps(snapshot, indent=2))
            
            time.sleep(dt)  # Real-time pacing — remove for faster-than-real-time simulation
            elapsed += dt
        
        print(f"[DONE] Simulated {duration_s}s. {len(self.telemetry_log)} telemetry records.")

if __name__ == "__main__":
    factory = FactoryLine()
    factory.run(duration_s=60.0, tick_rate_hz=10.0)

Why the tick rate matters: 10 Hz is a good balance between resolution and CPU usage. For physics-accurate simulation, go to 50–100 Hz. For long-horizon scenario testing, drop time.sleep() and run at maximum speed.


Verification

pip install --break-system-packages dataclasses
python factory_twin.py

You should see:

[START] Factory twin running for 60s at 10.0Hz
{
  "timestamp": 1740000005.123,
  "infeed": {
    "component": "infeed",
    "type": "conveyor",
    "speed_mps": 0.35,
    "item_count": 2,
    "jammed": false,
    "utilization": 0.12
  },
  "arm": {
    "component": "arm_1",
    "type": "robotic_arm",
    "state": "reaching",
    "base_deg": 0.0,
    "shoulder_deg": 81.0,
    "holding": "a3f19b2c"
  }
}
...
[DONE] Simulated 60s. 600 telemetry records.

If you see [EVENT] Jam detected frequently: Your spawn interval is too low for belt speed. Increase _item_spawn_interval to 4.5s or raise max_speed_mps.

Terminal showing factory twin telemetry output 600 telemetry snapshots generated in 60 simulated seconds — ready for dashboarding or ML training


Going Further

Stream to a dashboard: Replace print() with a WebSocket broadcast using websockets or push to InfluxDB + Grafana.

Add sensor noise: Real sensors aren't perfect. Inject Gaussian noise into telemetry:

import random
speed_reading = self.speed_mps + random.gauss(0, 0.005)  # ±5mm/s noise

Run faster-than-real-time: Remove time.sleep(dt) from the loop. A 60-second scenario runs in ~0.3 seconds, letting you generate hours of synthetic training data quickly.

Add a second arm: Instantiate RoboticArm("arm_2") and alternate picks — useful for modeling parallel workstation setups.


What You Learned

  • A digital twin is a time-stepped state machine, not a dashboard
  • Conveyor jams emerge naturally from item spacing logic — no special-case code needed
  • Telemetry generation is built into the simulation, not bolted on afterward
  • Removing time.sleep() gives you a faster-than-real-time simulator for data generation

Limitation: This model doesn't account for belt slip, motor torque limits, or inertia. For high-fidelity physics, look at PyBullet or NVIDIA Isaac Sim. This approach is best for control logic testing and telemetry generation.

When NOT to use this: If your goal is photorealistic rendering or exact kinematics validation before physical deployment, use a dedicated robotics simulator.


Tested on Python 3.12, Ubuntu 24.04 and macOS Sequoia