Beam SQL拡張機能:ユーザー定義関数
Beam SQLに必要なスカラー関数または集計関数がない場合、Javaで作成し、SQLクエリで呼び出すことができます。これらは一般的にUDF(スカラー関数用)とUDAF(集計関数用)と呼ばれます。
ユーザー定義関数(UDF)の作成と指定
UDFは次のようになります。
- 0個以上のスカラーフィールドを受け取り、1つのスカラー値を返す任意のJavaメソッド。
SerializableFunction
。
以下は、UDFの例とDSLでの使用方法です。
/**
* A example UDF for test.
*/
public static class CubicInteger implements BeamSqlUdf {
public static Integer eval(Integer input){
return input * input * input;
}
}
/**
* Another example UDF with {@link SerializableFunction}.
*/
public static class CubicIntegerFn implements SerializableFunction<Integer, Integer> {
@Override
public Integer apply(Integer input) {
return input * input * input;
}
}
// Define a SQL query which calls the above UDFs
String sql =
"SELECT f_int, cubic1(f_int), cubic2(f_int)"
+ "FROM PCOLLECTION "
+ "WHERE f_int = 2";
// Create and apply the PTransform representing the query.
// Register the UDFs used in the query by calling '.registerUdf()' with
// either a class which implements BeamSqlUdf or with
// an instance of the SerializableFunction;
PCollection<Row> result =
input.apply(
"udfExample",
SqlTransform
.query(sql)
.registerUdf("cubic1", CubicInteger.class)
.registerUdf("cubic2", new CubicIntegerFn())
ユーザー定義集計関数(UDAF)の作成と指定
Beam SQLは、UDAFとしてCombineFn
を受け入れることができます。登録方法は上記のUDFの例と同様です。
/**
* UDAF(CombineFn) for test, which returns the sum of square.
*/
public static class SquareSum extends CombineFn<Integer, Integer, Integer> {
@Override
public Integer createAccumulator() {
return 0;
}
@Override
public Integer addInput(Integer accumulator, Integer input) {
return accumulator + input * input;
}
@Override
public Integer mergeAccumulators(Iterable<Integer> accumulators) {
int v = 0;
Iterator<Integer> ite = accumulators.iterator();
while (ite.hasNext()) {
v += ite.next();
}
return v;
}
@Override
public Integer extractOutput(Integer accumulator) {
return accumulator;
}
}
// Define a SQL query which calls the above UDAF
String sql =
"SELECT f_int1, squaresum(f_int2) "
+ "FROM PCOLLECTION "
+ "GROUP BY f_int1";
// Create and apply the PTransform representing the query.
// Register the UDAFs used in the query by calling '.registerUdaf()' by
// providing it an instance of the CombineFn
PCollection<Row> result =
input.apply(
"udafExample",
SqlTransform
.query(sql)
.registerUdaf("squaresum", new SquareSum()));