Hadoop Input/Output Format IO
重要!
HadoopInputFormatIO
という名前の以前の Hadoop Input Format IO の実装は、Apache Beam 2.10 から非推奨になりました。現在、InputFormat
とOutputFormat
の両方をサポートしている現在のHadoopFormatIO
を使用してください。
HadoopFormatIO
は、Hadoop の InputFormat
または OutputFormat
をそれぞれ実装する任意のソースからのデータの読み取りまたは任意のシンクへのデータの書き込みのための変換です。たとえば、Cassandra、Elasticsearch、HBase、Redis、Postgres などです。
HadoopFormatIO
を使用すると、まだ Beam IO 変換がない多くのデータソース/シンクに接続できます。ただし、HadoopFormatIO
は、InputFormat
または OutputFormat
に接続する際に、いくつかのパフォーマンス上のトレードオフを行う必要があります。したがって、特定のデータソース/シンクに接続するための別の Beam IO 変換がある場合は、そちらを使用することをお勧めします。
HadoopFormatIOを使用した読み取り
読み取りがどのように行われるかを指定するパラメーターを含む Hadoop Configuration
を渡す必要があります。 Configuration
の多くのプロパティはオプションであり、特定の InputFormat
クラスには必須ですが、次のプロパティはすべての InputFormat
クラスに設定する必要があります。
mapreduce.job.inputformat.class
- 選択したデータソースに接続するために使用されるInputFormat
クラス。key.class
-mapreduce.job.inputformat.class
のInputFormat
によって返されるKey
クラス。value.class
-mapreduce.job.inputformat.class
のInputFormat
によって返されるValue
クラス。
例
Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop InputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class", InputFormatClass,
InputFormat.class);
myHadoopConfiguration.setClass("key.class", InputFormatKeyClass, Object.class);
myHadoopConfiguration.setClass("value.class", InputFormatValueClass, Object.class);
InputFormat
によって出力される Key
クラスと Value
クラスに利用可能な Beam Coder
があるかどうかを確認する必要があります。そうでない場合は、withKeyTranslation
または withValueTranslation
を使用して、これらのクラスのインスタンスを Beam Coder
でサポートされている別のクラスに変換するメソッドを指定できます。これらの設定はオプションであり、キーと値の両方の変換を指定する必要はありません。
例
SimpleFunction<InputFormatKeyClass, MyKeyClass> myOutputKeyType =
new SimpleFunction<InputFormatKeyClass, MyKeyClass>() {
public MyKeyClass apply(InputFormatKeyClass input) {
// ...logic to transform InputFormatKeyClass to MyKeyClass
}
};
SimpleFunction<InputFormatValueClass, MyValueClass> myOutputValueType =
new SimpleFunction<InputFormatValueClass, MyValueClass>() {
public MyValueClass apply(InputFormatValueClass input) {
// ...logic to transform InputFormatValueClass to MyValueClass
}
};
Hadoop構成のみを使用したデータの読み取り。
構成とキー変換を使用したデータの読み取り
たとえば、Beam Coder
が Key
クラスで使用できないため、キー変換が必要です。
構成と値変換を使用したデータの読み取り
たとえば、Beam Coder
が Value
クラスで使用できないため、値変換が必要です。
構成、値変換、キー変換を使用したデータの読み取り
たとえば、InputFormat
の Key
クラスと Value
クラスの両方で Beam Coder を使用できないため、キーと値の変換が必要です。
特定の InputFormat の例
Cassandra - CqlInputFormat
Cassandra からデータを読み取るには、org.apache.cassandra.hadoop.cql3.CqlInputFormat
を使用します。これには、次のプロパティを設定する必要があります。
Configuration cassandraConf = new Configuration();
cassandraConf.set("cassandra.input.thrift.port", "9160");
cassandraConf.set("cassandra.input.thrift.address", CassandraHostIp);
cassandraConf.set("cassandra.input.partitioner.class", "Murmur3Partitioner");
cassandraConf.set("cassandra.input.keyspace", "myKeySpace");
cassandraConf.set("cassandra.input.columnfamily", "myColumnFamily");
cassandraConf.setClass("key.class", java.lang.Long Long.class, Object.class);
cassandraConf.setClass("value.class", com.datastax.driver.core.Row Row.class, Object.class);
cassandraConf.setClass("mapreduce.job.inputformat.class", org.apache.cassandra.hadoop.cql3.CqlInputFormat CqlInputFormat.class, InputFormat.class);
次のように Read 変換を呼び出します。
CqlInputFormat
のキー クラスは java.lang.Long
Long
であり、Beam Coder
があります。 CqlInputFormat
の値クラスは com.datastax.driver.core.Row
Row
であり、Beam Coder
がありません。新しいコーダーを作成する代わりに、次のように独自の変換メソッドを提供できます。
Elasticsearch - EsInputFormat
Elasticsearch からデータを読み取るには、EsInputFormat
を使用します。これには、次のプロパティを設定する必要があります。
Configuration elasticsearchConf = new Configuration();
elasticsearchConf.set("es.nodes", ElasticsearchHostIp);
elasticsearchConf.set("es.port", "9200");
elasticsearchConf.set("es.resource", "ElasticIndexName/ElasticTypeName");
elasticsearchConf.setClass("key.class", org.apache.hadoop.io.Text Text.class, Object.class);
elasticsearchConf.setClass("value.class", org.elasticsearch.hadoop.mr.LinkedMapWritable LinkedMapWritable.class, Object.class);
elasticsearchConf.setClass("mapreduce.job.inputformat.class", org.elasticsearch.hadoop.mr.EsInputFormat EsInputFormat.class, InputFormat.class);
次のように Read 変換を呼び出します。
org.elasticsearch.hadoop.mr.EsInputFormat
の EsInputFormat
キー クラスは org.apache.hadoop.io.Text
Text
であり、その値クラスは org.elasticsearch.hadoop.mr.LinkedMapWritable
LinkedMapWritable
です。キーと値のクラスには両方とも Beam Coder があります。
HCatalog - HCatInputFormat
HCatalog を使用してデータを読み取るには、org.apache.hive.hcatalog.mapreduce.HCatInputFormat
を使用します。これには、次のプロパティを設定する必要があります。
Configuration hcatConf = new Configuration();
hcatConf.setClass("mapreduce.job.inputformat.class", HCatInputFormat.class, InputFormat.class);
hcatConf.setClass("key.class", LongWritable.class, Object.class);
hcatConf.setClass("value.class", HCatRecord.class, Object.class);
hcatConf.set("hive.metastore.uris", "thrift://metastore-host:port");
org.apache.hive.hcatalog.mapreduce.HCatInputFormat.setInput(hcatConf, "my_database", "my_table", "my_filter");
次のように Read 変換を呼び出します。
Amazon DynamoDB - DynamoDBInputFormat
Amazon DynamoDB からデータを読み取るには、org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
を使用します。 DynamoDBInputFormat は、以前の org.apache.hadoop.mapred.InputFormat
インターフェースを実装しており、より新しい抽象クラス org.apache.hadoop.mapreduce.InputFormat
を使用する HadoopFormatIO と互換性を持たせるには、HadoopFormatIO と DynamoDBInputFormat (または一般的に org.apache.hadoop.mapred.InputFormat
を実装する任意の InputFormat) の間のアダプターとして機能するラッパー API が必要です。以下の例では、利用可能なラッパー API (https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/MapReduceInputFormatWrapper.java) を使用しています。
Configuration dynamoDBConf = new Configuration();
Job job = Job.getInstance(dynamoDBConf);
com.twitter.elephantbird.mapreduce.input.MapReduceInputFormatWrapper.setInputFormat(org.apache.hadoop.dynamodb.read.DynamoDBInputFormat.class, job);
dynamoDBConf = job.getConfiguration();
dynamoDBConf.setClass("key.class", Text.class, WritableComparable.class);
dynamoDBConf.setClass("value.class", org.apache.hadoop.dynamodb.DynamoDBItemWritable.class, Writable.class);
dynamoDBConf.set("dynamodb.servicename", "dynamodb");
dynamoDBConf.set("dynamodb.input.tableName", "table_name");
dynamoDBConf.set("dynamodb.endpoint", "dynamodb.us-west-1.amazonaws.com");
dynamoDBConf.set("dynamodb.regionid", "us-west-1");
dynamoDBConf.set("dynamodb.throughput.read", "1");
dynamoDBConf.set("dynamodb.throughput.read.percent", "1");
dynamoDBConf.set("dynamodb.version", "2011-12-05");
dynamoDBConf.set(DynamoDBConstants.DYNAMODB_ACCESS_KEY_CONF, "aws_access_key");
dynamoDBConf.set(DynamoDBConstants.DYNAMODB_SECRET_KEY_CONF, "aws_secret_key");
次のように Read 変換を呼び出します。
Apache HBase - TableSnapshotInputFormat
HBase テーブルのスナップショットからデータを読み取るには、org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat
を使用します。テーブル スナップショットからの読み取りは、HBase リージョン サーバーをバイパスし、代わりにファイルシステムから HBase データファイルを直接読み取ります。これは、履歴データの読み取りや HBase クラスターからの作業のオフロードなどの場合に役立ちます。 HBaseIO
を使用してリージョン サーバーを介してコンテンツにアクセスするよりも高速になる可能性があるシナリオがあります。
テーブル スナップショットは、HBase シェルまたはプログラムで取得できます。
TableSnapshotInputFormat
は次のように構成されます。
// Construct a typical HBase scan
Scan scan = new Scan();
scan.setCaching(1000);
scan.setBatch(1000);
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("col_1"));
scan.addColumn(Bytes.toBytes("CF"), Bytes.toBytes("col_2"));
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "zk1:2181");
hbaseConf.set("hbase.rootdir", "/hbase");
hbaseConf.setClass(
"mapreduce.job.inputformat.class", TableSnapshotInputFormat.class, InputFormat.class);
hbaseConf.setClass("key.class", ImmutableBytesWritable.class, Writable.class);
hbaseConf.setClass("value.class", Result.class, Writable.class);
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()));
// Make use of existing utility methods
Job job = Job.getInstance(hbaseConf); // creates internal clone of hbaseConf
TableSnapshotInputFormat.setInput(job, "my_snapshot", new Path("/tmp/snapshot_restore"));
hbaseConf = job.getConfiguration(); // extract the modified clone
次のように Read 変換を呼び出します。
HadoopFormatIOを使用した書き込み
書き込みがどのように行われるかを指定するパラメーターを含む Hadoop Configuration
を渡す必要があります。 Configuration
の多くのプロパティはオプションであり、特定の OutputFormat
クラスには必須ですが、次のプロパティはすべての OutputFormat
に設定する必要があります。
mapreduce.job.id
- 書き込みジョブの識別子。例:ウィンドウの終了タイムスタンプ。mapreduce.job.outputformat.class
- 選択したデータシンクに接続するために使用されるOutputFormat
クラス。mapreduce.job.output.key.class
-mapreduce.job.outputformat.class
のOutputFormat
に渡されるキーのクラス。mapreduce.job.output.value.class
-mapreduce.job.outputformat.class
のOutputFormat
に渡される値のクラス。mapreduce.job.reduces
- reduceタスクの数。値は生成される書き込みタスクの数に等しくなります。このプロパティはWrite.PartitionedWriterBuilder#withoutPartitioning()
書き込みには必須ではありません。mapreduce.job.partitioner.class
- パーティション間でレコードを分散するために使用される Hadoop パーティショナークラス。このプロパティはWrite.PartitionedWriterBuilder#withoutPartitioning()
書き込みには必須ではありません。
注: 上記の値はすべて適切な定数を持ちます。例: HadoopFormatIO.OUTPUT_FORMAT_CLASS_ATTR
。
例
Configuration myHadoopConfiguration = new Configuration(false);
// Set Hadoop OutputFormat, key and value class in configuration
myHadoopConfiguration.setClass("mapreduce.job.outputformat.class",
MyDbOutputFormatClass, OutputFormat.class);
myHadoopConfiguration.setClass("mapreduce.job.output.key.class",
MyDbOutputFormatKeyClass, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.output.value.class",
MyDbOutputFormatValueClass, Object.class);
myHadoopConfiguration.setClass("mapreduce.job.partitioner.class",
MyPartitionerClass, Object.class);
myHadoopConfiguration.setInt("mapreduce.job.reduces", 2);
Hadoop Configuration
で、OutputFormat
のキーと値のクラス(つまり、「mapreduce.job.output.key.class」と「mapreduce.job.output.value.class」)を KeyT
および ValueT
と同じになるように設定する必要があります。OutputFormat
の実際のキーまたは値のクラスとは異なる OutputFormat
のキーまたは値のクラスを設定した場合、IllegalArgumentException
がスローされます。
バッチ書き込み
// Data which will we want to write
PCollection<KV<Text, LongWritable>> boundedWordsCount = ...
// Hadoop configuration for write
// We have partitioned write, so Partitioner and reducers count have to be set - see withPartitioning() javadoc
Configuration myHadoopConfiguration = ...
// Path to directory with locks
String locksDirPath = ...;
boundedWordsCount.apply(
"writeBatch",
HadoopFormatIO.<Text, LongWritable>write()
.withConfiguration(myHadoopConfiguration)
.withPartitioning()
.withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
ストリーム書き込み
// Data which will we want to write
PCollection<KV<Text, LongWritable>> unboundedWordsCount = ...;
// Transformation which transforms data of one window into one hadoop configuration
PTransform<PCollection<? extends KV<Text, LongWritable>>, PCollectionView<Configuration>>
configTransform = ...;
unboundedWordsCount.apply(
"writeStream",
HadoopFormatIO.<Text, LongWritable>write()
.withConfigurationTransform(configTransform)
.withExternalSynchronization(new HDFSSynchronization(locksDirPath)));
最終更新日: 2024/10/31
お探しの情報はすべて見つかりましたか?
すべて役に立ち、明確でしたか?何か変更したい点はありますか?ぜひお知らせください!