Coordinate 10+ Drone Swarms with Python and ROS 2

Build a working drone swarm coordinator in Python using ROS 2 Humble. Covers namespacing, formation logic, and fault tolerance for 10+ UAVs.

Problem: Coordinating 10+ Drones Without Chaos

You've got a fleet of UAVs and they keep colliding, drifting out of formation, or going silent when one node crashes. Scaling from 2 drones to 10+ requires more than duplicating your single-drone ROS 2 setup — it needs deliberate namespacing, a shared state model, and a coordinator that handles failures gracefully.

You'll learn:

  • How to namespace ROS 2 nodes so 10+ drones don't step on each other
  • A Python-based swarm coordinator that assigns formation positions
  • Fault detection and reassignment when a drone drops offline

Time: 45 min | Level: Advanced


Why This Happens

A naive multi-drone setup launches identical nodes that all publish to /cmd_vel and subscribe to /odom. With 10 drones, every drone hears every other drone's odometry — and sends velocity commands to all of them simultaneously.

Common symptoms:

  • All drones move when you command one
  • Position feedback is an averaged blur of all 10 drones
  • One crashed node brings down coordination for the entire fleet

ROS 2's namespace system solves the topic collision problem, but you still need a coordinator node that treats the swarm as a first-class entity — not just 10 independent robots.


Solution

Step 1: Namespace Every Drone at Launch

Each drone gets its own namespace so /drone_1/cmd_vel never conflicts with /drone_5/cmd_vel.

Create swarm_launch.py:

# swarm_launch.py
from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    drone_count = 10
    nodes = []

    for i in range(1, drone_count + 1):
        nodes.append(
            Node(
                package="drone_controller",
                executable="drone_node",
                namespace=f"drone_{i}",          # Isolates all topics per drone
                name=f"drone_{i}",
                parameters=[{"drone_id": i}]
            )
        )

    return LaunchDescription(nodes)

Launch with:

ros2 launch drone_controller swarm_launch.py

Expected: ros2 node list shows /drone_1/drone_1 through /drone_10/drone_10.

If it fails:

  • "package not found": Run colcon build --packages-select drone_controller first, then source install/setup.bash
  • Nodes launch then immediately die: Check that drone_node executable is installed — ros2 pkg executables drone_controller

Step 2: Build the Swarm Coordinator Node

The coordinator holds the desired formation, tracks which drones are alive, and publishes individual position targets.

# swarm_coordinator.py
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped
from std_msgs.msg import Bool
import math
import time

FORMATION_RADIUS = 5.0  # meters between drones in circle formation

class SwarmCoordinator(Node):
    def __init__(self):
        super().__init__("swarm_coordinator")
        self.declare_parameter("drone_count", 10)
        self.drone_count = self.get_parameter("drone_count").value

        self.alive = {i: False for i in range(1, self.drone_count + 1)}
        self.publishers = {}
        self.heartbeat_subs = {}

        for i in range(1, self.drone_count + 1):
            # Publish target pose to each drone's namespace
            self.publishers[i] = self.create_publisher(
                PoseStamped,
                f"/drone_{i}/target_pose",
                10
            )
            # Subscribe to heartbeat so we know who's alive
            self.heartbeat_subs[i] = self.create_subscription(
                Bool,
                f"/drone_{i}/heartbeat",
                lambda msg, drone_id=i: self._on_heartbeat(drone_id, msg),
                10
            )

        # Check for stale drones every 2 seconds
        self.create_timer(2.0, self._check_alive)
        # Publish formation targets at 10 Hz
        self.create_timer(0.1, self._publish_targets)

        self._last_seen = {i: 0.0 for i in range(1, self.drone_count + 1)}
        self.get_logger().info(f"Coordinator ready for {self.drone_count} drones")

    def _on_heartbeat(self, drone_id: int, msg: Bool):
        self._last_seen[drone_id] = time.time()
        if not self.alive[drone_id]:
            self.get_logger().info(f"Drone {drone_id} came online")
        self.alive[drone_id] = True

    def _check_alive(self):
        now = time.time()
        for drone_id, last in self._last_seen.items():
            # Mark as dead if no heartbeat for 3 seconds
            if now - last > 3.0 and self.alive[drone_id]:
                self.alive[drone_id] = False
                self.get_logger().warn(f"Drone {drone_id} timed out — reassigning formation")

    def _formation_position(self, slot: int, total_slots: int) -> tuple[float, float]:
        # Evenly space active drones around a circle
        angle = (2 * math.pi / total_slots) * slot
        x = FORMATION_RADIUS * math.cos(angle)
        y = FORMATION_RADIUS * math.sin(angle)
        return x, y

    def _publish_targets(self):
        active = [i for i, alive in self.alive.items() if alive]
        if not active:
            return

        for slot, drone_id in enumerate(active):
            x, y = self._formation_position(slot, len(active))
            msg = PoseStamped()
            msg.header.stamp = self.get_clock().now().to_msg()
            msg.header.frame_id = "world"
            msg.pose.position.x = x
            msg.pose.position.y = y
            msg.pose.position.z = 10.0  # Constant altitude
            self.publishers[drone_id].publish(msg)


def main():
    rclpy.init()
    node = SwarmCoordinator()
    rclpy.spin(node)
    rclpy.shutdown()

Why the heartbeat pattern: ROS 2's lifecycle manager handles individual node states, but it doesn't tell you if the physical drone lost connection. A simple heartbeat topic gives you real-world drone health, not just process health.


Step 3: Add Heartbeat to Each Drone Node

Each drone publishes its heartbeat so the coordinator knows it's alive.

# In your drone_node.py
from std_msgs.msg import Bool

class DroneNode(Node):
    def __init__(self):
        super().__init__("drone_node")
        self.declare_parameter("drone_id", 0)
        self.drone_id = self.get_parameter("drone_id").value

        self._heartbeat_pub = self.create_publisher(Bool, "heartbeat", 10)
        # Relative topic — becomes /drone_N/heartbeat due to namespace
        self.create_timer(0.5, self._send_heartbeat)

    def _send_heartbeat(self):
        msg = Bool()
        msg.data = True
        self._heartbeat_pub.publish(msg)

Note the relative topic name "heartbeat" — not "/heartbeat". The leading slash bypasses namespace. Relative names inherit it. This single distinction causes most multi-robot topic bugs.


Step 4: Test Fault Tolerance

Kill one drone and watch the coordinator redistribute the formation.

Terminal 1 — watch coordinator output:

ros2 topic echo /swarm_coordinator/status

Terminal 2 — kill drone 3:

ros2 node kill /drone_3/drone_3

You should see: Within 3 seconds, the coordinator logs "Drone 3 timed out — reassigning formation" and the remaining 9 drones receive updated positions that close the gap.


Verification

Run the full system in simulation:

# Launch all drone nodes
ros2 launch drone_controller swarm_launch.py

# In a second terminal, start the coordinator
ros2 run drone_controller swarm_coordinator

# Confirm all 10 drones are receiving targets
ros2 topic hz /drone_1/target_pose
ros2 topic hz /drone_10/target_pose

You should see: Both topics reporting ~10 Hz, and ros2 node info /swarm_coordinator listing 10 publishers and 10 heartbeat subscribers.


What You Learned

  • ROS 2 namespaces isolate topics per drone — always use relative topic names inside drone nodes
  • A dedicated coordinator node is the right place to own formation logic, not the drones themselves
  • Heartbeat timeouts let you detect real-world failures that ROS lifecycle states miss
  • When a drone drops, dynamic slot reassignment keeps the formation tight without manual intervention

Limitation: This circle formation assumes a flat 2D plane at constant altitude. For 3D formations (sphere, helix), extend _formation_position to return an (x, y, z) tuple and account for vertical collision avoidance separately.

When NOT to use this: If your drones use different firmware (PX4 vs ArduPilot) you'll need a MAVROS abstraction layer between the coordinator and each drone node. The coordinator logic stays the same — only the command interface changes.


Tested on ROS 2 Humble, Python 3.10, Ubuntu 22.04. Gazebo simulation recommended before any real hardware runs.