ファイル処理パターン

このページでは、一般的なファイル処理タスクについて説明します。ファイルベースのI/Oの詳細については、パイプラインI/Oおよびファイルベースの入出力データを参照してください。

ファイル到着時の処理

このセクションでは、ファイルシステムまたはオブジェクトストア(Google Cloud Storageなど)にファイルが到着したときにファイルを処理する方法について説明します。新しいファイルが到着したときに、継続的にファイルを読み取るか、ストリームおよび処理パイプラインをトリガーできます。

継続読み取りモード

FileIOまたはTextIOを使用して、新しいファイルのソースを継続的に読み取ることができます。

FileIOクラスを使用して、単一のファイルパターンを継続的に監視します。次の例では、30秒ごとにファイルパターンを繰り返し照合し、新しい一致ファイルを無制限のPCollection<Metadata>として継続的に返し、1時間新しいファイルが表示されない場合は停止します。

// This produces PCollection<MatchResult.Metadata>
p.apply(
    FileIO.match()
        .filepattern("...")
        .continuously(
            Duration.standardSeconds(30),
            Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));

TextIOクラスのwatchForNewFilesプロパティは、新しいファイルの一致をストリーミングします。

// This produces PCollection<String>
p.apply(
    TextIO.read()
        .from("<path-to-files>/*")
        .watchForNewFiles(
            // Check for new files every minute.
            Duration.standardMinutes(1),
            // Stop watching the file pattern if no new files appear for an hour.
            Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))));

一部のランナーは更新中にファイルリストを保持する場合がありますが、パイプラインを再起動するとファイルリストは保持されません。ファイルリストを保存するには、次のいずれかの方法を使用できます。

連続読み取りオプションはPythonでは使用できません。

外部ソースからトリガーされるストリーム処理

ストリーミングパイプラインは、無制限のソースからのデータを処理できます。たとえば、Google Cloud Pub/Subでストリーム処理をトリガーするには、次のようにします。

  1. 外部プロセスを使用して、新しいファイルが到着したことを検出します。
  2. ファイルへのURIを含むGoogle Cloud Pub/Subメッセージを送信します。
  3. Google Cloud Pub/Subソースに続くDoFnからURIにアクセスします。
  4. ファイルを処理します。

外部ソースからトリガーされるバッチ処理

ファイルが到着したときにバッチパイプラインジョブを開始またはスケジュールするには、トリガーイベントをソースファイル自体に書き込みます。パイプラインは処理前に初期化する必要があるため、これが最も遅延が大きくなります。低頻度の大きなファイルサイズの更新に最適です。

ファイル名へのアクセス

FileIOクラスを使用して、パイプラインジョブでファイル名を読み取ります。FileIOPCollection<ReadableFile>オブジェクトを返し、ReadableFileインスタンスにはファイル名が含まれます。

ファイル名にアクセスするには

  1. FileIOReadableFileインスタンスを作成します。FileIOPCollection<ReadableFile>オブジェクトを返します。ReadableFileクラスにはファイル名が含まれます。
  2. readFullyAsUTF8String()メソッドを呼び出して、ファイルをメモリに読み込み、ファイル名をStringオブジェクトとして返します。メモリが限られている場合は、FileSystemsなどのユーティリティクラスを使用して、ファイルを直接操作できます。

パイプラインジョブでファイル名を読み取るには

  1. ファイルURIのリストを収集します。FileSystemsモジュールを使用して、グロブパターンに一致するファイルのリストを取得できます。
  2. ファイルURIをPCollectionに渡します。

p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
    // The withCompression method is optional. By default, the Beam SDK detects compression from
    // the filename.
    .apply(FileIO.readMatches().withCompression(Compression.GZIP))
    .apply(
        ParDo.of(
            new DoFn<FileIO.ReadableFile, String>() {
              @ProcessElement
              public void process(@Element FileIO.ReadableFile file) {
                // We can now access the file and its metadata.
                LOG.info("File Metadata resourceId is {} ", file.getMetadata().resourceId());
              }
            }));
with beam.Pipeline() as pipeline:
  readable_files = (
      pipeline
      | fileio.MatchFiles('hdfs://path/to/*.txt')
      | fileio.ReadMatches()
      | beam.Reshuffle())
  files_and_contents = (
      readable_files
      | beam.Map(lambda x: (x.metadata.path, x.read_utf8())))