CREATING A PUBLISHER AND SUBSCRIBER NODES USING PYTHON
Introduction.
In ROS2(Robot Operating System 2), a node is the smallest executable unit that performs computation. It is essentially a process that communicates with other nodes to exchange data (via topics, services, or actions) and perform tasks in a robotic system. ROS 2 publishers and subscribers are the basic communication mechanism between nodes using topics. They can be created by either using python or C++ language. But in this blog we will be creating both the publisher and subscriber nodes using python.
Prerequisites
ROS installed. installing ros2
ROS2 workspace set up. creating a workspace
Package setup.https://docs.ros.org/en/jazzy/Tutorials/Beginner-Client-Libraries/Creating-Your-First-ROS2-Package.html
Basic Understanding of ROS concepts(nodes, topic, messages).https://docs.ros.org/en/jazzy/Tutorials/Beginner-CLI-Tools.html
ROS installed. installing ros2
ROS2 workspace set up. creating a workspace
Package setup.https://docs.ros.org/en/jazzy/Tutorials/Beginner-Client-Libraries/Creating-Your-First-ROS2-Package.html
Basic Understanding of ROS concepts(nodes, topic, messages).https://docs.ros.org/en/jazzy/Tutorials/Beginner-CLI-Tools.html
Publisher-Subscriber Node in python
In this case you will notice some small changes due to the language changes.
1. Create two ROS2
Run the below commands in terminal:
cd ~/ros2_ws/src
# Create publisher package
ros2 pkg create --build-type ament_python talker --dependencies rclpy std_msgs
# Create subscriber package
ros2 pkg create --build-type ament_python listener --dependencies rclpy std_msgs
2. Implement the Talker Node (Publisher)
To access the talker node create a file under ros2_ws/src/talker inside the talker package just created above and name it accordingly, e.g, main.py, talker_node.py. Then enter the code as follows:
# Import the ROS 2 Python client library
import rclpy
from rclpy.node import Node
# Import the standard message type: String
from std_msgs.msg import String
# Define a custom node class that inherits from Node
class TalkerNode(Node):
def __init__(self):
super().__init__('talker_node') # Initialize the node with the name 'talker'
# Create a publisher on the topic 'chatter' with queue size 10
self.publisher = self.create_publisher(
String,
'talker',
10
)
timer_period = 0.5 # Create a timer that calls a function every 0.5 second
self.timer = self.create_timer(
timer_period, self.timer_callback
)
# The function that gets called every 0.5 second
def timer_callback(self):
msg = String() # Create a message of type String
msg.data = 'Hello!' # Set its data
self.publisher.publish(msg)# Publish the message
self.get_logger().info(f'published:{msg.data}') # Log the event
# The main entry point of the node
def main(args=None):
rclpy.init(args=args) # Initialize ROS 2
talker_node = TalkerNode() # Create an instance of the node
rclpy.spin(talker_node) # Keep the node running
talker_node.destroy_node() # Clean up
rclpy.shutdown() # Shutdown ROS 2
3. Implement the Listener Node (Subscriber)
To access the listener node create a file under ros2_ws/src/listener inside the listener package just created above and name it accordingly, e.g, main.py, listener_node.py. Then input the code as follows:
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class ListenerNode(Node):
def __init__(self):
super().__init__('listener_node')
self.subscriber = self.create_subscription(
String,
'talker',
self.listener_callback,
10
)
def listener_callback(self, msg):
self.get_logger().info('msg: %s' %msg.data)
def main(args=None):
rclpy.init(args=args)
listener_node = ListenerNode()
rclpy.spin(listener_node)
listener_node.destroy_node()
rclpy.shutdown()
4. Update setup.py for each package
talker/setup.py
from setuptools import find_packages, setup
package_name = 'talker'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='caleb253',
maintainer_email='wambuacaleb253@gmail.com',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'talker = talker.main:main'
],
},
)
listener/setup.py
from setuptools import find_packages, setup
package_name = 'listener'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='caleb253',
maintainer_email='wambuacaleb253@gmail.com',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'listener = listener.main:main'
],
},
)
5. Update package.xml Dependencies
In both talker/package.xml and listener/package.xml, make sure the dependencies are present to make the code as follows:
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>talker</name>
<version>0.1.0</version>
<description>TODO: Package description</description>
<maintainer email="wambuacaleb253@gmail.com">caleb253</maintainer>
<license>MIT</license>
# they declare runtime dependencies
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
Ensure that the
6. Build the Packages
From the root workspace run the following commands in terminal:
cd ~/ros2_ws
colcon build --packages-select talker listener
source install/setup.bash
7. Run the Nodes
Open two terminals
Terminal 1: Run the Talker(Publisher)
source ~/ros2_ws/install/setup.bash
ros2 run <talker_pkg> talker
Terminal 2: Run the Listener(Subscriber)
source ~/ros2_ws/install/setup.bash
ros2 run <listener_pkg> listener
When running the system the following should be the results:
To stop the run press Ctrl+c
Common Errors and Fixes in ROS 2 Python Publisher/Subscriber Nodes
1. Node Does Not Run / "Entry Point Not Found"
Error
ros2 run talker talker
Package 'talker' has no executable named 'talker'
Cause
setup.py
is missing the correctentry_points
configuration.File/module name doesn't match.
You forgot to build the package.
Fix
Check setup.py
:
entry_points={
'console_scripts': [
'talker = talker.talker_node:main',
],
}
Then rebuild:
colcon build --packages-select talker_pkg
source install/setup.bash
2. Import Error / Module Not Found
Error
ModuleNotFoundError: No module named 'talker'
Cause
You didn’t create an
__init__.py
file in your package folder.Your
setup.py
isn't configured to recognize the package.
Fix
Create a blank file:
touch talker_pkg/__init__.py
Make sure
packages=[package_name]
is correct insetup.py
.
3. Topic Mismatch — No Data Received
Symptom
The talker is publishing.
The listener runs, but receives nothing.
Causes
Topic names are different (e.g.,
'chatter'
vs'topic'
).Message types don’t match.
QoS settings incompatible (rare but possible).
Fix
Check that both nodes use exactly the same topic name.
Make sure both use
std_msgs.msg.String
# Example:
self.publisher_ = self.create_publisher(String, 'chatter', 10)
self.subscription = self.create_subscription(String, 'chatter', ...)
4. Messages Published But Not Seen in ros2 topic echo
Cause
Wrong topic name.
Publisher not running or node not sourced.
Fix
Check topics:
ros2 topic list
Echo the correct one:
ros2 topic echo /chatter
5. Package Builds, but ros2 run
Fails
Error
Failed to load entry point 'talker = ...'
Cause
Wrong Python import path in
setup.py
.File not executable.
Fix
Make sure the file path matches
talker_pkg.talker_node:main
.Ensure your script has a proper
main()
function.Ensure your Python script is in the correct subfolder (
talker_pkg/
).
6. Colcon Build Errors
Error
error: package not found: talker_pkg
Cause
You’re in the wrong workspace.
You didn’t source the setup script.
Fix
source install/setup.bash
colcon build --packages-select talker_pkg
7. Message Type Errors
Error
TypeError: Expected message of type 'String'
CopyEdit
TypeError: Expected message of type 'String'
Cause
You accidentally published a plain string, not a
String()
message.
Fix
Correct:
msg = String()
msg.data = "Hello"
self.publisher_.publish(msg)
Incorrect:
self.publisher_.publish("Hello") #
8. rclpy not installed
Error
ModuleNotFoundError: No module named 'rclpy'
Fix
Make sure you’ve sourced your ROS 2 environment:
source /opt/ros/humble/setup.bash
Also source your workspace:
source ~/ros2_ws/install/setup.bash
9. Dependencies not available
Error
ModuleNotFoundError: No module named 'std_msgs.msg'
Fix
Make sure that the dependencies in package.xml is updated by adding:
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
References:
ROS2 Basics #11 - Writing a Simple Publisher and Subscriber (Python)
Writing a simple publisher and subscriber (Python) — ROS 2 Documentation: Jazzy documentation
Comments
Post a Comment