Euphoria Java 8 DSL

Euphoriaとは

BeamのJava SDK上に構築された使いやすいJava 8 API。APIはデータ変換の高レベルな抽象化を提供し、Java 8の言語機能(例:ラムダ式とストリーム)に焦点を当てています。既存のBeam SDKと完全に相互運用可能であり、相互に変換できます。(オプションの)Kryoベースのコーダー、ラムダ式、高レベル演算子の使用による迅速なプロトタイピングを可能にし、既存のBeam Pipelinesにシームレスに統合できます。

Euphoria APIプロジェクトは2014年に開始され、Seznam.czのデータインフラストラクチャのための主要な構成要素を提供するという明確な目標がありました。2015年、DataFlowホワイトペーパーに触発されたオリジナルの作者は、一歩進んでストリーム処理とバッチ処理の両方のための統一されたAPIを提供することにしました。このAPIは2016年にオープンソース化され、現在も積極的に開発されています。Beamのコミュニティ目標は非常に似ていたため、Beam Java SDKの上位DSLとしてAPIを提供し、コミュニティと努力を共有することにしました。

Euphoria DSLの統合はまだ進行中です。BEAM-3900として追跡されています。

WordCount例

簡単な例から始めましょう。

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);

// Use Kryo as coder fallback
KryoCoderProvider.of().registerTo(pipeline);

// Source of data loaded from Beam IO.
PCollection<String> input =
    pipeline
        .apply(Create.of(textLineByLine))
        .setTypeDescriptor(TypeDescriptor.of(String.class));

// zero, one, or more output elements. From input lines we will get data set of words.
PCollection<String> words =
    FlatMap.named("TOKENIZER")
        .of(lines)
        .using(
            (String line, Collector<String> context) -> {
              for (String word : Splitter.onPattern("\\s+").split(line)) {
                context.collect(word);
              }
            })
        .output();

// Now we can count input words - the operator ensures that all values for the same
// key (word in this case) end up being processed together. Then it counts number of appearances
// of the same key in 'words' PCollection and emits it to output.
PCollection<KV<String, Long>> counted =
    CountByKey.named("COUNT")
        .of(words)
        .keyBy(w -> w)
        .output();

// Format output.
PCollection<String> output =
    MapElements.named("FORMAT")
        .of(counted)
        .using(p -> p.getKey() + ": " + p.getValue())
        .output();

// Now we can again use Beam transformation. In this case we save words and their count
// into the text file.
output
    .apply(TextIO.write()
    .to("counted_words"));

pipeline.run();

Euphoriaガイド

Euphoria APIは、アプリケーションのニーズに応じてPipelineを構築できる一連の演算子で構成されています。

入力と出力

入力データは、BeamのIOを介してPCollectionに供給できます。Beamの場合と同じ方法です。

PCollection<String> input =
  pipeline
    .apply(Create.of("mouse", "rat", "elephant", "cat", "X", "duck"))
    .setTypeDescriptor(TypeDescriptor.of(String.class));

演算子の追加

Euphoria APIの真の力は、演算子スイートにあります。各演算子は1つ以上の入力を使用し、1つの出力PCollectionを生成します。簡単なMapElementsの例を見てみましょう。

PCollection<Integer> input = ...

PCollection<String> mappedElements =
  MapElements
    .named("Int2Str")
    .of(input)
    .using(String::valueOf)
    .output();
この演算子はinputを使用し、指定されたラムダ式(String::valueOf)をinputの各要素に適用し、マッピングされたPCollectionを返します。開発者は演算子の作成時に一連の手順に従うため、演算子の宣言は簡単です。演算子の構築を開始するには、その名前と「.」を入力します。(ドット)。IDEがヒントを表示します。

任意の演算子を構築する最初の手順は、named()メソッドを使用して名前を付けることです。この名前はシステムを通じて伝播され、後でデバッグに使用できます。

コーダーと型

BeamのJava SDKでは、要素を具体化する手段として、カスタム要素型にCoderを供給する必要があります。Euphoriaでは、シリアライゼーションの方法としてKryoを使用できます。Kryo:sdks:java:extensions:kryoモジュールにあります。

//gradle
dependencies {
    compile "org.apache.beam:sdks:java:extensions:kryo:${beam.version}"
}
//maven
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-extensions-kryo</artifactId>
  <version>${beam.version}</version>
</dependency>

必要なのは、KryoCoderProviderを作成し、それをPipelineに登録することだけです。これを行うには2つの方法があります。

プロトタイピング時には、コーダーをあまり気にしない場合、Kryoへのクラス登録なしでKryoCoderProviderを作成できます。

//Register `KryoCoderProvider` which attempt to use `KryoCoder` to every non-primitive type
KryoCoderProvider.of().registerTo(pipeline);
このようなKryoCoderProviderは、プリミティブ以外のすべての要素型に対してKryoCoderを返します。もちろん、Kryoは未知の型のインスタンスを効果的にシリアライズできないため、パフォーマンスが低下します。しかし、パイプライン開発の速度は向上します。この動作はデフォルトで有効になっており、PipelineKryoOptionsで作成する際に無効にできます。
PipelineOptions options = PipelineOptionsFactory.create();
options.as(KryoOptions.class).setKryoRegistrationRequired(true);

2つ目のよりパフォーマンスに優れた方法は、Kryoでシリアライズするすべての型を登録することです。独自のKryoシリアライザも登録するのも良いアイデアです。Euphoriaでは、独自のKryoRegistrarを実装し、KryoCoderProviderを作成する際にそれを使用することで、これを行うことができます。

//Do not allow `KryoCoderProvider` to return `KryoCoder` for unregistered types
options.as(KryoOptions.class).setKryoRegistrationRequired(true);

KryoCoderProvider.of(
        (kryo) -> { //KryoRegistrar of your uwn
          kryo.register(KryoSerializedElementType.class); //other may follow
        })
    .registerTo(pipeline);
Beamは要素の型を使用してコーダーを解決します。要素型がラムダ実装によって記述されている場合、ランタイム時に型情報は使用できません。これは、型の消去とラムダ式の動的な性質によるものです。そのため、演算子の構築中に新しい型が導入されるたびにTypeDescriptorを供給するオプションの方法があります。
PCollection<Integer> input = ...

MapElements
  .named("Int2Str")
  .of(input)
  .using(String::valueOf, TypeDescriptors.strings())
  .output();
ユーザーがTypeDescriptorsを供給しない場合、Euphoria演算子はTypeDescriptor<Object>を使用します。そのため、KryoOptionsで許可されている場合、KryoCoderProviderは未知の型のすべての要素に対してKryoCoder<Object>を返す可能性があります。.setKryoRegistrationRequired(true)を使用する場合は、TypeDescriptorsの供給が必須になります。

メトリクスとアキュムレータ

ジョブの内部に関する統計は、分散ジョブの開発中に非常に役立ちます。Euphoriaでは、これらをアキュムレータと呼びます。これらは、環境Contextを介してアクセスできます。これは、Collectorで作業する際に取得できるものです。通常、演算子からゼロから多数の出力要素が期待される場合に存在します。たとえば、FlatMapの場合です。

Pipeline pipeline = ...
PCollection<String> dataset = ..

PCollection<String> mapped =
FlatMap
  .named("FlatMap1")
  .of(dataset)
  .using(
    (String value, Collector<String> context) -> {
      context.getCounter("my-counter").increment();
        context.collect(value);
    })
  .output();
MapElementsは、UnaryFunctorの代わりにUnaryFunctionEnv(2番目のコンテキスト引数を追加)の実装を提供することで、Contextへのアクセスも許可します。
Pipeline pipeline = ...
PCollection<String> dataset = ...

PCollection<String> mapped =
  MapElements
    .named("MapThem")
    .of(dataset)
    .using(
      (input, context) -> {
        // use simple counter
        context.getCounter("my-counter").increment();
        return input.toLowerCase();
        })
      .output();
アキュムレータはバックグラウンドでBeamメトリクスに変換されるため、同じように表示できます。変換されたメトリクスの名前空間は、演算子の名前に設定されます。

ウィンドウ処理

Euphoriaは、Beam Java SDKと同じウィンドウ処理の原則に従います。すべてのシャッフル演算子(ネットワークを介してデータをシャッフルする必要がある演算子)で設定できます。Beamと同じパラメータが必要です。WindowFnTriggerWindowingStrategyなど。ユーザーは、演算子の構築時に、すべての必須パラメータといくつかのオプションパラメータを設定するか、何も設定しないかのいずれかを選択するように促されます。ウィンドウ処理はPipelineを通じて伝播されます。

PCollection<KV<Integer, Long>> countedElements =
  CountByKey.of(input)
      .keyBy(e -> e)
      .windowBy(FixedWindows.of(Duration.standardSeconds(1)))
      .triggeredBy(DefaultTrigger.of())
      .discardingFiredPanes()
      .withAllowedLateness(Duration.standardSeconds(5))
      .withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY)
      .withTimestampCombiner(TimestampCombiner.EARLIEST)
      .output();

Euphoriaの入手方法

Euphoriaは、Apache Beamプロジェクトのdsl-euphoriaブランチ、beam-sdks-java-extensions-euphoriaモジュールにあります。euphoriaサブプロジェクトをビルドするには、以下を実行します。

./gradlew beam-sdks-java-extensions-euphoria:build

演算子リファレンス

演算子は基本的に高レベルのデータ変換であり、データ処理ジョブのビジネスロジックを簡単に構築できます。すべてのEuphoria演算子は、このセクションで例を含めて説明されています。簡潔にするため、ウィンドウ処理が適用された例はありません。ウィンドウ処理のセクションを参照して詳細を確認してください。

CountByKey

同じキーを持つ要素をカウントします。入力データセットを、指定されたキー抽出器(UnaryFunction)によってキーにマッピングする必要があります。その後、キーがカウントされます。出力はKV<K, Long>Kはキー型)として出力され、各KVにはキーと、そのキーに対する入力データセット内の要素数が含まれています。

// suppose input: [1, 2, 4, 1, 1, 3]
PCollection<KV<Integer, Long>> output =
  CountByKey.of(input)
    .keyBy(e -> e)
    .output();
// Output will contain:  [KV(1, 3), KV(2, 1), KV(3, 1), (4, 1)]

Distinct

異なる(equalsメソッドに基づく)要素を出力します。要素を出力型にマッピングするオプションのUnaryFunctionマッパーパラメータを受け入れます。

// suppose input: [1, 2, 3, 3, 2, 1]
Distinct.named("unique-integers-only")
  .of(input)
  .output();
// Output will contain:  1, 2, 3
 
マッパーを使用したDistinct
// suppose keyValueInput: [KV(1, 100L), KV(3, 100_000L), KV(42, 10L), KV(1, 0L), KV(3, 0L)]
Distinct.named("unique-keys-only")
  .of(keyValueInput)
  .projected(KV::getKey)
  .output();
// Output will contain kvs with keys:  1, 3, 42 with some arbitrary values associated with given keys

Join

指定されたキーで2つの(左と右の)データセットの内部結合を表し、新しいデータセットを生成します。キーは個別の抽出器によって両方のデータセットから抽出されるため、左と右の要素の型は、LeftTRightTとして表される異なる型にすることができます。結合自体は、同じキーを共有する両方のデータセットから要素を受け取るユーザーが提供したBinaryFunctorによって実行されます。そして、結合の結果(OutputT)を出力します。この演算子は、KV<K, OutputT>型の出力データセットを出力します。

// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
  Join.named("join-length-to-words")
    .of(left, right)
    .by(le -> le, String::length) // key extractors
    .using((Integer l, String r, Collector<String> c) -> c.collect(l + "+" + r))
    .output();
// joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"), KV(4, "4+duck"),
// KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X")]

LeftJoin

2 つのデータセット(左と右)を指定されたキーに基づいて左外部結合し、1 つの新しいデータセットを生成します。キーは、個別の抽出器によって両方のデータセットから抽出されるため、左側の要素と右側の要素は、LeftTRightT として示される異なる型を持つことができます。結合自体は、ユーザーが提供する BinaryFunctor によって実行され、両方のデータセットから1 つの要素(右側はオプションで存在し、同じキーを共有します)を消費し、結合の結果(OutputT)を出力します。この演算子は、KV<K, OutputT> 型の出力データセットを出力します。

// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
  LeftJoin.named("left-join-length-to-words")
      .of(left, right)
      .by(le -> le, String::length) // key extractors
      .using(
          (Integer l, Optional<String> r, Collector<String> c) ->
              c.collect(l + "+" + r.orElse(null)))
      .output();
// joined will contain: [KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"),
// KV(3, "3+rat"), KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"),
// KV(3, "3+rat"), KV(1, "1+X")]
Euphoria は、LeftJoin に対して「BroadcastHashJoin」と呼ばれるパフォーマンス最適化をサポートしています。ブロードキャスト結合は、一方のデータセットがメモリに収まる2 つのデータセットを結合する場合に非常に効率的です(LeftJoin では、右側のデータセットがメモリに収まる必要があります)。「Broadcast Hash Join」の使用方法については、翻訳セクションを参照してください。

RightJoin

2 つのデータセット(左と右)を指定されたキーに基づいて右外部結合し、1 つの新しいデータセットを生成します。キーは、個別の抽出器によって両方のデータセットから抽出されるため、左側の要素と右側の要素は、LeftTRightT として示される異なる型を持つことができます。結合自体は、ユーザーが提供する BinaryFunctor によって実行され、両方のデータセットから1 つの要素(左側はオプションで存在し、同じキーを共有します)を消費し、結合の結果(OutputT)を出力します。この演算子は、KV<K, OutputT> 型の出力データセットを出力します。

// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
  RightJoin.named("right-join-length-to-words")
    .of(left, right)
    .by(le -> le, String::length) // key extractors
    .using(
      (Optional<Integer> l, String r, Collector<String> c) ->
        c.collect(l.orElse(null) + "+" + r))
    .output();
    // joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"),
    // KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X"),
    // KV(8, "null+elephant"), KV(5, "null+mouse")]
Euphoria は、RightJoin に対して「BroadcastHashJoin」と呼ばれるパフォーマンス最適化をサポートしています。ブロードキャスト結合は、一方のデータセットがメモリに収まる2 つのデータセットを結合する場合に非常に効率的です(RightJoin では、左側のデータセットがメモリに収まる必要があります)。「Broadcast Hash Join」の使用方法については、翻訳セクションを参照してください。

FullJoin

2 つのデータセット(左と右)を指定されたキーに基づいて完全外部結合し、1 つの新しいデータセットを生成します。キーは、個別の抽出器によって両方のデータセットから抽出されるため、左側の要素と右側の要素は、LeftTRightT として示される異なる型を持つことができます。結合自体は、ユーザーが提供する BinaryFunctor によって実行され、両方のデータセットから1 つの要素(両方がオプションで存在し、同じキーを共有します)を消費し、結合の結果(OutputT)を出力します。この演算子は、KV<K, OutputT> 型の出力データセットを出力します。

// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
  FullJoin.named("join-length-to-words")
    .of(left, right)
    .by(le -> le, String::length) // key extractors
    .using(
      (Optional<Integer> l, Optional<String> r, Collector<String> c) ->
        c.collect(l.orElse(null) + "+" + r.orElse(null)))
    .output();
// joined will contain: [ KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"), KV(3, "3+rat"),
// KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"),KV(1, "1+X"),
//  KV(1, "null+elephant"), KV(5, "null+mouse")]

MapElements

入力型InputTの1つの入力要素を、別の(同じ可能性のある)OutputT型の1つの出力要素に変換します。変換は、ユーザーが指定したUnaryFunctionによって実行されます。

// suppose inputs contains: [ 0, 1, 2, 3, 4, 5]
PCollection<String> strings =
  MapElements.named("int2str")
    .of(input)
    .using(i -> "#" + i)
    .output();
// strings will contain: [ "#0", "#1", "#2", "#3", "#4", "#5"]

FlatMap

入力型InputTの1つの入力要素を、別の(同じ可能性のある)OutputT型の0個以上の出力要素に変換します。変換は、ユーザーが指定したUnaryFunctorによって実行され、Collector<OutputT>を使用して出力要素を出力します。常に1つの要素しか出力できないMapElementsとの類似性に注意してください。

// suppose words contain: ["Brown", "fox", ".", ""]
PCollection<String> letters =
  FlatMap.named("str2char")
    .of(words)
    .using(
      (String s, Collector<String> collector) -> {
        for (int i = 0; i < s.length(); i++) {
          char c = s.charAt(i);
          collector.collect(String.valueOf(c));
        }
      })
    .output();
// characters will contain: ["B", "r", "o", "w", "n",  "f", "o", "x", "."]
FlatMapを使用して要素のタイムスタンプを決定できます。これは、構築時にExtractEventTimeタイム抽出器の実装を提供することで行われます。要素にタイムスタンプを割り当てるための特別なAssignEventTime演算子があります。これを使用することを検討してください。コードの可読性が向上する可能性があります。
// suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
  FlatMap.named("extract-event-time")
    .of(events)
    .using( (SomeEventObject e, Collector<SomeEventObject> c) -> c.collect(e))
    .eventTimeBy(SomeEventObject::getEventTimeInMillis)
    .output();
//Euphoria will now know event time for each event

Filter

Filterは、指定された条件を満たさないすべての要素を破棄します。条件は、ユーザーがUnaryPredicateの実装として提供します。入力要素と出力要素は同じ型です。

// suppose nums contains: [0,  1, 2, 3, 4, 5, 6, 7, 8, 9]
PCollection<Integer> divisibleBythree =
  Filter.named("divisibleByThree").of(nums).by(e -> e % 3 == 0).output();
//divisibleBythree will contain: [ 0, 3, 6, 9]

ReduceByKey

ユーザーが提供するreduce関数を使用して、同じキーを持つInputT型要素の集約を実行します。キーは、入力要素を受け取り、型Kのキーを出力するUnaryFunctionを介して各要素から抽出されます。要素は、型Vの値にマップすることもできます。これは要素のシャッフルの前に発生するため、パフォーマンスにプラスの影響を与える可能性があります。

最後に、同じキーを持つ要素は、ユーザー定義のReduceFunctorReduceFunction、またはCombinableReduceFunctionによって集約されます。これらは、受け取る引数の数と出力の解釈方法が異なります。ReduceFunctionは基本的に、要素のStreamを入力として受け取り、1つの集約結果を出力する関数です。ReduceFunctorは、Contextへのアクセスを許可する2番目のCollectorを受け取ります。CombinableReduceFunctionが提供されると、シャッフルの前に部分的な削減が行われるため、ネットワークを介して転送する必要があるデータ量が少なくなります。

次の例は、値の抽出を含むReduceByKey演算子の基本的な使用方法を示しています。

//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
  ReduceByKey.named("to-letters-counts")
    .of(animals)
    .keyBy(String::length) // length of animal name will be used as grouping key
    // we need to count each animal name once, so why not to optimize each string to 1
    .valueBy(e -> 1)
    .reduceBy(Stream::count)
    .output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]

次に、カウンターを使用してReduceByKeyの内部を追跡したいとします。

//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
  ReduceByKey.named("to-letters-couts")
    .of(animals)
    .keyBy(String::length) // length of animal name will be used as grouping key
    // we need to count each animal name once, so why not to optimize each string to 1
    .valueBy(e -> 1)
    .reduceBy(
      (Stream<Integer> s, Collector<Long> collector) -> {
        collector.collect(s.count());
        collector.asContext().getCounter("num-of-keys").increment();
      })
      .output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]

最適化された組み合わせ可能な出力を使用した、同じ例です。

//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
  ReduceByKey.named("to-letters-couts")
    .of(animals)
    .keyBy(String::length) // length of animal name will e used as grouping key
    // we need to count each animal name once, so why not to optimize each string to 1
    .valueBy(e -> 1L)
    .combineBy(s -> s.mapToLong(l -> l).sum()) //Stream::count will not be enough
    .output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
提供されたCombinableReduceFunctionは、真に組み合わせ可能にするために、結合的かつ交換可能である必要があります。したがって、シャッフルの前に部分的な結果を計算し、その後部分的な結果を1つにマージするために使用できます。そのため、前の例とは異なり、この例では単純なStream::countは機能しません。

Euphoriaは、コードの記述と読み取りを容易にすることを目指しています。そのため、Foldまたは折りたたみ関数形式で組み合わせ可能なreduce関数を記述するためのサポートが既に用意されています。これにより、ユーザーは削減ロジック(BinaryFunction)のみを提供し、そこからCombinableReduceFunctionを作成できます。提供されたBinaryFunctionは、結合的である必要があります。

//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLenght =
  ReduceByKey.named("to-letters-couts")
    .of(animals)
    .keyBy(String::length) // length of animal name will be used as grouping key
    // we need to count each animal name once, so why not to optimize each string to 1
    .valueBy(e -> 1L)
    .combineBy(Fold.of((l1, l2) -> l1 + l2))
    .output();
// countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]

ReduceWindow

すべての要素をウィンドウで削減します。この演算子は、すべての要素に対して同じキーを持つReduceByKeyに対応するため、実際のキーはウィンドウによってのみ定義されます。

//suppose input contains [ 1, 2, 3, 4, 5, 6, 7, 8 ]
//lets assign time-stamp to each input element
PCollection<Integer> withEventTime = AssignEventTime.of(input).using(i -> 1000L * i).output();

PCollection<Integer> output =
  ReduceWindow.of(withEventTime)
    .combineBy(Fold.of((i1, i2) -> i1 + i2))
    .windowBy(FixedWindows.of(Duration.millis(5000)))
    .triggeredBy(DefaultTrigger.of())
    .discardingFiredPanes()
    .output();
//output will contain: [ 10, 26 ]

SumByKey

同じキーを持つ要素の合計。入力データセットを、指定されたキー抽出器(UnaryFunction)によってキーにマップする必要があります。値抽出器(Longに出力するUnaryFunction)によって値にマップします。これらの値は、キーごとにグループ化され、合計されます。出力はKV<K, Long>Kはキー型)として出力され、各KVにはキーと入力データセットのキーに対する要素数が含まれます。

//suppose input contains: [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ]
PCollection<KV<Integer, Long>> output =
  SumByKey.named("sum-odd-and-even")
    .of(input)
    .keyBy(e -> e % 2)
    .valueBy(e -> (long) e)
    .output();
// output will contain: [ KV.of(0, 20L), KV.of(1, 25L)]

Union

要素の順序に関する保証がない、同じ型の少なくとも2つのデータセットのマージ。

//suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ]
//suppose rodents contains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ]
PCollection<String> animals =
  Union.named("to-animals")
    .of(cats, rodents)
    .output();
// animal will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver"

TopPerKey

キーごとに1つのトップランクの要素を出力します。型Kのキーは、指定されたUnaryFunctionによって抽出されます。別のUnaryFunction抽出器により、入力要素を型Vの値に変換できます。トップ要素の選択は、スコア計算機と呼ばれるユーザーが提供したUnaryFunctionから取得した*スコア*に基づいています。スコア型はScoreTとして示され、Comparable<ScoreT>を拡張する必要があるため、2つの要素のスコアを直接比較できます。出力データセット要素の型はTriple<K, V, ScoreT>です。

// suppose 'animals contain: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", "duck", "caterpillar" ]
PCollection<Triple<Character, String, Integer>> longestNamesByLetter =
  TopPerKey.named("longest-animal-names")
    .of(animals)
    .keyBy(name -> name.charAt(0)) // first character is the key
    .valueBy(UnaryFunction.identity()) // value type is the same as input element type
    .scoreBy(String::length) // length defines score, note that Integer implements Comparable<Integer>
    .output();
//longestNamesByLetter wil contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant", 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ]
TopPerKeyはシャッフル演算子であるため、ウィンドウを定義できます。

AssignEventTime

ウィンドウ処理が適用されると、Euphoriaは要素からタイムスタンプを抽出する方法を知る必要があります。AssignEventTimeは、指定されたExtractEventTime関数の実装を介してEuphoriaにその方法を伝えます。

// suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
  AssignEventTime.named("extract-event-time")
    .of(events)
    .using(SomeEventObject::getEventTimeInMillis)
    .output();
//Euphoria will now know event time for each event

変換

Euphoria APIは、Beam Java SDKの上に構築されています。APIは、バックグラウンドでBeamのPTransformsに透過的に変換されます。

Euphoria APIがBeam Java SDKに変換されるという事実は、変換自体を微調整するオプションを提供します。Operatorの変換は、OperatorTranslatorの実装を介して実現されます。Euphoriaは、どのトランスレータを使用するべきかを決定するためにTranslationProviderを使用します。Euphoria APIのユーザーは、EuphoriaOptionsを拡張することにより、TranslationProviderを介して独自のOperatorTranslatorを提供できます。Euphoriaには、いくつかの便利な実装が既に含まれています。

変換プロバイダー

GenericTranslatorProvider

一般的なTranslationProvider。3つの異なる方法でOperatorTranslatorの登録を許可します。

GenericTranslatorProvider.newBuilder()
  .register(FlatMap.class, new FlatMapTranslator<>()) // register by operator class
  .register(
    Join.class,
    (Join op) -> {
      String name = ((Optional<String>) op.getName()).orElse("");
      return name.toLowerCase().startsWith("broadcast");
    },
    new BroadcastHashJoinTranslator<>()) // register by class and predicate
  .register(
    op -> op instanceof CompositeOperator,
    new CompositeOperatorTranslator<>()) // register by predicate only
  .build();

GenericTranslatorProviderはデフォルトのプロバイダーです。GenericTranslatorProvider.createWithDefaultTranslators()を参照してください。

CompositeProvider

指定された順序でTranslationProviderのチェーンを実装します。これにより、ユーザー定義のTranslationProviderをEuphoria APIによって既に提供されているものと組み合わせることができます。

CompositeProvider.of(
  CustomTranslatorProvider.of(), // first ask CustomTranslatorProvider for translator
  GenericTranslatorProvider.createWithDefaultTranslators()); // then ask default provider if needed

演算子トランスレータ

OperatorはJava Beam SDKに変換する必要があります。これは、OperatorTranslatorの実装によって行われます。Euphoria APIには、それに付属するすべてのOperator実装に対するトランスレータが含まれています。一部の演算子には、場合によっては適切な代替変換がある可能性があります。Joinには通常、多くの実装があります。ここでは、最も興味深いものだけを説明します。

BroadcastHashJoinTranslator

一方の側のデータセット全体がターゲットエグゼキュータのメモリに収まる場合、LeftJoinRightJoinを変換できます。したがって、Beamのサイド入力を使用して分散できます。その結果、パフォーマンスが向上します。

CompositeOperatorTranslator

一部の演算子は複合型です。つまり、実際には他の演算子のラップされたチェーンであることを意味します。CompositeOperatorTranslatorは、変換プロセス中にそれらが要素演算子に分解されることを保証します。

詳細

変換の大部分は、org.apache.beam.sdk.extensions.euphoria.core.translateパッケージで行われます。最も興味深いクラスは次のとおりです。

このパッケージには、サポートされている各演算子型(JoinTranslatorFlatMapTranslatorReduceByKeyTranslator)のOperatorTranslatorの実装も含まれています。すべての演算子に独自のトランスレータが必要なわけではありません。それらのいくつかは、他の演算子から構成できます。そのため、演算子はCompositeOperatorを実装して、他のEuphoria演算子のセットに展開されるオプションを与えることができます。

変換プロセスは、柔軟性を考慮して設計されました。上位レベルのEuphoria演算子をBeamのSDKプリミティブに変換するさまざまな方法を許可することを目指しました。これにより、ユーザーの選択に基づいて、または自動的に取得されたデータに関する知識に基づいて、パフォーマンスをさらに最適化できます。

サポートされていない機能

元のEuphoriaには、Beamポートではまだサポートされていない機能と演算子が含まれていました。まだサポートされていない機能のリストを以下に示します。