Euphoria Java 8 DSL
Euphoriaとは
BeamのJava SDK上に構築された使いやすいJava 8 API。APIはデータ変換の高レベルな抽象化を提供し、Java 8の言語機能(例:ラムダ式とストリーム)に焦点を当てています。既存のBeam SDKと完全に相互運用可能であり、相互に変換できます。(オプションの)Kryoベースのコーダー、ラムダ式、高レベル演算子の使用による迅速なプロトタイピングを可能にし、既存のBeam Pipelines
にシームレスに統合できます。
Euphoria APIプロジェクトは2014年に開始され、Seznam.czのデータインフラストラクチャのための主要な構成要素を提供するという明確な目標がありました。2015年、DataFlowホワイトペーパーに触発されたオリジナルの作者は、一歩進んでストリーム処理とバッチ処理の両方のための統一されたAPIを提供することにしました。このAPIは2016年にオープンソース化され、現在も積極的に開発されています。Beamのコミュニティ目標は非常に似ていたため、Beam Java SDKの上位DSLとしてAPIを提供し、コミュニティと努力を共有することにしました。
Euphoria DSLの統合はまだ進行中です。BEAM-3900として追跡されています。
WordCount例
簡単な例から始めましょう。
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline pipeline = Pipeline.create(options);
// Use Kryo as coder fallback
KryoCoderProvider.of().registerTo(pipeline);
// Source of data loaded from Beam IO.
PCollection<String> input =
pipeline
.apply(Create.of(textLineByLine))
.setTypeDescriptor(TypeDescriptor.of(String.class));
// zero, one, or more output elements. From input lines we will get data set of words.
PCollection<String> words =
FlatMap.named("TOKENIZER")
.of(lines)
.using(
(String line, Collector<String> context) -> {
for (String word : Splitter.onPattern("\\s+").split(line)) {
context.collect(word);
}
})
.output();
// Now we can count input words - the operator ensures that all values for the same
// key (word in this case) end up being processed together. Then it counts number of appearances
// of the same key in 'words' PCollection and emits it to output.
PCollection<KV<String, Long>> counted =
CountByKey.named("COUNT")
.of(words)
.keyBy(w -> w)
.output();
// Format output.
PCollection<String> output =
MapElements.named("FORMAT")
.of(counted)
.using(p -> p.getKey() + ": " + p.getValue())
.output();
// Now we can again use Beam transformation. In this case we save words and their count
// into the text file.
output
.apply(TextIO.write()
.to("counted_words"));
pipeline.run();
Euphoriaガイド
Euphoria APIは、アプリケーションのニーズに応じてPipeline
を構築できる一連の演算子で構成されています。
入力と出力
入力データは、BeamのIOを介してPCollection
に供給できます。Beamの場合と同じ方法です。
演算子の追加
Euphoria APIの真の力は、演算子スイートにあります。各演算子は1つ以上の入力を使用し、1つの出力PCollection
を生成します。簡単なMapElements
の例を見てみましょう。
input
を使用し、指定されたラムダ式(String::valueOf
)をinput
の各要素に適用し、マッピングされたPCollection
を返します。開発者は演算子の作成時に一連の手順に従うため、演算子の宣言は簡単です。演算子の構築を開始するには、その名前と「.」を入力します。(ドット)。IDEがヒントを表示します。任意の演算子を構築する最初の手順は、named()
メソッドを使用して名前を付けることです。この名前はシステムを通じて伝播され、後でデバッグに使用できます。
コーダーと型
BeamのJava SDKでは、要素を具体化する手段として、カスタム要素型にCoder
を供給する必要があります。Euphoriaでは、シリアライゼーションの方法としてKryoを使用できます。Kryoは:sdks:java:extensions:kryo
モジュールにあります。
//gradle
dependencies {
compile "org.apache.beam:sdks:java:extensions:kryo:${beam.version}"
}
//maven
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-kryo</artifactId>
<version>${beam.version}</version>
</dependency>
必要なのは、KryoCoderProvider
を作成し、それをPipeline
に登録することだけです。これを行うには2つの方法があります。
プロトタイピング時には、コーダーをあまり気にしない場合、Kryoへのクラス登録なしでKryoCoderProvider
を作成できます。
KryoCoderProvider
は、プリミティブ以外のすべての要素型に対してKryoCoder
を返します。もちろん、Kryoは未知の型のインスタンスを効果的にシリアライズできないため、パフォーマンスが低下します。しかし、パイプライン開発の速度は向上します。この動作はデフォルトで有効になっており、Pipeline
をKryoOptions
で作成する際に無効にできます。2つ目のよりパフォーマンスに優れた方法は、Kryoでシリアライズするすべての型を登録することです。独自のKryoシリアライザも登録するのも良いアイデアです。Euphoriaでは、独自のKryoRegistrar
を実装し、KryoCoderProvider
を作成する際にそれを使用することで、これを行うことができます。
TypeDescriptor
を供給するオプションの方法があります。TypeDescriptors
を供給しない場合、Euphoria演算子はTypeDescriptor<Object>
を使用します。そのため、KryoOptions
で許可されている場合、KryoCoderProvider
は未知の型のすべての要素に対してKryoCoder<Object>
を返す可能性があります。.setKryoRegistrationRequired(true)
を使用する場合は、TypeDescriptors
の供給が必須になります。メトリクスとアキュムレータ
ジョブの内部に関する統計は、分散ジョブの開発中に非常に役立ちます。Euphoriaでは、これらをアキュムレータと呼びます。これらは、環境Context
を介してアクセスできます。これは、Collector
で作業する際に取得できるものです。通常、演算子からゼロから多数の出力要素が期待される場合に存在します。たとえば、FlatMap
の場合です。
MapElements
は、UnaryFunctor
の代わりにUnaryFunctionEnv
(2番目のコンテキスト引数を追加)の実装を提供することで、Context
へのアクセスも許可します。ウィンドウ処理
Euphoriaは、Beam Java SDKと同じウィンドウ処理の原則に従います。すべてのシャッフル演算子(ネットワークを介してデータをシャッフルする必要がある演算子)で設定できます。Beamと同じパラメータが必要です。WindowFn
、Trigger
、WindowingStrategy
など。ユーザーは、演算子の構築時に、すべての必須パラメータといくつかのオプションパラメータを設定するか、何も設定しないかのいずれかを選択するように促されます。ウィンドウ処理はPipeline
を通じて伝播されます。
PCollection<KV<Integer, Long>> countedElements =
CountByKey.of(input)
.keyBy(e -> e)
.windowBy(FixedWindows.of(Duration.standardSeconds(1)))
.triggeredBy(DefaultTrigger.of())
.discardingFiredPanes()
.withAllowedLateness(Duration.standardSeconds(5))
.withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY)
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.output();
Euphoriaの入手方法
Euphoriaは、Apache Beamプロジェクトのdsl-euphoria
ブランチ、beam-sdks-java-extensions-euphoria
モジュールにあります。euphoria
サブプロジェクトをビルドするには、以下を実行します。
./gradlew beam-sdks-java-extensions-euphoria:build
演算子リファレンス
演算子は基本的に高レベルのデータ変換であり、データ処理ジョブのビジネスロジックを簡単に構築できます。すべてのEuphoria演算子は、このセクションで例を含めて説明されています。簡潔にするため、ウィンドウ処理が適用された例はありません。ウィンドウ処理のセクションを参照して詳細を確認してください。
CountByKey
同じキーを持つ要素をカウントします。入力データセットを、指定されたキー抽出器(UnaryFunction
)によってキーにマッピングする必要があります。その後、キーがカウントされます。出力はKV<K, Long>
(K
はキー型)として出力され、各KV
にはキーと、そのキーに対する入力データセット内の要素数が含まれています。
Distinct
異なる(equalsメソッドに基づく)要素を出力します。要素を出力型にマッピングするオプションのUnaryFunction
マッパーパラメータを受け入れます。
Distinct
。Join
指定されたキーで2つの(左と右の)データセットの内部結合を表し、新しいデータセットを生成します。キーは個別の抽出器によって両方のデータセットから抽出されるため、左と右の要素の型は、LeftT
とRightT
として表される異なる型にすることができます。結合自体は、同じキーを共有する両方のデータセットから要素を受け取るユーザーが提供したBinaryFunctor
によって実行されます。そして、結合の結果(OutputT
)を出力します。この演算子は、KV<K, OutputT>
型の出力データセットを出力します。
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
Join.named("join-length-to-words")
.of(left, right)
.by(le -> le, String::length) // key extractors
.using((Integer l, String r, Collector<String> c) -> c.collect(l + "+" + r))
.output();
// joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"), KV(4, "4+duck"),
// KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X")]
LeftJoin
2 つのデータセット(左と右)を指定されたキーに基づいて左外部結合し、1 つの新しいデータセットを生成します。キーは、個別の抽出器によって両方のデータセットから抽出されるため、左側の要素と右側の要素は、LeftT
と RightT
として示される異なる型を持つことができます。結合自体は、ユーザーが提供する BinaryFunctor
によって実行され、両方のデータセットから1 つの要素(右側はオプションで存在し、同じキーを共有します)を消費し、結合の結果(OutputT
)を出力します。この演算子は、KV<K, OutputT>
型の出力データセットを出力します。
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
LeftJoin.named("left-join-length-to-words")
.of(left, right)
.by(le -> le, String::length) // key extractors
.using(
(Integer l, Optional<String> r, Collector<String> c) ->
c.collect(l + "+" + r.orElse(null)))
.output();
// joined will contain: [KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"),
// KV(3, "3+rat"), KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"),
// KV(3, "3+rat"), KV(1, "1+X")]
LeftJoin
に対して「BroadcastHashJoin」と呼ばれるパフォーマンス最適化をサポートしています。ブロードキャスト結合は、一方のデータセットがメモリに収まる2 つのデータセットを結合する場合に非常に効率的です(LeftJoin
では、右側のデータセットがメモリに収まる必要があります)。「Broadcast Hash Join」の使用方法については、翻訳セクションを参照してください。RightJoin
2 つのデータセット(左と右)を指定されたキーに基づいて右外部結合し、1 つの新しいデータセットを生成します。キーは、個別の抽出器によって両方のデータセットから抽出されるため、左側の要素と右側の要素は、LeftT
と RightT
として示される異なる型を持つことができます。結合自体は、ユーザーが提供する BinaryFunctor
によって実行され、両方のデータセットから1 つの要素(左側はオプションで存在し、同じキーを共有します)を消費し、結合の結果(OutputT
)を出力します。この演算子は、KV<K, OutputT>
型の出力データセットを出力します。
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
RightJoin.named("right-join-length-to-words")
.of(left, right)
.by(le -> le, String::length) // key extractors
.using(
(Optional<Integer> l, String r, Collector<String> c) ->
c.collect(l.orElse(null) + "+" + r))
.output();
// joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"),
// KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X"),
// KV(8, "null+elephant"), KV(5, "null+mouse")]
RightJoin
に対して「BroadcastHashJoin」と呼ばれるパフォーマンス最適化をサポートしています。ブロードキャスト結合は、一方のデータセットがメモリに収まる2 つのデータセットを結合する場合に非常に効率的です(RightJoin
では、左側のデータセットがメモリに収まる必要があります)。「Broadcast Hash Join」の使用方法については、翻訳セクションを参照してください。FullJoin
2 つのデータセット(左と右)を指定されたキーに基づいて完全外部結合し、1 つの新しいデータセットを生成します。キーは、個別の抽出器によって両方のデータセットから抽出されるため、左側の要素と右側の要素は、LeftT
と RightT
として示される異なる型を持つことができます。結合自体は、ユーザーが提供する BinaryFunctor
によって実行され、両方のデータセットから1 つの要素(両方がオプションで存在し、同じキーを共有します)を消費し、結合の結果(OutputT
)を出力します。この演算子は、KV<K, OutputT>
型の出力データセットを出力します。
// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]
// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, String>> joined =
FullJoin.named("join-length-to-words")
.of(left, right)
.by(le -> le, String::length) // key extractors
.using(
(Optional<Integer> l, Optional<String> r, Collector<String> c) ->
c.collect(l.orElse(null) + "+" + r.orElse(null)))
.output();
// joined will contain: [ KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"), KV(3, "3+rat"),
// KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"),KV(1, "1+X"),
// KV(1, "null+elephant"), KV(5, "null+mouse")]
MapElements
入力型InputT
の1つの入力要素を、別の(同じ可能性のある)OutputT
型の1つの出力要素に変換します。変換は、ユーザーが指定したUnaryFunction
によって実行されます。
FlatMap
入力型InputT
の1つの入力要素を、別の(同じ可能性のある)OutputT
型の0個以上の出力要素に変換します。変換は、ユーザーが指定したUnaryFunctor
によって実行され、Collector<OutputT>
を使用して出力要素を出力します。常に1つの要素しか出力できないMapElements
との類似性に注意してください。
// suppose words contain: ["Brown", "fox", ".", ""]
PCollection<String> letters =
FlatMap.named("str2char")
.of(words)
.using(
(String s, Collector<String> collector) -> {
for (int i = 0; i < s.length(); i++) {
char c = s.charAt(i);
collector.collect(String.valueOf(c));
}
})
.output();
// characters will contain: ["B", "r", "o", "w", "n", "f", "o", "x", "."]
FlatMap
を使用して要素のタイムスタンプを決定できます。これは、構築時にExtractEventTime
タイム抽出器の実装を提供することで行われます。要素にタイムスタンプを割り当てるための特別なAssignEventTime
演算子があります。これを使用することを検討してください。コードの可読性が向上する可能性があります。// suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
FlatMap.named("extract-event-time")
.of(events)
.using( (SomeEventObject e, Collector<SomeEventObject> c) -> c.collect(e))
.eventTimeBy(SomeEventObject::getEventTimeInMillis)
.output();
//Euphoria will now know event time for each event
Filter
Filter
は、指定された条件を満たさないすべての要素を破棄します。条件は、ユーザーがUnaryPredicate
の実装として提供します。入力要素と出力要素は同じ型です。
ReduceByKey
ユーザーが提供するreduce関数を使用して、同じキーを持つInputT
型要素の集約を実行します。キーは、入力要素を受け取り、型K
のキーを出力するUnaryFunction
を介して各要素から抽出されます。要素は、型V
の値にマップすることもできます。これは要素のシャッフルの前に発生するため、パフォーマンスにプラスの影響を与える可能性があります。
最後に、同じキーを持つ要素は、ユーザー定義のReduceFunctor
、ReduceFunction
、またはCombinableReduceFunction
によって集約されます。これらは、受け取る引数の数と出力の解釈方法が異なります。ReduceFunction
は基本的に、要素のStream
を入力として受け取り、1つの集約結果を出力する関数です。ReduceFunctor
は、Context
へのアクセスを許可する2番目のCollector
を受け取ります。CombinableReduceFunction
が提供されると、シャッフルの前に部分的な削減が行われるため、ネットワークを介して転送する必要があるデータ量が少なくなります。
次の例は、値の抽出を含むReduceByKey
演算子の基本的な使用方法を示しています。
//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
ReduceByKey.named("to-letters-counts")
.of(animals)
.keyBy(String::length) // length of animal name will be used as grouping key
// we need to count each animal name once, so why not to optimize each string to 1
.valueBy(e -> 1)
.reduceBy(Stream::count)
.output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
次に、カウンターを使用してReduceByKey
の内部を追跡したいとします。
//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
ReduceByKey.named("to-letters-couts")
.of(animals)
.keyBy(String::length) // length of animal name will be used as grouping key
// we need to count each animal name once, so why not to optimize each string to 1
.valueBy(e -> 1)
.reduceBy(
(Stream<Integer> s, Collector<Long> collector) -> {
collector.collect(s.count());
collector.asContext().getCounter("num-of-keys").increment();
})
.output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
最適化された組み合わせ可能な出力を使用した、同じ例です。
//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLength =
ReduceByKey.named("to-letters-couts")
.of(animals)
.keyBy(String::length) // length of animal name will e used as grouping key
// we need to count each animal name once, so why not to optimize each string to 1
.valueBy(e -> 1L)
.combineBy(s -> s.mapToLong(l -> l).sum()) //Stream::count will not be enough
.output();
// countOfAnimalNamesByLength wil contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
CombinableReduceFunction
は、真に組み合わせ可能にするために、結合的かつ交換可能である必要があります。したがって、シャッフルの前に部分的な結果を計算し、その後部分的な結果を1つにマージするために使用できます。そのため、前の例とは異なり、この例では単純なStream::count
は機能しません。Euphoriaは、コードの記述と読み取りを容易にすることを目指しています。そのため、Fold
または折りたたみ関数形式で組み合わせ可能なreduce関数を記述するためのサポートが既に用意されています。これにより、ユーザーは削減ロジック(BinaryFunction
)のみを提供し、そこからCombinableReduceFunction
を作成できます。提供されたBinaryFunction
は、結合的である必要があります。
//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]
PCollection<KV<Integer, Long>> countOfAnimalNamesByLenght =
ReduceByKey.named("to-letters-couts")
.of(animals)
.keyBy(String::length) // length of animal name will be used as grouping key
// we need to count each animal name once, so why not to optimize each string to 1
.valueBy(e -> 1L)
.combineBy(Fold.of((l1, l2) -> l1 + l2))
.output();
// countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]
ReduceWindow
すべての要素をウィンドウで削減します。この演算子は、すべての要素に対して同じキーを持つReduceByKey
に対応するため、実際のキーはウィンドウによってのみ定義されます。
//suppose input contains [ 1, 2, 3, 4, 5, 6, 7, 8 ]
//lets assign time-stamp to each input element
PCollection<Integer> withEventTime = AssignEventTime.of(input).using(i -> 1000L * i).output();
PCollection<Integer> output =
ReduceWindow.of(withEventTime)
.combineBy(Fold.of((i1, i2) -> i1 + i2))
.windowBy(FixedWindows.of(Duration.millis(5000)))
.triggeredBy(DefaultTrigger.of())
.discardingFiredPanes()
.output();
//output will contain: [ 10, 26 ]
SumByKey
同じキーを持つ要素の合計。入力データセットを、指定されたキー抽出器(UnaryFunction
)によってキーにマップする必要があります。値抽出器(Long
に出力するUnaryFunction
)によって値にマップします。これらの値は、キーごとにグループ化され、合計されます。出力はKV<K, Long>
(K
はキー型)として出力され、各KV
にはキーと入力データセットのキーに対する要素数が含まれます。
Union
要素の順序に関する保証がない、同じ型の少なくとも2つのデータセットのマージ。
//suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ]
//suppose rodents contains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ]
PCollection<String> animals =
Union.named("to-animals")
.of(cats, rodents)
.output();
// animal will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver"
TopPerKey
キーごとに1つのトップランクの要素を出力します。型K
のキーは、指定されたUnaryFunction
によって抽出されます。別のUnaryFunction
抽出器により、入力要素を型V
の値に変換できます。トップ要素の選択は、スコア計算機と呼ばれるユーザーが提供したUnaryFunction
から取得した*スコア*に基づいています。スコア型はScoreT
として示され、Comparable<ScoreT>
を拡張する必要があるため、2つの要素のスコアを直接比較できます。出力データセット要素の型はTriple<K, V, ScoreT>
です。
// suppose 'animals contain: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", "duck", "caterpillar" ]
PCollection<Triple<Character, String, Integer>> longestNamesByLetter =
TopPerKey.named("longest-animal-names")
.of(animals)
.keyBy(name -> name.charAt(0)) // first character is the key
.valueBy(UnaryFunction.identity()) // value type is the same as input element type
.scoreBy(String::length) // length defines score, note that Integer implements Comparable<Integer>
.output();
//longestNamesByLetter wil contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant", 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ]
TopPerKey
はシャッフル演算子であるため、ウィンドウを定義できます。AssignEventTime
ウィンドウ処理が適用されると、Euphoriaは要素からタイムスタンプを抽出する方法を知る必要があります。AssignEventTime
は、指定されたExtractEventTime
関数の実装を介してEuphoriaにその方法を伝えます。
// suppose events contain events of SomeEventObject, its 'getEventTimeInMillis()' methods returns time-stamp
PCollection<SomeEventObject> timeStampedEvents =
AssignEventTime.named("extract-event-time")
.of(events)
.using(SomeEventObject::getEventTimeInMillis)
.output();
//Euphoria will now know event time for each event
変換
Euphoria APIは、Beam Java SDKの上に構築されています。APIは、バックグラウンドでBeamのPTransforms
に透過的に変換されます。
Euphoria APIがBeam Java SDKに変換されるという事実は、変換自体を微調整するオプションを提供します。Operator
の変換は、OperatorTranslator
の実装を介して実現されます。Euphoriaは、どのトランスレータを使用するべきかを決定するためにTranslationProvider
を使用します。Euphoria APIのユーザーは、EuphoriaOptions
を拡張することにより、TranslationProvider
を介して独自のOperatorTranslator
を提供できます。Euphoriaには、いくつかの便利な実装が既に含まれています。
変換プロバイダー
GenericTranslatorProvider
一般的なTranslationProvider
。3つの異なる方法でOperatorTranslator
の登録を許可します。
- 演算子クラスによる演算子固有のトランスレータの登録。
- 演算子クラスと追加のユーザー定義述語による演算子固有のトランスレータの登録。
- ユーザー定義述語を使用した一般的な(1つの演算子型に固有ではない)トランスレータの登録。
GenericTranslatorProvider
は最初の適切なトランスレータを返すため、登録の順序が重要です。
GenericTranslatorProvider.newBuilder()
.register(FlatMap.class, new FlatMapTranslator<>()) // register by operator class
.register(
Join.class,
(Join op) -> {
String name = ((Optional<String>) op.getName()).orElse("");
return name.toLowerCase().startsWith("broadcast");
},
new BroadcastHashJoinTranslator<>()) // register by class and predicate
.register(
op -> op instanceof CompositeOperator,
new CompositeOperatorTranslator<>()) // register by predicate only
.build();
GenericTranslatorProvider
はデフォルトのプロバイダーです。GenericTranslatorProvider.createWithDefaultTranslators()
を参照してください。
CompositeProvider
指定された順序でTranslationProvider
のチェーンを実装します。これにより、ユーザー定義のTranslationProvider
をEuphoria APIによって既に提供されているものと組み合わせることができます。
演算子トランスレータ
各Operator
はJava Beam SDKに変換する必要があります。これは、OperatorTranslator
の実装によって行われます。Euphoria APIには、それに付属するすべてのOperator
実装に対するトランスレータが含まれています。一部の演算子には、場合によっては適切な代替変換がある可能性があります。Join
には通常、多くの実装があります。ここでは、最も興味深いものだけを説明します。
BroadcastHashJoinTranslator
一方の側のデータセット全体がターゲットエグゼキュータのメモリに収まる場合、LeftJoin
とRightJoin
を変換できます。したがって、Beamのサイド入力を使用して分散できます。その結果、パフォーマンスが向上します。
CompositeOperatorTranslator
一部の演算子は複合型です。つまり、実際には他の演算子のラップされたチェーンであることを意味します。CompositeOperatorTranslator
は、変換プロセス中にそれらが要素演算子に分解されることを保証します。
詳細
変換の大部分は、org.apache.beam.sdk.extensions.euphoria.core.translate
パッケージで行われます。最も興味深いクラスは次のとおりです。
OperatorTranslator
- EuphoriaからBeamへの変換の内部APIを定義するインターフェース。TranslatorProvider
- カスタムトランスレータを提供する方法。OperatorTransform
- Euphoriaの演算子をBeamのPTransform
に実際に変換および/または展開する役割を果たします。EuphoriaOptions
-PipelineOptions
であり、カスタムTranslatorProvider
の設定を許可します。
このパッケージには、サポートされている各演算子型(JoinTranslator
、FlatMapTranslator
、ReduceByKeyTranslator
)のOperatorTranslator
の実装も含まれています。すべての演算子に独自のトランスレータが必要なわけではありません。それらのいくつかは、他の演算子から構成できます。そのため、演算子はCompositeOperator
を実装して、他のEuphoria演算子のセットに展開されるオプションを与えることができます。
変換プロセスは、柔軟性を考慮して設計されました。上位レベルのEuphoria演算子をBeamのSDKプリミティブに変換するさまざまな方法を許可することを目指しました。これにより、ユーザーの選択に基づいて、または自動的に取得されたデータに関する知識に基づいて、パフォーマンスをさらに最適化できます。
サポートされていない機能
元のEuphoriaには、Beamポートではまだサポートされていない機能と演算子が含まれていました。まだサポートされていない機能のリストを以下に示します。
- 元のEuphoriaの
ReduceByKey
では、出力値(キーごと)をソートすることが許可されていました。これはBeamにもまだ変換できないため、サポートされていません。