Hadoop Input/Output Format IO

重要! HadoopInputFormatIO という名前の以前の Hadoop Input Format IO の実装は、Apache Beam 2.10 から非推奨になりました。現在、InputFormatOutputFormat の両方をサポートしている現在の HadoopFormatIO を使用してください。

HadoopFormatIO は、Hadoop の InputFormat または OutputFormat をそれぞれ実装する任意のソースからのデータの読み取りまたは任意のシンクへのデータの書き込みのための変換です。たとえば、Cassandra、Elasticsearch、HBase、Redis、Postgres などです。

HadoopFormatIO を使用すると、まだ Beam IO 変換がない多くのデータソース/シンクに接続できます。ただし、HadoopFormatIO は、InputFormat または OutputFormat に接続する際に、いくつかのパフォーマンス上のトレードオフを行う必要があります。したがって、特定のデータソース/シンクに接続するための別の Beam IO 変換がある場合は、そちらを使用することをお勧めします。

HadoopFormatIOを使用した読み取り

読み取りがどのように行われるかを指定するパラメーターを含む Hadoop Configuration を渡す必要があります。 Configuration の多くのプロパティはオプションであり、特定の InputFormat クラスには必須ですが、次のプロパティはすべての InputFormat クラスに設定する必要があります。

Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop InputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class", InputFormatClass,
  InputFormat.class);
myHadoopConfiguration.setClass("key.class", InputFormatKeyClass, Object.class);
myHadoopConfiguration.setClass("value.class", InputFormatValueClass, Object.class);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

InputFormat によって出力される Key クラスと Value クラスに利用可能な Beam Coder があるかどうかを確認する必要があります。そうでない場合は、withKeyTranslation または withValueTranslation を使用して、これらのクラスのインスタンスを Beam Coder でサポートされている別のクラスに変換するメソッドを指定できます。これらの設定はオプションであり、キーと値の両方の変換を指定する必要はありません。

SimpleFunction<InputFormatKeyClass, MyKeyClass> myOutputKeyType =
new SimpleFunction<InputFormatKeyClass, MyKeyClass>() {
  public MyKeyClass apply(InputFormatKeyClass input) {
  // ...logic to transform InputFormatKeyClass to MyKeyClass
  }
};
SimpleFunction<InputFormatValueClass, MyValueClass> myOutputValueType =
new SimpleFunction<InputFormatValueClass, MyValueClass>() {
  public MyValueClass apply(InputFormatValueClass input) {
  // ...logic to transform InputFormatValueClass to MyValueClass
  }
};
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Hadoop構成のみを使用したデータの読み取り。

p.apply("read",
  HadoopFormatIO.<InputFormatKeyClass, InputFormatKeyClass>read()
  .withConfiguration(myHadoopConfiguration);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

構成とキー変換を使用したデータの読み取り

たとえば、Beam CoderKey クラスで使用できないため、キー変換が必要です。

p.apply("read",
  HadoopFormatIO.<MyKeyClass, InputFormatKeyClass>read()
  .withConfiguration(myHadoopConfiguration)
  .withKeyTranslation(myOutputKeyType);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

構成と値変換を使用したデータの読み取り

たとえば、Beam CoderValue クラスで使用できないため、値変換が必要です。

p.apply("read",
  HadoopFormatIO.<InputFormatKeyClass, MyValueClass>read()
  .withConfiguration(myHadoopConfiguration)
  .withValueTranslation(myOutputValueType);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

構成、値変換、キー変換を使用したデータの読み取り

たとえば、InputFormatKey クラスと Value クラスの両方で Beam Coder を使用できないため、キーと値の変換が必要です。

p.apply("read",
  HadoopFormatIO.<MyKeyClass, MyValueClass>read()
  .withConfiguration(myHadoopConfiguration)
  .withKeyTranslation(myOutputKeyType)
  .withValueTranslation(myOutputValueType);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

特定の InputFormat の例

Cassandra - CqlInputFormat

Cassandra からデータを読み取るには、org.apache.cassandra.hadoop.cql3.CqlInputFormat を使用します。これには、次のプロパティを設定する必要があります。

Configuration cassandraConf = new Configuration();
cassandraConf.set("cassandra.input.thrift.port", "9160");
cassandraConf.set("cassandra.input.thrift.address", CassandraHostIp);
cassandraConf.set("cassandra.input.partitioner.class", "Murmur3Partitioner");
cassandraConf.set("cassandra.input.keyspace", "myKeySpace");
cassandraConf.set("cassandra.input.columnfamily", "myColumnFamily");
cassandraConf.setClass("key.class", java.lang.Long Long.class, Object.class);
cassandraConf.setClass("value.class", com.datastax.driver.core.Row Row.class, Object.class);
cassandraConf.setClass("mapreduce.job.inputformat.class", org.apache.cassandra.hadoop.cql3.CqlInputFormat CqlInputFormat.class, InputFormat.class);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

次のように Read 変換を呼び出します。

PCollection<KV<Long, String>> cassandraData =
  p.apply("read",
  HadoopFormatIO.<Long, String>read()
  .withConfiguration(cassandraConf)
  .withValueTranslation(cassandraOutputValueType);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

CqlInputFormat のキー クラスは java.lang.Long Long であり、Beam Coder があります。 CqlInputFormat の値クラスは com.datastax.driver.core.Row Row であり、Beam Coder がありません。新しいコーダーを作成する代わりに、次のように独自の変換メソッドを提供できます。

SimpleFunction<Row, String> cassandraOutputValueType = SimpleFunction<Row, String>()
{
  public String apply(Row row) {
    return row.getString('myColName');
  }
};
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Elasticsearch - EsInputFormat

Elasticsearch からデータを読み取るには、EsInputFormat を使用します。これには、次のプロパティを設定する必要があります。

Configuration elasticsearchConf = new Configuration();
elasticsearchConf.set("es.nodes", ElasticsearchHostIp);
elasticsearchConf.set("es.port", "9200");
elasticsearchConf.set("es.resource", "ElasticIndexName/ElasticTypeName");
elasticsearchConf.setClass("key.class", org.apache.hadoop.io.Text Text.class, Object.class);
elasticsearchConf.setClass("value.class", org.elasticsearch.hadoop.mr.LinkedMapWritable LinkedMapWritable.class, Object.class);
elasticsearchConf.setClass("mapreduce.job.inputformat.class", org.elasticsearch.hadoop.mr.EsInputFormat EsInputFormat.class, InputFormat.class);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

次のように Read 変換を呼び出します。

PCollection<KV<Text, LinkedMapWritable>> elasticData = p.apply("read",
  HadoopFormatIO.<Text, LinkedMapWritable>read().withConfiguration(elasticsearchConf));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

org.elasticsearch.hadoop.mr.EsInputFormatEsInputFormat キー クラスは org.apache.hadoop.io.Text Text であり、その値クラスは org.elasticsearch.hadoop.mr.LinkedMapWritable LinkedMapWritable です。キーと値のクラスには両方とも Beam Coder があります。

HCatalog - HCatInputFormat

HCatalog を使用してデータを読み取るには、org.apache.hive.hcatalog.mapreduce.HCatInputFormat を使用します。これには、次のプロパティを設定する必要があります。

Configuration hcatConf = new Configuration();
hcatConf.setClass("mapreduce.job.inputformat.class", HCatInputFormat.class, InputFormat.class);
hcatConf.setClass("key.class", LongWritable.class, Object.class);
hcatConf.setClass("value.class", HCatRecord.class, Object.class);
hcatConf.set("hive.metastore.uris", "thrift://metastore-host:port");

org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(hcatConf, "my_database", "my_table", "my_filter");
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

次のように Read 変換を呼び出します。

PCollection<KV<Long, HCatRecord>> hcatData =
  p.apply("read",
  HadoopFormatIO.<Long, HCatRecord>read()
  .withConfiguration(hcatConf);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Amazon DynamoDB - DynamoDBInputFormat

Amazon DynamoDB からデータを読み取るには、org.apache.hadoop.dynamodb.read.DynamoDBInputFormat を使用します。 DynamoDBInputFormat は、以前の org.apache.hadoop.mapred.InputFormat インターフェースを実装しており、より新しい抽象クラス org.apache.hadoop.mapreduce.InputFormat を使用する HadoopFormatIO と互換性を持たせるには、HadoopFormatIO と DynamoDBInputFormat (または一般的に org.apache.hadoop.mapred.InputFormat を実装する任意の InputFormat) の間のアダプターとして機能するラッパー API が必要です。以下の例では、利用可能なラッパー API (https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/MapReduceInputFormatWrapper.java) を使用しています。

Configuration dynamoDBConf = new Configuration();
Job job = Job.getInstance(dynamoDBConf);
com.twitter.elephantbird.mapreduce.input.MapReduceInputFormatWrapper.setInputFormat(org.apache.hadoop.dynamodb.read.DynamoDBInputFormat.class, job);
dynamoDBConf = job.getConfiguration();
dynamoDBConf.setClass("key.class", Text.class, WritableComparable.class);
dynamoDBConf.setClass("value.class", org.apache.hadoop.dynamodb.DynamoDBItemWritable.class, Writable.class);
dynamoDBConf.set("dynamodb.servicename", "dynamodb");
dynamoDBConf.set("dynamodb.input.tableName", "table_name");
dynamoDBConf.set("dynamodb.endpoint", "dynamodb.us-west-1.amazonaws.com");
dynamoDBConf.set("dynamodb.regionid", "us-west-1");
dynamoDBConf.set("dynamodb.throughput.read", "1");
dynamoDBConf.set("dynamodb.throughput.read.percent", "1");
dynamoDBConf.set("dynamodb.version", "2011-12-05");
dynamoDBConf.set(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, "aws_access_key");
dynamoDBConf.set(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, "aws_secret_key");
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

次のように Read 変換を呼び出します。

PCollection<Text, DynamoDBItemWritable> dynamoDBData =
  p.apply("read",
  HadoopFormatIO.<Text, DynamoDBItemWritable>read()
  .withConfiguration(dynamoDBConf);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Apache HBase - TableSnapshotInputFormat

HBase テーブルのスナップショットからデータを読み取るには、org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat を使用します。テーブル スナップショットからの読み取りは、HBase リージョン サーバーをバイパスし、代わりにファイルシステムから HBase データファイルを直接読み取ります。これは、履歴データの読み取りや HBase クラスターからの作業のオフロードなどの場合に役立ちます。 HBaseIO を使用してリージョン サーバーを介してコンテンツにアクセスするよりも高速になる可能性があるシナリオがあります。

テーブル スナップショットは、HBase シェルまたはプログラムで取得できます。

try (
    Connection connection = ConnectionFactory.createConnection(hbaseConf);
    Admin admin = connection.getAdmin()
  ) {
  admin.snapshot(
    "my_snaphshot",
    TableName.valueOf("my_table"),
    HBaseProtos.SnapshotDescription.Type.FLUSH);
}
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

TableSnapshotInputFormat は次のように構成されます。

// Construct a typical HBase scan
Scan scan = new Scan();
scan.setCaching(1000);
scan.setBatch(1000);
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("col_1"));
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("col_2"));

Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "zk1:2181");
hbaseConf.set("hbase.rootdir", "/hbase");
hbaseConf.setClass(
    "mapreduce.job.inputformat.class", TableSnapshotInputFormat.class, InputFormat.class);
hbaseConf.setClass("key.class", ImmutableBytesWritable.class, Writable.class);
hbaseConf.setClass("value.class", Result.class, Writable.class);
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()));

// Make use of existing utility methods
Job job = Job.getInstance(hbaseConf); // creates internal clone of hbaseConf
TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/snapshot_restore"));
hbaseConf = job.getConfiguration(); // extract the modified clone
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

次のように Read 変換を呼び出します。

PCollection<ImmutableBytesWritable, Result> hbaseSnapshotData =
  p.apply("read",
  HadoopFormatIO.<ImmutableBytesWritable, Result>read()
  .withConfiguration(hbaseConf);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

HadoopFormatIOを使用した書き込み

書き込みがどのように行われるかを指定するパラメーターを含む Hadoop Configuration を渡す必要があります。 Configuration の多くのプロパティはオプションであり、特定の OutputFormat クラスには必須ですが、次のプロパティはすべての OutputFormat に設定する必要があります。

: 上記の値はすべて適切な定数を持ちます。例: HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR

Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop OutputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.outputformat.class",
   MyDbOutputFormatClass, OutputFormat.class);
myHadoopConfiguration.setClass("mapreduce.job.output.key.class",
   MyDbOutputFormatKeyClass, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.output.value.class",
   MyDbOutputFormatValueClass, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.partitioner.class",
   MyPartitionerClass, Object.class);
myHadoopConfiguration.setInt("mapreduce.job.reduces", 2);
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

Hadoop Configuration で、OutputFormat のキーと値のクラス(つまり、「mapreduce.job.output.key.class」と「mapreduce.job.output.value.class」)を KeyT および ValueT と同じになるように設定する必要があります。OutputFormat の実際のキーまたは値のクラスとは異なる OutputFormat のキーまたは値のクラスを設定した場合、IllegalArgumentException がスローされます。

バッチ書き込み

// Data which will we want to write
PCollection<KV<Text, LongWritable>> boundedWordsCount = ...

// Hadoop configuration for write
// We have partitioned write, so Partitioner and reducers count have to be set - see withPartitioning() javadoc
Configuration myHadoopConfiguration = ...
// Path to directory with locks
String locksDirPath = ...;

boundedWordsCount.apply(
    "writeBatch",
    HadoopFormatIO.<Text, LongWritable>write()
        .withConfiguration(myHadoopConfiguration)
        .withPartitioning()
        .withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.

ストリーム書き込み

// Data which will we want to write
PCollection<KV<Text, LongWritable>> unboundedWordsCount = ...;

// Transformation which transforms data of one window into one hadoop configuration
PTransform<PCollection<? extends KV<Text, LongWritable>>, PCollectionView<Configuration>>
  configTransform = ...;

unboundedWordsCount.apply(
  "writeStream",
  HadoopFormatIO.<Text, LongWritable>write()
      .withConfigurationTransform(configTransform)
      .withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
  # The Beam SDK for Python does not support Hadoop Input/Output Format IO.