CoGroupByKey(共同グループ化キー)

Javadoc Javadoc


すべての入力要素をキーで集約し、ダウンストリーム処理でキーに関連付けられたすべての値を使用できるようにします。 `GroupByKey` は単一の入力コレクション、つまり単一のタイプの入力値に対してこの操作を実行しますが、`CoGroupByKey` は複数の入力コレクションに対して操作を実行します。 その結果、各キーの結果は、各入力コレクションでそのキーに関連付けられた値のタプルになります。

詳しくは、Beamプログラミングガイドをご覧ください。

例1:ユーザーデータを含む2つの異なるファイルがあるとします。1つのファイルには名前とメールアドレスが含まれ、もう1つのファイルには名前と電話番号が含まれています。

ユーザー名を共通キーとして、他のデータを関連付けられた値として使用して、これら2つのデータセットを結合できます。 結合後、各名前に関連付けられたすべての情報(メールアドレスと電話番号)を含む1つのデータセットが作成されます。

PCollection<KV<UID, Integer>> pt1 = /* ... */;
PCollection<KV<UID, String>> pt2 = /* ... */;

final TupleTag<Integer> t1 = new TupleTag<>();
final TupleTag<String> t2 = new TupleTag<>();
PCollection<KV<UID, CoGBKResult>> result =
  KeyedPCollectionTuple.of(t1, pt1).and(t2, pt2)
    .apply(CoGroupByKey.create());
result.apply(ParDo.of(new DoFn<KV<K, CoGbkResult>, /* some result */>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<K, CoGbkResult> e = c.element();
    CoGbkResult result = e.getValue();
    // Retrieve all integers associated with this key from pt1
    Iterable<Integer> allIntegers = result.getAll(t1);
    // Retrieve the string associated with this key from pt2.
    // Note: This will fail if multiple values had the same key in pt2.
    String string = e.getOnly(t2);
    ...
}));

例2