Beam SQL シェル
概要
バージョン 2.6.0 以降、Beam SQL には、Beam SQL シェルと呼ばれるインタラクティブシェルが含まれています。このシェルを使用すると、Java SDK を必要とせずに、SQL クエリとしてパイプラインを作成できます。デフォルトでは、Beam は `DirectRunner` を使用してクエリを Beam パイプラインとして実行します。
このページでは、シェルの使い方について説明しますが、Beam SQL の特定の機能に焦点を当てるものではありません。このページの例で使用されている機能のより詳細な概要については、Beam SQL ドキュメント の対応するセクションを参照してください。
クイックスタート
Beam SQL シェルを使用するには、まず Beam SDK リポジトリ をクローンする必要があります。次に、リポジトリのクローンルートから、次のコマンドを実行してシェルを実行します。
./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' installDist
./sdks/java/extensions/sql/shell/build/install/shell/bin/shell
コマンドを実行すると、SQL シェルが起動し、クエリを入力できます。
Welcome to Beam SQL 2.6.0-SNAPSHOT (based on sqlline version 1.4.0)
0: BeamSQL>
注: Gradle コマンドを実行する前にプロジェクトをビルドしていない場合、Gradle が最初にすべての依存関係をビルドする必要があるため、コマンドには数分かかります。
シェルはクエリを Beam パイプラインに変換し、`DirectRunner` を使用して実行し、パイプラインが完了すると結果をテーブルとして返します。
0: BeamSQL> SELECT 'foo' AS NAME, 'bar' AS TYPE, 'num' AS NUMBER;
+------+------+--------+
| NAME | TYPE | NUMBER |
+------+------+--------+
| foo | bar | num |
+------+------+--------+
1 row selected (0.826 seconds)
テーブルの宣言
ソースからデータを読み取ったり、宛先にデータを書き込んだりする前に、`CREATE EXTERNAL TABLE` ステートメントを使用して仮想テーブルを宣言する必要があります。たとえば、現在のフォルダーにローカル CSV ファイル `"test-file.csv"` がある場合は、次のステートメントでテーブルを作成できます。
0: BeamSQL> CREATE EXTERNAL TABLE csv_file (field1 VARCHAR, field2 INTEGER) TYPE text LOCATION 'test-file.csv';
No rows affected (0.042 seconds)
`CREATE EXTERNAL TABLE` ステートメントは、CSV ファイルを Beam SQL のテーブルとして登録し、テーブルのスキーマを指定します。このステートメントは、永続的な物理テーブルを直接作成するのではなく、データの読み書きに使用するクエリでテーブルを使用できるように、Beam SQL へのソース/シンクを記述するだけです。
`CREATE EXTERNAL TABLE` の構文とサポートされているテーブルタイプの詳細については、CREATE EXTERNAL TABLE リファレンスページを参照してください。
データの読み書き
前のセクションで宣言したローカル CSV ファイルからデータを読み取るには、次のクエリを実行します。
0: BeamSQL> SELECT field1 AS field FROM csv_file;
+--------+
| field |
+--------+
| baz |
| foo |
| bar |
| bar |
| foo |
+--------+
`SELECT` 構文の詳細については、クエリ構文ページを参照してください。
CSV ファイルにデータを書き込むには、`INSERT INTO … SELECT ...` ステートメントを使用します。
0: BeamSQL> INSERT INTO csv_file SELECT 'foo', 'bar';
読み取りと書き込みの動作は、テーブルのタイプによって異なります。例えば
- テーブルタイプ `text` は `TextIO` を使用して実装されるため、`text` テーブルへの書き込みは、複数の番号付きファイルを生成する可能性があります。
- テーブルタイプ `pubsub` は非バウンドソースであるため、`pubsub` テーブルからの読み取りは完了しません。
非バウンドソースを使用した開発
開発中に非バウンドソースからのデータを検査する場合は、`SELECT` ステートメントの最後に `LIMIT x` 句を指定して、出力を `x` 件のレコードに制限する必要があります。そうしないと、パイプラインが終了しません。
0: BeamSQL> SELECT field1 FROM unbounded_source LIMIT 10 ;
これまで示してきたクエリの例は、ローカルで実行される高速クエリです。これらのクエリは、データを調査したり、パイプラインを反復的に設計したりする場合に役立ちます。理想的には、クエリがすぐに完了し、完了時に出力が返されることが望ましいです。
SQL ステートメントのロジックに満足したら、`LIMIT x` ステートメントを削除して、ステートメントを長時間実行されるジョブとして送信できます。その後、テーブルの1つが非バウンドソースを表している場合、パイプラインは永続的に実行される可能性があります。
ランナーの指定
デフォルトでは、Beam は `DirectRunner` を使用して、コマンドを実行しているマシンでパイプラインを実行します。別のランナーでパイプラインを実行する場合は、次の2つのステップを実行する必要があります。
SQL シェルに必要なランナーが含まれていることを確認してください。Gradle 呼び出しの `-Pbeam.sql.shell.bundled` パラメータに、対応するプロジェクト ID を追加します (ソースコード, プロジェクトID)。たとえば、次のコマンドを使用して、Flink ランナーと KafkaIO を含めます。
./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' installDist
注: 同じ方法で、複数のランナー (コンマ区切りのリストを使用) またはその他の追加コンポーネントをバンドルできます。たとえば、より多くの I/O のサポートを追加できます。
次に、`SET` コマンド (リファレンスページ) を使用してランナーを指定します。
0: BeamSQL> SET runner='FlinkRunner';
Beam は、指定されたランナーへのパイプラインとして、将来のすべての `INSERT` ステートメントを送信します。この場合、Beam SQL シェルはクエリ結果を表示しません。送信されたジョブは、対応するランナーの UI (たとえば、Flink UI またはコマンドラインを使用) を介して管理する必要があります。
PipelineOptions の指定
ランナーを構成するには、`SET` コマンド (詳細) を使用して `PipelineOptions` を指定する必要があります。
0: BeamSQL> SET projectId='gcpProjectId';
0: BeamSQL> SET tempLocation='/tmp/tempDir';
SQL シェルのパッケージ化
`distZip` または `distTar` タスクを使用して、SQL シェルのスタンドアロンパッケージを独自に構築することもできます。たとえば
./gradlew -p sdks/java/extensions/sql/shell -Pbeam.sql.shell.bundled=':runners:flink:1.17,:sdks:java:io:kafka' distZip
ls ./sdks/java/extensions/sql/shell/build/distributions/
beam-sdks-java-extensions-sql-shell-2.6.0-SNAPSHOT.tar beam-sdks-java-extensions-sql-shell-2.6.0-SNAPSHOT.zip