HCatalog I/O

HCatalogIO は、HCatalog で管理されたソースに対してデータの読み取りと書き込みを行うための変換です。

HCatalogIO を使用した読み取り

HCatalog ソースを設定するには、メタストア URI とテーブル名を指定する必要があります。その他のオプションのパラメーターは、データベースとフィルターです。

例えば

Map<String, String> configProperties = new HashMap<String, String>();
configProperties.put("hive.metastore.uris","thrift://metastore-host:port");
pipeline
  .apply(HCatalogIO.read()
  .withConfigProperties(configProperties)
  .withDatabase("default") //optional, assumes default if none specified
  .withTable("employee")
  .withFilter(filterString)) //optional, may be specified if the table is partitioned
  # The Beam SDK for Python does not support HCatalogIO.

HCatalogIO を使用した書き込み

HCatalog シンクを設定するには、メタストア URI とテーブル名を指定する必要があります。その他のオプションのパラメーターは、データベース、パーティション、batchsize です。変換ではテーブルがない場合に新しいテーブルは作成されないため、宛先テーブルは事前に存在する必要があります。

例えば

Map<String, String> configProperties = new HashMap<String, String>();
configProperties.put("hive.metastore.uris","thrift://metastore-host:port");

pipeline
  .apply(...)
  .apply(HCatalogIO.write()
    .withConfigProperties(configProperties)
    .withDatabase("default") //optional, assumes default if none specified
    .withTable("employee")
    .withPartition(partitionValues) //optional, may be specified if the table is partitioned
    .withBatchSize(1024L)) //optional, assumes a default batch size of 1024 if none specified
  # The Beam SDK for Python does not support HCatalogIO.

古いバージョンの HCatalog(1.x)の使用

HCatalogIO は、Apache HCatalog バージョン 2 以上向けに構築されており、古いバージョンの HCatalog ではそのままでは動作しません。以下は、Hive 1.1 で動作させるための回避策を示しています。

ビルドする uber jar に次の Hive 1.2 jar を含めます。1.2 jar は Beam に必要なメソッドを提供し、Hive 1.1 との互換性を維持します。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-hcatalog</artifactId>
    <version>${beam.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hive.hcatalog</groupId>
    <artifactId>hive-hcatalog-core</artifactId>
    <version>1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-metastore</artifactId>
    <version>1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-common</artifactId>
    <version>1.2</version>
</dependency>

次の hive パッケージのみを再配置します。

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>${maven-shade-plugin.version}</version>
    <configuration>
        <createDependencyReducedPom>false</createDependencyReducedPom>
        <filters>
            <filter>
                <artifact>*:*</artifact>
                <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                </excludes>
            </filter>
        </filters>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <shadedArtifactAttached>true</shadedArtifactAttached>
                <shadedClassifierName>shaded</shadedClassifierName>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
                <relocations>
                    <!-- Important: Do not relocate org.apache.hadoop.hive -->
                    <relocation>
                        <pattern>org.apache.hadoop.hive.conf</pattern>
                        <shadedPattern>h12.org.apache.hadoop.hive.conf</shadedPattern>
                    </relocation>
                    <relocation>
                        <pattern>org.apache.hadoop.hive.ql</pattern>
                        <shadedPattern>h12.org.apache.hadoop.hive.ql</shadedPattern>
                    </relocation>
                    <relocation>
                        <pattern>org.apache.hadoop.hive.metastore</pattern>
                        <shadedPattern>h12.org.apache.hadoop.hive.metastore</shadedPattern>
                    </relocation>
                </relocations>
            </configuration>
        </execution>
    </executions>
</plugin>

これは、Cloudera CDH 5.12.2 管理環境で Spark 2.3 / YARN 上で Beam 2.4.0 を実行して、SequenceFile および ORCFile ファイルバックテーブルを読み取るためにテストされています。