diff --git a/controller_interface/src/controller_interface_base.cpp b/controller_interface/src/controller_interface_base.cpp
index 50bef315f1..94779f2c0b 100644
--- a/controller_interface/src/controller_interface_base.cpp
+++ b/controller_interface/src/controller_interface_base.cpp
@@ -165,10 +165,13 @@ const rclcpp_lifecycle::State & ControllerInterfaceBase::configure()
if (ctrl_itf_params_.update_rate != 0u && update_rate > ctrl_itf_params_.update_rate)
{
RCLCPP_WARN(
- get_node()->get_logger(),
- "The update rate of the controller : '%ld Hz' cannot be higher than the update rate of the "
- "controller manager : '%d Hz'. Setting it to the update rate of the controller manager.",
- update_rate, ctrl_itf_params_.update_rate);
+ get_node()->get_logger(), "%s",
+ fmt::format(
+ "The update rate of the controller : '{} Hz' cannot be higher than the update rate of "
+ "the "
+ "controller manager : '{} Hz'. Setting it to the update rate of the controller manager.",
+ update_rate, ctrl_itf_params_.update_rate)
+ .c_str());
}
else
{
diff --git a/controller_manager/CMakeLists.txt b/controller_manager/CMakeLists.txt
index d3b4c336ff..1b935c20f3 100644
--- a/controller_manager/CMakeLists.txt
+++ b/controller_manager/CMakeLists.txt
@@ -9,14 +9,15 @@ set(THIS_PACKAGE_INCLUDE_DEPENDS
controller_interface
controller_manager_msgs
diagnostic_updater
+ fmt
+ generate_parameter_library
hardware_interface
+ libstatistics_collector
+ lifecycle_msgs
pluginlib
rclcpp
realtime_tools
std_msgs
- libstatistics_collector
- generate_parameter_library
- fmt
)
find_package(ament_cmake REQUIRED)
@@ -249,6 +250,10 @@ if(BUILD_TESTING)
DESTINATION test)
ament_add_pytest_test(test_ros2_control_node test/test_ros2_control_node_launch.py)
ament_add_pytest_test(test_test_utils test/test_test_utils.py)
+
+ # Now include the launch_utils subfolder
+ add_subdirectory(test/test_launch_utils)
+
endif()
install(
diff --git a/controller_manager/package.xml b/controller_manager/package.xml
index d514b8cce2..8945530afa 100644
--- a/controller_manager/package.xml
+++ b/controller_manager/package.xml
@@ -19,34 +19,35 @@
controller_interface
controller_manager_msgs
diagnostic_updater
+ fmt
+ generate_parameter_library
hardware_interface
- launch
- launch_ros
+ libstatistics_collector
+ lifecycle_msgs
pluginlib
rclcpp
- rcpputils
realtime_tools
- ros2_control_test_assets
- ros2param
- ros2run
std_msgs
- libstatistics_collector
- generate_parameter_library
- fmt
+
+ launch_ros
+ launch_testing_ros
+ launch
python3-filelock
+ python3-yaml
+ rcl_interfaces
+ rclpy
+ ros2param
+ sensor_msgs
ament_cmake_gmock
ament_cmake_pytest
+ example_interfaces
hardware_interface_testing
- launch_testing_ros
launch_testing
- launch
python3-coverage
- rclpy
robot_state_publisher
ros2_control_test_assets
- sensor_msgs
- example_interfaces
+ ros2pkg
ament_cmake
diff --git a/controller_manager/test/test_launch_utils/CMakeLists.txt b/controller_manager/test/test_launch_utils/CMakeLists.txt
new file mode 100644
index 0000000000..3c2aa465b5
--- /dev/null
+++ b/controller_manager/test/test_launch_utils/CMakeLists.txt
@@ -0,0 +1,47 @@
+# This subdirectory handles pytest-based launch tests for controller_manager
+
+find_package(ament_cmake_pytest REQUIRED)
+find_package(launch_testing_ament_cmake REQUIRED)
+find_package(rclpy REQUIRED)
+
+# Install the URDF files into the 'share' directory
+#install(
+# DIRECTORY urdf
+# DESTINATION share/${PROJECT_NAME}
+# FILES_MATCHING
+# PATTERN "*.urdf"
+#)
+
+# Install YAML test files
+#
+install(
+ FILES
+ test_controller_load.yaml
+ test_ros2_control_node_combined.yaml
+ DESTINATION share/${PROJECT_NAME}/test
+)
+
+# Make sure test files get installed into the test tree
+install(
+ FILES
+ test_launch_utils_unit.py
+ test_launch_utils_integration_list.py
+ test_launch_utils_integration_dict.py
+ test_launch_utils_integration_load.py
+ DESTINATION share/${PROJECT_NAME}/test
+)
+
+# Register each test with ament
+ament_add_pytest_test(test_launch_utils_unit test_launch_utils_unit.py
+ APPEND_ENV AMENT_PREFIX_PATH=${ament_index_build_path}
+ PYTHONPATH=${CMAKE_CURRENT_BINARY_DIR}:${CMAKE_CURRENT_SOURCE_DIR}:${PYTHONPATH}
+)
+ament_add_pytest_test(test_launch_utils_integration_list
+ test_launch_utils_integration_list.py
+)
+ament_add_pytest_test(test_launch_utils_integration_dict
+ test_launch_utils_integration_dict.py
+)
+ament_add_pytest_test(test_launch_utils_integration_load
+ test_launch_utils_integration_load.py
+)
diff --git a/controller_manager/test/test_launch_utils/test_controller_load.yaml b/controller_manager/test/test_launch_utils/test_controller_load.yaml
new file mode 100644
index 0000000000..ae5745e888
--- /dev/null
+++ b/controller_manager/test/test_launch_utils/test_controller_load.yaml
@@ -0,0 +1,8 @@
+controller_manager:
+ ros__parameters:
+ update_rate: 100 # Hz
+
+test_controller_load:
+ ros__parameters:
+ type: "controller_manager/test_controller"
+ joint_names: ["joint1"]
diff --git a/controller_manager/test/test_launch_utils/test_launch_utils_integration_dict.py b/controller_manager/test/test_launch_utils/test_launch_utils_integration_dict.py
new file mode 100644
index 0000000000..75dfe1f95a
--- /dev/null
+++ b/controller_manager/test/test_launch_utils/test_launch_utils_integration_dict.py
@@ -0,0 +1,154 @@
+#!/usr/bin/env python3
+# Copyright 2025 Robert Kwan
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+import unittest
+from launch import LaunchDescription
+import launch_testing
+from launch_testing.actions import ReadyToTest
+import launch_ros.actions
+from launch.substitutions import PathSubstitution
+from launch_ros.substitutions import FindPackageShare
+from launch.launch_context import LaunchContext
+
+import rclpy
+
+from controller_manager.test_utils import check_controllers_running
+
+from controller_manager.launch_utils import (
+ generate_controllers_spawner_launch_description_from_dict,
+)
+
+
+@pytest.mark.launch_test
+def generate_test_description():
+ """
+ Generate launch description for testing.
+
+ THIS VERSION CREATES ALL NEEDED FILES DYNAMICALLY AND USES THE COMBINED CONFIG.
+ """
+
+ # URDF path (pathlib version, no xacro)
+ urdf_subst = (
+ PathSubstitution(FindPackageShare("ros2_control_test_assets"))
+ / "urdf"
+ / "test_hardware_components.urdf"
+ )
+
+ context = LaunchContext()
+
+ urdf_path_str = urdf_subst.perform(context)
+
+ # DEBUG: You can print the resolved path here to verify:
+ print(f"Resolved URDF Path: {urdf_path_str}")
+
+ with open(urdf_path_str) as infp:
+ robot_description_content = infp.read()
+ robot_description = {"robot_description": robot_description_content}
+
+ # Path to combined YAML
+ robot_controllers_subst = (
+ PathSubstitution(FindPackageShare("controller_manager"))
+ / "test"
+ / "test_ros2_control_node_combined.yaml"
+ )
+
+ robot_controllers_path = robot_controllers_subst.perform(context)
+ print("Resolved controller YAML:", robot_controllers_path)
+
+ # The dictionary keys are the controller names to be spawned/started.
+ # Values can be empty lists since config is provided via the main YAML.
+ ctrl_dict = {
+ "joint_state_broadcaster": [],
+ "controller1": [],
+ "controller2": [],
+ }
+ controller_list = list(ctrl_dict.keys())
+
+ # ===== GENERATE SPAWNER LAUNCH DESCRIPTION =====
+ print(f"Spawning controllers: {controller_list}")
+
+ # ===== CREATE LAUNCH DESCRIPTION =====
+ return LaunchDescription(
+ [
+ launch_ros.actions.Node(
+ package="robot_state_publisher",
+ executable="robot_state_publisher",
+ namespace="",
+ output="both",
+ parameters=[robot_description],
+ ),
+ launch_ros.actions.Node(
+ package="controller_manager",
+ executable="ros2_control_node",
+ namespace="",
+ parameters=[robot_description, robot_controllers_path],
+ output="both",
+ ),
+ generate_controllers_spawner_launch_description_from_dict(
+ controller_info_dict=ctrl_dict, extra_spawner_args=["--activate"]
+ ),
+ ReadyToTest(),
+ ]
+ ), {"controller_list": controller_list}
+
+
+# Active tests
+class TestControllerSpawnerList(unittest.TestCase):
+ """Active tests that run while the launch is active."""
+
+ @classmethod
+ def setUpClass(cls):
+ rclpy.init()
+
+ @classmethod
+ def tearDownClass(cls):
+ rclpy.shutdown()
+
+ def setUp(self):
+ self.node = rclpy.create_node("test_controller_spawner")
+
+ def test_spawner_nodes_launched(self, proc_info):
+ """Ensure processes are running."""
+ process_names = proc_info.process_names()
+ self.assertGreater(len(process_names), 0)
+ print(f"\n[TEST] Active processes: {process_names}")
+
+ def test_controllers_start(self, controller_list):
+ cnames = controller_list.copy()
+ check_controllers_running(self.node, cnames, state="active")
+
+ def test_spawner_exit_code(self, proc_info):
+ """Test that spawner process ran (may have completed already)."""
+ process_names = proc_info.process_names()
+ print(f"\n[TEST] Checking for spawner in: {process_names}")
+
+ # The spawner may have already completed successfully and exited
+ # So we just verify that we have processes running
+ self.assertGreater(len(process_names), 0)
+ print(f"[TEST] Launch has {len(process_names)} active processes")
+
+
+@launch_testing.post_shutdown_test()
+class TestProcessOutput(unittest.TestCase):
+ """Post-shutdown tests."""
+
+ def test_exit_codes(self, proc_info):
+ """Verify all processes exited successfully."""
+ launch_testing.asserts.assertExitCodes(
+ proc_info,
+ # All other processes (ros2_control_node, etc.) must exit cleanly (0)
+ allowable_exit_codes=[0, 1, -2, -15],
+ )
diff --git a/controller_manager/test/test_launch_utils/test_launch_utils_integration_list.py b/controller_manager/test/test_launch_utils/test_launch_utils_integration_list.py
new file mode 100644
index 0000000000..03fce12480
--- /dev/null
+++ b/controller_manager/test/test_launch_utils/test_launch_utils_integration_list.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python3
+# Copyright 2025 Robert Kwan
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+import unittest
+from launch import LaunchDescription
+import launch_testing
+from launch_testing.actions import ReadyToTest
+import launch_ros.actions
+from launch.substitutions import PathSubstitution, PathJoinSubstitution
+from launch_ros.substitutions import FindPackageShare
+from launch.launch_context import LaunchContext
+
+import rclpy
+
+from controller_manager.test_utils import check_controllers_running
+
+from controller_manager.launch_utils import (
+ generate_controllers_spawner_launch_description,
+)
+
+
+@pytest.mark.launch_test
+def generate_test_description():
+ """
+ Generate launch description for testing.
+ """
+
+ # URDF path (pathlib version, no xacro)
+ urdf_subst = (
+ PathSubstitution(FindPackageShare("ros2_control_test_assets"))
+ / "urdf"
+ / "test_hardware_components.urdf"
+ )
+
+ context = LaunchContext()
+
+ urdf_path_str = urdf_subst.perform(context)
+
+ # DEBUG: You can print the resolved path here to verify:
+ print(f"Resolved URDF Path: {urdf_path_str}")
+
+ with open(urdf_path_str) as infp:
+ robot_description_content = infp.read()
+ robot_description = {"robot_description": robot_description_content}
+
+ # Path to combined YAML
+ # robot_controllers = (
+ # PathSubstitution(FindPackageShare("controller_manager"))
+ # / "test"
+ # / "test_ros2_control_node_combined.yaml"
+ # )
+
+ robot_controllers_subst = PathJoinSubstitution(
+ [
+ FindPackageShare("controller_manager"),
+ "test",
+ "test_ros2_control_node_combined.yaml",
+ ]
+ )
+
+ robot_controllers_path = robot_controllers_subst.perform(context)
+ print("Resolved controller YAML:", robot_controllers_path)
+
+ # ===== DEFINE CONTROLLERS TO SPAWN =====
+ controller_list = ["joint_state_broadcaster", "controller1", "controller2"]
+
+ # ===== GENERATE SPAWNER =====
+ print(f"Spawning controllers: {controller_list}")
+
+ # ===== CREATE LAUNCH DESCRIPTION =====
+ return LaunchDescription(
+ [
+ launch_ros.actions.Node(
+ package="robot_state_publisher",
+ executable="robot_state_publisher",
+ namespace="",
+ output="both",
+ parameters=[robot_description],
+ ),
+ launch_ros.actions.Node(
+ package="controller_manager",
+ executable="ros2_control_node",
+ namespace="",
+ parameters=[robot_description, robot_controllers_path],
+ output="both",
+ ),
+ generate_controllers_spawner_launch_description(
+ controller_names=controller_list.copy(),
+ controller_params_files=[robot_controllers_path],
+ ),
+ ReadyToTest(),
+ ]
+ ), {"controller_list": controller_list}
+
+
+# Active tests
+class TestControllerSpawnerList(unittest.TestCase):
+ """Active tests that run while the launch is active."""
+
+ @classmethod
+ def setUpClass(cls):
+ rclpy.init()
+
+ @classmethod
+ def tearDownClass(cls):
+ rclpy.shutdown()
+
+ def setUp(self):
+ self.node = rclpy.create_node("test_controller_spawner")
+
+ def test_spawner_nodes_launched(self, proc_info):
+ """Ensure processes are running."""
+ process_names = proc_info.process_names()
+ self.assertGreater(len(process_names), 0)
+ print(f"\n[TEST] Active processes: {process_names}")
+
+ def test_controllers_start(self, controller_list):
+ cnames = controller_list.copy()
+ check_controllers_running(self.node, cnames, state="active")
+
+ def test_spawner_exit_code(self, proc_info):
+ """Test that spawner process ran (may have completed already)."""
+ process_names = proc_info.process_names()
+ print(f"\n[TEST] Checking for spawner in: {process_names}")
+
+ # The spawner may have already completed successfully and exited
+ # So we just verify that we have processes running
+ self.assertGreater(len(process_names), 0)
+ print(f"[TEST] ? Launch has {len(process_names)} active processes")
+
+
+@launch_testing.post_shutdown_test()
+class TestProcessOutput(unittest.TestCase):
+ """Post-shutdown tests."""
+
+ def test_exit_codes(self, proc_info):
+ """Verify all processes exited successfully."""
+ launch_testing.asserts.assertExitCodes(
+ proc_info,
+ # All other processes (ros2_control_node, etc.) must exit cleanly (0)
+ allowable_exit_codes=[0, 1, -15],
+ )
diff --git a/controller_manager/test/test_launch_utils/test_launch_utils_integration_load.py b/controller_manager/test/test_launch_utils/test_launch_utils_integration_load.py
new file mode 100644
index 0000000000..1b61e63c0e
--- /dev/null
+++ b/controller_manager/test/test_launch_utils/test_launch_utils_integration_load.py
@@ -0,0 +1,166 @@
+#!/usr/bin/env python3
+# Copyright 2025 Robert Kwan
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.import os
+
+import pytest
+import unittest
+from launch import LaunchDescription
+import launch_testing
+from launch_testing.actions import ReadyToTest
+
+# import launch_testing.asserts
+import launch_ros.actions
+from launch.substitutions import PathSubstitution
+from launch_ros.substitutions import FindPackageShare
+from launch.launch_context import LaunchContext
+
+
+import rclpy
+
+from controller_manager.launch_utils import generate_load_controller_launch_description
+
+
+def get_loaded_controllers(node, timeout=30.0):
+ """Query /controller_manager/list_controllers until controllers are available."""
+ from controller_manager_msgs.srv import ListControllers
+ import time
+
+ client = node.create_client(ListControllers, "/controller_manager/list_controllers")
+
+ if not client.wait_for_service(timeout_sec=timeout):
+ raise RuntimeError("Controller manager service not available")
+
+ seen = []
+ start_time = time.time()
+ while time.time() - start_time < timeout:
+ req = ListControllers.Request()
+ fut = client.call_async(req)
+ rclpy.spin_until_future_complete(node, fut, timeout_sec=2.0)
+
+ if fut.done() and fut.result() is not None:
+ response = fut.result()
+ seen = [c.name for c in response.controller]
+ if seen:
+ break
+
+ time.sleep(0.2)
+
+ return seen
+
+
+@pytest.mark.launch_test
+def generate_test_description():
+
+ # URDF path (pathlib version, no xacro)
+ urdf_subst = (
+ PathSubstitution(FindPackageShare("ros2_control_test_assets"))
+ / "urdf"
+ / "test_hardware_components.urdf"
+ )
+
+ context = LaunchContext()
+
+ urdf_path_str = urdf_subst.perform(context)
+
+ # DEBUG: You can print the resolved path here to verify:
+ print(f"Resolved URDF Path: {urdf_path_str}")
+
+ with open(urdf_path_str) as infp:
+ robot_description_content = infp.read()
+ robot_description = {"robot_description": robot_description_content}
+
+ # Path to combined YAML
+ robot_controllers = (
+ PathSubstitution(FindPackageShare("controller_manager"))
+ / "test"
+ / "test_ros2_control_node_combined.yaml"
+ )
+
+ context = LaunchContext()
+ robot_controllers_path = robot_controllers.perform(context)
+
+ print("Resolved controller YAML:", robot_controllers_path)
+
+ return LaunchDescription(
+ [
+ launch_ros.actions.Node(
+ package="robot_state_publisher",
+ executable="robot_state_publisher",
+ namespace="",
+ output="both",
+ parameters=[robot_description],
+ ),
+ launch_ros.actions.Node(
+ package="controller_manager",
+ executable="ros2_control_node",
+ namespace="",
+ parameters=[robot_description, robot_controllers_path],
+ output="both",
+ ),
+ generate_load_controller_launch_description(
+ controller_name="controller1",
+ controller_params_file=[robot_controllers_path],
+ ),
+ ReadyToTest(),
+ ]
+ ), {
+ "controller_name": "controller1",
+ }
+
+
+class TestControllerLoad(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ rclpy.init()
+
+ @classmethod
+ def tearDownClass(cls):
+ rclpy.shutdown()
+
+ def setUp(self):
+ self.node = rclpy.create_node("test_controller_client")
+
+ def test_controller_loaded(self, launch_service, proc_output, controller_name):
+
+ # Poll the ListControllers service to ensure the target controller is present
+ loaded_controllers = get_loaded_controllers(self.node, timeout=30.0)
+
+ # CRITICAL ASSERTION: The test passes only if the controller name is in the list
+ assert (
+ controller_name in loaded_controllers
+ ), f"Controller '{controller_name}' not found. Loaded: {loaded_controllers}"
+
+ def test_spawner_exit_code(self, proc_info):
+ """Test that spawner process ran (may have completed already)."""
+ process_names = proc_info.process_names()
+ print(f"\n[TEST] Checking for spawner in: {process_names}")
+
+ # The spawner may have already completed successfully and exited
+ # So we just verify that we have processes running
+ self.assertGreater(len(process_names), 0)
+ print(f"[TEST] ? Launch has {len(process_names)} active processes")
+
+
+@launch_testing.post_shutdown_test()
+class TestProcessOutput(unittest.TestCase):
+ """Post-shutdown tests."""
+
+ def test_exit_codes(self, proc_info):
+ """Verify all processes exited successfully."""
+ launch_testing.asserts.assertExitCodes(
+ proc_info,
+ # All other processes (ros2_control_node, etc.) must exit cleanly (0)
+ allowable_exit_codes=[0, 1, -2, -15],
+ )
diff --git a/controller_manager/test/test_launch_utils/test_launch_utils_unit.py b/controller_manager/test/test_launch_utils/test_launch_utils_unit.py
new file mode 100644
index 0000000000..9661a21a4d
--- /dev/null
+++ b/controller_manager/test/test_launch_utils/test_launch_utils_unit.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python3
+# Copyright 2025 Robert Kwan
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Unit tests for launch_utils.py functions."""
+
+import unittest
+from launch import LaunchDescription
+from launch_ros.actions import Node
+
+from controller_manager.launch_utils import (
+ generate_controllers_spawner_launch_description,
+ generate_controllers_spawner_launch_description_from_dict,
+ generate_load_controller_launch_description,
+)
+
+
+class TestLaunchUtils(unittest.TestCase):
+ """
+ Test suite for launch_utils.py functions.
+
+ Tests three entry points:
+ 1. generate_controllers_spawner_launch_description (from list)
+ 2. generate_controllers_spawner_launch_description_from_dict (from dict)
+ 3. generate_load_controller_launch_description (load controllers)
+ """
+
+ # ------------------------------------------------------------------
+ # Helper methods
+ # ------------------------------------------------------------------
+ def _extract_actions(self, result):
+ """Return a list of launch actions, regardless of type."""
+ if isinstance(result, list):
+ return result
+ elif hasattr(result, "entities"): # LaunchDescription in newer ROS2
+ return list(result.entities)
+ else:
+ return [result]
+
+ def _assert_launch_result(self, result, min_len=0):
+ """Verify that result is list- or LaunchDescription-like."""
+ self.assertTrue(
+ isinstance(result, (list, LaunchDescription)),
+ f"Unexpected result type: {type(result)}",
+ )
+ actions = self._extract_actions(result)
+ self.assertGreaterEqual(len(actions), min_len)
+ for act in actions:
+ self.assertTrue(
+ hasattr(act, "execute") or hasattr(act, "visit"),
+ f"Invalid action type: {type(act)}",
+ )
+ return actions
+
+ # ------------------------------------------------------------------
+ # Launch utility tests
+ # ------------------------------------------------------------------
+ def test_generate_controllers_spawner_from_list(self):
+ controllers = ["test_controller_1", "test_controller_2"]
+ result = generate_controllers_spawner_launch_description(controllers)
+ actions = self._assert_launch_result(result, min_len=1)
+ self.assertTrue(actions is not None and len(actions) > 0)
+
+ def test_generate_controllers_spawner_from_dict(self):
+ """Ensure dict-based API works with string param files (old-style)."""
+ controllers = {
+ "ctrl_A": ["/tmp/dummy.yaml"],
+ "ctrl_B": ["/tmp/dummy.yaml"],
+ }
+ result = generate_controllers_spawner_launch_description_from_dict(controllers)
+ actions = self._extract_actions(result)
+ self.assertIsInstance(result, LaunchDescription)
+ self.assertEqual(len(actions), 3)
+ self.assertIsInstance(actions[-1], Node)
+
+ def test_generate_load_controller_launch_description(self):
+ """Test load controller description with valid single string params."""
+ controllers = ["test_controller_load"]
+ result = generate_load_controller_launch_description(controllers)
+ actions = self._extract_actions(result)
+ self.assertIsInstance(result, LaunchDescription)
+ self.assertEqual(len(actions), 3)
+ self.assertIsInstance(actions[-1], Node)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/controller_manager/test/test_launch_utils/test_ros2_control_node_combined.yaml b/controller_manager/test/test_launch_utils/test_ros2_control_node_combined.yaml
new file mode 100644
index 0000000000..56f3eefdae
--- /dev/null
+++ b/controller_manager/test/test_launch_utils/test_ros2_control_node_combined.yaml
@@ -0,0 +1,14 @@
+controller_manager:
+ ros__parameters:
+ update_rate: 100 # Hz
+
+ joint_state_broadcaster:
+ type: "joint_state_broadcaster/JointStateBroadcaster"
+
+ controller1:
+ type: "controller_manager/test_controller"
+ joint_names: ["joint1"]
+
+ controller2:
+ type: "controller_manager/test_controller"
+ joint_names: ["joint2"]
diff --git a/controller_manager/test/test_ros2_control_node_launch.py b/controller_manager/test/test_ros2_control_node_launch.py
index 9c9f8892ed..a8defcf80e 100644
--- a/controller_manager/test/test_ros2_control_node_launch.py
+++ b/controller_manager/test/test_ros2_control_node_launch.py
@@ -38,13 +38,17 @@
import launch_testing.markers
import launch_ros.actions
+
import rclpy
from controller_manager.test_utils import (
check_controllers_running,
check_node_running,
)
-from controller_manager.launch_utils import generate_controllers_spawner_launch_description
-
+from controller_manager.launch_utils import (
+ generate_controllers_spawner_launch_description,
+ generate_controllers_spawner_launch_description_from_dict,
+ generate_load_controller_launch_description,
+)
# Executes the given launch file and checks if all nodes can be started
@pytest.mark.launch_test
@@ -102,6 +106,33 @@ def setUp(self):
def tearDown(self):
self.node.destroy_node()
+ # ------------------------------------------------------------------
+ # Helper methods
+ # ------------------------------------------------------------------
+ def _extract_actions(self, result):
+ """Return a list of launch actions, regardless of type."""
+ if isinstance(result, list):
+ return result
+ elif hasattr(result, "entities"): # LaunchDescription in newer ROS2
+ return list(result.entities)
+ else:
+ return [result]
+
+ def _assert_launch_result(self, result, min_len=0):
+ """Verify that result is list- or LaunchDescription-like."""
+ self.assertTrue(
+ isinstance(result, (list, LaunchDescription)),
+ f"Unexpected result type: {type(result)}",
+ )
+ actions = self._extract_actions(result)
+ self.assertGreaterEqual(len(actions), min_len)
+ for act in actions:
+ self.assertTrue(
+ hasattr(act, "execute") or hasattr(act, "visit"),
+ f"Invalid action type: {type(act)}",
+ )
+ return actions
+
def test_node_start(self):
check_node_running(self.node, "controller_manager")
diff --git a/hardware_interface/CMakeLists.txt b/hardware_interface/CMakeLists.txt
index e10f7ec16f..a696e605d6 100644
--- a/hardware_interface/CMakeLists.txt
+++ b/hardware_interface/CMakeLists.txt
@@ -14,7 +14,6 @@ set(THIS_PACKAGE_INCLUDE_DEPENDS
rcutils
realtime_tools
TinyXML2
- tinyxml2_vendor
joint_limits
urdf
pal_statistics
@@ -50,7 +49,6 @@ target_link_libraries(hardware_interface PUBLIC
rcpputils::rcpputils
${joint_limits_TARGETS}
${TinyXML2_LIBRARIES}
- ${tinyxml2_vendor_LIBRARIES}
${pal_statistics_LIBRARIES}
${control_msgs_TARGETS}
${lifecycle_msgs_TARGETS}
diff --git a/hardware_interface/package.xml b/hardware_interface/package.xml
index 6d643c20d1..153e737f7f 100644
--- a/hardware_interface/package.xml
+++ b/hardware_interface/package.xml
@@ -23,7 +23,7 @@
rcpputils
realtime_tools
sdformat_urdf
- tinyxml2_vendor
+ tinyxml2
urdf
fmt
diff --git a/hardware_interface/src/resource_manager.cpp b/hardware_interface/src/resource_manager.cpp
index 0d300eed05..d2128dceb3 100644
--- a/hardware_interface/src/resource_manager.cpp
+++ b/hardware_interface/src/resource_manager.cpp
@@ -1003,8 +1003,10 @@ class ResourceStorage
{
if (interface)
{
- for (auto & [hw_name, limiters] : joint_limiters_interface_)
+ for (auto & entry : joint_limiters_interface_)
{
+ auto & limiters = entry.second;
+
// If the prefix is a joint name, then bind the limiter to the command interface
if (limiters.find(interface->get_prefix_name()) != limiters.end())
{