Chapter 1.7: Building ROS 2 Packages with Python
Learning Objectives
By the end of this chapter, students will be able to:
- Create and structure ROS 2 packages using Python
- Implement nodes with proper package organization
- Use ROS 2 client libraries effectively in Python
- Create custom message, service, and action definitions
- Build and test ROS 2 Python packages
- Follow ROS 2 Python best practices
Introduction
ROS 2 packages are the fundamental units of software organization in ROS 2. They contain nodes, libraries, and other resources needed for robot applications. Python is one of the primary languages supported by ROS 2, offering rapid prototyping and development capabilities that are particularly valuable in the research and development phases of physical AI systems.
In this chapter, we'll explore how to create and structure ROS 2 packages using Python, implement nodes with proper organization, and follow best practices for Python development in ROS 2. Understanding package structure and development workflows is essential for building maintainable and reusable robot software.
Package Structure and Organization
Standard ROS 2 Package Layout
A typical ROS 2 package follows this structure:
my_robot_package/
├── CMakeLists.txt # Build configuration for C++
├── package.xml # Package metadata
├── setup.py # Python package configuration
├── setup.cfg # Python build configuration
├── my_robot_package/ # Python module
│ ├── __init__.py
│ ├── main.py
│ ├── nodes/
│ │ ├── __init__.py
│ │ └── sensor_processor.py
│ ├── utils/
│ │ ├── __init__.py
│ │ └── data_processing.py
│ └── msgs/
│ ├── __init__.py
│ └── custom_messages.py
├── launch/
│ └── robot.launch.py
├── config/
│ └── robot_params.yaml
├── test/
│ └── test_nodes.py
└── resource/ # Additional resources
└── my_robot_package
Creating a Package with colcon
The recommended way to create a ROS 2 package is using the ros2 pkg create command:
# Create a Python package with colcon
ros2 pkg create --build-type ament_python my_robot_package
# This creates the basic structure with necessary files
Package Configuration Files
package.xml
The package.xml file contains metadata about the package:
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>my_robot_package</name>
<version>0.0.0</version>
<description>Package for my robot functionality</description>
<maintainer email="user@example.com">User Name</maintainer>
<license>Apache-2.0</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
setup.py
The setup.py file configures the Python package:
from setuptools import setup
from glob import glob
import os
package_name = 'my_robot_package'
setup(
name=package_name,
version='0.0.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
# Include launch files
(os.path.join('share', package_name, 'launch'), glob('launch/*.py')),
# Include config files
(os.path.join('share', package_name, 'config'), glob('config/*.yaml')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Your Name',
maintainer_email='your.email@example.com',
description='Package for my robot functionality',
license='Apache License 2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'sensor_processor = my_robot_package.nodes.sensor_processor:main',
'robot_controller = my_robot_package.nodes.robot_controller:main',
],
},
)
Implementing Nodes in Python
Basic Node Structure
# my_robot_package/nodes/basic_node.py
import rclpy
from rclpy.node import Node
class BasicNode(Node):
def __init__(self):
super().__init__('basic_node')
self.get_logger().info('Basic node initialized')
def main(args=None):
rclpy.init(args=args)
node = BasicNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Advanced Node with Multiple Components
# my_robot_package/nodes/robot_controller.py
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Twist
from std_msgs.msg import String
from my_robot_package.utils.data_processing import DataProcessor
class RobotController(Node):
def __init__(self):
super().__init__('robot_controller')
# Initialize data processor
self.data_processor = DataProcessor()
# Create publisher for velocity commands
self.cmd_vel_pub = self.create_publisher(Twist, 'cmd_vel', 10)
# Create subscriber for laser scan
self.scan_sub = self.create_subscription(
LaserScan, 'scan', self.scan_callback, 10)
# Create subscriber for status messages
self.status_sub = self.create_subscription(
String, 'status', self.status_callback, 10)
# Create timer for periodic tasks
self.timer = self.create_timer(0.1, self.control_loop)
# Initialize robot state
self.obstacle_detected = False
self.target_velocity = Twist()
self.get_logger().info('Robot controller initialized')
def scan_callback(self, msg):
"""Process laser scan data"""
try:
# Process scan to detect obstacles
min_distance = min(msg.ranges) if msg.ranges else float('inf')
self.obstacle_detected = min_distance < 1.0 # Threshold at 1 meter
# Log obstacle detection
if self.obstacle_detected:
self.get_logger().warn(f'Obstacle detected at {min_distance:.2f}m')
except Exception as e:
self.get_logger().error(f'Error processing scan: {e}')
def status_callback(self, msg):
"""Handle status messages"""
self.get_logger().info(f'Status: {msg.data}')
def control_loop(self):
"""Main control loop"""
try:
# Determine velocity based on state
cmd_vel = Twist()
if self.obstacle_detected:
# Turn to avoid obstacle
cmd_vel.angular.z = 0.5
else:
# Move forward
cmd_vel.linear.x = 0.3
# Publish command
self.cmd_vel_pub.publish(cmd_vel)
except Exception as e:
self.get_logger().error(f'Error in control loop: {e}')
def main(args=None):
rclpy.init(args=args)
node = RobotController()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Shutting down robot controller')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Custom Messages, Services, and Actions
Creating Custom Messages
To create custom message types, create .msg files in the msg/ directory:
# In msg/RobotStatus.msg
string robot_name
int32 battery_level
bool is_charging
float32[] joint_positions
After creating the message file, update package.xml and setup.py:
<!-- In package.xml -->
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
# In setup.py, add to entry_points
data_files=[
# ... other entries
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
# Include custom message files
(os.path.join('share', package_name, 'msg'), glob('msg/*.msg')),
],
Using Custom Messages in Python
# my_robot_package/nodes/status_publisher.py
import rclpy
from rclpy.node import Node
from my_robot_package.msg import RobotStatus # Custom message
class StatusPublisher(Node):
def __init__(self):
super().__init__('status_publisher')
self.publisher = self.create_publisher(RobotStatus, 'robot_status', 10)
self.timer = self.create_timer(1.0, self.publish_status)
def publish_status(self):
msg = RobotStatus()
msg.robot_name = "MyRobot01"
msg.battery_level = 85
msg.is_charging = False
msg.joint_positions = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
self.publisher.publish(msg)
self.get_logger().info(f'Published status: {msg.robot_name}')
def main(args=None):
rclpy.init(args=args)
node = StatusPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
Launch Files for Python Packages
Launch files allow you to start multiple nodes with specific configurations:
# launch/robot_system.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description():
# Get package share directory
pkg_share = get_package_share_directory('my_robot_package')
# Load parameters from config file
config_file = os.path.join(pkg_share, 'config', 'robot_params.yaml')
return LaunchDescription([
# Robot controller node
Node(
package='my_robot_package',
executable='robot_controller',
name='robot_controller',
parameters=[config_file],
output='screen'
),
# Sensor processor node
Node(
package='my_robot_package',
executable='sensor_processor',
name='sensor_processor',
parameters=[config_file],
output='screen'
),
# Status publisher node
Node(
package='my_robot_package',
executable='status_publisher',
name='status_publisher',
parameters=[config_file],
output='screen'
)
])
Configuration Files
Configuration files allow you to parameterize your nodes:
# config/robot_params.yaml
/**:
ros__parameters:
robot_name: "MyRobot01"
max_linear_velocity: 0.5
max_angular_velocity: 1.0
safety_distance: 0.5
control_frequency: 10.0
battery_threshold: 20
Using parameters in your nodes:
# my_robot_package/nodes/configured_node.py
import rclpy
from rclpy.node import Node
class ConfiguredNode(Node):
def __init__(self):
super().__init__('configured_node')
# Declare parameters with default values
self.declare_parameter('robot_name', 'DefaultRobot')
self.declare_parameter('max_linear_velocity', 0.5)
self.declare_parameter('max_angular_velocity', 1.0)
# Get parameter values
self.robot_name = self.get_parameter('robot_name').value
self.max_linear_vel = self.get_parameter('max_linear_velocity').value
self.max_angular_vel = self.get_parameter('max_angular_velocity').value
self.get_logger().info(f'Robot name: {self.robot_name}')
self.get_logger().info(f'Max velocities - Linear: {self.max_linear_vel}, Angular: {self.max_angular_vel}')
def main(args=None):
rclpy.init(args=args)
node = ConfiguredNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
Testing Python Packages
Unit Tests
Create tests in the test/ directory:
# test/test_nodes.py
import unittest
import rclpy
from rclpy.executors import SingleThreadedExecutor
from my_robot_package.nodes.robot_controller import RobotController
class TestRobotController(unittest.TestCase):
def setUp(self):
rclpy.init()
self.node = RobotController()
self.executor = SingleThreadedExecutor()
self.executor.add_node(self.node)
def tearDown(self):
self.node.destroy_node()
rclpy.shutdown()
def test_node_initialization(self):
"""Test that the node initializes correctly"""
self.assertIsNotNone(self.node)
self.assertEqual(self.node.get_name(), 'robot_controller')
def test_parameters_declared(self):
"""Test that parameters are properly declared"""
params = self.node.get_parameters(['robot_name', 'max_linear_velocity'])
self.assertIsNotNone(params)
if __name__ == '__main__':
unittest.main()
Running Tests
# Run all tests for the package
colcon test --packages-select my_robot_package
# Run tests with more verbose output
colcon test --packages-select my_robot_package --event-handlers console_direct+
# View test results
colcon test-result --all
Best Practices for Python Packages
Code Organization
# my_robot_package/utils/data_processing.py
"""Utility functions for data processing in robot applications."""
import numpy as np
from typing import List, Optional
def filter_sensor_data(raw_data: List[float],
min_val: float = 0.0,
max_val: float = 10.0) -> List[float]:
"""
Filter sensor data to remove outliers.
Args:
raw_data: List of raw sensor readings
min_val: Minimum valid value
max_val: Maximum valid value
Returns:
List of filtered sensor readings
"""
filtered = []
for value in raw_data:
if min_val <= value <= max_val:
filtered.append(value)
else:
# Log warning for outliers
print(f"Warning: Outlier value {value} filtered out")
return filtered
def calculate_average(values: List[float]) -> Optional[float]:
"""
Calculate average of a list of values.
Args:
values: List of numeric values
Returns:
Average value or None if list is empty
"""
if not values:
return None
return sum(values) / len(values)
class DataProcessor:
"""Class for processing robot sensor data."""
def __init__(self, outlier_threshold: float = 2.0):
self.outlier_threshold = outlier_threshold
self.history = []
def process_scan(self, scan_data: List[float]) -> List[float]:
"""Process laser scan data to remove noise."""
# Apply filtering
filtered_data = filter_sensor_data(
scan_data,
min_val=0.1, # Minimum detectable distance
max_val=10.0 # Maximum range
)
# Calculate statistics
avg_distance = calculate_average(filtered_data)
# Store in history
if avg_distance is not None:
self.history.append(avg_distance)
# Keep only last 100 measurements
if len(self.history) > 100:
self.history = self.history[-100:]
return filtered_data
Error Handling and Logging
# my_robot_package/nodes/robust_node.py
import rclpy
from rclpy.node import Node
import traceback
from rclpy.qos import QoSProfile, QoSDurabilityPolicy
from std_msgs.msg import String
class RobustNode(Node):
def __init__(self):
super().__init__('robust_node')
# Use appropriate QoS for different scenarios
self.publisher = self.create_publisher(
String,
'status',
QoSProfile(depth=10, durability=QoSDurabilityPolicy.VOLATILE))
self.timer = self.create_timer(1.0, self.robust_timer_callback)
self.error_count = 0
def robust_timer_callback(self):
"""Timer callback with error handling."""
try:
# Perform potentially risky operation
result = self.potentially_risky_operation()
# Publish result
msg = String()
msg.data = f'Operation successful: {result}'
self.publisher.publish(msg)
# Reset error count on success
if self.error_count > 0:
self.get_logger().info(f'Recovered after {self.error_count} errors')
self.error_count = 0
except Exception as e:
self.error_count += 1
self.get_logger().error(f'Error in timer callback: {e}')
self.get_logger().error(traceback.format_exc())
# After too many errors, consider restarting or alerting
if self.error_count > 10:
self.get_logger().fatal('Too many consecutive errors, node may be unstable')
def potentially_risky_operation(self):
"""Simulate a potentially risky operation."""
# In real code, this might be:
# - File I/O operations
# - Network requests
# - Hardware interactions
# - Complex calculations
import random
if random.random() < 0.1: # 10% chance of failure
raise RuntimeError("Simulated operation failure")
return "Success"
def main(args=None):
rclpy.init(args=args)
node = RobustNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Node interrupted by user')
except Exception as e:
node.get_logger().error(f'Unexpected error: {e}')
node.get_logger().error(traceback.format_exc())
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Hands-On Exercise: Create a Complete ROS 2 Package
Objective
Create a complete ROS 2 package with multiple nodes, custom messages, and launch files.
Prerequisites
- ROS 2 Humble installed
- Basic understanding of ROS 2 concepts
- Python programming skills
Steps
- Create a new ROS 2 package called
robot_navigation_tutorial - Implement nodes for:
- Sensor processing (subscribes to laser scan, publishes processed data)
- Navigation controller (subscribes to processed data, publishes velocity commands)
- Status monitor (monitors system status, publishes health information)
- Create custom messages for communication between nodes
- Create a launch file to start all nodes together
- Add configuration parameters
- Create basic unit tests
- Build and test the package
Expected Result
Students will have a complete, functioning ROS 2 package with multiple nodes communicating through various patterns.
Assessment Questions
Multiple Choice
Q1: What is the purpose of the setup.py file in a ROS 2 Python package?
- a) To define the build system for C++ code
- b) To configure the Python package and define entry points
- c) To specify runtime dependencies only
- d) To define message types
Details
Click to reveal answer
Answer: bExplanation: The setup.py file configures the Python package, including dependencies, data files, and entry points (console scripts).
Short Answer
Q2: Explain the difference between package.xml and setup.py in a ROS 2 Python package.
Practical Exercise
Q3: Create a ROS 2 Python package that includes a node publishing sensor data, another node processing that data, and a third node that makes decisions based on the processed data. Use custom messages for communication between nodes and create a launch file to start all nodes together.
Further Reading
- "ROS 2 Python Programming" - Comprehensive guide to Python in ROS 2
- "Effective Python for Robotics" - Best practices for Python robotics development
- "ROS 2 Package Development" - Guidelines for creating robust packages
Summary
In this chapter, we've explored how to create and structure ROS 2 packages using Python, implement nodes with proper organization, and follow best practices for Python development in ROS 2. We've covered package structure, configuration files, custom message definitions, launch files, and testing strategies.
Creating well-structured ROS 2 packages is essential for building maintainable and reusable robot software. The modular approach allows for better code organization, easier testing, and improved collaboration among development teams. Understanding these concepts is crucial for developing complex physical AI systems that require multiple coordinated components.
In the next chapter, we'll explore launch files and parameter management in more detail, learning how to configure and deploy complex robot systems effectively.