CDAP IO

CdapIO は、ソースからデータを読み取ったり、シンク CDAP プラグインにデータを書き込んだりする変換です。

バッチプラグインのサポート

CdapIO は現在、CDAPプラグインのクラス名を参照することで、以下のCDAPバッチプラグインをサポートしています。

また、HadoopのInputFormatまたはOutputFormatに基づく他のCDAPバッチプラグインも使用できます。クラス名でサポートされているプラグインのリストに簡単に追加できます。詳細はCdapIOのREADMEを参照してください。

ストリーミングプラグインのサポート

CdapIO は現在、Apache Spark Receiverに基づくCDAPストリーミングプラグインをサポートしています。

CDAPストリーミングプラグインの要件

CdapIOを使用したバッチ読み込み

CDAPプラグインから読み取るには、以下を渡す必要があります。

以下を指定することで、ConfigWrapperクラスを使用してPluginConfigオブジェクトを簡単に構築できます。

例えば

Map<String, Object> myPluginConfigParams = new HashMap<>();
// Read plugin parameters (e.g. from PipelineOptions) and put them into 'myPluginConfigParams' map.
myPluginConfigParams.put(MyPluginConstants.USERNAME_PARAMETER_NAME, pipelineOptions.getUsername());
// ...
MyPluginConfig pluginConfig =
  new ConfigWrapper<>(MyPluginConfig.class).withParams(myPluginConfigParams).build();

プラグインのクラス名でデータを読み込む

一部のCDAPプラグインは既にサポートされており、プラグインのクラス名だけで使用できます。

例えば

CdapIO.Read<NullWritable, JsonElement> readTransform =
  CdapIO.<NullWritable, JsonElement>read()
    .withCdapPluginClass(HubspotBatchSource.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(JsonElement.class);
p.apply("read", readTransform);

バッチプラグインを構築してデータを読み込む

CDAPプラグインがプラグインのクラス名でサポートされていない場合は、次のパラメータを渡すことで、Pluginオブジェクトを簡単に構築できます。

その後、このPluginオブジェクトをCdapIOに渡すことができます。

例えば

CdapIO.Read<String, String> readTransform =
  CdapIO.<String, String>read()
    .withCdapPlugin(
      Plugin.createBatch(
        MyCdapPlugin.class,
        MyInputFormat.class,
        MyInputFormatProvider.class))
    .withPluginConfig(pluginConfig)
    .withKeyClass(String.class)
    .withValueClass(String.class);
p.apply("read", readTransform);

特定のCDAPプラグインの例

CDAP Hubspotバッチソースプラグイン

SourceHubspotConfig pluginConfig =
  new ConfigWrapper<>(SourceHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, JsonElement> readTransform =
  CdapIO.<NullWritable, JsonElement>read()
    .withCdapPluginClass(HubspotBatchSource.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(JsonElement.class);
p.apply("readFromHubspotPlugin", readTransform);

CDAP Salesforceバッチソースプラグイン

SalesforceSourceConfig pluginConfig =
  new ConfigWrapper<>(SalesforceSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<Schema, LinkedHashMap> readTransform =
  CdapIO.<Schema, LinkedHashMap>read()
    .withCdapPluginClass(SalesforceBatchSource.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(Schema.class)
    .withValueClass(LinkedHashMap.class);
p.apply("readFromSalesforcePlugin", readTransform);

CDAP ServiceNowバッチソースプラグイン

ServiceNowSourceConfig pluginConfig =
  new ConfigWrapper<>(ServiceNowSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, StructuredRecord> readTransform =
  CdapIO.<NullWritable, StructuredRecord>read()
    .withCdapPluginClass(ServiceNowSource.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(StructuredRecord.class);
p.apply("readFromServiceNowPlugin", readTransform);

CDAP Zendeskバッチソースプラグイン

ZendeskBatchSourceConfig pluginConfig =
  new ConfigWrapper<>(ZendeskBatchSourceConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, StructuredRecord> readTransform =
  CdapIO.<NullWritable, StructuredRecord>read()
    .withCdapPluginClass(ZendeskBatchSource.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(StructuredRecord.class);
p.apply("readFromZendeskPlugin", readTransform);

詳細については、完全な例をご覧ください。

CdapIOを使用したバッチ書き込み

CDAPプラグインに書き込むには、以下を渡す必要があります。

以下を指定することで、ConfigWrapperクラスを使用してPluginConfigオブジェクトを簡単に構築できます。

例えば

MyPluginConfig pluginConfig =
  new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();

プラグインのクラス名でデータを書き込む

一部のCDAPプラグインは既にサポートされており、プラグインのクラス名だけで使用できます。

例えば

CdapIO.Write<NullWritable, String> readTransform =
  CdapIO.<NullWritable, String>write()
    .withCdapPluginClass(HubspotBatchSink.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class)
    .withLocksDirPath(locksDirPath);
p.apply("write", writeTransform);

バッチプラグインを構築してデータを書き込む

CDAPプラグインがプラグインのクラス名でサポートされていない場合は、次のパラメータを渡すことで、Pluginオブジェクトを簡単に構築できます。

その後、このPluginオブジェクトをCdapIOに渡すことができます。

例えば

CdapIO.Write<String, String> writeTransform =
  CdapIO.<String, String>write()
    .withCdapPlugin(
      Plugin.createBatch(
        MyCdapPlugin.class,
        MyOutputFormat.class,
        MyOutputFormatProvider.class))
    .withPluginConfig(pluginConfig)
    .withKeyClass(String.class)
    .withValueClass(String.class)
    .withLocksDirPath(locksDirPath);
p.apply("write", writeTransform);

特定のCDAPプラグインの例

CDAP Hubspotバッチシンクプラグイン

SinkHubspotConfig pluginConfig =
  new ConfigWrapper<>(SinkHubspotConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, String> writeTransform =
  CdapIO.<NullWritable, String>write()
    .withCdapPluginClass(pluginClass)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class)
    .withLocksDirPath(locksDirPath);
p.apply("writeToHubspotPlugin", writeTransform);

CDAP Salesforceバッチシンクプラグイン

SalesforceSinkConfig pluginConfig =
  new ConfigWrapper<>(SalesforceSinkConfig.class).withParams(pluginConfigParams).build();
CdapIO<NullWritable, CSVRecord> writeTransform =
  CdapIO.<NullWritable, CSVRecord>write()
    .withCdapPluginClass(pluginClass)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(CSVRecord.class)
    .withLocksDirPath(locksDirPath);
p.apply("writeToSalesforcePlugin", writeTransform);

詳細については、完全な例をご覧ください。

CdapIOを使用したストリーミング読み込み

CDAPプラグインから読み取るには、以下を渡す必要があります。

以下を指定することで、ConfigWrapperクラスを使用してPluginConfigオブジェクトを簡単に構築できます。

例えば

MyPluginConfig pluginConfig =
  new ConfigWrapper<>(MyPluginConfig.class).withParams(pluginConfigParams).build();

プラグインのクラス名でデータを読み込む

一部のCDAPプラグインは既にサポートされており、プラグインのクラス名だけで使用できます。

例えば

CdapIO.Read<String, String> readTransform =
  CdapIO.<String, String>read()
    .withCdapPluginClass(MyStreamingPlugin.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class);
p.apply("read", readTransform);

ストリーミングプラグインを構築してデータを読み込む

CDAPプラグインがプラグインのクラス名でサポートされていない場合は、次のパラメータを渡すことで、Pluginオブジェクトを簡単に構築できます。

その後、このPluginオブジェクトをCdapIOに渡すことができます。

例えば

CdapIO.Read<String, String> readTransform =
  CdapIO.<String, String>read()
    .withCdapPlugin(
      Plugin.createStreaming(
        MyStreamingPlugin.class,
        myGetOffsetFn,
        MyReceiver.class,
        myGetReceiverArgsFromConfigFn))
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class);
p.apply("read", readTransform);

オプションパラメータを使用してデータを読み込む

必要に応じて、次のオプションパラメータを渡すことができます。

例えば

CdapIO.Read<String, String> readTransform =
  CdapIO.<String, String>read()
    .withCdapPluginClass(MyStreamingPlugin.class)
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class)
    .withPullFrequencySec(1L)
    .withStartOffset(1L);
p.apply("read", readTransform);

特定のCDAPプラグインの例

CDAP Hubspotストリーミングソースプラグイン

HubspotStreamingSourceConfig pluginConfig =
  new ConfigWrapper<>(HubspotStreamingSourceConfig.class)
    .withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
  CdapIO.<NullWritable, String>read()
    .withCdapPlugin(
      Plugin.createStreaming(
        HubspotStreamingSource.class,
        GetOffsetUtils.getOffsetFnForHubspot(),
        HubspotReceiver.class))
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class);
p.apply("readFromHubspotPlugin", readTransform);

CDAP Salesforceストリーミングソースプラグイン

SalesforceStreamingSourceConfig pluginConfig =
  new ConfigWrapper<>(SalesforceStreamingSourceConfig.class)
    .withParams(pluginConfigParams).build();
CdapIO.Read<NullWritable, String> readTransform =
  CdapIO.<NullWritable, String>read()
    .withCdapPlugin(
      Plugin.createStreaming(
        SalesforceStreamingSource.class,
        GetOffsetUtils.getOffsetFnForSalesforce(),
        SalesforceReceiver.class,
        config -> {
          SalesforceStreamingSourceConfig salesforceConfig =
            SalesforceStreamingSourceConfig) config;
          return new Object[] {
            salesforceConfig.getAuthenticatorCredentials(),
            salesforceConfig.getPushTopicName()
          };
        }))
    .withPluginConfig(pluginConfig)
    .withKeyClass(NullWritable.class)
    .withValueClass(String.class);
p.apply("readFromSalesforcePlugin", readTransform);

詳細については、完全な例をご覧ください。