Apache Sparkからの入門
既にApache Sparkを知っている場合は、Beamの使用は簡単です。基本的な概念は同じであり、APIも似ています。
Sparkは、構造化データについてはSpark DataFramesに、非構造化データについてはResilient Distributed Datasets(RDD)にデータを格納します。このガイドではRDDを使用しています。
SparkのRDDは要素のコレクションを表しますが、BeamではParallel Collection(PCollection)と呼ばれます。BeamのPCollectionには、順序の保証はありません。
同様に、Beamの変換はParallel Transform(PTransform)と呼ばれます。
PySparkとBeamの間で共通の操作とその同等のものをいくつか紹介します。
概要
1から4までの数値を取り、2倍して、すべての値を合計し、結果を出力するPySparkパイプラインの簡単な例を次に示します。
Beamでは、data.map(...)
のようなメソッドの連結ではなく、data | beam.Map(...)
のようなパイプ演算子|
を使用してデータをパイプラインに送ります。しかし、それらは同じことを行っています。
Beamの同等のパイプラインを次に示します。
ℹ️
Map
変換内で
もう1つ注意すべき点は、Beamパイプラインは遅延評価されることです。つまり、|
でデータをパイプラインに送ると、変換とその実行順序を宣言するだけで、実際の計算は行われません。パイプラインはwith beam.Pipeline() as pipeline
コンテキストが閉じられた後に実行されます。
ℹ️
with beam.Pipeline() as pipeline
コンテキストが閉じると、暗黙的にpipeline.run()
が呼び出され、計算が開始されます。
次に、パイプラインは選択したランナーに送信され、データが処理されます。
ℹ️ パイプラインは、DirectRunnerを使用してローカルで実行することも、Flink、Spark、Dataflowなどの分散ランナーで実行することもできます。SparkランナーはPySparkとは関係ありません。
右シフト演算子>>
を使用して、data | 'My description' >> beam.Map(...)
のように、変換にラベルをオプションで追加できます。これはコメントとして機能し、パイプラインのデバッグを容易にします。
ラベルを追加した後のパイプラインを次に示します。
設定
PySparkとBeamの両方の入門方法の比較を次に示します。
PySpark | Beam | |
---|---|---|
インストール | $ pip install pyspark | $ pip install apache-beam |
インポート | import pyspark | import apache_beam as beam |
作成 ローカルパイプライン | sc = pyspark.SparkContext() as sc # パイプラインコードをここに記述します。 | with beam.Pipeline() as pipeline # パイプラインコードをここに記述します。 |
値の作成 | values = sc.parallelize([1, 2, 3, 4]) | values = pipeline | beam.Create([1, 2, 3, 4]) |
作成 キーと値のペア | pairs = sc.parallelize([ ('key1', 'value1'), ('key2', 'value2'), ('key3', 'value3'), ]) | pairs = pipeline | beam.Create([ ('key1', 'value1'), ('key2', 'value2'), ('key3', 'value3'), ]) |
パイプラインの実行 ローカルパイプライン | $ spark-submit spark_pipeline.py | $ python beam_pipeline.py |
変換
PySparkとBeamの両方におけるいくつかの一般的な変換の同等物を次に示します。
PySpark | Beam | |
---|---|---|
Map | values.map(lambda x: x * 2) | values | beam.Map(lambda x: x * 2) |
Filter | values.filter(lambda x: x % 2 == 0) | values | beam.Filter(lambda x: x % 2 == 0) |
FlatMap | values.flatMap(lambda x: range(x)) | values | beam.FlatMap(lambda x: range(x)) |
キーによるグループ化 | pairs.groupByKey() | pairs | beam.GroupByKey() |
Reduce | values.reduce(lambda x, y: x+y) | values | beam.CombineGlobally(sum) |
キーによるReduce | pairs.reduceByKey(lambda x, y: x+y) | pairs | beam.CombinePerKey(sum) |
Distinct | values.distinct() | values | beam.Distinct() |
Count | values.count() | values | beam.combiners.Count.Globally() |
キーによるCount | pairs.countByKey() | pairs | beam.combiners.Count.PerKey() |
最小値の取得 | values.takeOrdered(3) | values | beam.combiners.Top.Smallest(3) |
最大値の取得 | values.takeOrdered(3, lambda x: -x) | values | beam.combiners.Top.Largest(3) |
ランダムサンプリング | values.takeSample(False, 3) | values | beam.combiners.Sample.FixedSizeGlobally(3) |
Union | values.union(otherValues) | (values, otherValues) | beam.Flatten() |
Co-group | pairs.cogroup(otherPairs) | {'Xs': pairs, 'Ys': otherPairs} | beam.CoGroupByKey() |
ℹ️ Beamで使用可能な変換の詳細については、Python変換ギャラリーを参照してください。
計算された値の使用
潜在的に分散環境で作業しているため、計算した結果が特定のマシンで利用可能であることを保証できません。
PySparkでは、data.collect()
、またはreduce()
、count()
などその他の集約を使用して、要素のコレクション(RDD)から結果を取得できます。
数値を0から1の範囲にスケールする例を次に示します。
import pyspark
sc = pyspark.SparkContext()
values = sc.parallelize([1, 2, 3, 4])
min_value = values.reduce(min)
max_value = values.reduce(max)
# We can simply use `min_value` and `max_value` since it's already a Python `int` value from `reduce`.
scaled_values = values.map(lambda x: (x - min_value) / (max_value - min_value))
# But to access `scaled_values`, we need to call `collect`.
print(scaled_values.collect())
Beamでは、すべての変換の結果はPCollectionになります。変換にPCollectionを入力し、その値にアクセスするには、サイド入力を使用します。
Map
など、関数を受け入れる変換は、サイド入力を受け入れることができます。単一の値しか必要ない場合は、beam.pvalue.AsSingleton
を使用してPython値としてアクセスできます。複数の値が必要な場合は、beam.pvalue.AsIter
を使用してiterable
としてアクセスできます。
import apache_beam as beam
with beam.Pipeline() as pipeline:
values = pipeline | beam.Create([1, 2, 3, 4])
min_value = values | beam.CombineGlobally(min)
max_value = values | beam.CombineGlobally(max)
# To access `min_value` and `max_value`, we need to pass them as a side input.
scaled_values = values | beam.Map(
lambda x, minimum, maximum: (x - minimum) / (maximum - minimum),
minimum=beam.pvalue.AsSingleton(min_value),
maximum=beam.pvalue.AsSingleton(max_value),
)
scaled_values | beam.Map(print)
ℹ️ Beamでは、サイド入力を明示的に渡す必要がありますが、リダクションまたは集約をメモリに収める必要がないという利点があります。サイド入力を遅延評価することで、個別のリダクションごとに
values
を計算する必要がなくなり(またはRDDの明示的なキャッシングが必要なくなります)。
次のステップ
- Python変換ギャラリーで利用可能なすべての変換を確認してください。
- プログラミングガイドのPipeline I/Oセクションで、ファイルの読み取りと書き込みについて説明しています。
- WordCountの例の詳細な説明で、追加のWordCountの例を参照してください。
- 学習リソースで、自己学習ペースのツアーに参加してください。
- お気に入りのビデオとポッドキャストをいくつか見てください。
- Beamのusers@メーリングリストに参加してください。
- Apache Beamのコードベースへの貢献に関心のある方は、貢献ガイドを参照してください。
問題が発生した場合は、お気軽にお問い合わせください!
最終更新日: 2024/10/31
探していたものが見つかりましたか?
すべて役に立ち、分かりやすかったですか?変更したいことはありますか?お知らせください!