Skip to content

Commit 0f2f821

Browse files
committed
Fix cancellations removing unacked messages.
1 parent 25b40ff commit 0f2f821

File tree

1 file changed

+43
-13
lines changed

1 file changed

+43
-13
lines changed

ClientSource/Connection/PABotBase.cpp

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -239,23 +239,53 @@ void PABotBase::clear_all_active_commands(uint64_t seqnum){
239239
WriteSpinLock lg1(m_state_lock, "PABotBase::next_command_interrupt()");
240240
m_logger.log("Clearing all active commands... (Commands: " + std::to_string(m_pending_commands.size()) + ")", COLOR_DARKGREEN);
241241

242-
// if (m_pending_commands.size() > 2){
243-
// cout << "asdf" << endl;
244-
// }
242+
m_cv.notify_all();
245243

246-
if (!m_pending_commands.empty()){
247-
// Remove all active commands up to the seqnum.
248-
while (true){
249-
auto iter = m_pending_commands.begin();
250-
if (iter == m_pending_commands.end() || iter->first > seqnum){
251-
break;
244+
if (m_pending_commands.empty()){
245+
return;
246+
}
247+
248+
// Remove all active commands up to the seqnum.
249+
while (true){
250+
auto iter = m_pending_commands.begin();
251+
if (iter == m_pending_commands.end() || iter->first > seqnum){
252+
break;
253+
}
254+
iter->second.sanitizer.check_usage();
255+
256+
// We cannot remove un-acked messages from our buffer. If an un-acked
257+
// message is dropped and the receiver is still waiting for it, it will
258+
// wait forever since we will never retransmit.
259+
260+
if (iter->second.state == AckState::NOT_ACKED){
261+
// Convert the command into a no-op request.
262+
Microcontroller::DeviceRequest_program_id request;
263+
BotBaseMessage message = request.message();
264+
seqnum_t seqnum_s = (seqnum_t)iter->first;
265+
memcpy(&message.body[0], &seqnum_s, sizeof(seqnum_t));
266+
267+
// cout << "removing = " << seqnum_s << ", " << (int)iter->second.state << endl;
268+
269+
std::pair<std::map<uint64_t, PendingRequest>::iterator, bool> ret = m_pending_requests.emplace(
270+
std::piecewise_construct,
271+
std::forward_as_tuple(iter->first),
272+
std::forward_as_tuple()
273+
);
274+
if (!ret.second){
275+
throw InternalProgramError(&m_logger, PA_CURRENT_FUNCTION, "Duplicate sequence number: " + std::to_string(seqnum));
276+
}
277+
278+
// This block will never throw.
279+
{
280+
PendingRequest& handle = ret.first->second;
281+
handle.silent_remove = true;
282+
handle.request = std::move(message);
283+
handle.first_sent = current_time();
252284
}
253-
iter->second.sanitizer.check_usage();
254-
m_pending_commands.erase(iter);
255285
}
256-
}
257286

258-
m_cv.notify_all();
287+
m_pending_commands.erase(iter);
288+
}
259289
}
260290
template <typename Map>
261291
uint64_t PABotBase::infer_full_seqnum(const Map& map, seqnum_t seqnum) const{

0 commit comments

Comments
 (0)