Chapter 1.6: Nodes, Topics, Services, Actions
Learning Objectives
By the end of this chapter, you will be able to:
- Implement ROS 2 nodes in both Python and C++
- Design and implement publisher-subscriber communication patterns
- Create and use services for request-response communication
- Develop action clients and servers for long-running tasks
- Choose the appropriate communication pattern for different scenarios
- Understand the trade-offs between different communication types
- Debug communication issues in ROS 2 systems
- Design complex robot behaviors using multiple communication patterns
Introduction
In the previous chapter, we explored the architecture of ROS 2 and its core concepts. Now we'll dive deeper into the communication patterns that form the backbone of every robotic system. Understanding when and how to use nodes, topics, services, and actions is crucial for building robust, efficient robotic systems.
These communication patterns are fundamental to how robots coordinate their activities. Imagine a mobile robot navigating through a warehouse: its camera continuously streams images (topics), it requests specific location information from a mapping service (services), performs long-duration navigation tasks with real-time feedback (actions), and coordinates all these activities through various nodes working together.
Different communication patterns serve different purposes in robotic systems. In this chapter, we'll explore each pattern in detail, with hands-on examples that show how they work in practice and when to use each one. We'll also examine how these patterns work together in complex robotic systems, using the Physical AI and humanoid robotics context to ground our examples in real-world applications.
1. Nodes: The Foundation of ROS 2 (600 words)
What are Nodes? (200 words)
A node is the fundamental unit of computation in ROS 2. It's essentially a single process that performs a specific task and communicates with other nodes through ROS 2 communication primitives. Think of nodes as microservices in a distributed robotic system - each node has a defined responsibility and communicates with others to accomplish complex tasks.
In the context of Physical AI, different nodes might handle:
- Sensor data processing
- Path planning
- Motor control
- User interaction
- AI reasoning
- System monitoring
Nodes are designed to be modular and reusable. A path planning node developed for a mobile robot can often be adapted for a humanoid robot with minimal changes. This modularity is one of the key advantages of the ROS 2 architecture - you can combine existing nodes with your own custom nodes to build complex robotic behaviors.
Node Creation in Python (250 words)
Let's examine how to create a ROS 2 node in Python:
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from std_msgs.msg import Float32
from std_srvs.srv import SetBool # Example service type
class PhysicalAINode(Node):
def __init__(self):
# Initialize the node with a name
super().__init__('physical_ai_node')
# Create a publisher
self.publisher = self.create_publisher(
String,
'robot_status', # Topic name
10 # Queue size (QoS depth)
)
# Create a subscription
self.subscription = self.create_subscription(
Float32,
'sensor_input',
self.sensor_callback,
10
)
# Create a service server
self.service_server = self.create_service(
SetBool,
'control_command',
self.handle_control_command
)
# Create a timer to periodically publish messages
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.get_logger().info('Physical AI Node initialized')
def sensor_callback(self, msg):
"""Callback function executed when a message is received"""
self.get_logger().info(f'Received sensor data: {msg.data}')
# Process the sensor data and publish result
result_msg = String()
result_msg.data = f'Processed: {msg.data}'
self.publisher.publish(result_msg)
def handle_control_command(self, request, response):
"""Callback for service requests"""
if request.data:
self.get_logger().info('Activation command received')
response.success = True
response.message = 'Robot activated'
else:
self.get_logger().info('Deactivation command received')
response.success = True
response.message = 'Robot deactivated'
return response
def timer_callback(self):
"""Callback executed by timer"""
msg = String()
msg.data = f'Heartbeat: {self.get_clock().now()}'
self.publisher.publish(msg)
def main(args=None):
# Initialize the ROS 2 client library
rclpy.init(args=args)
# Create the node
physical_ai_node = PhysicalAINode()
try:
# Keep the node running until interrupted
rclpy.spin(physical_ai_node)
except KeyboardInterrupt:
physical_ai_node.get_logger().info('Interrupted, shutting down')
finally:
# Clean up
physical_ai_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Key Node Components:
- Node Constructor: Inherits from
rclpy.node.Node, sets up node name - Publishers: Send data to topics
- Subscriptions: Receive data from topics
- Services: Handle request-response communication
- Timers: Execute callbacks at regular intervals
- Actions: Handle long-running tasks with feedback (covered later)
Node Creation in C++ (150 words)
For performance-critical applications, you might prefer C++:
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
class PhysicalAICppNode : public rclcpp::Node
{
public:
PhysicalAICppNode() : Node("physical_ai_cpp_node")
{
publisher_ = this->create_publisher<std_msgs::msg::String>(
"robot_status", 10);
subscription_ = this->create_subscription<std_msgs::msg::String>(
"sensor_input", 10,
std::bind(&PhysicalAICppNode::topic_callback, this, std::placeholders::_1));
timer_ = this->create_wall_timer(
std::chrono::milliseconds(500),
std::bind(&PhysicalAICppNode::timer_callback, this));
}
private:
void topic_callback(const std_msgs::msg::String & msg) const
{
RCLCPP_INFO(this->get_logger(), "Received: '%s'", msg.data.c_str());
}
void timer_callback()
{
auto message = std_msgs::msg::String();
message.data = "Hello from C++ Node";
publisher_->publish(message);
}
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_;
};
When choosing between Python and C++ for nodes, consider: Python for rapid prototyping and high-level logic, C++ for performance-critical tasks like real-time control loops or heavy computations.
2. Topics: Publish-Subscribe Communication (1000 words)
The Publish-Subscribe Pattern (300 words)
Topics implement a publish-subscribe communication model that enables asynchronous, one-way data flow between nodes. This pattern is perfect for streaming data like sensor readings, robot states, or status updates that need to be broadcast to multiple interested parties simultaneously.
The publish-subscribe pattern has these key characteristics:
- Asynchronous: Publishers don't wait for subscribers to receive messages
- Unidirectional: Data flows one way from publisher to subscriber
- Broadcast: One publisher can send to many subscribers
- Anonymous: Publishers and subscribers don't need to know about each other
In a humanoid robot context, topics are ideal for:
- Publishing camera images for vision processing
- Broadcasting robot joint states for monitoring
- Streaming IMU data for balance control
- Distributing map updates for navigation
- Broadcasting system status for operator awareness
Creating Publishers (300 words)
A publisher sends messages to a named topic. Multiple publishers can write to the same topic, and multiple subscribers can listen to the same topic.
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image, JointState
from geometry_msgs.msg import Twist
class SensorPublisher(Node):
def __init__(self):
super().__init__('sensor_publisher')
# Create publisher for camera images
self.image_publisher = self.create_publisher(
Image,
'/camera/image_raw',
# Use predefined QoS profile for sensor data
rclpy.qos.qos_profile_sensor_data
)
# Create publisher for joint states
self.joint_publisher = self.create_publisher(
JointState,
'/joint_states',
10 # Queue size
)
# Create publisher for velocity commands
self.velocity_publisher = self.create_publisher(
Twist,
'/cmd_vel',
1 # Minimal queue for real-time commands
)
# Timer to send data periodically
self.timer = self.create_timer(0.1, self.publish_data)
self.counter = 0
def publish_data(self):
# Create and publish a joint state message
joint_msg = JointState()
joint_msg.name = ['joint1', 'joint2', 'joint3']
joint_msg.position = [0.1, 0.2, 0.3] # Sample positions
joint_msg.header.stamp = self.get_clock().now().to_msg()
self.joint_publisher.publish(joint_msg)
# Create and publish a velocity command
vel_msg = Twist()
vel_msg.linear.x = 0.5 # Move forward at 0.5 m/s
vel_msg.angular.z = 0.2 # Turn slightly
self.velocity_publisher.publish(vel_msg)
self.counter += 1
self.get_logger().info(f'Published sensor data #{self.counter}')
def main(args=None):
rclpy.init(args=args)
publisher = SensorPublisher()
try:
rclpy.spin(publisher)
except KeyboardInterrupt:
publisher.get_logger().info('Shutting down publisher node')
finally:
publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Creating Subscribers (400 words)
Subscribers register interest in a topic and receive messages when they arrive.
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from geometry_msgs.msg import Twist
from std_msgs.msg import Float32
class SensorSubscriber(Node):
def __init__(self):
super().__init__('sensor_subscriber')
# Subscribe to joint states
self.joint_subscription = self.create_subscription(
JointState,
'/joint_states',
self.joint_state_callback,
10 # Queue size
)
# Subscribe to velocity commands
self.velocity_subscription = self.create_subscription(
Twist,
'/cmd_vel',
self.velocity_command_callback,
1 # Minimal queue for real-time commands
)
# Subscribe to other sensor data
self.pressure_subscription = self.create_subscription(
Float32,
'/pressure_sensor',
self.pressure_callback,
10
)
self.get_logger().info('Sensor subscriber initialized')
def joint_state_callback(self, msg):
# Process joint state data
self.get_logger().info(
f'Joint positions: {[round(pos, 3) for pos in msg.position]}'
)
# Perform any processing needed
# For example, check for joint limits:
for pos in msg.position:
if abs(pos) > 3.14: # Check for potential issues
self.get_logger().warn(f'Unusual joint position: {pos}')
def velocity_command_callback(self, msg):
# Process velocity commands
self.get_logger().info(
f'Received velocity command: '
f'linear.x={msg.linear.x:.2f}, '
f'angular.z={msg.angular.z:.2f}'
)
# In a real robot, this would control motors
def pressure_callback(self, msg):
# Process pressure sensor data
if msg.data < 10.0: # Example threshold
self.get_logger().warn(f'Low pressure detected: {msg.data}')
def main(args=None):
rclpy.init(args=args)
subscriber = SensorSubscriber()
try:
rclpy.spin(subscriber)
except KeyboardInterrupt:
subscriber.get_logger().info('Shutting down subscriber node')
finally:
subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Best Practices for Topics:
- Use
qos_profile_sensor_datafor high-frequency sensor data - Use small queue sizes for real-time control commands
- Always handle potential message processing delays
- Use appropriate message types from standard ROS message packages when possible
3. Services: Request-Response Communication (600 words)
When to Use Services (200 words)
Services implement a synchronous request-response pattern, where a client sends a request to a server and waits for a response. This pattern is ideal for operations that:
- Need guaranteed delivery and processing
- Have a clear request and response
- Are not time-critical (since it's blocking)
- Represent discrete actions rather than continuous data
Common uses for services in robotics include:
- Changing system parameters
- Triggering specific operations (calibration, homing)
- Requesting specific information (current position, system status)
- Activating/deactivating subsystems
- Saving/restoring configurations
Service Implementation (400 words)
Let's create a service for calibrating robot sensors:
# Create service definition file: srv/SensorCalibration.srv
# Request:
# string sensor_name
# bool reset_only
# ---
# Response:
# bool success
# string message
Server Implementation:
import rclpy
from rclpy.node import Node
from your_package.srv import SensorCalibration # Custom service type
class CalibrationService(Node):
def __init__(self):
super().__init__('calibration_service')
# Create service server
self.srv = self.create_service(
SensorCalibration, # Service type
'calibrate_sensor', # Service name
self.calibrate_sensor_callback # Callback function
)
self.get_logger().info('Calibration service ready')
def calibrate_sensor_callback(self, request, response):
self.get_logger().info(
f'Request to calibrate sensor: {request.sensor_name}, '
f'reset only: {request.reset_only}'
)
# Perform actual calibration
try:
if request.reset_only:
# Only reset to known state
success = self.reset_sensor_calibration(request.sensor_name)
else:
# Perform full calibration routine
success = self.perform_full_calibration(request.sensor_name)
if success:
response.success = True
response.message = f'Successfully calibrated {request.sensor_name}'
else:
response.success = False
response.message = f'Failed to calibrate {request.sensor_name}'
except Exception as e:
response.success = False
response.message = f'Error during calibration: {str(e)}'
return response
def reset_sensor_calibration(self, sensor_name):
# Implementation for reset
self.get_logger().info(f'Resetting calibration for {sensor_name}')
return True # In real implementation, this would do actual work
def perform_full_calibration(self, sensor_name):
# Implementation for full calibration
self.get_logger().info(f'Calibrating {sensor_name}')
return True # In real implementation, this would do actual work
def main(args=None):
rclpy.init(args=args)
service_node = CalibrationService()
try:
rclpy.spin(service_node)
except KeyboardInterrupt:
service_node.get_logger().info('Shutting down calibration service')
finally:
service_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Client Implementation:
import rclpy
from rclpy.node import Node
from your_package.srv import SensorCalibration
class CalibrationClient(Node):
def __init__(self):
super().__init__('calibration_client')
self.cli = self.create_client(SensorCalibration, 'calibrate_sensor')
# Wait for service to be available
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting...')
self.req = SensorCalibration.Request()
def send_calibration_request(self, sensor_name, reset_only=False):
self.req.sensor_name = sensor_name
self.req.reset_only = reset_only
self.future = self.cli.call_async(self.req)
rclpy.spin_until_future_complete(self, self.future)
response = self.future.result()
return response
def main(args=None):
rclpy.init(args=args)
client = CalibrationClient()
# Send calibration request
response = client.send_calibration_request('camera_1', reset_only=False)
if response.success:
print(f'SUCCESS: {response.message}')
else:
print(f'ERROR: {response.message}')
client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Services block the caller until a response is received. For time-sensitive applications, consider using actions instead.
4. Actions: Goal-Based Communication (800 words)
When to Use Actions (200 words)
Actions are designed for long-running tasks that require:
- Progress feedback during execution
- The ability to cancel ongoing operations
- Goal-oriented communication pattern
- Detailed results when completed
Unlike topics (continuous data) and services (synchronous request-response), actions support long-running operations with intermediate feedback. In robotics, common action use cases include:
- Navigation to distant locations
- Manipulation tasks with multiple stages
- Calibration routines with progress feedback
- Image processing with status updates
- Any complex task that takes more than a few seconds
Action Implementation (600 words)
Actions consist of three message types:
- Goal: Defines what the action should accomplish
- Feedback: Status updates during execution
- Result: Final outcome when the action completes
Let's create an action for humanoid navigation:
# Create action definition file: action/HumanoidNav.action
# # Define the goal
# float64[] target_position # x, y, theta
# bool avoid_obstacles
# ---
# # Define the result
# bool success
# float64[] final_position # actual final x, y, theta
# string message
# ---
# # Define the feedback
# int32 current_waypoint
# float64[] current_position
# float64 distance_remaining
# string status_message
Action Server Implementation:
import rclpy
from rclpy.action import ActionServer, GoalResponse, CancelResponse
from rclpy.node import Node
from your_package.action import HumanoidNav # Custom action type
from rclpy.executors import MultiThreadedExecutor
from threading import Thread
import time
class HumanoidNavActionServer(Node):
def __init__(self):
super().__init__('humanoid_nav_action_server')
self._action_server = ActionServer(
self,
HumanoidNav,
'humanoid_navigate',
execute_callback=self.execute_callback,
goal_callback=self.goal_callback,
cancel_callback=self.cancel_callback
)
self.get_logger().info('Navigation action server ready')
def goal_callback(self, goal_request):
"""Accept or reject a goal request."""
self.get_logger().info('Received navigation goal request')
# Check if the target is valid and we can accept it
return GoalResponse.ACCEPT
def cancel_callback(self, goal_handle):
"""Accept or reject a client request to cancel an action goal."""
self.get_logger().info('Received cancel request')
return CancelResponse.ACCEPT
async def execute_callback(self, goal_handle):
"""Execute the goal."""
self.get_logger().info('Executing navigation goal...')
# Get the target from the goal request
target_x, target_y, target_theta = goal_handle.request.target_position
avoid_obstacles = goal_handle.request.avoid_obstacles
result = HumanoidNav.Result()
feedback_msg = HumanoidNav.Feedback()
# Simulate navigation progress
current_x, current_y = 0.0, 0.0 # Starting position
distance_remaining = ((target_x - current_x)**2 + (target_y - current_y)**2)**0.5
waypoint_count = 0
while distance_remaining > 0.1 and not goal_handle.is_cancel_requested: # Navigate until close or cancelled
# Simulate movement (in a real robot, this would control actual motors)
current_x += 0.01 * (target_x - current_x) / distance_remaining
current_y += 0.01 * (target_y - current_y) / distance_remaining
waypoint_count += 1
distance_remaining = ((target_x - current_x)**2 + (target_y - current_y)**2)**0.5
if waypoint_count % 10 == 0: # Update feedback every 10 waypoints
feedback_msg.current_waypoint = waypoint_count
feedback_msg.current_position = [current_x, current_y, 0.0] # Simplified
feedback_msg.distance_remaining = distance_remaining
feedback_msg.status_message = f'Navigating... {distance_remaining:.2f}m to go'
goal_handle.publish_feedback(feedback_msg)
self.get_logger().info(f'Feedback: {feedback_msg.status_message}')
# Sleep to simulate real-time processing
time.sleep(0.1)
if goal_handle.is_cancel_requested:
goal_handle.canceled()
result.success = False
result.message = 'Goal canceled'
self.get_logger().info('Goal canceled')
else:
goal_handle.succeed()
result.success = True
result.final_position = [current_x, current_y, target_theta]
result.message = f'Navigation completed to [{current_x:.2f}, {current_y:.2f}]'
self.get_logger().info(f'Goal succeeded: {result.message}')
return result
def main(args=None):
rclpy.init(args=args)
action_server = HumanoidNavActionServer()
try:
# Use multi-threaded executor so action server doesn't block
executor = MultiThreadedExecutor()
executor.add_node(action_server)
executor.spin()
except KeyboardInterrupt:
action_server.get_logger().info('Shutting down action server')
finally:
action_server.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Action Client Implementation:
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from your_package.action import HumanoidNav
class HumanoidNavActionClient(Node):
def __init__(self):
super().__init__('humanoid_nav_action_client')
self._action_client = ActionClient(
self,
HumanoidNav,
'humanoid_navigate')
def send_goal(self, target_position, avoid_obstacles=True):
goal_msg = HumanoidNav.Goal()
goal_msg.target_position = target_position
goal_msg.avoid_obstacles = avoid_obstacles
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return
self.get_logger().info('Goal accepted :)')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(
f'Received feedback: {feedback.status_message}')
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info(f'Result: {result.message}')
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
action_client = HumanoidNavActionClient()
# Wait for server before sending goal
action_client._action_client.wait_for_server()
# Send navigation goal
target = [5.0, 3.0, 0.0] # Navigate to x=5, y=3, theta=0
action_client.send_goal(target)
rclpy.spin(action_client)
if __name__ == '__main__':
main()
Actions provide the most sophisticated communication pattern in ROS 2, allowing for complex interactions with progress monitoring and cancellation capabilities, making them ideal for humanoid robotics applications where tasks may take considerable time.
5. Communication Pattern Selection Guide (500 words)
Decision Framework (300 words)
Choosing the right communication pattern is critical for robotic system design. Here's a framework to help you decide:
Use Topics When:
- Streaming data (sensors, robot state)
- Broadcasting to multiple subscribers
- Real-time control loops (e.g., velocity commands)
- Continuous monitoring or logging
- Publisher and subscriber don't need direct interaction
Use Services When:
- Need guaranteed delivery and response
- Operation is discrete (not continuous)
- Client waits for result before proceeding
- Task is relatively quick (under 1 second typically)
- Need to trigger specific behavior or get current state
Use Actions When:
- Task takes a long time to complete
- Need progress feedback
- Operation might need cancellation
- Task has intermediate results
- Complex, multi-stage operations
Practical Example: Physical AI Task Coordination (200 words)
Consider a humanoid robot performing a "fetch and carry" task:
- Topics: Used for continuous streams like camera images, joint states, IMU data
- Services: Used for specific queries like "get_robot_pose" or "calibrate_gripper"
- Actions: Used for the "navigate_and_grab" task which takes time and provides feedback on progress
This demonstrates how different patterns work together in a complex robotic system. The navigation action might periodically publish pose updates via topics, while service calls can query the current status during the action execution.
6. Hands-On Exercise (400 words)
Exercise: Multi-Node Communication System
Objective: Create a simple robot system with multiple nodes communicating using different ROS 2 patterns.
Prerequisites:
- ROS 2 Humble installed
- Basic Python knowledge
- Completed Chapter 1.5 (ROS 2 Architecture)
Steps:
Step 1: Create a New Package
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python communication_demo --dependencies rclpy std_msgs builtin_interfaces
cd communication_demo
Step 2: Create a Sensor Simulator Node (communication_demo/communication_demo/sensor_simulator.py)
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32
from std_msgs.msg import String
import random
class SensorSimulator(Node):
def __init__(self):
super().__init__('sensor_simulator')
# Create publishers for different sensor data
self.temperature_pub = self.create_publisher(Float32, 'temperature', 10)
self.status_pub = self.create_publisher(String, 'robot_status', 10)
# Create timer to publish data
self.timer = self.create_timer(0.5, self.publish_sensor_data)
self.get_logger().info('Sensor simulator started')
def publish_sensor_data(self):
# Simulate temperature sensor (between 20-30°C)
temp_msg = Float32()
temp_msg.data = 20.0 + random.random() * 10.0
self.temperature_pub.publish(temp_msg)
# Simulate status message
status_msg = String()
statuses = ['idle', 'moving', 'waiting', 'processing']
status_msg.data = random.choice(statuses)
self.status_pub.publish(status_msg)
self.get_logger().info(f'Temperature: {temp_msg.data:.2f}°C, Status: {status_msg.data}')
def main(args=None):
rclpy.init(args=args)
sensor_sim = SensorSimulator()
try:
rclpy.spin(sensor_sim)
except KeyboardInterrupt:
sensor_sim.get_logger().info('Shutting down sensor simulator')
finally:
sensor_sim.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Step 3: Create a Processing Node (communication_demo/communication_demo/processor.py)
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32, String
class DataProcessor(Node):
def __init__(self):
super().__init__('data_processor')
# Subscriptions
self.temp_subscription = self.create_subscription(
Float32,
'temperature',
self.temperature_callback,
10)
self.status_subscription = self.create_subscription(
String,
'robot_status',
self.status_callback,
10)
# Publisher for processed data
self.threshold_pub = self.create_publisher(
String,
'temperature_alert',
10)
self.get_logger().info('Data processor initialized')
def temperature_callback(self, msg):
temp = msg.data
# Simple processing - check if temperature is too high
if temp > 28.0:
alert_msg = String()
alert_msg.data = f'WARNING: High temperature detected: {temp:.2f}°C'
self.threshold_pub.publish(alert_msg)
self.get_logger().warn(alert_msg.data)
else:
self.get_logger().info(f'Normal temperature: {temp:.2f}°C')
def status_callback(self, msg):
status = msg.data
self.get_logger().info(f'Robot status: {status}')
def main(args=None):
rclpy.init(args=args)
processor = DataProcessor()
try:
rclpy.spin(processor)
except KeyboardInterrupt:
processor.get_logger().info('Shutting down processor')
finally:
processor.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Step 4: Update setup.py to include executables
In your setup.py file, add console scripts:
entry_points={
'console_scripts': [
'sensor_simulator = communication_demo.sensor_simulator:main',
'processor = communication_demo.processor:main',
],
},
Step 5: Test the System
cd ~/ros2_ws
colcon build --packages-select communication_demo
source install/setup.bash
# Terminal 1: Start the sensor simulator
ros2 run communication_demo sensor_simulator
# Terminal 2: Start the processor
ros2 run communication_demo processor
Expected Result: You should see temperature values and robot status being published by the sensor simulator node, and the processor node receiving and processing them. When the temperature exceeds the threshold, an alert message should be published.
Troubleshooting:
- If nodes don't communicate, ensure ROS 2 is properly sourced in each terminal
- Check that both nodes are on the same ROS domain
- Use
ros2 topic echoto verify topics are publishing data
Extension Challenge (Optional)
Create a service node that allows querying the average temperature over the last N measurements from the data processor. This would combine subscription to sensor data with service provision.
7. Assessment Questions (10 questions)
Multiple Choice (5 questions)
Question 1: What is the main difference between topics and services in ROS 2? a) Topics are faster than services b) Topics are asynchronous one-way, services are synchronous request-response c) Topics use more memory than services d) There is no difference
Details
Click to reveal answer
Answer: b Explanation: Topics implement publish-subscribe (asynchronous, one-way), while services implement request-response (synchronous, two-way) communication patterns.Question 2: Which communication pattern is best for long-running tasks with progress feedback? a) Topics b) Services c) Actions d) Parameters
Details
Click to reveal answer
Answer: c Explanation: Actions are specifically designed for long-running tasks that provide progress feedback and can be cancelled.Question 3: What does the qos_profile_sensor_data provide? a) Guaranteed delivery of all messages b) High-frequency data with best-effort delivery c) Persistent storage of all messages d) Authentication for sensor data
Details
Click to reveal answer
Answer: b Explanation: qos_profile_sensor_data is optimized for high-frequency sensor data with best-effort delivery, acknowledging that occasional message loss is acceptable.Question 4: What are the three components of an action? a) Request, Response, Result b) Goal, Feedback, Result c) Input, Process, Output d) Publisher, Subscriber, Service
Details
Click to reveal answer
Answer: b Explanation: Actions consist of Goal (what to do), Feedback (status during execution), and Result (final outcome).Question 5: When should you use services instead of topics? a) For all communications b) When you need guaranteed delivery and response to a specific request c) For high-frequency data d) When you don't want guaranteed delivery
Details
Click to reveal answer
Answer: b Explanation: Services are appropriate when you need guaranteed delivery and a response to a specific request, unlike topics which are for continuous data streams.Short Answer (3 questions)
Question 6: Explain why you might use a combination of topics, services, and actions in a humanoid robot system, providing a specific example task.
Details
Click to reveal sample answer
A humanoid robot performing a "fetch a cup" task might use: topics for continuous data like camera images and joint states; services for discrete actions like "calibrate_gripper" or "confirm_object_detection"; and actions for the navigation task which takes time and provides feedback during execution. This combination allows for both continuous monitoring and discrete control decisions.Question 7: What are the key factors to consider when choosing between BEST_EFFORT and RELIABLE QoS policies?
Details
Click to reveal sample answer
Key factors include: the critical nature of the data (safety commands need reliable), frequency of data transmission (high-frequency sensor data might use best-effort), real-time constraints (best-effort has less overhead), and the consequences of data loss (controls need reliability, sensor streams might tolerate some loss).Question 8: Describe the advantages and disadvantages of the publish-subscribe communication pattern.
Details
Click to reveal sample answer
Advantages: decoupling between publishers and subscribers, broadcast capability (one-to-many), asynchronous processing. Disadvantages: no guarantee of delivery, no response to confirm receipt, potential message flooding in complex systems, late-joining subscribers miss historical data.Practical Exercises (2 questions)
Question 9: Design Exercise Design a ROS 2 system for a humanoid robot that needs to:
- Continuously monitor its joint angles
- Request calibration of its arms when needed
- Navigate to different locations with progress feedback
- Query its current battery status
For each requirement, specify:
- Which communication pattern you would use
- Why that pattern is most appropriate
- What message types you would use
- How the nodes would be organized
Question 10: Debugging Exercise A ROS 2 system has two nodes: a sensor node publishing data at 30Hz and a processing node expecting the data. The processing node occasionally misses messages, causing intermittent failures.
Analyze potential causes and solutions:
- What QoS settings might be causing the issue?
- How would you diagnose the problem using ROS 2 tools?
- What would happen if you changed from RELIABLE to BEST_EFFORT QoS?
- How would you modify the system to ensure more reliable communication?
8. Further Reading (5-7 resources)
-
"Programming Robots with ROS" - Morgan Quigley, Brian Gerkey, William Smart Why read: In-depth coverage of ROS communication patterns with practical examples Link: https://www.oreilly.com/library/view/programming-robots-with/9781449323899/
-
ROS 2 Design: Client Library Design Why read: Understand the design principles behind ROS 2 communication patterns Link: https://design.ros2.org/articles/client_library_interface.html
-
DDS Concepts and Examples Why read: Deep understanding of the underlying DDS middleware Link: https://www.omg.org/spec/DDS/
-
"Effective Robotics Programming with ROS" - Anis Koubaa Why read: Practical patterns for structuring robotic communication Link: https://www.packtpub.com/product/effective-robotics-programming-with-ros-third-edition/9781787281933
-
ROS 2 Actions Design Article Why read: Understanding the motivation and implementation of actions Link: https://design.ros2.org/articles/actions.html
-
Quality of Service in ROS 2 Why read: Complete reference on QoS settings and their impact Link: https://docs.ros.org/en/rolling/Concepts/About-Quality-of-Service-Settings.html
-
ROS 2 Tutorials - Writing a Simple Publisher and Subscriber Why read: Official tutorial on implementing topics Link: https://docs.ros.org/en/rolling/Tutorials/Beginner-Client-Libraries/Writing-A-Simple-Py-Publisher-And-Subscriber.html
Recommended Order:
- Start with the ROS 2 tutorials for hands-on experience
- Read the Design articles for theoretical understanding
- Consult the Programming Robots book for practical applications
- Explore DDS documentation for deeper technical understanding
9. Hardware/Software Requirements
Software Requirements:
- ROS 2 Humble Hawksbill (or latest LTS)
- Python 3.8+ or C++ compiler toolchain
- Text editor or IDE (VS Code recommended)
- Terminal emulator
Hardware Requirements:
- Computer with 4+ GB RAM
- Multi-core processor
- Network connection for package installation
- (Optional) Robot or simulator for hardware testing
10. Chapter Summary & Next Steps
Chapter Summary
In this chapter, you learned:
- How to implement ROS 2 nodes in Python and C++
- The publish-subscribe pattern for streaming data using topics
- The request-response pattern for synchronous communication using services
- The goal-feedback-result pattern for long-running tasks using actions
- When to use each communication pattern based on system requirements
- How to create QoS policies for different communication needs
- How to debug communication issues in ROS 2 systems
Next Steps
In Chapter 1.7, we'll dive into building ROS 2 packages in Python, learning how to structure code, manage dependencies, and build reusable components. This builds directly on the communication patterns you've learned here, as you'll create structured packages containing the nodes and communication interfaces you've now mastered.
Estimated Time to Complete: 2.5 hours Difficulty Level: Intermediate Prerequisites: Chapters 1.1-1.5