Beam SQL拡張機能: CREATE EXTERNAL TABLE
Beam SQLのCREATE EXTERNAL TABLE
ステートメントは、外部ストレージシステムにマッピングする仮想テーブルを登録します。一部のストレージシステムでは、CREATE EXTERNAL TABLE
は、書き込みが発生するまで物理テーブルを作成しません。物理テーブルが存在するようになったら、SELECT
、JOIN
、INSERT INTO
ステートメントを使用してテーブルにアクセスできます。
CREATE EXTERNAL TABLE
ステートメントには、スキーマと拡張句が含まれます。
構文
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE type
[LOCATION location]
[TBLPROPERTIES tblProperties]
simpleType: TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | DECIMAL | BOOLEAN | DATE | TIME | TIMESTAMP | CHAR | VARCHAR
fieldType: simpleType | MAP<simpleType, fieldType> | ARRAY<fieldType> | ROW<tableElement [, tableElement ]*>
tableElement: columnName fieldType [ NOT NULL ]
IF NOT EXISTS
: オプション。テーブルがすでに登録されている場合、Beam SQLはエラーを返す代わりにステートメントを無視します。tableName
: 作成および登録するテーブルの大文字と小文字を区別する名前。 識別子として指定します。テーブル名は、基になるデータストレージシステムの名前と一致する必要はありません。tableElement
:columnName
fieldType
[ NOT NULL ]
columnName
: 列の大文字と小文字を区別する名前。バッククォートで囲まれた式として指定します。fieldType
: フィールドの型。次のいずれかの型として指定します。simpleType
:TINYINT
、SMALLINT
、INTEGER
、BIGINT
、FLOAT
、DOUBLE
、DECIMAL
、BOOLEAN
、DATE
、TIME
、TIMESTAMP
、CHAR
、VARCHAR
MAP<simpleType, fieldType>
ARRAY<fieldType>
ROW<tableElement [, tableElement ]*>
NOT NULL
: オプション。列がnull許容でないことを示します。
type
: 仮想テーブルをバックアップするI/O変換。 識別子として指定します。次のいずれかの値になります。bigquery
bigtable
pubsub
kafka
text
location
: 基になるテーブルのI/O固有の場所。 文字列リテラルとして指定します。location
の形式要件については、I/O固有のセクションを参照してください。tblProperties
: 追加の構成を含むI/O固有の引用符で囲まれたキーと値のJSONオブジェクト。文字列リテラルとして指定します。tblProperties
の形式要件については、I/O固有のセクションを参照してください。
BigQuery
構文
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE bigquery
LOCATION '[PROJECT_ID]:[DATASET].[TABLE]'
TBLPROPERTIES '{"method": "DIRECT_READ"}'
LOCATION
: BigQuery CLI形式のテーブルの場所。PROJECT_ID
: Google CloudプロジェクトのID。DATASET
: BigQueryデータセットID。TABLE
: データセット内のBigQueryテーブルID。
TBLPROPERTIES
:method
: オプション。使用する読み取り方法。次のオプションを利用できます。DIRECT_READ
: BigQuery Storage APIを使用します。EXPORT
: データをAvro形式でGoogle Cloud Storageにエクスポートし、その場所からデータファイルを読み取ります。- Beam 2.21以降のデフォルトは
DIRECT_READ
です(古いバージョンではEXPORT
を使用します)。
読み取りモード
Beam SQLは、単純型(simpleType
)の列と単純型の配列(ARRAY<simpleType>
)の読み取りをサポートします。
EXPORT
メソッドを使用して読み取る場合、次のパイプラインオプションを設定する必要があります。
project
: Google CloudプロジェクトのID。tempLocation
: 中間データを保存するバケット。例:gs://temp-storage/temp
。
DIRECT_READ
メソッドを使用して読み取る場合、オプティマイザーはプロジェクトと述語のプッシュダウンを実行しようとし、BigQueryからデータを読み取るために必要な時間を短縮する可能性があります。
BigQuery Storage APIの詳細については、こちらを参照してください。
書き込みモード
テーブルが存在しない場合、Beamは最初のレコードが書き込まれたときに、場所で指定されたテーブルを作成します。テーブルが存在する場合、指定された列は既存のテーブルと一致する必要があります。
スキーマ
スキーマ関連のエラーが発生すると、パイプラインがクラッシュします。 Map型はサポートされていません。 Beam SQL型は、BigQuery Standard SQL型に次のようにマッピングされます。
Beam SQL型 | BigQuery Standard SQL型 |
TINYINT、SMALLINT、INTEGER、BIGINT | INT64 |
FLOAT、DOUBLE、DECIMAL | FLOAT64 |
BOOLEAN | BOOL |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
CHAR、VARCHAR | STRING |
MAP | (サポートされていません) |
ARRAY | ARRAY |
ROW | STRUCT |
例
CREATE EXTERNAL TABLE users (id INTEGER, username VARCHAR)
TYPE bigquery
LOCATION 'testing-integration:apache.users'
Cloud Bigtable
構文
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
key VARCHAR NOT NULL,
family ROW<qualifier cells [, qualifier cells ]* >
[, family ROW< qualifier cells [, qualifier cells ]* > ]*
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/[PROJECT_ID]/instances/[INSTANCE_ID]/tables/[TABLE]'
key
: Bigtable行のキーfamily
: 列ファミリの名前qualifier
: 列修飾子cells
: 各値のいずれかTYPE
ARRAY<SIMPLE_TYPE>
LOCATION
:PROJECT_ID
: Google CloudプロジェクトのID。INSTANCE_ID
: BigtableインスタンスID。TABLE
: BigtableテーブルID。
TYPE
:SIMPLE_TYPE
またはCELL_ROW
CELL_ROW
:ROW<val SIMPLE_TYPE [, timestampMicros BIGINT [NOT NULL]] [, labels ARRAY<VARCHAR> [NOT NULL]]
SIMPLE_TYPE
: 次のいずれかBINARY
VARCHAR
BIGINT
INTEGER
SMALLINT
TINYINT
DOUBLE
FLOAT
BOOLEAN
TIMESTAMP
フラットスキーマを使用した代替構文
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
key VARCHAR NOT NULL,
qualifier SIMPLE_TYPE
[, qualifier SIMPLE_TYPE ]*
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/[PROJECT_ID]/instances/[INSTANCE_ID]/tables/[TABLE]'
TBLPROPERTIES '{
"columnsMapping": "family:qualifier[,family:qualifier]*"
}'
key
: Bigtable行のキーfamily
: 列ファミリの名前qualifier
: 列修飾子LOCATION
:PROJECT_ID
: Google CloudプロジェクトのID。INSTANCE_ID
: BigtableインスタンスID。TABLE
: BigtableテーブルID。
TBLPROPERTIES
: コロンで区切られたコンマ区切りのキーと値のペアを持つcolumnsMappingキーを含むJSONオブジェクトSIMPLE_TYPE
: 前の構文と同じ
読み取りモード
Beam SQLは、必須のkey
フィールド、少なくとも1つのqualifier
を持つ少なくとも1つのfamily
を持つ行の読み取りをサポートします。セルは、単純型(SIMPLE_TYPE
)として、または必須のval
フィールド、オプションのtimestampMicros
、およびオプションのlabels
を持つROW型として表されます。どちらも列の最新のセルを読み取ります。単純型の配列(ARRAY<simpleType>
)として指定されたセルを使用すると、列のすべての値を読み取ることができます。
フラットスキーマの場合、SIMPLE_TYPE
値のみが許可されます。 key
を除くすべてのフィールドは、columnsMapping
で指定されたキーと値のペアに対応する必要があります。
既存のすべての列ファミリと修飾子をスキーマに提供する必要はありません。
フィルターは、RE2 Syntax regexを使用した単一のLIKE
ステートメントによるkey
フィールドでのみ許可されます。例:SELECT * FROM table WHERE key LIKE '^key[012]{1}'
書き込みモード
フラットスキーマでのみサポートされます。
例
CREATE EXTERNAL TABLE beamTable(
key VARCHAR NOT NULL,
beamFamily ROW<
boolLatest BOOLEAN NOT NULL,
longLatestWithTs ROW<
val BIGINT NOT NULL,
timestampMicros BIGINT NOT NULL
> NOT NULL,
allStrings ARRAY<VARCHAR> NOT NULL,
doubleLatestWithTsAndLabels ROW<
val DOUBLE NOT NULL,
timestampMicros BIGINT NOT NULL,
labels ARRAY<VARCHAR> NOT NULL
> NOT NULL,
binaryLatestWithLabels ROW<
val BINARY NOT NULL,
labels ARRAY<VARCHAR> NOT NULL
> NOT NULL
> NOT NULL
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/beam/instances/beamInstance/tables/beamTable'
フラットスキーマの例
CREATE EXTERNAL TABLE flatTable(
key VARCHAR NOT NULL,
boolColumn BOOLEAN NOT NULL,
longColumn BIGINT NOT NULL,
stringColumn VARCHAR NOT NULL,
doubleColumn DOUBLE NOT NULL,
binaryColumn BINARY NOT NULL
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/beam/instances/beamInstance/tables/flatTable'
TBLPROPERTIES '{
"columnsMapping": "f:boolColumn,f:longColumn,f:stringColumn,f2:doubleColumn,f2:binaryColumn"
}'
書き込み例
INSERT INTO writeTable(key, boolColumn, longColumn, stringColumn, doubleColumn)
VALUES ('key', TRUE, 10, 'stringValue', 5.5)
Pub/Sub
構文
ネストモード
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
event_timestamp TIMESTAMP,
attributes [MAP<VARCHAR, VARCHAR>, ARRAY<ROW<VARCHAR key, VARCHAR value>>],
payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE pubsub
LOCATION 'projects/[PROJECT]/topics/[TOPIC]'
フラットモード
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(tableElement [, tableElement ]*)
TYPE pubsub
LOCATION 'projects/[PROJECT]/topics/[TOPIC]'
ネストモードでは、次のフィールドにトピックメタデータが保持されます。 attributes
フィールドの存在により、ネストモードの使用がトリガーされます。
event_timestamp
: PubsubIOによってPub/Subメッセージに関連付けられたイベントタイムスタンプ。次のいずれかになります。- Pub/Subによって提供されるメッセージ公開時刻。これは、追加の構成が提供されない場合のデフォルト値です。
- ユーザーが提供するメッセージ属性の1つで指定されたタイムスタンプ。属性キーは、
tblProperties
blobのtimestampAttributeKey
フィールドによって構成されます。属性の値は、PubsubIOの要件に準拠している必要があり、Unixエポックからのミリ秒数またはRFC 339 日付文字列のいずれかです。
attributes
: Pub/Subメッセージからユーザーが提供した属性マップ。payload
: Pub/Subメッセージのペイロードのスキーマ。レコードを非整列化できない場合、レコードはtblProperties
blobのdeadLeaderQueue
フィールドで指定されたトピックに書き込まれます。この場合にデッドレターキューが指定されていない場合、例外がスローされ、パイプラインがクラッシュします。LOCATION
:PROJECT
: Google CloudプロジェクトのIDTOPIC
: Pub/Subトピック名。サブスクリプションは自動的に作成されますが、自動的にクリーンアップされることはありません。既存のサブスクリプションを指定することはサポートされていません。
TBLPROPERTIES
:timestampAttributeKey
: オプション。Pub/Subメッセージに関連付けられたイベントタイムスタンプを含むキー。指定しない場合、メッセージの公開タイムスタンプがウィンドウ処理/ウォーターマーキングのイベントタイムスタンプとして使用されます。deadLetterQueue
: ペイロードが解析されなかった場合にメッセージが書き込まれるトピック。指定しない場合、解析の失敗に対して例外がスローされます。format
: オプション。Pubsubペイロード形式を指定できます。
読み取りモード
PubsubIOは、新しいサブスクリプションを作成することでトピックからの読み取りをサポートします。
書き込みモード
PubsubIOは、トピックへの書き込みをサポートします。
スキーマ
Pub/Subメッセージにはメタデータが関連付けられており、クエリでこのメタデータを参照できます。各メッセージについて、Pub/Subは、ペイロード(一般的なケースでは非構造化)に加えて、公開時間とユーザーが提供した属性のマップを公開します。この情報は、SQLステートメントから保持およびアクセスできる必要があります。現在、これはPubsubIOテーブルで、以下に示すように特別な列のセットを宣言する必要があることを意味します。
サポートされているペイロード
- Pub/Subは、汎用ペイロード処理をサポートしています。
例
CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes MAP<VARCHAR, VARCHAR>, payload ROW<id INTEGER, location VARCHAR>)
TYPE pubsub
LOCATION 'projects/testing-integration/topics/user-location'
Pub/Sub Lite
構文
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
publish_timestamp DATETIME,
event_timestamp DATETIME,
message_key BYTES,
attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE pubsublite
// For writing
LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/topics/[TOPIC]'
// For reading
LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/subscriptions/[SUBSCRIPTION]'
LOCATION
:PROJECT
: Google CloudプロジェクトのIDTOPIC
: Pub/Sub Liteトピック名。SUBSCRIPTION
: Pub/Sub Liteサブスクリプション名。GCP-LOCATION
: このPub/Sub Liteトピックまたはサブスクリプションの場所。
TBLPROPERTIES
:timestampAttributeKey
: オプション。Pub/Subメッセージに関連付けられたイベントタイムスタンプを含むキー。指定しない場合、メッセージの公開タイムスタンプがウィンドウ処理/ウォーターマーキングのイベントタイムスタンプとして使用されます。deadLetterQueue
: オプション、汎用DLQ処理をサポートしますformat
: オプション。ペイロード形式を指定できます。
読み取りモード
PubsubLiteIOは、サブスクリプションからの読み取りをサポートします。
書き込みモード
PubsubLiteIOは、トピックへの書き込みをサポートします。
サポートされているペイロード
- Pub/Sub Liteは、汎用ペイロード処理をサポートしています。
例
CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload ROW<id INTEGER, location VARCHAR>)
TYPE pubsublite
LOCATION 'projects/testing-integration/locations/us-central1-a/topics/user-location'
Kafka
KafkaIOは、Beam SQLでは実験的な機能です。
構文
フラットモード
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE kafka
LOCATION 'my.company.url.com:2181/topic1'
TBLPROPERTIES '{
"bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
"topics": ["topic2", "topic3"],
"format": "json"
}'
ネストモード
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
event_timestamp DATETIME,
message_key BYTES,
headers ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE kafka
LOCATION 'my.company.url.com:2181/topic1'
TBLPROPERTIES '{
"bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
"topics": ["topic2", "topic3"],
"format": "json"
}'
headers
フィールドの存在は、ネストされたモードの使用をトリガーします。
LOCATION
: 使用する初期ブートストラップブローカーと、パスとして提供される初期トピック名を含むURL。TBLPROPERTIES
:bootstrap_servers
: オプション。LOCATION
内のものに加えて使用される追加のブートストラップサーバーを指定できます。topics
: オプション。LOCATION
内のものに加えて使用される追加のトピックを指定できます。format
: オプション。Kafka値の形式を指定できます。可能な値は、{csv
、avro
、json
、proto
、thrift
}です。フラット化モードではcsv
、ネストモードではjson
がデフォルトです。csv
はネストモードをサポートしていません。
読み取りモード
読み取りモードは、トピックからの読み取りをサポートします。
書き込みモード
書き込みモードは、トピックへの書き込みをサポートします。
サポートされているフォーマット
- CSV(デフォルト)
- Beamは、スキーマで指定された型に従ってフィールドを解析しようと試み、メッセージを解析します。
- Kafkaは、すべての汎用ペイロード処理形式をサポートします。
スキーマ
CSVでは、単純型のみがサポートされます。
MongoDB
構文
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE mongodb
LOCATION 'mongodb://[HOST]:[PORT]/[DATABASE]/[COLLECTION]'
LOCATION
: コレクションの場所。HOST
: MongoDBサーバーの場所。localhostまたはIPアドレスを指定できます。認証が必要な場合は、次のようにユーザー名とパスワードを指定できます:username:password@localhost
。PORT
: MongoDBサーバーがリッスンしているポート。DATABASE
: 接続するデータベース。COLLECTION
: データベース内のコレクション。
読み取りモード
読み取りモードは、コレクションからの読み取りをサポートします。
書き込みモード
書き込みモードは、コレクションへの書き込みをサポートします。
スキーマ
単純型のみがサポートされています。MongoDBドキュメントは、JsonToRow
変換を介してBeam SQL型にマッピングされます。
例
CREATE EXTERNAL TABLE users (id INTEGER, username VARCHAR)
TYPE mongodb
LOCATION 'mongodb://localhost:27017/apache/users'
テキスト
TextIOは、Beam SQLでは実験的な機能です。読み取りモードと書き込みモードは、現在同じ基盤となるデータにアクセスしていません。
構文
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE text
LOCATION '/home/admin/orders'
TBLPROPERTIES '{"format: "Excel"}'
LOCATION
: 読み取りモードの場合のファイルへのパス。書き込みモードの場合のプレフィックス。TBLPROPERTIES
:format
: オプション。フィールドデリミター、引用符、レコードセパレーター、およびその他のプロパティを制御するCSV形式を指定できます。次の表を参照してください。
format の値 | フィールドデリミター | 引用符 | レコードセパレーター | 空行を無視しますか? | 列名がなくてもよいですか? |
---|---|---|---|---|---|
default | , | " | \r\n | はい | いいえ |
rfc4180 | , | " | \r\n | いいえ | いいえ |
excel | , | " | \r\n | いいえ | はい |
tdf | \t | " | \r\n | はい | いいえ |
mysql | \t | none | \n | いいえ | いいえ |
読み取りモード
読み取りモードは、ファイルからの読み取りをサポートします。
書き込みモード
書き込みモードは、一連のファイルへの書き込みをサポートします。TextIOは書き込み時にファイルを作成します。
サポートされているペイロード
- CSV
- Beamは、org.apache.commons.csvを使用して、スキーマで指定された型に従ってフィールドを解析しようと試み、メッセージを解析します。
スキーマ
単純型のみがサポートされます。
例
CREATE EXTERNAL TABLE orders (id INTEGER, price INTEGER)
TYPE text
LOCATION '/home/admin/orders'
汎用ペイロード処理
特定のデータソースとシンクは、汎用ペイロード処理をサポートしています。この処理では、バイト配列ペイロードフィールドをテーブルスキーマに解析します。この処理では、次のスキーマがサポートされます。すべて"format": "<type>"
の設定が少なくとも必要で、その他のプロパティが必要になる場合があります。
avro
: Avro- Avroスキーマは、指定されたフィールド型から自動的に生成されます。これは、受信メッセージの解析と送信メッセージのフォーマットに使用されます。
json
: JSONオブジェクト- Beamは、バイト配列をUTF-8 JSONとして解析し、スキーマと一致させようとします。
proto
: Protocol Buffers- Beamは、同等のProtocol Bufferクラスを見つけ、それを使用してペイロードを解析します。
protoClass
: 必須。使用するprotoクラス名。デプロイされたJARに組み込まれている必要があります。- スキーマ内のフィールドは、指定された
protoClass
のフィールドと一致する必要があります。
thrift
: Thrift- スキーマ内のフィールドは、指定された
thriftClass
のフィールドと一致する必要があります。 thriftClass
: 必須。完全なthrift javaクラス名を指定できます。デプロイされたJARに組み込まれている必要があります。thriftProtocolFactoryClass
: 必須。thriftシリアル化に使用するTProtocolFactory
の完全なクラス名を指定できます。デプロイされたJARに組み込まれている必要があります。- thriftシリアル化に使用する
TProtocolFactory
は、提供されたthriftProtocolFactoryClass
と一致する必要があります。
- スキーマ内のフィールドは、指定された
汎用DLQ処理
汎用DLQ処理をサポートするソースとシンクは、"<dlqParamName>": "[DLQ_KIND]:[DLQ_ID]"
の形式でパラメータを指定します。次のタイプのDLQ処理がサポートされています。
bigquery
: BigQuery- DLQ_IDは、「error」文字列フィールドと「payload」バイト配列フィールドを持つ出力テーブルのテーブル仕様です。
pubsub
: Pub/Subトピック- DLQ_IDは、Pub/Subトピックのフルパスです。
pubsublite
: Pub/Sub Liteトピック- DLQ_IDは、Pub/Sub Liteトピックのフルパスです。