@@ -109,8 +109,8 @@ void PABotBase::stop(std::string error_message){
109109 // Wake everyone up.
110110 {
111111 std::lock_guard<std::mutex> lg (m_sleep_lock);
112- m_cv.notify_all ();
113112 }
113+ m_cv.notify_all ();
114114 m_retransmit_thread.join ();
115115
116116 {
@@ -137,7 +137,9 @@ void PABotBase::stop(std::string error_message){
137137 m_state.store (State::STOPPED, std::memory_order_release);
138138}
139139void PABotBase::on_cancellable_cancel (){
140- std::unique_lock<std::mutex> lg (m_sleep_lock);
140+ {
141+ std::unique_lock<std::mutex> lg (m_sleep_lock);
142+ }
141143 m_cv.notify_all ();
142144}
143145
@@ -211,59 +213,67 @@ void PABotBase::next_command_interrupt(){
211213}
212214void PABotBase::clear_all_active_commands (uint64_t seqnum){
213215 auto scope_check = m_sanitizer.check_scope ();
216+ {
217+ // Remove all commands at or before the specified seqnum.
218+ std::lock_guard<std::mutex> lg0 (m_sleep_lock);
219+ WriteSpinLock lg1 (m_state_lock, " PABotBase::next_command_interrupt()" );
220+ m_logger.log (
221+ " Clearing all active commands... (Commands: " + std::to_string (m_pending_commands.size ()) + " )" ,
222+ COLOR_DARKGREEN
223+ );
214224
215- // Remove all commands at or before the specified seqnum.
216- std::lock_guard<std::mutex> lg0 (m_sleep_lock);
217- WriteSpinLock lg1 (m_state_lock, " PABotBase::next_command_interrupt()" );
218- m_logger.log (" Clearing all active commands... (Commands: " + std::to_string (m_pending_commands.size ()) + " )" , COLOR_DARKGREEN);
219-
220- m_cv.notify_all ();
221-
222- if (m_pending_commands.empty ()){
223- return ;
224- }
225225
226- // Remove all active commands up to the seqnum.
227- while (true ){
228- auto iter = m_pending_commands.begin ();
229- if (iter == m_pending_commands.end () || iter->first > seqnum){
230- break ;
226+ if (m_pending_commands.empty ()){
227+ return ;
231228 }
232- iter->second .sanitizer .check_usage ();
233-
234- // We cannot remove un-acked messages from our buffer. If an un-acked
235- // message is dropped and the receiver is still waiting for it, it will
236- // wait forever since we will never retransmit.
237-
238- if (iter->second .state == AckState::NOT_ACKED){
239- // Convert the command into a no-op request.
240- SerialPABotBase::DeviceRequest_program_id request;
241- BotBaseMessage message = request.message ();
242- seqnum_t seqnum_s = (seqnum_t )iter->first ;
243- memcpy (&message.body [0 ], &seqnum_s, sizeof (seqnum_t ));
244229
245- // cout << "removing = " << seqnum_s << ", " << (int)iter->second.state << endl;
246-
247- std::pair<std::map<uint64_t , PendingRequest>::iterator, bool > ret = m_pending_requests.emplace (
248- std::piecewise_construct,
249- std::forward_as_tuple (iter->first ),
250- std::forward_as_tuple ()
251- );
252- if (!ret.second ){
253- throw InternalProgramError (&m_logger, PA_CURRENT_FUNCTION, " Duplicate sequence number: " + std::to_string (seqnum));
230+ // Remove all active commands up to the seqnum.
231+ while (true ){
232+ auto iter = m_pending_commands.begin ();
233+ if (iter == m_pending_commands.end () || iter->first > seqnum){
234+ break ;
254235 }
236+ iter->second .sanitizer .check_usage ();
255237
256- // This block will never throw.
257- {
258- PendingRequest& handle = ret.first ->second ;
259- handle.silent_remove = true ;
260- handle.request = std::move (message);
261- handle.first_sent = current_time ();
238+ // We cannot remove un-acked messages from our buffer. If an un-acked
239+ // message is dropped and the receiver is still waiting for it, it will
240+ // wait forever since we will never retransmit.
241+
242+ if (iter->second .state == AckState::NOT_ACKED){
243+ // Convert the command into a no-op request.
244+ SerialPABotBase::DeviceRequest_program_id request;
245+ BotBaseMessage message = request.message ();
246+ seqnum_t seqnum_s = (seqnum_t )iter->first ;
247+ memcpy (&message.body [0 ], &seqnum_s, sizeof (seqnum_t ));
248+
249+ // cout << "removing = " << seqnum_s << ", " << (int)iter->second.state << endl;
250+
251+ std::pair<std::map<uint64_t , PendingRequest>::iterator, bool > ret = m_pending_requests.emplace (
252+ std::piecewise_construct,
253+ std::forward_as_tuple (iter->first ),
254+ std::forward_as_tuple ()
255+ );
256+ if (!ret.second ){
257+ throw InternalProgramError (
258+ &m_logger,
259+ PA_CURRENT_FUNCTION,
260+ " Duplicate sequence number: " + std::to_string (seqnum)
261+ );
262+ }
263+
264+ // This block will never throw.
265+ {
266+ PendingRequest& handle = ret.first ->second ;
267+ handle.silent_remove = true ;
268+ handle.request = std::move (message);
269+ handle.first_sent = current_time ();
270+ }
262271 }
263- }
264272
265- m_pending_commands.erase (iter);
273+ m_pending_commands.erase (iter);
274+ }
266275 }
276+ m_cv.notify_all ();
267277}
268278template <typename Map>
269279uint64_t PABotBase::infer_full_seqnum (const Map& map, seqnum_t seqnum) const {
@@ -354,8 +364,8 @@ void PABotBase::process_ack_request(BotBaseMessage message){
354364 case AckState::NOT_ACKED:
355365 {
356366 std::lock_guard<std::mutex> lg (m_sleep_lock);
357- m_cv.notify_all ();
358367 }
368+ m_cv.notify_all ();
359369 return ;
360370 case AckState::ACKED:
361371 m_logger.log (" Duplicate request ack message: seqnum = " + std::to_string (seqnum));
@@ -424,52 +434,54 @@ void PABotBase::process_command_finished(BotBaseMessage message){
424434 ack.seqnum = seqnum;
425435// m_send_queue.emplace_back((uint8_t)PABB_MSG_ACK, std::string((char*)&ack, sizeof(ack)));
426436
427- std::lock_guard<std::mutex> lg0 (m_sleep_lock);
428- WriteSpinLock lg1 (m_state_lock, " PABotBase::process_command_finished() - 0" );
437+ {
438+ std::lock_guard<std::mutex> lg0 (m_sleep_lock);
439+ WriteSpinLock lg1 (m_state_lock, " PABotBase::process_command_finished() - 0" );
429440
430441#ifdef INTENTIONALLY_DROP_MESSAGES
431- if (rand () % 10 != 0 ){
432- send_message (BotBaseMessage (PABB_MSG_ACK_REQUEST, std::string ((char *)&ack, sizeof (ack))), false );
433- }else {
434- m_logger.log (" Intentionally dropping finish ack: " + std::to_string (seqnum), COLOR_RED);
435- }
442+ if (rand () % 10 != 0 ){
443+ send_message (BotBaseMessage (PABB_MSG_ACK_REQUEST, std::string ((char *)&ack, sizeof (ack))), false );
444+ }else {
445+ m_logger.log (" Intentionally dropping finish ack: " + std::to_string (seqnum), COLOR_RED);
446+ }
436447#else
437- send_message (BotBaseMessage (PABB_MSG_ACK_REQUEST, std::string ((char *)&ack, sizeof (ack))), false );
448+ send_message (BotBaseMessage (PABB_MSG_ACK_REQUEST, std::string ((char *)&ack, sizeof (ack))), false );
438449#endif
439450
440- if (m_pending_commands.empty ()){
441- m_logger.log (
442- " Unexpected command finished message: seqnum = " + std::to_string (seqnum) +
443- " , command_seqnum = " + std::to_string (command_seqnum)
444- );
445- return ;
446- }
451+ if (m_pending_commands.empty ()){
452+ m_logger.log (
453+ " Unexpected command finished message: seqnum = " + std::to_string (seqnum) +
454+ " , command_seqnum = " + std::to_string (command_seqnum)
455+ );
456+ return ;
457+ }
447458
448- uint64_t full_seqnum = infer_full_seqnum (m_pending_commands, command_seqnum);
449- auto iter = m_pending_commands.find (full_seqnum);
450- if (iter == m_pending_commands.end ()){
451- m_logger.log (
452- " Unexpected command finished message: seqnum = " + std::to_string (seqnum) +
453- " , command_seqnum = " + std::to_string (command_seqnum)
454- );
455- return ;
456- }
457- iter->second .sanitizer .check_usage ();
459+ uint64_t full_seqnum = infer_full_seqnum (m_pending_commands, command_seqnum);
460+ auto iter = m_pending_commands.find (full_seqnum);
461+ if (iter == m_pending_commands.end ()){
462+ m_logger.log (
463+ " Unexpected command finished message: seqnum = " + std::to_string (seqnum) +
464+ " , command_seqnum = " + std::to_string (command_seqnum)
465+ );
466+ return ;
467+ }
468+ iter->second .sanitizer .check_usage ();
458469
459- switch (iter->second .state ){
460- case AckState::NOT_ACKED:
461- case AckState::ACKED:
462- iter->second .state = AckState::FINISHED;
463- iter->second .ack = std::move (message);
464- if (iter->second .silent_remove ){
465- m_pending_commands.erase (iter);
470+ switch (iter->second .state ){
471+ case AckState::NOT_ACKED:
472+ case AckState::ACKED:
473+ iter->second .state = AckState::FINISHED;
474+ iter->second .ack = std::move (message);
475+ if (iter->second .silent_remove ){
476+ m_pending_commands.erase (iter);
477+ }
478+ break ;
479+ case AckState::FINISHED:
480+ m_logger.log (" Duplicate command finish: seqnum = " + std::to_string (seqnum));
481+ return ;
466482 }
467- m_cv.notify_all ();
468- return ;
469- case AckState::FINISHED:
470- m_logger.log (" Duplicate command finish: seqnum = " + std::to_string (seqnum));
471- return ;
472483 }
484+ m_cv.notify_all ();
473485}
474486void PABotBase::on_recv_message (BotBaseMessage message){
475487 auto scope_check = m_sanitizer.check_scope ();
@@ -505,7 +517,9 @@ void PABotBase::on_recv_message(BotBaseMessage message){
505517 m_logger.log (m_error_message, COLOR_RED);
506518 }
507519 m_error.store (true , std::memory_order_release);
508- std::lock_guard<std::mutex> lg0 (m_sleep_lock);
520+ {
521+ std::lock_guard<std::mutex> lg0 (m_sleep_lock);
522+ }
509523 m_cv.notify_all ();
510524 }
511525 case PABB_MSG_ERROR_MISSED_REQUEST:{
@@ -521,7 +535,9 @@ void PABotBase::on_recv_message(BotBaseMessage message){
521535 m_logger.log (m_error_message, COLOR_RED);
522536 }
523537 m_error.store (true , std::memory_order_release);
524- std::lock_guard<std::mutex> lg0 (m_sleep_lock);
538+ {
539+ std::lock_guard<std::mutex> lg0 (m_sleep_lock);
540+ }
525541 m_cv.notify_all ();
526542 }
527543 return ;
@@ -533,7 +549,9 @@ void PABotBase::on_recv_message(BotBaseMessage message){
533549 m_error_message = " Disconnected by console." ;
534550 }
535551 m_error.store (true , std::memory_order_release);
536- std::lock_guard<std::mutex> lg0 (m_sleep_lock);
552+ {
553+ std::lock_guard<std::mutex> lg0 (m_sleep_lock);
554+ }
537555 m_cv.notify_all ();
538556 return ;
539557 }
0 commit comments