AI Platform統合パターン

このページでは、Google Cloud AI Platformトランスフォームを使用したパイプラインにおける一般的なパターンについて説明します。

テキストの構造と意味の分析

このセクションでは、Google Cloud Natural Language API を使用してテキスト分析を実行する方法を示します。

Beamは、AnnotateText というPTransformを提供します。このトランスフォームは、Document型のPCollectionを受け取ります。各Documentオブジェクトには、テキストに関する様々な情報が含まれています。これには、コンテンツ、プレーンテキストかHTMLか、オプションの言語ヒント、その他の設定が含まれます。`AnnotateText`は、APIから返される`AnnotateTextResponse`型のレスポンスオブジェクトを生成します。`AnnotateTextResponse`は、多くの属性(その一部は複雑な構造)を含むprotobufメッセージです。

以下は、文字列のインメモリPCollectionを作成し、各文字列をDocumentオブジェクトに変更し、Natural Language APIを呼び出すパイプラインの例です。次に、各レスポンスオブジェクトに対して、分析の結果を抽出する関数が呼び出されます。

features = nlp.types.AnnotateTextRequest.Features(
    extract_entities=True,
    extract_document_sentiment=True,
    extract_entity_sentiment=True,
    extract_syntax=True,
)

with beam.Pipeline() as pipeline:
  responses = (
      pipeline
      | beam.Create([
          'My experience so far has been fantastic! '
          'I\'d really recommend this product.'
      ])
      | beam.Map(lambda x: nlp.Document(x, type='PLAIN_TEXT'))
      | nlp.AnnotateText(features))

  _ = (
      responses
      | beam.Map(extract_sentiments)
      | 'Parse sentiments to JSON' >> beam.Map(json.dumps)
      | 'Write sentiments' >> beam.io.WriteToText('sentiments.txt'))

  _ = (
      responses
      | beam.Map(extract_entities)
      | 'Parse entities to JSON' >> beam.Map(json.dumps)
      | 'Write entities' >> beam.io.WriteToText('entities.txt'))

  _ = (
      responses
      | beam.Map(analyze_dependency_tree)
      | 'Parse adjacency list to JSON' >> beam.Map(json.dumps)
      | 'Write adjacency list' >> beam.io.WriteToText('adjancency_list.txt'))
AnnotateTextRequest.Features features =
    AnnotateTextRequest.Features.newBuilder()
        .setExtractEntities(true)
        .setExtractDocumentSentiment(true)
        .setExtractEntitySentiment(true)
        .setExtractSyntax(true)
        .build();
AnnotateText annotateText = AnnotateText.newBuilder().setFeatures(features).build();

PCollection<AnnotateTextResponse> responses =
    p.apply(
            Create.of(
                "My experience so far has been fantastic, "
                    + "I\'d really recommend this product."))
        .apply(
            MapElements.into(TypeDescriptor.of(Document.class))
                .via(
                    (SerializableFunction<String, Document>)
                        input ->
                            Document.newBuilder()
                                .setContent(input)
                                .setType(Document.Type.PLAIN_TEXT)
                                .build()))
        .apply(annotateText);

responses
    .apply(MapElements.into(TypeDescriptor.of(TextSentiments.class)).via(extractSentiments))
    .apply(
        MapElements.into(TypeDescriptors.strings())
            .via((SerializableFunction<TextSentiments, String>) TextSentiments::toJson))
    .apply(TextIO.write().to("sentiments.txt"));

responses
    .apply(
        MapElements.into(
                TypeDescriptors.maps(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via(extractEntities))
    .apply(MapElements.into(TypeDescriptors.strings()).via(mapEntitiesToJson))
    .apply(TextIO.write().to("entities.txt"));

responses
    .apply(
        MapElements.into(
                TypeDescriptors.lists(
                    TypeDescriptors.maps(
                        TypeDescriptors.strings(),
                        TypeDescriptors.lists(TypeDescriptors.strings()))))
            .via(analyzeDependencyTree))
    .apply(MapElements.into(TypeDescriptors.strings()).via(mapDependencyTreesToJson))
    .apply(TextIO.write().to("adjacency_list.txt"));

感情の抽出

これは、APIから返されるレスポンスオブジェクトの一部です。文レベルの感情は`sentences`属性で見つけることができます。`sentences`は標準的なPythonシーケンスのように動作するため、すべての主要な言語機能(反復処理やスライシングなど)が機能します。全体的な感情は`document_sentiment`属性で見つけることができます。

sentences {
  text {
    content: "My experience so far has been fantastic!"
  }
  sentiment {
    magnitude: 0.8999999761581421
    score: 0.8999999761581421
  }
}
sentences {
  text {
    content: "I\'d really recommend this product."
    begin_offset: 41
  }
  sentiment {
    magnitude: 0.8999999761581421
    score: 0.8999999761581421
  }
}

...many lines omitted

document_sentiment {
  magnitude: 1.899999976158142
  score: 0.8999999761581421
}

文レベルとドキュメントレベルの感情に関する情報を抽出する関数は、次のコードスニペットに示されています。

return {
    'sentences': [{
        sentence.text.content: sentence.sentiment.score
    } for sentence in response.sentences],
    'document_sentiment': response.document_sentiment.score,
}
extractSentiments =
(SerializableFunction<AnnotateTextResponse, TextSentiments>)
    annotateTextResponse -> {
      TextSentiments sentiments = new TextSentiments();
      sentiments.setDocumentSentiment(
          annotateTextResponse.getDocumentSentiment().getMagnitude());
      Map<String, Float> sentenceSentimentsMap =
          annotateTextResponse.getSentencesList().stream()
              .collect(
                  Collectors.toMap(
                      (Sentence s) -> s.getText().getContent(),
                      (Sentence s) -> s.getSentiment().getMagnitude()));
      sentiments.setSentenceSentiments(sentenceSentimentsMap);
      return sentiments;
    };

このスニペットは`sentences`をループし、各文について感情スコアを抽出します。

出力は次のとおりです。

{"sentences": [{"My experience so far has been fantastic!": 0.8999999761581421}, {"I'd really recommend this product.": 0.8999999761581421}], "document_sentiment": 0.8999999761581421}

エンティティの抽出

次の関数は、エンティティについてレスポンスを検査し、それらのエンティティの名前とタイプを返します。

return [{
    'name': entity.name,
    'type': nlp.enums.Entity.Type(entity.type).name,
} for entity in response.entities]
extractEntities =
(SerializableFunction<AnnotateTextResponse, Map<String, String>>)
    annotateTextResponse ->
        annotateTextResponse.getEntitiesList().stream()
            .collect(
                Collectors.toMap(Entity::getName, (Entity e) -> e.getType().toString()));

エンティティは`entities`属性で見つけることができます。前と同じように、`entities`はシーケンスであるため、リスト内包表記は適切な選択肢です。最も難しい部分は、エンティティのタイプを解釈することです。Natural Language APIは、エンティティタイプを列挙型として定義します。レスポンスオブジェクトでは、エンティティタイプは整数として返されます。そのため、ユーザーは`naturallanguageml.enums.Entity.Type`をインスタンス化して、人間が読み取れる名前を取得する必要があります。

出力は次のとおりです。

[{"name": "experience", "type": "OTHER"}, {"name": "product", "type": "CONSUMER_GOOD"}]

文の依存木へのアクセス

次のコードは、文をループし、各文について、依存木を表す隣接リストを構築します。依存木とは何かについての詳細は、形態論と依存木を参照してください。

from collections import defaultdict
adjacency_lists = []

index = 0
for sentence in response.sentences:
  adjacency_list = defaultdict(list)
  sentence_begin = sentence.text.begin_offset
  sentence_end = sentence_begin + len(sentence.text.content) - 1

  while index < len(response.tokens) and \
      response.tokens[index].text.begin_offset <= sentence_end:
    token = response.tokens[index]
    head_token_index = token.dependency_edge.head_token_index
    head_token_text = response.tokens[head_token_index].text.content
    adjacency_list[head_token_text].append(token.text.content)
    index += 1
  adjacency_lists.append(adjacency_list)
analyzeDependencyTree =
    (SerializableFunction<AnnotateTextResponse, List<Map<String, List<String>>>>)
        response -> {
          List<Map<String, List<String>>> adjacencyLists = new ArrayList<>();
          int index = 0;
          for (Sentence s : response.getSentencesList()) {
            Map<String, List<String>> adjacencyMap = new HashMap<>();
            int sentenceBegin = s.getText().getBeginOffset();
            int sentenceEnd = sentenceBegin + s.getText().getContent().length() - 1;
            while (index < response.getTokensCount()
                && response.getTokens(index).getText().getBeginOffset() <= sentenceEnd) {
              Token token = response.getTokensList().get(index);
              int headTokenIndex = token.getDependencyEdge().getHeadTokenIndex();
              String headTokenContent =
                  response.getTokens(headTokenIndex).getText().getContent();
              List<String> adjacencyList =
                  adjacencyMap.getOrDefault(headTokenContent, new ArrayList<>());
              adjacencyList.add(token.getText().getContent());
              adjacencyMap.put(headTokenContent, adjacencyList);
              index++;
            }
            adjacencyLists.add(adjacencyMap);
          }
          return adjacencyLists;
        };

出力は以下です。可読性を高めるため、インデックスは参照するテキストに置き換えられています。

[
  {
    "experience": [
      "My"
    ],
    "been": [
      "experience",
      "far",
      "has",
      "been",
      "fantastic",
      "!"
    ],
    "far": [
      "so"
    ]
  },
  {
    "recommend": [
      "I",
      "'d",
      "really",
      "recommend",
      "product",
      "."
    ],
    "product": [
      "this"
    ]
  }
]

予測の取得

このセクションでは、Google Cloud AI Platform Prediction を使用して、クラウドホスト型の機械学習モデルから新しいデータについて予測を行う方法を示します。

tfx_bslは、`RunInference`というBeam PTransformを含むライブラリです。`RunInference`は、データの受信に外部サービスエンドポイントを使用できる推論を実行できます。サービスエンドポイントを使用する場合、このトランスフォームは`tf.train.Example`型のPCollectionを受け取り、要素のバッチごとにAI Platform Predictionにリクエストを送信します。バッチのサイズは自動的に計算されます。Beamが最適なバッチサイズを見つける方法の詳細については、BatchElementsのdocstringを参照してください。現在、このトランスフォームは`tf.train.SequenceExample`を入力として使用できませんが、作業中です。

このトランスフォームは、予測を含む`PredictionLog`型のPCollectionを生成します。

開始する前に、TensorFlowモデルをAI Platform Predictionにデプロイします。クラウドサービスは、効率的でスケーラブルな方法で予測リクエストを処理するために必要なインフラストラクチャを管理します。このトランスフォームではTensorFlowモデルのみがサポートされていることに注意してください。詳細については、予測のためのSavedModelのエクスポートを参照してください。

機械学習モデルがデプロイされたら、予測を取得するインスタンスのリストを準備します。バイナリデータを送信するには、入力の名前が`_bytes`で終わっていることを確認します。これにより、リクエストを送信する前にデータがbase64エンコードされます。

以下は、ファイルから入力インスタンスを読み取り、JSONオブジェクトを`tf.train.Example`オブジェクトに変換し、データをAI Platform Predictionに送信するパイプラインの例です。ファイルの内容は次のようになります。

{"input": "the quick brown"}
{"input": "la bruja le"}

この例では`tf.train.BytesList`インスタンスを作成するため、バイトのような文字列を入力として期待します。ただし、`tf.train.FloatList`や`tf.train.Int64List`などの他のデータ型も、このトランスフォームでサポートされています。

コードは以下のとおりです。

import json

import apache_beam as beam

import tensorflow as tf
from tfx_bsl.beam.run_inference import RunInference
from tfx_bsl.proto import model_spec_pb2

def convert_json_to_tf_example(json_obj):
  samples = json.loads(json_obj)
  for name, text in samples.items():
      value = tf.train.Feature(bytes_list=tf.train.BytesList(
        value=[text.encode('utf-8')]))
      feature = {name: value}
      return tf.train.Example(features=tf.train.Features(feature=feature))

with beam.Pipeline() as p:
     _ = (p
         | beam.io.ReadFromText('gs://my-bucket/samples.json')
         | beam.Map(convert_json_to_tf_example)
         | RunInference(
             model_spec_pb2.InferenceEndpoint(
                 model_endpoint_spec=model_spec_pb2.AIPlatformPredictionModelSpec(
                     project_id='my-project-id',
                     model_name='my-model-name',
                     version_name='my-model-version'))))
// Getting predictions is not yet available for Java. [https://github.com/apache/beam/issues/20001]