サイド入力パターン

このページのサンプルでは、一般的なBeamサイド入力パターンを示します。サイド入力は、入力PCollection内の要素を処理するたびにDoFnがアクセスできる追加の入力です。詳細については、サイド入力に関するプログラミングガイドセクションを参照してください。

リモートサービスへのキー値ルックアップを実行してデータをエンリッチしようとしている場合は、最初にエンリッチメント変換を検討することをお勧めします。これにより、サイド入力の詳細の一部を抽象化し、クライアント側のスロットリングなどの追加のメリットを提供できます。

ゆっくりと更新されるグローバルウィンドウサイド入力

FixedWindowのように、グローバルウィンドウではないパイプラインジョブでグローバルウィンドウからサイド入力を取得して使用できます。

グローバルウィンドウではないパイプラインでグローバルウィンドウサイド入力をゆっくりと更新するには

  1. バインドされたソースからデータを定期的にグローバルウィンドウにプルするDoFnを作成します。

    a. GenerateSequenceソース変換を使用して、値を定期的に出力します。

    b. 各要素でアクティブになり、バインドされたソースからデータをプルするデータドリブントリガーをインスタンス化します。

    c. トリガーを起動して、データをグローバルウィンドウに渡します。

  2. ダウンストリーム変換のサイド入力を作成します。サイド入力はメモリに収まる必要があります。

グローバルウィンドウサイド入力は処理時間でトリガーされるため、メインパイプラインはイベント時間内の要素に非決定的にサイド入力を一致させます。

たとえば、次のコードサンプルでは、Mapを使用してDoFnを作成します。Mapは、カウンターティックごとに再構築されるView.asSingletonサイド入力になります。サイド入力は、ワークフローを示すために5秒ごとに更新されます。実際のシナリオでは、サイド入力は通常、数時間ごとまたは1日に1回更新されます。

  public static void sideInputPatterns() {
    // This pipeline uses View.asSingleton for a placeholder external service.
    // Run in debug mode to see the output.
    Pipeline p = Pipeline.create();

    // Create a side input that updates every 5 seconds.
    // View as an iterable, not singleton, so that if we happen to trigger more
    // than once before Latest.globally is computed we can handle both elements.
    PCollectionView<Iterable<Map<String, String>>> mapIterable =
        p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
            .apply(
                ParDo.of(
                    new DoFn<Long, Map<String, String>>() {

                      @ProcessElement
                      public void process(
                          @Element Long input,
                          @Timestamp Instant timestamp,
                          OutputReceiver<Map<String, String>> o) {
                        // Replace map with test data from the placeholder external service.
                        // Add external reads here.
                        o.output(PlaceholderExternalService.readTestData(timestamp));
                      }
                    }))
            .apply(
                Window.<Map<String, String>>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                    .discardingFiredPanes())
            .apply(Latest.globally())
            .apply(View.asIterable());

    // Consume side input. GenerateSequence generates test data.
    // Use a real source (like PubSubIO or KafkaIO) in production.
    p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
        .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
        .apply(Sum.longsGlobally().withoutDefaults())
        .apply(
            ParDo.of(
                    new DoFn<Long, KV<Long, Long>>() {

                      @ProcessElement
                      public void process(ProcessContext c, @Timestamp Instant timestamp) {
                        Iterable<Map<String, String>> si = c.sideInput(mapIterable);
                        // Take an element from the side input iterable (likely length 1)
                        Map<String, String> keyMap = si.iterator().next();
                        c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());

                        LOG.info(
                            "Value is {} with timestamp {}, using key A from side input with time {}.",
                            c.element(),
                            timestamp.toString(DateTimeFormat.forPattern("HH:mm:ss")),
                            keyMap.get("Key_A"));
                      }
                    })
                .withSideInputs(mapIterable));

    p.run();
  }

  /** Placeholder class that represents an external service generating test data. */
  public static class PlaceholderExternalService {

    public static Map<String, String> readTestData(Instant timestamp) {

      Map<String, String> map = new HashMap<>();

      map.put("Key_A", timestamp.toString(DateTimeFormat.forPattern("HH:mm:ss")));

      return map;
    }
  }
No sample present.

ウィンドウ処理を使用した、ゆっくりと更新されるサイド入力

サイド入力データを定期的に個別のPCollectionウィンドウに読み込むことができます。サイド入力をメイン入力に適用すると、各メイン入力ウィンドウは自動的に1つのサイド入力ウィンドウに一致します。これにより、単一ウィンドウの期間の一貫性が保証されます。つまり、メイン入力の各ウィンドウは、サイド入力データの1つのバージョンに一致します。

サイド入力データを定期的に個別のPCollectionウィンドウに読み込むには

  1. PeriodicImpulseまたはPeriodicSequence PTransformを使用して、
    • 必要な処理時間間隔で要素の無限シーケンスを生成します。
    • それらを個別のウィンドウに割り当てます。
  2. PCollection要素の到着によってトリガーされるSDF ReadまたはReadAll PTransformを使用してデータをフェッチします。
  3. サイド入力を適用します。
PCollectionView<List<Long>> sideInput =
    p.apply(
            "SIImpulse",
            PeriodicImpulse.create()
                .startAt(startAt)
                .stopAt(stopAt)
                .withInterval(interval1)
                .applyWindowing())
        .apply(
            "FileToRead",
            ParDo.of(
                new DoFn<Instant, String>() {
                  @DoFn.ProcessElement
                  public void process(@Element Instant notUsed, OutputReceiver<String> o) {
                    o.output(fileToRead);
                  }
                }))
        .apply(FileIO.matchAll())
        .apply(FileIO.readMatches())
        .apply(TextIO.readFiles())
        .apply(
            ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void process(@Element String src, OutputReceiver<String> o) {
                    o.output(src);
                  }
                }))
        .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults())
        .apply(View.asList());

PCollection<Instant> mainInput =
    p.apply(
        "MIImpulse",
        PeriodicImpulse.create()
            .startAt(startAt.minus(Duration.standardSeconds(1)))
            .stopAt(stopAt.minus(Duration.standardSeconds(1)))
            .withInterval(interval2)
            .applyWindowing());

// Consume side input. GenerateSequence generates test data.
// Use a real source (like PubSubIO or KafkaIO) in production.
PCollection<Long> result =
    mainInput.apply(
        "generateOutput",
        ParDo.of(
                new DoFn<Instant, Long>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    c.output((long) c.sideInput(sideInput).size());
                  }
                })
            .withSideInputs(sideInput));
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms import window

# from apache_beam.utils.timestamp import MAX_TIMESTAMP
# last_timestamp = MAX_TIMESTAMP to go on indefninitely

# Any user-defined function.
# cross join is used as an example.
def cross_join(left, rights):
  for x in rights:
    yield (left, x)

# Create pipeline.
pipeline = beam.Pipeline()
side_input = (
    pipeline
    | 'PeriodicImpulse' >> PeriodicImpulse(
        first_timestamp, last_timestamp, interval, True)
    | 'MapToFileName' >> beam.Map(lambda x: src_file_pattern + str(x))
    | 'ReadFromFile' >> beam.io.ReadAllFromText())

main_input = (
    pipeline
    | 'MpImpulse' >> beam.Create(sample_main_input_elements)
    |
    'MapMpToTimestamped' >> beam.Map(lambda src: TimestampedValue(src, src))
    | 'WindowMpInto' >> beam.WindowInto(
        window.FixedWindows(main_input_windowing_interval)))

result = (
    main_input
    | 'ApplyCrossJoin' >> beam.FlatMap(
        cross_join, rights=beam.pvalue.AsIter(side_input)))