Beam SQL拡張機能: CREATE EXTERNAL TABLE

Beam SQLのCREATE EXTERNAL TABLEステートメントは、外部ストレージシステムにマッピングする仮想テーブルを登録します。一部のストレージシステムでは、CREATE EXTERNAL TABLEは、書き込みが発生するまで物理テーブルを作成しません。物理テーブルが存在するようになったら、SELECTJOININSERT INTOステートメントを使用してテーブルにアクセスできます。

CREATE EXTERNAL TABLEステートメントには、スキーマと拡張句が含まれます。

構文

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE type
[LOCATION location]
[TBLPROPERTIES tblProperties]

simpleType: TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | DECIMAL | BOOLEAN | DATE | TIME | TIMESTAMP | CHAR | VARCHAR

fieldType: simpleType | MAP<simpleType, fieldType> | ARRAY<fieldType> | ROW<tableElement [, tableElement ]*>

tableElement: columnName fieldType [ NOT NULL ]

BigQuery

構文

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE bigquery
LOCATION '[PROJECT_ID]:[DATASET].[TABLE]'
TBLPROPERTIES '{"method": "DIRECT_READ"}'

読み取りモード

Beam SQLは、単純型(simpleType)の列と単純型の配列(ARRAY<simpleType>)の読み取りをサポートします。

EXPORTメソッドを使用して読み取る場合、次のパイプラインオプションを設定する必要があります。

DIRECT_READメソッドを使用して読み取る場合、オプティマイザーはプロジェクトと述語のプッシュダウンを実行しようとし、BigQueryからデータを読み取るために必要な時間を短縮する可能性があります。

BigQuery Storage APIの詳細については、こちらを参照してください。

書き込みモード

テーブルが存在しない場合、Beamは最初のレコードが書き込まれたときに、場所で指定されたテーブルを作成します。テーブルが存在する場合、指定された列は既存のテーブルと一致する必要があります。

スキーマ

スキーマ関連のエラーが発生すると、パイプラインがクラッシュします。 Map型はサポートされていません。 Beam SQL型は、BigQuery Standard SQL型に次のようにマッピングされます。

Beam SQL型BigQuery Standard SQL型
TINYINT、SMALLINT、INTEGER、BIGINT  INT64
FLOAT、DOUBLE、DECIMALFLOAT64
BOOLEANBOOL
DATEDATE
TIMETIME
TIMESTAMPTIMESTAMP
CHAR、VARCHARSTRING
MAP(サポートされていません)
ARRAYARRAY
ROWSTRUCT

CREATE EXTERNAL TABLE users (id INTEGER, username VARCHAR)
TYPE bigquery
LOCATION 'testing-integration:apache.users'

Cloud Bigtable

構文

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
    key VARCHAR NOT NULL,
    family ROW<qualifier cells [, qualifier cells ]* >
    [, family ROW< qualifier cells [, qualifier cells ]* > ]*
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/[PROJECT_ID]/instances/[INSTANCE_ID]/tables/[TABLE]'

フラットスキーマを使用した代替構文

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
    key VARCHAR NOT NULL,
    qualifier SIMPLE_TYPE
    [, qualifier SIMPLE_TYPE ]*
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/[PROJECT_ID]/instances/[INSTANCE_ID]/tables/[TABLE]'
TBLPROPERTIES '{
  "columnsMapping": "family:qualifier[,family:qualifier]*"
}'

読み取りモード

Beam SQLは、必須のkeyフィールド、少なくとも1つのqualifierを持つ少なくとも1つのfamilyを持つ行の読み取りをサポートします。セルは、単純型(SIMPLE_TYPE)として、または必須のvalフィールド、オプションのtimestampMicros、およびオプションのlabelsを持つROW型として表されます。どちらも列の最新のセルを読み取ります。単純型の配列(ARRAY<simpleType>)として指定されたセルを使用すると、列のすべての値を読み取ることができます。

フラットスキーマの場合、SIMPLE_TYPE値のみが許可されます。 keyを除くすべてのフィールドは、columnsMappingで指定されたキーと値のペアに対応する必要があります。

既存のすべての列ファミリと修飾子をスキーマに提供する必要はありません。

フィルターは、RE2 Syntax regexを使用した単一のLIKEステートメントによるkeyフィールドでのみ許可されます。例:SELECT * FROM table WHERE key LIKE '^key[012]{1}'

書き込みモード

フラットスキーマでのみサポートされます。

CREATE EXTERNAL TABLE beamTable(
  key VARCHAR NOT NULL,
  beamFamily ROW<
     boolLatest BOOLEAN NOT NULL,
     longLatestWithTs ROW<
        val BIGINT NOT NULL,
        timestampMicros BIGINT NOT NULL
      > NOT NULL,
      allStrings ARRAY<VARCHAR> NOT NULL,
      doubleLatestWithTsAndLabels ROW<
        val DOUBLE NOT NULL,
        timestampMicros BIGINT NOT NULL,
        labels ARRAY<VARCHAR> NOT NULL
      > NOT NULL,
      binaryLatestWithLabels ROW<
         val BINARY NOT NULL,
         labels ARRAY<VARCHAR> NOT NULL
      > NOT NULL
    > NOT NULL
  )
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/beam/instances/beamInstance/tables/beamTable'

フラットスキーマの例

CREATE EXTERNAL TABLE flatTable(
  key VARCHAR NOT NULL,
  boolColumn BOOLEAN NOT NULL,
  longColumn BIGINT NOT NULL,
  stringColumn VARCHAR NOT NULL,
  doubleColumn DOUBLE NOT NULL,
  binaryColumn BINARY NOT NULL
)
TYPE bigtable
LOCATION 'googleapis.com/bigtable/projects/beam/instances/beamInstance/tables/flatTable'
TBLPROPERTIES '{
  "columnsMapping": "f:boolColumn,f:longColumn,f:stringColumn,f2:doubleColumn,f2:binaryColumn"
}'

書き込み例

INSERT INTO writeTable(key, boolColumn, longColumn, stringColumn, doubleColumn)
  VALUES ('key', TRUE, 10, 'stringValue', 5.5)

Pub/Sub

構文

ネストモード

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
    event_timestamp TIMESTAMP,
    attributes [MAP<VARCHAR, VARCHAR>, ARRAY<ROW<VARCHAR key, VARCHAR value>>],
    payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE pubsub
LOCATION 'projects/[PROJECT]/topics/[TOPIC]'

フラットモード

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(tableElement [, tableElement ]*)
TYPE pubsub
LOCATION 'projects/[PROJECT]/topics/[TOPIC]'

ネストモードでは、次のフィールドにトピックメタデータが保持されます。 attributesフィールドの存在により、ネストモードの使用がトリガーされます。

読み取りモード

PubsubIOは、新しいサブスクリプションを作成することでトピックからの読み取りをサポートします。

書き込みモード

PubsubIOは、トピックへの書き込みをサポートします。

スキーマ

Pub/Subメッセージにはメタデータが関連付けられており、クエリでこのメタデータを参照できます。各メッセージについて、Pub/Subは、ペイロード(一般的なケースでは非構造化)に加えて、公開時間とユーザーが提供した属性のマップを公開します。この情報は、SQLステートメントから保持およびアクセスできる必要があります。現在、これはPubsubIOテーブルで、以下に示すように特別な列のセットを宣言する必要があることを意味します。

サポートされているペイロード

CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes MAP<VARCHAR, VARCHAR>, payload ROW<id INTEGER, location VARCHAR>)
TYPE pubsub
LOCATION 'projects/testing-integration/topics/user-location'

Pub/Sub Lite

構文

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName(
    publish_timestamp DATETIME,
    event_timestamp DATETIME,
    message_key BYTES,
    attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
    payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE pubsublite
// For writing
LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/topics/[TOPIC]'
// For reading
LOCATION 'projects/[PROJECT]/locations/[GCP-LOCATION]/subscriptions/[SUBSCRIPTION]'

読み取りモード

PubsubLiteIOは、サブスクリプションからの読み取りをサポートします。

書き込みモード

PubsubLiteIOは、トピックへの書き込みをサポートします。

サポートされているペイロード

CREATE EXTERNAL TABLE locations (event_timestamp TIMESTAMP, attributes ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>, payload ROW<id INTEGER, location VARCHAR>)
TYPE pubsublite
LOCATION 'projects/testing-integration/locations/us-central1-a/topics/user-location'

Kafka

KafkaIOは、Beam SQLでは実験的な機能です。

構文

フラットモード

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE kafka
LOCATION 'my.company.url.com:2181/topic1'
TBLPROPERTIES '{
    "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
    "topics": ["topic2", "topic3"],
    "format": "json"
}'

ネストモード

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (
  event_timestamp DATETIME,
  message_key BYTES,
  headers ARRAY<ROW<key VARCHAR, `values` ARRAY<VARBINARY>>>,
  payload [BYTES, ROW<tableElement [, tableElement ]*>]
)
TYPE kafka
LOCATION 'my.company.url.com:2181/topic1'
TBLPROPERTIES '{
    "bootstrap_servers": ["localhost:9092", "PLAINTEXT://192.168.1.200:2181"],
    "topics": ["topic2", "topic3"],
    "format": "json"
}'

headersフィールドの存在は、ネストされたモードの使用をトリガーします。

読み取りモード

読み取りモードは、トピックからの読み取りをサポートします。

書き込みモード

書き込みモードは、トピックへの書き込みをサポートします。

サポートされているフォーマット

スキーマ

CSVでは、単純型のみがサポートされます。

MongoDB

構文

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE mongodb
LOCATION 'mongodb://[HOST]:[PORT]/[DATABASE]/[COLLECTION]'

読み取りモード

読み取りモードは、コレクションからの読み取りをサポートします。

書き込みモード

書き込みモードは、コレクションへの書き込みをサポートします。

スキーマ

単純型のみがサポートされています。MongoDBドキュメントは、JsonToRow変換を介してBeam SQL型にマッピングされます。

CREATE EXTERNAL TABLE users (id INTEGER, username VARCHAR)
TYPE mongodb
LOCATION 'mongodb://localhost:27017/apache/users'

テキスト

TextIOは、Beam SQLでは実験的な機能です。読み取りモードと書き込みモードは、現在同じ基盤となるデータにアクセスしていません。

構文

CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
TYPE text
LOCATION '/home/admin/orders'
TBLPROPERTIES '{"format: "Excel"}'
formatの値フィールドデリミター引用符レコードセパレーター空行を無視しますか?列名がなくてもよいですか?
default,"\r\nはいいいえ
rfc4180,"\r\nいいえいいえ
excel,"\r\nいいえはい
tdf\t"\r\nはいいいえ
mysql\tnone\nいいえいいえ

読み取りモード

読み取りモードは、ファイルからの読み取りをサポートします。

書き込みモード

書き込みモードは、一連のファイルへの書き込みをサポートします。TextIOは書き込み時にファイルを作成します。

サポートされているペイロード

スキーマ

単純型のみがサポートされます。

CREATE EXTERNAL TABLE orders (id INTEGER, price INTEGER)
TYPE text
LOCATION '/home/admin/orders'

汎用ペイロード処理

特定のデータソースとシンクは、汎用ペイロード処理をサポートしています。この処理では、バイト配列ペイロードフィールドをテーブルスキーマに解析します。この処理では、次のスキーマがサポートされます。すべて"format": "<type>"の設定が少なくとも必要で、その他のプロパティが必要になる場合があります。

汎用DLQ処理

汎用DLQ処理をサポートするソースとシンクは、"<dlqParamName>": "[DLQ_KIND]:[DLQ_ID]"の形式でパラメータを指定します。次のタイプのDLQ処理がサポートされています。