#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#include "atomics.h"
#include "lfq.h"

#define MAX_FREE 150

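/* Return true if any hazard-pointer slot still references this node,
 * i.e. some consumer may still dereference it and it must not be freed.
 */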
static bool in_hp(struct lfq_ctx *ctx, struct lfq_node *node)
{
    for (int i = 0; i < ctx->MAX_HP_SIZE; i++) {
        if (atomic_load(&ctx->HP[i]) == node)
            return true;
    }
    return false;
}

/* add to the tail of the free list */
static void insert_pool(struct lfq_ctx *ctx, struct lfq_node *node)
{
    atomic_store(&node->free_next, NULL);
    struct lfq_node *old_tail = XCHG(&ctx->fpt, node); /* seq_cst */
    atomic_store(&old_tail->free_next, node);
}

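/* Walk the free list from its head and reclaim nodes that are no longer
 * referenced by any hazard pointer. At most MAX_FREE nodes are freed per
 * call unless freeall is set; is_freeing acts as a simple lock so that only
 * one thread frees at a time.
 */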
static void free_pool(struct lfq_ctx *ctx, bool freeall)
{
    bool old = false;
    if (!CAS(&ctx->is_freeing, &old, true))
        return;

    for (int i = 0; i < MAX_FREE || freeall; i++) {
        struct lfq_node *p = ctx->fph;
        if (!atomic_load(&p->can_free) || !atomic_load(&p->free_next) ||
            in_hp(ctx, p))
            break;
        ctx->fph = p->free_next;
        free(p);
    }
    atomic_store(&ctx->is_freeing, false);
    smp_mb();
}

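/* Free a node immediately if no hazard pointer references it and we can take
 * the is_freeing lock; otherwise defer it to the free list. Always finish by
 * draining part of the free list.
 */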
static void safe_free(struct lfq_ctx *ctx, struct lfq_node *node)
{
    if (atomic_load(&node->can_free) && !in_hp(ctx, node)) {
        /* free is not thread-safe */
        bool old = false;
        if (CAS(&ctx->is_freeing, &old, true)) {
            /* poison the pointer to detect use-after-free */
            node->next = (void *) -1;
            free(node); /* we got the lock; actually free */
            atomic_store(&ctx->is_freeing, false);
            smp_mb();
        } else /* we did not get the lock; only add to the free list */
            insert_pool(ctx, node);
    } else
        insert_pool(ctx, node);
    free_pool(ctx, false);
}

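/* Claim a free hazard-pointer slot (thread id); return -1 if every slot is
 * already taken.
 */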
static int alloc_tid(struct lfq_ctx *ctx)
{
    for (int i = 0; i < ctx->MAX_HP_SIZE; i++) {
        if (ctx->tid_map[i] == 0) {
            int old = 0;
            if (CAS(&ctx->tid_map[i], &old, 1))
                return i;
        }
    }

    return -1;
}

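/* Release a hazard-pointer slot obtained from alloc_tid(). */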
static void free_tid(struct lfq_ctx *ctx, int tid)
{
    ctx->tid_map[tid] = 0;
}

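/* Initialize the queue with one dummy node and one empty free-list node,
 * plus one hazard-pointer slot per consumer thread.
 */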
int lfq_init(struct lfq_ctx *ctx, int max_consume_thread)
{
    struct lfq_node *tmp = calloc(1, sizeof(struct lfq_node));
    if (!tmp)
        return -errno;

    struct lfq_node *node = calloc(1, sizeof(struct lfq_node));
    if (!node) {
        free(tmp); /* do not leak the dummy node on failure */
        return -errno;
    }

    tmp->can_free = node->can_free = true;
    memset(ctx, 0, sizeof(struct lfq_ctx));
    ctx->MAX_HP_SIZE = max_consume_thread;
    ctx->HP = calloc(max_consume_thread, sizeof(struct lfq_node));
    ctx->tid_map = calloc(max_consume_thread, sizeof(struct lfq_node));
    ctx->head = ctx->tail = tmp;
    ctx->fph = ctx->fpt = node;

    return 0;
}

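/* Count the nodes currently parked on the free list. */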
long lfg_count_freelist(const struct lfq_ctx *ctx)
{
    long count = 0;
    for (struct lfq_node *p = (struct lfq_node *) ctx->fph; p; p = p->free_next)
        count++;
    return count;
}

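/* Tear down the queue: drop any remaining nodes, drain the free list, and
 * release the hazard-pointer bookkeeping. Returns -1 if the free list could
 * not be fully drained.
 */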
int lfq_release(struct lfq_ctx *ctx)
{
    if (ctx->tail && ctx->head) { /* the queue still contains nodes */
        while ((struct lfq_node *) ctx->head) { /* walk the remaining nodes */
            struct lfq_node *tmp = (struct lfq_node *) ctx->head->next;
            safe_free(ctx, (struct lfq_node *) ctx->head);
            ctx->head = tmp;
        }
        ctx->tail = 0;
    }
    if (ctx->fph && ctx->fpt) {
        free_pool(ctx, true);
        if (ctx->fph != ctx->fpt)
            return -1;
        free(ctx->fpt); /* free the empty node */
        ctx->fph = ctx->fpt = 0;
    }
    if (ctx->fph || ctx->fpt)
        return -1;

    free(ctx->HP);
    free(ctx->tid_map);
    memset(ctx, 0, sizeof(struct lfq_ctx));

    return 0;
}

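/* Append data to the queue. Returns 0 on success or -errno if the node
 * allocation fails.
 */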
int lfq_enqueue(struct lfq_ctx *ctx, void *data)
{
    struct lfq_node *insert_node = calloc(1, sizeof(struct lfq_node));
    if (!insert_node)
        return -errno;

    insert_node->data = data;
    struct lfq_node *old_tail = XCHG(&ctx->tail, insert_node);
    /* We have claimed our spot in the insertion order by swapping the tail.
     * We are the only inserting thread with a pointer to the old tail.
     *
     * Now we can make the new node part of the list by overwriting the NULL
     * pointer in the old tail. This is safe whether or not other threads have
     * already updated ->next in our insert_node.
     */
    assert(!old_tail->next && "the old tail's next pointer must be NULL");
    atomic_store(&old_tail->next, insert_node);

    return 0;
}

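/* Dequeue using a caller-supplied hazard-pointer slot. Returns the data of
 * the removed element, or NULL if the queue is empty.
 */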
void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid)
{
    struct lfq_node *old_head, *new_head;

    /* HP[tid] is necessary for safe deallocation. */
    do {
    retry:
        /* A plain continue would jump to the loop condition and attempt the
         * CAS with an uninitialized new_head, hence the goto below.
         */
        old_head = atomic_load(&ctx->head);

        atomic_store(&ctx->HP[tid], old_head);
        mb();

        /* another thread may have freed it before seeing our HP[tid] store */
        if (old_head != atomic_load(&ctx->head))
            goto retry;

        new_head = atomic_load(&old_head->next);
        if (new_head == 0) {
            atomic_store(&ctx->HP[tid], 0);
            return NULL; /* never remove the last node */
        }
    } while (!CAS(&ctx->head, &old_head, new_head));

    /* We have atomically advanced head, and we are the thread that won the
     * race to claim a node. We return the data from the *new* head. The list
     * starts off with a dummy node, so the current head is always a node
     * whose data has already been read.
     */
    atomic_store(&ctx->HP[tid], 0);
    void *ret = new_head->data;
    atomic_store(&new_head->can_free, true);

    /* We must avoid freeing old_head until other readers are definitely not
     * going to load its ->next in the CAS loop above.
     */
    safe_free(ctx, old_head);

    return ret;
}

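/* Convenience wrapper: grab a hazard-pointer slot, dequeue, and release the
 * slot again. Returns (void *) -1 if no slot is available.
 */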
void *lfq_dequeue(struct lfq_ctx *ctx)
{
    int tid = alloc_tid(ctx);
    /* Too many threads are racing; no free hazard-pointer slot */
    if (tid == -1)
        return (void *) -1;

    void *ret = lfq_dequeue_tid(ctx, tid);
    free_tid(ctx, tid);
    return ret;
}
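
/* A minimal usage sketch, not part of the library: it assumes lfq.h declares
 * the functions defined above; item_ptr is just a placeholder for caller data
 * and error handling is omitted.
 *
 *     struct lfq_ctx ctx;
 *     lfq_init(&ctx, 4);              // up to 4 concurrent consumer threads
 *     lfq_enqueue(&ctx, item_ptr);    // producer side
 *     void *item = lfq_dequeue(&ctx); // consumer side; NULL when queue empty
 *     lfq_release(&ctx);              // drain free list, free bookkeeping
 */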