Beam SQLウォークスルー
このページでは、Beam SQLの使用例をコード例とともに示します。
BeamスキーマとRow
SQLクエリは、スキーマが登録されているPCollection<T>
またはPCollection<Row>
にのみ適用できます。型T
のスキーマ登録の詳細については、Beamプログラミングガイドのスキーマに関するドキュメントを参照してください。
既存の型T
がない場合、PCollection<Row>
は、たとえば、複数の方法で取得できます。
メモリ内のデータから(通常は単体テスト用)。
注意:
Row
コーダーを明示的に指定する必要があります。この例では、Create.of(..)
を呼び出すことで指定しています。// Define the schema for the records. Schema appSchema = Schema .builder() .addInt32Field("appId") .addStringField("description") .addDateTimeField("rowtime") .build(); // Create a concrete row with that type. Row row = Row .withSchema(appSchema) .addValues(1, "Some cool app", new Date()) .build(); // Create a source PCollection containing only that row PCollection<Row> testApps = PBegin .in(p) .apply(Create .of(row) .withCoder(RowCoder.of(appSchema)));
他の型のレコードの
PCollection<T>
から(つまり、T
がすでにRow
ではない場合)。入力レコードをRow
形式に変換するParDo
を適用することによって取得します。// An example POJO class. class AppPojo { Integer appId; String description; Date timestamp; } // Acquire a collection of POJOs somehow. PCollection<AppPojo> pojos = ... // Convert them to Rows with the same schema as defined above via a DoFn. PCollection<Row> apps = pojos .apply( ParDo.of(new DoFn<AppPojo, Row>() { @ProcessElement public void processElement(ProcessContext c) { // Get the current POJO instance AppPojo pojo = c.element(); // Create a Row with the appSchema schema // and values from the current POJO Row appRow = Row .withSchema(appSchema) .addValues( pojo.appId, pojo.description, pojo.timestamp) .build(); // Output the Row representing the current POJO c.output(appRow); } })).setRowSchema(appSchema);
別の
SqlTransform
の出力として。詳細は次のセクションで説明します。
PCollection<Row>
を入手したら、SqlTransform
を使用してSQLクエリを適用できます。
SqlTransform
SqlTransform.query(queryString)
メソッドは、SQLクエリの文字列表現からPTransform
を作成する唯一のAPIです。このPTransform
は、単一のPCollection
または複数のPCollection
を保持するPCollectionTuple
のいずれかに適用できます。
単一の
PCollection
に適用する場合、クエリ内でテーブル名PCOLLECTION
を介して参照できます。PCollectionTuple
に適用する場合、タプル内の各PCollection
のタプルタグによって、クエリに使用できるテーブル名が定義されます。テーブル名は特定のPCollectionTuple
にバインドされているため、それに適用されるクエリのコンテキスト内でのみ有効であることに注意してください。たとえば、2つの
PCollection
を結合できます。// Create the schema for reviews Schema reviewSchema = Schema .builder() .addInt32Field("appId") .addInt32Field("reviewerId") .addFloatField("rating") .addDateTimeField("rowtime") .build(); // Obtain the reviews records with this schema PCollection<Row> reviewsRows = ... // Create a PCollectionTuple containing both PCollections. // TupleTags IDs will be used as table names in the SQL query PCollectionTuple namesAndFoods = PCollectionTuple .of(new TupleTag<>("Apps"), appsRows) // appsRows from the previous example .and(new TupleTag<>("Reviews"), reviewsRows); // Compute the total number of reviews // and average rating per app // by joining two PCollections PCollection<Row> output = namesAndFoods.apply( SqlTransform.query( "SELECT Apps.appId, COUNT(Reviews.rating), AVG(Reviews.rating) " + "FROM Apps INNER JOIN Reviews ON Apps.appId = Reviews.appId " + "GROUP BY Apps.appId"));
コードリポジトリのBeamSqlExampleは、両方のAPIの基本的な使用法を示しています。