Skip to content

Commit fccaaa7

Browse files
pontusmelkeZhen
authored andcommitted
Send ACK_FAILURE instead of RESET
1 parent deb30b7 commit fccaaa7

File tree

11 files changed

+173
-6
lines changed

11 files changed

+173
-6
lines changed

driver/src/main/java/org/neo4j/driver/internal/connector/ConcurrencyGuardingConnection.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,20 @@ public void reset( StreamCollector collector )
113113
}
114114
}
115115

116+
@Override
117+
public void ackFailure( StreamCollector collector )
118+
{
119+
try
120+
{
121+
markAsInUse();
122+
delegate.ackFailure( collector );
123+
}
124+
finally
125+
{
126+
markAsAvailable();
127+
}
128+
}
129+
116130
@Override
117131
public void sync()
118132
{

driver/src/main/java/org/neo4j/driver/internal/connector/socket/LoggingResponseHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.neo4j.driver.v1.Logger;
2525
import org.neo4j.driver.v1.Value;
2626

27+
import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE;
2728
import static org.neo4j.driver.internal.messaging.DiscardAllMessage.DISCARD_ALL;
2829
import static org.neo4j.driver.internal.messaging.IgnoredMessage.IGNORED;
2930
import static org.neo4j.driver.internal.messaging.PullAllMessage.PULL_ALL;
@@ -74,6 +75,13 @@ public void handleResetMessage()
7475
logger.debug( DEFAULT_DEBUG_LOGGING_FORMAT, RESET );
7576
}
7677

78+
@Override
79+
public void handleAckFailureMessage()
80+
{
81+
super.handleAckFailureMessage();
82+
logger.debug( DEFAULT_DEBUG_LOGGING_FORMAT, ACK_FAILURE );
83+
}
84+
7785
@Override
7886
public void handleSuccessMessage( Map<String,Value> meta )
7987
{

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketConnection.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.neo4j.driver.internal.messaging.InitMessage;
2828
import org.neo4j.driver.internal.messaging.Message;
2929
import org.neo4j.driver.internal.messaging.PullAllMessage;
30-
import org.neo4j.driver.internal.messaging.ResetMessage;
3130
import org.neo4j.driver.internal.messaging.RunMessage;
3231
import org.neo4j.driver.internal.spi.Connection;
3332
import org.neo4j.driver.v1.Logger;
@@ -37,7 +36,9 @@
3736
import org.neo4j.driver.v1.exceptions.ClientException;
3837
import org.neo4j.driver.v1.exceptions.Neo4jException;
3938

39+
import static org.neo4j.driver.internal.messaging.AckFailureMessage.ACK_FAILURE;
4040
import static org.neo4j.driver.internal.messaging.DiscardAllMessage.DISCARD_ALL;
41+
import static org.neo4j.driver.internal.messaging.ResetMessage.RESET;
4142

4243
public class SocketConnection implements Connection
4344
{
@@ -91,7 +92,13 @@ public void pullAll( StreamCollector collector )
9192
@Override
9293
public void reset( StreamCollector collector )
9394
{
94-
queueMessage( ResetMessage.RESET, collector );
95+
queueMessage( RESET, collector );
96+
}
97+
98+
@Override
99+
public void ackFailure( StreamCollector collector )
100+
{
101+
queueMessage( ACK_FAILURE, collector );
95102
}
96103

97104
@Override
@@ -146,7 +153,7 @@ private void assertNoServerFailure()
146153
{
147154
if ( responseHandler.serverFailureOccurred() )
148155
{
149-
reset( StreamCollector.NO_OP );
156+
ackFailure( StreamCollector.NO_OP );
150157
Neo4jException exception = responseHandler.serverFailure();
151158
responseHandler.clearError();
152159
throw exception;

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketResponseHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,12 @@ public void handleResetMessage()
194194

195195
}
196196

197+
@Override
198+
public void handleAckFailureMessage()
199+
{
200+
201+
}
202+
197203
@Override
198204
public void handlePullAllMessage()
199205
{
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.messaging;
20+
21+
import java.io.IOException;
22+
23+
/**
24+
* ACK_FAILURE request message
25+
*
26+
* This message acts as a barrier after an error, informing the server that we've seen the error
27+
* message, and that messages that follow this one are safe to execute.
28+
*/
29+
public class AckFailureMessage implements Message
30+
{
31+
public static final AckFailureMessage ACK_FAILURE= new AckFailureMessage();
32+
33+
@Override
34+
public void dispatch( MessageHandler handler ) throws IOException
35+
{
36+
handler.handleAckFailureMessage();
37+
}
38+
39+
@Override
40+
public String toString()
41+
{
42+
return "[ACK_FAILURE]";
43+
}
44+
45+
@Override
46+
public boolean equals( Object obj )
47+
{
48+
return obj != null && obj.getClass() == getClass();
49+
}
50+
51+
@Override
52+
public int hashCode()
53+
{
54+
return 1;
55+
}
56+
}

driver/src/main/java/org/neo4j/driver/internal/messaging/MessageHandler.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public interface MessageHandler
3636

3737
void handleResetMessage() throws IOException;
3838

39+
void handleAckFailureMessage() throws IOException;
40+
3941
// Responses
4042
void handleSuccessMessage( Map<String,Value> meta ) throws IOException;
4143

@@ -44,4 +46,5 @@ public interface MessageHandler
4446
void handleFailureMessage( String code, String message ) throws IOException;
4547

4648
void handleIgnoredMessage() throws IOException;
49+
4750
}

driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
public class PackStreamMessageFormatV1 implements MessageFormat
5757
{
5858
public final static byte MSG_INIT = 0x01;
59+
public final static byte MSG_ACK_FAILURE = 0x0E;
5960
public final static byte MSG_RESET = 0x0F;
6061
public final static byte MSG_RUN = 0x10;
6162
public final static byte MSG_DISCARD_ALL = 0x2F;
@@ -151,6 +152,13 @@ public void handleResetMessage() throws IOException
151152
onMessageComplete.run();
152153
}
153154

155+
@Override
156+
public void handleAckFailureMessage() throws IOException
157+
{
158+
packer.packStructHeader( 0, MSG_ACK_FAILURE );
159+
onMessageComplete.run();
160+
}
161+
154162
@Override
155163
public void handleSuccessMessage( Map<String,Value> meta ) throws IOException
156164
{

driver/src/main/java/org/neo4j/driver/internal/pool/PooledConnection.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,19 @@ public void reset( StreamCollector collector )
136136
}
137137
}
138138

139+
@Override
140+
public void ackFailure( StreamCollector collector )
141+
{
142+
try
143+
{
144+
delegate.ackFailure( collector );
145+
}
146+
catch ( RuntimeException e )
147+
{
148+
onDelegateException( e );
149+
}
150+
}
151+
139152
@Override
140153
public void sync()
141154
{

driver/src/main/java/org/neo4j/driver/internal/spi/Connection.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ public interface Connection extends AutoCloseable
6060
*/
6161
void reset( StreamCollector collector );
6262

63+
/**
64+
* Queue a reset action, output will be handed to the collector once the pull starts. This will
65+
* close the stream once its completed, allowing another {@link #run(String, java.util.Map, StreamCollector) run}
66+
*/
67+
void ackFailure( StreamCollector collector );
68+
6369
/**
6470
* Ensure all outstanding actions are carried out on the server.
6571
*/

driver/src/test/java/org/neo4j/driver/v1/integration/ResultStreamIT.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.neo4j.driver.v1.Record;
2525
import org.neo4j.driver.v1.StatementResult;
26+
import org.neo4j.driver.v1.types.Node;
2627
import org.neo4j.driver.v1.util.TestNeo4jSession;
2728

2829
import static org.junit.Assert.assertEquals;
@@ -61,11 +62,26 @@ public void shouldHaveFieldNamesInResult()
6162
assertEquals( "[n]", res.keys().toString() );
6263
}
6364

65+
@Test
66+
public void should()
67+
{
68+
// When
69+
StatementResult res = session.run( "CREATE (n:TestNode {name:'test', prop: [42]}) RETURN n" );
70+
71+
Node n = res.next().get( "n" ).asNode();
72+
System.out.println( n );
73+
// Then
74+
assertEquals( "[n]", res.keys().toString() );
75+
assertNotNull( res.single() );
76+
assertEquals( "[n]", res.keys().toString() );
77+
}
78+
6479
@Test
6580
public void shouldGiveHelpfulFailureMessageWhenAccessNonExistingField() throws Throwable
6681
{
6782
// Given
68-
StatementResult rs = session.run( "CREATE (n:Person {name:{name}}) RETURN n", parameters( "name", "Tom Hanks" ) );
83+
StatementResult rs =
84+
session.run( "CREATE (n:Person {name:{name}}) RETURN n", parameters( "name", "Tom Hanks" ) );
6985

7086
// When
7187
Record single = rs.single();
@@ -78,7 +94,8 @@ public void shouldGiveHelpfulFailureMessageWhenAccessNonExistingField() throws T
7894
public void shouldGiveHelpfulFailureMessageWhenAccessNonExistingPropertyOnNode() throws Throwable
7995
{
8096
// Given
81-
StatementResult rs = session.run( "CREATE (n:Person {name:{name}}) RETURN n", parameters( "name", "Tom Hanks" ) );
97+
StatementResult rs =
98+
session.run( "CREATE (n:Person {name:{name}}) RETURN n", parameters( "name", "Tom Hanks" ) );
8299

83100
// When
84101
Record record = rs.single();
@@ -96,4 +113,26 @@ public void shouldNotReturnNullKeysOnEmptyResult()
96113
// THEN
97114
assertNotNull( rs.keys() );
98115
}
116+
117+
@Test
118+
public void shouldBeAbleToReuseSessionAfterFailure() throws Throwable
119+
{
120+
// Given
121+
StatementResult res1 = session.run( "INVALID" );
122+
try
123+
{
124+
res1.consume();
125+
}
126+
catch ( Exception e )
127+
{
128+
//ignore
129+
}
130+
131+
// When
132+
StatementResult res2 = session.run( "RETURN 1" );
133+
134+
// Then
135+
assertTrue( res2.hasNext() );
136+
assertEquals( res2.next().get("1").asLong(), 1L );
137+
}
99138
}

0 commit comments

Comments
 (0)