異常検知の例
この異常検知の例では、リアルタイムでPub/Subからテキストを読み取り、トレーニング済みのHDBSCANクラスタリングモデルを使用して異常を検知する異常検知パイプラインの設定方法を示します。
異常検知用データセット
この例では、6つの基本的な感情(怒り、恐れ、喜び、愛、悲しみ、驚き)を持つ20,000件の英語のTwitterメッセージを含むemotionというデータセットを使用します。このデータセットには、トレーニング用、検証用、パフォーマンス評価用のテストという3つの分割があります。テキストとデータセットのカテゴリ(クラス)が含まれているため、教師ありデータセットです。このデータセットには、Hugging Faceデータセットページからアクセスできます。
以下のテキストは、データセットのトレーニング分割からの例を示しています。
テキスト | 感情の種類 |
---|---|
欲張りで間違っていると感じています。 | 怒り |
暖炉について郷愁を感じるときはいつでも、それがまだ敷地内にあることを知っています。 | 愛 |
推奨量の2倍、または数回服用してきました。眠りにつくのがずっと早くなりましたが、同時にとても奇妙な気分です。 | 恐れ |
デンマークへのボート旅行中です。 | 喜び |
SFの世界では、自分が偽物のように感じています。 | 悲しみ |
幻覚、動く人や物、音、振動に苦しめられ、週に数回発生し始めました。 | 恐れ |
異常検知アルゴリズム
HDBSCANは、DBSCANを階層型クラスタリングアルゴリズムに変換し、クラスタの安定性に基づいてフラットクラスタリングを抽出することで拡張したクラスタリングアルゴリズムです。モデルはトレーニングされると、新しいデータポイントが外れ値の場合は`-1`を予測し、それ以外の場合は既存のクラスタのいずれかを予測します。
Pub/Subへのインジェスト
クラスタリング中にモデルがPub/Subからツイートを読み取ることができるように、データをPub/Subにインジェストします。Pub/Subは、アプリケーションやサービス間でイベントデータを交換するためのメッセージングサービスです。ストリーミング分析とデータ統合パイプラインは、データのインジェストと配信にPub/Subを使用します。
Pub/Subへのデータインジェストに関する完全なコード例は、GitHubを参照してください。
インジェストパイプラインのファイル構造を以下の図に示します。
write_data_to_pubsub_pipeline/
├── pipeline/
│ ├── __init__.py
│ ├── options.py
│ └── utils.py
├── __init__.py
├── config.py
├── main.py
└── setup.py
pipeline/utils.py
には、感情データセットのロードとデータ変換に使用される2つのbeam.DoFn
のコードが含まれています。
pipeline/options.py
には、Dataflowパイプラインを設定するためのパイプラインオプションが含まれています。
config.py
は、Google Cloud PROJECT_IDやNUM_WORKERSなど、複数回使用される変数を定義します。
setup.py
は、パイプラインを実行するためのパッケージと要件を定義します。
main.py
には、パイプラインコードとパイプラインの実行に使用される追加の関数が含まれています。
パイプラインの実行
パイプラインを実行するには、必要なパッケージをインストールします。この例では、Google Cloudプロジェクトへのアクセス権が必要です。また、PROJECT_ID
、REGION
、PubSub TOPIC_ID
などのGoogle Cloud変数をconfig.py
ファイルで設定する必要があります。
- ローカルマシンで:
python main.py
- GCPでDataflowを使用する場合:
python main.py --mode cloud
write_data_to_pubsub_pipeline
には4つの異なるトランスフォームが含まれています。
- Hugging Faceデータセットを使用して感情データセットをロードします(簡略化のため、6つのクラスではなく3つのクラスからサンプルを取得します)。
- 各テキストに一意の識別子(UID)を関連付けます。
- テキストをPub/Subが期待する形式に変換します。
- フォーマットされたメッセージをPub/Subに書き込みます。
ストリーミングデータに対する異常検知
データをPub/Subにインジェストした後、異常検知パイプラインを実行します。このパイプラインは、Pub/Subからストリーミングメッセージを読み取り、言語モデルを使用してテキストを埋め込みに変換し、既にトレーニング済みのクラスタリングモデルに埋め込みを送信して、メッセージが異常かどうかを予測します。このパイプラインの前提条件の1つは、データセットのトレーニング分割でトレーニングされたHDBSCANクラスタリングモデルを用意することです。
異常検知の完全なコード例は、GitHubを参照してください。
以下の図は、異常検知パイプラインのファイル構造を示しています。
anomaly_detection_pipeline/
├── pipeline/
│ ├── __init__.py
│ ├── options.py
│ └── transformations.py
├── __init__.py
├── config.py
├── main.py
└── setup.py
pipeline/transformations.py
には、パイプラインで使用される様々なbeam.DoFn
と追加関数のコードが含まれています。
pipeline/options.py
には、Dataflowパイプラインを設定するためのパイプラインオプションが含まれています。
config.py
は、Google CloudのPROJECT_IDやNUM_WORKERSなど、複数回使用される変数を定義しています。
setup.py
は、パイプラインを実行するためのパッケージと要件を定義します。
main.py
には、パイプラインコードと、パイプラインを実行するために使用される追加関数が含まれています。
パイプラインの実行
必要なパッケージをインストールし、データをPub/Subにプッシュします。この例では、Google Cloudプロジェクトへのアクセス権が必要であり、PROJECT_ID
、REGION
、PubSub SUBSCRIPTION_ID
など、Google Cloud変数をconfig.py
ファイルで設定する必要があります。
- ローカルマシンで:
python main.py
- GCPでDataflowを使用する場合:
python main.py --mode cloud
パイプラインには、以下のステップが含まれています。
- Pub/Subからメッセージを読み取ります。
- Pub/Subメッセージを、キーがUIDで値がTwitterテキストである辞書
PCollection
に変換します。 - トークナイザーを使用して、テキストをトランスフォーマーで読み取り可能なトークンID整数にエンコードします。
- RunInferenceを使用して、トランスフォーマーベースの言語モデルからベクトル埋め込みを取得します。
- 埋め込みを正規化します。
- RunInferenceを使用して、訓練済みのHDBSCANクラスタリングモデルから異常予測を取得します。
- 予測をBigQueryに書き込み、必要に応じてクラスタリングモデルを再トレーニングできるようにします。
- 異常が検出された場合、メールアラートを送信します。
以下のコードスニペットは、パイプラインの最初の2つのステップを示しています。
次のセクションでは、以下のパイプラインステップについて説明します。
- テキストのトークン化
- RunInferenceを使用した埋め込みの取得
- HDBSCANモデルからの予測の取得
言語モデルからの埋め込みの取得
テキストデータでクラスタリングを行うには、まずテキストを統計分析に適した数値ベクトルの集合にマッピングする必要があります。この例では、sentence-transformers/stsb-distilbert-baseと呼ばれるトランスフォーマーベースの言語モデルを使用しています。このモデルは、文と段落を768次元の密ベクトル空間にマッピングし、クラスタリングや意味検索などのタスクに使用できます。
言語モデルは生のテキストではなくトークン化された入力を期待しているため、まずテキストのトークン化から始めます。トークン化は、予測を取得するためにモデルにテキストを入力できるように変換する前処理タスクです。
ここで、tokenize_sentence
は、テキストとIDを含む辞書を受け取り、テキストをトークン化し、テキストとID、およびトークン化された出力をタプルとして返す関数です。
Tokenizer = AutoTokenizer.from_pretrained(cfg.TOKENIZER_NAME)
def tokenize_sentence(input_dict):
"""
Takes a dictionary with a text and an id, tokenizes the text, and
returns a tuple of the text and id and the tokenized text
Args:
input_dict: a dictionary with the text and id of the sentence
Returns:
A tuple of the text and id, and a dictionary of the tokens.
"""
text, uid = input_dict["text"], input_dict["id"]
tokens = Tokenizer([text], padding=True, truncation=True, return_tensors="pt")
tokens = {key: torch.squeeze(val) for key, val in tokens.items()}
return (text, uid), tokens
トークン化された出力は、埋め込みを取得するために言語モデルに渡されます。言語モデルから埋め込みを取得するために、Apache BeamのRunInference()
を使用します。
embedding_model_handler
はPytorchModelHandler
をラップしてバッチサイズを1に制限するPytorchNoBatchModelHandler
を定義します。
# Can be removed once: https://github.com/apache/beam/issues/21863 is fixed
class PytorchNoBatchModelHandler(PytorchModelHandlerKeyedTensor):
"""Wrapper to PytorchModelHandler to limit batch size to 1.
The tokenized strings generated from BertTokenizer may have different
lengths, which doesn't work with torch.stack() in current RunInference
implementation since stack() requires tensors to be the same size.
Restricting max_batch_size to 1 means there is only 1 example per `batch`
in the run_inference() call.
"""
def batch_elements_kwargs(self):
return {"max_batch_size": 1}
DistilBertModel
のforward()
は埋め込みを返さないため、ベクトル埋め込みを取得するために、モデルクラスModelWrapper
をカスタム定義します。
class ModelWrapper(DistilBertModel):
"""Wrapper to DistilBertModel to get embeddings when calling
forward function."""
def forward(self, **kwargs):
output = super().forward(**kwargs)
sentence_embedding = (
self.mean_pooling(output,
kwargs["attention_mask"]).detach().cpu().numpy())
return sentence_embedding
# Mean Pooling - Take attention mask into account for correct averaging
def mean_pooling(self, model_output, attention_mask):
"""
Calculates the mean of token embeddings
Args:
model_output: The output of the model.
attention_mask: This is a tensor that contains 1s for all input tokens and
0s for all padding tokens.
Returns:
The mean of the token embeddings.
"""
token_embeddings = model_output[
0] # First element of model_output contains all token embeddings
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float())
return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(
input_mask_expanded.sum(1), min=1e-9)
各Twitterテキストの埋め込みを取得した後、訓練済みのモデルは正規化された埋め込みを期待しているため、埋め込みを正規化します。
予測の取得
正規化された埋め込みは、次に訓練済みのHDBSCANモデルに転送され、予測が取得されます。
ここで、clustering_model_handler
は
SklearnModelHandlerNumpy
をラップしてバッチサイズを1に制限し、異常予測を取得するためにhdbscan.approximate_predict()
が使用されるようにrun_inference
をオーバーライドするCustomSklearnModelHandlerNumpy
を定義します。
class CustomSklearnModelHandlerNumpy(SklearnModelHandlerNumpy):
# limit batch size to 1 can be removed once: https://github.com/apache/beam/issues/21863 is fixed
def batch_elements_kwargs(self):
"""Limit batch size to 1 for inference"""
return {"max_batch_size": 1}
# run_inference can be removed once: https://github.com/apache/beam/issues/22572 is fixed
def run_inference(self, batch, model, inference_args=None):
"""Runs inferences on a batch of numpy arrays.
Args:
batch: A sequence of examples as numpy arrays. They should
be single examples.
model: A numpy model or pipeline. Must implement predict(X).
Where the parameter X is a numpy array.
inference_args: Any additional arguments for an inference.
Returns:
An Iterable of type PredictionResult.
"""
_validate_inference_args(inference_args)
vectorized_batch = np.vstack(batch)
predictions = hdbscan.approximate_predict(model, vectorized_batch)
return [PredictionResult(x, y) for x, y in zip(batch, predictions)]
モデル予測を取得した後、RunInference
からの出力を辞書にデコードします。次に、分析のためにBigQueryテーブルに予測を保存し、HDBSCANモデルを更新し、予測が異常である場合はメールアラートを送信します。
_ = (
predictions
| "Decode Prediction" >> beam.ParDo(DecodePrediction())
| "Write to BQ" >> beam.io.WriteToBigQuery(
table=cfg.TABLE_URI,
schema=cfg.TABLE_SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
))
_ = predictions | "Alert by Email" >> beam.ParDo(TriggerEmailAlert())
最終更新日:2024/10/31
お探しのものはお見つけいただけましたか?
すべて役に立ち、分かりやすかったでしょうか?変更したい点があれば教えてください!