CDAP IO
CdapIO
は、ソースからデータを読み取ったり、シンク CDAP プラグインにデータを書き込んだりする変換です。
バッチプラグインのサポート
CdapIO
は現在、CDAPプラグイン
のクラス名を参照することで、以下のCDAPバッチプラグインをサポートしています。
また、HadoopのInputFormat
またはOutputFormat
に基づく他のCDAPバッチプラグインも使用できます。クラス名でサポートされているプラグインのリストに簡単に追加できます。詳細はCdapIOのREADMEを参照してください。
ストリーミングプラグインのサポート
CdapIO
は現在、Apache Spark Receiverに基づくCDAPストリーミングプラグインをサポートしています。
CDAPストリーミングプラグインの要件
- CDAPストリーミングプラグインは、
Spark Receiver
(Spark 2.4)に基づいている必要があります。 - CDAPストリーミングプラグインは、オフセットを使用した動作をサポートする必要があります。
- 対応するSpark Receiverは、HasOffsetインターフェースを実装する必要があります。
- レコードには、レコードオフセットを表す数値フィールドが必要です。
CdapIOを使用したバッチ読み込み
CDAPプラグインから読み取るには、以下を渡す必要があります。
Key
クラスとValue
クラス。これらのクラスにBeam Coderが利用可能かどうかを確認する必要があります。- 特定のCDAPプラグインのパラメータを持つ
PluginConfig
オブジェクト。
以下を指定することで、ConfigWrapper
クラスを使用してPluginConfig
オブジェクトを簡単に構築できます。
- 必要な
PluginConfig
のクラス。 - 対応するCDAPプラグインの
Map<String, Object>
パラメータマップ。
例えば
Map<String, Object> myPluginConfigParams = new HashMap<>();
// Read plugin parameters (e.g. from PipelineOptions) and put them into 'myPluginConfigParams' map.
myPluginConfigParams.put(MyPluginConstants.USERNAME_PARAMETER_NAME, pipelineOptions.getUsername());
// ...
MyPluginConfig pluginConfig =
new ConfigWrapper<>(MyPluginConfig.class).withParams(myPluginConfigParams).build();
プラグインのクラス名でデータを読み込む
一部のCDAPプラグインは既にサポートされており、プラグインのクラス名だけで使用できます。
例えば
バッチプラグインを構築してデータを読み込む
CDAPプラグインがプラグインのクラス名でサポートされていない場合は、次のパラメータを渡すことで、Plugin
オブジェクトを簡単に構築できます。
- CDAPバッチプラグインのクラス。
- 選択したCDAPプラグインに接続するために使用される
InputFormat
クラス。 InputFormat
を提供するために使用されるInputFormatProvider
クラス。
その後、このPlugin
オブジェクトをCdapIO
に渡すことができます。
例えば
CdapIO.Read<String, String> readTransform =
CdapIO.<String, String>read()
.withCdapPlugin(
Plugin.createBatch(
MyCdapPlugin.class,
MyInputFormat.class,
MyInputFormatProvider.class))
.withPluginConfig(pluginConfig)
.withKeyClass(String.class)
.withValueClass(String.class);
p.apply("read", readTransform);
特定のCDAPプラグインの例
CDAP Hubspotバッチソースプラグイン
SourceHubspotConfig pluginConfig =
new ConfigWrapper<>(SourceHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, JsonElement> readTransform =
CdapIO.<NullWritable, JsonElement>read()
.withCdapPluginClass(HubspotBatchSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(JsonElement.class);
p.apply("readFromHubspotPlugin", readTransform);
CDAP Salesforceバッチソースプラグイン
SalesforceSourceConfig pluginConfig =
new ConfigWrapper<>(SalesforceSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<Schema, LinkedHashMap> readTransform =
CdapIO.<Schema, LinkedHashMap>read()
.withCdapPluginClass(SalesforceBatchSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(Schema.class)
.withValueClass(LinkedHashMap.class);
p.apply("readFromSalesforcePlugin", readTransform);
CDAP ServiceNowバッチソースプラグイン
ServiceNowSourceConfig pluginConfig =
new ConfigWrapper<>(ServiceNowSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, StructuredRecord> readTransform =
CdapIO.<NullWritable, StructuredRecord>read()
.withCdapPluginClass(ServiceNowSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(StructuredRecord.class);
p.apply("readFromServiceNowPlugin", readTransform);
CDAP Zendeskバッチソースプラグイン
ZendeskBatchSourceConfig pluginConfig =
new ConfigWrapper<>(ZendeskBatchSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, StructuredRecord> readTransform =
CdapIO.<NullWritable, StructuredRecord>read()
.withCdapPluginClass(ZendeskBatchSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(StructuredRecord.class);
p.apply("readFromZendeskPlugin", readTransform);
詳細については、完全な例をご覧ください。
CdapIOを使用したバッチ書き込み
CDAPプラグインに書き込むには、以下を渡す必要があります。
Key
クラスとValue
クラス。これらのクラスにBeam Coderが利用可能かどうかを確認する必要があります。locksDirPath
。これは、ロックが格納されるロックディレクトリパスです。このパラメータは、Hadoop External Synchronization(書き込みジョブに関連するロックを取得するためのメカニズム)に必要です。- 特定のCDAPプラグインのパラメータを持つ
PluginConfig
オブジェクト。
以下を指定することで、ConfigWrapper
クラスを使用してPluginConfig
オブジェクトを簡単に構築できます。
- 必要な
PluginConfig
のクラス。 - 対応するCDAPプラグインの
Map<String, Object>
パラメータマップ。
例えば
プラグインのクラス名でデータを書き込む
一部のCDAPプラグインは既にサポートされており、プラグインのクラス名だけで使用できます。
例えば
バッチプラグインを構築してデータを書き込む
CDAPプラグインがプラグインのクラス名でサポートされていない場合は、次のパラメータを渡すことで、Plugin
オブジェクトを簡単に構築できます。
- CDAPプラグインのクラス。
- 選択したCDAPプラグインに接続するために使用される
OutputFormat
クラス。 OutputFormat
を提供するために使用されるOutputFormatProvider
クラス。
その後、このPlugin
オブジェクトをCdapIO
に渡すことができます。
例えば
CdapIO.Write<String, String> writeTransform =
CdapIO.<String, String>write()
.withCdapPlugin(
Plugin.createBatch(
MyCdapPlugin.class,
MyOutputFormat.class,
MyOutputFormatProvider.class))
.withPluginConfig(pluginConfig)
.withKeyClass(String.class)
.withValueClass(String.class)
.withLocksDirPath(locksDirPath);
p.apply("write", writeTransform);
特定のCDAPプラグインの例
CDAP Hubspotバッチシンクプラグイン
SinkHubspotConfig pluginConfig =
new ConfigWrapper<>(SinkHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, String> writeTransform =
CdapIO.<NullWritable, String>write()
.withCdapPluginClass(pluginClass)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class)
.withLocksDirPath(locksDirPath);
p.apply("writeToHubspotPlugin", writeTransform);
CDAP Salesforceバッチシンクプラグイン
SalesforceSinkConfig pluginConfig =
new ConfigWrapper<>(SalesforceSinkConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, CSVRecord> writeTransform =
CdapIO.<NullWritable, CSVRecord>write()
.withCdapPluginClass(pluginClass)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(CSVRecord.class)
.withLocksDirPath(locksDirPath);
p.apply("writeToSalesforcePlugin", writeTransform);
詳細については、完全な例をご覧ください。
CdapIOを使用したストリーミング読み込み
CDAPプラグインから読み取るには、以下を渡す必要があります。
Key
クラスとValue
クラス。これらのクラスにBeam Coderが利用可能かどうかを確認する必要があります。- 特定のCDAPプラグインのパラメータを持つ
PluginConfig
オブジェクト。
以下を指定することで、ConfigWrapper
クラスを使用してPluginConfig
オブジェクトを簡単に構築できます。
- 必要な
PluginConfig
のクラス。 - 対応するCDAPプラグインの
Map<String, Object>
パラメータマップ。
例えば
プラグインのクラス名でデータを読み込む
一部のCDAPプラグインは既にサポートされており、プラグインのクラス名だけで使用できます。
例えば
ストリーミングプラグインを構築してデータを読み込む
CDAPプラグインがプラグインのクラス名でサポートされていない場合は、次のパラメータを渡すことで、Plugin
オブジェクトを簡単に構築できます。
- CDAPストリーミングプラグインのクラス。
getOffsetFn
。これは、レコードからLong
レコードオフセットを取得する方法を定義するSerializableFunction
です。receiverClass
。これは、CDAPプラグインに関連付けられたSpark(v 2.4)Receiver
クラスです。- (オプション)
getReceiverArgsFromConfigFn
。これは、PluginConfig
オブジェクトを使用してSparkReceiver
のコンストラクタ引数を取得する方法を定義するSerializableFunction
です。
その後、このPlugin
オブジェクトをCdapIO
に渡すことができます。
例えば
CdapIO.Read<String, String> readTransform =
CdapIO.<String, String>read()
.withCdapPlugin(
Plugin.createStreaming(
MyStreamingPlugin.class,
myGetOffsetFn,
MyReceiver.class,
myGetReceiverArgsFromConfigFn))
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class);
p.apply("read", readTransform);
オプションパラメータを使用してデータを読み込む
必要に応じて、次のオプションパラメータを渡すことができます。
pullFrequencySec
は、新しいレコードの更新をポーリングする間隔(秒単位)です。startOffset
は、読み取りを開始する開始オフセット(包含)です。
例えば
特定のCDAPプラグインの例
CDAP Hubspotストリーミングソースプラグイン
HubspotStreamingSourceConfig pluginConfig =
new ConfigWrapper<>(HubspotStreamingSourceConfig.class)
.withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
CdapIO.<NullWritable, String>read()
.withCdapPlugin(
Plugin.createStreaming(
HubspotStreamingSource.class,
GetOffsetUtils.getOffsetFnForHubspot(),
HubspotReceiver.class))
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class);
p.apply("readFromHubspotPlugin", readTransform);
CDAP Salesforceストリーミングソースプラグイン
SalesforceStreamingSourceConfig pluginConfig =
new ConfigWrapper<>(SalesforceStreamingSourceConfig.class)
.withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
CdapIO.<NullWritable, String>read()
.withCdapPlugin(
Plugin.createStreaming(
SalesforceStreamingSource.class,
GetOffsetUtils.getOffsetFnForSalesforce(),
SalesforceReceiver.class,
config -> {
SalesforceStreamingSourceConfig salesforceConfig =
SalesforceStreamingSourceConfig) config;
return new Object[] {
salesforceConfig.getAuthenticatorCredentials(),
salesforceConfig.getPushTopicName()
};
}))
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class);
p.apply("readFromSalesforcePlugin", readTransform);
詳細については、完全な例をご覧ください。
最終更新日: 2024/10/31
お探しのものは見つかりましたか?
すべて役に立ち、明確でしたか?変更したい点はありますか?ぜひお知らせください!