Apache Beam を試す
インタラクティブなノートブックを使用して、Apache Beam パイプラインを試すことができます。
- Java SDK
- Python SDK
- Go SDK
Colab でインタラクティブな WordCount
このインタラクティブなノートブックは、単純で最小限の WordCount のバージョンがどのようなものかを示しています。
package samples.quickstart;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import java.util.Arrays;
public class WordCount {
public static void main(String[] args) {
String inputsDir = "data/*";
String outputsPrefix = "outputs/part";
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("Read lines", TextIO.read().from(inputsDir))
.apply("Find words", FlatMapElements.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
.apply("Filter empty words", Filter.by((String word) -> !word.isEmpty()))
.apply("Count words", Count.perElement())
.apply("Write results", MapElements.into(TypeDescriptors.strings())
.via((KV<String, Long> wordCount) ->
wordCount.getKey() + ": " + wordCount.getValue()))
.apply(TextIO.write().to(outputsPrefix));
pipeline.run();
}
}
自分のコンピュータに Apache Beam Java SDK をインストールして実行する方法については、Java クイックスタートの手順に従ってください。
import apache_beam as beam
import re
inputs_pattern = 'data/*'
outputs_prefix = 'outputs/part'
with beam.Pipeline() as pipeline:
(
pipeline
| 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
| 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
| 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
| 'Group and sum' >> beam.CombinePerKey(sum)
| 'Format results' >> beam.Map(lambda word_count: str(word_count))
| 'Write results' >> beam.io.WriteToText(outputs_prefix)
)
自分のコンピュータに Apache Beam Python SDK をインストールして実行する方法については、Python クイックスタートの手順に従ってください。
package main
import (
"context"
"flag"
"fmt"
"regexp"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local"
)
var (
input = flag.String("input", "data/*", "File(s) to read.")
output = flag.String("output", "outputs/wordcounts.txt", "Output filename.")
)
var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
func main() {
flag.Parse()
beam.Init()
pipeline := beam.NewPipeline()
root := pipeline.Root()
lines := textio.Read(root, *input)
words := beam.ParDo(root, func(line string, emit func(string)) {
for _, word := range wordRE.FindAllString(line, -1) {
emit(word)
}
}, lines)
counted := stats.Count(root, words)
formatted := beam.ParDo(root, func(word string, count int) string {
return fmt.Sprintf("%s: %v", word, count)
}, counted)
textio.Write(root, *output, formatted)
direct.Execute(context.Background(), pipeline)
}
自分のコンピュータに Apache Beam Go SDK をインストールして実行する方法については、Go クイックスタートの手順に従ってください。
WordCount がどのように機能するかについての詳細な説明については、WordCount の例のチュートリアルを参照してください。
次のステップ
- WordCount の例のチュートリアルで、追加の WordCount の例を順を追って見ていきましょう。
- 学習リソースで、自分のペースで学習を進めましょう。
- お気に入りの動画とポッドキャストをご覧ください。
- Beam users@ メーリングリストに参加してください。
- Apache Beam のコードベースへの貢献に関心がある場合は、貢献ガイドを参照してください。
何か問題が発生した場合は、お気軽にお問い合わせください。
最終更新日: 2024/10/31
お探しの情報はすべて見つかりましたか?
すべてが役立ち、明確でしたか?何か変更したいことはありますか?お知らせください!