ブログ
2019/06/04
Beam SQL CLIへの新しいデータソースの追加
Apache Beamの新しいエキサイティングな機能として、パイプラインでSQLを使用できるようになりました。これは、JavaパイプラインでBeamのSqlTransform
を使用して行います。
Beamには、バッチまたはストリーミングのデータをインタラクティブにクエリするために使用できる、洗練された新しいSQLコマンドラインもあります。まだ試していない場合は、https://bit.ly/ExploreBeamSQLをご覧ください。
SQL CLIの優れた機能は、CREATE EXTERNAL TABLE
コマンドを使用して、CLIでアクセスするデータソースを*追加*できることです。現在、CLIはBigQuery、PubSub、Kafka、およびテキストファイルからテーブルを作成することをサポートしています。この記事では、新しいデータソースを追加する方法を検討し、他のBeamソースからのデータを使用できるようにします。
この記事で実装するテーブルプロバイダーは、連続した非有界の整数ストリームを生成します。これは、Beam SDKのGenerateSequence
PTransformに基づいています。最終的には、SQLでシーケンスジェネレーターを次のように定義して使用できるようになります。
CREATE EXTERNAL TABLE -- all tables in Beam are external, they are not persisted
sequenceTable -- table alias that will be used in queries
(
sequence BIGINT, -- sequence number
event_timestamp TIMESTAMP -- timestamp of the generated event
)
TYPE sequence -- type identifies the table provider
TBLPROPERTIES '{ elementsPerSecond : 12 }' -- optional rate at which events are generated
(SQLの例はそのまま)
SELECT sequence FROM sequenceTable;
そして、クエリで次のように使用できます。
(SQLの例はそのまま)
さあ、始めましょう!
TableProvider
の実装
BeamのSqlTransform
は、CREATE EXTERNAL TABLE
ステートメントが使用されたときに使用するTableProvider
に依存して動作します。Beam SQL CLIに新しいデータソースを追加する場合は、それを行うためにTableProvider
を追加する必要があります。この記事では、Java SDKで利用可能なGenerateSequence
変換の新しいテーブルプロバイダーを作成するために必要な手順を示します。
TableProvider
クラスは、sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/
にあります。そこを見ると、利用可能なすべてのデータソースのプロバイダーとその実装を見つけることができます。そのため、必要なものをBaseBeamTable
の実装ととも に追加するだけです。
GenerateSequenceTableProvider
テーブルプロバイダーは次のようになります。
(コード例はそのまま)
class GenerateSequenceTable extends BaseBeamTable implements Serializable {
public static final Schema TABLE_SCHEMA =
Schema.of(Field.of("sequence", FieldType.INT64), Field.of("event_time", FieldType.DATETIME));
Integer elementsPerSecond = 5;
GenerateSequenceTable(Table table) {
super(TABLE_SCHEMA);
if (table.getProperties().containsKey("elementsPerSecond")) {
elementsPerSecond = table.getProperties().getInteger("elementsPerSecond");
}
}
@Override
public PCollection.IsBounded isBounded() {
return IsBounded.UNBOUNDED;
}
@Override
public PCollection<Row> buildIOReader(PBegin begin) {
return begin
.apply(GenerateSequence.from(0).withRate(elementsPerSecond, Duration.standardSeconds(1)))
.apply(
MapElements.into(TypeDescriptor.of(Row.class))
.via(elm -> Row.withSchema(TABLE_SCHEMA).addValues(elm, Instant.now()).build()))
.setRowSchema(getSchema());
}
@Override
public POutput buildIOWriter(PCollection<Row> input) {
throw new UnsupportedOperationException("buildIOWriter unsupported!");
}
}
テーブルのタイプを指定するだけで、buildBeamSqlTable
メソッドを実装します。このメソッドは、GenerateSequenceTable
実装で定義されたBeamSqlTable
を返します。
GenerateSequenceTable
0: BeamSQL> CREATE EXTERNAL TABLE input_seq (
. . . . . > sequence BIGINT COMMENT 'this is the primary key',
. . . . . > event_time TIMESTAMP COMMENT 'this is the element timestamp'
. . . . . > )
. . . . . > TYPE 'sequence';
No rows affected (0.005 seconds)
ストリーミングを適切にサポートするテーブル実装が必要なので、ユーザーが1秒あたりに送信される要素数を定義できるようにします。連続した整数をストリーミング方式で送信する簡単なテーブルを定義します。これは次のようになります。
0: BeamSQL> SELECT * FROM input_seq LIMIT 5;
+---------------------+------------+
| sequence | event_time |
+---------------------+------------+
| 0 | 2019-05-21 00:36:33 |
| 1 | 2019-05-21 00:36:33 |
| 2 | 2019-05-21 00:36:33 |
| 3 | 2019-05-21 00:36:33 |
| 4 | 2019-05-21 00:36:33 |
+---------------------+------------+
5 rows selected (1.138 seconds)
(コード例はそのまま)
0: BeamSQL> SELECT
. . . . . > COUNT(sequence) as elements,
. . . . . > TUMBLE_START(event_time, INTERVAL '2' SECOND) as window_start
. . . . . > FROM input_seq
. . . . . > GROUP BY TUMBLE(event_time, INTERVAL '2' SECOND) LIMIT 5;
+---------------------+--------------+
| elements | window_start |
+---------------------+--------------+
| 6 | 2019-06-05 00:39:24 |
| 10 | 2019-06-05 00:39:26 |
| 10 | 2019-06-05 00:39:28 |
| 10 | 2019-06-05 00:39:30 |
| 10 | 2019-06-05 00:39:32 |
+---------------------+--------------+
5 rows selected (10.142 seconds)
本番