Skip to content

Commit 1412181

Browse files
committed
binder: make Listener callbacks mutually exclusive and fix in-use race
- Serialize transportShutdown/transportTerminated/transportReady/transportInUse under listener lock - Atomic in-use counter with reconcile to prevent incorrect false when −1/+1 race occurs - Dispatch callbacks asynchronously to avoid lock-order deadlocks - Behavior unchanged for users; improves correctness under concurrency
1 parent f00ff97 commit 1412181

File tree

1 file changed

+80
-23
lines changed

1 file changed

+80
-23
lines changed

binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java

Lines changed: 80 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,10 @@ public final class BinderClientTransport extends BinderTransport
8686
/** Number of ongoing calls which keep this transport "in-use". */
8787
private final AtomicInteger numInUseStreams;
8888

89-
/** Last in-use state that was reported to the listener */
89+
/** Last in-use state reported to the transport listener */
9090
private final AtomicBoolean listenerInUse;
9191

92-
/** Synchronizes transport listener callbacks */
92+
/** Serializes transport listener callbacks */
9393
private final Object listenerNotifyLock;
9494

9595
private final long readyTimeoutMillis;
@@ -315,22 +315,39 @@ public synchronized void shutdownNow(Status reason) {
315315
@Override
316316
@GuardedBy("this")
317317
void notifyShutdown(Status status) {
318-
clientTransportListener.transportShutdown(status, SimpleDisconnectError.UNKNOWN);
318+
// Defer listener invocation to the listener executor to avoid calling
319+
// external code while holding the transport lock.
320+
scheduleOnListener(
321+
new Runnable() {
322+
@Override
323+
public void run() {
324+
clientTransportListener.transportShutdown(status, SimpleDisconnectError.UNKNOWN);
325+
}
326+
});
319327
}
320328

321329
@Override
322330
@GuardedBy("this")
323331
void notifyTerminated() {
324332
if (numInUseStreams.getAndSet(0) > 0) {
325-
listenerInUse.set(false);
326-
clientTransportListener.transportInUse(false);
333+
if (listenerInUse.compareAndSet(true, false)) {
334+
scheduleTransportInUseNotification(false);
335+
} else {
336+
listenerInUse.set(false);
337+
}
327338
}
328339
if (readyTimeoutFuture != null) {
329340
readyTimeoutFuture.cancel(false);
330341
readyTimeoutFuture = null;
331342
}
332343
serviceBinding.unbind();
333-
clientTransportListener.transportTerminated();
344+
scheduleOnListener(
345+
new Runnable() {
346+
@Override
347+
public void run() {
348+
clientTransportListener.transportTerminated();
349+
}
350+
});
334351
}
335352

336353
@Override
@@ -450,8 +467,11 @@ public void handleSetupTransport() {
450467
@GuardedBy("this")
451468
private void onHandshakeComplete() {
452469
setState(TransportState.READY);
453-
attributes = clientTransportListener.filterTransport(attributes);
454-
clientTransportListener.transportReady();
470+
final Attributes currentAttrs = attributes;
471+
// Defer listener callbacks (filterTransport and transportReady) to the listener executor
472+
// to avoid invoking listener code while holding the transport lock.
473+
scheduleFilterTransportAndReady(currentAttrs);
474+
455475
if (readyTimeoutFuture != null) {
456476
readyTimeoutFuture.cancel(false);
457477
readyTimeoutFuture = null;
@@ -464,42 +484,40 @@ private synchronized void handleAuthResult(Throwable t) {
464484
}
465485

466486
/**
467-
* Updates in-use stream count and notifies listener only on transitions between 0 and >0, without
468-
* acquiring the transport lock.
487+
* Updates the in-use stream count and triggers reconciliation of the listener in-use state,
488+
* without acquiring the transport lock.
469489
*/
470490
private void updateInUseStreamsCountIfNeeded(boolean countsForInUse, int delta) {
471491
Preconditions.checkArgument(delta == -1 || delta == 1, "stream count delta must be -1 or +1");
472492
if (!countsForInUse) {
473493
return;
474494
}
475-
int prev, next;
476495

477496
if (delta > 0) {
478-
next = numInUseStreams.incrementAndGet();
479-
prev = next - 1;
497+
numInUseStreams.incrementAndGet();
480498
} else {
481-
prev = numInUseStreams.get();
482-
int updated;
499+
// Decrement with floor at 0
500+
int prev = numInUseStreams.get();
483501

484502
while (true) {
485503
int current = prev;
486504
int newValue = current > 0 ? current - 1 : 0;
487505
if (numInUseStreams.compareAndSet(current, newValue)) {
488-
updated = newValue;
489506
break;
490507
}
491508
prev = numInUseStreams.get();
492509
}
493-
next = updated;
494510
}
511+
reconcileInUseState();
512+
}
495513

496-
boolean prevInUse = prev > 0;
497-
boolean nextInUse = next > 0;
514+
/** Reconcile listenerInUse with the current stream count to avoid stale toggles under races. */
515+
private void reconcileInUseState() {
516+
boolean nowInUse = numInUseStreams.get() > 0;
517+
boolean prev = listenerInUse.get();
498518

499-
if (prevInUse != nextInUse) {
500-
if (listenerInUse.compareAndSet(prevInUse, nextInUse)) {
501-
scheduleTransportInUseNotification(nextInUse);
502-
}
519+
if(prev != nowInUse && listenerInUse.compareAndSet(prev, nowInUse)) {
520+
scheduleTransportInUseNotification(nowInUse);
503521
}
504522
}
505523

@@ -520,6 +538,45 @@ public void run() {
520538
});
521539
}
522540

541+
private void scheduleFilterTransportAndReady(final Attributes attrsSnapshot) {
542+
getScheduledExecutorService()
543+
.execute(
544+
new Runnable() {
545+
@Override
546+
public void run() {
547+
final Attributes filtered;
548+
synchronized (listenerNotifyLock) {
549+
filtered = clientTransportListener.filterTransport(attrsSnapshot);
550+
}
551+
552+
synchronized (BinderClientTransport.class) {
553+
attributes = filtered;
554+
}
555+
556+
scheduleOnListener(
557+
new Runnable() {
558+
@Override
559+
public void run() {
560+
clientTransportListener.transportReady();
561+
}
562+
});
563+
}
564+
});
565+
}
566+
567+
private void scheduleOnListener(final Runnable task) {
568+
getScheduledExecutorService()
569+
.execute(
570+
new Runnable() {
571+
@Override
572+
public void run() {
573+
synchronized (listenerNotifyLock) {
574+
task.run();
575+
}
576+
}
577+
});
578+
}
579+
523580
@GuardedBy("this")
524581
@Override
525582
protected void handlePingResponse(Parcel parcel) {

0 commit comments

Comments
 (0)