ファイル処理パターン
このページでは、一般的なファイル処理タスクについて説明します。ファイルベースのI/Oの詳細については、パイプラインI/Oおよびファイルベースの入出力データを参照してください。
- Java SDK
- Python SDK
ファイル到着時の処理
このセクションでは、ファイルシステムまたはオブジェクトストア(Google Cloud Storageなど)にファイルが到着したときにファイルを処理する方法について説明します。新しいファイルが到着したときに、継続的にファイルを読み取るか、ストリームおよび処理パイプラインをトリガーできます。
継続読み取りモード
FileIO
またはTextIO
を使用して、新しいファイルのソースを継続的に読み取ることができます。
FileIO
クラスを使用して、単一のファイルパターンを継続的に監視します。次の例では、30秒ごとにファイルパターンを繰り返し照合し、新しい一致ファイルを無制限のPCollection<Metadata>
として継続的に返し、1時間新しいファイルが表示されない場合は停止します。
TextIO
クラスのwatchForNewFiles
プロパティは、新しいファイルの一致をストリーミングします。
// This produces PCollection<String>
p.apply(
TextIO.read()
.from("<path-to-files>/*")
.watchForNewFiles(
// Check for new files every minute.
Duration.standardMinutes(1),
// Stop watching the file pattern if no new files appear for an hour.
Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));
一部のランナーは更新中にファイルリストを保持する場合がありますが、パイプラインを再起動するとファイルリストは保持されません。ファイルリストを保存するには、次のいずれかの方法を使用できます。
- 処理済みのファイル名を外部ファイルに保存し、次の変換でリストを重複排除する
- ファイル名にタイムスタンプを追加し、新しいファイルのみを取り込むグロブパターンを書き込み、パイプラインが再起動したときにパターンを照合する
連続読み取りオプションはPythonでは使用できません。
外部ソースからトリガーされるストリーム処理
ストリーミングパイプラインは、無制限のソースからのデータを処理できます。たとえば、Google Cloud Pub/Subでストリーム処理をトリガーするには、次のようにします。
- 外部プロセスを使用して、新しいファイルが到着したことを検出します。
- ファイルへのURIを含むGoogle Cloud Pub/Subメッセージを送信します。
- Google Cloud Pub/Subソースに続く
DoFn
からURIにアクセスします。 - ファイルを処理します。
外部ソースからトリガーされるバッチ処理
ファイルが到着したときにバッチパイプラインジョブを開始またはスケジュールするには、トリガーイベントをソースファイル自体に書き込みます。パイプラインは処理前に初期化する必要があるため、これが最も遅延が大きくなります。低頻度の大きなファイルサイズの更新に最適です。
ファイル名へのアクセス
FileIO
クラスを使用して、パイプラインジョブでファイル名を読み取ります。FileIO
はPCollection<ReadableFile>
オブジェクトを返し、ReadableFile
インスタンスにはファイル名が含まれます。
ファイル名にアクセスするには
FileIO
でReadableFile
インスタンスを作成します。FileIO
はPCollection<ReadableFile>
オブジェクトを返します。ReadableFile
クラスにはファイル名が含まれます。readFullyAsUTF8String()
メソッドを呼び出して、ファイルをメモリに読み込み、ファイル名をString
オブジェクトとして返します。メモリが限られている場合は、FileSystems
などのユーティリティクラスを使用して、ファイルを直接操作できます。
パイプラインジョブでファイル名を読み取るには
- ファイルURIのリストを収集します。
FileSystems
モジュールを使用して、グロブパターンに一致するファイルのリストを取得できます。 - ファイルURIを
PCollection
に渡します。
p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
// The withCompression method is optional. By default, the Beam SDK detects compression from
// the filename.
.apply(FileIO.readMatches().withCompression(Compression.GZIP))
.apply(
ParDo.of(
new DoFn<FileIO.ReadableFile, String>() {
@ProcessElement
public void process(@Element FileIO.ReadableFile file) {
// We can now access the file and its metadata.
LOG.info("File Metadata resourceId is {} ", file.getMetadata().resourceId());
}
}));
最終更新日: 2024/10/31
お探しのものはすべて見つかりましたか?
すべて役立ち、明確でしたか?何か変更したいことはありますか?ぜひお聞かせください!