Apache Beam Java SDK拡張機能
結合ライブラリ
結合ライブラリは、内部結合、左外部結合、右外部結合の関数を提供します。目的は、最も一般的な結合の場合を単純な関数呼び出しに簡素化することです。
関数は汎用的であり、Beamでサポートされている任意の型の結合をサポートします。結合関数の入力は、`Key`/`Value` の `PCollection` です。左右両方の `PCollection` でキーの型が同じである必要があります。すべての結合関数は、`Key`/`Value` を返します。ここで、`Key` は結合キーであり、値は `Key`/`Value` です。`Key` は左の値、右は値です。
外部結合の場合、ユーザーは `null` を表す値を提供する必要があります。`null` はシリアル化できないためです。
使用例
PCollection<KV<String, String>> leftPcollection = ...
PCollection<KV<String, Long>> rightPcollection = ...
PCollection<KV<String, KV<String, Long>>> joinedPcollection =
Join.innerJoin(leftPcollection, rightPcollection);
ソーター
このモジュールは、`SortValues`変換を提供します。これは、`PCollection<KV<K, Iterable<KV<K2, V>>>>` を受け取り、`PCollection<KV<K, Iterable<KV<K2, V>>>>` を生成します。ここで、各プライマリキー `K` について、ペアになっている `Iterable<KV<K2, V>>` は、セカンダリキー(`K2`)のバイトエンコーディングによってソートされています。これは、反復処理が大きくても(メモリに収まらない場合でも)、効率的でスケーラブルなソーターです。
注意事項
- この変換は、値のみのソートを実行します。各キーに付随する反復処理はソートされますが、Beamは `PCollection` 内の異なる要素間の定義された関係をサポートしていないため、*異なるキー間に関係はありません*。
- 各 `Iterable<KV<K2, V>>` は、ローカルメモリとディスクを使用して単一のワーカーでソートされます。これは、異なるパイプラインで使用された場合、`SortValues` がパフォーマンスまたはスケーラビリティのボトルネックになる可能性があることを意味します。たとえば、ユーザーは、大きな `PCollection` をグローバルにソートするために、単一要素の `PCollection` で `SortValues` を使用することはお勧めしません。ソートがディスクにスピルする場合に使用されるディスク容量の(概算)推定値は、`numRecords * (numSecondaryKeyBytesPerRecord + numValueBytesPerRecord + 16) * 3` です。
オプション
- ユーザーは、ソートでディスクへのスピルが必要な場合に使用される一時的な場所と、使用する最大メモリ量をカスタマイズできます。そのためには、`BufferedExternalSorter.Options` のカスタムインスタンスを作成して `SortValues.create` に渡します。
SortValues
の使用例
PCollection<KV<String, KV<String, Integer>>> input = ...
// Group by primary key, bringing <SecondaryKey, Value> pairs for the same key together.
PCollection<KV<String, Iterable<KV<String, Integer>>>> grouped =
input.apply(GroupByKey.<String, KV<String, Integer>>create());
// For every primary key, sort the iterable of <SecondaryKey, Value> pairs by secondary key.
PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
grouped.apply(
SortValues.<String, String, Integer>create(BufferedExternalSorter.options()));