CDAP IO
CdapIO は、ソースからデータを読み取ったり、シンク CDAP プラグインにデータを書き込んだりする変換です。
バッチプラグインのサポート
CdapIO は現在、CDAPプラグインのクラス名を参照することで、以下のCDAPバッチプラグインをサポートしています。
また、HadoopのInputFormatまたはOutputFormatに基づく他のCDAPバッチプラグインも使用できます。クラス名でサポートされているプラグインのリストに簡単に追加できます。詳細はCdapIOのREADMEを参照してください。
ストリーミングプラグインのサポート
CdapIO は現在、Apache Spark Receiverに基づくCDAPストリーミングプラグインをサポートしています。
CDAPストリーミングプラグインの要件
- CDAPストリーミングプラグインは、
Spark Receiver(Spark 2.4)に基づいている必要があります。 - CDAPストリーミングプラグインは、オフセットを使用した動作をサポートする必要があります。
- 対応するSpark Receiverは、HasOffsetインターフェースを実装する必要があります。
- レコードには、レコードオフセットを表す数値フィールドが必要です。
CdapIOを使用したバッチ読み込み
CDAPプラグインから読み取るには、以下を渡す必要があります。
KeyクラスとValueクラス。これらのクラスにBeam Coderが利用可能かどうかを確認する必要があります。- 特定のCDAPプラグインのパラメータを持つ
PluginConfigオブジェクト。
以下を指定することで、ConfigWrapperクラスを使用してPluginConfigオブジェクトを簡単に構築できます。
- 必要な
PluginConfigのクラス。 - 対応するCDAPプラグインの
Map<String, Object>パラメータマップ。
例えば
Map<String, Object> myPluginConfigParams = new HashMap<>();
// Read plugin parameters (e.g. from PipelineOptions) and put them into 'myPluginConfigParams' map.
myPluginConfigParams.put(MyPluginConstants.USERNAME_PARAMETER_NAME, pipelineOptions.getUsername());
// ...
MyPluginConfig pluginConfig =
new ConfigWrapper<>(MyPluginConfig.class).withParams(myPluginConfigParams).build();プラグインのクラス名でデータを読み込む
一部のCDAPプラグインは既にサポートされており、プラグインのクラス名だけで使用できます。
例えば
バッチプラグインを構築してデータを読み込む
CDAPプラグインがプラグインのクラス名でサポートされていない場合は、次のパラメータを渡すことで、Pluginオブジェクトを簡単に構築できます。
- CDAPバッチプラグインのクラス。
- 選択したCDAPプラグインに接続するために使用される
InputFormatクラス。 InputFormatを提供するために使用されるInputFormatProviderクラス。
その後、このPluginオブジェクトをCdapIOに渡すことができます。
例えば
CdapIO.Read<String, String> readTransform =
CdapIO.<String, String>read()
.withCdapPlugin(
Plugin.createBatch(
MyCdapPlugin.class,
MyInputFormat.class,
MyInputFormatProvider.class))
.withPluginConfig(pluginConfig)
.withKeyClass(String.class)
.withValueClass(String.class);
p.apply("read", readTransform);特定のCDAPプラグインの例
CDAP Hubspotバッチソースプラグイン
SourceHubspotConfig pluginConfig =
new ConfigWrapper<>(SourceHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, JsonElement> readTransform =
CdapIO.<NullWritable, JsonElement>read()
.withCdapPluginClass(HubspotBatchSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(JsonElement.class);
p.apply("readFromHubspotPlugin", readTransform);CDAP Salesforceバッチソースプラグイン
SalesforceSourceConfig pluginConfig =
new ConfigWrapper<>(SalesforceSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<Schema, LinkedHashMap> readTransform =
CdapIO.<Schema, LinkedHashMap>read()
.withCdapPluginClass(SalesforceBatchSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(Schema.class)
.withValueClass(LinkedHashMap.class);
p.apply("readFromSalesforcePlugin", readTransform);CDAP ServiceNowバッチソースプラグイン
ServiceNowSourceConfig pluginConfig =
new ConfigWrapper<>(ServiceNowSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, StructuredRecord> readTransform =
CdapIO.<NullWritable, StructuredRecord>read()
.withCdapPluginClass(ServiceNowSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(StructuredRecord.class);
p.apply("readFromServiceNowPlugin", readTransform);CDAP Zendeskバッチソースプラグイン
ZendeskBatchSourceConfig pluginConfig =
new ConfigWrapper<>(ZendeskBatchSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, StructuredRecord> readTransform =
CdapIO.<NullWritable, StructuredRecord>read()
.withCdapPluginClass(ZendeskBatchSource.class)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(StructuredRecord.class);
p.apply("readFromZendeskPlugin", readTransform);詳細については、完全な例をご覧ください。
CdapIOを使用したバッチ書き込み
CDAPプラグインに書き込むには、以下を渡す必要があります。
KeyクラスとValueクラス。これらのクラスにBeam Coderが利用可能かどうかを確認する必要があります。locksDirPath。これは、ロックが格納されるロックディレクトリパスです。このパラメータは、Hadoop External Synchronization(書き込みジョブに関連するロックを取得するためのメカニズム)に必要です。- 特定のCDAPプラグインのパラメータを持つ
PluginConfigオブジェクト。
以下を指定することで、ConfigWrapperクラスを使用してPluginConfigオブジェクトを簡単に構築できます。
- 必要な
PluginConfigのクラス。 - 対応するCDAPプラグインの
Map<String, Object>パラメータマップ。
例えば
プラグインのクラス名でデータを書き込む
一部のCDAPプラグインは既にサポートされており、プラグインのクラス名だけで使用できます。
例えば
バッチプラグインを構築してデータを書き込む
CDAPプラグインがプラグインのクラス名でサポートされていない場合は、次のパラメータを渡すことで、Pluginオブジェクトを簡単に構築できます。
- CDAPプラグインのクラス。
- 選択したCDAPプラグインに接続するために使用される
OutputFormatクラス。 OutputFormatを提供するために使用されるOutputFormatProviderクラス。
その後、このPluginオブジェクトをCdapIOに渡すことができます。
例えば
CdapIO.Write<String, String> writeTransform =
CdapIO.<String, String>write()
.withCdapPlugin(
Plugin.createBatch(
MyCdapPlugin.class,
MyOutputFormat.class,
MyOutputFormatProvider.class))
.withPluginConfig(pluginConfig)
.withKeyClass(String.class)
.withValueClass(String.class)
.withLocksDirPath(locksDirPath);
p.apply("write", writeTransform);特定のCDAPプラグインの例
CDAP Hubspotバッチシンクプラグイン
SinkHubspotConfig pluginConfig =
new ConfigWrapper<>(SinkHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, String> writeTransform =
CdapIO.<NullWritable, String>write()
.withCdapPluginClass(pluginClass)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class)
.withLocksDirPath(locksDirPath);
p.apply("writeToHubspotPlugin", writeTransform);CDAP Salesforceバッチシンクプラグイン
SalesforceSinkConfig pluginConfig =
new ConfigWrapper<>(SalesforceSinkConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, CSVRecord> writeTransform =
CdapIO.<NullWritable, CSVRecord>write()
.withCdapPluginClass(pluginClass)
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(CSVRecord.class)
.withLocksDirPath(locksDirPath);
p.apply("writeToSalesforcePlugin", writeTransform);詳細については、完全な例をご覧ください。
CdapIOを使用したストリーミング読み込み
CDAPプラグインから読み取るには、以下を渡す必要があります。
KeyクラスとValueクラス。これらのクラスにBeam Coderが利用可能かどうかを確認する必要があります。- 特定のCDAPプラグインのパラメータを持つ
PluginConfigオブジェクト。
以下を指定することで、ConfigWrapperクラスを使用してPluginConfigオブジェクトを簡単に構築できます。
- 必要な
PluginConfigのクラス。 - 対応するCDAPプラグインの
Map<String, Object>パラメータマップ。
例えば
プラグインのクラス名でデータを読み込む
一部のCDAPプラグインは既にサポートされており、プラグインのクラス名だけで使用できます。
例えば
ストリーミングプラグインを構築してデータを読み込む
CDAPプラグインがプラグインのクラス名でサポートされていない場合は、次のパラメータを渡すことで、Pluginオブジェクトを簡単に構築できます。
- CDAPストリーミングプラグインのクラス。
getOffsetFn。これは、レコードからLongレコードオフセットを取得する方法を定義するSerializableFunctionです。receiverClass。これは、CDAPプラグインに関連付けられたSpark(v 2.4)Receiverクラスです。- (オプション)
getReceiverArgsFromConfigFn。これは、PluginConfigオブジェクトを使用してSparkReceiverのコンストラクタ引数を取得する方法を定義するSerializableFunctionです。
その後、このPluginオブジェクトをCdapIOに渡すことができます。
例えば
CdapIO.Read<String, String> readTransform =
CdapIO.<String, String>read()
.withCdapPlugin(
Plugin.createStreaming(
MyStreamingPlugin.class,
myGetOffsetFn,
MyReceiver.class,
myGetReceiverArgsFromConfigFn))
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class);
p.apply("read", readTransform);オプションパラメータを使用してデータを読み込む
必要に応じて、次のオプションパラメータを渡すことができます。
pullFrequencySecは、新しいレコードの更新をポーリングする間隔(秒単位)です。startOffsetは、読み取りを開始する開始オフセット(包含)です。
例えば
特定のCDAPプラグインの例
CDAP Hubspotストリーミングソースプラグイン
HubspotStreamingSourceConfig pluginConfig =
new ConfigWrapper<>(HubspotStreamingSourceConfig.class)
.withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
CdapIO.<NullWritable, String>read()
.withCdapPlugin(
Plugin.createStreaming(
HubspotStreamingSource.class,
GetOffsetUtils.getOffsetFnForHubspot(),
HubspotReceiver.class))
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class);
p.apply("readFromHubspotPlugin", readTransform);CDAP Salesforceストリーミングソースプラグイン
SalesforceStreamingSourceConfig pluginConfig =
new ConfigWrapper<>(SalesforceStreamingSourceConfig.class)
.withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
CdapIO.<NullWritable, String>read()
.withCdapPlugin(
Plugin.createStreaming(
SalesforceStreamingSource.class,
GetOffsetUtils.getOffsetFnForSalesforce(),
SalesforceReceiver.class,
config -> {
SalesforceStreamingSourceConfig salesforceConfig =
SalesforceStreamingSourceConfig) config;
return new Object[] {
salesforceConfig.getAuthenticatorCredentials(),
salesforceConfig.getPushTopicName()
};
}))
.withPluginConfig(pluginConfig)
.withKeyClass(NullWritable.class)
.withValueClass(String.class);
p.apply("readFromSalesforcePlugin", readTransform);詳細については、完全な例をご覧ください。
最終更新日: 2024/10/31
お探しのものは見つかりましたか?
すべて役に立ち、明確でしたか?変更したい点はありますか?ぜひお知らせください!

