Skip to content

Commit 5ce76f9

Browse files
Acuadros95pablogs9
andauthored
Add multithread example (#47)
* Add multithread example * Update rclc/CMakeLists.txt * Apply topic name suggestion from code review Co-authored-by: Pablo Garrido <pablogs9@gmail.com> * Apply comments and allocate message memory with micro_ros_utilities Co-authored-by: Pablo Garrido <pablogs9@gmail.com>
1 parent 6cba262 commit 5ce76f9

File tree

3 files changed

+182
-0
lines changed

3 files changed

+182
-0
lines changed

rclc/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ export_executable(static_type_handling)
5656
export_executable(epoch_synchronization)
5757
export_executable(parameter_server)
5858
# export_executable(configuration_example custom_transports)
59+
export_executable(multithread_publisher_subscriber)
5960

6061
if(EXISTS ${CMAKE_BINARY_DIR}/temp_install/)
6162
install(
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
cmake_minimum_required(VERSION 3.5)
2+
3+
project(multithread_publisher_subscriber LANGUAGES C)
4+
5+
find_package(ament_cmake REQUIRED)
6+
find_package(rcl REQUIRED)
7+
find_package(rclc REQUIRED)
8+
find_package(std_msgs REQUIRED)
9+
find_package(rmw_microxrcedds REQUIRED)
10+
find_package(micro_ros_utilities REQUIRED)
11+
12+
add_executable(${PROJECT_NAME} main.c)
13+
14+
ament_target_dependencies(${PROJECT_NAME}
15+
rcl
16+
rclc
17+
std_msgs
18+
rmw_microxrcedds
19+
micro_ros_utilities
20+
)
21+
22+
install(TARGETS ${PROJECT_NAME}
23+
DESTINATION ${PROJECT_NAME}
24+
)
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
#include <rcl/rcl.h>
2+
#include <rcl/error_handling.h>
3+
#include <rclc/rclc.h>
4+
#include <rclc/executor.h>
5+
6+
#include <std_msgs/msg/header.h>
7+
#include <micro_ros_utilities/type_utilities.h>
8+
#include <micro_ros_utilities/string_utilities.h>
9+
10+
#include <stdio.h>
11+
#include <pthread.h>
12+
#include <unistd.h>
13+
#include <time.h>
14+
15+
#define STRING_BUFFER_LEN 100
16+
#define PUBLISHER_NUMBER 3
17+
18+
#define RCCHECK(fn) { rcl_ret_t temp_rc = fn; if((temp_rc != RCL_RET_OK)){printf("Failed status on line %d: %d. Aborting.\n",__LINE__,(int)temp_rc); return 1;}}
19+
#define RCSOFTCHECK(fn) { rcl_ret_t temp_rc = fn; if((temp_rc != RCL_RET_OK)){printf("Failed status on line %d: %d. Continuing.\n",__LINE__,(int)temp_rc);}}
20+
21+
struct arg_struct {
22+
int index;
23+
rcl_publisher_t *publisher;
24+
};
25+
26+
rcl_node_t node;
27+
pthread_t pub_thr[PUBLISHER_NUMBER];
28+
rcl_publisher_t publisher[PUBLISHER_NUMBER];
29+
rcl_subscription_t subscriber;
30+
std_msgs__msg__Header recv_msg;
31+
static micro_ros_utilities_memory_conf_t conf = {0};
32+
33+
volatile bool exit_flag = false;
34+
35+
// Publish thread
36+
void* publish(
37+
void* args)
38+
{
39+
struct arg_struct *arguments = args;
40+
rcl_publisher_t *publisher = arguments->publisher;
41+
uint8_t id = arguments->index;
42+
43+
// Create and allocate the publisher message
44+
std_msgs__msg__Header send_msg;
45+
46+
bool success = micro_ros_utilities_create_message_memory(
47+
ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Header),
48+
&send_msg,
49+
conf);
50+
51+
if (!success)
52+
{
53+
printf("Error allocating message memory for publisher %d", id);
54+
return NULL;
55+
}
56+
57+
char message[STRING_BUFFER_LEN];
58+
sprintf(message, "Thread %d", id);
59+
send_msg.frame_id = micro_ros_string_utilities_set(send_msg.frame_id, message);
60+
61+
uint32_t period_us = 1e6 + ((rand()) % 10) * 1e5;
62+
printf("Thread %d start, publish period: %.1f seconds\n", id, period_us/1000000.0);
63+
64+
while (!exit_flag)
65+
{
66+
// Fill the message timestamp
67+
struct timespec ts;
68+
clock_gettime(CLOCK_REALTIME, &ts);
69+
send_msg.stamp.sec = ts.tv_sec;
70+
send_msg.stamp.nanosec = ts.tv_nsec;
71+
72+
RCSOFTCHECK(rcl_publish(publisher, &send_msg, NULL));
73+
printf("Thread %d sent: %d-%d\n", id, send_msg.stamp.sec, send_msg.stamp.nanosec);
74+
usleep(period_us);
75+
}
76+
}
77+
78+
void subscription_callback(const void * msgin)
79+
{
80+
const std_msgs__msg__Header * msg = (const std_msgs__msg__Header *)msgin;
81+
printf("Received %d-%d from %s\n", msg->stamp.sec, msg->stamp.nanosec, msg->frame_id.data);
82+
}
83+
84+
int main(int argc, const char * const * argv)
85+
{
86+
rcl_allocator_t allocator = rcl_get_default_allocator();
87+
rclc_support_t support;
88+
89+
// Create init_options
90+
RCCHECK(rclc_support_init(&support, argc, argv, &allocator));
91+
92+
// Create node
93+
RCCHECK(rclc_node_init_default(&node, "multithread_node", "", &support));
94+
95+
// Create publishers
96+
for (size_t i = 0; i < PUBLISHER_NUMBER; i++)
97+
{
98+
RCCHECK(rclc_publisher_init_default(
99+
&publisher[i],
100+
&node,
101+
ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Header),
102+
"multithread_topic"));
103+
}
104+
105+
// Create subscriber
106+
RCCHECK(rclc_subscription_init_default(
107+
&subscriber,
108+
&node,
109+
ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Header),
110+
"multithread_topic"));
111+
112+
// Configure and allocate the subscriber message
113+
conf.max_string_capacity = STRING_BUFFER_LEN;
114+
115+
bool success = micro_ros_utilities_create_message_memory(
116+
ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Header),
117+
&recv_msg,
118+
conf);
119+
120+
if (!success)
121+
{
122+
printf("Error allocating message memory for subscriber");
123+
return 1;
124+
}
125+
126+
// Create executor
127+
rclc_executor_t executor = rclc_executor_get_zero_initialized_executor();
128+
RCCHECK(rclc_executor_init(&executor, &support.context, 1, &allocator));
129+
RCCHECK(rclc_executor_add_subscription(&executor, &subscriber, &recv_msg, &subscription_callback, ON_NEW_DATA));
130+
131+
// Start publish threads
132+
for (size_t i = 0; i < PUBLISHER_NUMBER; i++)
133+
{
134+
struct arg_struct *args = malloc(sizeof(struct arg_struct));
135+
args->publisher = &publisher[i];
136+
args->index = i;
137+
138+
pthread_create(&pub_thr[i], NULL, publish, args);
139+
}
140+
141+
// Set executor timeout to 100 ms to reduce thread locking time
142+
const uint64_t timeout_ns = 100000000;
143+
RCCHECK(rclc_executor_set_timeout(&executor,timeout_ns));
144+
rclc_executor_spin(&executor);
145+
146+
exit_flag = true;
147+
148+
RCCHECK(rcl_subscription_fini(&subscriber, &node));
149+
150+
for (size_t i = 0; i < PUBLISHER_NUMBER; i++)
151+
{
152+
pthread_join(pub_thr[i], NULL);
153+
RCCHECK(rcl_publisher_fini(&publisher[i], &node));
154+
}
155+
156+
RCCHECK(rcl_node_fini(&node));
157+
}

0 commit comments

Comments
 (0)