SparkReceiver IO
SparkReceiverIOは、Apache Spark Receiverから無制限のソースとしてデータを読み取るためのトランスフォームです。
Spark Receiverのサポート
SparkReceiverIO
は現在、Apache Spark Receiverをサポートしています。
Spark Receiver
の要件
- Sparkのバージョンは2.4.*である必要があります。
Spark Receiver
はオフセットを使用した作業をサポートする必要があります。Spark Receiver
はHasOffsetインターフェースを実装する必要があります。- レコードには、レコードオフセットを表す数値フィールドが必要です。
詳細については、SparkReceiverIOのREADMEを参照してください。
SparkReceiverIOを使用したストリーミング読み取り
Spark Receiver
から読み取るには、以下を渡す必要があります。
getOffsetFn
:レコードからLong
型のレコードオフセットを取得する方法を定義するSerializableFunction
。receiverBuilder
:Spark環境ではなくApache Beamメカニズムを使用するSpark Receiver
のインスタンスを構築するために必要です。
以下のパラメータを渡すことで、receiverBuilder
オブジェクトを簡単に作成できます。
Spark Receiver
のクラス。Spark Receiver
のインスタンスを作成するために必要なコンストラクタ引数。
例:
//In this example, MyReceiver accepts a MyConfig object as its only constructor parameter.
MyConfig myPluginConfig = new MyConfig(authToken, apiServerUrl);
Object[] myConstructorArgs = new Object[] {myConfig};
ReceiverBuilder<String, MyReceiver<String>> myReceiverBuilder =
new ReceiverBuilder<>(MyReceiver.class)
.withConstructorArgs(myConstructorArgs);
その後、このreceiverBuilder
オブジェクトをSparkReceiverIO
に渡すことができます。
例:
オプションのパラメータを使用したデータの読み取り
必要に応じて、以下のオプションのパラメータを渡すことができます。
pullFrequencySec
:新しいレコードの更新をポーリングする間の遅延時間(秒単位)。startOffset
:読み取りを開始するインクルード開始オフセット。timestampFn
:レコードからInstant
型のタイムスタンプを取得する方法を定義するSerializableFunction
。
例:
特定のSpark Receiverの例
CDAP Hubspot Receiver
ReceiverBuilder<String, HubspotReceiver<String>> hubspotReceiverBuilder =
new ReceiverBuilder<>(HubspotReceiver.class)
.withConstructorArgs(hubspotConfig);
SparkReceiverIO.Read<String> readTransform =
SparkReceiverIO.<String>read()
.withGetOffsetFn(GetOffsetUtils.getOffsetFnForHubspot())
.withSparkReceiverBuilder(hubspotReceiverBuilder)
p.apply("readFromHubspotReceiver", readTransform);
最終更新日:2024/10/31
お探しのものが見つかりましたか?
すべて役に立ち、分かりやすかったですか?変更したいことはありますか?お知らせください!