Snowflake I/O
Snowflake IO の使用と実行に関するパイプラインオプションと一般的な情報。
開始する前に
SnowflakeIO を使用するには、Maven アーティファクトの依存関係を `pom.xml` ファイルに追加します。
追加のリソース
認証
読み取りとバッチ書き込みは、次の認証方法をサポートしています
- ユーザー名とパスワード
- キーペア
- OAuth トークン
ストリーミング書き込みは、キーペア認証のみをサポートしています。詳細については、BEAM-3304 を参照してください。
資格情報の受け渡しは、`SnowflakeIO.DataSourceConfiguration` クラスをインスタンス化する際に使用されるパイプラインオプションを介して行われます。各認証方法には、このクラスを構成する異なる方法があります。
ユーザー名とパスワード
SnowflakeIO でユーザー名/パスワード認証を使用するには、次のパイプラインオプションを使用してパイプラインを呼び出します
資格情報の受け渡しは、`SnowflakeIO.DataSourceConfiguration` クラスをインスタンス化する際に使用されるパイプラインオプションを介して行われます。
SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create()
.withUsernamePasswordAuth(
options.getUsername(),
options.getPassword())
.withServerName(options.getServerName())
.withDatabase(options.getDatabase())
.withRole(options.getRole())
.withWarehouse(options.getWarehouse())
.withSchema(options.getSchema());
キーペア
この認証方法を使用するには、最初にキーペアを生成し、IO 変換を使用して接続する Snowflake ユーザーに公開キーを関連付ける必要があります。手順については、Snowflake ドキュメントのキーペア認証とキーペアローテーションを参照してください。
SnowflakeIO でキーペア認証を使用するには、次のパイプラインオプションのセットのいずれかを使用してパイプラインを呼び出します
- キーをパスとして渡す場合`SnowflakeIO.DataSourceConfiguration` クラスの初期化は次のようになります
SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create() .withKeyPairPathAuth( options.getUsername(), options.getPrivateKeyPath(), options.getPrivateKeyPassphrase()) .withServerName(options.getServerName()) .withDatabase(options.getDatabase()) .withRole(options.getRole()) .withWarehouse(options.getWarehouse()) .withSchema(options.getSchema());
- キーを値として渡す場合`SnowflakeIO.DataSourceConfiguration` クラスの初期化は次のようになります
SnowflakeIO.DataSourceConfiguration datasource = SnowflakeIO.DataSourceConfiguration.create() .withKeyPairRawAuth( options.getUsername(), options.getRawPrivateKey(), options.getPrivateKeyPassphrase()) .withServerName(options.getServerName()) .withDatabase(options.getDatabase()) .withRole(options.getRole()) .withWarehouse(options.getWarehouse()) .withSchema(options.getSchema());
OAuth トークン
SnowflakeIO は OAuth トークンもサポートしています。
重要:SnowflakeIO には有効な OAuth アクセストークンが必要です。トークンを更新したり、Web ベースのフローを使用してトークンを取得したりすることはできません。OAuth 統合の構成とトークンの取得については、Snowflake ドキュメントを参照してください。
トークンを取得したら、次のパイプラインオプションを使用してパイプラインを呼び出します
`SnowflakeIO.DataSourceConfiguration` クラスの初期化は次のようになりますDataSource 構成
DataSource 構成は、IO の目的で Snowflake 接続プロパティを構成するために、読み取りオブジェクトと書き込みオブジェクトの両方で必要です。
一般的な使用方法
DataSource 構成を作成します
.withUrl(...)
- アカウント名とリージョンを含む Snowflake アカウントの JDBC ライクな URL(パラメーターなし)。
- 例:`jdbc:snowflake://account.snowflakecomputing.com`
.withServerName(...)
- サーバー名 - アカウント、ゾーン、ドメインを含む完全なサーバー名。
- 例:`account.snowflakecomputing.com`
.withDatabase(...)
- 使用する Snowflake データベースの名前。
- 例:`MY_DATABASE`
.withWarehouse(...)
- 使用する Snowflake ウェアハウスの名前。このパラメーターはオプションです。ウェアハウス名が指定されていない場合、ユーザーのデフォルトウェアハウスが使用されます。
- 例:`MY_WAREHOUSE`
.withSchema(...)
- 使用するデータベース内のスキーマの名前。このパラメーターはオプションです。
- 例:`PUBLIC`
.withUsernamePasswordAuth(username, password)
- ユーザー名/パスワード認証を設定します。
- 例:`.withUsernamePasswordAuth("USERNAME", "PASSWORD")`
.withOAuth(token)
- OAuth 認証を設定します。
- 例:`.withOAuth("TOKEN")`
.withKeyPairAuth(username, privateKey)
- ユーザー名とPrivateKey を使用してキーペア認証を設定します
- 例:`.withKeyPairAuth("USERNAME",`PrivateKey`)`
.withKeyPairPathAuth(username, privateKeyPath, privateKeyPassphrase)
- ユーザー名、秘密キーファイルへのパス、パスフレーズを使用してキーペア認証を設定します。
- 例:`.withKeyPairPathAuth("USERNAME", "PATH/TO/KEY.P8", "PASSPHRASE")`
.withKeyPairRawAuth(username, rawPrivateKey, privateKeyPassphrase)
- ユーザー名、秘密キー、パスフレーズを使用してキーペア認証を設定します。
- 例:`.withKeyPairRawAuth("USERNAME", "PRIVATE_KEY", "PASSPHRASE")`
注記 - `.withUrl(...)` または `.withServerName(...)` のいずれかが必要です。
パイプラインオプション
Beam のパイプラインオプションを使用して、コマンドラインからオプションを設定します。
Snowflake パイプラインオプション
Snowflake IO ライブラリは、パイプラインがそれらを使用する場合、デフォルトでコマンドラインを介して渡すことができる次のオプションをサポートしています
`--url` パラメーターなしでアカウント名とリージョンを含む Snowflake の JDBC ライクな URL。
`--serverName` アカウント、ゾーン、ドメインを含む完全なサーバー名。
`--username` ユーザー名/パスワード認証と秘密キー認証に必要なもの。
`--oauthToken` OAuth 認証のみに必要なもの。
`--password` ユーザー名/パスワード認証のみに必要なもの。
`--privateKeyPath` 秘密キーファイルへのパス。秘密キー認証のみに必要なもの。
--rawPrivateKey
秘密鍵。秘密鍵認証の場合のみ必須です。
--privateKeyPassphrase
秘密鍵のパスフレーズ。秘密鍵認証の場合のみ必須です。
--stagingBucketName
外部バケットパス(末尾に/
が必要です)。例:{gs,s3}://bucket/
。サブディレクトリも許可されます。
--storageIntegrationName
ストレージインテグレーション名
--warehouse
使用するウェアハウス。省略可能です。
--database
接続するデータベース名。省略可能です。
--schema
使用するスキーマ。省略可能です。
--table
使用するテーブル。省略可能です。
--query
使用するクエリ。省略可能です。
--role
使用するロール。省略可能です。
--authenticator
使用する認証子。省略可能です。
--portNumber
ポート番号。省略可能です。
--loginTimeout
ログインタイムアウト。省略可能です。
--snowPipe
SnowPipe名。省略可能です。
パイプラインオプションを使用した main コマンドの実行
コマンドラインからパイプラインオプションを渡すには、以下の様にgradleコマンドで--args
を使用します。
./gradle run
--args="
--serverName=<SNOWFLAKE SERVER NAME>
Example: --serverName=account.region.gcp.snowflakecomputing.com
--username=<SNOWFLAKE USERNAME>
Example: --username=testuser
--password=<SNOWFLAKE PASSWORD>
Example: --password=mypassword
--database=<SNOWFLAKE DATABASE>
Example: --database=TEST_DATABASE
--schema=<SNOWFLAKE SCHEMA>
Example: --schema=public
--table=<SNOWFLAKE TABLE IN DATABASE>
Example: --table=TEST_TABLE
--query=<IF NOT TABLE THEN QUERY>
Example: --query=‘SELECT column FROM TABLE’
--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>
Example: --storageIntegrationName=my_integration
--stagingBucketName=<GCS OR S3 BUCKET>
Example: --stagingBucketName={gs,s3}://bucket/
--runner=<DirectRunner/DataflowRunner>
Example: --runner=DataflowRunner
--project=<FOR DATAFLOW RUNNER: GCP PROJECT NAME>
Example: --project=my_project
--tempLocation=<FOR DATAFLOW RUNNER: GCS TEMP LOCATION STARTING
WITH gs://…>
Example: --tempLocation=gs://bucket/temp/
--region=<FOR DATAFLOW RUNNER: GCP REGION>
Example: --region=us-east-1
--appName=<OPTIONAL: DATAFLOW JOB NAME PREFIX>
Example: --appName=my_job"
options.getStagingBucketName()
パイプラインオプションを使用した test コマンドの実行
コマンドラインからパイプラインオプションを渡すには、以下の様にgradleコマンドで-DintegrationTestPipelineOptions
を使用します。
./gradlew test --tests nameOfTest
-DintegrationTestPipelineOptions='[
"--serverName=<SNOWFLAKE SERVER NAME>",
Example: --serverName=account.region.gcp.snowflakecomputing.com
"--username=<SNOWFLAKE USERNAME>",
Example: --username=testuser
"--password=<SNOWFLAKE PASSWORD>",
Example: --password=mypassword
"--schema=<SNOWFLAKE SCHEMA>",
Example: --schema=PUBLIC
"--table=<SNOWFLAKE TABLE IN DATABASE>",
Example: --table=TEST_TABLE
"--database=<SNOWFLAKE DATABASE>",
Example: --database=TEST_DATABASE
"--storageIntegrationName=<SNOWFLAKE STORAGE INTEGRATION NAME>",
Example: --storageIntegrationName=my_integration
"--stagingBucketName=<GCS OR S3 BUCKET>",
Example: --stagingBucketName={gs,s3}://bucket
"--externalLocation=<GCS BUCKET URL STARTING WITH GS://>",
Example: --tempLocation=gs://bucket/temp/
]' --no-build-cache
すべてのパラメータは「–」で始まり、二重引用符で囲まれ、コンマで区切られます。
--serverName=<SNOWFLAKE サーバー名>
- アカウントの完全名(Snowflakeから提供)を指定します。アカウントの完全名には、アカウントがホストされているリージョンとクラウドプラットフォームを識別する追加セグメントが含まれている場合があります。
- 例:
--serverName=xy12345.eu-west-1.gcp..snowflakecomputing.com
--username=<SNOWFLAKE ユーザー名>
- ユーザーのログイン名を指定します。
- 例:
--username=my_username
--password=<SNOWFLAKE パスワード>
- 指定されたユーザーのパスワードを指定します。
- 例:
--password=my_secret
--schema=<SNOWFLAKE スキーマ>
- 接続後に使用するスキーマを指定します。指定されたスキーマは、指定されたユーザーのロールが権限を持つ既存のスキーマである必要があります。
- 例:
--schema=PUBLIC
--table=<SNOWFLAKE データベース内のテーブル>
- 例:
--table=MY_TABLE
- 例:
--database=<SNOWFLAKE データベース>
- 接続後に使用するデータベースを指定します。指定されたデータベースは、指定されたユーザーのロールが権限を持つ既存のデータベースである必要があります。
- 例:
--database=MY_DATABASE
--storageIntegrationName=<SNOWFLAKE ストレージインテグレーション名>
- 選択したクラウドストレージ用にSnowflakeで作成されたストレージインテグレーションの名前です。Snowflakeドキュメントを参照してください。
- 例:
--storageIntegrationName=my_google_integration
Dataflow でのパイプラインの実行
デフォルトでは、パイプラインはローカルマシンのDirect Runnerで実行されます。Google Dataflowでパイプラインを実行するには、以下のパイプラインオプションを指定する必要があります。
--runner=DataflowRunner
- Dataflow固有のランナーです。
--project=<GCS プロジェクト>
- Google Cloud Platformプロジェクトの名前です。
--stagingBucketName=<GCS または S3 バケット>
- BeamファイルがステージングされるGoogle Cloud ServicesバケットまたはAWS S3バケットです。
--maxNumWorkers=5
- (省略可能) ワーカーの最大数。
--appName=<ジョブ名>
- (省略可能) Dataflowダッシュボードでのジョブ名のプレフィックス。
Dataflowのその他のパイプラインオプションについては、こちらを参照してください。
注意:Google Cloudで適切に認証するには、gcloudを使用するか、Google Cloudドキュメントに従ってください。
重要:Google Dataflowの価格をご確認ください。
Dataflow でのパイプラインテンプレートの実行
Google Dataflowはテンプレートの作成をサポートしています。つまり、Cloud Storageにパイプラインをステージングし、パイプラインの実行時にのみ利用可能なランタイムパラメータを渡して実行できます。
独自のDataflowテンプレート作成手順は以下のとおりです。
- 独自のパイプラインを作成します。
- 実行時にSnowflakeIOがサポートしているオプションを確認しながら、Dataflowテンプレートを作成します。
- Cloud Console、REST API、またはgcloudを使用してDataflowテンプレートを実行します。
現在、SnowflakeIOは実行時に以下のオプションをサポートしています。
`--serverName` アカウント、ゾーン、ドメインを含む完全なサーバー名。
`--username` ユーザー名/パスワード認証と秘密キー認証に必要なもの。
`--password` ユーザー名/パスワード認証のみに必要なもの。
--rawPrivateKey
秘密鍵ファイル。秘密鍵認証の場合のみ必須です。--privateKeyPassphrase
秘密鍵のパスフレーズ。秘密鍵認証の場合のみ必須です。--stagingBucketName
外部バケットパス(末尾に/
が必要です)。例:{gs,s3}://bucket/
。サブディレクトリも許可されます。--storageIntegrationName
ストレージインテグレーション名。--warehouse
使用するウェアハウス。省略可能です。--database
接続するデータベース名。省略可能です。--schema
使用するスキーマ。省略可能です。--table
使用するテーブル。省略可能。注:tableはデフォルトのパイプラインオプションには含まれていません。--query
使用するクエリ。省略可能。注:queryはデフォルトのパイプラインオプションには含まれていません。--role
使用するロール。省略可能です。--snowPipe
SnowPipe名。省略可能です。
現在、SnowflakeIOは実行時に以下のオプションをサポートしていません。
`--url` パラメーターなしでアカウント名とリージョンを含む Snowflake の JDBC ライクな URL。
`--oauthToken` OAuth 認証のみに必要なもの。
`--privateKeyPath` 秘密キーファイルへのパス。秘密キー認証のみに必要なもの。
--authenticator
使用する認証子。省略可能です。--portNumber
ポート番号。省略可能です。--loginTimeout
ログインタイムアウト。省略可能です。
Snowflake テーブルへの書き込み
SnowflakeIOの機能の1つは、Snowflakeテーブルへの書き込みです。この変換により、ユーザーのPCollectionをSnowflakeデータベースに送信する出力操作でBeamパイプラインを終了できます。
バッチ書き込み (バウンドソースから)
基本的な.write()
操作の使用方法は次のとおりです。
PCollection
オブジェクトのデータ型をtypeに置き換えます。例:文字列の入力PCollection
の場合はSnowflakeIO.<String>
。以下のすべてのパラメータは必須です。
.withDataSourceConfiguration()
DatasourceConfigurationオブジェクトを受け入れます。.to()
ターゲットのSnowflakeテーブル名を受け入れます。.withStagingBucketName()
スラッシュで終わるクラウドバケットパスを受け入れます。例:.withStagingBucketName("{gs,s3}://bucket/my/dir/")
.withStorageIntegrationName()
Snowflakeドキュメントに従って作成されたSnowflakeストレージインテグレーションオブジェクトの名前を受け入れます。例:そして.withUserDataMapper()
ユーザーのPCollectionを文字列値の配列(String[])
にマッピングするUserDataMapper関数を受け入れます。
注意:SnowflakeIOは内部的にCOPY
ステートメントを使用して書き込みます(COPY to tableを使用)。StagingBucketNameは、Snowflakeに送信されるCSVファイルを保存するために使用されます。これらのCSVファイルは、「stagingBucketName」パスに保存されます。
省略可能(バッチ処理用)
.withQuotationMark()
- デフォルト値:
‘
(シングルクォーテーション)。 - 1文字の文字列を受け入れます。CSVに保存されるすべてのテキスト(文字列)フィールドを囲みます。SnowflakeのFIELD_OPTIONALLY_ENCLOSED_BYパラメータで許可されている文字のいずれかである必要があります(二重引用符、シングルクォーテーション、またはなし)。
- 例:
.withQuotationMark("'")
- デフォルト値:
ストリーミング書き込み (アンバウンドソースから)
SnowflakeコンソールでSnowPipeを作成する必要があります。SnowPipeは、.withStagingBucketName
と.withStorageIntegrationName
メソッドで指定されたものと同じインテグレーションとバケットを使用する必要があります。書き込み操作は次のようになります。
data.apply(
SnowflakeIO.<type>write()
.withStagingBucketName("BUCKET")
.withStorageIntegrationName("STORAGE INTEGRATION NAME")
.withDataSourceConfiguration(dc)
.withUserDataMapper(mapper)
.withSnowPipe("MY_SNOW_PIPE")
.withFlushTimeLimit(Duration.millis(time))
.withFlushRowLimit(rowsNumber)
.withShardsNumber(shardsNumber)
)
パラメーター
必須(ストリーミング用)
.withDataSourceConfiguration()
- DatasourceConfigurationオブジェクトを受け入れます。
.to()
- ターゲットのSnowflakeテーブル名を受け入れます。
- 例:
.to("MY_TABLE")
.withStagingBucketName()
- スラッシュで終わるクラウドバケットパスを受け入れます。
- 例:
.withStagingBucketName("{gs,s3}://bucket/my/dir/")
.withStorageIntegrationName()
- Snowflakeドキュメントに従って作成されたSnowflakeストレージインテグレーションオブジェクトの名前を受け入れます。
- 例そして
.withSnowPipe()
ターゲットのSnowPipe名を受け入れます。
.withSnowPipe()
はSnowpipeの正確な名前を受け入れます。例そして
注意:スキーマとデータベースの名前を指定することが重要です。
.withUserDataMapper()
- ユーザーのPCollectionを文字列値の配列
(String[])
にマッピングするUserDataMapper関数を受け入れます。
- ユーザーのPCollectionを文字列値の配列
注記:
前述のように、SnowflakeIOは、無制限のソースからの書き込みのために、内部的にSnowPipe REST呼び出しを使用します。StagingBucketNameは、Snowflakeに送信されるCSVファイルを保存するために使用されます。SnowflakeIOは、ストリーミング中またはストリーミング終了後も、「stagingBucketName」パス下の作成されたCSVファイルを削除しません。
省略可能(ストリーミング用)
.withFlushTimeLimit()
- デフォルト値:30秒
- ストリーミング書き込みが繰り返される時間を指定したDurationオブジェクトを受け入れます。
- 例:
.withFlushTimeLimit(Duration.millis(180000))
.withFlushRowLimit()
- デフォルト値:10,000行
- 各ステージングされたファイルに書き込まれる行数の制限
- 例:
.withFlushRowLimit(500000)
.withShardNumber()
- デフォルト値:1シャード
- 各フラッシュで保存されるファイルの数(並列書き込みのため)。
- 例:
.withShardNumber(5)
.withQuotationMark()
- デフォルト値:
‘
(シングルクォーテーション)。 - 1文字の文字列を受け入れます。CSVに保存されるすべてのテキスト(文字列)フィールドを囲みます。SnowflakeのFIELD_OPTIONALLY_ENCLOSED_BYパラメータで許可されている文字のいずれかである必要があります(二重引用符、シングルクォーテーション、またはなし)。例:
.withQuotationMark("")
(引用符なし)
- デフォルト値:
.withDebugMode()
- 受け入れます
SnowflakeIO.StreamingLogLevel.INFO
- 読み込まれたファイルに関するすべての情報を表示します。SnowflakeIO.StreamingLogLevel.ERROR
- エラーのみを表示します。
- insertReportと同様に、Snowflakeにストリーミングされたファイルに関するログを表示します。デバッグモードを有効にすると、パフォーマンスに影響を与える可能性があります。
- 例:
.withDebugMode(SnowflakeIO.StreamingLogLevel.INFO)
- 受け入れます
重要な注意事項:
- ストリーミングは、キーペア認証のみを受け入れます。詳細については、Issue 21287を参照してください。
SnowflakeIO.DataSourceConfiguration
オブジェクトで設定されたroleパラメータは、ストリーミング書き込みでは無視されます。詳細については、Issue 21365を参照してください。
フラッシュ時間:期間と行数
期間:ストリーミング書き込みは、フラッシュタイムリミットで指定された時間間隔に従って、ステージに定期的にファイルを書き込みます(例:1分ごと)。
行数:書き込みのためにステージングされたファイルは、フラッシュタイムリミットに達するまで、フラッシュ行リミットで指定された行数になります(例:制限が1000行で、バッファに99行が収集され、1分のフラッシュ時間が経過した場合、行は挿入のためにSnowPipeに送信されます)。
ステージングされたファイルのサイズは、行のサイズと使用される圧縮(GZIP)によって異なります。
UserDataMapper 関数
UserDataMapper
関数は、write()
操作がデータを一時的な.csv
ファイルに保存する前に、PCollection
からのデータを文字列値の配列にマッピングするために必要です。例:
追加の書き込みオプション
変換クエリ
.withQueryTransformation()
オプションは、SQLクエリを文字列値として受け入れます。このクエリは、CSVファイルにステージングされたデータをターゲットのSnowflakeテーブルに直接転送する際に実行されます。変換SQL構文の詳細については、Snowflakeドキュメントを参照してください。
使用方法
書き込みディスポジション
.withWriteDisposition(...)
オプションを指定して、データが書き込まれるテーブルに基づいて書き込み動作を定義します。
APPEND
- デフォルトの動作。書き込まれたデータは、テーブルの既存の行に追加されます。EMPTY
- ターゲットテーブルは空である必要があります。そうでない場合、書き込み操作は失敗します。TRUNCATE
- 書き込み操作は、書き込む前にターゲットテーブルからすべての行を削除します。
使用例
作成ディスポジション
.withCreateDisposition()
オプションは、対象テーブルが存在しない場合の書き込み操作の動作を定義します。以下の値がサポートされています。
CREATE_IF_NEEDED
- デフォルト動作。書き込み操作は、指定された対象テーブルが存在するかどうかをチェックします。存在しない場合、書き込み操作はテーブルの作成を試みます。対象テーブルのスキーマは、.withTableSchema()
オプションを使用して指定してください。CREATE_NEVER
- 対象テーブルが存在しない場合、書き込み操作は失敗します。
使用方法
テーブルスキーマディスポジション
.withCreateDisposition()
オプションが CREATE_IF_NEEDED
に設定されている場合、.withTableSchema()
オプションを使用すると、作成される対象テーブルのスキーマを指定できます。テーブルスキーマは、テーブルの各列に対応する列名と型を持つ SnowflakeColumn
オブジェクトのリストです。
使用方法
SnowflakeTableSchema tableSchema =
new SnowflakeTableSchema(
SnowflakeColumn.of("my_date", new SnowflakeDate(), true),
new SnowflakeColumn("id", new SnowflakeNumber()),
SnowflakeColumn.of("name", new SnowflakeText(), true));
data.apply(
SnowflakeIO.<~>write()
.withDataSourceConfiguration(dc)
.to("MY_TABLE")
.withStagingBucketName("BUCKET")
.withStorageIntegrationName("STORAGE INTEGRATION NAME")
.withUserDataMapper(mapper)
.withTableSchema(tableSchema)
)
Snowflake からの読み取り
SnowflakeIO の機能の1つは、Snowflakeテーブルの読み込みです。テーブル名による完全なテーブル読み込み、またはクエリによるカスタムデータ読み込みが可能です。読み込み変換の出力は、ユーザー定義データ型のPCollectionです。
一般的な使用方法
基本的な.read()
操作の使用方法
PCollection<USER_DATA_TYPE> items = pipeline.apply(
SnowflakeIO.<USER_DATA_TYPE>read()
.withDataSourceConfiguration(dc)
.fromTable("MY_TABLE") // or .fromQuery("QUERY")
.withStagingBucketName("BUCKET")
.withStorageIntegrationName("STORAGE INTEGRATION NAME")
.withCsvMapper(mapper)
.withCoder(coder));
)
.withDataSourceConfiguration(...)
- DataSourceConfigurationオブジェクトを受け入れます。
.fromTable(...) または .fromQuery(...)
- Snowflakeテーブル名またはカスタムSQLクエリを指定します。
.withStagingBucketName()
- クラウドバケット名を受け入れます。
.withStorageIntegrationName()
Snowflakeドキュメントに従って作成されたSnowflakeストレージインテグレーションオブジェクトの名前を受け入れます。例
そして.withCsvMapper(mapper)
- CSVMapper インスタンスを受け入れ、String[] を USER_DATA_TYPE にマッピングします。
.withCoder(coder)
- Coder を USER_DATA_TYPE に対して受け入れます。
注記: SnowflakeIO は、バックグラウンドでCOPY
文を使用して、クラウドストレージにステージングされたファイルを読み込みます(COPY to location を使用)。StagingBucketName は、CSVファイルの一時保存場所として使用されます。これらのテンポラリディレクトリはsf_copy_csv_DATE_TIME_RANDOMSUFFIX
という名前になり、読み込み操作が完了すると自動的に削除されます。
CSVMapper
SnowflakeIO は、COPY INTO
CSVMapper の役割は、ユーザーが文字列の配列をユーザー定義型(例:AvroまたはParquetファイルのGenericRecord、またはカスタムPOJO)に変換できるようにすることです。
GenericRecord 用の CsvMapper の実装例
AWS S3 との SnowflakeIO の使用
stagingBucketName
として AWS S3 バケットを使用するには、
SnowflakePipelineOptions
と S3Options を拡張するPipelineOptions インターフェースを作成し、AwsAccessKey
とAwsSecretKey
オプションを追加します。例
public interface AwsPipelineOptions extends SnowflakePipelineOptions, S3Options {
@Description("AWS Access Key")
@Default.String("access_key")
String getAwsAccessKey();
void setAwsAccessKey(String awsAccessKey);
@Description("AWS secret key")
@Default.String("secret_key")
String getAwsSecretKey();
void setAwsSecretKey(String awsSecretKey);
}
AwsAccessKey
と AwsSecretKey
オプションを使用して、AwsCredentialsProvider
オプションを設定します。注記: S3Options から awsRegion
を設定することを忘れないでください。
Python SDK での SnowflakeIO の使用
イントロ
Snowflake のクロス言語実装は、Portability Framework Roadmap の一部であるクロス言語機能のおかげで、Python プログラミング言語での読み込みと書き込み操作の両方をサポートしています。これは、開発者の観点からは、異なる言語(Java/Python/Go)で記述された変換を組み合わせることができることを意味します。
クロス言語の詳細については、マルチSDKの取り組み と クロス言語変換APIと拡張サービス の記事をご覧ください。
追加のリソース
Snowflake からの読み取り
SnowflakeIO の機能の1つは、Snowflakeテーブルの読み込みです。テーブル名による完全なテーブル読み込み、またはクエリによるカスタムデータ読み込みが可能です。読み込み変換の出力は、ユーザー定義データ型のPCollectionです。
一般的な使用方法
必須パラメーター
server_name
アカウント、ゾーン、ドメインを含む完全なSnowflakeサーバー名。schema
使用するデータベース内のSnowflakeスキーマ名。database
使用するSnowflakeデータベース名。staging_bucket_name
Google Cloud StorageバケットまたはAWS S3バケットの名前。バケットは、CSVファイルの一時保存場所として使用されます。これらのテンポラリディレクトリはsf_copy_csv_DATE_TIME_RANDOMSUFFIX
という名前になり、読み込み操作が完了すると自動的に削除されます。storage_integration_name
Snowflakeドキュメントに従って作成されたSnowflakeストレージインテグレーションオブジェクトの名前です。csv_mapper
ユーザー定義オブジェクトを文字列の配列に変換する関数。SnowflakeIO は、COPY INTO文を使用して、SnowflakeテーブルからGCS/S3にCSVファイルとしてデータを移動します。これらのファイルはその後、FileIOを介してダウンロードされ、行ごとに処理されます。各行は、OpenCSVライブラリを使用して文字列の配列に分割されます。 csv_mapper
関数の役割は、ユーザーが文字列の配列をユーザー定義型(例:AvroまたはParquetファイルのGenericRecord、またはカスタムオブジェクト)に変換できるようにすることです。例table
またはquery
Snowflakeテーブル名またはカスタムSQLクエリを指定します。
認証パラメーター
認証には、以下の有効なパラメータの組み合わせのいずれかを渡す必要があります。
username
とpassword
ユーザー名/パスワード認証方法のユーザー名とパスワードを指定します。private_key_path
とprivate_key_passphrase
秘密鍵へのパスとキーペア認証方法のパスフレーズを指定します。raw_private_key
とprivate_key_passphrase
秘密鍵とキーペア認証方法のパスフレーズを指定します。o_auth_token
OAuth認証方法のアクセストークンを指定します。
追加のパラメーター
role
Snowflakeロールを指定します。指定しない場合、ユーザーのデフォルトが使用されます。warehouse
Snowflakeウェアハウス名を指定します。指定しない場合、ユーザーのデフォルトが使用されます。expansion_service
拡張サービスのURLを指定します。
Snowflake への書き込み
SnowflakeIO の機能の1つは、Snowflakeテーブルへの書き込みです。この変換により、ユーザーのPCollectionをSnowflakeデータベースに送信する出力操作でBeamパイプラインを終了できます。
一般的な使用方法
OPTIONS = ["--runner=FlinkRunner"]
with TestPipeline(options=PipelineOptions(OPTIONS)) as p:
(p
| <SOURCE OF DATA>
| WriteToSnowflake(
server_name=<SNOWFLAKE SERVER NAME>,
username=<SNOWFLAKE USERNAME>,
password=<SNOWFLAKE PASSWORD>,
o_auth_token=<OAUTH TOKEN>,
private_key_path=<PATH TO P8 FILE>,
raw_private_key=<PRIVATE_KEY>
private_key_passphrase=<PASSWORD FOR KEY>,
schema=<SNOWFLAKE SCHEMA>,
database=<SNOWFLAKE DATABASE>,
staging_bucket_name=<GCS OR S3 BUCKET>,
storage_integration_name=<SNOWFLAKE STORAGE INTEGRATION NAME>,
create_disposition=<CREATE DISPOSITION>,
write_disposition=<WRITE DISPOSITION>,
table_schema=<SNOWFLAKE TABLE SCHEMA>,
user_data_mapper=<USER DATA MAPPER FUNCTION>,
table=<SNOWFLAKE TABLE>,
query=<IF NOT TABLE THEN QUERY>,
role=<SNOWFLAKE ROLE>,
warehouse=<SNOWFLAKE WAREHOUSE>,
expansion_service=<EXPANSION SERVICE ADDRESS>))
必須パラメーター
server_name
アカウント、ゾーン、ドメインを含む完全なSnowflakeサーバー名。schema
使用するデータベース内のSnowflakeスキーマ名。database
使用するSnowflakeデータベース名。staging_bucket_name
スラッシュで終わるGoogle Cloud StorageバケットまたはAWS S3バケットへのパス。バケットは、Snowflakeに保存されるCSVファイルの保存に使用されます。これらのCSVファイルは「staging_bucket_name」パス下に保存されます。storage_integration_name
Snowflakeドキュメントに従って作成されたSnowflakeストレージインテグレーションオブジェクトの名前です。user_data_mapper
書き込み操作がデータを一時的な.csvファイルに保存する前に、PCollectionからのデータを文字列値の配列にマッピングする関数を指定します。例table
またはquery
Snowflakeテーブル名またはカスタムSQLクエリを指定します。
認証パラメーター
認証には、以下の有効なパラメータの組み合わせのいずれかを渡す必要があります。
username
とpassword
ユーザー名/パスワード認証方法を指定します。private_key_path
とprivate_key_passphrase
秘密鍵へのパスとキーペア認証方法のパスフレーズを指定します。raw_private_key
とprivate_key_passphrase
秘密鍵とキーペア認証方法のパスフレーズを指定します。o_auth_token
OAuth認証方法のアクセストークンを指定します。
追加のパラメーター
role
Snowflakeロールを指定します。指定しない場合、ユーザーのデフォルトが使用されます。warehouse
Snowflakeウェアハウス名を指定します。指定しない場合、ユーザーのデフォルトが使用されます。create_disposition
対象テーブルが存在しない場合の書き込み操作の動作を定義します。以下の値がサポートされています。CREATE_IF_NEEDED
- デフォルト動作。書き込み操作は、指定された対象テーブルが存在するかどうかをチェックします。存在しない場合、書き込み操作はテーブルの作成を試みます。対象テーブルのスキーマは、table_schema
パラメータを使用して指定してください。CREATE_NEVER
- 対象テーブルが存在しない場合、書き込み操作は失敗します。
write_disposition
データが書き込まれるテーブルに基づいて書き込み動作を定義します。以下の値がサポートされています。APPEND
- デフォルトの動作。書き込まれたデータは、テーブルの既存の行に追加されます。EMPTY
- ターゲットテーブルは空である必要があります。そうでない場合、書き込み操作は失敗します。TRUNCATE
- 書き込み操作は、書き込む前にターゲットテーブルからすべての行を削除します。
table_schema
create_disposition
パラメータが CREATE_IF_NEEDED に設定されている場合、table_schema
パラメータを使用して、作成される対象テーブルのスキーマを指定できます。テーブルスキーマは、以下の構造を持つJSON配列です。サポートされているすべてのデータ型Snowflakeのデータ型については、Snowflakeデータ型をご覧ください。{"type":"date"}, {"type":"datetime"}, {"type":"time"}, {"type":"timestamp"}, {"type":"timestamp_ltz"}, {"type":"timestamp_ntz"}, {"type":"timestamp_tz"}, {"type":"boolean"}, {"type":"decimal","precision":38,"scale":1}, {"type":"double"}, {"type":"float"}, {"type":"integer","precision":38,"scale":0}, {"type":"number","precision":38,"scale":1}, {"type":"numeric","precision":38,"scale":2}, {"type":"real"}, {"type":"array"}, {"type":"object"}, {"type":"variant"}, {"type":"binary","size":null}, {"type":"char","length":1}, {"type":"string","length":null}, {"type":"text","length":null}, {"type":"varbinary","size":null}, {"type":"varchar","length":100}]
expansion_service
拡張サービスのURLを指定します。
制限事項
SnowflakeIOには現在、以下の制限があります。
ストリーミング書き込みは、キーペア認証のみをサポートしています。詳細については、Issue 21287を参照してください。
SnowflakeIO.DataSourceConfiguration
オブジェクトで設定されたroleパラメータは、ストリーミング書き込みでは無視されます。詳細については、Issue 21365を参照してください。
最終更新日: 2024/10/31
お探しの情報は見つかりましたか?
すべて役に立ち、分かりやすかったですか?変更したい点があれば教えてください!