データ探索

Apache Beamの様々なデータ処理は、AI/MLプロジェクトに適用できます。

データ処理は、主に2つのトピックに分類できます。この例では、まずデータ探索、次にデータ前処理と検証の両方を使用するMLのデータパイプラインを調べます。データ後処理は、前処理と似ているため、扱いません。後処理は、パイプラインの順序と種類だけが異なります。

初期データ探索

Pandas は、データ探索を実行するための一般的なツールです。Pandasは、Python向けのデータ分析および操作ツールです。2次元表形式のデータを含むデータ構造であるDataFrameを使用し、データに対してラベル付きの行と列を提供します。Apache Beam Python SDKは、PandasライクなDataFrameオブジェクトを操作するためのDataFrame APIを提供します。

Beam DataFrame APIは、Apache Beamパイプライン内で使い慣れたプログラミングインターフェースにアクセスすることを目的としています。このAPIを使用すると、データ探索を実行できます。データ前処理パイプラインのコードを再利用できます。DataFrame APIを使用することで、標準的なPandasコマンドを呼び出すことで、複雑なデータ処理パイプラインを構築できます。

Beamインタラクティブランナーと組み合わせてJupyterLabノートブックでDataFrame APIを使用できます。ノートブックを使用して、パイプラインを反復的に開発し、個々のパイプラインステップの結果を表示します。

以下は、ノートブックでのApache Beamにおけるデータ探索の例です。

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

p = beam.Pipeline(InteractiveRunner())
beam_df = p | beam.dataframe.io.read_csv(input_path)

# Investigate columns and data types
beam_df.dtypes

# Generate descriptive statistics
ib.collect(beam_df.describe())

# Investigate missing values
ib.collect(beam_df.isnull())

AI/MLプロジェクトでApache BeamとDataFrame APIを使用してデータ探索とデータ前処理を実装する完全なエンドツーエンドの例については、Beam DataFrame API AI/MLチュートリアルを参照してください。

ML用データパイプライン

一般的なデータ前処理パイプラインは、次の手順で構成されています。

  1. データの読み取りと書き込み:ファイルシステム、データベース、またはメッセージキューからデータを読み取り、書き込みます。Apache Beamには、データの取り込みと書き込みのための豊富なIOコネクタがあります。
  2. データクレンジング:MLモデルで使用前にデータをフィルタリングおよびクレンジングします。重複データや無関係なデータを削除したり、データセットの誤りを修正したり、不要な外れ値をフィルタリングしたり、欠損データに対処したりする必要がある場合があります。
  3. データ変換:データは、モデルのトレーニングに必要な期待される入力に適合する必要があります。データを正規化、one-hotエンコーディング、スケーリング、またはベクトル化する必要がある場合があります。
  4. データエンリッチメント:データを外部データソースで拡張して、データの意味を明確にしたり、MLモデルが解釈しやすくしたりすることができます。たとえば、都市名や住所を座標のセットに変換することができます。
  5. データ検証とメトリクス:データが、パイプラインで検証できる特定の要件に準拠していることを確認します。クラス分布など、データからのメトリクスを報告します。

Apache Beamパイプラインを使用して、これらの手順すべてを実装できます。この例では、前述のすべてのステップを示すパイプラインを示します。

import apache_beam as beam
from apache_beam.metrics import Metrics

with beam.Pipeline() as pipeline:
  # Create data
  input_data = (
      pipeline
      | beam.Create([
         {'age': 25, 'height': 176, 'weight': 60, 'city': 'London'},
         {'age': 61, 'height': 192, 'weight': 95, 'city': 'Brussels'},
         {'age': 48, 'height': 163, 'weight': None, 'city': 'Berlin'}]))

  # Clean data
  def filter_missing_data(row):
    return row['weight'] is not None

  cleaned_data = input_data | beam.Filter(filter_missing_data)

  # Transform data
  def scale_min_max_data(row):
    row['age'] = (row['age']/100)
    row['height'] = (row['height']-150)/50
    row['weight'] = (row['weight']-50)/50
    yield row

  transformed_data = cleaned_data | beam.FlatMap(scale_min_max_data)

  # Enrich data
  side_input = pipeline | beam.io.ReadFromText('coordinates.csv')
  def coordinates_lookup(row, coordinates):
    row['coordinates'] = coordinates.get(row['city'], (0, 0))
    del row['city']
    yield row

  enriched_data = (
      transformed_data
      | beam.FlatMap(coordinates_lookup, coordinates=beam.pvalue.AsDict(side_input)))

  # Metrics
  counter = Metrics.counter('main', 'counter')

  def count_data(row):
    counter.inc()
    yield row

  output_data = enriched_data | beam.FlatMap(count_data)

  # Write data
  output_data | beam.io.WriteToText('output.csv')