Problem: You Can't Test Factory Logic Without a Real Factory
Physical factory lines are expensive, dangerous to modify, and impossible to test failure scenarios on safely. You need a way to validate control logic, spot bottlenecks, and train ML models before a single bolt is turned.
A digital twin solves this. It's a live software model that mirrors physical behavior in real time.
You'll learn:
- How to model conveyors, robotic arms, and sensors as Python objects
- How to run a time-stepped simulation loop that generates realistic telemetry
- How to detect jams and trigger corrective arm movements automatically
Time: 30 min | Level: Intermediate
Why This Happens
Most tutorials show digital twins as dashboards—they display data, they don't simulate it. A real digital twin has internal state that evolves over time, just like physical hardware.
The key insight: a factory line is just a state machine. Each component holds state (position, speed, load), receives inputs (commands, sensor readings), and produces outputs (telemetry, events).
Common symptoms that send people here:
- "How do I test my PLC logic without live hardware?"
- Control code that works in isolation but breaks when components interact
- No way to generate training data for anomaly detection models
Solution
Step 1: Model the Conveyor Belt
A conveyor has speed, a queue of items, and a jam state. That's it.
# conveyor.py
import time
from dataclasses import dataclass, field
from typing import List
@dataclass
class ConveyorItem:
id: str
weight_kg: float
position_m: float = 0.0 # Distance along belt in meters
class ConveyorBelt:
def __init__(self, name: str, length_m: float = 5.0, max_speed_mps: float = 0.5):
self.name = name
self.length_m = length_m
self.max_speed_mps = max_speed_mps
self.speed_mps: float = 0.0
self.items: List[ConveyorItem] = []
self.jammed: bool = False
self._last_tick = time.monotonic()
def set_speed(self, speed: float) -> None:
# Clamp speed to safe range — negative means reverse
self.speed_mps = max(-self.max_speed_mps, min(speed, self.max_speed_mps))
def load_item(self, item: ConveyorItem) -> bool:
if self.jammed:
return False # Don't accept items when jammed
item.position_m = 0.0
self.items.append(item)
return True
def tick(self, dt: float) -> List[ConveyorItem]:
"""Advance simulation by dt seconds. Returns items that exited the belt."""
if self.jammed or abs(self.speed_mps) < 0.001:
return []
exited = []
for item in self.items:
item.position_m += self.speed_mps * dt
# Check for exit
remaining = []
for item in self.items:
if item.position_m >= self.length_m:
exited.append(item)
else:
remaining.append(item)
self.items = remaining
# Jam detection: items too close together cause a jam
if len(self.items) >= 2:
self.items.sort(key=lambda x: x.position_m)
for i in range(1, len(self.items)):
gap = self.items[i].position_m - self.items[i-1].position_m
if gap < 0.15: # Less than 15cm gap → jam
self.jammed = True
self.speed_mps = 0.0
break
return exited
def clear_jam(self) -> None:
self.jammed = False
# Spread items apart to prevent immediate re-jam
for i, item in enumerate(self.items):
item.position_m = i * 0.3
@property
def telemetry(self) -> dict:
return {
"component": self.name,
"type": "conveyor",
"speed_mps": self.speed_mps,
"item_count": len(self.items),
"jammed": self.jammed,
"utilization": len(self.items) / max(int(self.length_m / 0.3), 1)
}
Expected: The ConveyorBelt class tracks items, advances them each tick, and raises jammed = True when items bunch up.
If it fails:
- Items never exit: Check that
speed_mps > 0before callingtick() - Immediate jam on load: Your items are loading faster than the belt clears them — reduce load rate or increase belt speed
Step 2: Model the Robotic Arm
The arm picks items off the end of one conveyor and places them on another. It has a reach limit, a cycle time, and can be in one of three states.
# robotic_arm.py
from enum import Enum
from dataclasses import dataclass
from typing import Optional
class ArmState(Enum):
IDLE = "idle"
REACHING = "reaching"
GRIPPING = "gripping"
PLACING = "placing"
RETURNING = "returning"
@dataclass
class JointAngles:
base_deg: float = 0.0 # Rotation around vertical axis
shoulder_deg: float = 90.0
elbow_deg: float = 0.0
class RoboticArm:
def __init__(self, name: str, cycle_time_s: float = 2.0):
self.name = name
self.cycle_time_s = cycle_time_s # Full pick-and-place takes this long
self.state = ArmState.IDLE
self.angles = JointAngles()
self.held_item: Optional[object] = None
self._state_elapsed: float = 0.0
self._target_conveyor = None
def pick(self, item, from_conveyor) -> bool:
if self.state != ArmState.IDLE:
return False # Arm busy — caller must retry
self.state = ArmState.REACHING
self.held_item = item
self._state_elapsed = 0.0
from_conveyor.items.remove(item)
return True
def tick(self, dt: float, target_conveyor=None) -> bool:
"""Returns True when a place action completes."""
self._state_elapsed += dt
phase_time = self.cycle_time_s / 4 # 4 equal phases
if self._state_elapsed < phase_time:
self.state = ArmState.REACHING
# Interpolate arm toward pick position
progress = self._state_elapsed / phase_time
self.angles.shoulder_deg = 90.0 - (30.0 * progress)
elif self._state_elapsed < phase_time * 2:
self.state = ArmState.GRIPPING
self.angles.shoulder_deg = 60.0
elif self._state_elapsed < phase_time * 3:
self.state = ArmState.PLACING
progress = (self._state_elapsed - phase_time * 2) / phase_time
# Rotate base toward target conveyor
self.angles.base_deg = progress * 90.0
if target_conveyor and progress > 0.9:
target_conveyor.load_item(self.held_item)
self.held_item = None
elif self._state_elapsed >= phase_time * 3:
self.state = ArmState.RETURNING
if self._state_elapsed >= self.cycle_time_s:
self.state = ArmState.IDLE
self.angles = JointAngles()
self._state_elapsed = 0.0
return True # Cycle complete
return False
@property
def telemetry(self) -> dict:
return {
"component": self.name,
"type": "robotic_arm",
"state": self.state.value,
"base_deg": self.angles.base_deg,
"shoulder_deg": self.angles.shoulder_deg,
"holding": self.held_item.id if self.held_item else None
}
Expected: The arm cycles through four phases automatically. Each tick() call advances the animation.
Step 3: Wire the Simulation Loop
Now connect the components. The simulation loop runs at a fixed timestep and broadcasts telemetry each cycle.
# factory_twin.py
import time
import uuid
import random
import json
from conveyor import ConveyorBelt, ConveyorItem
from robotic_arm import RoboticArm
class FactoryLine:
def __init__(self):
# Two-stage line: infeed → arm → outfeed
self.infeed = ConveyorBelt("infeed", length_m=5.0, max_speed_mps=0.4)
self.outfeed = ConveyorBelt("outfeed", length_m=8.0, max_speed_mps=0.6)
self.arm = RoboticArm("arm_1", cycle_time_s=2.5)
self.infeed.set_speed(0.35)
self.outfeed.set_speed(0.55)
self._item_spawn_interval = 3.5 # seconds between new items
self._time_since_spawn = 0.0
self.telemetry_log: list = []
def _spawn_item(self) -> None:
item = ConveyorItem(
id=str(uuid.uuid4())[:8],
weight_kg=random.uniform(0.5, 5.0)
)
success = self.infeed.load_item(item)
if not success:
print(f"[WARN] Could not load item {item.id} — belt jammed")
def step(self, dt: float) -> dict:
"""Advance all components by dt seconds. Returns aggregated telemetry."""
self._time_since_spawn += dt
# Spawn items at regular intervals
if self._time_since_spawn >= self._item_spawn_interval:
self._spawn_item()
self._time_since_spawn = 0.0
# Tick conveyors
exited_infeed = self.infeed.tick(dt)
self.outfeed.tick(dt)
# Auto-clear jams with arm assist
if self.infeed.jammed:
print(f"[EVENT] Jam detected on {self.infeed.name} — clearing")
self.infeed.clear_jam()
# Pick items that reached end of infeed
if exited_infeed and self.arm.state.value == "idle":
item = exited_infeed[0] # Take first available
self.arm.pick(item, self.infeed)
# Tick arm
self.arm.tick(dt, target_conveyor=self.outfeed)
# Collect telemetry snapshot
snapshot = {
"timestamp": time.time(),
"infeed": self.infeed.telemetry,
"outfeed": self.outfeed.telemetry,
"arm": self.arm.telemetry
}
self.telemetry_log.append(snapshot)
return snapshot
def run(self, duration_s: float = 60.0, tick_rate_hz: float = 10.0) -> None:
dt = 1.0 / tick_rate_hz
elapsed = 0.0
print(f"[START] Factory twin running for {duration_s}s at {tick_rate_hz}Hz")
while elapsed < duration_s:
snapshot = self.step(dt)
# Print telemetry every 5 seconds
if int(elapsed) % 5 == 0 and elapsed > 0:
print(json.dumps(snapshot, indent=2))
time.sleep(dt) # Real-time pacing — remove for faster-than-real-time simulation
elapsed += dt
print(f"[DONE] Simulated {duration_s}s. {len(self.telemetry_log)} telemetry records.")
if __name__ == "__main__":
factory = FactoryLine()
factory.run(duration_s=60.0, tick_rate_hz=10.0)
Why the tick rate matters: 10 Hz is a good balance between resolution and CPU usage. For physics-accurate simulation, go to 50–100 Hz. For long-horizon scenario testing, drop time.sleep() and run at maximum speed.
Verification
pip install --break-system-packages dataclasses
python factory_twin.py
You should see:
[START] Factory twin running for 60s at 10.0Hz
{
"timestamp": 1740000005.123,
"infeed": {
"component": "infeed",
"type": "conveyor",
"speed_mps": 0.35,
"item_count": 2,
"jammed": false,
"utilization": 0.12
},
"arm": {
"component": "arm_1",
"type": "robotic_arm",
"state": "reaching",
"base_deg": 0.0,
"shoulder_deg": 81.0,
"holding": "a3f19b2c"
}
}
...
[DONE] Simulated 60s. 600 telemetry records.
If you see [EVENT] Jam detected frequently: Your spawn interval is too low for belt speed. Increase _item_spawn_interval to 4.5s or raise max_speed_mps.
600 telemetry snapshots generated in 60 simulated seconds — ready for dashboarding or ML training
Going Further
Stream to a dashboard: Replace print() with a WebSocket broadcast using websockets or push to InfluxDB + Grafana.
Add sensor noise: Real sensors aren't perfect. Inject Gaussian noise into telemetry:
import random
speed_reading = self.speed_mps + random.gauss(0, 0.005) # ±5mm/s noise
Run faster-than-real-time: Remove time.sleep(dt) from the loop. A 60-second scenario runs in ~0.3 seconds, letting you generate hours of synthetic training data quickly.
Add a second arm: Instantiate RoboticArm("arm_2") and alternate picks — useful for modeling parallel workstation setups.
What You Learned
- A digital twin is a time-stepped state machine, not a dashboard
- Conveyor jams emerge naturally from item spacing logic — no special-case code needed
- Telemetry generation is built into the simulation, not bolted on afterward
- Removing
time.sleep()gives you a faster-than-real-time simulator for data generation
Limitation: This model doesn't account for belt slip, motor torque limits, or inertia. For high-fidelity physics, look at PyBullet or NVIDIA Isaac Sim. This approach is best for control logic testing and telemetry generation.
When NOT to use this: If your goal is photorealistic rendering or exact kinematics validation before physical deployment, use a dedicated robotics simulator.
Tested on Python 3.12, Ubuntu 24.04 and macOS Sequoia