異常検知の例

この異常検知の例では、リアルタイムでPub/Subからテキストを読み取り、トレーニング済みのHDBSCANクラスタリングモデルを使用して異常を検知する異常検知パイプラインの設定方法を示します。

異常検知用データセット

この例では、6つの基本的な感情(怒り、恐れ、喜び、愛、悲しみ、驚き)を持つ20,000件の英語のTwitterメッセージを含むemotionというデータセットを使用します。このデータセットには、トレーニング用、検証用、パフォーマンス評価用のテストという3つの分割があります。テキストとデータセットのカテゴリ(クラス)が含まれているため、教師ありデータセットです。このデータセットには、Hugging Faceデータセットページからアクセスできます。

以下のテキストは、データセットのトレーニング分割からの例を示しています。

テキスト感情の種類
欲張りで間違っていると感じています。怒り
暖炉について郷愁を感じるときはいつでも、それがまだ敷地内にあることを知っています。
推奨量の2倍、または数回服用してきました。眠りにつくのがずっと早くなりましたが、同時にとても奇妙な気分です。恐れ
デンマークへのボート旅行中です。喜び
SFの世界では、自分が偽物のように感じています。悲しみ
幻覚、動く人や物、音、振動に苦しめられ、週に数回発生し始めました。恐れ

異常検知アルゴリズム

HDBSCANは、DBSCANを階層型クラスタリングアルゴリズムに変換し、クラスタの安定性に基づいてフラットクラスタリングを抽出することで拡張したクラスタリングアルゴリズムです。モデルはトレーニングされると、新しいデータポイントが外れ値の場合は`-1`を予測し、それ以外の場合は既存のクラスタのいずれかを予測します。

Pub/Subへのインジェスト

クラスタリング中にモデルがPub/Subからツイートを読み取ることができるように、データをPub/Subにインジェストします。Pub/Subは、アプリケーションやサービス間でイベントデータを交換するためのメッセージングサービスです。ストリーミング分析とデータ統合パイプラインは、データのインジェストと配信にPub/Subを使用します。

Pub/Subへのデータインジェストに関する完全なコード例は、GitHubを参照してください。

インジェストパイプラインのファイル構造を以下の図に示します。

write_data_to_pubsub_pipeline/
├── pipeline/
│   ├── __init__.py
│   ├── options.py
│   └── utils.py
├── __init__.py
├── config.py
├── main.py
└── setup.py

pipeline/utils.pyには、感情データセットのロードとデータ変換に使用される2つのbeam.DoFnのコードが含まれています。

pipeline/options.pyには、Dataflowパイプラインを設定するためのパイプラインオプションが含まれています。

config.pyは、Google Cloud PROJECT_IDやNUM_WORKERSなど、複数回使用される変数を定義します。

setup.pyは、パイプラインを実行するためのパッケージと要件を定義します。

main.pyには、パイプラインコードとパイプラインの実行に使用される追加の関数が含まれています。

パイプラインの実行

パイプラインを実行するには、必要なパッケージをインストールします。この例では、Google Cloudプロジェクトへのアクセス権が必要です。また、PROJECT_IDREGIONPubSub TOPIC_IDなどのGoogle Cloud変数をconfig.pyファイルで設定する必要があります。

  1. ローカルマシンで:python main.py
  2. GCPでDataflowを使用する場合:python main.py --mode cloud

write_data_to_pubsub_pipelineには4つの異なるトランスフォームが含まれています。

  1. Hugging Faceデータセットを使用して感情データセットをロードします(簡略化のため、6つのクラスではなく3つのクラスからサンプルを取得します)。
  2. 各テキストに一意の識別子(UID)を関連付けます。
  3. テキストをPub/Subが期待する形式に変換します。
  4. フォーマットされたメッセージをPub/Subに書き込みます。

ストリーミングデータに対する異常検知

データをPub/Subにインジェストした後、異常検知パイプラインを実行します。このパイプラインは、Pub/Subからストリーミングメッセージを読み取り、言語モデルを使用してテキストを埋め込みに変換し、既にトレーニング済みのクラスタリングモデルに埋め込みを送信して、メッセージが異常かどうかを予測します。このパイプラインの前提条件の1つは、データセットのトレーニング分割でトレーニングされたHDBSCANクラスタリングモデルを用意することです。

異常検知の完全なコード例は、GitHubを参照してください。

以下の図は、異常検知パイプラインのファイル構造を示しています。

anomaly_detection_pipeline/
├── pipeline/
│   ├── __init__.py
│   ├── options.py
│   └── transformations.py
├── __init__.py
├── config.py
├── main.py
└── setup.py

pipeline/transformations.pyには、パイプラインで使用される様々なbeam.DoFnと追加関数のコードが含まれています。

pipeline/options.pyには、Dataflowパイプラインを設定するためのパイプラインオプションが含まれています。

config.pyは、Google CloudのPROJECT_IDやNUM_WORKERSなど、複数回使用される変数を定義しています。

setup.pyは、パイプラインを実行するためのパッケージと要件を定義します。

main.pyには、パイプラインコードと、パイプラインを実行するために使用される追加関数が含まれています。

パイプラインの実行

必要なパッケージをインストールし、データをPub/Subにプッシュします。この例では、Google Cloudプロジェクトへのアクセス権が必要であり、PROJECT_IDREGIONPubSub SUBSCRIPTION_IDなど、Google Cloud変数をconfig.pyファイルで設定する必要があります。

  1. ローカルマシンで:python main.py
  2. GCPでDataflowを使用する場合:python main.py --mode cloud

パイプラインには、以下のステップが含まれています。

  1. Pub/Subからメッセージを読み取ります。
  2. Pub/Subメッセージを、キーがUIDで値がTwitterテキストである辞書PCollectionに変換します。
  3. トークナイザーを使用して、テキストをトランスフォーマーで読み取り可能なトークンID整数にエンコードします。
  4. RunInferenceを使用して、トランスフォーマーベースの言語モデルからベクトル埋め込みを取得します。
  5. 埋め込みを正規化します。
  6. RunInferenceを使用して、訓練済みのHDBSCANクラスタリングモデルから異常予測を取得します。
  7. 予測をBigQueryに書き込み、必要に応じてクラスタリングモデルを再トレーニングできるようにします。
  8. 異常が検出された場合、メールアラートを送信します。

以下のコードスニペットは、パイプラインの最初の2つのステップを示しています。

    docs = (
        pipeline
        | "Read from PubSub"
        >> ReadFromPubSub(subscription=cfg.SUBSCRIPTION_ID, with_attributes=True)
        | "Decode PubSubMessage" >> beam.ParDo(Decode())
    )

次のセクションでは、以下のパイプラインステップについて説明します。

言語モデルからの埋め込みの取得

テキストデータでクラスタリングを行うには、まずテキストを統計分析に適した数値ベクトルの集合にマッピングする必要があります。この例では、sentence-transformers/stsb-distilbert-baseと呼ばれるトランスフォーマーベースの言語モデルを使用しています。このモデルは、文と段落を768次元の密ベクトル空間にマッピングし、クラスタリングや意味検索などのタスクに使用できます。

言語モデルは生のテキストではなくトークン化された入力を期待しているため、まずテキストのトークン化から始めます。トークン化は、予測を取得するためにモデルにテキストを入力できるように変換する前処理タスクです。

    normalized_embedding = (
        docs
        | "Tokenize Text" >> beam.Map(tokenize_sentence)

ここで、tokenize_sentenceは、テキストとIDを含む辞書を受け取り、テキストをトークン化し、テキストとID、およびトークン化された出力をタプルとして返す関数です。

Tokenizer = AutoTokenizer.from_pretrained(cfg.TOKENIZER_NAME)


def tokenize_sentence(input_dict):
  """
    Takes a dictionary with a text and an id, tokenizes the text, and
    returns a tuple of the text and id and the tokenized text

    Args:
      input_dict: a dictionary with the text and id of the sentence

    Returns:
      A tuple of the text and id, and a dictionary of the tokens.
    """
  text, uid = input_dict["text"], input_dict["id"]
  tokens = Tokenizer([text], padding=True, truncation=True, return_tensors="pt")
  tokens = {key: torch.squeeze(val) for key, val in tokens.items()}
  return (text, uid), tokens

トークン化された出力は、埋め込みを取得するために言語モデルに渡されます。言語モデルから埋め込みを取得するために、Apache BeamのRunInference()を使用します。

    | "Get Embedding" >> RunInference(KeyedModelHandler(embedding_model_handler))
ここで、embedding_model_handler

    embedding_model_handler = PytorchNoBatchModelHandler(
        state_dict_path=cfg.MODEL_STATE_DICT_PATH,
        model_class=ModelWrapper,
        model_params={"config": AutoConfig.from_pretrained(cfg.MODEL_CONFIG_PATH)},
        device="cpu",
    )

PytorchModelHandlerをラップしてバッチサイズを1に制限するPytorchNoBatchModelHandlerを定義します。

# Can be removed once: https://github.com/apache/beam/issues/21863 is fixed
class PytorchNoBatchModelHandler(PytorchModelHandlerKeyedTensor):
  """Wrapper to PytorchModelHandler to limit batch size to 1.
    The tokenized strings generated from BertTokenizer may have different
    lengths, which doesn't work with torch.stack() in current RunInference
    implementation since stack() requires tensors to be the same size.
    Restricting max_batch_size to 1 means there is only 1 example per `batch`
    in the run_inference() call.
    """
  def batch_elements_kwargs(self):
    return {"max_batch_size": 1}

DistilBertModelforward()は埋め込みを返さないため、ベクトル埋め込みを取得するために、モデルクラスModelWrapperをカスタム定義します。

class ModelWrapper(DistilBertModel):
  """Wrapper to DistilBertModel to get embeddings when calling
    forward function."""
  def forward(self, **kwargs):
    output = super().forward(**kwargs)
    sentence_embedding = (
        self.mean_pooling(output,
                          kwargs["attention_mask"]).detach().cpu().numpy())
    return sentence_embedding

  # Mean Pooling - Take attention mask into account for correct averaging
  def mean_pooling(self, model_output, attention_mask):
    """
        Calculates the mean of token embeddings

        Args:
          model_output: The output of the model.
          attention_mask: This is a tensor that contains 1s for all input tokens and
          0s for all padding tokens.

        Returns:
          The mean of the token embeddings.
        """
    token_embeddings = model_output[
        0]  # First element of model_output contains all token embeddings
    input_mask_expanded = (
        attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float())
    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
        input_mask_expanded.sum(1), min=1e-9)

各Twitterテキストの埋め込みを取得した後、訓練済みのモデルは正規化された埋め込みを期待しているため、埋め込みを正規化します。

    | "Normalize Embedding" >> beam.ParDo(NormalizeEmbedding())

予測の取得

正規化された埋め込みは、次に訓練済みのHDBSCANモデルに転送され、予測が取得されます。

    predictions = (
        normalized_embedding
        | "Get Prediction from Clustering Model"
        >> RunInference(model_handler=clustering_model_handler)
    )

ここで、clustering_model_handler

    clustering_model_handler = KeyedModelHandler(
        CustomSklearnModelHandlerNumpy(
            model_uri=cfg.CLUSTERING_MODEL_PATH, model_file_type=ModelFileType.JOBLIB
        )
    )

SklearnModelHandlerNumpyをラップしてバッチサイズを1に制限し、異常予測を取得するためにhdbscan.approximate_predict()が使用されるようにrun_inferenceをオーバーライドするCustomSklearnModelHandlerNumpyを定義します。

class CustomSklearnModelHandlerNumpy(SklearnModelHandlerNumpy):
  # limit batch size to 1 can be removed once: https://github.com/apache/beam/issues/21863 is fixed
  def batch_elements_kwargs(self):
    """Limit batch size to 1 for inference"""
    return {"max_batch_size": 1}

  # run_inference can be removed once: https://github.com/apache/beam/issues/22572 is fixed
  def run_inference(self, batch, model, inference_args=None):
    """Runs inferences on a batch of numpy arrays.

        Args:
          batch: A sequence of examples as numpy arrays. They should
            be single examples.
          model: A numpy model or pipeline. Must implement predict(X).
            Where the parameter X is a numpy array.
          inference_args: Any additional arguments for an inference.

        Returns:
          An Iterable of type PredictionResult.
        """
    _validate_inference_args(inference_args)
    vectorized_batch = np.vstack(batch)
    predictions = hdbscan.approximate_predict(model, vectorized_batch)
    return [PredictionResult(x, y) for x, y in zip(batch, predictions)]

モデル予測を取得した後、RunInferenceからの出力を辞書にデコードします。次に、分析のためにBigQueryテーブルに予測を保存し、HDBSCANモデルを更新し、予測が異常である場合はメールアラートを送信します。

    _ = (
        predictions
        | "Decode Prediction" >> beam.ParDo(DecodePrediction())
        | "Write to BQ" >> beam.io.WriteToBigQuery(
            table=cfg.TABLE_URI,
            schema=cfg.TABLE_SCHEMA,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        ))

    _ = predictions | "Alert by Email" >> beam.ParDo(TriggerEmailAlert())