組み込みの I/O 変換

Google BigQuery I/O コネクタ

Beam SDK には、Google BigQuery テーブルからデータを読み書きできる組み込みの変換が含まれています。

開始する前に

BigQueryIO を使用するには、Maven アーティファクトの依存関係を pom.xml ファイルに追加します。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>2.60.0</version>
</dependency>

追加リソース

BigQueryIO を使用するには、pip install apache-beam[gcp] を実行して Google Cloud Platform の依存関係をインストールする必要があります。

追加リソース

BigQuery の基本

テーブル名

BigQuery テーブルから読み書きするには、完全修飾された BigQuery テーブル名 (たとえば、bigquery-public-data:github_repos.sample_contents) を指定する必要があります。完全修飾された BigQuery テーブル名は、次の 3 つの部分で構成されています。

テーブル名には、時間パーティション化されたテーブルを使用している場合、テーブルデコレータを含めることもできます。

BigQuery テーブルを指定するには、テーブルの完全修飾名を文字列として使用するか、TableReference TableReference オブジェクトを使用できます。

文字列の使用

文字列でテーブルを指定するには、形式 [project_id]:[dataset_id].[table_id] を使用して、完全修飾された BigQuery テーブル名を指定します。

String tableSpec = "apache-beam-testing.samples.weather_stations";
# project-id:dataset_id.table_id
table_spec = 'apache-beam-testing.samples.weather_stations'

project_id を省略して、[dataset_id].[table_id] 形式を使用することもできます。プロジェクト ID を省略すると、Beam は、パイプラインオプション パイプラインオプション からデフォルトのプロジェクト ID を使用します。

String tableSpec = "samples.weather_stations";
# dataset_id.table_id
table_spec = 'samples.weather_stations'

TableReference の使用

TableReference でテーブルを指定するには、BigQuery テーブル名の 3 つの部分を使用して、新しい TableReference を作成します。

TableReference tableSpec =
    new TableReference()
        .setProjectId("clouddataflow-readonly")
        .setDatasetId("samples")
        .setTableId("weather_stations");
from apache_beam.io.gcp.internal.clients import bigquery

table_spec = bigquery.TableReference(
    projectId='clouddataflow-readonly',
    datasetId='samples',
    tableId='weather_stations')

Java 用の Beam SDK は、完全修飾された BigQuery テーブル名を含む文字列から TableReference オブジェクトを構築する parseTableSpec ヘルパーメソッドも提供しています。ただし、BigQueryIO 変換の静的ファクトリメソッドは、テーブル名を文字列として受け入れ、TableReference オブジェクトを構築します。

テーブル行

BigQueryIO の読み取りおよび書き込み変換は、データの PCollection を生成および消費します。PCollection 内の各要素は、テーブル内の単一行を表します。

スキーマ

BigQuery に書き込む場合、作成処理CREATE_NEVER を指定しない限り、書き込む先のテーブルのテーブルスキーマを指定する必要があります。テーブルスキーマの作成では、スキーマについて詳しく説明します。

データ型

BigQuery は、STRING、BYTES、INTEGER、FLOAT、NUMERIC、BOOLEAN、TIMESTAMP、DATE、TIME、DATETIME、GEOGRAPHY のデータ型をサポートしています。Google 標準 SQL データ型の概要については、データ型を参照してください。BigQueryIO では、これらのすべてのデータ型を使用できます。次の例は、BigQuery からの読み取りおよび書き込み時に使用されるデータ型の正しい形式を示しています。

import com.google.api.services.bigquery.model.TableRow;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Base64;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BigQueryTableRowCreate {
  public static TableRow createTableRow() {
    TableRow row =
        new TableRow()
            // To learn more about BigQuery data types:
            // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
            .set("string_field", "UTF-8 strings are supported! 🌱🌳🌍")
            .set("int64_field", 432)
            .set("float64_field", 3.141592653589793)
            .set("numeric_field", new BigDecimal("1234.56").toString())
            .set("bool_field", true)
            .set(
                "bytes_field",
                Base64.getEncoder()
                    .encodeToString("UTF-8 byte string 🌱🌳🌍".getBytes(StandardCharsets.UTF_8)))

            // To learn more about date formatting:
            // https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/format/DateTimeFormatter.html
            .set("date_field", LocalDate.parse("2020-03-19").toString()) // ISO_LOCAL_DATE
            .set(
                "datetime_field",
                LocalDateTime.parse("2020-03-19T20:41:25.123").toString()) // ISO_LOCAL_DATE_TIME
            .set("time_field", LocalTime.parse("20:41:25.123").toString()) // ISO_LOCAL_TIME
            .set(
                "timestamp_field",
                Instant.parse("2020-03-20T03:41:42.123Z").toString()) // ISO_INSTANT

            // To learn more about the geography Well-Known Text (WKT) format:
            // https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry
            .set("geography_field", "POINT(30 10)")

            // An array has its mode set to REPEATED.
            .set("array_field", Arrays.asList(1, 2, 3, 4))

            // Any class can be written as a STRUCT as long as all the fields in the
            // schema are present and they are encoded correctly as BigQuery types.
            .set(
                "struct_field",
                Stream.of(
                        new SimpleEntry<>("string_value", "Text 🌱🌳🌍"),
                        new SimpleEntry<>("int64_value", "42"))
                    .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)));
    return row;
  }
}
bigquery_data = [{
    'string': 'abc',
    'bytes': base64.b64encode(b'\xab\xac'),
    'integer': 5,
    'float': 0.5,
    'numeric': Decimal('5'),
    'boolean': True,
    'timestamp': '2018-12-31 12:44:31.744957 UTC',
    'date': '2018-12-31',
    'time': '12:44:31',
    'datetime': '2018-12-31T12:44:31',
    'geography': 'POINT(30 10)'
}]

Beam 2.7.0 以降、NUMERIC データ型がサポートされています。このデータ型は、高精度の 10 進数(精度 38 桁、スケール 9 桁)をサポートします。GEOGRAPHY データ型は、Well-Known Text (https://en.wikipedia.org/wiki/Well-known_text を参照) 形式で、BigQuery との読み書きを行います。BigQuery IO では、BigQuery に書き込む際に BYTES データ型の値を base64 エンコードする必要があります。BigQuery からバイトを読み込むと、base64 エンコードされた文字列として返されます。

Beam 2.7.0 以降、NUMERIC データ型がサポートされています。このデータ型は、高精度の 10 進数(精度 38 桁、スケール 9 桁)をサポートします。GEOGRAPHY データ型は、Well-Known Text (https://en.wikipedia.org/wiki/Well-known_text を参照) 形式で、BigQuery との読み書きを行います。BigQuery IO では、BigQuery に書き込む際に BYTES データ型の値を base64 エンコードする必要があります。BigQuery からバイトを読み込むと、base64 エンコードされたバイトとして返されます。

BigQuery からの読み取り

BigQueryIO を使用すると、BigQuery テーブルから読み込んだり、SQL クエリを実行して結果を読み込んだりできます。デフォルトでは、BigQueryIO の読み取り変換を適用すると、Beam は BigQuery エクスポート リクエスト を呼び出します。ただし、Java 用 Beam SDK は、BigQuery Storage Read API を使用して BigQuery ストレージから直接読み込むこともサポートしています。詳細については、Storage Read API の使用 を参照してください。

Beam による BigQuery API の使用には、BigQuery の 割り当て および 料金 ポリシーが適用されます。

Java 用 Beam SDK には、BigQueryIO の読み取りメソッドが 2 つあります。これらのメソッドはいずれも、テーブルから読み込んだり、クエリ文字列を使用してフィールドを読み込んだりできます。

  1. read(SerializableFunction) は Avro 形式のレコードを読み込み、指定された解析関数を使用してカスタム型オブジェクトの PCollection に解析します。PCollection 内の各要素は、テーブル内の単一行を表します。クエリ文字列を使用した読み取りのコード例では、read(SerializableFunction) の使用方法を示しています。

  2. readTableRows は BigQuery の TableRow オブジェクトの PCollection を返します。PCollection 内の各要素は、テーブル内の単一行を表します。TableRow オブジェクトの整数値は、BigQuery のエクスポートされた JSON 形式と一致するように文字列としてエンコードされます。このメソッドは便利ですが、read(SerializableFunction) と比較してパフォーマンスが 2~3 倍遅くなる可能性があります。テーブルからの読み取りのコード例では、readTableRows の使用方法を示しています。

注: BigQueryIO.read() は Beam SDK 2.2.0 以降で非推奨になりました。代わりに、Avro GenericRecord から BigQuery 行をカスタム型に解析するには read(SerializableFunction<SchemaAndRecord, T>) を使用するか、JSON TableRow オブジェクトに解析するには readTableRows() を使用してください。

Python 用 Beam SDK を使用して BigQuery テーブルから読み取るには、ReadFromBigQuery 変換を適用します。ReadFromBigQuery はディクショナリの PCollection を返します。PCollection 内の各要素は、テーブル内の単一行を表します。TableRow オブジェクトの整数値は、BigQuery のエクスポートされた JSON 形式と一致するように文字列としてエンコードされます。

注: BigQuerySource() は Beam SDK 2.25.0 以降で非推奨になりました。2.25.0 より前のバージョンでは、Beam SDK を使用して BigQuery テーブルから読み取るには、BigQuerySourceRead 変換を適用します。例: beam.io.Read(beam.io.BigQuerySource(table_spec))

テーブルからの読み取り

BigQuery テーブル全体を読み取るには、BigQuery テーブル名を持つ from メソッドを使用します。この例では readTableRows を使用しています。

BigQuery テーブル全体を読み取るには、BigQuery テーブル名を持つ table パラメーターを使用します。

次のコードは、気象観測所のデータを含むテーブル全体を読み取り、max_temperature 列を抽出します。

import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromTable {
  public static PCollection<MyData> readFromTable(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery query",
                BigQueryIO.readTableRows().from(String.format("%s:%s.%s", project, dataset, table)))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
max_temperatures = (
    pipeline
    | 'ReadTable' >> beam.io.ReadFromBigQuery(table=table_spec)
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))

クエリ文字列を使用した読み取り

テーブル全体を読み取る必要がない場合は、fromQuery メソッドでクエリ文字列を指定できます。

テーブル全体を読み取る必要がない場合は、query パラメーターを指定して ReadFromBigQuery にクエリ文字列を指定できます。

次のコードは、SQL クエリを使用して max_temperature 列のみを読み取ります。

import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromQuery {
  public static PCollection<MyData> readFromQuery(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery query",
                BigQueryIO.readTableRows()
                    .fromQuery(String.format("SELECT * FROM `%s.%s.%s`", project, dataset, table))
                    .usingStandardSql())
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
max_temperatures = (
    pipeline
    | 'QueryTable' >> beam.io.ReadFromBigQuery(
        query='SELECT max_temperature FROM '\
              '[apache-beam-testing.samples.weather_stations]')
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))

次の例に示すように、BigQuery の標準 SQL 言語をクエリ文字列で使用することもできます。

PCollection<Double> maxTemperatures =
    p.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
            .fromQuery(
                "SELECT max_temperature FROM `clouddataflow-readonly.samples.weather_stations`")
            .usingStandardSql()
            .withCoder(DoubleCoder.of()));
max_temperatures = (
    pipeline
    | 'QueryTableStdSQL' >> beam.io.ReadFromBigQuery(
        query='SELECT max_temperature FROM '\
              '`clouddataflow-readonly.samples.weather_stations`',
        use_standard_sql=True)
    # Each row is a dictionary where the keys are the BigQuery columns
    | beam.Map(lambda elem: elem['max_temperature']))

クエリ実行プロジェクト

デフォルトでは、パイプラインはパイプラインに関連付けられた Google Cloud プロジェクトでクエリを実行します(Dataflow ランナーの場合は、パイプラインが実行されるプロジェクトです)。クエリ実行プロジェクトがパイプライン プロジェクトと異なる必要がある場合があります。Java SDK を使用している場合は、パイプライン オプション "bigQueryProject" を目的の Google Cloud プロジェクト ID に設定することで、クエリ実行プロジェクトを定義できます。

Storage Read API の使用

BigQuery Storage API を使用すると、BigQuery ストレージ内のテーブルに直接アクセスでき、列の選択や述語フィルターのプッシュダウンなどの機能をサポートしているため、より効率的なパイプライン実行が可能になります。

Java 用 Beam SDK は、BigQuery から読み取る際に BigQuery Storage API の使用をサポートしています。2.25.0 より前の SDK バージョンでは、BigQuery Storage API が 実験的な機能としてサポートされており、GA 前の BigQuery Storage API サーフェスが使用されています。BigQuery Storage API を使用する呼び出し元は、SDK バージョン 2.25.0 以降を使用するようにパイプラインを移行する必要があります。

Python 用 Beam SDK は、BigQuery Storage API をサポートしています。ReadFromBigQuery にパラメーターとして method=DIRECT_READ を渡すことで有効にできます。

コードの更新

テーブルから読み取る場合は、次のメソッドを使用してください。

次のコード スニペットは、テーブルから読み取ります。この例は、BigQueryTornadoes の例からのものです。例の読み取りメソッド オプションが DIRECT_READ に設定されている場合、パイプラインは BigQuery Storage API と列のプロジェクションを使用して、BigQuery テーブルから気象データの公開サンプルを読み取ります。GitHub で完全なソース コードを参照できます。

import java.util.Arrays;
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromTableWithBigQueryStorageAPI {
  public static PCollection<MyData> readFromTableWithBigQueryStorageAPI(
      String project, String dataset, String table, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery table",
                BigQueryIO.readTableRows()
                    .from(String.format("%s:%s.%s", project, dataset, table))
                    .withMethod(Method.DIRECT_READ)
                    .withSelectedFields(
                        Arrays.asList(
                            "string_field",
                            "int64_field",
                            "float64_field",
                            "numeric_field",
                            "bool_field",
                            "bytes_field",
                            "date_field",
                            "datetime_field",
                            "time_field",
                            "timestamp_field",
                            "geography_field",
                            "array_field",
                            "struct_field")))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
max_temperatures = (
    pipeline
    | 'ReadTableWithStorageAPI' >> beam.io.ReadFromBigQuery(
        table=table_spec, method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
    | beam.Map(lambda elem: elem['max_temperature']))

次のコード スニペットは、クエリ文字列を使用して読み取ります。

import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

class BigQueryReadFromQueryWithBigQueryStorageAPI {
  public static PCollection<MyData> readFromQueryWithBigQueryStorageAPI(
      String project, String dataset, String table, String query, Pipeline pipeline) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // Pipeline pipeline = Pipeline.create();

    /*
    String query = String.format("SELECT\n" +
        "  string_field,\n" +
        "  int64_field,\n" +
        "  float64_field,\n" +
        "  numeric_field,\n" +
        "  bool_field,\n" +
        "  bytes_field,\n" +
        "  date_field,\n" +
        "  datetime_field,\n" +
        "  time_field,\n" +
        "  timestamp_field,\n" +
        "  geography_field,\n" +
        "  array_field,\n" +
        "  struct_field\n" +
        "FROM\n" +
        "  `%s:%s.%s`", project, dataset, table)
    */

    PCollection<MyData> rows =
        pipeline
            .apply(
                "Read from BigQuery table",
                BigQueryIO.readTableRows()
                    .fromQuery(query)
                    .usingStandardSql()
                    .withMethod(Method.DIRECT_READ))
            .apply(
                "TableRows to MyData",
                MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));

    return rows;
  }
}
# The SDK for Python does not support the BigQuery Storage API.

BigQuery への書き込み

BigQueryIO を使用すると、BigQuery テーブルに書き込むことができます。Java 用 Beam SDK を使用している場合は、異なる行を異なるテーブルに書き込むことができます。Java 用 Beam SDK は、BigQuery Storage Write API を使用して BigQuery ストレージに直接書き込むこともサポートしています。詳細については、Storage Write API の使用 を参照してください。

BigQueryIO の書き込み変換は、BigQuery の 割り当て および 料金 ポリシーが適用される API を使用します。

書き込み変換を適用する場合は、宛先テーブルに関する次の情報を提供する必要があります。

さらに、書き込み操作で新しい BigQuery テーブルを作成する場合は、宛先テーブルのテーブル スキーマも指定する必要があります。

作成処理

作成処理は、宛先テーブルが存在しない場合に BigQuery の書き込み操作でテーブルを作成するかどうかを制御します。

作成処理を指定するには、.withCreateDisposition を使用します。有効な列挙値は次のとおりです。

  • Write.CreateDisposition.CREATE_IF_NEEDED: 書き込み操作で、テーブルが存在しない場合に新しいテーブルを作成する必要があることを指定します。この値を使用する場合は、withSchema メソッドでテーブル スキーマを指定する必要があります。CREATE_IF_NEEDED がデフォルトの動作です。

  • Write.CreateDisposition.CREATE_NEVER: テーブルを作成してはならないことを指定します。宛先テーブルが存在しない場合、書き込み操作は失敗します。

作成処理を指定するには、create_disposition パラメーターを使用します。有効な列挙値は次のとおりです。

  • BigQueryDisposition.CREATE_IF_NEEDED: 書き込み操作で、テーブルが存在しない場合に新しいテーブルを作成する必要があることを指定します。この値を使用する場合は、テーブル スキーマを指定する必要があります。CREATE_IF_NEEDED がデフォルトの動作です。

  • BigQueryDisposition.CREATE_NEVER: テーブルを作成してはならないことを指定します。宛先テーブルが存在しない場合、書き込み操作は失敗します。

作成処理として CREATE_IF_NEEDED を指定し、テーブル スキーマを指定しない場合、宛先テーブルが存在しないと、変換は実行時に失敗する可能性があります。

書き込み処理

書き込み処理は、BigQuery の書き込み操作を既存のテーブルにどのように適用するかを制御します。

書き込み処理を指定するには、.withWriteDisposition を使用します。有効な列挙値は次のとおりです。

  • Write.WriteDisposition.WRITE_EMPTY: 宛先テーブルが空でない場合、書き込み操作は実行時に失敗する必要があることを指定します。WRITE_EMPTY がデフォルトの動作です。

  • Write.WriteDisposition.WRITE_TRUNCATE: 書き込み操作で既存のテーブルを置き換える必要があることを指定します。宛先テーブルの既存の行はすべて削除され、新しい行がテーブルに追加されます。

  • Write.WriteDisposition.WRITE_APPEND: 書き込み操作で、既存のテーブルの末尾に行を追加する必要があることを指定します。

書き込み処理を指定するには、write_disposition パラメーターを使用します。有効な列挙値は次のとおりです。

  • BigQueryDisposition.WRITE_EMPTY: 宛先テーブルが空でない場合、書き込み操作は実行時に失敗する必要があることを指定します。WRITE_EMPTY がデフォルトの動作です。

  • BigQueryDisposition.WRITE_TRUNCATE: 書き込み操作で既存のテーブルを置き換える必要があることを指定します。宛先テーブルの既存の行はすべて削除され、新しい行がテーブルに追加されます。

  • BigQueryDisposition.WRITE_APPEND: 書き込み操作で、既存のテーブルの末尾に行を追加する必要があることを指定します。

WRITE_EMPTY を使用すると、宛先テーブルが空かどうかを確認する処理が、実際の書き込み操作の前に行われる可能性があります。このチェックでは、パイプラインがテーブルへの排他的アクセスを持つことは保証されません。書き込み処理が WRITE_EMPTY である同じ出力テーブルに書き込む 2 つの同時パイプラインが正常に開始される可能性がありますが、両方のパイプラインは後で書き込みが試行されたときに失敗する可能性があります。

テーブルスキーマの作成

BigQuery の書き込み操作で新しいテーブルを作成する場合は、スキーマ情報を提供する必要があります。スキーマには、テーブルの各フィールドに関する情報が含まれています。新しいスキーマでパイプラインを更新する場合、既存のスキーマ フィールドは同じ順序を維持する必要があります。そうしないと、パイプラインが中断し、BigQuery への書き込みに失敗します。

Java でテーブル スキーマを作成するには、TableSchema オブジェクトを使用するか、JSON シリアル化された TableSchema オブジェクトを含む文字列を使用できます。

Python でテーブル スキーマを作成するには、TableSchema オブジェクトを使用するか、フィールドのリストを定義する文字列を使用できます。単一の文字列ベースのスキーマは、ネストされたフィールド、繰り返しフィールド、またはフィールドの BigQuery モードの指定をサポートしていません(モードは常に NULLABLE に設定されます)。

TableSchema の使用

TableSchemaオブジェクトとしてテーブルスキーマを作成および使用するには、次の手順に従います。

  1. TableFieldSchemaオブジェクトのリストを作成します。各TableFieldSchemaオブジェクトは、テーブル内のフィールドを表します。

  2. TableSchemaオブジェクトを作成し、setFieldsメソッドを使用してフィールドのリストを指定します。

  3. 書き込み変換を適用するときに、withSchemaメソッドを使用してテーブルスキーマを指定します。

  1. TableSchemaオブジェクトを作成します。

  2. テーブルの各フィールドに対してTableFieldSchemaオブジェクトを作成して追加します。

  3. 書き込み変換を適用するときに、schemaパラメータを使用してテーブルスキーマを指定します。パラメータの値をTableSchemaオブジェクトに設定します。

次のコード例は、2つの文字列型フィールド(sourceとquote)を持つテーブルのTableSchemaを作成する方法を示しています。

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;

class BigQuerySchemaCreate {
  public static TableSchema createSchema() {
    // To learn more about BigQuery schemas:
    // https://cloud.google.com/bigquery/docs/schemas
    TableSchema schema =
        new TableSchema()
            .setFields(
                Arrays.asList(
                    new TableFieldSchema()
                        .setName("string_field")
                        .setType("STRING")
                        .setMode("REQUIRED"),
                    new TableFieldSchema()
                        .setName("int64_field")
                        .setType("INT64")
                        .setMode("NULLABLE"),
                    new TableFieldSchema()
                        .setName("float64_field")
                        .setType("FLOAT64"), // default mode is "NULLABLE"
                    new TableFieldSchema().setName("numeric_field").setType("NUMERIC"),
                    new TableFieldSchema().setName("bool_field").setType("BOOL"),
                    new TableFieldSchema().setName("bytes_field").setType("BYTES"),
                    new TableFieldSchema().setName("date_field").setType("DATE"),
                    new TableFieldSchema().setName("datetime_field").setType("DATETIME"),
                    new TableFieldSchema().setName("time_field").setType("TIME"),
                    new TableFieldSchema().setName("timestamp_field").setType("TIMESTAMP"),
                    new TableFieldSchema().setName("geography_field").setType("GEOGRAPHY"),
                    new TableFieldSchema()
                        .setName("array_field")
                        .setType("INT64")
                        .setMode("REPEATED")
                        .setDescription("Setting the mode to REPEATED makes this an ARRAY<INT64>."),
                    new TableFieldSchema()
                        .setName("struct_field")
                        .setType("STRUCT")
                        .setDescription(
                            "A STRUCT accepts a custom data class, the fields must match the custom class fields.")
                        .setFields(
                            Arrays.asList(
                                new TableFieldSchema().setName("string_value").setType("STRING"),
                                new TableFieldSchema().setName("int64_value").setType("INT64")))));
    return schema;
  }
}
table_schema = {
    'fields': [{
        'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
    }, {
        'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
    }]
}

文字列の使用

JSONシリアライズされたTableSchemaオブジェクトを含む文字列としてテーブルスキーマを作成および使用するには、次の手順に従います。

  1. JSONシリアライズされたTableSchemaオブジェクトを含む文字列を作成します。

  2. 書き込み変換を適用するときに、withJsonSchemaメソッドを使用してテーブルスキーマを指定します。

文字列としてテーブルスキーマを作成および使用するには、次の手順に従います。

  1. フィールドのリストを定義する「field1:type1,field2:type2,field3:type3」の形式のコンマ区切り文字列を作成します。typeには、フィールドのBigQuery型を指定する必要があります。

  2. 書き込み変換を適用するときに、schemaパラメータを使用してテーブルスキーマを指定します。パラメータの値を文字列に設定します。

次の例は、前の例と同じテーブルスキーマを文字列で指定する方法を示しています。

String tableSchemaJson =
    ""
        + "{"
        + "  \"fields\": ["
        + "    {"
        + "      \"name\": \"source\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"NULLABLE\""
        + "    },"
        + "    {"
        + "      \"name\": \"quote\","
        + "      \"type\": \"STRING\","
        + "      \"mode\": \"REQUIRED\""
        + "    }"
        + "  ]"
        + "}";
# column_name:BIGQUERY_TYPE, ...
table_schema = 'source:STRING, quote:STRING'

挿入方法の設定

BigQueryIOは、BigQueryにデータを挿入する2つの方法(ロードジョブとストリーミング挿入)をサポートしています。各挿入方法には、コスト、クォータ、データ整合性の異なるトレードオフがあります。これらのトレードオフの詳細については、BigQueryドキュメントのさまざまなデータ取り込みオプション(特に、ロードジョブストリーミング挿入)を参照してください。

BigQueryIOは、入力PCollectionに基づいてデフォルトの挿入方法を選択します。withMethodを使用して、必要な挿入方法を指定できます。使用可能な方法とその制限事項の一覧については、Write.Methodを参照してください。

BigQueryIOは、入力PCollectionに基づいてデフォルトの挿入方法を選択します。methodを使用して、必要な挿入方法を指定できます。使用可能な方法とその制限事項の一覧については、WriteToBigQueryを参照してください。

BigQueryIOは、次の状況でロードジョブを使用します。

  • BigQueryIO書き込み変換をバインドされたPCollectionに適用する場合。
  • BigQueryIO.write().withMethod(FILE_LOADS)を使用して、挿入方法としてロードジョブを指定する場合。
  • BigQueryIO書き込み変換をバインドされたPCollectionに適用する場合。
  • WriteToBigQuery(method='FILE_LOADS')を使用して、挿入方法としてロードジョブを指定する場合。

注:ストリーミングパイプラインでバッチロードを使用する場合

ロードジョブを開始するためのトリガー頻度を指定するには、withTriggeringFrequencyを使用する必要があります。パイプラインがBigQueryロードジョブのクォータ制限を超えないように、頻度の設定には注意してください。

withNumFileShardsを使用して書き込まれるファイルシャードの数を明示的に設定するか、withAutoShardingを使用して動的シャーディングを有効にするか(2.29.0リリース以降)、シャードの数がランタイム時に決定および変更される場合があります。シャーディングの動作は、ランナーによって異なります。

ロードジョブを開始するためのトリガー頻度を指定するには、triggering_frequencyを使用する必要があります。パイプラインがBigQueryロードジョブのクォータ制限を超えないように、頻度の設定には注意してください。

with_auto_sharding=Trueを設定して、動的シャーディングを有効にできます(2.29.0リリース以降)。シャードの数はランタイム時に決定および変更される場合があります。シャーディングの動作は、ランナーによって異なります。

BigQueryIOは、次の状況でストリーミング挿入を使用します。

  • BigQueryIO書き込み変換をアンバインドされたPCollectionに適用する場合。
  • BigQueryIO.write().withMethod(STREAMING_INSERTS)を使用して、挿入方法としてストリーミング挿入を指定する場合。
  • BigQueryIO書き込み変換をアンバインドされたPCollectionに適用する場合。
  • WriteToBigQuery(method='STREAMING_INSERTS')を使用して、挿入方法としてストリーミング挿入を指定する場合。

注:ストリーミング挿入では、デフォルトでBigQueryのベストエフォート重複排除メカニズムが有効になります。ignoreInsertIdsを設定することで無効にできます。重複排除が有効になっている場合と無効になっている場合では、クォータ制限が異なります。

ストリーミング挿入では、各テーブルの宛先にデフォルトのシャーディングが適用されます。withAutoSharding(2.28.0リリース以降)を使用して動的シャーディングを有効にすると、シャードの数がランタイム時に決定および変更される場合があります。シャーディングの動作は、ランナーによって異なります。

注:ストリーミング挿入では、デフォルトでBigQueryのベストエフォート重複排除メカニズムが有効になります。ignore_insert_ids=Trueを設定することで無効にできます。重複排除が有効になっている場合と無効になっている場合では、クォータ制限が異なります。

ストリーミング挿入では、各テーブルの宛先にデフォルトのシャーディングが適用されます。with_auto_sharding=True(2.29.0リリース以降)を設定して、動的シャーディングを有効にできます。シャードの数はランタイム時に決定および変更される場合があります。シャーディングの動作は、ランナーによって異なります。

テーブルへの書き込み

BigQueryテーブルに書き込むには、writeTableRowsまたはwrite変換を適用します。

BigQueryテーブルに書き込むには、WriteToBigQuery変換を適用します。WriteToBigQueryは、バッチモードとストリーミングモードの両方をサポートしています。変換は、辞書のPCollectionに適用する必要があります。通常、ParDoなどの別の変換を使用して、出力データをコレクションにフォーマットする必要があります。

次の例では、引用符を含むこのPCollectionを使用します。

writeTableRowsメソッドは、BigQuery TableRowオブジェクトのPCollectionをBigQueryテーブルに書き込みます。PCollectionの各要素は、テーブル内の1つの行を表します。この例では、writeTableRowsを使用して、PCollection<TableRow>に要素を書き込みます。書き込み操作は、必要な場合にテーブルを作成します。テーブルが既に存在する場合は、置き換えられます。

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.values.PCollection;

class BigQueryWriteToTable {
  public static void writeToTable(
      String project,
      String dataset,
      String table,
      TableSchema schema,
      PCollection<TableRow> rows) {

    // String project = "my-project-id";
    // String dataset = "my_bigquery_dataset_id";
    // String table = "my_bigquery_table_id";

    // TableSchema schema = new TableSchema().setFields(Arrays.asList(...));

    // Pipeline pipeline = Pipeline.create();
    // PCollection<TableRow> rows = ...

    rows.apply(
        "Write to BigQuery",
        BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s", project, dataset, table))
            .withSchema(schema)
            // For CreateDisposition:
            // - CREATE_IF_NEEDED (default): creates the table if it doesn't exist, a schema is
            // required
            // - CREATE_NEVER: raises an error if the table doesn't exist, a schema is not needed
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            // For WriteDisposition:
            // - WRITE_EMPTY (default): raises an error if the table is not empty
            // - WRITE_APPEND: appends new rows to existing rows
            // - WRITE_TRUNCATE: deletes the existing rows before writing
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

    // pipeline.run().waitUntilFinish();
  }
}
quotes = pipeline | beam.Create([
    {
        'source': 'Mahatma Gandhi', 'quote': 'My life is my message.'
    },
    {
        'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'."
    },
])

次のコード例は、WriteToBigQuery変換を適用して、辞書のPCollectionをBigQueryテーブルに書き込む方法を示しています。書き込み操作は、必要な場合にテーブルを作成します。テーブルが既に存在する場合は、置き換えられます。

quotes | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

write変換は、カスタム型オブジェクトのPCollectionをBigQueryテーブルに書き込みます。.withFormatFunction(SerializableFunction)を使用して、PCollectionの各入力要素をTableRowに変換するフォーマット関数を提供します。この例では、writeを使用してPCollection<String>を書き込みます。書き込み操作は、必要な場合にテーブルを作成します。テーブルが既に存在する場合は、置き換えられます。

quotes.apply(
    BigQueryIO.<Quote>write()
        .to(tableSpec)
        .withSchema(tableSchema)
        .withFormatFunction(
            (Quote elem) ->
                new TableRow().set("source", elem.source).set("quote", elem.quote))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

ストリーミング挿入を使用する場合、失敗したレコードをどうするかを決定できます。再試行を続けるか、WriteResult.getFailedInserts()メソッドを使用して、失敗したレコードを別のPCollectionで返すことができます。

Storage Write API の使用

Beam SDK for Javaのバージョン2.36.0以降では、BigQueryIOコネクタからBigQuery Storage Write APIを使用できます。

また、Beam SDK for Pythonのバージョン2.47.0以降では、SDKはBigQuery Storage Write APIをサポートしています。

Python SDK用のBigQuery Storage Write APIには、現在、サポートされているデータ型にいくつかの制限があります。このメソッドはクロスランゲージ変換を利用するため、クロスランゲージ境界でサポートされている型に制限されます。たとえば、TIMESTAMP BigQuery型を書き込むには、apache_beam.utils.timestamp.Timestampが必要です。また、一部の型(DATETIMEなど)はまだサポートされていません。詳細については、完全な型マッピングを参照してください。

注:ソースコードからStorage Write APIを使用してWriteToBigQueryを実行する場合は、./gradlew :sdks:java:io:google-cloud-platform:expansion-service:buildを実行してexpansion-service jarをビルドする必要があります。リリースされたBeam SDKから実行する場合は、jarが既に含まれています。

正確に 1 回のセマンティクス

Storage Write APIを使用してBigQueryに書き込むには、withMethodMethod.STORAGE_WRITE_APIに設定します。次に、Storage Write APIと正確に一度のセマンティクスを使用してBigQueryに書き込む変換の例を示します。

WriteResult writeResult = rows.apply("Save Rows to BigQuery",
BigQueryIO.writeTableRows()
        .to(options.getFullyQualifiedTableName())
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withMethod(Method.STORAGE_WRITE_API)
);
quotes | "WriteTableWithStorageAPI" >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API)

パイプラインのすべてのBigQueryシンクがデフォルトでStorage Write APIを使用するようにBigQueryIOの動作を変更する場合は、UseStorageWriteApiオプションを設定します。

パイプラインでテーブルを作成する必要がある場合(存在しない場合に、作成処理をCREATE_IF_NEEDEDとして指定した場合)、テーブルスキーマを提供する必要があります。APIは、スキーマを使用してデータを検証し、バイナリプロトコルに変換します。

TableSchema schema = new TableSchema().setFields(
        List.of(
            new TableFieldSchema()
                .setName("request_ts")
                .setType("TIMESTAMP")
                .setMode("REQUIRED"),
            new TableFieldSchema()
                .setName("user_name")
                .setType("STRING")
                .setMode("REQUIRED")));
table_schema = {
    'fields': [{
        'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
    }, {
        'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
    }]
}

ストリーミングパイプラインの場合、ストリーム数とトリガー頻度の2つの追加パラメータを設定する必要があります。

BigQueryIO.writeTableRows()
        // ...
        .withTriggeringFrequency(Duration.standardSeconds(5))
        .withNumStorageWriteApiStreams(3)
);
# The Python SDK doesn't currently support setting the number of write streams
quotes | "StorageWriteAPIWithFrequency" >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
    triggering_frequency=5)

ストリーム数は、BigQueryIO Write変換の並列処理を定義し、パイプラインが使用するStorage Write APIストリームの数にほぼ対応します。withNumStorageWriteApiStreamsを介して変換で明示的に設定するか、BigQueryOptionsで定義されているように、numStorageWriteApiStreamsオプションをパイプラインに提供できます。これはストリーミングパイプラインでのみサポートされることに注意してください。

トリガー頻度は、BigQueryでデータをクエリできるまでの時間を決定します。withTriggeringFrequencyを介して明示的に設定するか、storageWriteApiTriggeringFrequencySecオプションを設定して秒数を指定できます。

これらの2つのパラメータの組み合わせは、BigQueryIOがStorage Write APIを呼び出す前に作成する行のバッチサイズに影響を与えます。頻度を高く設定しすぎると、バッチが小さくなり、パフォーマンスに影響を与える可能性があります。一般的なルールとして、単一のストリームは少なくとも1秒あたり1MBのスループットを処理できる必要があります。排他的なストリームを作成することはBigQueryサービスにとってコストのかかる操作であるため、ユースケースに必要なストリーム数のみを使用する必要があります。ほとんどのパイプラインでは、1桁秒単位でのトリガー頻度が適切な選択です。

ストリーミング挿入と同様に、STORAGE_WRITE_APIは、BigQueryへの書き込みに使用する並列ストリーム数を動的に決定することをサポートしています(2.42.0以降)。withAutoShardingを使用して明示的に有効にできます。

STORAGE_WRITE_APIは、numStorageWriteApiStreamsが0または未指定の場合、デフォルトで動的シャーディングを使用します。

STORAGE_WRITE_APIを使用する場合、WriteResult.getFailedStorageApiInsertsによって返されるPCollectionには、Storage Write APIシンクへの書き込みに失敗した行が含まれます。

少なくとも 1 回のセマンティクス

ターゲットテーブルに重複レコードが含まれる可能性があっても構わない場合は、STORAGE_API_AT_LEAST_ONCEメソッドを使用できます。このメソッドは、STORAGE_WRITE_APIメソッドの正確に1回のセマンティクスを提供するために必要な、書き込まれるレコードをシャッフルストレージに保持しません。したがって、ほとんどのパイプラインでは、このメソッドを使用する方が安価であり、レイテンシが低くなることがよくあります。STORAGE_API_AT_LEAST_ONCEを使用する場合、ストリーム数を指定する必要はなく、トリガー頻度を指定することもできません。

自動シャーディングはSTORAGE_API_AT_LEAST_ONCEには適用されません。

STORAGE_API_AT_LEAST_ONCEを使用する場合、WriteResult.getFailedStorageApiInsertsによって返されるPCollectionには、Storage Write APIシンクへの書き込みに失敗した行が含まれます。

クォータ

Storage Write APIを使用する前に、BigQuery Storage Write APIの割り当てに注意してください。

動的な宛先の使用

動的宛先機能を使用すると、PCollection内の要素を異なるBigQueryテーブルに、場合によっては異なるスキーマで書き込むことができます。

動的宛先機能は、ユーザー定義の宛先キーでユーザータイプをグループ化し、キーを使用して宛先テーブルやスキーマを計算し、各グループの要素を計算された宛先に書き込みます。

さらに、TableRowへのマッピング関数を持つ独自の型を書き込むこともでき、すべてのDynamicDestinationsメソッドでサイド入力を使用できます。

動的宛先を使用するには、DynamicDestinationsオブジェクトを作成し、次のメソッドを実装する必要があります。

  • getDestination: getTablegetSchemaが宛先キーとして使用して、宛先テーブルやスキーマを計算できるオブジェクトを返します。

  • getTable: 宛先キーのテーブル(TableDestinationオブジェクトとして)を返します。このメソッドは、一意の宛先ごとに一意のテーブルを返す必要があります。

  • getSchema: 宛先キーのテーブルスキーマ(TableSchemaオブジェクトとして)を返します。

次に、write().toDynamicDestinationsオブジェクトとともに使用します。この例では、天気データを含むPCollectionを使用し、年ごとに異なるテーブルにデータを書き込みます。

/*
@DefaultCoder(AvroCoder.class)
static class WeatherData {
  final long year;
  final long month;
  final long day;
  final double maxTemp;

  public WeatherData() {
    this.year = 0;
    this.month = 0;
    this.day = 0;
    this.maxTemp = 0.0f;
  }
  public WeatherData(long year, long month, long day, double maxTemp) {
    this.year = year;
    this.month = month;
    this.day = day;
    this.maxTemp = maxTemp;
  }
}
*/

PCollection<WeatherData> weatherData =
    p.apply(
        BigQueryIO.read(
                (SchemaAndRecord elem) -> {
                  GenericRecord record = elem.getRecord();
                  return new WeatherData(
                      (Long) record.get("year"),
                      (Long) record.get("month"),
                      (Long) record.get("day"),
                      (Double) record.get("max_temperature"));
                })
            .fromQuery(
                "SELECT year, month, day, max_temperature "
                    + "FROM [apache-beam-testing.samples.weather_stations] "
                    + "WHERE year BETWEEN 2007 AND 2009")
            .withCoder(AvroCoder.of(WeatherData.class)));

// We will send the weather data into different tables for every year.
weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(
            new DynamicDestinations<WeatherData, Long>() {
              @Override
              public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
                return elem.getValue().year;
              }

              @Override
              public TableDestination getTable(Long destination) {
                return new TableDestination(
                    new TableReference()
                        .setProjectId(writeProject)
                        .setDatasetId(writeDataset)
                        .setTableId(writeTable + "_" + destination),
                    "Table for year " + destination);
              }

              @Override
              public TableSchema getSchema(Long destination) {
                return new TableSchema()
                    .setFields(
                        ImmutableList.of(
                            new TableFieldSchema()
                                .setName("year")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("month")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("day")
                                .setType("INTEGER")
                                .setMode("REQUIRED"),
                            new TableFieldSchema()
                                .setName("maxTemp")
                                .setType("FLOAT")
                                .setMode("NULLABLE")));
              }
            })
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
fictional_characters_view = beam.pvalue.AsDict(
    pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),
                                                  ('Obi Wan Kenobi', True)]))

def table_fn(element, fictional_characters):
  if element in fictional_characters:
    return 'my_dataset.fictional_quotes'
  else:
    return 'my_dataset.real_quotes'

quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
    table_fn,
    schema=table_schema,
    table_side_inputs=(fictional_characters_view, ),
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

時間パーティショニングの使用

BigQueryの時間パーティショニングは、テーブルをより小さなパーティションに分割します。これはパーティション分割テーブルと呼ばれます。パーティション分割テーブルを使用すると、データの管理とクエリが簡単になります。

BigQueryの時間パーティショニングを使用するには、次の2つの方法のいずれかを使用します。

  • withTimePartitioning: このメソッドはTimePartitioningクラスを受け取り、単一のテーブルに書き込む場合にのみ使用できます。

  • withJsonTimePartitioning: このメソッドはwithTimePartitioningと同じですが、JSONシリアル化されたStringオブジェクトを受け取ります。

この例では、1日あたり1つのパーティションが生成されます。

weatherData.apply(
    BigQueryIO.<WeatherData>write()
        .to(tableSpec + "_partitioning")
        .withSchema(tableSchema)
        .withFormatFunction(
            (WeatherData elem) ->
                new TableRow()
                    .set("year", elem.year)
                    .set("month", elem.month)
                    .set("day", elem.day)
                    .set("maxTemp", elem.maxTemp))
        // NOTE: an existing table without time partitioning set up will not work
        .withTimePartitioning(new TimePartitioning().setType("DAY"))
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
quotes | 'WriteWithTimePartitioning' >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    additional_bq_parameters={'timePartitioning': {
        'type': 'HOUR'
    }})

制限事項

BigQueryIOには現在、次の制限があります。

  1. BigQueryの書き込み完了をパイプラインの他のステップと順序付けることはできません。

  2. Beam SDK for Pythonを使用している場合、非常に大きなデータセットを書き込むと、インポートサイズの割り当ての問題が発生する可能性があります。回避策として、データセットをパーティション分割(たとえば、BeamのPartition変換を使用)して、複数のBigQueryテーブルに書き込むことができます。Beam SDK for Javaには、データセットを自動的にパーティション分割するため、この制限はありません。

  3. BigQueryにデータをロードすると、これらの制限が適用されます。デフォルトでは、BigQueryはデータのロードにスロットの共有プールを使用します。これは、利用可能な容量が保証されず、スロットが利用可能になるまでロードがキューに入れられる可能性があることを意味します。スロットが6時間以内に利用可能にならない場合、BigQueryによって設定された制限によりロードは失敗します。この状況を回避するために、BigQuery予約を使用することを強くお勧めします。これにより、ロードがキューに入れられたり、容量の問題で失敗したりすることがなくなります。

その他の例

Beamのサンプルディレクトリには、BigQueryを使用する追加の例があります。

Java クックブックの例

これらの例は、Javaのcookbook examplesディレクトリからのものです。

Java の完全な例

これらの例は、Javaのcomplete examplesディレクトリからのものです。

Python クックブックの例

これらの例は、Pythonのcookbook examplesディレクトリからのものです。