1717 */
1818package org .apache .beam .runners .flink ;
1919
20+ import java .util .ArrayList ;
21+ import java .util .Collections ;
22+ import java .util .HashMap ;
23+ import java .util .List ;
24+ import java .util .Map ;
2025import org .apache .beam .runners .core .KeyedWorkItem ;
2126import org .apache .beam .runners .core .SystemReduceFn ;
2227import org .apache .beam .runners .core .construction .SerializablePipelineOptions ;
5863import org .apache .flink .streaming .api .transformations .TwoInputTransformation ;
5964import org .apache .flink .util .Collector ;
6065
61- import java .util .ArrayList ;
62- import java .util .Collections ;
63- import java .util .HashMap ;
64- import java .util .List ;
65- import java .util .Map ;
66-
6766public class FlinkStreamingAggregationsTranslators {
68- public static class ConcatenateAsIterable <T > extends Combine .CombineFn <T , Iterable <T >, Iterable <T >> {
67+ public static class ConcatenateAsIterable <T >
68+ extends Combine .CombineFn <T , Iterable <T >, Iterable <T >> {
6969 @ Override
7070 public Iterable <T > createAccumulator () {
7171 return new ArrayList <>();
@@ -214,8 +214,7 @@ WindowDoFnOperator<K, InputAccumT, OutputAccumT> getWindowedAggregateDoFnOperato
214214 WindowedValue .getFullCoder (workItemCoder , windowingStrategy .getWindowFn ().windowCoder ());
215215
216216 // Key selector
217- WorkItemKeySelector <K , InputAccumT > workItemKeySelector =
218- new WorkItemKeySelector <>(keyCoder );
217+ WorkItemKeySelector <K , InputAccumT > workItemKeySelector = new WorkItemKeySelector <>(keyCoder );
219218
220219 return new WindowDoFnOperator <>(
221220 reduceFn ,
@@ -257,29 +256,30 @@ WindowDoFnOperator<K, InputAccumT, OutputAccumT> getWindowedAggregateDoFnOperato
257256 }
258257
259258 private static class FlattenIterable <K , InputT >
260- implements FlatMapFunction <WindowedValue <KV <K , Iterable <Iterable <InputT >>>>, WindowedValue <KV <K , Iterable <InputT >>>> {
259+ implements FlatMapFunction <
260+ WindowedValue <KV <K , Iterable <Iterable <InputT >>>>,
261+ WindowedValue <KV <K , Iterable <InputT >>>> {
261262 @ Override
262263 public void flatMap (
263264 WindowedValue <KV <K , Iterable <Iterable <InputT >>>> w ,
264- Collector <WindowedValue <KV <K , Iterable <InputT >>>> collector ) throws Exception {
265- WindowedValue <KV <K , Iterable <InputT >>> flattened = w .withValue (
266- KV .of (
267- w .getValue ().getKey (),
268- Iterables .concat (w .getValue ().getValue ())));
265+ Collector <WindowedValue <KV <K , Iterable <InputT >>>> collector )
266+ throws Exception {
267+ WindowedValue <KV <K , Iterable <InputT >>> flattened =
268+ w .withValue (KV .of (w .getValue ().getKey (), Iterables .concat (w .getValue ().getValue ())));
269269 collector .collect (flattened );
270270 }
271271 }
272272
273273 public static <K , InputT , AccumT , OutputT >
274- SingleOutputStreamOperator <WindowedValue <KV <K , OutputT >>> getBatchCombinePerKeyOperator (
275- FlinkStreamingTranslationContext context ,
276- PCollection <KV <K , InputT >> input ,
277- Map <Integer , PCollectionView <?>> sideInputTagMapping ,
278- List <PCollectionView <?>> sideInputs ,
279- Coder <WindowedValue <KV <K , AccumT >>> windowedAccumCoder ,
280- CombineFnBase .GlobalCombineFn <InputT , AccumT , ?> combineFn ,
281- WindowDoFnOperator <K , AccumT , OutputT > finalDoFnOperator ,
282- TypeInformation <WindowedValue <KV <K , OutputT >>> outputTypeInfo ){
274+ SingleOutputStreamOperator <WindowedValue <KV <K , OutputT >>> getBatchCombinePerKeyOperator (
275+ FlinkStreamingTranslationContext context ,
276+ PCollection <KV <K , InputT >> input ,
277+ Map <Integer , PCollectionView <?>> sideInputTagMapping ,
278+ List <PCollectionView <?>> sideInputs ,
279+ Coder <WindowedValue <KV <K , AccumT >>> windowedAccumCoder ,
280+ CombineFnBase .GlobalCombineFn <InputT , AccumT , ?> combineFn ,
281+ WindowDoFnOperator <K , AccumT , OutputT > finalDoFnOperator ,
282+ TypeInformation <WindowedValue <KV <K , OutputT >>> outputTypeInfo ) {
283283
284284 String fullName = FlinkStreamingTransformTranslators .getCurrentTransformName (context );
285285 DataStream <WindowedValue <KV <K , InputT >>> inputDataStream = context .getInputDataStream (input );
@@ -314,50 +314,55 @@ SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> getBatchCombinePerKeyO
314314 if (sideInputs .isEmpty ()) {
315315 return inputDataStream
316316 .transform (partialName , partialTypeInfo , partialDoFnOperator )
317- .uid (partialName ).name (partialName )
317+ .uid (partialName )
318+ .name (partialName )
318319 .keyBy (accumKeySelector )
319320 .transform (fullName , outputTypeInfo , finalDoFnOperator )
320- .uid (fullName ).name (fullName );
321+ .uid (fullName )
322+ .name (fullName );
321323 } else {
322324
323325 Tuple2 <Map <Integer , PCollectionView <?>>, DataStream <RawUnionValue >> transformSideInputs =
324326 FlinkStreamingTransformTranslators .transformSideInputs (sideInputs , context );
325327
326328 TwoInputTransformation <
327- WindowedValue <KV <K , InputT >>, RawUnionValue , WindowedValue <KV <K , AccumT >>> rawPartialFlinkTransform =
328- new TwoInputTransformation <>(
329- inputDataStream .getTransformation (),
330- transformSideInputs .f1 .broadcast ().getTransformation (),
331- partialName ,
332- partialDoFnOperator ,
333- partialTypeInfo ,
334- inputDataStream .getParallelism ());
329+ WindowedValue <KV <K , InputT >>, RawUnionValue , WindowedValue <KV <K , AccumT >>>
330+ rawPartialFlinkTransform =
331+ new TwoInputTransformation <>(
332+ inputDataStream .getTransformation (),
333+ transformSideInputs .f1 .broadcast ().getTransformation (),
334+ partialName ,
335+ partialDoFnOperator ,
336+ partialTypeInfo ,
337+ inputDataStream .getParallelism ());
335338
336339 SingleOutputStreamOperator <WindowedValue <KV <K , AccumT >>> partialyCombinedStream =
337340 new SingleOutputStreamOperator <WindowedValue <KV <K , AccumT >>>(
338341 inputDataStream .getExecutionEnvironment (),
339342 rawPartialFlinkTransform ) {}; // we have to cheat around the ctor being protected
340343
341- inputDataStream .getExecutionEnvironment ().addOperator (rawPartialFlinkTransform );
344+ inputDataStream .getExecutionEnvironment ().addOperator (rawPartialFlinkTransform );
342345
343- return buildTwoInputStream (
344- partialyCombinedStream .keyBy (accumKeySelector ),
345- transformSideInputs .f1 ,
346- fullName ,
347- finalDoFnOperator ,
348- outputTypeInfo );
346+ return buildTwoInputStream (
347+ partialyCombinedStream .keyBy (accumKeySelector ),
348+ transformSideInputs .f1 ,
349+ fullName ,
350+ finalDoFnOperator ,
351+ outputTypeInfo );
349352 }
350353 }
351354
352355 /**
353- * Creates a two-steps GBK operation. Elements are first aggregated locally to save on serialized size since in batch
354- * it's very likely that all the elements will be within the same window and pane.
355- * The only difference with batchCombinePerKey is the nature of the SystemReduceFn used. It uses SystemReduceFn.buffering()
356- * instead of SystemReduceFn.combining() so that new element can simply be appended without accessing the existing state.
356+ * Creates a two-step GBK operation. Elements are first aggregated locally to save on serialized
357+ * size since in batch it's very likely that all the elements will be within the same window and
358+ * pane. The only difference from batchCombinePerKey is the nature of the SystemReduceFn used. It
359+ * uses SystemReduceFn.buffering() instead of SystemReduceFn.combining() so that new elements
360+ * can simply be appended without accessing the existing state.
357361 */
358- public static <K , InputT > SingleOutputStreamOperator <WindowedValue <KV <K , Iterable <InputT >>>> batchGroupByKey (
359- FlinkStreamingTranslationContext context ,
360- PTransform <PCollection <KV <K , InputT >>, PCollection <KV <K , Iterable <InputT >>>> transform ) {
362+ public static <K , InputT >
363+ SingleOutputStreamOperator <WindowedValue <KV <K , Iterable <InputT >>>> batchGroupByKey (
364+ FlinkStreamingTranslationContext context ,
365+ PTransform <PCollection <KV <K , InputT >>, PCollection <KV <K , Iterable <InputT >>>> transform ) {
361366
362367 Map <Integer , PCollectionView <?>> sideInputTagMapping = new HashMap <>();
363368 List <PCollectionView <?>> sideInputs = Collections .emptyList ();
@@ -372,58 +377,64 @@ public static <K, InputT> SingleOutputStreamOperator<WindowedValue<KV<K, Iterabl
372377 context .getTypeInfo (context .getOutput (transform ));
373378
374379 Coder <Iterable <InputT >> accumulatorCoder = IterableCoder .of (inputKvCoder .getValueCoder ());
375- KvCoder <K , Iterable <InputT >> accumKvCoder = KvCoder .of (inputKvCoder .getKeyCoder (), accumulatorCoder );
380+ KvCoder <K , Iterable <InputT >> accumKvCoder =
381+ KvCoder .of (inputKvCoder .getKeyCoder (), accumulatorCoder );
376382
377383 Coder <WindowedValue <KV <K , Iterable <InputT >>>> windowedAccumCoder =
378384 WindowedValue .getFullCoder (
379385 accumKvCoder , input .getWindowingStrategy ().getWindowFn ().windowCoder ());
380386
381387 Coder <WindowedValue <KV <K , Iterable <Iterable <InputT >>>>> outputCoder =
382388 WindowedValue .getFullCoder (
383- KvCoder .of (inputKvCoder .getKeyCoder (), IterableCoder .of (accumulatorCoder )) , input .getWindowingStrategy ().getWindowFn ().windowCoder ());
389+ KvCoder .of (inputKvCoder .getKeyCoder (), IterableCoder .of (accumulatorCoder )),
390+ input .getWindowingStrategy ().getWindowFn ().windowCoder ());
384391
385392 TypeInformation <WindowedValue <KV <K , Iterable <Iterable <InputT >>>>> accumulatedTypeInfo =
386393 new CoderTypeInformation <>(
387- WindowedValue .getFullCoder (
388- KvCoder .of (inputKvCoder .getKeyCoder (), IterableCoder .of (IterableCoder .of (inputKvCoder .getValueCoder ()))), input .getWindowingStrategy ().getWindowFn ().windowCoder ()),
394+ WindowedValue .getFullCoder (
395+ KvCoder .of (
396+ inputKvCoder .getKeyCoder (),
397+ IterableCoder .of (IterableCoder .of (inputKvCoder .getValueCoder ()))),
398+ input .getWindowingStrategy ().getWindowFn ().windowCoder ()),
389399 serializablePipelineOptions );
390400
391401 // final aggregation
392402 WindowDoFnOperator <K , Iterable <InputT >, Iterable <Iterable <InputT >>> finalDoFnOperator =
393- getWindowedAccumulateDoFnOperator (
394- context ,
395- transform ,
396- accumKvCoder ,
397- outputCoder ,
398- sideInputTagMapping ,
399- sideInputs );
400-
401- return
402- getBatchCombinePerKeyOperator (
403- context ,
404- input ,
405- sideInputTagMapping ,
406- sideInputs ,
407- windowedAccumCoder ,
408- new ConcatenateAsIterable <>(),
409- finalDoFnOperator ,
410- accumulatedTypeInfo
411- )
412- .flatMap (new FlattenIterable <>(), outputTypeInfo )
413- .name ("concatenate" );
403+ getWindowedAccumulateDoFnOperator (
404+ context , transform , accumKvCoder , outputCoder , sideInputTagMapping , sideInputs );
405+
406+ return getBatchCombinePerKeyOperator (
407+ context ,
408+ input ,
409+ sideInputTagMapping ,
410+ sideInputs ,
411+ windowedAccumCoder ,
412+ new ConcatenateAsIterable <>(),
413+ finalDoFnOperator ,
414+ accumulatedTypeInfo )
415+ .flatMap (new FlattenIterable <>(), outputTypeInfo )
416+ .name ("concatenate" );
414417 }
415418
416- private static <InputT , K > WindowDoFnOperator <K , Iterable <InputT >, Iterable <Iterable <InputT >>> getWindowedAccumulateDoFnOperator (
417- FlinkStreamingTranslationContext context ,
418- PTransform <PCollection <KV <K ,InputT >>, PCollection <KV <K , Iterable <InputT >>>> transform ,
419- KvCoder <K , Iterable <InputT >> accumKvCoder ,
420- Coder <WindowedValue <KV <K , Iterable <Iterable <InputT >>>>> outputCoder ,
421- Map <Integer , PCollectionView <?>> sideInputTagMapping ,
422- List <PCollectionView <?>> sideInputs ) {
419+ private static <InputT , K >
420+ WindowDoFnOperator <K , Iterable <InputT >, Iterable <Iterable <InputT >>>
421+ getWindowedAccumulateDoFnOperator (
422+ FlinkStreamingTranslationContext context ,
423+ PTransform <PCollection <KV <K , InputT >>, PCollection <KV <K , Iterable <InputT >>>>
424+ transform ,
425+ KvCoder <K , Iterable <InputT >> accumKvCoder ,
426+ Coder <WindowedValue <KV <K , Iterable <Iterable <InputT >>>>> outputCoder ,
427+ Map <Integer , PCollectionView <?>> sideInputTagMapping ,
428+ List <PCollectionView <?>> sideInputs ) {
423429
424- // Combining fn
425- SystemReduceFn <K , Iterable <InputT >, Iterable <Iterable <InputT >>, Iterable <Iterable <InputT >>, BoundedWindow > reduceFn =
426- SystemReduceFn .buffering (accumKvCoder .getValueCoder ());
430+ // Combining fn
431+ SystemReduceFn <
432+ K ,
433+ Iterable <InputT >,
434+ Iterable <Iterable <InputT >>,
435+ Iterable <Iterable <InputT >>,
436+ BoundedWindow >
437+ reduceFn = SystemReduceFn .buffering (accumKvCoder .getValueCoder ());
427438
428439 return getWindowedAggregateDoFnOperator (
429440 context , transform , accumKvCoder , outputCoder , reduceFn , sideInputTagMapping , sideInputs );
@@ -482,8 +493,7 @@ SingleOutputStreamOperator<WindowedValue<KV<K, OutputT>>> batchCombinePerKey(
482493 windowedAccumCoder ,
483494 combineFn ,
484495 finalDoFnOperator ,
485- outputTypeInfo
486- );
496+ outputTypeInfo );
487497 }
488498
489499 @ SuppressWarnings ({
0 commit comments