組み込み I/O 変換

SingleStoreDB I/O

SingleStoreDB I/O の使用と実行に関するパイプラインオプションと一般的な情報。

始める前に

SingleStoreDB I/O を使用するには、`pom.xml` ファイルに Maven アーティファクトの依存関係を追加します。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-singlestore</artifactId>
    <version>2.60.0</version>
</dependency>

追加のリソース

認証

SingleStoreIO 接続プロパティの構成には、DataSource 構成が必要です。

DataSource 構成の作成

SingleStoreIO.DataSourceConfiguration
    .create("myHost:3306")
    .withDatabase("db")
    .withConnectionProperties("connectTimeout=30000;useServerPrepStmts=FALSE")
    .withPassword("password")
    .withUsername("admin");

パラメーターの配置場所

注記 - ` .withDatabase(...)` は ` .readWithPartitions()` に**必須**です。

SingleStoreDB からの読み取り

SingleStoreIO の機能の 1 つは、SingleStoreDB テーブルからの読み取りです。SingleStoreIO は 2 つのタイプの読み取りをサポートしています。

多くの場合、パフォーマンス上の理由から、シーケンシャルデータ読み取りよりも並列データ読み取りが優先されます。

シーケンシャルデータ読み取り

基本的な ` .read()` 操作の使用方法は次のとおりです。

PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>read()
        .withDataSourceConfiguration(dc)
        .withTable("MY_TABLE") // or .withQuery("QUERY")
        .withStatementPreparator(statementPreparator)
        .withOutputParallelization(true)
        .withRowMapper(mapper)
);

パラメーターの配置場所

注記 - ` .withTable(...)` または ` .withQuery(...)` のいずれかが**必須**です。

並列データ読み取り

基本的な ` .readWithPartitions()` 操作の使用方法は次のとおりです。

PCollection<USER_DATA_TYPE> items = pipeline.apply(
    SingleStoreIO.<USER_DATA_TYPE>readWithPartitions()
        .withDataSourceConfiguration(dc)
        .withTable("MY_TABLE") // or .withQuery("QUERY")
        .withRowMapper(mapper)
);

パラメーターの配置場所

注記 - ` .withTable(...)` または ` .withQuery(...)` のいずれかが**必須**です。

StatementPreparator

`StatementPreparator` は `read()` によって使用され、`PreparedStatement` のパラメーターを設定します。例:

public static class MyStatmentPreparator implements SingleStoreIO.StatementPreparator {
    @Override
    public void setParameters(PreparedStatement preparedStatement) throws Exception {
        preparedStatement.setInt(1, 10);
    }
}

RowMapper

`RowMapper` は `read()` と `readWithPartitions()` によって使用され、`ResultSet` の各行を結果の `PCollection` の要素に変換します。例:

public static class MyRowMapper implements SingleStoreIO.RowMapper<MyRow> {
    @Override
    public MyRow mapRow(ResultSet resultSet) throws Exception {
        return MyRow.create(resultSet.getInt(1), resultSet.getString(2));
    }
}

SingleStoreDB テーブルへの書き込み

SingleStoreIO の機能の 1 つは、SingleStoreDB テーブルへの書き込みです。この変換により、ユーザーのPCollection を SingleStoreDB データベースに送信できます。各要素のバッチによって書き込まれた行数を返します。

基本的な ` .write()` 操作の使用方法は次のとおりです。

data.apply(
    SingleStoreIO.<USER_DATA_TYPE>write()
        .withDataSourceConfiguration(dc)
        .withTable("MY_TABLE")
        .withUserDataMapper(mapper)
        .withBatchSize(100000)
);

パラメーターの配置場所

UserDataMapper

`UserDataMapper` は、`write()` 操作がデータを保存する前に、`PCollection` から `String` 値の配列にデータをマップするために必要です。例:

public static class MyRowDataMapper implements SingleStoreIO.UserDataMapper<MyRow> {
    @Override
    public List<String> mapRow(MyRow element) {
        List<String> res = new ArrayList<>();
        res.add(element.id().toString());
        res.add(element.name());
        return res;
    }
}