Skip to content

Commit 9051ba0

Browse files
committed
New ListenerSet design.
1 parent b4153a5 commit 9051ba0

File tree

6 files changed

+148
-47
lines changed

6 files changed

+148
-47
lines changed

Common/Cpp/CancellableScope.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,6 @@ void Cancellable::add_cancel_listener(CancelListener& listener){
3939
void Cancellable::remove_cancel_listener(CancelListener& listener){
4040
m_impl->m_listeners.remove(listener);
4141
}
42-
bool Cancellable::try_add_cancel_listener(CancelListener& listener){
43-
return m_impl->m_listeners.try_add(listener);
44-
}
4542

4643

4744
Cancellable::Cancellable()

Common/Cpp/CancellableScope.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ class Cancellable{
5252
void add_cancel_listener(CancelListener& listener);
5353
void remove_cancel_listener(CancelListener& listener);
5454

55-
bool try_add_cancel_listener(CancelListener& listener);
56-
5755

5856
public:
5957
virtual ~Cancellable();

Common/Cpp/ListenerSet.h

Lines changed: 144 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,20 @@
1010
#include <map>
1111
#include <atomic>
1212
#include "Common/Cpp/Concurrency/SpinLock.h"
13-
#include "Common/Cpp/LifetimeSanitizer.h"
1413

15-
//#include <iostream>
14+
#include <iostream>
1615
//using std::cout;
1716
//using std::endl;
1817

18+
1919
//#define PA_DEBUG_ListenerSet
2020

2121

22+
#ifdef PA_DEBUG_ListenerSet
23+
#include "Common/Cpp/LifetimeSanitizer.h"
24+
#endif
25+
26+
2227
namespace PokemonAutomation{
2328

2429

@@ -27,29 +32,55 @@ template <typename ListenerType>
2732
class ListenerSet{
2833
public:
2934
bool empty() const{
30-
return m_count.load(std::memory_order_relaxed) == 0;
35+
return m_count.load(std::memory_order_acquire) == 0;
3136
}
3237
size_t count_unique() const{
33-
return m_count.load(std::memory_order_relaxed);
38+
return m_count.load(std::memory_order_acquire);
3439
}
3540

41+
// Add a new listener. This will never fail unless it throws.
42+
// Deadlocking is not possible since there's only one local lock.
3643
void add(ListenerType& listener);
44+
45+
// Remove a listener. This will deadlock if a listener tries to remove
46+
// from inside a callback.
3747
void remove(ListenerType& listener);
3848

39-
bool try_add(ListenerType& listener);
49+
// Same as above, but will return false if it needs to wait.
50+
// This can never deadlock.
4051
bool try_remove(ListenerType& listener);
4152

4253
template <typename Function, class... Args>
4354
void run_method(Function function, Args&&... args);
4455

56+
4557
private:
4658
// Optimization. Keep an atomic version of the count. This will let us
4759
// skip the lock when there are no listeners.
4860
std::atomic<size_t> m_count;
4961

5062
mutable SpinLock m_lock;
51-
// mutable std::mutex m_lock;
52-
std::map<ListenerType*, size_t> m_listeners;
63+
64+
struct Node{
65+
SpinLock lock;
66+
ListenerType& listener;
67+
Node* next = nullptr;
68+
Node** prevs_next = nullptr;
69+
70+
#ifdef PA_DEBUG_ListenerSet
71+
LifetimeSanitizer sanitizer;
72+
#endif
73+
74+
Node(ListenerSet& parent, ListenerType& p_listener)
75+
: listener(p_listener)
76+
, prevs_next(&parent.m_list)
77+
#ifdef PA_DEBUG_ListenerSet
78+
, sanitizer("Node")
79+
#endif
80+
{}
81+
};
82+
Node* m_list = nullptr;
83+
std::map<ListenerType*, Node> m_listeners;
5384

5485
#ifdef PA_DEBUG_ListenerSet
5586
LifetimeSanitizer m_sanitizer;
@@ -72,42 +103,75 @@ void ListenerSet<ListenerType>::add(ListenerType& listener){
72103
auto scope = m_sanitizer.check_scope();
73104
#endif
74105
WriteSpinLock lg(m_lock);
75-
m_listeners[&listener]++;
106+
auto ret = m_listeners.emplace(
107+
std::piecewise_construct,
108+
std::forward_as_tuple(&listener),
109+
std::forward_as_tuple(*this, listener)
110+
);
111+
if (!ret.second){
112+
return;
113+
}
114+
Node& node = ret.first->second;
115+
#ifdef PA_DEBUG_ListenerSet
116+
node.sanitizer.check_usage();
117+
#endif
118+
if (m_list != nullptr){
119+
m_list->prevs_next = &node.next;
120+
}
121+
node.next = m_list;
122+
m_list = &node;
76123
m_count.store(m_listeners.size(), std::memory_order_relaxed);
77124
}
78125
template <typename ListenerType>
79126
void ListenerSet<ListenerType>::remove(ListenerType& listener){
80127
#ifdef PA_DEBUG_ListenerSet
81128
auto scope = m_sanitizer.check_scope();
82129
#endif
83-
WriteSpinLock lg(m_lock);
84-
auto iter = m_listeners.find(&listener);
85-
if (iter == m_listeners.end()){
86-
return;
87-
}
88-
if (--iter->second == 0){
130+
while (true){
131+
WriteSpinLock lg(m_lock);
132+
auto iter = m_listeners.find(&listener);
133+
if (iter == m_listeners.end()){
134+
return;
135+
}
136+
137+
Node& node = iter->second;
138+
139+
#ifdef PA_DEBUG_ListenerSet
140+
node.sanitizer.check_usage();
141+
#endif
142+
143+
if (!node.lock.try_acquire_write()){
144+
std::cout << "ListenerSet::remove(): Retry inner." << std::endl;
145+
continue;
146+
}
147+
148+
// std::cout << "node = " << &node.sanitizer << " : " << &node.prev->sanitizer << " : " << &node.next->sanitizer << std::endl;
149+
150+
*node.prevs_next = node.next;
151+
if (node.next){
152+
#ifdef PA_DEBUG_ListenerSet
153+
node.next->sanitizer.check_usage();
154+
#endif
155+
node.next->prevs_next = node.prevs_next;
156+
}
157+
158+
#ifdef PA_DEBUG_ListenerSet
159+
node.sanitizer.check_usage();
160+
#endif
161+
89162
m_listeners.erase(iter);
163+
m_count.store(m_listeners.size(), std::memory_order_relaxed);
164+
return;
90165
}
91-
m_count.store(m_listeners.size(), std::memory_order_relaxed);
92166
}
93167

94168

95169

96170
template <typename ListenerType>
97-
bool ListenerSet<ListenerType>::try_add(ListenerType& listener){
171+
bool ListenerSet<ListenerType>::try_remove(ListenerType& listener){
98172
#ifdef PA_DEBUG_ListenerSet
99173
auto scope = m_sanitizer.check_scope();
100174
#endif
101-
if (!m_lock.try_acquire_write()){
102-
return false;
103-
}
104-
m_listeners[&listener]++;
105-
m_count.store(m_listeners.size(), std::memory_order_relaxed);
106-
m_lock.unlock_write();
107-
return true;
108-
}
109-
template <typename ListenerType>
110-
bool ListenerSet<ListenerType>::try_remove(ListenerType& listener){
111175
if (!m_lock.try_acquire_write()){
112176
return false;
113177
}
@@ -116,9 +180,30 @@ bool ListenerSet<ListenerType>::try_remove(ListenerType& listener){
116180
m_lock.unlock_write();
117181
return true;
118182
}
119-
if (--iter->second == 0){
120-
m_listeners.erase(iter);
183+
184+
Node& node = iter->second;
185+
if (!node.lock.try_acquire_write()){
186+
std::cout << "ListenerSet::try_remove(): Fail inner." << std::endl;
187+
return false;
121188
}
189+
190+
#ifdef PA_DEBUG_ListenerSet
191+
node.sanitizer.check_usage();
192+
#endif
193+
194+
*node.prevs_next = node.next;
195+
if (node.next){
196+
#ifdef PA_DEBUG_ListenerSet
197+
node.next->sanitizer.check_usage();
198+
#endif
199+
node.next->prevs_next = node.prevs_next;
200+
}
201+
202+
#ifdef PA_DEBUG_ListenerSet
203+
node.sanitizer.check_usage();
204+
#endif
205+
206+
m_listeners.erase(iter);
122207
m_count.store(m_listeners.size(), std::memory_order_relaxed);
123208
m_lock.unlock_write();
124209
return true;
@@ -135,9 +220,37 @@ void ListenerSet<ListenerType>::run_method(Function function, Args&&... args){
135220
if (empty()){
136221
return;
137222
}
138-
ReadSpinLock lg(m_lock);
139-
for (auto& item : m_listeners){
140-
(item.first->*function)(std::forward<Args>(args)...);
223+
std::exception_ptr err;
224+
225+
m_lock.acquire_read();
226+
227+
Node* node = m_list;
228+
while (node){
229+
{
230+
ReadSpinLock lg(node->lock);
231+
232+
#ifdef PA_DEBUG_ListenerSet
233+
node->sanitizer.check_usage();
234+
#endif
235+
m_lock.unlock_read();
236+
try{
237+
(node->listener.*function)(std::forward<Args>(args)...);
238+
}catch (...){
239+
if (!err){
240+
err = std::current_exception();
241+
}
242+
}
243+
m_lock.acquire_read();
244+
}
245+
#ifdef PA_DEBUG_ListenerSet
246+
node->sanitizer.check_usage();
247+
#endif
248+
node = node->next;
249+
}
250+
251+
m_lock.unlock_read();
252+
if (err){
253+
std::rethrow_exception(err);
141254
}
142255
}
143256

SerialPrograms/Source/CommonFramework/VideoPipeline/VideoSession.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,6 @@ void VideoSession::remove_state_listener(StateListener& listener){
2727
m_state_listeners.remove(listener);
2828
}
2929

30-
bool VideoSession::try_add_state_listener(StateListener& listener){
31-
return m_state_listeners.try_add(listener);
32-
}
3330
bool VideoSession::try_remove_state_listener(StateListener& listener){
3431
return m_state_listeners.try_remove(listener);
3532
}

SerialPrograms/Source/CommonFramework/VideoPipeline/VideoSession.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ class VideoSession
5858
// Remove the state listener.
5959
void remove_state_listener(StateListener& listener);
6060

61-
bool try_add_state_listener(StateListener& listener);
6261
bool try_remove_state_listener(StateListener& listener);
6362

6463
// Implements VideoFeed::add_frame_listener().

SerialPrograms/Source/Controllers/SerialPABotBase/Connection/PABotBase.cpp

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -958,17 +958,14 @@ BotBaseMessage PABotBase::wait_for_request(uint64_t seqnum, Cancellable* cancell
958958

959959

960960
void PABotBase::cv_wait(Cancellable* cancellable, std::unique_lock<std::mutex>& lg){
961-
// This doesn't work yet. Disable it.
962-
if (cancellable == nullptr || true){
961+
if (cancellable == nullptr){
963962
m_cv.wait(lg);
964963
return;
965964
}
966965

967-
// Only wait if we're able to attach the cancel listener.
968-
if (cancellable->try_add_cancel_listener(*this)){
969-
m_cv.wait(lg);
970-
cancellable->remove_cancel_listener(*this);
971-
}
966+
cancellable->add_cancel_listener(*this);
967+
m_cv.wait(lg);
968+
cancellable->remove_cancel_listener(*this);
972969
}
973970

974971

0 commit comments

Comments
 (0)