Apache Sparkからの入門

既にApache Sparkを知っている場合は、Beamの使用は簡単です。基本的な概念は同じであり、APIも似ています。

Sparkは、構造化データについてはSpark DataFramesに、非構造化データについてはResilient Distributed Datasets(RDD)にデータを格納します。このガイドではRDDを使用しています。

SparkのRDDは要素のコレクションを表しますが、BeamではParallel Collection(PCollection)と呼ばれます。BeamのPCollectionには、順序の保証はありません

同様に、Beamの変換はParallel Transform(PTransform)と呼ばれます。

PySparkとBeamの間で共通の操作とその同等のものをいくつか紹介します。

概要

1から4までの数値を取り、2倍して、すべての値を合計し、結果を出力するPySparkパイプラインの簡単な例を次に示します。

import pyspark

sc = pyspark.SparkContext()
result = (
    sc.parallelize([1, 2, 3, 4])
    .map(lambda x: x * 2)
    .reduce(lambda x, y: x + y)
)
print(result)

Beamでは、data.map(...)のようなメソッドの連結ではなく、data | beam.Map(...)のようなパイプ演算子|を使用してデータをパイプラインに送ります。しかし、それらは同じことを行っています。

Beamの同等のパイプラインを次に示します。

import apache_beam as beam

with beam.Pipeline() as pipeline:
    result = (
        pipeline
        | beam.Create([1, 2, 3, 4])
        | beam.Map(lambda x: x * 2)
        | beam.CombineGlobally(sum)
        | beam.Map(print)
    )

ℹ️ Map変換内でprintを呼び出したことに注意してください。これは、PCollectionの要素にはPTransform内からしかアクセスできないためです。データをローカルで検査するには、InteractiveRunnerを使用できます。

もう1つ注意すべき点は、Beamパイプラインは遅延評価されることです。つまり、|でデータをパイプラインに送ると、変換とその実行順序を宣言するだけで、実際の計算は行われません。パイプラインはwith beam.Pipeline() as pipelineコンテキストが閉じられた後に実行されます。

ℹ️ with beam.Pipeline() as pipelineコンテキストが閉じると、暗黙的にpipeline.run()が呼び出され、計算が開始されます。

次に、パイプラインは選択したランナーに送信され、データが処理されます。

ℹ️ パイプラインは、DirectRunnerを使用してローカルで実行することも、Flink、Spark、Dataflowなどの分散ランナーで実行することもできます。SparkランナーはPySparkとは関係ありません。

右シフト演算子>>を使用して、data | 'My description' >> beam.Map(...)のように、変換にラベルをオプションで追加できます。これはコメントとして機能し、パイプラインのデバッグを容易にします。

ラベルを追加した後のパイプラインを次に示します。

import apache_beam as beam

with beam.Pipeline() as pipeline:
    result = (
        pipeline
        | 'Create numbers' >> beam.Create([1, 2, 3, 4])
        | 'Multiply by two' >> beam.Map(lambda x: x * 2)
        | 'Sum everything' >> beam.CombineGlobally(sum)
        | 'Print results' >> beam.Map(print)
    )

設定

PySparkとBeamの両方の入門方法の比較を次に示します。

PySparkBeam
インストール$ pip install pyspark$ pip install apache-beam
インポートimport pysparkimport apache_beam as beam
作成
ローカルパイプライン
sc = pyspark.SparkContext() as sc
# パイプラインコードをここに記述します。
with beam.Pipeline() as pipeline
    # パイプラインコードをここに記述します。
値の作成values = sc.parallelize([1, 2, 3, 4])values = pipeline | beam.Create([1, 2, 3, 4])
作成
キーと値のペア
pairs = sc.parallelize([
    ('key1', 'value1'),
    ('key2', 'value2'),
    ('key3', 'value3'),
])
pairs = pipeline | beam.Create([
    ('key1', 'value1'),
    ('key2', 'value2'),
    ('key3', 'value3'),
])
パイプラインの実行
ローカルパイプライン
$ spark-submit spark_pipeline.py$ python beam_pipeline.py

変換

PySparkとBeamの両方におけるいくつかの一般的な変換の同等物を次に示します。

PySparkBeam
Mapvalues.map(lambda x: x * 2)values | beam.Map(lambda x: x * 2)
Filtervalues.filter(lambda x: x % 2 == 0)values | beam.Filter(lambda x: x % 2 == 0)
FlatMapvalues.flatMap(lambda x: range(x))values | beam.FlatMap(lambda x: range(x))
キーによるグループ化pairs.groupByKey()pairs | beam.GroupByKey()
Reducevalues.reduce(lambda x, y: x+y)values | beam.CombineGlobally(sum)
キーによるReducepairs.reduceByKey(lambda x, y: x+y)pairs | beam.CombinePerKey(sum)
Distinctvalues.distinct()values | beam.Distinct()
Countvalues.count()values | beam.combiners.Count.Globally()
キーによるCountpairs.countByKey()pairs | beam.combiners.Count.PerKey()
最小値の取得values.takeOrdered(3)values | beam.combiners.Top.Smallest(3)
最大値の取得values.takeOrdered(3, lambda x: -x)values | beam.combiners.Top.Largest(3)
ランダムサンプリングvalues.takeSample(False, 3)values | beam.combiners.Sample.FixedSizeGlobally(3)
Unionvalues.union(otherValues)(values, otherValues) | beam.Flatten()
Co-grouppairs.cogroup(otherPairs){'Xs': pairs, 'Ys': otherPairs} | beam.CoGroupByKey()

ℹ️ Beamで使用可能な変換の詳細については、Python変換ギャラリーを参照してください。

計算された値の使用

潜在的に分散環境で作業しているため、計算した結果が特定のマシンで利用可能であることを保証できません。

PySparkでは、data.collect()、またはreduce()count()などその他の集約を使用して、要素のコレクション(RDD)から結果を取得できます。

数値を0から1の範囲にスケールする例を次に示します。

import pyspark

sc = pyspark.SparkContext()
values = sc.parallelize([1, 2, 3, 4])
min_value = values.reduce(min)
max_value = values.reduce(max)

# We can simply use `min_value` and `max_value` since it's already a Python `int` value from `reduce`.
scaled_values = values.map(lambda x: (x - min_value) / (max_value - min_value))

# But to access `scaled_values`, we need to call `collect`.
print(scaled_values.collect())

Beamでは、すべての変換の結果はPCollectionになります。変換にPCollectionを入力し、その値にアクセスするには、サイド入力を使用します。

Mapなど、関数を受け入れる変換は、サイド入力を受け入れることができます。単一の値しか必要ない場合は、beam.pvalue.AsSingletonを使用してPython値としてアクセスできます。複数の値が必要な場合は、beam.pvalue.AsIterを使用してiterableとしてアクセスできます。

import apache_beam as beam

with beam.Pipeline() as pipeline:
    values = pipeline | beam.Create([1, 2, 3, 4])
    min_value = values | beam.CombineGlobally(min)
    max_value = values | beam.CombineGlobally(max)

    # To access `min_value` and `max_value`, we need to pass them as a side input.
    scaled_values = values | beam.Map(
        lambda x, minimum, maximum: (x - minimum) / (maximum - minimum),
        minimum=beam.pvalue.AsSingleton(min_value),
        maximum=beam.pvalue.AsSingleton(max_value),
    )

    scaled_values | beam.Map(print)

ℹ️ Beamでは、サイド入力を明示的に渡す必要がありますが、リダクションまたは集約をメモリに収める必要がないという利点があります。サイド入力を遅延評価することで、個別のリダクションごとにvaluesを計算する必要がなくなり(またはRDDの明示的なキャッシングが必要なくなります)。

次のステップ

問題が発生した場合は、お気軽にお問い合わせください