CoGroupByKey

Pydoc Pydoc




すべての入力要素をキー別に集約し、ダウンストリーム処理でキーに関連付けられたすべての値を使用できるようにします。GroupByKeyは単一の入力コレクション、つまり単一の型の入力値に対してこの操作を実行するのに対し、CoGroupByKeyは複数の入力コレクションに対して操作を実行します。その結果、各キーの結果は、各入力コレクションのそのキーに関連付けられた値のタプルになります。

Beamプログラミングガイドで詳細情報をご覧ください。

次の例では、農産物の2つのPCollection(アイコンと期間)を含むパイプラインを作成します。両方に農産物名の共通キーがあります。次に、CoGroupByKeyを適用して、キーを使用して両方のPCollectionを結合します。

CoGroupByKeyは、名前付きキー付きPCollectionの辞書を期待し、キーで結合された要素を生成します。各出力要素の値は、入力辞書に対応する名前を持つ辞書であり、そのキーに対して見つかったすべての値のリストが含まれます。

import apache_beam as beam

with beam.Pipeline() as pipeline:
  icon_pairs = pipeline | 'Create icons' >> beam.Create([
      ('Apple', '🍎'),
      ('Apple', '🍏'),
      ('Eggplant', '🍆'),
      ('Tomato', '🍅'),
  ])

  duration_pairs = pipeline | 'Create durations' >> beam.Create([
      ('Apple', 'perennial'),
      ('Carrot', 'biennial'),
      ('Tomato', 'perennial'),
      ('Tomato', 'annual'),
  ])

  plants = (({
      'icons': icon_pairs, 'durations': duration_pairs
  })
            | 'Merge' >> beam.CoGroupByKey()
            | beam.Map(print))

出力

('Apple', {'icons': ['🍎', '🍏'], 'durations': ['perennial']})
('Carrot', {'icons': [], 'durations': ['biennial']})
('Tomato', {'icons': ['🍅'], 'durations': ['perennial', 'annual']})
('Eggplant', {'icons': ['🍆'], 'durations': []})
Pydoc Pydoc