Skip to main content

Chapter 1.7: Building ROS 2 Packages with Python

Learning Objectives

By the end of this chapter, students will be able to:

  • Create and structure ROS 2 packages using Python
  • Implement nodes with proper package organization
  • Use ROS 2 client libraries effectively in Python
  • Create custom message, service, and action definitions
  • Build and test ROS 2 Python packages
  • Follow ROS 2 Python best practices

Introduction

ROS 2 packages are the fundamental units of software organization in ROS 2. They contain nodes, libraries, and other resources needed for robot applications. Python is one of the primary languages supported by ROS 2, offering rapid prototyping and development capabilities that are particularly valuable in the research and development phases of physical AI systems.

In this chapter, we'll explore how to create and structure ROS 2 packages using Python, implement nodes with proper organization, and follow best practices for Python development in ROS 2. Understanding package structure and development workflows is essential for building maintainable and reusable robot software.

Package Structure and Organization

Standard ROS 2 Package Layout

A typical ROS 2 package follows this structure:

my_robot_package/
├── CMakeLists.txt # Build configuration for C++
├── package.xml # Package metadata
├── setup.py # Python package configuration
├── setup.cfg # Python build configuration
├── my_robot_package/ # Python module
│ ├── __init__.py
│ ├── main.py
│ ├── nodes/
│ │ ├── __init__.py
│ │ └── sensor_processor.py
│ ├── utils/
│ │ ├── __init__.py
│ │ └── data_processing.py
│ └── msgs/
│ ├── __init__.py
│ └── custom_messages.py
├── launch/
│ └── robot.launch.py
├── config/
│ └── robot_params.yaml
├── test/
│ └── test_nodes.py
└── resource/ # Additional resources
└── my_robot_package

Creating a Package with colcon

The recommended way to create a ROS 2 package is using the ros2 pkg create command:

# Create a Python package with colcon
ros2 pkg create --build-type ament_python my_robot_package

# This creates the basic structure with necessary files

Package Configuration Files

package.xml

The package.xml file contains metadata about the package:

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>my_robot_package</name>
<version>0.0.0</version>
<description>Package for my robot functionality</description>
<maintainer email="user@example.com">User Name</maintainer>
<license>Apache-2.0</license>

<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>

setup.py

The setup.py file configures the Python package:

from setuptools import setup
from glob import glob
import os

package_name = 'my_robot_package'

setup(
name=package_name,
version='0.0.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
# Include launch files
(os.path.join('share', package_name, 'launch'), glob('launch/*.py')),
# Include config files
(os.path.join('share', package_name, 'config'), glob('config/*.yaml')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Your Name',
maintainer_email='your.email@example.com',
description='Package for my robot functionality',
license='Apache License 2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'sensor_processor = my_robot_package.nodes.sensor_processor:main',
'robot_controller = my_robot_package.nodes.robot_controller:main',
],
},
)

Implementing Nodes in Python

Basic Node Structure

# my_robot_package/nodes/basic_node.py
import rclpy
from rclpy.node import Node

class BasicNode(Node):
def __init__(self):
super().__init__('basic_node')
self.get_logger().info('Basic node initialized')

def main(args=None):
rclpy.init(args=args)
node = BasicNode()

try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Advanced Node with Multiple Components

# my_robot_package/nodes/robot_controller.py
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
from std_msgs.msg import String
from my_robot_package.utils.data_processing import DataProcessor

class RobotController(Node):
def __init__(self):
super().__init__('robot_controller')

# Initialize data processor
self.data_processor = DataProcessor()

# Create publisher for velocity commands
self.cmd_vel_pub = self.create_publisher(Twist, 'cmd_vel', 10)

# Create subscriber for laser scan
self.scan_sub = self.create_subscription(
LaserScan, 'scan', self.scan_callback, 10)

# Create subscriber for status messages
self.status_sub = self.create_subscription(
String, 'status', self.status_callback, 10)

# Create timer for periodic tasks
self.timer = self.create_timer(0.1, self.control_loop)

# Initialize robot state
self.obstacle_detected = False
self.target_velocity = Twist()

self.get_logger().info('Robot controller initialized')

def scan_callback(self, msg):
"""Process laser scan data"""
try:
# Process scan to detect obstacles
min_distance = min(msg.ranges) if msg.ranges else float('inf')
self.obstacle_detected = min_distance < 1.0 # Threshold at 1 meter

# Log obstacle detection
if self.obstacle_detected:
self.get_logger().warn(f'Obstacle detected at {min_distance:.2f}m')
except Exception as e:
self.get_logger().error(f'Error processing scan: {e}')

def status_callback(self, msg):
"""Handle status messages"""
self.get_logger().info(f'Status: {msg.data}')

def control_loop(self):
"""Main control loop"""
try:
# Determine velocity based on state
cmd_vel = Twist()

if self.obstacle_detected:
# Turn to avoid obstacle
cmd_vel.angular.z = 0.5
else:
# Move forward
cmd_vel.linear.x = 0.3

# Publish command
self.cmd_vel_pub.publish(cmd_vel)

except Exception as e:
self.get_logger().error(f'Error in control loop: {e}')

def main(args=None):
rclpy.init(args=args)
node = RobotController()

try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Shutting down robot controller')
finally:
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Custom Messages, Services, and Actions

Creating Custom Messages

To create custom message types, create .msg files in the msg/ directory:

# In msg/RobotStatus.msg
string robot_name
int32 battery_level
bool is_charging
float32[] joint_positions

After creating the message file, update package.xml and setup.py:

<!-- In package.xml -->
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
# In setup.py, add to entry_points
data_files=[
# ... other entries
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
# Include custom message files
(os.path.join('share', package_name, 'msg'), glob('msg/*.msg')),
],

Using Custom Messages in Python

# my_robot_package/nodes/status_publisher.py
import rclpy
from rclpy.node import Node
from my_robot_package.msg import RobotStatus # Custom message

class StatusPublisher(Node):
def __init__(self):
super().__init__('status_publisher')
self.publisher = self.create_publisher(RobotStatus, 'robot_status', 10)
self.timer = self.create_timer(1.0, self.publish_status)

def publish_status(self):
msg = RobotStatus()
msg.robot_name = "MyRobot01"
msg.battery_level = 85
msg.is_charging = False
msg.joint_positions = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]

self.publisher.publish(msg)
self.get_logger().info(f'Published status: {msg.robot_name}')

def main(args=None):
rclpy.init(args=args)
node = StatusPublisher()

try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

Launch Files for Python Packages

Launch files allow you to start multiple nodes with specific configurations:

# launch/robot_system.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os

def generate_launch_description():
# Get package share directory
pkg_share = get_package_share_directory('my_robot_package')

# Load parameters from config file
config_file = os.path.join(pkg_share, 'config', 'robot_params.yaml')

return LaunchDescription([
# Robot controller node
Node(
package='my_robot_package',
executable='robot_controller',
name='robot_controller',
parameters=[config_file],
output='screen'
),

# Sensor processor node
Node(
package='my_robot_package',
executable='sensor_processor',
name='sensor_processor',
parameters=[config_file],
output='screen'
),

# Status publisher node
Node(
package='my_robot_package',
executable='status_publisher',
name='status_publisher',
parameters=[config_file],
output='screen'
)
])

Configuration Files

Configuration files allow you to parameterize your nodes:

# config/robot_params.yaml
/**:
ros__parameters:
robot_name: "MyRobot01"
max_linear_velocity: 0.5
max_angular_velocity: 1.0
safety_distance: 0.5
control_frequency: 10.0
battery_threshold: 20

Using parameters in your nodes:

# my_robot_package/nodes/configured_node.py
import rclpy
from rclpy.node import Node

class ConfiguredNode(Node):
def __init__(self):
super().__init__('configured_node')

# Declare parameters with default values
self.declare_parameter('robot_name', 'DefaultRobot')
self.declare_parameter('max_linear_velocity', 0.5)
self.declare_parameter('max_angular_velocity', 1.0)

# Get parameter values
self.robot_name = self.get_parameter('robot_name').value
self.max_linear_vel = self.get_parameter('max_linear_velocity').value
self.max_angular_vel = self.get_parameter('max_angular_velocity').value

self.get_logger().info(f'Robot name: {self.robot_name}')
self.get_logger().info(f'Max velocities - Linear: {self.max_linear_vel}, Angular: {self.max_angular_vel}')

def main(args=None):
rclpy.init(args=args)
node = ConfiguredNode()

try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

Testing Python Packages

Unit Tests

Create tests in the test/ directory:

# test/test_nodes.py
import unittest
import rclpy
from rclpy.executors import SingleThreadedExecutor
from my_robot_package.nodes.robot_controller import RobotController

class TestRobotController(unittest.TestCase):
def setUp(self):
rclpy.init()
self.node = RobotController()
self.executor = SingleThreadedExecutor()
self.executor.add_node(self.node)

def tearDown(self):
self.node.destroy_node()
rclpy.shutdown()

def test_node_initialization(self):
"""Test that the node initializes correctly"""
self.assertIsNotNone(self.node)
self.assertEqual(self.node.get_name(), 'robot_controller')

def test_parameters_declared(self):
"""Test that parameters are properly declared"""
params = self.node.get_parameters(['robot_name', 'max_linear_velocity'])
self.assertIsNotNone(params)

if __name__ == '__main__':
unittest.main()

Running Tests

# Run all tests for the package
colcon test --packages-select my_robot_package

# Run tests with more verbose output
colcon test --packages-select my_robot_package --event-handlers console_direct+

# View test results
colcon test-result --all

Best Practices for Python Packages

Code Organization

# my_robot_package/utils/data_processing.py
"""Utility functions for data processing in robot applications."""

import numpy as np
from typing import List, Optional


def filter_sensor_data(raw_data: List[float],
min_val: float = 0.0,
max_val: float = 10.0) -> List[float]:
"""
Filter sensor data to remove outliers.

Args:
raw_data: List of raw sensor readings
min_val: Minimum valid value
max_val: Maximum valid value

Returns:
List of filtered sensor readings
"""
filtered = []
for value in raw_data:
if min_val <= value <= max_val:
filtered.append(value)
else:
# Log warning for outliers
print(f"Warning: Outlier value {value} filtered out")

return filtered


def calculate_average(values: List[float]) -> Optional[float]:
"""
Calculate average of a list of values.

Args:
values: List of numeric values

Returns:
Average value or None if list is empty
"""
if not values:
return None

return sum(values) / len(values)


class DataProcessor:
"""Class for processing robot sensor data."""

def __init__(self, outlier_threshold: float = 2.0):
self.outlier_threshold = outlier_threshold
self.history = []

def process_scan(self, scan_data: List[float]) -> List[float]:
"""Process laser scan data to remove noise."""
# Apply filtering
filtered_data = filter_sensor_data(
scan_data,
min_val=0.1, # Minimum detectable distance
max_val=10.0 # Maximum range
)

# Calculate statistics
avg_distance = calculate_average(filtered_data)

# Store in history
if avg_distance is not None:
self.history.append(avg_distance)
# Keep only last 100 measurements
if len(self.history) > 100:
self.history = self.history[-100:]

return filtered_data

Error Handling and Logging

# my_robot_package/nodes/robust_node.py
import rclpy
from rclpy.node import Node
import traceback
from rclpy.qos import QoSProfile, QoSDurabilityPolicy
from std_msgs.msg import String

class RobustNode(Node):
def __init__(self):
super().__init__('robust_node')

# Use appropriate QoS for different scenarios
self.publisher = self.create_publisher(
String,
'status',
QoSProfile(depth=10, durability=QoSDurabilityPolicy.VOLATILE))

self.timer = self.create_timer(1.0, self.robust_timer_callback)
self.error_count = 0

def robust_timer_callback(self):
"""Timer callback with error handling."""
try:
# Perform potentially risky operation
result = self.potentially_risky_operation()

# Publish result
msg = String()
msg.data = f'Operation successful: {result}'
self.publisher.publish(msg)

# Reset error count on success
if self.error_count > 0:
self.get_logger().info(f'Recovered after {self.error_count} errors')
self.error_count = 0

except Exception as e:
self.error_count += 1
self.get_logger().error(f'Error in timer callback: {e}')
self.get_logger().error(traceback.format_exc())

# After too many errors, consider restarting or alerting
if self.error_count > 10:
self.get_logger().fatal('Too many consecutive errors, node may be unstable')

def potentially_risky_operation(self):
"""Simulate a potentially risky operation."""
# In real code, this might be:
# - File I/O operations
# - Network requests
# - Hardware interactions
# - Complex calculations
import random

if random.random() < 0.1: # 10% chance of failure
raise RuntimeError("Simulated operation failure")

return "Success"

def main(args=None):
rclpy.init(args=args)
node = RobustNode()

try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Node interrupted by user')
except Exception as e:
node.get_logger().error(f'Unexpected error: {e}')
node.get_logger().error(traceback.format_exc())
finally:
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Hands-On Exercise: Create a Complete ROS 2 Package

Objective

Create a complete ROS 2 package with multiple nodes, custom messages, and launch files.

Prerequisites

  • ROS 2 Humble installed
  • Basic understanding of ROS 2 concepts
  • Python programming skills

Steps

  1. Create a new ROS 2 package called robot_navigation_tutorial
  2. Implement nodes for:
    • Sensor processing (subscribes to laser scan, publishes processed data)
    • Navigation controller (subscribes to processed data, publishes velocity commands)
    • Status monitor (monitors system status, publishes health information)
  3. Create custom messages for communication between nodes
  4. Create a launch file to start all nodes together
  5. Add configuration parameters
  6. Create basic unit tests
  7. Build and test the package

Expected Result

Students will have a complete, functioning ROS 2 package with multiple nodes communicating through various patterns.

Assessment Questions

Multiple Choice

Q1: What is the purpose of the setup.py file in a ROS 2 Python package?

  • a) To define the build system for C++ code
  • b) To configure the Python package and define entry points
  • c) To specify runtime dependencies only
  • d) To define message types
Details

Click to reveal answer Answer: b
Explanation: The setup.py file configures the Python package, including dependencies, data files, and entry points (console scripts).

Short Answer

Q2: Explain the difference between package.xml and setup.py in a ROS 2 Python package.

Practical Exercise

Q3: Create a ROS 2 Python package that includes a node publishing sensor data, another node processing that data, and a third node that makes decisions based on the processed data. Use custom messages for communication between nodes and create a launch file to start all nodes together.

Further Reading

  1. "ROS 2 Python Programming" - Comprehensive guide to Python in ROS 2
  2. "Effective Python for Robotics" - Best practices for Python robotics development
  3. "ROS 2 Package Development" - Guidelines for creating robust packages

Summary

In this chapter, we've explored how to create and structure ROS 2 packages using Python, implement nodes with proper organization, and follow best practices for Python development in ROS 2. We've covered package structure, configuration files, custom message definitions, launch files, and testing strategies.

Creating well-structured ROS 2 packages is essential for building maintainable and reusable robot software. The modular approach allows for better code organization, easier testing, and improved collaboration among development teams. Understanding these concepts is crucial for developing complex physical AI systems that require multiple coordinated components.

In the next chapter, we'll explore launch files and parameter management in more detail, learning how to configure and deploy complex robot systems effectively.