CREATING A PUBLISHER AND SUBSCRIBER NODES USING PYTHON

 

Introduction.

In ROS2(Robot Operating System 2), a node is the smallest executable unit that performs computation. It is essentially a process that communicates with other nodes to exchange data (via topics, services, or actions) and perform tasks in a robotic system. ROS 2 publishers and subscribers are the basic communication mechanism between nodes using topics. They can be created by either using python or C++ language. But in this blog we will be creating both the publisher and subscriber nodes using python.

Prerequisites

Publisher-Subscriber Node in python

In this case you will notice some small changes due to the language changes.

1. Create two ROS2

Run the below commands in terminal:

cd ~/ros2_ws/src # Create publisher package ros2 pkg create --build-type ament_python talker --dependencies rclpy std_msgs # Create subscriber package ros2 pkg create --build-type ament_python listener --dependencies rclpy std_msgs

2. Implement the Talker Node (Publisher)

To access the talker node create a file under ros2_ws/src/talker inside the talker package just created above and name it accordingly, e.g, main.py, talker_node.py. Then enter the code as follows:

# Import the ROS 2 Python client library import rclpy from rclpy.node import Node # Import the standard message type: String from std_msgs.msg import String # Define a custom node class that inherits from Node class TalkerNode(Node): def __init__(self): super().__init__('talker_node') # Initialize the node with the name 'talker' # Create a publisher on the topic 'chatter' with queue size 10 self.publisher = self.create_publisher( String, 'talker', 10 ) timer_period = 0.5 # Create a timer that calls a function every 0.5 second self.timer = self.create_timer( timer_period, self.timer_callback ) # The function that gets called every 0.5 second def timer_callback(self): msg = String() # Create a message of type String msg.data = 'Hello!' # Set its data self.publisher.publish(msg)# Publish the message self.get_logger().info(f'published:{msg.data}') # Log the event # The main entry point of the node def main(args=None): rclpy.init(args=args) # Initialize ROS 2 talker_node = TalkerNode() # Create an instance of the node rclpy.spin(talker_node) # Keep the node running talker_node.destroy_node() # Clean up rclpy.shutdown() # Shutdown ROS 2

3. Implement the Listener Node (Subscriber)

To access the listener node create a file under ros2_ws/src/listener inside the listener package just created above and name it accordingly, e.g, main.py, listener_node.py. Then input the code as follows:

import rclpy from rclpy.node import Node from std_msgs.msg import String class ListenerNode(Node): def __init__(self): super().__init__('listener_node') self.subscriber = self.create_subscription( String, 'talker', self.listener_callback, 10 ) def listener_callback(self, msg): self.get_logger().info('msg: %s' %msg.data) def main(args=None): rclpy.init(args=args) listener_node = ListenerNode() rclpy.spin(listener_node) listener_node.destroy_node() rclpy.shutdown()

4. Update setup.py for each package

talker/setup.py

from setuptools import find_packages, setup package_name = 'talker' setup( name=package_name, version='0.0.0', packages=find_packages(exclude=['test']), data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), ], install_requires=['setuptools'], zip_safe=True, maintainer='caleb253', maintainer_email='wambuacaleb253@gmail.com', description='TODO: Package description', license='TODO: License declaration', tests_require=['pytest'], entry_points={ 'console_scripts': [ 'talker = talker.main:main' ], }, )

listener/setup.py

from setuptools import find_packages, setup package_name = 'listener' setup( name=package_name, version='0.0.0', packages=find_packages(exclude=['test']), data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), ], install_requires=['setuptools'], zip_safe=True, maintainer='caleb253', maintainer_email='wambuacaleb253@gmail.com', description='TODO: Package description', license='TODO: License declaration', tests_require=['pytest'], entry_points={ 'console_scripts': [ 'listener = listener.main:main' ], }, )

5. Update package.xml Dependencies

In both talker/package.xml and listener/package.xml, make sure the dependencies are present to make the code as follows:

<?xml version="1.0"?> <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> <package format="3"> <name>talker</name> <version>0.1.0</version> <description>TODO: Package description</description> <maintainer email="wambuacaleb253@gmail.com">caleb253</maintainer> <license>MIT</license> # they declare runtime dependencies <exec_depend>rclpy</exec_depend> <exec_depend>std_msgs</exec_depend> <test_depend>ament_copyright</test_depend> <test_depend>ament_flake8</test_depend> <test_depend>ament_pep257</test_depend> <test_depend>python3-pytest</test_depend> <export> <build_type>ament_python</build_type> </export> </package>

Ensure that the

6. Build the Packages

From the root workspace run the following commands in terminal:

cd ~/ros2_ws colcon build --packages-select talker listener source install/setup.bash

7. Run the Nodes

Open two terminals

Terminal 1: Run the Talker(Publisher)
source ~/ros2_ws/install/setup.bash ros2 run <talker_pkg> talker
Terminal 2: Run the Listener(Subscriber)
source ~/ros2_ws/install/setup.bash ros2 run <listener_pkg> listener

When running the system the following should be the results:

image-20250611-051421.png

To stop the run press Ctrl+c

Common Errors and Fixes in ROS 2 Python Publisher/Subscriber Nodes

1. Node Does Not Run / "Entry Point Not Found"

Error

ros2 run talker talker Package 'talker' has no executable named 'talker'

Cause

  • setup.py is missing the correct entry_points configuration.

  • File/module name doesn't match.

  • You forgot to build the package.

Fix

Check setup.py:

entry_points={ 'console_scripts': [ 'talker = talker.talker_node:main', ], }

Then rebuild:

colcon build --packages-select talker_pkg source install/setup.bash

2. Import Error / Module Not Found

Error

ModuleNotFoundError: No module named 'talker'

Cause

  • You didn’t create an __init__.py file in your package folder.

  • Your setup.py isn't configured to recognize the package.

Fix

  • Create a blank file: touch talker_pkg/__init__.py

  • Make sure packages=[package_name] is correct in setup.py.


3. Topic Mismatch — No Data Received

Symptom

  • The talker is publishing.

  • The listener runs, but receives nothing.

Causes

  • Topic names are different (e.g., 'chatter' vs 'topic').

  • Message types don’t match.

  • QoS settings incompatible (rare but possible).

Fix

  • Check that both nodes use exactly the same topic name.

  • Make sure both use std_msgs.msg.String

# Example: self.publisher_ = self.create_publisher(String, 'chatter', 10) self.subscription = self.create_subscription(String, 'chatter', ...)

4. Messages Published But Not Seen in ros2 topic echo

Cause

  • Wrong topic name.

  • Publisher not running or node not sourced.

Fix

  1. Check topics:

ros2 topic list
  1. Echo the correct one:

ros2 topic echo /chatter

5. Package Builds, but ros2 run Fails

Error

Failed to load entry point 'talker = ...'

Cause

  • Wrong Python import path in setup.py.

  • File not executable.

Fix

  • Make sure the file path matches talker_pkg.talker_node:main.

  • Ensure your script has a proper main() function.

  • Ensure your Python script is in the correct subfolder (talker_pkg/).


6. Colcon Build Errors

Error

error: package not found: talker_pkg

Cause

  • You’re in the wrong workspace.

  • You didn’t source the setup script.

Fix

source install/setup.bash colcon build --packages-select talker_pkg

7. Message Type Errors

Error

TypeError: Expected message of type 'String'

CopyEdit

TypeError: Expected message of type 'String'

Cause

  • You accidentally published a plain string, not a String() message.

Fix

Correct:

msg = String() msg.data = "Hello" self.publisher_.publish(msg)

Incorrect:

self.publisher_.publish("Hello") #

8. rclpy not installed

Error

ModuleNotFoundError: No module named 'rclpy'

 

Fix

Make sure you’ve sourced your ROS 2 environment:

source /opt/ros/humble/setup.bash

Also source your workspace:

source ~/ros2_ws/install/setup.bash

9. Dependencies not available

Error

ModuleNotFoundError: No module named 'std_msgs.msg'

Fix

Make sure that the dependencies in package.xml is updated by adding:

<exec_depend>rclpy</exec_depend> <exec_depend>std_msgs</exec_depend>


References:

ROS2 Basics #11 - Writing a Simple Publisher and Subscriber (Python)

Writing a simple publisher and subscriber (Python) — ROS 2 Documentation: Jazzy documentation

Comments

Popular posts from this blog

ROBOT MOBILE PLATFORM

Getting Started with ROS2 Humble: Workspace, Packages, and Nodes