Beam SQL CLIへの新しいデータソースの追加

Apache Beamの新しいエキサイティングな機能として、パイプラインでSQLを使用できるようになりました。これは、JavaパイプラインでBeamのSqlTransformを使用して行います。

Beamには、バッチまたはストリーミングのデータをインタラクティブにクエリするために使用できる、洗練された新しいSQLコマンドラインもあります。まだ試していない場合は、https://bit.ly/ExploreBeamSQLをご覧ください。

SQL CLIの優れた機能は、CREATE EXTERNAL TABLEコマンドを使用して、CLIでアクセスするデータソースを*追加*できることです。現在、CLIはBigQuery、PubSub、Kafka、およびテキストファイルからテーブルを作成することをサポートしています。この記事では、新しいデータソースを追加する方法を検討し、他のBeamソースからのデータを使用できるようにします。

この記事で実装するテーブルプロバイダーは、連続した非有界の整数ストリームを生成します。これは、Beam SDKのGenerateSequence PTransformに基づいています。最終的には、SQLでシーケンスジェネレーターを次のように定義して使用できるようになります。

CREATE EXTERNAL TABLE                      -- all tables in Beam are external, they are not persisted
  sequenceTable                              -- table alias that will be used in queries
  (
         sequence BIGINT,                  -- sequence number
         event_timestamp TIMESTAMP         -- timestamp of the generated event
  )
TYPE sequence                              -- type identifies the table provider
TBLPROPERTIES '{ elementsPerSecond : 12 }' -- optional rate at which events are generated

(SQLの例はそのまま)

SELECT sequence FROM sequenceTable;

そして、クエリで次のように使用できます。

(SQLの例はそのまま)

さあ、始めましょう!

TableProviderの実装

BeamのSqlTransformは、CREATE EXTERNAL TABLEステートメントが使用されたときに使用するTableProviderに依存して動作します。Beam SQL CLIに新しいデータソースを追加する場合は、それを行うためにTableProviderを追加する必要があります。この記事では、Java SDKで利用可能なGenerateSequence変換の新しいテーブルプロバイダーを作成するために必要な手順を示します。

TableProviderクラスは、sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/にあります。そこを見ると、利用可能なすべてのデータソースのプロバイダーとその実装を見つけることができます。そのため、必要なものをBaseBeamTableの実装ととも に追加するだけです。

@AutoService(TableProvider.class)
public class GenerateSequenceTableProvider extends InMemoryMetaTableProvider {

  @Override
  public String getTableType() {
    return "sequence";
  }

  @Override
  public BeamSqlTable buildBeamSqlTable(Table table) {
    return new GenerateSequenceTable(table);
  }
}

GenerateSequenceTableProvider

テーブルプロバイダーは次のようになります。

(コード例はそのまま)

class GenerateSequenceTable extends BaseBeamTable implements Serializable {
  public static final Schema TABLE_SCHEMA =
      Schema.of(Field.of("sequence", FieldType.INT64), Field.of("event_time", FieldType.DATETIME));

  Integer elementsPerSecond = 5;

  GenerateSequenceTable(Table table) {
    super(TABLE_SCHEMA);
    if (table.getProperties().containsKey("elementsPerSecond")) {
      elementsPerSecond = table.getProperties().getInteger("elementsPerSecond");
    }
  }

  @Override
  public PCollection.IsBounded isBounded() {
    return IsBounded.UNBOUNDED;
  }

  @Override
  public PCollection<Row> buildIOReader(PBegin begin) {
    return begin
        .apply(GenerateSequence.from(0).withRate(elementsPerSecond, Duration.standardSeconds(1)))
        .apply(
            MapElements.into(TypeDescriptor.of(Row.class))
                .via(elm -> Row.withSchema(TABLE_SCHEMA).addValues(elm, Instant.now()).build()))
        .setRowSchema(getSchema());
  }

  @Override
  public POutput buildIOWriter(PCollection<Row> input) {
    throw new UnsupportedOperationException("buildIOWriter unsupported!");
  }
}

テーブルのタイプを指定するだけで、buildBeamSqlTableメソッドを実装します。このメソッドは、GenerateSequenceTable実装で定義されたBeamSqlTableを返します。

GenerateSequenceTable

0: BeamSQL> CREATE EXTERNAL TABLE input_seq (
. . . . . >   sequence BIGINT COMMENT 'this is the primary key',
. . . . . >   event_time TIMESTAMP COMMENT 'this is the element timestamp'
. . . . . > )
. . . . . > TYPE 'sequence';
No rows affected (0.005 seconds)

ストリーミングを適切にサポートするテーブル実装が必要なので、ユーザーが1秒あたりに送信される要素数を定義できるようにします。連続した整数をストリーミング方式で送信する簡単なテーブルを定義します。これは次のようになります。

0: BeamSQL> SELECT * FROM input_seq LIMIT 5;
+---------------------+------------+
|      sequence       | event_time |
+---------------------+------------+
| 0                   | 2019-05-21 00:36:33 |
| 1                   | 2019-05-21 00:36:33 |
| 2                   | 2019-05-21 00:36:33 |
| 3                   | 2019-05-21 00:36:33 |
| 4                   | 2019-05-21 00:36:33 |
+---------------------+------------+
5 rows selected (1.138 seconds)

(コード例はそのまま)

0: BeamSQL> SELECT
. . . . . >   COUNT(sequence) as elements,
. . . . . >   TUMBLE_START(event_time, INTERVAL '2' SECOND) as window_start
. . . . . > FROM input_seq
. . . . . > GROUP BY TUMBLE(event_time, INTERVAL '2' SECOND) LIMIT 5;
+---------------------+--------------+
|      elements       | window_start |
+---------------------+--------------+
| 6                   | 2019-06-05 00:39:24 |
| 10                  | 2019-06-05 00:39:26 |
| 10                  | 2019-06-05 00:39:28 |
| 10                  | 2019-06-05 00:39:30 |
| 10                  | 2019-06-05 00:39:32 |
+---------------------+--------------+
5 rows selected (10.142 seconds)

本番