CREATING A PUBLISHER AND SUBSCRIBER NODES USING PYTHON

 


Introduction.

In ROS2(Robot Operating System 2), a node is the smallest executable unit that performs computation. It is essentially a process that communicates with other nodes to exchange data (via topics, services, or actions) and perform tasks in a robotic system. ROS 2 publishers and subscribers are the basic communication mechanism between nodes using topics. They can be created by either using python or C++ language. But in this blog we will be creating both the publisher and subscriber nodes using python.

Prerequisites

Publisher-Subscriber Node in python

In this case you will notice some small changes due to the language changes.

1. Create two ROS2

Run the below commands in terminal:

cd ~/ros2_ws/src # Create publisher package ros2 pkg create --build-type ament_python talker --dependencies rclpy std_msgs # Create subscriber package ros2 pkg create --build-type ament_python listener --dependencies rclpy std_msgs

2. Implement the Talker Node (Publisher)

To access the talker node create a file under ros2_ws/src/talker inside the talker package just created above and name it accordingly, e.g, main.py, talker_node.py. Then enter the code as follows:

# Import the ROS 2 Python client library import rclpy from rclpy.node import Node # Import the standard message type: String from std_msgs.msg import String # Define a custom node class that inherits from Node class TalkerNode(Node): def __init__(self): super().__init__('talker_node') # Initialize the node with the name 'talker' # Create a publisher on the topic 'chatter' with queue size 10 self.publisher = self.create_publisher( String, 'talker', 10 ) timer_period = 0.5 # Create a timer that calls a function every 0.5 second self.timer = self.create_timer( timer_period, self.timer_callback ) # The function that gets called every 0.5 second def timer_callback(self): msg = String() # Create a message of type String msg.data = 'Hello!' # Set its data self.publisher.publish(msg)# Publish the message self.get_logger().info(f'published:{msg.data}') # Log the event # The main entry point of the node def main(args=None): rclpy.init(args=args) # Initialize ROS 2 talker_node = TalkerNode() # Create an instance of the node rclpy.spin(talker_node) # Keep the node running talker_node.destroy_node() # Clean up rclpy.shutdown() # Shutdown ROS 2

3. Implement the Listener Node (Subscriber)

To access the listener node create a file under ros2_ws/src/listener inside the listener package just created above and name it accordingly, e.g, main.py, listener_node.py. Then input the code as follows:

import rclpy from rclpy.node import Node from std_msgs.msg import String class ListenerNode(Node): def __init__(self): super().__init__('listener_node') self.subscriber = self.create_subscription( String, 'talker', self.listener_callback, 10 ) def listener_callback(self, msg): self.get_logger().info('msg: %s' %msg.data) def main(args=None): rclpy.init(args=args) listener_node = ListenerNode() rclpy.spin(listener_node) listener_node.destroy_node() rclpy.shutdown()

4. Update setup.py for each package

talker/setup.py

from setuptools import find_packages, setup package_name = 'talker' setup( name=package_name, version='0.0.0', packages=find_packages(exclude=['test']), data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), ], install_requires=['setuptools'], zip_safe=True, maintainer='caleb253', maintainer_email='wambuacaleb253@gmail.com', description='TODO: Package description', license='TODO: License declaration', tests_require=['pytest'], entry_points={ 'console_scripts': [ 'talker = talker.main:main' ], }, )

listener/setup.py

from setuptools import find_packages, setup package_name = 'listener' setup( name=package_name, version='0.0.0', packages=find_packages(exclude=['test']), data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), ], install_requires=['setuptools'], zip_safe=True, maintainer='caleb253', maintainer_email='wambuacaleb253@gmail.com', description='TODO: Package description', license='TODO: License declaration', tests_require=['pytest'], entry_points={ 'console_scripts': [ 'listener = listener.main:main' ], }, )

5. Update package.xml Dependencies

In both talker/package.xml and listener/package.xml, make sure the dependencies are present to make the code as follows:

<?xml version="1.0"?> <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> <package format="3"> <name>talker</name> <version>0.1.0</version> <description>TODO: Package description</description> <maintainer email="wambuacaleb253@gmail.com">caleb253</maintainer> <license>MIT</license> # they declare runtime dependencies <exec_depend>rclpy</exec_depend> <exec_depend>std_msgs</exec_depend> <test_depend>ament_copyright</test_depend> <test_depend>ament_flake8</test_depend> <test_depend>ament_pep257</test_depend> <test_depend>python3-pytest</test_depend> <export> <build_type>ament_python</build_type> </export> </package>

Ensure that the

6. Build the Packages

From the root workspace run the following commands in terminal:

cd ~/ros2_ws colcon build --packages-select talker listener source install/setup.bash

7. Run the Nodes

Open two terminals

Terminal 1: Run the Talker(Publisher)
source ~/ros2_ws/install/setup.bash ros2 run <talker_pkg> talker
Terminal 2: Run the Listener(Subscriber)
source ~/ros2_ws/install/setup.bash ros2 run <listener_pkg> listener

When running the system the following should be the results:

image-20250611-051421.png

To stop the run press Ctrl+c

Common Errors and Fixes in ROS 2 Python Publisher/Subscriber Nodes

1. Node Does Not Run / "Entry Point Not Found"

Error

ros2 run talker talker Package 'talker' has no executable named 'talker'

Cause

  • setup.py is missing the correct entry_points configuration.

  • File/module name doesn't match.

  • You forgot to build the package.

Fix

Check setup.py:

entry_points={ 'console_scripts': [ 'talker = talker.talker_node:main', ], }

Then rebuild:

colcon build --packages-select talker_pkg source install/setup.bash

2. Import Error / Module Not Found

Error

ModuleNotFoundError: No module named 'talker'

Cause

  • You didn’t create an __init__.py file in your package folder.

  • Your setup.py isn't configured to recognize the package.

Fix

  • Create a blank file: touch talker_pkg/__init__.py

  • Make sure packages=[package_name] is correct in setup.py.


3. Topic Mismatch — No Data Received

Symptom

  • The talker is publishing.

  • The listener runs, but receives nothing.

Causes

  • Topic names are different (e.g., 'chatter' vs 'topic').

  • Message types don’t match.

  • QoS settings incompatible (rare but possible).

Fix

  • Check that both nodes use exactly the same topic name.

  • Make sure both use std_msgs.msg.String

# Example: self.publisher_ = self.create_publisher(String, 'chatter', 10) self.subscription = self.create_subscription(String, 'chatter', ...)

4. Messages Published But Not Seen in ros2 topic echo

Cause

  • Wrong topic name.

  • Publisher not running or node not sourced.

Fix

  1. Check topics:

ros2 topic list
  1. Echo the correct one:

ros2 topic echo /chatter

5. Package Builds, but ros2 run Fails

Error

Failed to load entry point 'talker = ...'

Cause

  • Wrong Python import path in setup.py.

  • File not executable.

Fix

  • Make sure the file path matches talker_pkg.talker_node:main.

  • Ensure your script has a proper main() function.

  • Ensure your Python script is in the correct subfolder (talker_pkg/).


6. Colcon Build Errors

Error

error: package not found: talker_pkg

Cause

  • You’re in the wrong workspace.

  • You didn’t source the setup script.

Fix

source install/setup.bash colcon build --packages-select talker_pkg

7. Message Type Errors

Error

TypeError: Expected message of type 'String'

CopyEdit

TypeError: Expected message of type 'String'

Cause

  • You accidentally published a plain string, not a String() message.

Fix

Correct:

msg = String() msg.data = "Hello" self.publisher_.publish(msg)

Incorrect:

self.publisher_.publish("Hello") #

8. rclpy not installed

Error

ModuleNotFoundError: No module named 'rclpy'

 

Fix

Make sure you’ve sourced your ROS 2 environment:

source /opt/ros/humble/setup.bash

Also source your workspace:

source ~/ros2_ws/install/setup.bash

9. Dependencies not available

Error

ModuleNotFoundError: No module named 'std_msgs.msg'

Fix

Make sure that the dependencies in package.xml is updated by adding:

<exec_depend>rclpy</exec_depend> <exec_depend>std_msgs</exec_depend>


References:

ROS2 Basics #11 - Writing a Simple Publisher and Subscriber (Python)

Writing a simple publisher and subscriber (Python) — ROS 2 Documentation: Jazzy documentation

Comments

  1. Great article! Respiratory health is something many people overlook until problems start. I recently came across a helpful resource about natural breathing support products at Respiclear. It provides useful information and solutions for people looking to improve their lung health. Thanks for sharing such valuable insights!

    ReplyDelete
  2. Very informative post. For those interested in fat-burning support,
    CitrusBurn is worth checking out.

    ReplyDelete
  3. Great read! I found
    Metabo Drops to be a useful resource as well.

    ReplyDelete
  4. Very informative post. For those dealing with nerve discomfort,
    SciatiEase is worth checking out.

    ReplyDelete

Post a Comment

Popular posts from this blog

Getting Started with ROS2 Humble: Workspace, Packages, and Nodes

Introduction to ROS 2: The Future of Robotics Software

RQT Setup in ROS2 Humble