BigQuery ML 統合

このページのサンプルでは、BigQuery ML (BQML)からエクスポートされたモデルを、TFX Basic Shared Libraries (tfx_bsl)を使用してApache Beamパイプラインに統合する方法を示します。

概略的に、以下のセクションでは、次の手順を詳しく説明します。

  1. BigQuery MLモデルの作成とトレーニング
  2. BigQuery MLモデルのエクスポート
  3. 真新しいBigQuery MLモデルを使用するトランスフォームを作成する

BigQuery MLモデルの作成とトレーニング

tfx_bslを使用してBQMLモデルをApache Beamパイプラインに組み込むには、TensorFlow SavedModel形式にする必要があります。BQMLのさまざまなモデルタイプとそのエクスポートモデル形式のマッピングの概要については、こちらを参照してください。

簡略化のため、公開されているGoogleアナリティクスのサンプルデータセット(日付でシャード化されたテーブルです。代わりに、パーティション化されたテーブルに遭遇する場合もあります)を使用して、BQMLクイックスタートガイドにある(簡略化されたバージョンの)ロジスティック回帰モデルをトレーニングします。BQMLを使用して作成できるすべてのモデルの概要については、こちらを参照してください。

BigQueryデータセットを作成した後、SQLで完全に定義されたモデルを作成します。

CREATE MODEL IF NOT EXISTS `bqml_tutorial.sample_model`
OPTIONS(model_type='logistic_reg', input_label_cols=["label"]) AS
SELECT
  IF(totals.transactions IS NULL, 0, 1) AS label,
  IFNULL(geoNetwork.country, "") AS country
FROM
  `bigquery-public-data.google_analytics_sample.ga_sessions_*`
WHERE
  _TABLE_SUFFIX BETWEEN '20160801' AND '20170630'

このモデルは、2016年8月1日から2017年6月30日までに収集されたデータに基づいて、訪問者の国を考慮した購入予測を行います。

BigQuery MLモデルのエクスポート

モデルをApache Beamパイプラインに組み込むには、エクスポートする必要があります。そのためには、bqコマンドラインツールのインストールと、エクスポートされたモデルを保存するためのGoogle Cloud Storageバケットの作成が必要です。

次のコマンドを使用してモデルをエクスポートします。

bq extract -m bqml_tutorial.sample_model gs://some/gcs/path

BigQuery MLモデルを使用するApache Beamトランスフォームの作成

このセクションでは、作成してエクスポートしたBigQuery MLモデルを使用するApache Beamパイプラインを構築します。このモデルはGoogle Cloud AI Platform Predictionを使用して提供できます。これについては、AI Platformパターンを参照してください。ここでは、tfx_bslライブラリを使用してローカル予測(Apache Beamワーカー上)を行う方法を示します。

まず、モデルをパイプラインの残りの部分を開発するローカルディレクトリ(例:serving_dir/sample_model/1)にダウンロードする必要があります。

次に、通常どおりパイプラインの開発を開始できます。tfx_bslライブラリのRunInference PTransformを使用し、モデルが保存されているローカルディレクトリを指定します(コード例ではmodel_path変数を参照)。このトランスフォームは、tf.train.Example型の要素を入力として受け取り、tensorflow_serving.apis.prediction_log_pb2.PredictionLog型の要素を出力します。モデルのシグネチャに応じて、出力から値を抽出できます。ここでは、ロジスティック回帰モデルに関するドキュメントに従って、extract_prediction関数でlabel_probslabel_valuespredicted_labelを抽出します。

import apache_beam
import tensorflow as tf
from google.protobuf import text_format
from tensorflow.python.framework import tensor_util
from tfx_bsl.beam import run_inference
from tfx_bsl.public.beam import RunInference
from tfx_bsl.public.proto import model_spec_pb2


inputs = tf.train.Example(features=tf.train.Features(
            feature={
                'os': tf.train.Feature(bytes_list=tf.train.BytesList(b"Microsoft"))
            })
          )

model_path = "serving_dir/sample_model/1"

def extract_prediction(response):
  yield response.predict_log.response.outputs['label_values'].string_val,
        tensor_util.MakeNdarray(response.predict_log.response.outputs['label_probs']),
        response.predict_log.response.outputs['predicted_label'].string_val

with beam.Pipeline() as p:
    res = (
        p
        | beam.Create([inputs])
        | RunInference(
            model_spec_pb2.InferenceSpecType(
                saved_model_spec=model_spec_pb2.SavedModelSpec(
                    model_path=model_path,
                    signature_name=['serving_default'])))
        | beam.ParDo(extract_prediction)
Implemented in Python.