Skip to content

Commit 225361d

Browse files
author
Zhen
committed
When receiving unrecoverable errors, do not ack failure or queuing any more message but just wait to be closed when putting back to session pool
1 parent 1d734a4 commit 225361d

File tree

7 files changed

+47
-7
lines changed

7 files changed

+47
-7
lines changed

driver/src/main/java/org/neo4j/driver/internal/InternalSession.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,14 @@ public void run()
148148
//must check if transaction has been closed
149149
if (currentTransaction != null)
150150
{
151-
currentTransaction.failure();
152-
currentTransaction = null;
153-
connection.onError( null );
151+
if( connection.hasUnrecoverableErrors() )
152+
{
153+
currentTransaction.markToClose();
154+
}
155+
else
156+
{
157+
currentTransaction.failure();
158+
}
154159
}
155160
}
156161
});
@@ -165,12 +170,14 @@ public TypeSystem typeSystem()
165170

166171
private void ensureConnectionIsValidBeforeRunningSession()
167172
{
173+
ensureNoUnrecoverableError();
168174
ensureNoOpenTransactionBeforeRunningSession();
169175
ensureConnectionIsOpen();
170176
}
171177

172178
private void ensureConnectionIsValidBeforeOpeningTransaction()
173179
{
180+
ensureNoUnrecoverableError();
174181
ensureNoOpenTransactionBeforeOpeningTransaction();
175182
ensureConnectionIsOpen();
176183
}
@@ -187,6 +194,16 @@ protected void finalize() throws Throwable
187194
super.finalize();
188195
}
189196

197+
private void ensureNoUnrecoverableError()
198+
{
199+
if( connection.hasUnrecoverableErrors() )
200+
{
201+
throw new ClientException( "Cannot run more statements in the current session as unrecoverable errors " +
202+
"has happened. Please close the currect session and re-run your statement in a" +
203+
" new session." );
204+
}
205+
}
206+
190207
private void ensureNoOpenTransactionBeforeRunningSession()
191208
{
192209
if ( currentTransaction != null )

driver/src/main/java/org/neo4j/driver/internal/InternalTransaction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,5 +200,8 @@ public TypeSystem typeSystem()
200200
return InternalTypeSystem.TYPE_SYSTEM;
201201
}
202202

203-
203+
public void markToClose()
204+
{
205+
state = State.FAILED;
206+
}
204207
}

driver/src/main/java/org/neo4j/driver/internal/connector/ConcurrencyGuardingConnection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,12 @@ public void onError( Runnable runnable )
195195
delegate.onError( runnable );
196196
}
197197

198+
@Override
199+
public boolean hasUnrecoverableErrors()
200+
{
201+
return delegate.hasUnrecoverableErrors();
202+
}
203+
198204
private void markAsAvailable()
199205
{
200206
inUse.set( false );

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketConnection.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@ private void assertNoServerFailure()
153153
{
154154
if ( responseHandler.serverFailureOccurred() )
155155
{
156-
ackFailure( StreamCollector.NO_OP );
157156
Neo4jException exception = responseHandler.serverFailure();
158157
responseHandler.clearError();
159158
throw exception;
@@ -200,4 +199,10 @@ public void onError( Runnable runnable )
200199
{
201200
throw new UnsupportedOperationException( "Error subscribers are not supported on SocketConnection." );
202201
}
202+
203+
@Override
204+
public boolean hasUnrecoverableErrors()
205+
{
206+
throw new UnsupportedOperationException( "Unrecoverable error detection is not supported on SocketConnection." );
207+
}
203208
}

driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnection.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,10 @@ private void onDelegateException( RuntimeException e )
228228
{
229229
unrecoverableErrorsOccurred = true;
230230
}
231+
else
232+
{
233+
ackFailure( StreamCollector.NO_OP );
234+
}
231235
if( onError != null )
232236
{
233237
onError.run();

driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnectionReleaseConsumer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,10 @@ else if ( driverStopped.get() )
8787

8888
boolean validConnection( PooledConnection pooledConnection )
8989
{
90-
return reset(pooledConnection) &&
91-
!pooledConnection.hasUnrecoverableErrors() &&
90+
// once the pooledConn has marked to have unrecoverable errors, there is no way to remove the error
91+
// and we should close the conn without bother to reset the conn at all
92+
return !pooledConnection.hasUnrecoverableErrors() &&
93+
reset(pooledConnection) &&
9294
(pooledConnection.idleTime() <= minIdleBeforeConnectionTest || ping( pooledConnection ));
9395
}
9496

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,7 @@ public interface Connection extends AutoCloseable
102102
* @param runnable
103103
*/
104104
void onError( Runnable runnable );
105+
106+
107+
boolean hasUnrecoverableErrors();
105108
}

0 commit comments

Comments
 (0)