Google BigQuery I/O コネクタ
- Java SDK
- Python SDK
Beam SDK には、Google BigQuery テーブルからデータを読み書きできる組み込みの変換が含まれています。
開始する前に
BigQueryIO を使用するには、Maven アーティファクトの依存関係を pom.xml
ファイルに追加します。
追加リソース
BigQueryIO を使用するには、pip install apache-beam[gcp]
を実行して Google Cloud Platform の依存関係をインストールする必要があります。
追加リソース
BigQuery の基本
テーブル名
BigQuery テーブルから読み書きするには、完全修飾された BigQuery テーブル名 (たとえば、bigquery-public-data:github_repos.sample_contents
) を指定する必要があります。完全修飾された BigQuery テーブル名は、次の 3 つの部分で構成されています。
- プロジェクト ID: Google Cloud プロジェクトの ID。デフォルト値は、パイプラインオプションオブジェクトから取得されます。
- データセット ID: BigQuery データセット ID。これは、特定の Cloud プロジェクト内で一意です。
- テーブル ID: BigQuery テーブル ID。これは、特定のデータセット内で一意です。
テーブル名には、時間パーティション化されたテーブルを使用している場合、テーブルデコレータを含めることもできます。
BigQuery テーブルを指定するには、テーブルの完全修飾名を文字列として使用するか、TableReference TableReference オブジェクトを使用できます。
文字列の使用
文字列でテーブルを指定するには、形式 [project_id]:[dataset_id].[table_id]
を使用して、完全修飾された BigQuery テーブル名を指定します。
project_id
を省略して、[dataset_id].[table_id]
形式を使用することもできます。プロジェクト ID を省略すると、Beam は、パイプラインオプション パイプラインオプション からデフォルトのプロジェクト ID を使用します。
TableReference の使用
TableReference
でテーブルを指定するには、BigQuery テーブル名の 3 つの部分を使用して、新しい TableReference
を作成します。
Java 用の Beam SDK は、完全修飾された BigQuery テーブル名を含む文字列から TableReference
オブジェクトを構築する parseTableSpec
ヘルパーメソッドも提供しています。ただし、BigQueryIO 変換の静的ファクトリメソッドは、テーブル名を文字列として受け入れ、TableReference
オブジェクトを構築します。
テーブル行
BigQueryIO の読み取りおよび書き込み変換は、データの PCollection
を生成および消費します。PCollection
内の各要素は、テーブル内の単一行を表します。
スキーマ
BigQuery に書き込む場合、作成処理で CREATE_NEVER
を指定しない限り、書き込む先のテーブルのテーブルスキーマを指定する必要があります。テーブルスキーマの作成では、スキーマについて詳しく説明します。
データ型
BigQuery は、STRING、BYTES、INTEGER、FLOAT、NUMERIC、BOOLEAN、TIMESTAMP、DATE、TIME、DATETIME、GEOGRAPHY のデータ型をサポートしています。Google 標準 SQL データ型の概要については、データ型を参照してください。BigQueryIO では、これらのすべてのデータ型を使用できます。次の例は、BigQuery からの読み取りおよび書き込み時に使用されるデータ型の正しい形式を示しています。
import com.google.api.services.bigquery.model.TableRow;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.Base64;
import java.util.stream.Collectors;
import java.util.stream.Stream;
class BigQueryTableRowCreate {
public static TableRow createTableRow() {
TableRow row =
new TableRow()
// To learn more about BigQuery data types:
// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
.set("string_field", "UTF-8 strings are supported! 🌱🌳🌍")
.set("int64_field", 432)
.set("float64_field", 3.141592653589793)
.set("numeric_field", new BigDecimal("1234.56").toString())
.set("bool_field", true)
.set(
"bytes_field",
Base64.getEncoder()
.encodeToString("UTF-8 byte string 🌱🌳🌍".getBytes(StandardCharsets.UTF_8)))
// To learn more about date formatting:
// https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/time/format/DateTimeFormatter.html
.set("date_field", LocalDate.parse("2020-03-19").toString()) // ISO_LOCAL_DATE
.set(
"datetime_field",
LocalDateTime.parse("2020-03-19T20:41:25.123").toString()) // ISO_LOCAL_DATE_TIME
.set("time_field", LocalTime.parse("20:41:25.123").toString()) // ISO_LOCAL_TIME
.set(
"timestamp_field",
Instant.parse("2020-03-20T03:41:42.123Z").toString()) // ISO_INSTANT
// To learn more about the geography Well-Known Text (WKT) format:
// https://en.wikipedia.org/wiki/Well-known_text_representation_of_geometry
.set("geography_field", "POINT(30 10)")
// An array has its mode set to REPEATED.
.set("array_field", Arrays.asList(1, 2, 3, 4))
// Any class can be written as a STRUCT as long as all the fields in the
// schema are present and they are encoded correctly as BigQuery types.
.set(
"struct_field",
Stream.of(
new SimpleEntry<>("string_value", "Text 🌱🌳🌍"),
new SimpleEntry<>("int64_value", "42"))
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)));
return row;
}
}
bigquery_data = [{
'string': 'abc',
'bytes': base64.b64encode(b'\xab\xac'),
'integer': 5,
'float': 0.5,
'numeric': Decimal('5'),
'boolean': True,
'timestamp': '2018-12-31 12:44:31.744957 UTC',
'date': '2018-12-31',
'time': '12:44:31',
'datetime': '2018-12-31T12:44:31',
'geography': 'POINT(30 10)'
}]
Beam 2.7.0 以降、NUMERIC データ型がサポートされています。このデータ型は、高精度の 10 進数(精度 38 桁、スケール 9 桁)をサポートします。GEOGRAPHY データ型は、Well-Known Text (https://en.wikipedia.org/wiki/Well-known_text を参照) 形式で、BigQuery との読み書きを行います。BigQuery IO では、BigQuery に書き込む際に BYTES データ型の値を base64 エンコードする必要があります。BigQuery からバイトを読み込むと、base64 エンコードされた文字列として返されます。
Beam 2.7.0 以降、NUMERIC データ型がサポートされています。このデータ型は、高精度の 10 進数(精度 38 桁、スケール 9 桁)をサポートします。GEOGRAPHY データ型は、Well-Known Text (https://en.wikipedia.org/wiki/Well-known_text を参照) 形式で、BigQuery との読み書きを行います。BigQuery IO では、BigQuery に書き込む際に BYTES データ型の値を base64 エンコードする必要があります。BigQuery からバイトを読み込むと、base64 エンコードされたバイトとして返されます。
BigQuery からの読み取り
BigQueryIO を使用すると、BigQuery テーブルから読み込んだり、SQL クエリを実行して結果を読み込んだりできます。デフォルトでは、BigQueryIO の読み取り変換を適用すると、Beam は BigQuery エクスポート リクエスト を呼び出します。ただし、Java 用 Beam SDK は、BigQuery Storage Read API を使用して BigQuery ストレージから直接読み込むこともサポートしています。詳細については、Storage Read API の使用 を参照してください。
Beam による BigQuery API の使用には、BigQuery の 割り当て および 料金 ポリシーが適用されます。
Java 用 Beam SDK には、BigQueryIO の読み取りメソッドが 2 つあります。これらのメソッドはいずれも、テーブルから読み込んだり、クエリ文字列を使用してフィールドを読み込んだりできます。
read(SerializableFunction)
は Avro 形式のレコードを読み込み、指定された解析関数を使用してカスタム型オブジェクトのPCollection
に解析します。PCollection
内の各要素は、テーブル内の単一行を表します。クエリ文字列を使用した読み取りのコード例では、read(SerializableFunction)
の使用方法を示しています。readTableRows
は BigQuery のTableRow
オブジェクトのPCollection
を返します。PCollection
内の各要素は、テーブル内の単一行を表します。TableRow
オブジェクトの整数値は、BigQuery のエクスポートされた JSON 形式と一致するように文字列としてエンコードされます。このメソッドは便利ですが、read(SerializableFunction)
と比較してパフォーマンスが 2~3 倍遅くなる可能性があります。テーブルからの読み取りのコード例では、readTableRows
の使用方法を示しています。
注: BigQueryIO.read()
は Beam SDK 2.2.0 以降で非推奨になりました。代わりに、Avro GenericRecord
から BigQuery 行をカスタム型に解析するには read(SerializableFunction<SchemaAndRecord, T>)
を使用するか、JSON TableRow
オブジェクトに解析するには readTableRows()
を使用してください。
Python 用 Beam SDK を使用して BigQuery テーブルから読み取るには、ReadFromBigQuery
変換を適用します。ReadFromBigQuery
はディクショナリの PCollection
を返します。PCollection
内の各要素は、テーブル内の単一行を表します。TableRow
オブジェクトの整数値は、BigQuery のエクスポートされた JSON 形式と一致するように文字列としてエンコードされます。
注: BigQuerySource()
は Beam SDK 2.25.0 以降で非推奨になりました。2.25.0 より前のバージョンでは、Beam SDK を使用して BigQuery テーブルから読み取るには、BigQuerySource
で Read
変換を適用します。例: beam.io.Read(beam.io.BigQuerySource(table_spec))
。
テーブルからの読み取り
BigQuery テーブル全体を読み取るには、BigQuery テーブル名を持つ from
メソッドを使用します。この例では readTableRows
を使用しています。
BigQuery テーブル全体を読み取るには、BigQuery テーブル名を持つ table
パラメーターを使用します。
次のコードは、気象観測所のデータを含むテーブル全体を読み取り、max_temperature
列を抽出します。
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromTable {
public static PCollection<MyData> readFromTable(
String project, String dataset, String table, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery query",
BigQueryIO.readTableRows().from(String.format("%s:%s.%s", project, dataset, table)))
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
クエリ文字列を使用した読み取り
テーブル全体を読み取る必要がない場合は、fromQuery
メソッドでクエリ文字列を指定できます。
テーブル全体を読み取る必要がない場合は、query
パラメーターを指定して ReadFromBigQuery
にクエリ文字列を指定できます。
次のコードは、SQL クエリを使用して max_temperature
列のみを読み取ります。
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromQuery {
public static PCollection<MyData> readFromQuery(
String project, String dataset, String table, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery query",
BigQueryIO.readTableRows()
.fromQuery(String.format("SELECT * FROM `%s.%s.%s`", project, dataset, table))
.usingStandardSql())
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
次の例に示すように、BigQuery の標準 SQL 言語をクエリ文字列で使用することもできます。
max_temperatures = (
pipeline
| 'QueryTableStdSQL' >> beam.io.ReadFromBigQuery(
query='SELECT max_temperature FROM '\
'`clouddataflow-readonly.samples.weather_stations`',
use_standard_sql=True)
# Each row is a dictionary where the keys are the BigQuery columns
| beam.Map(lambda elem: elem['max_temperature']))
クエリ実行プロジェクト
デフォルトでは、パイプラインはパイプラインに関連付けられた Google Cloud プロジェクトでクエリを実行します(Dataflow ランナーの場合は、パイプラインが実行されるプロジェクトです)。クエリ実行プロジェクトがパイプライン プロジェクトと異なる必要がある場合があります。Java SDK を使用している場合は、パイプライン オプション "bigQueryProject" を目的の Google Cloud プロジェクト ID に設定することで、クエリ実行プロジェクトを定義できます。
Storage Read API の使用
BigQuery Storage API を使用すると、BigQuery ストレージ内のテーブルに直接アクセスでき、列の選択や述語フィルターのプッシュダウンなどの機能をサポートしているため、より効率的なパイプライン実行が可能になります。
Java 用 Beam SDK は、BigQuery から読み取る際に BigQuery Storage API の使用をサポートしています。2.25.0 より前の SDK バージョンでは、BigQuery Storage API が 実験的な機能としてサポートされており、GA 前の BigQuery Storage API サーフェスが使用されています。BigQuery Storage API を使用する呼び出し元は、SDK バージョン 2.25.0 以降を使用するようにパイプラインを移行する必要があります。
Python 用 Beam SDK は、BigQuery Storage API をサポートしています。ReadFromBigQuery
にパラメーターとして method=DIRECT_READ
を渡すことで有効にできます。
コードの更新
テーブルから読み取る場合は、次のメソッドを使用してください。
- 必須: 読み取り操作に BigQuery Storage API を使用するには、withMethod(Method.DIRECT_READ) を指定します。
- オプション: 列のプロジェクションや列のフィルタリングなどの機能を使用するには、それぞれ withSelectedFields および withRowRestriction を指定する必要があります。
次のコード スニペットは、テーブルから読み取ります。この例は、BigQueryTornadoes の例からのものです。例の読み取りメソッド オプションが DIRECT_READ
に設定されている場合、パイプラインは BigQuery Storage API と列のプロジェクションを使用して、BigQuery テーブルから気象データの公開サンプルを読み取ります。GitHub で完全なソース コードを参照できます。
import java.util.Arrays;
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromTableWithBigQueryStorageAPI {
public static PCollection<MyData> readFromTableWithBigQueryStorageAPI(
String project, String dataset, String table, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery table",
BigQueryIO.readTableRows()
.from(String.format("%s:%s.%s", project, dataset, table))
.withMethod(Method.DIRECT_READ)
.withSelectedFields(
Arrays.asList(
"string_field",
"int64_field",
"float64_field",
"numeric_field",
"bool_field",
"bytes_field",
"date_field",
"datetime_field",
"time_field",
"timestamp_field",
"geography_field",
"array_field",
"struct_field")))
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
次のコード スニペットは、クエリ文字列を使用して読み取ります。
import org.apache.beam.examples.snippets.transforms.io.gcp.bigquery.BigQueryMyData.MyData;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
class BigQueryReadFromQueryWithBigQueryStorageAPI {
public static PCollection<MyData> readFromQueryWithBigQueryStorageAPI(
String project, String dataset, String table, String query, Pipeline pipeline) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// Pipeline pipeline = Pipeline.create();
/*
String query = String.format("SELECT\n" +
" string_field,\n" +
" int64_field,\n" +
" float64_field,\n" +
" numeric_field,\n" +
" bool_field,\n" +
" bytes_field,\n" +
" date_field,\n" +
" datetime_field,\n" +
" time_field,\n" +
" timestamp_field,\n" +
" geography_field,\n" +
" array_field,\n" +
" struct_field\n" +
"FROM\n" +
" `%s:%s.%s`", project, dataset, table)
*/
PCollection<MyData> rows =
pipeline
.apply(
"Read from BigQuery table",
BigQueryIO.readTableRows()
.fromQuery(query)
.usingStandardSql()
.withMethod(Method.DIRECT_READ))
.apply(
"TableRows to MyData",
MapElements.into(TypeDescriptor.of(MyData.class)).via(MyData::fromTableRow));
return rows;
}
}
BigQuery への書き込み
BigQueryIO を使用すると、BigQuery テーブルに書き込むことができます。Java 用 Beam SDK を使用している場合は、異なる行を異なるテーブルに書き込むことができます。Java 用 Beam SDK は、BigQuery Storage Write API を使用して BigQuery ストレージに直接書き込むこともサポートしています。詳細については、Storage Write API の使用 を参照してください。
BigQueryIO の書き込み変換は、BigQuery の 割り当て および 料金 ポリシーが適用される API を使用します。
書き込み変換を適用する場合は、宛先テーブルに関する次の情報を提供する必要があります。
- テーブル名。
- 宛先テーブルの作成処理。作成処理は、宛先テーブルが存在する必要があるか、書き込み操作で作成できるかを指定します。
- 宛先テーブルの書き込み処理。書き込み処理は、書き込むデータが既存のテーブルを置き換えるか、既存のテーブルに行を追加するか、空のテーブルにのみ書き込むかを指定します。
さらに、書き込み操作で新しい BigQuery テーブルを作成する場合は、宛先テーブルのテーブル スキーマも指定する必要があります。
作成処理
作成処理は、宛先テーブルが存在しない場合に BigQuery の書き込み操作でテーブルを作成するかどうかを制御します。
作成処理を指定するには、.withCreateDisposition
を使用します。有効な列挙値は次のとおりです。
Write.CreateDisposition.CREATE_IF_NEEDED
: 書き込み操作で、テーブルが存在しない場合に新しいテーブルを作成する必要があることを指定します。この値を使用する場合は、withSchema
メソッドでテーブル スキーマを指定する必要があります。CREATE_IF_NEEDED
がデフォルトの動作です。Write.CreateDisposition.CREATE_NEVER
: テーブルを作成してはならないことを指定します。宛先テーブルが存在しない場合、書き込み操作は失敗します。
作成処理を指定するには、create_disposition
パラメーターを使用します。有効な列挙値は次のとおりです。
BigQueryDisposition.CREATE_IF_NEEDED
: 書き込み操作で、テーブルが存在しない場合に新しいテーブルを作成する必要があることを指定します。この値を使用する場合は、テーブル スキーマを指定する必要があります。CREATE_IF_NEEDED
がデフォルトの動作です。BigQueryDisposition.CREATE_NEVER
: テーブルを作成してはならないことを指定します。宛先テーブルが存在しない場合、書き込み操作は失敗します。
作成処理として CREATE_IF_NEEDED
を指定し、テーブル スキーマを指定しない場合、宛先テーブルが存在しないと、変換は実行時に失敗する可能性があります。
書き込み処理
書き込み処理は、BigQuery の書き込み操作を既存のテーブルにどのように適用するかを制御します。
書き込み処理を指定するには、.withWriteDisposition
を使用します。有効な列挙値は次のとおりです。
Write.WriteDisposition.WRITE_EMPTY
: 宛先テーブルが空でない場合、書き込み操作は実行時に失敗する必要があることを指定します。WRITE_EMPTY
がデフォルトの動作です。Write.WriteDisposition.WRITE_TRUNCATE
: 書き込み操作で既存のテーブルを置き換える必要があることを指定します。宛先テーブルの既存の行はすべて削除され、新しい行がテーブルに追加されます。Write.WriteDisposition.WRITE_APPEND
: 書き込み操作で、既存のテーブルの末尾に行を追加する必要があることを指定します。
書き込み処理を指定するには、write_disposition
パラメーターを使用します。有効な列挙値は次のとおりです。
BigQueryDisposition.WRITE_EMPTY
: 宛先テーブルが空でない場合、書き込み操作は実行時に失敗する必要があることを指定します。WRITE_EMPTY
がデフォルトの動作です。BigQueryDisposition.WRITE_TRUNCATE
: 書き込み操作で既存のテーブルを置き換える必要があることを指定します。宛先テーブルの既存の行はすべて削除され、新しい行がテーブルに追加されます。BigQueryDisposition.WRITE_APPEND
: 書き込み操作で、既存のテーブルの末尾に行を追加する必要があることを指定します。
WRITE_EMPTY
を使用すると、宛先テーブルが空かどうかを確認する処理が、実際の書き込み操作の前に行われる可能性があります。このチェックでは、パイプラインがテーブルへの排他的アクセスを持つことは保証されません。書き込み処理が WRITE_EMPTY
である同じ出力テーブルに書き込む 2 つの同時パイプラインが正常に開始される可能性がありますが、両方のパイプラインは後で書き込みが試行されたときに失敗する可能性があります。
テーブルスキーマの作成
BigQuery の書き込み操作で新しいテーブルを作成する場合は、スキーマ情報を提供する必要があります。スキーマには、テーブルの各フィールドに関する情報が含まれています。新しいスキーマでパイプラインを更新する場合、既存のスキーマ フィールドは同じ順序を維持する必要があります。そうしないと、パイプラインが中断し、BigQuery への書き込みに失敗します。
Java でテーブル スキーマを作成するには、TableSchema
オブジェクトを使用するか、JSON シリアル化された TableSchema
オブジェクトを含む文字列を使用できます。
Python でテーブル スキーマを作成するには、TableSchema
オブジェクトを使用するか、フィールドのリストを定義する文字列を使用できます。単一の文字列ベースのスキーマは、ネストされたフィールド、繰り返しフィールド、またはフィールドの BigQuery モードの指定をサポートしていません(モードは常に NULLABLE
に設定されます)。
TableSchema の使用
TableSchema
オブジェクトとしてテーブルスキーマを作成および使用するには、次の手順に従います。
TableFieldSchema
オブジェクトのリストを作成します。各TableFieldSchema
オブジェクトは、テーブル内のフィールドを表します。TableSchema
オブジェクトを作成し、setFields
メソッドを使用してフィールドのリストを指定します。書き込み変換を適用するときに、
withSchema
メソッドを使用してテーブルスキーマを指定します。
TableSchema
オブジェクトを作成します。テーブルの各フィールドに対して
TableFieldSchema
オブジェクトを作成して追加します。書き込み変換を適用するときに、
schema
パラメータを使用してテーブルスキーマを指定します。パラメータの値をTableSchema
オブジェクトに設定します。
次のコード例は、2つの文字列型フィールド(sourceとquote)を持つテーブルのTableSchema
を作成する方法を示しています。
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
class BigQuerySchemaCreate {
public static TableSchema createSchema() {
// To learn more about BigQuery schemas:
// https://cloud.google.com/bigquery/docs/schemas
TableSchema schema =
new TableSchema()
.setFields(
Arrays.asList(
new TableFieldSchema()
.setName("string_field")
.setType("STRING")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("int64_field")
.setType("INT64")
.setMode("NULLABLE"),
new TableFieldSchema()
.setName("float64_field")
.setType("FLOAT64"), // default mode is "NULLABLE"
new TableFieldSchema().setName("numeric_field").setType("NUMERIC"),
new TableFieldSchema().setName("bool_field").setType("BOOL"),
new TableFieldSchema().setName("bytes_field").setType("BYTES"),
new TableFieldSchema().setName("date_field").setType("DATE"),
new TableFieldSchema().setName("datetime_field").setType("DATETIME"),
new TableFieldSchema().setName("time_field").setType("TIME"),
new TableFieldSchema().setName("timestamp_field").setType("TIMESTAMP"),
new TableFieldSchema().setName("geography_field").setType("GEOGRAPHY"),
new TableFieldSchema()
.setName("array_field")
.setType("INT64")
.setMode("REPEATED")
.setDescription("Setting the mode to REPEATED makes this an ARRAY<INT64>."),
new TableFieldSchema()
.setName("struct_field")
.setType("STRUCT")
.setDescription(
"A STRUCT accepts a custom data class, the fields must match the custom class fields.")
.setFields(
Arrays.asList(
new TableFieldSchema().setName("string_value").setType("STRING"),
new TableFieldSchema().setName("int64_value").setType("INT64")))));
return schema;
}
}
文字列の使用
JSONシリアライズされたTableSchema
オブジェクトを含む文字列としてテーブルスキーマを作成および使用するには、次の手順に従います。
JSONシリアライズされた
TableSchema
オブジェクトを含む文字列を作成します。書き込み変換を適用するときに、
withJsonSchema
メソッドを使用してテーブルスキーマを指定します。
文字列としてテーブルスキーマを作成および使用するには、次の手順に従います。
フィールドのリストを定義する「field1:type1,field2:type2,field3:type3」の形式のコンマ区切り文字列を作成します。typeには、フィールドのBigQuery型を指定する必要があります。
書き込み変換を適用するときに、
schema
パラメータを使用してテーブルスキーマを指定します。パラメータの値を文字列に設定します。
次の例は、前の例と同じテーブルスキーマを文字列で指定する方法を示しています。
挿入方法の設定
BigQueryIOは、BigQueryにデータを挿入する2つの方法(ロードジョブとストリーミング挿入)をサポートしています。各挿入方法には、コスト、クォータ、データ整合性の異なるトレードオフがあります。これらのトレードオフの詳細については、BigQueryドキュメントのさまざまなデータ取り込みオプション(特に、ロードジョブとストリーミング挿入)を参照してください。
BigQueryIOは、入力PCollection
に基づいてデフォルトの挿入方法を選択します。withMethod
を使用して、必要な挿入方法を指定できます。使用可能な方法とその制限事項の一覧については、Write.Method
を参照してください。
BigQueryIOは、入力PCollection
に基づいてデフォルトの挿入方法を選択します。method
を使用して、必要な挿入方法を指定できます。使用可能な方法とその制限事項の一覧については、WriteToBigQuery
を参照してください。
BigQueryIOは、次の状況でロードジョブを使用します。
- BigQueryIO書き込み変換をバインドされた
PCollection
に適用する場合。 BigQueryIO.write().withMethod(FILE_LOADS)
を使用して、挿入方法としてロードジョブを指定する場合。
- BigQueryIO書き込み変換をバインドされた
PCollection
に適用する場合。 WriteToBigQuery(method='FILE_LOADS')
を使用して、挿入方法としてロードジョブを指定する場合。
注:ストリーミングパイプラインでバッチロードを使用する場合
ロードジョブを開始するためのトリガー頻度を指定するには、withTriggeringFrequency
を使用する必要があります。パイプラインがBigQueryロードジョブのクォータ制限を超えないように、頻度の設定には注意してください。
withNumFileShards
を使用して書き込まれるファイルシャードの数を明示的に設定するか、withAutoSharding
を使用して動的シャーディングを有効にするか(2.29.0リリース以降)、シャードの数がランタイム時に決定および変更される場合があります。シャーディングの動作は、ランナーによって異なります。
ロードジョブを開始するためのトリガー頻度を指定するには、triggering_frequency
を使用する必要があります。パイプラインがBigQueryロードジョブのクォータ制限を超えないように、頻度の設定には注意してください。
with_auto_sharding=True
を設定して、動的シャーディングを有効にできます(2.29.0リリース以降)。シャードの数はランタイム時に決定および変更される場合があります。シャーディングの動作は、ランナーによって異なります。
BigQueryIOは、次の状況でストリーミング挿入を使用します。
- BigQueryIO書き込み変換をアンバインドされた
PCollection
に適用する場合。 BigQueryIO.write().withMethod(STREAMING_INSERTS)
を使用して、挿入方法としてストリーミング挿入を指定する場合。
- BigQueryIO書き込み変換をアンバインドされた
PCollection
に適用する場合。 WriteToBigQuery(method='STREAMING_INSERTS')
を使用して、挿入方法としてストリーミング挿入を指定する場合。
注:ストリーミング挿入では、デフォルトでBigQueryのベストエフォート重複排除メカニズムが有効になります。ignoreInsertIds
を設定することで無効にできます。重複排除が有効になっている場合と無効になっている場合では、クォータ制限が異なります。
ストリーミング挿入では、各テーブルの宛先にデフォルトのシャーディングが適用されます。withAutoSharding
(2.28.0リリース以降)を使用して動的シャーディングを有効にすると、シャードの数がランタイム時に決定および変更される場合があります。シャーディングの動作は、ランナーによって異なります。
注:ストリーミング挿入では、デフォルトでBigQueryのベストエフォート重複排除メカニズムが有効になります。ignore_insert_ids=True
を設定することで無効にできます。重複排除が有効になっている場合と無効になっている場合では、クォータ制限が異なります。
ストリーミング挿入では、各テーブルの宛先にデフォルトのシャーディングが適用されます。with_auto_sharding=True
(2.29.0リリース以降)を設定して、動的シャーディングを有効にできます。シャードの数はランタイム時に決定および変更される場合があります。シャーディングの動作は、ランナーによって異なります。
テーブルへの書き込み
BigQueryテーブルに書き込むには、writeTableRows
またはwrite
変換を適用します。
BigQueryテーブルに書き込むには、WriteToBigQuery
変換を適用します。WriteToBigQuery
は、バッチモードとストリーミングモードの両方をサポートしています。変換は、辞書のPCollection
に適用する必要があります。通常、ParDo
などの別の変換を使用して、出力データをコレクションにフォーマットする必要があります。
次の例では、引用符を含むこのPCollection
を使用します。
writeTableRows
メソッドは、BigQuery TableRow
オブジェクトのPCollection
をBigQueryテーブルに書き込みます。PCollection
の各要素は、テーブル内の1つの行を表します。この例では、writeTableRows
を使用して、PCollection<TableRow>
に要素を書き込みます。書き込み操作は、必要な場合にテーブルを作成します。テーブルが既に存在する場合は、置き換えられます。
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.values.PCollection;
class BigQueryWriteToTable {
public static void writeToTable(
String project,
String dataset,
String table,
TableSchema schema,
PCollection<TableRow> rows) {
// String project = "my-project-id";
// String dataset = "my_bigquery_dataset_id";
// String table = "my_bigquery_table_id";
// TableSchema schema = new TableSchema().setFields(Arrays.asList(...));
// Pipeline pipeline = Pipeline.create();
// PCollection<TableRow> rows = ...
rows.apply(
"Write to BigQuery",
BigQueryIO.writeTableRows()
.to(String.format("%s:%s.%s", project, dataset, table))
.withSchema(schema)
// For CreateDisposition:
// - CREATE_IF_NEEDED (default): creates the table if it doesn't exist, a schema is
// required
// - CREATE_NEVER: raises an error if the table doesn't exist, a schema is not needed
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
// For WriteDisposition:
// - WRITE_EMPTY (default): raises an error if the table is not empty
// - WRITE_APPEND: appends new rows to existing rows
// - WRITE_TRUNCATE: deletes the existing rows before writing
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
// pipeline.run().waitUntilFinish();
}
}
次のコード例は、WriteToBigQuery
変換を適用して、辞書のPCollection
をBigQueryテーブルに書き込む方法を示しています。書き込み操作は、必要な場合にテーブルを作成します。テーブルが既に存在する場合は、置き換えられます。
write
変換は、カスタム型オブジェクトのPCollection
をBigQueryテーブルに書き込みます。.withFormatFunction(SerializableFunction)
を使用して、PCollection
の各入力要素をTableRow
に変換するフォーマット関数を提供します。この例では、write
を使用してPCollection<String>
を書き込みます。書き込み操作は、必要な場合にテーブルを作成します。テーブルが既に存在する場合は、置き換えられます。
quotes.apply(
BigQueryIO.<Quote>write()
.to(tableSpec)
.withSchema(tableSchema)
.withFormatFunction(
(Quote elem) ->
new TableRow().set("source", elem.source).set("quote", elem.quote))
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
ストリーミング挿入を使用する場合、失敗したレコードをどうするかを決定できます。再試行を続けるか、WriteResult.getFailedInserts()
メソッドを使用して、失敗したレコードを別のPCollection
で返すことができます。
Storage Write API の使用
Beam SDK for Javaのバージョン2.36.0以降では、BigQueryIOコネクタからBigQuery Storage Write APIを使用できます。
また、Beam SDK for Pythonのバージョン2.47.0以降では、SDKはBigQuery Storage Write APIをサポートしています。
Python SDK用のBigQuery Storage Write APIには、現在、サポートされているデータ型にいくつかの制限があります。このメソッドはクロスランゲージ変換を利用するため、クロスランゲージ境界でサポートされている型に制限されます。たとえば、TIMESTAMP
BigQuery型を書き込むには、apache_beam.utils.timestamp.Timestamp
が必要です。また、一部の型(DATETIME
など)はまだサポートされていません。詳細については、完全な型マッピングを参照してください。
注:ソースコードからStorage Write APIを使用してWriteToBigQueryを実行する場合は、./gradlew :sdks:java:io:google-cloud-platform:expansion-service:build
を実行してexpansion-service jarをビルドする必要があります。リリースされたBeam SDKから実行する場合は、jarが既に含まれています。
正確に 1 回のセマンティクス
Storage Write APIを使用してBigQueryに書き込むには、withMethod
をMethod.STORAGE_WRITE_API
に設定します。次に、Storage Write APIと正確に一度のセマンティクスを使用してBigQueryに書き込む変換の例を示します。
パイプラインのすべてのBigQueryシンクがデフォルトでStorage Write APIを使用するようにBigQueryIOの動作を変更する場合は、UseStorageWriteApi
オプションを設定します。
パイプラインでテーブルを作成する必要がある場合(存在しない場合に、作成処理をCREATE_IF_NEEDED
として指定した場合)、テーブルスキーマを提供する必要があります。APIは、スキーマを使用してデータを検証し、バイナリプロトコルに変換します。
ストリーミングパイプラインの場合、ストリーム数とトリガー頻度の2つの追加パラメータを設定する必要があります。
ストリーム数は、BigQueryIO Write変換の並列処理を定義し、パイプラインが使用するStorage Write APIストリームの数にほぼ対応します。withNumStorageWriteApiStreams
を介して変換で明示的に設定するか、BigQueryOptions
で定義されているように、numStorageWriteApiStreams
オプションをパイプラインに提供できます。これはストリーミングパイプラインでのみサポートされることに注意してください。
トリガー頻度は、BigQueryでデータをクエリできるまでの時間を決定します。withTriggeringFrequency
を介して明示的に設定するか、storageWriteApiTriggeringFrequencySec
オプションを設定して秒数を指定できます。
これらの2つのパラメータの組み合わせは、BigQueryIOがStorage Write APIを呼び出す前に作成する行のバッチサイズに影響を与えます。頻度を高く設定しすぎると、バッチが小さくなり、パフォーマンスに影響を与える可能性があります。一般的なルールとして、単一のストリームは少なくとも1秒あたり1MBのスループットを処理できる必要があります。排他的なストリームを作成することはBigQueryサービスにとってコストのかかる操作であるため、ユースケースに必要なストリーム数のみを使用する必要があります。ほとんどのパイプラインでは、1桁秒単位でのトリガー頻度が適切な選択です。
ストリーミング挿入と同様に、STORAGE_WRITE_API
は、BigQueryへの書き込みに使用する並列ストリーム数を動的に決定することをサポートしています(2.42.0以降)。withAutoSharding
を使用して明示的に有効にできます。
STORAGE_WRITE_API
は、numStorageWriteApiStreams
が0または未指定の場合、デフォルトで動的シャーディングを使用します。
STORAGE_WRITE_API
を使用する場合、WriteResult.getFailedStorageApiInserts
によって返されるPCollection
には、Storage Write APIシンクへの書き込みに失敗した行が含まれます。
少なくとも 1 回のセマンティクス
ターゲットテーブルに重複レコードが含まれる可能性があっても構わない場合は、STORAGE_API_AT_LEAST_ONCE
メソッドを使用できます。このメソッドは、STORAGE_WRITE_API
メソッドの正確に1回のセマンティクスを提供するために必要な、書き込まれるレコードをシャッフルストレージに保持しません。したがって、ほとんどのパイプラインでは、このメソッドを使用する方が安価であり、レイテンシが低くなることがよくあります。STORAGE_API_AT_LEAST_ONCE
を使用する場合、ストリーム数を指定する必要はなく、トリガー頻度を指定することもできません。
自動シャーディングはSTORAGE_API_AT_LEAST_ONCE
には適用されません。
STORAGE_API_AT_LEAST_ONCE
を使用する場合、WriteResult.getFailedStorageApiInserts
によって返されるPCollection
には、Storage Write APIシンクへの書き込みに失敗した行が含まれます。
クォータ
Storage Write APIを使用する前に、BigQuery Storage Write APIの割り当てに注意してください。
動的な宛先の使用
動的宛先機能を使用すると、PCollection
内の要素を異なるBigQueryテーブルに、場合によっては異なるスキーマで書き込むことができます。
動的宛先機能は、ユーザー定義の宛先キーでユーザータイプをグループ化し、キーを使用して宛先テーブルやスキーマを計算し、各グループの要素を計算された宛先に書き込みます。
さらに、TableRow
へのマッピング関数を持つ独自の型を書き込むこともでき、すべてのDynamicDestinations
メソッドでサイド入力を使用できます。
動的宛先を使用するには、DynamicDestinations
オブジェクトを作成し、次のメソッドを実装する必要があります。
getDestination
:getTable
とgetSchema
が宛先キーとして使用して、宛先テーブルやスキーマを計算できるオブジェクトを返します。getTable
: 宛先キーのテーブル(TableDestination
オブジェクトとして)を返します。このメソッドは、一意の宛先ごとに一意のテーブルを返す必要があります。getSchema
: 宛先キーのテーブルスキーマ(TableSchema
オブジェクトとして)を返します。
次に、write().to
をDynamicDestinations
オブジェクトとともに使用します。この例では、天気データを含むPCollection
を使用し、年ごとに異なるテーブルにデータを書き込みます。
/*
@DefaultCoder(AvroCoder.class)
static class WeatherData {
final long year;
final long month;
final long day;
final double maxTemp;
public WeatherData() {
this.year = 0;
this.month = 0;
this.day = 0;
this.maxTemp = 0.0f;
}
public WeatherData(long year, long month, long day, double maxTemp) {
this.year = year;
this.month = month;
this.day = day;
this.maxTemp = maxTemp;
}
}
*/
PCollection<WeatherData> weatherData =
p.apply(
BigQueryIO.read(
(SchemaAndRecord elem) -> {
GenericRecord record = elem.getRecord();
return new WeatherData(
(Long) record.get("year"),
(Long) record.get("month"),
(Long) record.get("day"),
(Double) record.get("max_temperature"));
})
.fromQuery(
"SELECT year, month, day, max_temperature "
+ "FROM [apache-beam-testing.samples.weather_stations] "
+ "WHERE year BETWEEN 2007 AND 2009")
.withCoder(AvroCoder.of(WeatherData.class)));
// We will send the weather data into different tables for every year.
weatherData.apply(
BigQueryIO.<WeatherData>write()
.to(
new DynamicDestinations<WeatherData, Long>() {
@Override
public Long getDestination(ValueInSingleWindow<WeatherData> elem) {
return elem.getValue().year;
}
@Override
public TableDestination getTable(Long destination) {
return new TableDestination(
new TableReference()
.setProjectId(writeProject)
.setDatasetId(writeDataset)
.setTableId(writeTable + "_" + destination),
"Table for year " + destination);
}
@Override
public TableSchema getSchema(Long destination) {
return new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema()
.setName("year")
.setType("INTEGER")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("month")
.setType("INTEGER")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("day")
.setType("INTEGER")
.setMode("REQUIRED"),
new TableFieldSchema()
.setName("maxTemp")
.setType("FLOAT")
.setMode("NULLABLE")));
}
})
.withFormatFunction(
(WeatherData elem) ->
new TableRow()
.set("year", elem.year)
.set("month", elem.month)
.set("day", elem.day)
.set("maxTemp", elem.maxTemp))
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
fictional_characters_view = beam.pvalue.AsDict(
pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),
('Obi Wan Kenobi', True)]))
def table_fn(element, fictional_characters):
if element in fictional_characters:
return 'my_dataset.fictional_quotes'
else:
return 'my_dataset.real_quotes'
quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery(
table_fn,
schema=table_schema,
table_side_inputs=(fictional_characters_view, ),
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
時間パーティショニングの使用
BigQueryの時間パーティショニングは、テーブルをより小さなパーティションに分割します。これはパーティション分割テーブルと呼ばれます。パーティション分割テーブルを使用すると、データの管理とクエリが簡単になります。
BigQueryの時間パーティショニングを使用するには、次の2つの方法のいずれかを使用します。
withTimePartitioning
: このメソッドはTimePartitioning
クラスを受け取り、単一のテーブルに書き込む場合にのみ使用できます。withJsonTimePartitioning
: このメソッドはwithTimePartitioning
と同じですが、JSONシリアル化されたStringオブジェクトを受け取ります。
この例では、1日あたり1つのパーティションが生成されます。
weatherData.apply(
BigQueryIO.<WeatherData>write()
.to(tableSpec + "_partitioning")
.withSchema(tableSchema)
.withFormatFunction(
(WeatherData elem) ->
new TableRow()
.set("year", elem.year)
.set("month", elem.month)
.set("day", elem.day)
.set("maxTemp", elem.maxTemp))
// NOTE: an existing table without time partitioning set up will not work
.withTimePartitioning(new TimePartitioning().setType("DAY"))
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
制限事項
BigQueryIOには現在、次の制限があります。
BigQueryの書き込み完了をパイプラインの他のステップと順序付けることはできません。
Beam SDK for Pythonを使用している場合、非常に大きなデータセットを書き込むと、インポートサイズの割り当ての問題が発生する可能性があります。回避策として、データセットをパーティション分割(たとえば、Beamの
Partition
変換を使用)して、複数のBigQueryテーブルに書き込むことができます。Beam SDK for Javaには、データセットを自動的にパーティション分割するため、この制限はありません。BigQueryにデータをロードすると、これらの制限が適用されます。デフォルトでは、BigQueryはデータのロードにスロットの共有プールを使用します。これは、利用可能な容量が保証されず、スロットが利用可能になるまでロードがキューに入れられる可能性があることを意味します。スロットが6時間以内に利用可能にならない場合、BigQueryによって設定された制限によりロードは失敗します。この状況を回避するために、BigQuery予約を使用することを強くお勧めします。これにより、ロードがキューに入れられたり、容量の問題で失敗したりすることがなくなります。
その他の例
Beamのサンプルディレクトリには、BigQueryを使用する追加の例があります。
Java クックブックの例
これらの例は、Javaのcookbook examplesディレクトリからのものです。
BigQueryTornadoesは、BigQueryの気象データの公開サンプルを読み取り、各月に発生する竜巻の数をカウントし、結果をBigQueryテーブルに書き込みます。
CombinePerKeyExamplesは、BigQueryから公開されているシェイクスピアデータを読み取り、指定された長さを超えるデータセット内の各単語に対して、その単語が表示される劇の名前のリストを含む文字列を生成します。パイプラインは、結果をBigQueryテーブルに書き込みます。
FilterExamplesは、BigQueryから気象データの公開サンプルを読み取り、データに対してプロジェクションを実行し、気温の読み取り値のグローバル平均を見つけ、特定の月の読み取り値をフィルタリングし、導出されたグローバル平均よりも平均気温が小さい(その月の)データのみを出力します。
JoinExamplesは、BigQueryからGDELT「世界イベント」のサンプルを読み取り、イベントの
action
国コードを国コードを国名にマッピングするテーブルと結合します。MaxPerKeyExamplesは、BigQueryの気象データの公開サンプルを読み取り、各月の最高気温を見つけ、結果をBigQueryテーブルに書き込みます。
TriggerExampleは、サンディエゴのフリーウェイからの交通データのストリーミング分析を実行します。パイプラインは、テキストファイルから入ってくるデータを調べ、結果をBigQueryテーブルに書き込みます。
Java の完全な例
これらの例は、Javaのcomplete examplesディレクトリからのものです。
AutoCompleteは、オートコンプリートに使用できる、すべてのプレフィックスで最も人気のあるハッシュタグを計算します。パイプラインはオプションで結果をBigQueryテーブルに書き込むことができます。
StreamingWordExtractは、テキストの行を読み取り、各行を個々の単語に分割し、それらの単語を大文字化し、出力をBigQueryテーブルに書き込みます。
TrafficMaxLaneFlowは、交通センサーデータを読み取り、記録された最も高い流れがあったレーンを見つけ、結果をBigQueryテーブルに書き込みます。
TrafficRoutesは、交通センサーデータを読み取り、各ウィンドウの平均速度を計算し、ルートの減速を探し、結果をBigQueryテーブルに書き込みます。
Python クックブックの例
これらの例は、Pythonのcookbook examplesディレクトリからのものです。
BigQuery schemaは、ネストされた繰り返しフィールドを使用して
TableSchema
を作成し、ネストされた繰り返しフィールドを含むデータを生成し、データをBigQueryテーブルに書き込みます。BigQuery side inputsは、BigQueryソースをサイド入力として使用します。シングルトン、イテレータ、リストの3つの異なる形式でサイド入力を変換に挿入する方法を示します。
BigQuery tornadoesは、テーブルスキーマの一部として「month」と「tornado」フィールドを持つBigQueryテーブルから読み取り、各月の竜巻の数を計算し、結果をBigQueryテーブルに出力します。
BigQuery filtersは、BigQueryテーブルから気象観測所データを読み取り、メモリ内でBigQuery行を操作し、結果をBigQueryテーブルに書き込みます。
最終更新日:2024/10/31
探していたものはすべて見つかりましたか?
すべてが役に立ち、明確でしたか?何か変更したいことはありますか?お知らせください!