Chapter 1.7: Python Package Development for ROS 2
Learning Objectives
By the end of this chapter, you will be able to:
- Create ROS 2 Python packages with proper structure and dependencies
- Configure package manifest files (package.xml) correctly
- Organize Python code according to ROS 2 conventions
- Implement setup files (setup.py, setup.cfg) for Python packages
- Use ament build system for Python packages
- Manage dependencies and version constraints
- Create and run unit tests for ROS 2 packages
- Document Python code following ROS 2 standards
- Install and build ROS 2 Python packages
Introduction
In this chapter, we'll explore Python package development specifically for ROS 2 systems. While we've learned about the communication patterns between nodes, this chapter focuses on the foundational aspect of organizing and distributing your robotic software as reusable packages.
Python packages in ROS 2 follow specific conventions that allow them to integrate seamlessly with the ROS 2 ecosystem. These conventions include file structure, dependency management, build configuration, and testing procedures. A well-structured ROS 2 Python package can be easily shared across teams, installed on different robots, and integrated into larger robotic systems.
Consider a Physical AI system that includes multiple Python packages:
- physical_ai_perception: Handles camera processing, object detection, and sensor fusion
- physical_ai_control: Implements control algorithms for humanoid robots
- physical_ai_navigation: Contains path planning and navigation capabilities
- physical_ai_ui: Provides user interface elements and interaction
Each of these packages would follow the same structural patterns we'll learn in this chapter. Proper package structure not only simplifies development and maintenance but also enables other developers to understand, extend, and reuse your code more effectively.
ROS 2's package system provides several advantages over standard Python packages:
- Integration with the ROS 2 tooling ecosystem (rosdep, roslaunch, etc.)
- Standardized dependency management across different programming languages
- Consistent build and installation procedures
- Integration with workspace management tools
- Automated generation of documentation and interfaces
Throughout this chapter, we'll develop a sample package physical_ai_sensors that provides utilities for managing sensor data in a Physical AI system. This practical example will demonstrate all the concepts we cover, from package creation to testing and installation.
1. ROS 2 Package Structure
Standard Package Layout
A ROS 2 Python package follows a standardized structure that enables consistent tooling and project organization:
physical_ai_sensors/ # Package root directory
├── package.xml # Package metadata and dependencies
├── setup.py # Python package configuration
├── setup.cfg # Build configuration
├── requirements.txt # Python dependencies
├── pytest.ini or tox.ini # Test configuration
├── README.md # Package documentation
├── LICENSE # License information
├── CHANGELOG.rst # Version changelog
├── resource/ # Package resource files
│ └── physical_ai_sensors # Ament index marker file (must match the package name)
├── test/ # Test files
│ ├── test_copyright.py
│ ├── test_flake8.py
│ ├── test_pep257.py
│ └── test_my_nodes.py
├── physical_ai_sensors/ # Main Python package
│ ├── __init__.py # Package initialization
│ ├── sensors/ # Module for sensor utilities
│ │ ├── __init__.py
│ │ ├── camera_utils.py
│ │ ├── lidar_utils.py
│ │ └── imu_processor.py
│ ├── nodes/ # ROS 2 nodes
│ │ ├── __init__.py
│ │ ├── sensor_fusion_node.py
│ │ └── data_logger_node.py
│ └── utils/ # Utility functions
│ ├── __init__.py
│ ├── data_validation.py
│ └── timestamp_sync.py
└── launch/ # Launch files
└── sensor_processing.launch.py
Key Directories and Their Purpose:
- physical_ai_sensors/: Main Python package directory (named after the package)
- launch/: Contains launch files for starting nodes
- test/: Unit and integration tests
- resource/: Contains the ament index marker file for the package
- config/: (Optional) Configuration files
Each Python package directory should include an __init__.py file to ensure proper Python imports, even if it's empty.
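Beyond marking a directory as importable, __init__.py can also re-export the package's public API so downstream code gets shorter import paths. A minimal sketch for the top-level physical_ai_sensors/__init__.py, assuming the IMUProcessor utilities defined later in this chapter:

"""Public API for the physical_ai_sensors package."""

# Re-export frequently used names so callers can write
# `from physical_ai_sensors import IMUProcessor` instead of the full module path.
from physical_ai_sensors.sensors.imu_processor import IMUProcessor, validate_imu_message

__all__ = ['IMUProcessor', 'validate_imu_message']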
Package Configuration Files
package.xml: This is the primary package metadata file that specifies dependencies, authors, license, and maintainers:
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>physical_ai_sensors</name>
<version>0.1.0</version>
<description>ROS 2 package for sensor processing in Physical AI systems</description>
<maintainer email="maintainer@physicalaitextbook.com">Your Name</maintainer>
<license>Apache License 2.0</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<depend>builtin_interfaces</depend>
<exec_depend>python3-opencv</exec_depend>
<exec_depend>numpy</exec_depend>
<exec_depend>scipy</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
setup.py: Configures the Python package and defines entry points for executables:
from setuptools import setup, find_packages
import os
from glob import glob
package_name = 'physical_ai_sensors'
setup(
name=package_name,
version='0.1.0',
packages=find_packages(),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
# Include launch files
(os.path.join('share', package_name, 'launch'),
glob(os.path.join('launch', '*.launch.py'))),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Your Name',
maintainer_email='maintainer@physicalaitextbook.com',
description='ROS 2 package for sensor processing in Physical AI systems',
license='Apache License 2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'sensor_fusion_node = physical_ai_sensors.nodes.sensor_fusion_node:main',
'data_logger_node = physical_ai_sensors.nodes.data_logger_node:main',
],
},
)
setup.cfg: Additional build configuration; it directs setuptools to install console scripts into lib/<package_name>, where ros2 run looks for executables:
[develop]
script-dir=$base/lib/physical_ai_sensors
[install]
install-scripts=$base/lib/physical_ai_sensors
2. Creating a ROS 2 Python Package
Using the ROS 2 Package Creation Tool
The easiest way to create a new ROS 2 Python package is using the ros2 pkg create command:
cd ~/ros2_workspace/src
ros2 pkg create --build-type ament_python physical_ai_sensors
This command creates the standard package structure with all necessary files. The --build-type ament_python flag tells ROS 2 that this is a Python package using the ament build system.
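For reference, the generated skeleton typically looks like this (exact contents vary slightly between ROS 2 releases):

physical_ai_sensors/
├── package.xml
├── setup.py
├── setup.cfg
├── resource/
│   └── physical_ai_sensors
├── physical_ai_sensors/
│   └── __init__.py
└── test/
    ├── test_copyright.py
    ├── test_flake8.py
    └── test_pep257.py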
Let's customize the generated package for our Physical AI sensor utilities:
Step 1: Update the package.xml file:
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>physical_ai_sensors</name>
<version>1.0.0</version>
<description>Utilities for processing sensor data in Physical AI and humanoid robotics applications</description>
<maintainer email="developer@physicalaitextbook.com">Physical AI Robotics Team</maintainer>
<license>Apache License 2.0</license>
<!-- Dependencies -->
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<depend>nav_msgs</depend>
<depend>builtin_interfaces</depend>
<!-- Execution dependencies -->
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-scipy</exec_depend>
<exec_depend>python3-opencv</exec_depend>
<exec_depend>cv_bridge</exec_depend>
<!-- Test dependencies -->
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
Step 2: Update setup.py with proper dependencies:
import os
from glob import glob
from setuptools import setup, find_packages
package_name = 'physical_ai_sensors'
setup(
name=package_name,
version='1.0.0',
packages=find_packages(), # Finds all Python packages in the directory
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'),
glob(os.path.join('launch', '*.launch.py'))),
(os.path.join('share', package_name, 'config'),
glob(os.path.join('config', '*.yaml'))),
],
install_requires=['setuptools'],
zip_safe=True,
author='Physical AI Robotics Team',
maintainer='Physical AI Robotics Team',
maintainer_email='developer@physicalaitextbook.com',
description='Utilities for processing sensor data in Physical AI systems',
license='Apache License 2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
# Sensor processing nodes
'imu_processor = physical_ai_sensors.nodes.imu_processor:main',
'camera_subscriber = physical_ai_sensors.nodes.camera_subscriber:main',
'lidar_preprocessor = physical_ai_sensors.nodes.lidar_preprocessor:main',
'sensor_fusion = physical_ai_sensors.nodes.sensor_fusion:main',
# Utility tools
'data_validator = physical_ai_sensors.utils.data_validator:main',
'calibration_tool = physical_ai_sensors.utils.calibration_tool:main',
],
},
)
Manual Package Creation
While using ros2 pkg create is recommended, understanding the manual process is valuable:
Step 1: Create the directory structure manually:
mkdir -p physical_ai_sensors/{physical_ai_sensors/{sensors,nodes,utils},launch,test,resource,config}
touch physical_ai_sensors/physical_ai_sensors/__init__.py
touch physical_ai_sensors/physical_ai_sensors/sensors/__init__.py
touch physical_ai_sensors/physical_ai_sensors/nodes/__init__.py
touch physical_ai_sensors/physical_ai_sensors/utils/__init__.py
Step 2: Create the main sensor utilities module (physical_ai_sensors/physical_ai_sensors/sensors/imu_processor.py):
"""IMU Processing Utilities for Physical AI Systems.
This module provides utilities for processing IMU (Inertial Measurement Unit) data
commonly used in humanoid robots and other Physical AI applications.
"""
from typing import Tuple, Optional
import numpy as np
from sensor_msgs.msg import Imu
class IMUProcessor:
"""Provides utilities for processing IMU data."""
def __init__(self, covariance_params: Optional[dict] = None):
"""Initialize the IMU processor.
Args:
covariance_params: Dictionary of covariance parameters for noise modeling
"""
self.covariance_params = covariance_params or {}
self.bias = np.zeros(6) # Bias for 3-axis accelerometer and 3-axis gyroscope
self.gravity = 9.81 # Standard gravity in m/s^2
def extract_orientation_from_quaternion(self, imu_msg: Imu) -> Tuple[float, float, float]:
"""Extract roll, pitch, yaw from quaternion in IMU message.
Args:
imu_msg: Sensor message containing orientation quaternion
Returns:
Tuple of (roll, pitch, yaw) in radians
"""
        import tf_transformations  # Deferred import; requires the tf_transformations package at runtime
# Extract quaternion components
q = imu_msg.orientation
quaternion = [q.x, q.y, q.z, q.w]
# Convert quaternion to Euler angles
roll, pitch, yaw = tf_transformations.euler_from_quaternion(quaternion)
return roll, pitch, yaw
def extract_linear_acceleration(self, imu_msg: Imu) -> Tuple[float, float, float]:
"""Extract linear acceleration from IMU message, removing gravity.
Args:
imu_msg: Sensor message containing linear acceleration
Returns:
Tuple of (ax, ay, az) linear acceleration in m/s^2
"""
acc = imu_msg.linear_acceleration
# Convert to numpy array for processing
linear_acc = np.array([acc.x, acc.y, acc.z])
# Optionally subtract gravity vector (if known)
# This depends on the orientation of the IMU
# gravity_vector = self._get_gravity_vector_in_imu_frame(orientation)
# linear_acc -= gravity_vector
return linear_acc[0], linear_acc[1], linear_acc[2]
def compute_velocity_from_acceleration(self, acceleration_data: np.ndarray,
timestamps: np.ndarray) -> np.ndarray:
"""Compute velocity by integrating acceleration data.
Args:
acceleration_data: Array of acceleration values (shape: N x 3 for x,y,z)
timestamps: Array of corresponding timestamps (in seconds)
Returns:
Array of velocity values (shape: N x 3 for x,y,z)
"""
        if len(acceleration_data) != len(timestamps) or len(timestamps) < 2:
            raise ValueError("acceleration_data and timestamps must have the same length (at least 2)")
# Compute deltas between timestamps
dt = np.diff(timestamps)
# Integrate acceleration to get velocity
velocity_changes = acceleration_data[:-1] * dt[:, np.newaxis]
# Initial velocity assumed to be zero
velocity = np.zeros_like(acceleration_data)
velocity[1:] = np.cumsum(velocity_changes, axis=0)
return velocity
def apply_bias_correction(self, raw_data: np.ndarray) -> np.ndarray:
"""Apply bias correction to raw IMU readings.
Args:
raw_data: Array of raw sensor readings (6-dof: acc_x, acc_y, acc_z, gyro_x, gyro_y, gyro_z)
Returns:
Corrected sensor readings
"""
corrected = raw_data - self.bias
return corrected
def validate_imu_message(imu_msg: Imu) -> bool:
"""Validate IMU message integrity.
Args:
imu_msg: Sensor message to validate
Returns:
True if message is valid, False otherwise
"""
    # Check that the timestamp is plausible (nanosec is unsigned, so only sec can be negative)
    if imu_msg.header.stamp.sec < 0:
        return False
# Check if orientation is normalized (approximately)
q = imu_msg.orientation
quat_norm = np.sqrt(q.x**2 + q.y**2 + q.z**2 + q.w**2)
if not (0.99 < quat_norm < 1.01):
return False
# Check for NaN or inf values
def has_invalid_values(arr):
return np.any(np.isnan(arr)) or np.any(np.isinf(arr))
lin_acc = np.array([imu_msg.linear_acceleration.x,
imu_msg.linear_acceleration.y,
imu_msg.linear_acceleration.z])
ang_vel = np.array([imu_msg.angular_velocity.x,
imu_msg.angular_velocity.y,
imu_msg.angular_velocity.z])
if has_invalid_values(lin_acc) or has_invalid_values(ang_vel):
return False
return True
Step 3: Create a ROS 2 node that uses the utilities (physical_ai_sensors/physical_ai_sensors/nodes/imu_processor.py):
#!/usr/bin/env python3
"""IMU Processor Node for Physical AI Systems.
This node processes IMU data and provides processed outputs for balance control,
navigation, and other Physical AI applications.
"""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu
from geometry_msgs.msg import Vector3Stamped
from std_msgs.msg import Float32
import numpy as np
from physical_ai_sensors.sensors.imu_processor import IMUProcessor, validate_imu_message
class IMUProcessorNode(Node):
"""ROS 2 node that processes IMU data for Physical AI applications."""
def __init__(self):
super().__init__('imu_processor_node')
# Initialize the IMU processor utility
self.imu_processor = IMUProcessor()
# Declare parameters
self.declare_parameter('publish_frequency', 100) # Hz
self.declare_parameter('use_bias_correction', True)
# Get parameters
self.publish_freq = self.get_parameter('publish_frequency').value
self.use_bias_correction = self.get_parameter('use_bias_correction').value
# Create subscription to raw IMU data
self.subscription = self.create_subscription(
Imu,
'/imu/raw',
self.imu_callback,
10 # QoS history depth
)
# Publishers for processed data
self.euler_angles_pub = self.create_publisher(Vector3Stamped, '/imu/euler_angles', 10)
self.linear_velocity_pub = self.create_publisher(Vector3Stamped, '/imu/linear_velocity', 10)
self.acceleration_mag_pub = self.create_publisher(Float32, '/imu/acceleration_magnitude', 10)
# Initialize state variables
        self.last_timestamp = None
        self.msg_count = 0  # Used to throttle the periodic debug log below
self.get_logger().info(
f'IMU Processor Node initialized with frequency: {self.publish_freq}Hz, '
f'bias correction: {self.use_bias_correction}'
)
def imu_callback(self, msg: Imu) -> None:
"""Process incoming IMU message.
Args:
msg: Raw IMU sensor message
"""
# Validate the message first
if not validate_imu_message(msg):
self.get_logger().warn('Received invalid IMU message, skipping')
return
# Apply bias correction if enabled
if self.use_bias_correction:
# In a real implementation, we'd correct the raw values
# For now, just log that we're applying correction
pass
# Extract orientation as Euler angles
roll, pitch, yaw = self.imu_processor.extract_orientation_from_quaternion(msg)
# Create and publish Euler angles
euler_msg = Vector3Stamped()
euler_msg.header = msg.header
euler_msg.vector.x = roll
euler_msg.vector.y = pitch
euler_msg.vector.z = yaw
self.euler_angles_pub.publish(euler_msg)
# Calculate and publish acceleration magnitude
acc = msg.linear_acceleration
acc_magnitude = np.sqrt(acc.x**2 + acc.y**2 + acc.z**2)
mag_msg = Float32()
mag_msg.data = acc_magnitude
self.acceleration_mag_pub.publish(mag_msg)
# Estimate velocity by integrating acceleration
if self.last_timestamp is not None:
dt = (msg.header.stamp.sec - self.last_timestamp.sec) + \
(msg.header.stamp.nanosec - self.last_timestamp.nanosec) * 1e-9
if dt > 0:
# For simplicity, we'll just report the instantaneous values
# In a real implementation, we would integrate over time
linear_acc = np.array([acc.x, acc.y, acc.z])
velocity_change = linear_acc * dt
# Create and publish velocity estimate
vel_msg = Vector3Stamped()
vel_msg.header = msg.header
vel_msg.vector.x = velocity_change[0]
vel_msg.vector.y = velocity_change[1]
vel_msg.vector.z = velocity_change[2]
self.linear_velocity_pub.publish(vel_msg)
self.last_timestamp = msg.header.stamp
        # Log a data summary periodically (every 10th message) rather than on every callback
        self.msg_count += 1
        if self.msg_count % 10 == 0:
            self.get_logger().debug(
                f'Processed IMU data: Orientation(RPY)=({roll:.3f}, {pitch:.3f}, {yaw:.3f}), '
                f'Acc Mag={acc_magnitude:.3f}'
            )
def main(args=None):
"""Main function to run the IMU processor node."""
    rclpy.init(args=args)
    # Construct the node outside the try block so the finally clause
    # never references an undefined name if construction fails
    imu_processor_node = IMUProcessorNode()
    try:
        rclpy.spin(imu_processor_node)
    except KeyboardInterrupt:
        print("Shutting down IMU processor node...")
    finally:
        imu_processor_node.destroy_node()
        rclpy.shutdown()
if __name__ == '__main__':
main()
This implementation shows how to structure a proper ROS 2 Python package following all best practices and conventions, including proper imports, type hints, documentation, and parameter management.
3. Dependency Management
Managing Python Dependencies
Python dependency management in ROS 2 requires understanding both ROS-specific and standard Python tools:
requirements.txt approach: For Python packages that will be used in ROS 2, you can still maintain a requirements.txt file:
# requirements.txt for physical_ai_sensors
numpy>=1.19.0
scipy>=1.6.0
opencv-python>=4.5.0
transforms3d>=0.3.1
pyquaternion>=0.9.9
However, ROS 2 dependencies should mainly be specified in package.xml:
<package format="3">
...
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
...
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-opencv</exec_depend>
...
</package>
Dependency Resolution: ROS 2 has its own dependency resolver (rosdep) that works alongside pip:
# Install system dependencies with rosdep
rosdep install --from-paths src --ignore-src -r -y
# Install Python dependencies with pip (for development)
pip3 install -r requirements.txt
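If you are unsure how a rosdep key maps to a system package on your platform, rosdep can resolve it for you (output shown for Ubuntu; names differ on other operating systems):

# Show which system package a rosdep key resolves to
rosdep resolve python3-numpy
# Expected output on Ubuntu:
# #apt
# python3-numpy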
Version Constraints and Compatibility
Semantic Versioning: ROS 2 follows semantic versioning (MAJOR.MINOR.PATCH) for packages:
<!-- For dependencies that are also ROS packages -->
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
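When a minimum or maximum version of a ROS dependency matters, package.xml (format 3) supports version attributes such as version_gte and version_lt on dependency tags. The package names below are placeholders:

<!-- Require at least version 2.0.0 -->
<depend version_gte="2.0.0">some_ros_package</depend>
<!-- Constrain to a half-open version range -->
<depend version_gte="1.2.0" version_lt="2.0.0">another_ros_package</depend>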
Python Package Dependencies: For external Python packages, you typically specify them in your package's setup.py:
# In setup.py
setup(
name='physical_ai_sensors',
...
install_requires=[
'setuptools',
'numpy>=1.19.0,<2.0.0', # Specify version bounds
'scipy>=1.6.0',
'opencv-python>=4.5.0',
],
...
)
Virtual Environments: For development, using virtual environments helps manage dependencies:
# Create virtual environment
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install -e .
ROS 2 uses ament as its build system, which handles both C++ and Python packages. For Python, ament_python is used as the build type.
4. Testing ROS 2 Python Packages
Unit Testing with pytest
ROS 2 Python packages should include unit tests following standard Python practices:
# test/test_imu_processor.py
import pytest
import numpy as np
from sensor_msgs.msg import Imu
from geometry_msgs.msg import Quaternion
from std_msgs.msg import Header
from physical_ai_sensors.sensors.imu_processor import IMUProcessor, validate_imu_message
def test_imu_processor_initialization():
"""Test that IMUProcessor initializes correctly."""
processor = IMUProcessor()
assert processor is not None
assert hasattr(processor, 'bias')
assert len(processor.bias) == 6 # 3-axis acc + 3-axis gyro
def test_extract_orientation_from_quaternion():
"""Test extracting Euler angles from quaternion."""
processor = IMUProcessor()
# Create a mock IMU message with identity quaternion (no rotation)
imu_msg = Imu()
imu_msg.orientation.w = 1.0 # Identity quaternion
imu_msg.orientation.x = 0.0
imu_msg.orientation.y = 0.0
imu_msg.orientation.z = 0.0
roll, pitch, yaw = processor.extract_orientation_from_quaternion(imu_msg)
# Should be approximately zero for identity quaternion
assert pytest.approx(roll, abs=1e-6) == 0.0
assert pytest.approx(pitch, abs=1e-6) == 0.0
assert pytest.approx(yaw, abs=1e-6) == 0.0
def test_compute_velocity_from_acceleration():
"""Test velocity computation from acceleration data."""
processor = IMUProcessor()
# Simple test: constant acceleration yields linearly increasing velocity
acceleration_data = np.array([[1.0, 0.0, 0.0]] * 10) # 1 m/s^2 in x-direction
timestamps = np.linspace(0.0, 0.9, 10) # Every 0.1 seconds
velocities = processor.compute_velocity_from_acceleration(acceleration_data, timestamps)
# First velocity should be 0, subsequent should increase
assert velocities[0][0] == pytest.approx(0.0)
    # After one 0.1 s step with 1 m/s^2 acceleration, velocity should be ~0.1 m/s
    assert velocities[1][0] == pytest.approx(0.1, abs=0.001)  # 1.0 m/s^2 * 0.1 s
def test_validate_imu_message_valid():
"""Test validation of valid IMU message."""
imu_msg = Imu()
imu_msg.header = Header()
imu_msg.header.stamp.sec = 100
imu_msg.header.stamp.nanosec = 500000000
imu_msg.orientation.w = 1.0 # Normalized quaternion
assert validate_imu_message(imu_msg)
def test_validate_imu_message_invalid():
"""Test validation of invalid IMU message."""
imu_msg = Imu()
imu_msg.header = Header()
imu_msg.header.stamp.sec = -1 # Invalid negative timestamp
assert not validate_imu_message(imu_msg)
# Test invalid quaternion normalization
imu_msg.header.stamp.sec = 100
imu_msg.orientation.w = 2.0 # Not normalized
assert not validate_imu_message(imu_msg)
ROS 2 Specific Testing
ROS 2 provides specific test utilities for node testing:
# test/test_ros_integration.py
import unittest
import rclpy
from rclpy.executors import SingleThreadedExecutor
from rclpy.node import Node
from sensor_msgs.msg import Imu
from std_msgs.msg import Float32
from physical_ai_sensors.nodes.imu_processor import IMUProcessorNode
class TestIMUProcessorNode(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Initialize ROS 2 context once for all tests."""
rclpy.init()
@classmethod
def tearDownClass(cls):
"""Shutdown ROS 2 context after all tests."""
rclpy.shutdown()
def setUp(self):
"""Set up test fixtures before each test method."""
self.node = IMUProcessorNode()
self.executor = SingleThreadedExecutor()
self.executor.add_node(self.node)
def tearDown(self):
"""Tear down test fixtures after each test method."""
self.node.destroy_node()
def test_node_parameters(self):
"""Test that node declares and retrieves parameters correctly."""
# Check the default values are what we expect
freq_param = self.node.get_parameter('publish_frequency')
bias_corr_param = self.node.get_parameter('use_bias_correction')
self.assertEqual(freq_param.value, 100)
self.assertEqual(bias_corr_param.value, True)
def test_imu_callback_processes_message(self):
"""Test that IMU callback processes messages without error."""
# Create a valid IMU message
imu_msg = Imu()
imu_msg.header.stamp.sec = 100
imu_msg.header.stamp.nanosec = 500000000
imu_msg.orientation.w = 1.0
# Call the callback directly
try:
self.node.imu_callback(imu_msg)
# If we get here, no exception was raised
success = True
except Exception as e:
print(f"Exception in callback: {e}")
success = False
self.assertTrue(success, "IMU callback should process message without error")
if __name__ == '__main__':
unittest.main()
Running Tests:
# Run tests with colcon
colcon test --packages-select physical_ai_sensors
# Run specific tests
python3 -m pytest test/test_imu_processor.py -v
# Run ROS-specific tests
python3 -m pytest test/test_ros_integration.py -v
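After colcon test completes, inspect the collected results in a separate step:

# Summarize test results from the last run (add --verbose for failure details)
colcon test-result --all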
5. Build and Installation
Using Colcon for Building
ROS 2 packages use colcon as the build tool:
# Build a specific package
colcon build --packages-select physical_ai_sensors
# Build with symlinks to avoid copying
colcon build --symlink-install --packages-select physical_ai_sensors
# Build with build output streamed directly to the console
colcon build --packages-select physical_ai_sensors --event-handlers console_direct+
# Build entire workspace
colcon build
Build Process for Python Packages:
- colcon reads the ament_python build type from package.xml and selects the matching build plugin
- setuptools installs the Python package according to setup.py
- Console scripts are registered from the entry_points in setup.py
- Data files (launch files, configs) are copied into the install directory
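The resulting install tree for an ament_python package looks roughly like this (Python version and exact paths vary by distribution):

install/physical_ai_sensors/
├── lib/
│   ├── physical_ai_sensors/              # Console scripts found by `ros2 run`
│   │   ├── imu_processor
│   │   └── sensor_fusion
│   └── python3.10/site-packages/
│       └── physical_ai_sensors/          # The importable Python package
└── share/
    ├── ament_index/                      # Package marker for the ament index
    └── physical_ai_sensors/
        ├── package.xml
        └── launch/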
Source Setup
After building, you must source the setup file to use the package:
# Source the entire workspace
source install/setup.bash
# Or source just the local package (development)
source install/physical_ai_sensors/local_setup.bash
# In a new terminal, always source before using packages
source ~/ros2_workspace/install/setup.bash
# Verify the package is available to ROS
ros2 pkg list | grep physical_ai_sensors
Installation Verification:
# List currently running nodes (shows your node once it is started)
ros2 node list
# List the executables (console scripts) the package provides
ros2 pkg executables physical_ai_sensors
# Run the node
ros2 run physical_ai_sensors imu_processor
6. Hands-On Exercise
Exercise: Create a Camera Processing Package
Objective: Create a complete ROS 2 Python package for camera image processing with proper structure, dependencies, and tests.
Prerequisites:
- ROS 2 Humble Hawksbill installed
- Python 3.8+ environment
- Completed previous chapters
Steps:
Step 1: Create the Package Structure
# Navigate to your ROS 2 workspace source directory
cd ~/ros2_workspace/src
# Create the camera processing package
ros2 pkg create --build-type ament_python physical_ai_camera
# Change into the new package directory
cd physical_ai_camera
Step 2: Set Up the Package Files
Create the package.xml file:
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>physical_ai_camera</name>
<version>1.0.0</version>
<description>ROS 2 package for camera processing in Physical AI systems</description>
<maintainer email="developer@physicalaitextbook.com">Physical AI Team</maintainer>
<license>Apache License 2.0</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>cv_bridge</depend>
<depend>image_transport</depend>
<exec_depend>python3-opencv</exec_depend>
<exec_depend>python3-numpy</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
Update the setup.py file:
from setuptools import setup, find_packages
import os
from glob import glob
package_name = 'physical_ai_camera'
setup(
name=package_name,
version='1.0.0',
packages=find_packages(),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'),
glob(os.path.join('launch', '*.launch.py'))),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Physical AI Team',
maintainer_email='developer@physicalaitextbook.com',
description='ROS 2 package for camera processing in Physical AI systems',
license='Apache License 2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'image_subscriber = physical_ai_camera.nodes.image_subscriber:main',
'object_detector = physical_ai_camera.nodes.object_detector:main',
],
},
)
Step 3: Create the Module Directories
# Create the necessary directories
mkdir -p physical_ai_camera/{utils,camera,nodes}
mkdir -p launch test
# Create empty init files
touch physical_ai_camera/__init__.py
touch physical_ai_camera/utils/__init__.py
touch physical_ai_camera/camera/__init__.py
touch physical_ai_camera/nodes/__init__.py
Step 4: Create a Camera Processing Utility Module
Create physical_ai_camera/physical_ai_camera/camera/image_processor.py:
"""Image Processing Utilities for Physical AI Systems.
This module provides utilities for processing camera images
commonly used in humanoid robots and other Physical AI applications.
"""
import cv2
import numpy as np
from typing import Optional
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
class ImageProcessor:
"""Provides utilities for processing camera images."""
def __init__(self):
"""Initialize the image processor."""
self.bridge = CvBridge()
self.focal_length = 500.0 # Default focal length in pixels
    def ros_image_to_opencv(self, img_msg: Image) -> Optional[np.ndarray]:
"""Convert ROS Image message to OpenCV image.
Args:
img_msg: ROS Image message
Returns:
OpenCV image as numpy array
"""
try:
cv_image = self.bridge.imgmsg_to_cv2(img_msg, desired_encoding='bgr8')
return cv_image
except Exception as e:
print(f"Could not convert ROS image to OpenCV: {e}")
return None
    def opencv_to_ros_image(self, cv_image: np.ndarray) -> Optional[Image]:
"""Convert OpenCV image to ROS Image message.
Args:
cv_image: OpenCV image as numpy array
Returns:
ROS Image message
"""
try:
ros_img = self.bridge.cv2_to_imgmsg(cv_image, encoding='bgr8')
return ros_img
except Exception as e:
print(f"Could not convert OpenCV image to ROS: {e}")
return None
def detect_edges(self, image: np.ndarray) -> np.ndarray:
"""Detect edges in an image using Canny edge detection.
Args:
image: Input image in BGR format
Returns:
Binary image with edges highlighted
"""
# Convert to grayscale
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Apply Gaussian blur to reduce noise
blurred = cv2.GaussianBlur(gray, (5, 5), 0)
# Apply Canny edge detection
edges = cv2.Canny(blurred, 50, 150)
return edges
def detect_corners(self, image: np.ndarray, max_corners: int = 100) -> np.ndarray:
"""Detect corners in an image using Shi-Tomasi corner detector.
Args:
image: Input image in BGR format
max_corners: Maximum number of corners to detect
Returns:
Array of corner coordinates
"""
# Convert to grayscale
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
# Detect corners
corners = cv2.goodFeaturesToTrack(
gray,
maxCorners=max_corners,
qualityLevel=0.01,
minDistance=10
)
# Return as array of (x, y) coordinates
if corners is not None:
            return corners.astype(int).reshape(-1, 2)  # np.int0 is deprecated/removed in newer NumPy
else:
return np.array([])
def calculate_depth_from_disparity(self, left_image: np.ndarray,
right_image: np.ndarray) -> np.ndarray:
"""Calculate depth map from stereo pair images.
Args:
left_image: Left camera image in BGR format
right_image: Right camera image in BGR format
Returns:
Depth map as grayscale image
"""
# Convert to grayscale
gray_left = cv2.cvtColor(left_image, cv2.COLOR_BGR2GRAY)
gray_right = cv2.cvtColor(right_image, cv2.COLOR_BGR2GRAY)
# Create stereo matcher
stereo = cv2.StereoBM_create(numDisparities=32, blockSize=15)
        # Compute disparity (StereoBM returns fixed-point values scaled by 16)
        disparity = stereo.compute(gray_left, gray_right).astype(np.float32) / 16.0
# Convert to depth using focal length and baseline
# In practice, you'd need the actual stereo camera parameters
baseline = 0.2 # Baseline in meters (example)
depth_map = (self.focal_length * baseline) / (disparity + 1e-6) # Add small value to avoid division by zero
# Normalize for visualization
depth_normalized = cv2.normalize(depth_map, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8UC1)
return depth_normalized
Create physical_ai_camera/physical_ai_camera/nodes/image_subscriber.py:
#!/usr/bin/env python3
"""Image Subscriber Node for Physical AI Systems.
This node subscribes to camera images and provides basic processing.
"""
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from std_msgs.msg import Header
import cv2  # Used to convert single-channel edge maps back to BGR before publishing
from physical_ai_camera.camera.image_processor import ImageProcessor
class ImageSubscriberNode(Node):
"""ROS 2 node that subscribes to camera images and processes them."""
def __init__(self):
super().__init__('image_subscriber_node')
# Initialize the image processor utility
self.image_processor = ImageProcessor()
# Declare parameters
self.declare_parameter('input_topic', '/camera/image_raw')
self.declare_parameter('enable_edge_detection', False)
# Get parameters
self.input_topic = self.get_parameter('input_topic').value
self.enable_edge_detection = self.get_parameter('enable_edge_detection').value
# Create subscription to camera images
self.subscription = self.create_subscription(
Image,
self.input_topic,
self.image_callback,
10 # QoS history depth
)
# Publisher for processed images
self.processed_publisher = self.create_publisher(
Image,
'/camera/image_processed',
10
)
self.frame_counter = 0
self.get_logger().info(
f'Image subscriber initialized - topic: {self.input_topic}, '
f'edge detection: {self.enable_edge_detection}'
)
def image_callback(self, msg: Image) -> None:
"""Process incoming image message.
Args:
msg: Raw camera image message
"""
# Convert ROS image to OpenCV
cv_image = self.image_processor.ros_image_to_opencv(msg)
if cv_image is None:
self.get_logger().warn('Could not convert image message')
return
# Apply processing based on parameters
if self.enable_edge_detection:
processed_image = self.image_processor.detect_edges(cv_image)
# Convert back to 3-channel for publishing
processed_image = cv2.cvtColor(processed_image, cv2.COLOR_GRAY2BGR)
else:
processed_image = cv_image # Use original image
# Convert processed image back to ROS message
processed_msg = self.image_processor.opencv_to_ros_image(processed_image)
if processed_msg is not None:
# Update header timestamp for processed message
processed_msg.header = Header()
processed_msg.header.stamp = self.get_clock().now().to_msg()
processed_msg.header.frame_id = msg.header.frame_id + '_processed'
# Publish the processed image
self.processed_publisher.publish(processed_msg)
# Count frames and log periodically
self.frame_counter += 1
if self.frame_counter % 30 == 0: # Log every 30 frames
height, width = cv_image.shape[:2]
self.get_logger().info(f'Processed frame {self.frame_counter}: {width}x{height}')
def main(args=None):
"""Main function to run the image subscriber node."""
    rclpy.init(args=args)
    # Construct the node outside the try block so the finally clause
    # never references an undefined name if construction fails
    image_subscriber_node = ImageSubscriberNode()
    try:
        rclpy.spin(image_subscriber_node)
    except KeyboardInterrupt:
        print("Shutting down image subscriber node...")
    finally:
        image_subscriber_node.destroy_node()
        rclpy.shutdown()
if __name__ == '__main__':
main()
Step 5: Build and Test Your Package
# Navigate back to the workspace root
cd ~/ros2_workspace
# Build your package
colcon build --packages-select physical_ai_camera
# Source the workspace
source install/setup.bash
# Verify your package exists
ros2 pkg list | grep physical_ai_camera
# Run the image subscriber node
ros2 run physical_ai_camera image_subscriber
Expected Result:
You should see the node initialize and wait for camera images. If you have a camera publishing images to /camera/image_raw, the node would process them. If no camera data is available, the node will continue running and log its status.
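Even without camera data, you can verify the wiring with standard introspection commands (topic names as configured above):

# Confirm the node is up and inspect its subscriptions/publishers
ros2 node info /image_subscriber_node
# Watch the processed output once images start arriving (--no-arr hides the large data array)
ros2 topic echo /camera/image_processed --no-arr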
Troubleshooting:
- If build fails, ensure all file references in setup.py are correct
- If imports don't work, verify the Python package structure is correct
- If ROS commands don't recognize your package, check that you've sourced the correct setup file
Extension Challenge (Optional)
Add a new node to your package that performs object detection using a simple color-based approach. The node should detect objects of a specific color in the camera feed and publish bounding box coordinates.
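As a starting point, here is a minimal sketch of the color-thresholding step, independent of ROS (the HSV bounds are illustrative values for reddish objects; tune them for your target color):

import cv2
import numpy as np

def detect_colored_objects(bgr_image: np.ndarray,
                           lower_hsv=(0, 120, 70),
                           upper_hsv=(10, 255, 255),
                           min_area: int = 500):
    """Return bounding boxes (x, y, w, h) of regions matching an HSV color range."""
    hsv = cv2.cvtColor(bgr_image, cv2.COLOR_BGR2HSV)
    # Binary mask of pixels inside the target color range
    mask = cv2.inRange(hsv, np.array(lower_hsv), np.array(upper_hsv))
    # Extract connected regions and keep reasonably large ones
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    return [cv2.boundingRect(c) for c in contours if cv2.contourArea(c) >= min_area]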
7. Assessment Questions (10 questions)
Multiple Choice (5 questions)
Question 1: What is the purpose of the <build_type>ament_python</build_type> tag in package.xml?
a) Specifies that the package uses Python as its primary language
b) Tells ROS 2 to use the ament build system with Python-specific extensions
c) Indicates the package is ament-compatible
d) Sets the package version
Answer: b
Explanation: The build_type tag in the export section tells ROS 2's build tool (colcon) which build system plugin to use. ament_python is specifically designed for Python packages.

Question 2: Where is an executable entry point for a ROS 2 Python package defined?
a) Defined in package.xml
b) Defined in setup.py under the entry_points dictionary

Answer: b
Explanation: Console scripts in ROS 2 Python packages are defined in setup.py under the entry_points dictionary, specifically in the 'console_scripts' list.

Question 3: Which command is used to build a specific ROS 2 package?
a) build package_name
b) colcon build package_name
c) colcon build --packages-select package_name
d) catkin_make package_name

Answer: c
Explanation: With colcon (ROS 2's build tool), you use the --packages-select option to build specific packages.

Question 4: What does the ros_image_to_opencv function typically return?
a) A ROS Image message
b) A NumPy array containing image data
c) A PIL Image object
d) A raw byte string

Answer: b
Explanation: Functions like ros_image_to_opencv return a NumPy array containing the image data in an OpenCV-compatible format, which is essential for image processing operations.

Question 5: Where should ROS 2 Python dependencies be specified to ensure proper resolution?
a) Only in requirements.txt
b) Only in setup.py
c) In package.xml and setup.py as appropriate
d) In the source code directly

Answer: c
Explanation: For proper ROS 2 dependency management, system-level Python packages should be listed in package.xml as exec_depend, while pip-installable import dependencies belong in setup.py's install_requires.

Short Answer (3 questions)
Question 6: Explain the difference between build dependencies and execution dependencies in ROS 2 packages, and provide examples of each.

Sample answer: Build dependencies (build_depend, or depend, which covers both build and execution) are required while the package is being built (e.g., rclpy, std_msgs). Execution dependencies (exec_depend) are required when the package runs (e.g., python3-opencv, python3-numpy). Test dependencies (test_depend) are required only for running tests (e.g., python3-pytest, ament_copyright).

Question 7: Describe the purpose of __init__.py files in a ROS 2 Python package and why they are necessary.

Sample answer: __init__.py files mark directories as Python packages, allowing their modules to be imported. In ROS 2 they are required for proper Python packaging. Even when empty, they signal to Python (and to setuptools' find_packages) that the directory contains a package; without them, Python cannot import modules from those subdirectories.

Question 8: What is the purpose of the cv_bridge package in ROS 2 Python image processing applications?

Sample answer: cv_bridge converts between ROS Image messages and OpenCV image formats. Since ROS uses its own image message type and OpenCV works with NumPy arrays, cv_bridge provides the necessary conversion utilities, allowing ROS nodes to process images using OpenCV functions.

Practical Exercises (2 questions)
Question 9: Practical Implementation Task Create a ROS 2 Python package named 'physical_ai_navigation' that includes:
- A utility module for path planning algorithms (at least 2 different path planning methods)
- A node that subscribes to sensor data and publishes navigation commands
- Unit tests for your path planning utilities
- Proper package.xml and setup.py files
Document the complete process from package creation to successful node execution, including any challenges you encountered and how you solved them.
Question 10: Package Integration Challenge Design a system where your 'physical_ai_camera' package from the exercise communicates with a hypothetical 'physical_ai_navigation' package to provide visual obstacle detection for navigation. Specify:
- What topics these packages would share
- What message types would be used
- How the camera node would process data to detect obstacles
- How the navigation node would use this information
- What QoS policies would be appropriate for this communication
8. Further Reading Resources
- "Effective Python for Robotics" - Peter Corke's Python Robotics Guide. Why read: Python best practices specifically for robotics applications. Link: https://github.com/petercorke/python-robotics-toolkit
- ROS 2 Python Package Documentation. Why read: Official guide to Python packages in ROS 2. Link: https://docs.ros.org/en/rolling/Tutorials/Beginner-Client-Libraries/Create-Python-Package.html
- ament_python Build System Documentation. Why read: Understanding the ROS 2 build system for Python. Link: https://github.com/ament/ament_python
- "Programming Robots with ROS" - Morgan Quigley et al. Why read: Comprehensive guide to ROS software engineering. Link: https://www.oreilly.com/library/view/programming-robots-with/9781449323899/
- Python Testing with pytest. Why read: Best practices for Python testing applicable to ROS 2. Link: https://docs.pytest.org/
- Colcon Build Tool Documentation. Why read: Understanding the ROS 2 build tool. Link: https://colcon.readthedocs.io/en/released/
- ROS 2 Design Philosophy: Client Library Interface. Why read: Understanding the design principles behind ROS 2 packages. Link: https://design.ros2.org/articles/client_library_interface.html
Recommended Reading Order:
- Start with the official ROS 2 Python package tutorial
- Read about the ament build system
- Review Python best practices for robotics
- Study testing methodologies for robotic systems
9. Hardware/Software Requirements
Software Requirements:
- ROS 2 Humble (or latest LTS) installed
- Python 3.8 or higher with pip
- Git for version control
- Text editor or IDE with Python support (VS Code recommended)
- Terminal/shell access
Hardware Requirements:
- Computer with 4+ GB RAM (8+ GB recommended)
- Multi-core processor (Intel i5 or equivalent)
- Network connection for package downloads
- (Optional) Robot with camera for hardware testing
10. Chapter Summary & Next Steps
Chapter Summary
In this chapter, you learned:
- ROS 2 Python package structure and conventions
- How to create and configure packages with proper metadata
- Dependency management using both package.xml and setup.py
- Implementation of utility modules and ROS 2 nodes
- Testing of ROS 2 Python packages using pytest
- Build and installation processes with colcon
- Best practices for organizing Python code in ROS 2 contexts
Next Steps
Chapter 1.8 will cover ROS 2 launch files, parameters, and configuration systems. You'll learn how to configure and launch complex multi-node systems efficiently, building on the package knowledge you've gained here. This will enable you to create sophisticated robotic applications with multiple coordinated nodes.
Estimated Time to Complete: 3 hours
Difficulty Level: Intermediate
Prerequisites: Chapters 1.1-1.6