66#define _GNU_SOURCE
77#endif
88
9- #include <malloc.h>
109#include <pthread.h>
1110#include <stdbool.h>
1211#include <stdio.h>
1514
/* Memory-layout constants and the alignment attributes built from them. */
#define PAGE_SIZE 4096     /* FIXME: avoid hard-coded */
#define CACHE_LINE_SIZE 64 /* FIXME: make it configurable */

/*
 * Alignment helpers, appended to a declaration (GCC/Clang attribute syntax).
 * NOTE(review): identifiers starting with a double underscore are reserved
 * for the implementation; the names are kept because the rest of the file
 * refers to them.
 */
#define __CACHE_ALIGNED __attribute__((aligned(CACHE_LINE_SIZE)))
#define __DOUBLE___CACHE_ALIGNED __attribute__((aligned(2 * CACHE_LINE_SIZE)))
2019
21- static inline void * align_malloc (size_t align , size_t size )
20+ static inline void * align_alloc (size_t align , size_t size )
2221{
2322 void * ptr ;
2423 int ret = posix_memalign (& ptr , align , size );
@@ -30,32 +29,31 @@ static inline void *align_malloc(size_t align, size_t size)
3029}
3130
#define N (1 << 12) /* node size: number of cells in one node */
/* Mask applied to a queue index to obtain the cell index inside a node. */
#define N_BITS (N - 1)
3433
3534typedef struct __node {
36- struct __node * volatile next DOUBLE_CACHE_ALIGNED ;
37- long id DOUBLE_CACHE_ALIGNED ;
38- void * volatile cells [N ] DOUBLE_CACHE_ALIGNED ;
35+ struct __node * volatile next __DOUBLE___CACHE_ALIGNED ;
36+ long id __DOUBLE___CACHE_ALIGNED ;
37+ void * cells [N ] __DOUBLE___CACHE_ALIGNED ;
3938} node_t ;
4039
/* Capacity of each per-queue handle table. */
#define N_HANDLES 128 /* support 127 threads */
4241
4342typedef struct {
4443 node_t * spare ;
4544
46- node_t * volatile put_node CACHE_ALIGNED ;
47- node_t * volatile pop_node CACHE_ALIGNED ;
45+ node_t * volatile push __CACHE_ALIGNED ;
46+ node_t * volatile pop __CACHE_ALIGNED ;
4847} handle_t ;
4948
5049typedef struct {
5150 node_t * init_node ;
52- volatile long init_id DOUBLE_CACHE_ALIGNED ;
51+ volatile long init_id __DOUBLE___CACHE_ALIGNED ;
5352
54- volatile long put_index DOUBLE_CACHE_ALIGNED ;
55- volatile long pop_index DOUBLE_CACHE_ALIGNED ;
53+ volatile long put_index __DOUBLE___CACHE_ALIGNED ;
54+ volatile long pop_index __DOUBLE___CACHE_ALIGNED ;
5655
57- handle_t * volatile enq_handles [HANDLES ];
58- handle_t * volatile deq_handles [HANDLES ];
56+ handle_t * enqueue_handles [N_HANDLES ], * dequeue_handles [N_HANDLES ];
5957
6058 int threshold ;
6159
@@ -64,24 +62,24 @@ typedef struct {
6462
6563static inline node_t * mpmc_new_node ()
6664{
67- node_t * n = align_malloc (PAGE_SIZE , sizeof (node_t ));
65+ node_t * n = align_alloc (PAGE_SIZE , sizeof (node_t ));
6866 memset (n , 0 , sizeof (node_t ));
6967 return n ;
7068}
7169
/*
 * Role flags accepted by mpmc_queue_register(). Each value is a distinct
 * bit, and the register function tests them with '&'.
 */
enum queue_ops {
    DEQUEUE = 1 << 0,
    ENQUEUE = 1 << 1,
};
7674
7775/* register the enqueuers first, dequeuers second. */
7876void mpmc_queue_register (mpmc_t * q , handle_t * th , int flag )
7977{
8078 th -> spare = mpmc_new_node ();
81- th -> put_node = th -> pop_node = q -> init_node ;
79+ th -> push = th -> pop = q -> init_node ;
8280
83- if (flag & ENQ ) {
84- handle_t * * tail = q -> enq_handles ;
81+ if (flag & ENQUEUE ) {
82+ handle_t * * tail = q -> enqueue_handles ;
8583 for (int i = 0 ;; ++ i ) {
8684 handle_t * init = NULL ;
8785 if (!tail [i ] &&
@@ -93,8 +91,8 @@ void mpmc_queue_register(mpmc_t *q, handle_t *th, int flag)
9391 pthread_barrier_wait (& q -> enq_barrier );
9492 }
9593
96- if (flag & DEQ ) {
97- handle_t * * tail = q -> deq_handles ;
94+ if (flag & DEQUEUE ) {
95+ handle_t * * tail = q -> dequeue_handles ;
9896 for (int i = 0 ;; ++ i ) {
9997 handle_t * init = NULL ;
10098 if (!tail [i ] &&
@@ -157,10 +155,10 @@ static void *mpmc_find_cell(node_t *volatile *ptr, long i, handle_t *th)
157155 * ptr = curr ; /* update our node to the present node */
158156
159157 /* Orders processor execution, so other thread can see the '*ptr = curr' */
160- __asm( "sfence" ::: "cc" , "memory" ); /* FIXME: x86-only */
158+ __atomic_thread_fence ( __ATOMIC_SEQ_CST );
161159
162160 /* now we have the needed cell: its node is curr and its index is i % N */
163- return & curr -> cells [i & NBITS ];
161+ return & curr -> cells [i & N_BITS ];
164162}
165163
166164#include <linux/futex.h>
@@ -183,8 +181,7 @@ void mpmc_enqueue(mpmc_t *q, handle_t *th, void *v)
183181{
184182 /* return the needed index */
185183 void * volatile * c = mpmc_find_cell (
186- & th -> put_node , __atomic_fetch_add (& q -> put_index , 1 , __ATOMIC_SEQ_CST ),
187- th );
184+ & th -> push , __atomic_fetch_add (& q -> put_index , 1 , __ATOMIC_SEQ_CST ), th );
188185 /* __atomic_fetch_add(ptr, val) is an atomic fetch-and-add that also
189186 * ensures sequential consistency
190187 */
@@ -214,15 +211,19 @@ void *mpmc_dequeue(mpmc_t *q, handle_t *th)
214211 long index = __atomic_fetch_add (& q -> pop_index , 1 , __ATOMIC_SEQ_CST );
215212
216213 /* locate the needed cell */
217- void * volatile * c = mpmc_find_cell (& th -> pop_node , index , th );
214+ void * volatile * c = mpmc_find_cell (& th -> pop , index , th );
218215
219216 /* Because this is a blocking queue, we simply spin for longer first. */
220217 int times = (1 << 20 );
221218 do {
222219 cv = * c ;
223220 if (cv )
224221 goto over ;
225- __asm__("pause" ); /* FIXME: x86-only */
222+ #if defined(__i386__ ) || defined(__x86_64__ )
223+ __asm__ __volatile__("pause" );
224+ #elif defined(__aarch64__ ) || defined(__arm__ )
225+ __asm__ __volatile__("isb\n" );
226+ #endif
226227 } while (times -- > 0 );
227228
228229 /* XCHG, if return NULL so this cell is NULL, we just wait and observe the
@@ -236,20 +237,20 @@ void *mpmc_dequeue(mpmc_t *q, handle_t *th)
236237 mpmc_futex_wait (& futex_addr , 1 );
237238 } while (futex_addr == 1 );
238239
239- /* the counterpart put thread has change futex_addr's value to 0. and the
240- * data has into cell(c).
240+ /* The counterpart put thread has changed futex_addr's value to 0, and
241+ * the data has been stored into cell(c).
241242 */
242243 cv = * c ;
243244 }
244245
245246over :
246- /* if the index is the node's last cell: (NBITS == 4095), it Try to reclaim
247+ /* if the index is the node's last cell: (N_BITS == 4095), it Try to reclaim
247248 * the memory. so we just take the smallest ID node that is not
248249 * reclaimed(init_node), and At the same time, by traversing the local data
249250 * of other threads, we get a larger ID node(min_node). So it is safe to
250251 * recycle the memory [init_node, min_node).
251252 */
252- if ((index & NBITS ) == NBITS ) {
253+ if ((index & N_BITS ) == N_BITS ) {
253254 /* __atomic_load_n(ptr, __ATOMIC_ACQUIRE) is a load with a following
254255 * acquire fence to ensure no following load and stores can start before
255256 * the current load completes.
@@ -260,33 +261,32 @@ void *mpmc_dequeue(mpmc_t *q, handle_t *th)
260261 * __ATOMIC_RELAXED) is an atomic compare-and-swap that ensures acquire
261262 * semantic when succeed or relaxed semantic when failed.
262263 */
263- if ((th -> pop_node -> id - init_index ) >= q -> threshold &&
264- init_index >= 0 &&
264+ if ((th -> pop -> id - init_index ) >= q -> threshold && init_index >= 0 &&
265265 __atomic_compare_exchange_n (& q -> init_id , & init_index , -1 , 0 ,
266266 __ATOMIC_ACQUIRE , __ATOMIC_RELAXED )) {
267267 node_t * init_node = q -> init_node ;
268- th = q -> deq_handles [0 ];
269- node_t * min_node = th -> pop_node ;
268+ th = q -> dequeue_handles [0 ];
269+ node_t * min_node = th -> pop ;
270270
271271 int i ;
272- handle_t * next = q -> deq_handles [i = 1 ];
272+ handle_t * next = q -> dequeue_handles [i = 1 ];
273273 while (next ) {
274- node_t * next_min = next -> pop_node ;
274+ node_t * next_min = next -> pop ;
275275 if (next_min -> id < min_node -> id )
276276 min_node = next_min ;
277277 if (min_node -> id <= init_index )
278278 break ;
279- next = q -> deq_handles [++ i ];
279+ next = q -> dequeue_handles [++ i ];
280280 }
281281
282- next = q -> enq_handles [i = 0 ];
282+ next = q -> enqueue_handles [i = 0 ];
283283 while (next ) {
284- node_t * next_min = next -> put_node ;
284+ node_t * next_min = next -> push ;
285285 if (next_min -> id < min_node -> id )
286286 min_node = next_min ;
287287 if (min_node -> id <= init_index )
288288 break ;
289- next = q -> enq_handles [++ i ];
289+ next = q -> enqueue_handles [++ i ];
290290 }
291291
292292 long new_id = min_node -> id ;
@@ -322,33 +322,32 @@ static pthread_barrier_t prod_barrier, cons_barrier;
322322static void * producer (void * index )
323323{
324324 mpmc_t * q = & mpmc ;
325- handle_t * th = malloc (sizeof (handle_t ));
326- memset (th , 0 , sizeof (handle_t ));
327- mpmc_queue_register (q , th , ENQ );
325+ handle_t * th = calloc (1 , sizeof (handle_t ));
326+ mpmc_queue_register (q , th , ENQUEUE );
328327
329328 for (;;) {
330329 pthread_barrier_wait (& prod_barrier );
331330 for (int i = 0 ; i < COUNTS_PER_THREAD ; ++ i )
332- mpmc_enqueue (q , th , 1 + i + ((int ) index ) * COUNTS_PER_THREAD );
331+ mpmc_enqueue (
332+ q , th , (void * ) 1 + i + ((intptr_t ) index ) * COUNTS_PER_THREAD );
333333 pthread_barrier_wait (& prod_barrier );
334334 }
335335 return NULL ;
336336}
337337
338- #define THREAD_NUM 4
338+ #define N_THREADS 4
339339static bool * array ;
340340static void * consumer (void * index )
341341{
342342 mpmc_t * q = & mpmc ;
343- handle_t * th = malloc (sizeof (handle_t ));
344- memset (th , 0 , sizeof (handle_t ));
345- mpmc_queue_register (q , th , DEQ );
343+ handle_t * th = calloc (1 , sizeof (handle_t ));
344+ mpmc_queue_register (q , th , DEQUEUE );
346345
347346 for (;;) {
348347 pthread_barrier_wait (& cons_barrier );
349348 for (long i = 0 ; i < COUNTS_PER_THREAD ; ++ i ) {
350349 int value ;
351- if (!(value = mpmc_dequeue (q , th )))
350+ if (!(value = ( intptr_t ) mpmc_dequeue (q , th )))
352351 return NULL ;
353352 array [value ] = true;
354353 }
@@ -361,24 +360,25 @@ static void *consumer(void *index)
361360
362361int main (int argc , char * argv [])
363362{
364- pthread_barrier_init (& prod_barrier , NULL , THREAD_NUM + 1 );
365- pthread_barrier_init (& cons_barrier , NULL , THREAD_NUM + 1 );
363+ pthread_barrier_init (& prod_barrier , NULL , N_THREADS + 1 );
364+ pthread_barrier_init (& cons_barrier , NULL , N_THREADS + 1 );
366365 if (argc >= 3 ) {
367366 COUNTS_PER_THREAD = atol (argv [1 ]);
368367 threshold = atoi (argv [2 ]);
369368 }
370369
371- printf ("Amount: %ld\n" , THREAD_NUM * COUNTS_PER_THREAD );
370+ printf ("Amount: %ld\n" , N_THREADS * COUNTS_PER_THREAD );
372371 fflush (stdout );
373- array = malloc ((1 + THREAD_NUM * COUNTS_PER_THREAD ) * sizeof (bool ));
374- memset (array , 0 , (1 + THREAD_NUM * COUNTS_PER_THREAD ) * sizeof (bool ));
375- mpmc_init_queue (& mpmc , THREAD_NUM , THREAD_NUM , threshold );
372+ array = calloc (1 , (1 + N_THREADS * COUNTS_PER_THREAD ) * sizeof (bool ));
373+ mpmc_init_queue (& mpmc , N_THREADS , N_THREADS , threshold );
376374
377- pthread_t pids [THREAD_NUM ];
375+ pthread_t pids [N_THREADS ];
378376
379- for (int i = 0 ; i < THREAD_NUM ; ++ i ) {
380- if (-1 == pthread_create (& pids [i ], NULL , producer , i ) ||
381- -1 == pthread_create (& pids [i ], NULL , consumer , i )) {
377+ for (int i = 0 ; i < N_THREADS ; ++ i ) {
378+ if (-1 == pthread_create (& pids [i ], NULL , producer ,
379+ (void * ) (intptr_t ) i ) ||
380+ -1 == pthread_create (& pids [i ], NULL , consumer ,
381+ (void * ) (intptr_t ) i )) {
382382 printf ("error create thread\n" );
383383 exit (1 );
384384 }
@@ -388,7 +388,7 @@ int main(int argc, char *argv[])
388388 printf ("\n#%d\n" , i );
389389
390390 pthread_barrier_wait (& cons_barrier );
391- sleep ( 1 );
391+ usleep ( 1e5 );
392392
393393 struct timeval start , prod_end ;
394394 gettimeofday (& start , NULL );
@@ -398,7 +398,7 @@ int main(int argc, char *argv[])
398398 gettimeofday (& prod_end , NULL );
399399
400400 bool verify = true;
401- for (int j = 1 ; j <= THREAD_NUM * COUNTS_PER_THREAD ; ++ j ) {
401+ for (int j = 1 ; j <= N_THREADS * COUNTS_PER_THREAD ; ++ j ) {
402402 if (!array [j ]) {
403403 printf ("Error: ints[%d]\n" , j );
404404 verify = false;
@@ -407,13 +407,13 @@ int main(int argc, char *argv[])
407407 }
408408 if (verify )
409409 printf ("ints[1-%ld] have been verified through\n" ,
410- THREAD_NUM * COUNTS_PER_THREAD );
410+ N_THREADS * COUNTS_PER_THREAD );
411411 float cost_time = (prod_end .tv_sec - start .tv_sec ) +
412412 (prod_end .tv_usec - start .tv_usec ) / 1000000.0 ;
413413 printf ("elapsed time: %f seconds\n" , cost_time );
414414 printf ("DONE #%d\n" , i );
415415 fflush (stdout );
416- memset (array , 0 , (1 + THREAD_NUM * COUNTS_PER_THREAD ) * sizeof (bool ));
416+ memset (array , 0 , (1 + N_THREADS * COUNTS_PER_THREAD ) * sizeof (bool ));
417417 }
418418 return 0 ;
419419}
0 commit comments