組み込み I/O 変換

Snowflake I/O

Snowflake IO の使用と実行に関するパイプラインオプションと一般的な情報。

開始する前に

SnowflakeIO を使用するには、Maven アーティファクトの依存関係を `pom.xml` ファイルに追加します。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-snowflake</artifactId>
    <version>2.60.0</version>
</dependency>

追加のリソース

認証

読み取りとバッチ書き込みは、次の認証方法をサポートしています

ストリーミング書き込みは、キーペア認証のみをサポートしています。詳細については、BEAM-3304 を参照してください。

資格情報の受け渡しは、`SnowflakeIO.DataSourceConfiguration` クラスをインスタンス化する際に使用されるパイプラインオプションを介して行われます。各認証方法には、このクラスを構成する異なる方法があります。

ユーザー名とパスワード

SnowflakeIO でユーザー名/パスワード認証を使用するには、次のパイプラインオプションを使用してパイプラインを呼び出します

--username=<USERNAME> --password=<PASSWORD>

資格情報の受け渡しは、`SnowflakeIO.DataSourceConfiguration` クラスをインスタンス化する際に使用されるパイプラインオプションを介して行われます。

SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create()
        .withUsernamePasswordAuth(
                options.getUsername(),
                options.getPassword())
        .withServerName(options.getServerName())
        .withDatabase(options.getDatabase())
        .withRole(options.getRole())
        .withWarehouse(options.getWarehouse())
        .withSchema(options.getSchema());

キーペア

この認証方法を使用するには、最初にキーペアを生成し、IO 変換を使用して接続する Snowflake ユーザーに公開キーを関連付ける必要があります。手順については、Snowflake ドキュメントのキーペア認証とキーペアローテーションを参照してください。

SnowflakeIO でキーペア認証を使用するには、次のパイプラインオプションのセットのいずれかを使用してパイプラインを呼び出します

OAuth トークン

SnowflakeIO は OAuth トークンもサポートしています。

重要:SnowflakeIO には有効な OAuth アクセストークンが必要です。トークンを更新したり、Web ベースのフローを使用してトークンを取得したりすることはできません。OAuth 統合の構成とトークンの取得については、Snowflake ドキュメントを参照してください。

トークンを取得したら、次のパイプラインオプションを使用してパイプラインを呼び出します

--oauthToken=<TOKEN>
`SnowflakeIO.DataSourceConfiguration` クラスの初期化は次のようになります
 SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration
            .create()
            .withUrl(options.getUrl())
            .withServerName(options.getServerName())
            .withDatabase(options.getDatabase())
            .withWarehouse(options.getWarehouse())
            .withSchema(options.getSchema());

DataSource 構成

DataSource 構成は、IO の目的で Snowflake 接続プロパティを構成するために、読み取りオブジェクトと書き込みオブジェクトの両方で必要です。

一般的な使用方法

DataSource 構成を作成します

 SnowflakeIO.DataSourceConfiguration
            .create()
            .withUrl(options.getUrl())
            .withServerName(options.getServerName())
            .withDatabase(options.getDatabase())
            .withWarehouse(options.getWarehouse())
            .withSchema(options.getSchema());
パラメーターは次のとおりです

注記 - `.withUrl(...)` または `.withServerName(...)` のいずれかが必要です

パイプラインオプション

Beam のパイプラインオプションを使用して、コマンドラインからオプションを設定します。

Snowflake パイプラインオプション

Snowflake IO ライブラリは、パイプラインがそれらを使用する場合、デフォルトでコマンドラインを介して渡すことができる次のオプションをサポートしています

`--url` パラメーターなしでアカウント名とリージョンを含む Snowflake の JDBC ライクな URL。

`--serverName` アカウント、ゾーン、ドメインを含む完全なサーバー名。

`--username` ユーザー名/パスワード認証と秘密キー認証に必要なもの。

`--oauthToken` OAuth 認証のみに必要なもの。

`--password` ユーザー名/パスワード認証のみに必要なもの。

`--privateKeyPath` 秘密キーファイルへのパス。秘密キー認証のみに必要なもの。

--rawPrivateKey 秘密鍵。秘密鍵認証の場合のみ必須です。

--privateKeyPassphrase 秘密鍵のパスフレーズ。秘密鍵認証の場合のみ必須です。

--stagingBucketName 外部バケットパス(末尾に/が必要です)。例:{gs,s3}://bucket/。サブディレクトリも許可されます。

--storageIntegrationName ストレージインテグレーション名

--warehouse 使用するウェアハウス。省略可能です。

--database 接続するデータベース名。省略可能です。

--schema 使用するスキーマ。省略可能です。

--table 使用するテーブル。省略可能です。

--query 使用するクエリ。省略可能です。

--role 使用するロール。省略可能です。

--authenticator 使用する認証子。省略可能です。

--portNumber ポート番号。省略可能です。

--loginTimeout ログインタイムアウト。省略可能です。

--snowPipe SnowPipe名。省略可能です。

パイプラインオプションを使用した main コマンドの実行

コマンドラインからパイプラインオプションを渡すには、以下の様にgradleコマンドで--argsを使用します。

./gradle run
    --args="
        --serverName=<SNOWFLAKE SERVER NAME>
           Example: --serverName=account.region.gcp.snowflakecomputing.com
        --username=<SNOWFLAKE USERNAME>
           Example: --username=testuser
        --password=<SNOWFLAKE PASSWORD>
           Example: --password=mypassword
        --database=<SNOWFLAKE DATABASE>
           Example: --database=TEST_DATABASE
        --schema=<SNOWFLAKE SCHEMA>
           Example: --schema=public
        --table=<SNOWFLAKE TABLE IN DATABASE>
           Example: --table=TEST_TABLE
        --query=<IF NOT TABLE THEN QUERY>
           Example: --query=‘SELECT column FROM TABLE’
        --storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>
           Example: --storageIntegrationName=my_integration
        --stagingBucketName=<GCS OR S3 BUCKET>
           Example: --stagingBucketName={gs,s3}://bucket/
        --runner=<DirectRunner/DataflowRunner>
           Example: --runner=DataflowRunner
        --project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME>
           Example: --project=my_project
        --tempLocation=<FOR DATAFLOW RUNNER: GCS TEMP LOCATION STARTING
                        WITH gs://…>
           Example: --tempLocation=gs://bucket/temp/
        --region=<FOR DATAFLOW RUNNER: GCP REGION>
           Example: --region=us-east-1
        --appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>
           Example: --appName=my_job"
その後、コード内で引数を使用してパラメータにアクセスできます。例:options.getStagingBucketName()

パイプラインオプションを使用した test コマンドの実行

コマンドラインからパイプラインオプションを渡すには、以下の様にgradleコマンドで-DintegrationTestPipelineOptionsを使用します。

./gradlew test --tests nameOfTest
-DintegrationTestPipelineOptions='[
  "--serverName=<SNOWFLAKE SERVER NAME>",
      Example: --serverName=account.region.gcp.snowflakecomputing.com
  "--username=<SNOWFLAKE USERNAME>",
      Example: --username=testuser
  "--password=<SNOWFLAKE PASSWORD>",
      Example: --password=mypassword
  "--schema=<SNOWFLAKE SCHEMA>",
      Example: --schema=PUBLIC
  "--table=<SNOWFLAKE TABLE IN DATABASE>",
      Example: --table=TEST_TABLE
  "--database=<SNOWFLAKE DATABASE>",
      Example: --database=TEST_DATABASE
  "--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>",
      Example: --storageIntegrationName=my_integration
  "--stagingBucketName=<GCS OR S3 BUCKET>",
      Example: --stagingBucketName={gs,s3}://bucket
  "--externalLocation=<GCS BUCKET URL STARTING WITH GS://>",
      Example: --tempLocation=gs://bucket/temp/
]' --no-build-cache

すべてのパラメータは「–」で始まり、二重引用符で囲まれ、コンマで区切られます。

Dataflow でのパイプラインの実行

デフォルトでは、パイプラインはローカルマシンのDirect Runnerで実行されます。Google Dataflowでパイプラインを実行するには、以下のパイプラインオプションを指定する必要があります。

Dataflowのその他のパイプラインオプションについては、こちらを参照してください。

注意:Google Cloudで適切に認証するには、gcloudを使用するか、Google Cloudドキュメントに従ってください。

重要Google Dataflowの価格をご確認ください。

Dataflow でのパイプラインテンプレートの実行

Google Dataflowはテンプレートの作成をサポートしています。つまり、Cloud Storageにパイプラインをステージングし、パイプラインの実行時にのみ利用可能なランタイムパラメータを渡して実行できます。

独自のDataflowテンプレート作成手順は以下のとおりです。

  1. 独自のパイプラインを作成します。
  2. 実行時にSnowflakeIOがサポートしているオプションを確認しながら、Dataflowテンプレートを作成します。
  3. Cloud ConsoleREST API、またはgcloudを使用してDataflowテンプレートを実行します。

現在、SnowflakeIOは実行時に以下のオプションをサポートしています。

現在、SnowflakeIOは実行時に以下のオプションをサポートしていません

Snowflake テーブルへの書き込み

SnowflakeIOの機能の1つは、Snowflakeテーブルへの書き込みです。この変換により、ユーザーのPCollectionをSnowflakeデータベースに送信する出力操作でBeamパイプラインを終了できます。

バッチ書き込み (バウンドソースから)

基本的な.write()操作の使用方法は次のとおりです。

data.apply(
   SnowflakeIO.<type>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
)
書き込むPCollectionオブジェクトのデータ型をtypeに置き換えます。例:文字列の入力PCollectionの場合はSnowflakeIO.<String>

以下のすべてのパラメータは必須です。

注意:SnowflakeIOは内部的にCOPYステートメントを使用して書き込みます(COPY to tableを使用)。StagingBucketNameは、Snowflakeに送信されるCSVファイルを保存するために使用されます。これらのCSVファイルは、「stagingBucketName」パスに保存されます。

省略可能(バッチ処理用)

ストリーミング書き込み (アンバウンドソースから)

SnowflakeコンソールでSnowPipeを作成する必要があります。SnowPipeは、.withStagingBucketName.withStorageIntegrationNameメソッドで指定されたものと同じインテグレーションとバケットを使用する必要があります。書き込み操作は次のようになります。

data.apply(
   SnowflakeIO.<type>write()
      .withStagingBucketName("BUCKET")
      .withStorageIntegrationName("STORAGE INTEGRATION NAME")
      .withDataSourceConfiguration(dc)
      .withUserDataMapper(mapper)
      .withSnowPipe("MY_SNOW_PIPE")
      .withFlushTimeLimit(Duration.millis(time))
      .withFlushRowLimit(rowsNumber)
      .withShardsNumber(shardsNumber)
)

パラメーター

必須(ストリーミング用)

注意スキーマデータベースの名前を指定することが重要です。

注記:

前述のように、SnowflakeIOは、無制限のソースからの書き込みのために、内部的にSnowPipe REST呼び出しを使用します。StagingBucketNameは、Snowflakeに送信されるCSVファイルを保存するために使用されます。SnowflakeIOは、ストリーミング中またはストリーミング終了後も、「stagingBucketName」パス下の作成されたCSVファイルを削除しません。

省略可能(ストリーミング用)

重要な注意事項:

  1. ストリーミングは、キーペア認証のみを受け入れます。詳細については、Issue 21287を参照してください。
  2. SnowflakeIO.DataSourceConfigurationオブジェクトで設定されたroleパラメータは、ストリーミング書き込みでは無視されます。詳細については、Issue 21365を参照してください。

フラッシュ時間:期間と行数

期間:ストリーミング書き込みは、フラッシュタイムリミットで指定された時間間隔に従って、ステージに定期的にファイルを書き込みます(例:1分ごと)。

行数:書き込みのためにステージングされたファイルは、フラッシュタイムリミットに達するまで、フラッシュ行リミットで指定された行数になります(例:制限が1000行で、バッファに99行が収集され、1分のフラッシュ時間が経過した場合、行は挿入のためにSnowPipeに送信されます)。

ステージングされたファイルのサイズは、行のサイズと使用される圧縮(GZIP)によって異なります。

UserDataMapper 関数

UserDataMapper関数は、write()操作がデータを一時的な.csvファイルに保存する前に、PCollectionからのデータを文字列値の配列にマッピングするために必要です。例:

public static SnowflakeIO.UserDataMapper<Long> getCsvMapper() {
    return (SnowflakeIO.UserDataMapper<Long>) recordLine -> new String[] {recordLine.toString()};
}

追加の書き込みオプション

変換クエリ

.withQueryTransformation()オプションは、SQLクエリを文字列値として受け入れます。このクエリは、CSVファイルにステージングされたデータをターゲットのSnowflakeテーブルに直接転送する際に実行されます。変換SQL構文の詳細については、Snowflakeドキュメントを参照してください。

使用方法

String query = "SELECT t.$1 from YOUR_TABLE;";
data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withQueryTransformation(query)
)

書き込みディスポジション

.withWriteDisposition(...)オプションを指定して、データが書き込まれるテーブルに基づいて書き込み動作を定義します。

使用例

data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withWriteDisposition(TRUNCATE)
)

作成ディスポジション

.withCreateDisposition() オプションは、対象テーブルが存在しない場合の書き込み操作の動作を定義します。以下の値がサポートされています。

使用方法

data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withCreateDisposition(CREATE_NEVER)
)

テーブルスキーマディスポジション

.withCreateDisposition() オプションが CREATE_IF_NEEDED に設定されている場合、.withTableSchema() オプションを使用すると、作成される対象テーブルのスキーマを指定できます。テーブルスキーマは、テーブルの各列に対応する列名と型を持つ SnowflakeColumn オブジェクトのリストです。

使用方法

SnowflakeTableSchema tableSchema =
    new SnowflakeTableSchema(
        SnowflakeColumn.of("my_date", new SnowflakeDate(), true),
        new SnowflakeColumn("id", new SnowflakeNumber()),
        SnowflakeColumn.of("name", new SnowflakeText(), true));

data.apply(
   SnowflakeIO.<~>write()
       .withDataSourceConfiguration(dc)
       .to("MY_TABLE")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withUserDataMapper(mapper)
       .withTableSchema(tableSchema)
)

Snowflake からの読み取り

SnowflakeIO の機能の1つは、Snowflakeテーブルの読み込みです。テーブル名による完全なテーブル読み込み、またはクエリによるカスタムデータ読み込みが可能です。読み込み変換の出力は、ユーザー定義データ型のPCollectionです。

一般的な使用方法

基本的な.read() 操作の使用方法

PCollection<USER_DATA_TYPE> items = pipeline.apply(
   SnowflakeIO.<USER_DATA_TYPE>read()
       .withDataSourceConfiguration(dc)
       .fromTable("MY_TABLE") // or .fromQuery("QUERY")
       .withStagingBucketName("BUCKET")
       .withStorageIntegrationName("STORAGE INTEGRATION NAME")
       .withCsvMapper(mapper)
       .withCoder(coder));
)
以下のすべてのパラメータが必要です。

注記: SnowflakeIO は、バックグラウンドでCOPY文を使用して、クラウドストレージにステージングされたファイルを読み込みます(COPY to location を使用)。StagingBucketName は、CSVファイルの一時保存場所として使用されます。これらのテンポラリディレクトリはsf_copy_csv_DATE_TIME_RANDOMSUFFIX という名前になり、読み込み操作が完了すると自動的に削除されます。

CSVMapper

SnowflakeIO は、COPY INTO 文を使用して、SnowflakeテーブルからGCS/S3にCSVファイルとしてデータを移動します。これらのファイルはその後、FileIO を介してダウンロードされ、行ごとに処理されます。各行は、OpenCSV ライブラリを使用して文字列の配列に分割されます。

CSVMapper の役割は、ユーザーが文字列の配列をユーザー定義型(例:AvroまたはParquetファイルのGenericRecord、またはカスタムPOJO)に変換できるようにすることです。

GenericRecord 用の CsvMapper の実装例

static SnowflakeIO.CsvMapper<GenericRecord> getCsvMapper() {
   return (SnowflakeIO.CsvMapper<GenericRecord>)
           parts -> {
               return new GenericRecordBuilder(PARQUET_SCHEMA)
                       .set("ID", Long.valueOf(parts[0]))
                       .set("NAME", parts[1])
                       [...]
                       .build();
           };
}

AWS S3 との SnowflakeIO の使用

stagingBucketName として AWS S3 バケットを使用するには、

  1. SnowflakePipelineOptionsS3Options を拡張するPipelineOptions インターフェースを作成し、AwsAccessKeyAwsSecretKey オプションを追加します。例

public interface AwsPipelineOptions extends SnowflakePipelineOptions, S3Options {

    @Description("AWS Access Key")
    @Default.String("access_key")
    String getAwsAccessKey();

    void setAwsAccessKey(String awsAccessKey);

    @Description("AWS secret key")
    @Default.String("secret_key")
    String getAwsSecretKey();

    void setAwsSecretKey(String awsSecretKey);
}
2. AwsAccessKeyAwsSecretKey オプションを使用して、AwsCredentialsProvider オプションを設定します。

options.setAwsCredentialsProvider(
    new AWSStaticCredentialsProvider(
        new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey())
    )
);
3. パイプラインを作成します。

Pipeline p = Pipeline.create(options);

注記: S3Options から awsRegion を設定することを忘れないでください。

Python SDK での SnowflakeIO の使用

イントロ

Snowflake のクロス言語実装は、Portability Framework Roadmap の一部であるクロス言語機能のおかげで、Python プログラミング言語での読み込みと書き込み操作の両方をサポートしています。これは、開発者の観点からは、異なる言語(Java/Python/Go)で記述された変換を組み合わせることができることを意味します。

クロス言語の詳細については、マルチSDKの取り組みクロス言語変換APIと拡張サービス の記事をご覧ください。

追加のリソース

Snowflake からの読み取り

SnowflakeIO の機能の1つは、Snowflakeテーブルの読み込みです。テーブル名による完全なテーブル読み込み、またはクエリによるカスタムデータ読み込みが可能です。読み込み変換の出力は、ユーザー定義データ型のPCollectionです。

一般的な使用方法

OPTIONS = ["--runner=FlinkRunner"]

with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
   (p
       | ReadFromSnowflake(...)
       | <FURTHER TRANSFORMS>)

必須パラメーター

認証パラメーター

認証には、以下の有効なパラメータの組み合わせのいずれかを渡す必要があります。

追加のパラメーター

Snowflake への書き込み

SnowflakeIO の機能の1つは、Snowflakeテーブルへの書き込みです。この変換により、ユーザーのPCollectionをSnowflakeデータベースに送信する出力操作でBeamパイプラインを終了できます。

一般的な使用方法

OPTIONS = ["--runner=FlinkRunner"]

with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
   (p
       | <SOURCE OF DATA>
       | WriteToSnowflake(
           server_name=<SNOWFLAKE SERVER NAME>,
           username=<SNOWFLAKE USERNAME>,
           password=<SNOWFLAKE PASSWORD>,
           o_auth_token=<OAUTH TOKEN>,
           private_key_path=<PATH TO P8 FILE>,
           raw_private_key=<PRIVATE_KEY>
           private_key_passphrase=<PASSWORD FOR KEY>,
           schema=<SNOWFLAKE SCHEMA>,
           database=<SNOWFLAKE DATABASE>,
           staging_bucket_name=<GCS OR S3 BUCKET>,
           storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
           create_disposition=<CREATE DISPOSITION>,
           write_disposition=<WRITE DISPOSITION>,
           table_schema=<SNOWFLAKE TABLE SCHEMA>,
           user_data_mapper=<USER DATA MAPPER FUNCTION>,
           table=<SNOWFLAKE TABLE>,
           query=<IF NOT TABLE THEN QUERY>,
           role=<SNOWFLAKE ROLE>,
           warehouse=<SNOWFLAKE WAREHOUSE>,
           expansion_service=<EXPANSION SERVICE ADDRESS>))

必須パラメーター

認証パラメーター

認証には、以下の有効なパラメータの組み合わせのいずれかを渡す必要があります。

追加のパラメーター

制限事項

SnowflakeIOには現在、以下の制限があります。

  1. ストリーミング書き込みは、キーペア認証のみをサポートしています。詳細については、Issue 21287を参照してください。

  2. SnowflakeIO.DataSourceConfigurationオブジェクトで設定されたroleパラメータは、ストリーミング書き込みでは無視されます。詳細については、Issue 21365を参照してください。