4545import java .util .stream .Collectors ;
4646import java .util .stream .Stream ;
4747
48- public class ParallelSimilarityExporter extends StatementApi implements SimilarityExporter {
48+ public class ParallelSimilarityExporter extends SimilarityExporter {
4949
50- private final Log log ;
51- private final int propertyId ;
52- private final int relationshipTypeId ;
5350 private final int nodeCount ;
5451
5552 public ParallelSimilarityExporter (GraphDatabaseAPI api ,
5653 Log log ,
5754 String relationshipType ,
5855 String propertyName , int nodeCount ) {
59- super (api );
60- this .log = log ;
61- propertyId = getOrCreatePropertyId (propertyName );
62- relationshipTypeId = getOrCreateRelationshipId (relationshipType );
56+ super (api , log , propertyName , relationshipType );
6357 this .nodeCount = nodeCount ;
6458 }
6559
@@ -83,24 +77,18 @@ public int export(Stream<SimilarityResult> similarityPairs, long batchSize) {
8377
8478 DisjointSetStruct dssResult = computePartitions (graph );
8579
86- Stream <List <DisjointSetStruct .InternalResult >> stream = dssResult .internalResultStream (graph )
87- .collect (Collectors .groupingBy (item -> item .setId ))
88- .values ()
89- .stream ();
80+ Stream <List <DisjointSetStruct .InternalResult >> partitions = groupPartitions (graph , dssResult );
9081
91- int queueSize = dssResult .getSetCount ();
92-
93- if (queueSize == 0 ) {
82+ int numberOfPartitions = dssResult .getSetCount ();
83+ if (numberOfPartitions == 0 ) {
9484 return 0 ;
9585 }
9686
97- log .info ("ParallelSimilarityExporter: Relationships to be created: %d, Partitions found: %d" , numberOfRelationships [0 ], queueSize );
98-
99- ArrayBlockingQueue <List <SimilarityResult >> outQueue = new ArrayBlockingQueue <>(queueSize );
100-
87+ log .info ("ParallelSimilarityExporter: Relationships to be created: %d, Partitions found: %d" , numberOfRelationships [0 ], numberOfPartitions );
88+ ArrayBlockingQueue <List <SimilarityResult >> outQueue = new ArrayBlockingQueue <>(numberOfPartitions );
10189
10290 AtomicInteger inQueueBatchCount = new AtomicInteger (0 );
103- stream .parallel ().forEach (partition -> {
91+ partitions .parallel ().forEach (partition -> {
10492 IntSet nodesInPartition = new IntHashSet ();
10593 for (DisjointSetStruct .InternalResult internalResult : partition ) {
10694 nodesInPartition .add (internalResult .internalNodeId );
@@ -137,13 +125,20 @@ public int export(Stream<SimilarityResult> similarityPairs, long batchSize) {
137125
138126
139127 int inQueueBatches = inQueueBatchCount .get ();
140-
141-
142128 int outQueueBatches = writeSequential (outQueue .stream ().flatMap (Collection ::stream ), batchSize );
129+
143130 log .info ("ParallelSimilarityExporter: Batch Size: %d, Batches written - in parallel: %d, sequentially: %d" , batchSize , inQueueBatches , outQueueBatches );
131+
144132 return inQueueBatches + outQueueBatches ;
145133 }
146134
135+ private Stream <List <DisjointSetStruct .InternalResult >> groupPartitions (HeavyGraph graph , DisjointSetStruct dssResult ) {
136+ return dssResult .internalResultStream (graph )
137+ .collect (Collectors .groupingBy (item -> item .setId ))
138+ .values ()
139+ .stream ();
140+ }
141+
147142 private static <T > void put (BlockingQueue <T > queue , T items ) {
148143 try {
149144 queue .put (items );
@@ -185,27 +180,6 @@ private void export(List<SimilarityResult> similarityResults) {
185180
186181 }
187182
188- private void createRelationship (SimilarityResult similarityResult , KernelTransaction statement ) throws EntityNotFoundException , InvalidTransactionTypeKernelException , AutoIndexingKernelException {
189- long node1 = similarityResult .item1 ;
190- long node2 = similarityResult .item2 ;
191- long relationshipId = statement .dataWrite ().relationshipCreate (node1 , relationshipTypeId , node2 );
192-
193- statement .dataWrite ().relationshipSetProperty (
194- relationshipId , propertyId , Values .doubleValue (similarityResult .similarity ));
195- }
196-
197- private int getOrCreateRelationshipId (String relationshipType ) {
198- return applyInTransaction (stmt -> stmt
199- .tokenWrite ()
200- .relationshipTypeGetOrCreateForName (relationshipType ));
201- }
202-
203- private int getOrCreatePropertyId (String propertyName ) {
204- return applyInTransaction (stmt -> stmt
205- .tokenWrite ()
206- .propertyKeyGetOrCreateForName (propertyName ));
207- }
208-
209183 private int writeSequential (Stream <SimilarityResult > similarityPairs , long batchSize ) {
210184 int [] counter = {0 };
211185 if (batchSize == 1 ) {
0 commit comments