Skip to content

Commit 1ffe696

Browse files
committed
Internal refactoring, extra tests and Driver.servers
1 parent b5b81a4 commit 1ffe696

File tree

18 files changed

+245
-229
lines changed

18 files changed

+245
-229
lines changed

driver/src/main/java/org/neo4j/driver/internal/DirectDriver.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,22 @@
1818
*/
1919
package org.neo4j.driver.internal;
2020

21-
import org.neo4j.driver.internal.connector.socket.SocketConnector;
22-
import org.neo4j.driver.internal.pool.InternalConnectionPool;
21+
import org.neo4j.driver.internal.pool.SocketConnectionPool;
2322
import org.neo4j.driver.internal.pool.PoolSettings;
2423
import org.neo4j.driver.internal.security.SecurityPlan;
2524
import org.neo4j.driver.internal.spi.ConnectionPool;
2625
import org.neo4j.driver.internal.util.BoltServerAddress;
27-
import org.neo4j.driver.internal.util.Clock;
2826
import org.neo4j.driver.v1.Driver;
2927
import org.neo4j.driver.v1.Logging;
3028
import org.neo4j.driver.v1.Session;
3129
import org.neo4j.driver.v1.exceptions.ClientException;
3230
import org.neo4j.driver.v1.exceptions.Neo4jException;
3331

32+
import java.util.*;
33+
3434
public class DirectDriver implements Driver
3535
{
36+
private final Set<BoltServerAddress> servers;
3637
private final BoltServerAddress address;
3738
private final SecurityPlan securityPlan;
3839
private final Logging logging;
@@ -41,9 +42,18 @@ public class DirectDriver implements Driver
4142
public DirectDriver( BoltServerAddress address, SecurityPlan securityPlan, PoolSettings poolSettings, Logging logging )
4243
{
4344
this.address = address;
45+
Set<BoltServerAddress> servers = new HashSet<>();
46+
servers.add( this.address );
47+
this.servers = Collections.unmodifiableSet( servers );
4448
this.securityPlan = securityPlan;
4549
this.logging = logging;
46-
this.connections = new InternalConnectionPool( new SocketConnector(), Clock.SYSTEM, securityPlan, poolSettings, logging );
50+
this.connections = new SocketConnectionPool( securityPlan, poolSettings, logging );
51+
}
52+
53+
@Override
54+
public Set<BoltServerAddress> servers()
55+
{
56+
return servers;
4757
}
4858

4959
@Override

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public boolean isOpen()
170170

171171
private SocketProtocol negotiateProtocol() throws IOException
172172
{
173-
logger.debug( "~~ [HANDSHAKE] [0x6060B017, 1, 0, 0, 0]." );
173+
logger.debug( "~~ [HANDSHAKE] [0x6060B017, 1, 0, 0, 0]" );
174174
//Propose protocol versions
175175
ByteBuffer buf = ByteBuffer.allocateDirect( 5 * 4 ).order( BIG_ENDIAN );
176176
buf.putInt( MAGIC_PREAMBLE );
@@ -240,7 +240,7 @@ public static ByteChannel create( BoltServerAddress address, SecurityPlan securi
240240
SocketChannel soChannel = SocketChannel.open();
241241
soChannel.setOption( StandardSocketOptions.SO_REUSEADDR, true );
242242
soChannel.setOption( StandardSocketOptions.SO_KEEPALIVE, true );
243-
soChannel.connect( new InetSocketAddress( address.host(), address.port() ) );
243+
soChannel.connect( address.toSocketAddress() );
244244

245245
ByteChannel channel;
246246

driver/src/main/java/org/neo4j/driver/internal/connector/socket/SocketConnector.java

Lines changed: 0 additions & 75 deletions
This file was deleted.

driver/src/main/java/org/neo4j/driver/internal/pool/InternalConnectionPool.java renamed to driver/src/main/java/org/neo4j/driver/internal/pool/SocketConnectionPool.java

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,26 @@
1818
*/
1919
package org.neo4j.driver.internal.pool;
2020

21-
import java.net.URI;
22-
import java.util.Arrays;
21+
import java.util.Map;
2322
import java.util.concurrent.BlockingQueue;
2423
import java.util.concurrent.ConcurrentHashMap;
2524
import java.util.concurrent.LinkedBlockingQueue;
2625
import java.util.concurrent.atomic.AtomicBoolean;
2726

27+
import org.neo4j.driver.internal.Version;
28+
import org.neo4j.driver.internal.connector.ConcurrencyGuardingConnection;
29+
import org.neo4j.driver.internal.connector.socket.SocketConnection;
30+
import org.neo4j.driver.internal.security.InternalAuthToken;
2831
import org.neo4j.driver.internal.security.SecurityPlan;
2932
import org.neo4j.driver.internal.spi.Connection;
3033
import org.neo4j.driver.internal.spi.ConnectionPool;
31-
import org.neo4j.driver.internal.spi.Connector;
3234
import org.neo4j.driver.internal.util.BoltServerAddress;
3335
import org.neo4j.driver.internal.util.Clock;
36+
import org.neo4j.driver.v1.AuthToken;
37+
import org.neo4j.driver.v1.AuthTokens;
3438
import org.neo4j.driver.v1.Logging;
39+
import org.neo4j.driver.v1.Value;
40+
import org.neo4j.driver.v1.exceptions.ClientException;
3541
import org.neo4j.driver.v1.exceptions.Neo4jException;
3642

3743
/**
@@ -46,31 +52,52 @@
4652
* The driver is thread safe. Each thread could try to get a session from the pool and then return it to the pool
4753
* at the same time.
4854
*/
49-
public class InternalConnectionPool implements ConnectionPool
55+
public class SocketConnectionPool implements ConnectionPool
5056
{
5157

5258
/**
53-
* Pools, organized by URL.
59+
* Pools, organized by server address.
5460
*/
5561
private final ConcurrentHashMap<BoltServerAddress,BlockingQueue<PooledConnection>> pools = new ConcurrentHashMap<>();
5662

57-
private final Connector connector;
63+
private final Clock clock = Clock.SYSTEM;
64+
5865
private final SecurityPlan securityPlan;
59-
private final Clock clock;
6066
private final PoolSettings poolSettings;
6167
private final Logging logging;
6268

6369
/** Shutdown flag */
6470
private final AtomicBoolean stopped = new AtomicBoolean( false );
6571

66-
public InternalConnectionPool( Connector connector, Clock clock, SecurityPlan securityPlan,
67-
PoolSettings poolSettings, Logging logging )
72+
public SocketConnectionPool( SecurityPlan securityPlan, PoolSettings poolSettings, Logging logging )
6873
{
6974
this.securityPlan = securityPlan;
70-
this.clock = clock;
7175
this.poolSettings = poolSettings;
7276
this.logging = logging;
73-
this.connector = connector;
77+
}
78+
79+
private Connection connect( BoltServerAddress address ) throws ClientException
80+
{
81+
Connection conn = new SocketConnection( address, securityPlan, logging );
82+
83+
// Because SocketConnection is not thread safe, wrap it in this guard
84+
// to ensure concurrent access leads causes application errors
85+
conn = new ConcurrencyGuardingConnection( conn );
86+
conn.init( "bolt-java-driver/" + Version.driverVersion(), tokenAsMap( securityPlan.authToken() ) );
87+
return conn;
88+
}
89+
90+
private static Map<String,Value> tokenAsMap( AuthToken token )
91+
{
92+
if( token instanceof InternalAuthToken )
93+
{
94+
return ((InternalAuthToken) token).toMap();
95+
}
96+
else
97+
{
98+
throw new ClientException( "Unknown authentication token, `" + token + "`. Please use one of the supported " +
99+
"tokens from `" + AuthTokens.class.getSimpleName() + "`." );
100+
}
74101
}
75102

76103
@Override
@@ -84,8 +111,8 @@ public Connection acquire( BoltServerAddress address )
84111
PooledConnection conn = connections.poll();
85112
if ( conn == null )
86113
{
87-
conn = new PooledConnection(connector.connect( address, securityPlan, logging ), new
88-
PooledConnectionReleaseConsumer( connections, stopped, poolSettings ), clock);
114+
conn = new PooledConnection( connect( address ), new
115+
PooledConnectionReleaseConsumer( connections, stopped, poolSettings ), clock );
89116
}
90117
conn.updateUsageTimestamp();
91118
return conn;

driver/src/main/java/org/neo4j/driver/internal/spi/ConnectionPool.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,19 @@
1616
* See the License for the specific language governing permissions and
1717
* limitations under the License.
1818
*/
19-
package org.neo4j.driver.internal.spi;
2019

20+
package org.neo4j.driver.internal.spi;
2121

2222
import org.neo4j.driver.internal.util.BoltServerAddress;
2323

2424
public interface ConnectionPool extends AutoCloseable
2525
{
2626
/**
27-
* Acquire a connection - if a live connection exists in the pool, it will be used, otherwise a new connection
28-
* is created with an applicable {@link Connector}.
27+
* Acquire a connection - if a live connection exists in the pool, it will
28+
* be used, otherwise a new connection will be created.
29+
*
2930
* @param address
3031
*/
3132
Connection acquire( BoltServerAddress address );
33+
3234
}

driver/src/main/java/org/neo4j/driver/internal/spi/Connector.java

Lines changed: 0 additions & 54 deletions
This file was deleted.

driver/src/main/java/org/neo4j/driver/internal/util/BoltServerAddress.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public static BoltServerAddress from( URI uri )
4444
private final String host;
4545
private final int port;
4646

47+
private SocketAddress socketAddress = null; // created lazily if required
48+
4749
public BoltServerAddress( String host, int port )
4850
{
4951
this.host = host;
@@ -55,12 +57,61 @@ public BoltServerAddress( String host )
5557
this( host, DEFAULT_PORT );
5658
}
5759

60+
@Override
61+
public boolean equals( Object obj )
62+
{
63+
if ( this == obj )
64+
{
65+
return true;
66+
}
67+
if ( !(obj instanceof BoltServerAddress) )
68+
{
69+
return false;
70+
}
71+
BoltServerAddress address = (BoltServerAddress) obj;
72+
return host.equals( address.host ) && port == address.port;
73+
}
74+
75+
@Override
76+
public int hashCode()
77+
{
78+
return 31 * host.hashCode() + port;
79+
}
80+
5881
@Override
5982
public String toString()
6083
{
6184
return format( "%s:%d", host, port );
6285
}
6386

87+
public SocketAddress toSocketAddress()
88+
{
89+
if (socketAddress == null)
90+
{
91+
socketAddress = new InetSocketAddress( host, port );
92+
}
93+
return socketAddress;
94+
}
95+
96+
/**
97+
* Resolve the host name down to an IP address, if not already resolved.
98+
*
99+
* @return this instance if already resolved, otherwise a new address instance
100+
* @throws UnknownHostException
101+
*/
102+
public BoltServerAddress resolve() throws UnknownHostException
103+
{
104+
String hostAddress = InetAddress.getByName( host ).getHostAddress();
105+
if ( hostAddress.equals( host ) )
106+
{
107+
return this;
108+
}
109+
else
110+
{
111+
return new BoltServerAddress( hostAddress, port );
112+
}
113+
}
114+
64115
public String host()
65116
{
66117
return host;

0 commit comments

Comments
 (0)