データ処理のためのMLTransform

Pydoc Pydoc




キー付きデータに一般的な機械学習(ML)処理タスクを適用するには、MLTransformを使用します。Apache Beamは、MLTransformで使用できるMLデータ処理変換を提供します。使用可能なデータ処理変換の完全なリストについては、GitHubのtft.pyファイルを参照してください。

MLTransformを使用してデータ処理変換を定義するには、入力パラメーターとしてcolumnsを使用してデータ処理変換のインスタンスを作成します。指定されたcolumnsのデータは変換され、beam.Rowオブジェクトに出力されます。

次の例は、MLTransformを使用して、データセット全体の最小値と最大値を使用してデータを0から1に正規化する方法を示しています。MLTransformScaleTo01変換を使用します。

scale_to_z_score_transform = ScaleToZScore(columns=['x', 'y'])
with beam.Pipeline() as p:
  (data | MLTransform(write_artifact_location=artifact_location).with_transform(scale_to_z_score_transform))

この例では、MLTransformwrite_artifact_locationの値を受け取ります。MLTransformはこの場所の値を使用して、変換によって生成されたアーティファクトを書き込みます。データ処理変換を渡すには、MLTransformwith_transformメソッドまたはリストを使用できます。

MLTransform(transforms=transforms, write_artifact_location=write_artifact_location)

MLTransformに渡された変換は、データセットに対して順次適用されます。MLTransformは辞書を期待し、NumPy配列を含む変換された行オブジェクトを返します。

次の例は、MLTransformを使用してデータの前処理を行うパイプラインを作成する方法を示しています。

MLTransformはデータセット全体を完全に処理できます。これは、データセット全体を分析した後にのみ単一の要素を変換する必要がある場合に役立ちます。最初の2つの例では、データ変換を完了するためにデータセット全体を完全に処理する必要があります。

例1

この例は、MLTransformを使用してデータを0から1にスケーリングするパイプラインを作成します。この例は、整数のリストを受け取り、ScaleTo01変換を使用して0から1の範囲に変換します。

import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ScaleTo01
import tempfile

data = [
    {
        'x': [1, 5, 3]
    },
    {
        'x': [4, 2, 8]
    },
]

artifact_location = tempfile.mkdtemp()
scale_to_0_1_fn = ScaleTo01(columns=['x'])

with beam.Pipeline() as p:
  transformed_data = (
      p
      | beam.Create(data)
      | MLTransform(write_artifact_location=artifact_location).with_transform(
          scale_to_0_1_fn)
      | beam.Map(print))

出力

Row(x=array([0.       , 0.5714286, 0.2857143], dtype=float32))
Row(x=array([0.42857143, 0.14285715, 1.        ], dtype=float32))

例2

この例は、MLTransformを使用してデータセット全体でボキャブラリを計算し、各固有のボキャブラリ項目にインデックスを割り当てるパイプラインを作成します。これは、要素のリストではなく、単一の要素を入力として受け取ります。

import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary
import tempfile

artifact_location = tempfile.mkdtemp()
data = [
    {
        'x': ['I', 'love', 'Beam']
    },
    {
        'x': ['Beam', 'is', 'awesome']
    },
]
compute_and_apply_vocabulary_fn = ComputeAndApplyVocabulary(columns=['x'])
with beam.Pipeline() as p:
  transformed_data = (
      p
      | beam.Create(data)
      | MLTransform(write_artifact_location=artifact_location).with_transform(
          compute_and_apply_vocabulary_fn)
      | beam.Map(print))

出力

Row(x=array([4, 1, 0]))
Row(x=array([0, 2, 3]))

例3

この例は、MLTransformを使用してデータセット全体でボキャブラリを計算し、各固有のボキャブラリ項目にインデックスを割り当てるパイプラインを作成します。このパイプラインは、要素のリストではなく、単一の要素を入力として受け取ります。

import apache_beam as beam
from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary
import tempfile
data = [
    {
        'x': 'I'
    },
    {
        'x': 'love'
    },
    {
        'x': 'Beam'
    },
    {
        'x': 'Beam'
    },
    {
        'x': 'is'
    },
    {
        'x': 'awesome'
    },
]
artifact_location = tempfile.mkdtemp()
compute_and_apply_vocabulary_fn = ComputeAndApplyVocabulary(columns=['x'])
with beam.Pipeline() as p:
  transformed_data = (
      p
      | beam.Create(data)
      | MLTransform(write_artifact_location=artifact_location).with_transform(
          compute_and_apply_vocabulary_fn)
      | beam.Map(print))

出力

Row(x=array([4]))
Row(x=array([1]))
Row(x=array([0]))
Row(x=array([0]))
Row(x=array([2]))
Row(x=array([3]))