パイプラインオプションパターン
このページのサンプルは、一般的なパイプライン構成を示しています。パイプライン構成オプションの詳細については、パイプラインの作成とパイプラインオプションの構成を参照してください。
- Java SDK
- Python SDK
実行時パラメータの遡及的なログ記録
ValueProvider
インターフェースを使用して、パイプラインジョブの完了後に実行時パラメータにアクセスします。
ValueProvider
インターフェースを使用して実行時パラメータをパイプラインに渡すことができますが、Beam DAG内からのみパラメータをログに記録できます。解決策としては、プレースホルダー値を処理してから実行時パラメータをログに記録するDoFn
を使用して、パイプラインブランチを追加することです。
/** Sample of PipelineOptions with a ValueProvider option argument. */
public interface MyOptions extends PipelineOptions {
@Description("My option")
@Default.String("Hello world!")
ValueProvider<String> getStringValue();
void setStringValue(ValueProvider<String> value);
}
public static void accessingValueProviderInfoAfterRunSnip1(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
// Create pipeline.
Pipeline p = Pipeline.create(options);
// Add a branch for logging the ValueProvider value.
p.apply(Create.of(1))
.apply(
ParDo.of(
new DoFn<Integer, Integer>() {
// Define the DoFn that logs the ValueProvider value.
@ProcessElement
public void process(ProcessContext c) {
MyOptions ops = c.getPipelineOptions().as(MyOptions.class);
// This example logs the ValueProvider value, but you could store it by
// pushing it to an external database.
LOG.info("Option StringValue was {}", ops.getStringValue());
}
}));
// The main pipeline.
p.apply(Create.of(1, 2, 3, 4)).apply(Sum.integersGlobally());
p.run();
}
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import RuntimeValueProvider
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--string_value', type=str)
class LogValueProvidersFn(beam.DoFn):
def __init__(self, string_vp):
self.string_vp = string_vp
# Define the DoFn that logs the ValueProvider value.
# The DoFn is called when creating the pipeline branch.
# This example logs the ValueProvider value, but
# you could store it by pushing it to an external database.
def process(self, an_int):
logging.info('The string_value is %s' % self.string_vp.get())
# Another option (where you don't need to pass the value at all) is:
logging.info(
'The string value is %s' %
RuntimeValueProvider.get_value('string_value', str, ''))
beam_options = PipelineOptions()
args = beam_options.view_as(MyOptions)
# Create pipeline.
with beam.Pipeline(options=beam_options) as pipeline:
# Add a branch for logging the ValueProvider value.
_ = (
pipeline
| beam.Create([None])
| 'LogValueProvs' >> beam.ParDo(LogValueProvidersFn(args.string_value)))
# The main pipeline.
result_pc = (
pipeline
| "main_pc" >> beam.Create([1, 2, 3])
| beam.combiners.Sum.Globally())
最終更新日:2024年10月31日
探していたものが見つかりましたか?
すべて役立ち、分かりやすかったですか?変更したいことはありますか?お知らせください!