Beam MLについて
| ![]() |
Apache Beamを使用して以下のことができます
- 前処理と推論の両方で、大量のデータを処理する。
- プロジェクトの探索段階でデータを活用して実験する。
- 本番環境でML opsエコシステムの一部としてデータパイプラインを拡張する。
- バッチとストリーミングの両方で、変化するデータ負荷で本番環境でモデルを実行する。
AI/MLワークロード
Apache Beamは、データ検証、データ前処理、モデル検証、モデルのデプロイと推論に使用できます。
- データの取り込み:受信した新しいデータは、ファイルシステムまたはデータベースに保存されるか、メッセージキューに公開されます。
- **データ検証**:データを受信した後、データの品質を確認します。たとえば、外れ値を検出し、標準偏差とクラス分布を計算することができます。
- **データの前処理**:データの検証後、モデルのトレーニングに使用できるようにデータを変換します。
- モデルのトレーニング:データの準備ができたら、AI/MLモデルをトレーニングします。この手順は、トレーニングされたモデルの品質に応じて、通常複数回繰り返されます。
- モデル検証:モデルをデプロイする前に、そのパフォーマンスと精度を検証します。
- **モデルのデプロイ**:モデルをデプロイし、それを使用して新しいデータまたは既存のデータに対して推論を実行します。
データの増加と進化に合わせてモデルを最新の状態に保ち、良好なパフォーマンスを維持するには、これらの手順を複数回実行します。さらに、ML opsをプロジェクトに適用して、モデルとデータのライフサイクル全体でAI/MLワークフローを自動化できます。オーケストレーターを使用してこのフローを自動化し、プロジェクト内のさまざまなビルディングブロック間の遷移を処理します。
RunInferenceの使用
RunInference APIは、機械学習推論用に最適化された`PTransform`であり、パイプラインでMLモデルを効率的に使用できます。APIには次の機能が含まれています
- モデルに効率的に供給するために、Apache Beamの`BatchElements`変換を使用して、パイプラインのスループットに基づいて入力を動的にバッチ処理します。
- メモリとスループットの使用量のバランスをとるために、中央モデルマネージャーを使用してロードする最適なモデル数を決定します。スループットを最大化するために、必要に応じてこれらのモデルをスレッドとプロセス間で共有します。
- 自動モデル更新機能を使用して、パイプラインがモデルの最新デプロイバージョンを使用していることを確認します。
- 複数のフレームワークとモデルハブ(Tensorflow、Pytorch、Sklearn、XGBoost、Hugging Face、TensorFlow Hub、Vertex AI、TensorRT、ONNXなど)をサポートします。
- カスタムモデルハンドラーを使用して、任意のフレームワークをサポートします。
- マルチモデルパイプラインをサポートします。
- サポートされているランナーでGPUを使用して推論速度を向上させることができます。詳細については、DataflowドキュメントのDataflowでのGPUをご覧ください。
サポートと制限事項
- RunInference APIは、Apache Beam 2.40.0以降のバージョンでサポートされています。
- モデルハンドラーは、PyTorch、scikit-learn、TensorFlow、Hugging Face、Vertex AI、ONNX、TensorRT、XGBoostで利用できます。カスタムモデルハンドラーを使用することもできます。
- RunInference APIは、バッチパイプラインとストリーミングパイプラインの両方をサポートしています。
- RunInference APIは、リモート推論とランナーワーカーのローカル推論の両方をサポートしています。
BatchElements PTransform
多くのモデルが実装しているベクトル化推論の最適化を利用するために、`BatchElements`変換は、モデルの予測を行う前の中間ステップとして使用されます。この変換は、要素をまとめてバッチ処理します。次に、バッチ処理された要素にRunInferenceの特定のフレームワークの変換が適用されます。たとえば、numpy `ndarrays`の場合は`numpy.stack()`を呼び出し、torch `Tensor`要素の場合は`torch.stack()`を呼び出します。
`beam.BatchElements`の設定をカスタマイズするには、`ModelHandler`で`batch_elements_kwargs`関数をオーバーライドします。たとえば、`min_batch_size`を使用してバッチあたりの最小要素数を設定するか、`max_batch_size`を使用してバッチあたりの最大要素数を設定します。
詳細については、`BatchElements`変換のドキュメントをご覧ください。
共有ヘルパークラス
RunInference実装内で`Shared`クラスを使用すると、プロセスごとにモデルを1回だけロードし、そのプロセスで作成されたすべてのDoFnインスタンスと共有することができます。この機能により、メモリ消費量とモデルのロード時間が削減されます。詳細については、`Shared`クラスのドキュメントをご覧ください。
MLモデルを使用するようにPythonパイプラインを変更する
RunInference変換を使用するには、次のコードをパイプラインに追加します
from apache_beam.ml.inference.base import RunInference
with pipeline as p:
predictions = ( p | 'Read' >> beam.ReadFromSource('a_source')
| 'RunInference' >> RunInference(<model_handler>)
`model_handler`をモデルハンドラーのセットアップコードに置き換えます。
モデルをインポートするには、基になるモデルをラップする`ModelHandler`オブジェクトを設定する必要があります。インポートするモデルハンドラーは、フレームワークと、入力が含まれているデータ構造体の種類によって異なります。 `ModelHandler`オブジェクトでは、`env_vars`キーワード引数を使用して、推論に必要な環境変数を設定することもできます。次の例は、インポートする可能性のあるモデルハンドラーを示しています。
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerPandas
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor
from tfx_bsl.public.beam.run_inference import CreateModelHandler
事前トレーニング済みモデルの使用
このセクションでは、PyTorch、Scikit-learn、およびTensorflowで事前学習済みモデルを使用するための要件について説明します。
PyTorch
モデルの保存された重みが含まれるファイルへのパスを提供する必要があります。このパスはパイプラインからアクセスできる必要があります。 RunInference APIとPyTorchフレームワークで事前学習済みモデルを使用するには、次の手順を実行します。
- 事前学習済みの重みをダウンロードし、パイプラインがアクセスできる場所にホストします。
- 次のコードを使用して、モデルの重みのパスをPyTorchの
ModelHandler
に渡します。state_dict_path=<重みへのパス>
。
Apache BeamでPyTorchモデルを実行する方法を示すこのノートブックを参照してください。
Scikit-learn
pickle化されたScikit-learnモデルを含むファイルへのパスを提供する必要があります。このパスはパイプラインからアクセスできる必要があります。 RunInference APIとScikit-learnフレームワークで事前学習済みモデルを使用するには、次の手順を実行します。
- pickle化されたモデルクラスをダウンロードし、パイプラインがアクセスできる場所にホストします。
- 次のコードを使用して、モデルのパスをSklearnの
ModelHandler
に渡します。model_uri=<pickle化されたファイルへのパス>
およびmodel_file_type: <ModelFileType>
。モデルのシリアル化方法に応じて、ModelFileType.PICKLE
またはModelFileType.JOBLIB
を指定できます。
Apache BeamでScikit-learnモデルを実行する方法を示すこのノートブックを参照してください。
TensorFlow
RunInference APIでTensorFlowを使用するには、2つのオプションがあります。
- Apache Beam SDKに組み込まれているTensorFlow Model Handler(
TFModelHandlerNumpy
とTFModelHandlerTensor
)を使用します。- モデルの入力タイプに応じて、
numpy
入力にはTFModelHandlerNumpy
を、tf.Tensor
入力にはTFModelHandlerTensor
をそれぞれ使用します。 - tensorflow 2.7以降を使用します。
model_uri=<学習済みモデルへのパス>
を使用して、モデルのパスをTensorFlowのModelHandler
に渡します。- または、学習済みモデルの保存された重みへのパス、
create_model_fn=<関数>
を使用してモデルを構築する関数、およびmodel_type=ModelType.SAVED_WEIGHTS
を設定することもできます。組み込みのモデルハンドラーでTensorflowモデルを実行する方法を示すこのノートブックを参照してください。
- モデルの入力タイプに応じて、
tfx_bsl
を使用する。- モデル入力が
tf.Example
タイプの場合は、このアプローチを使用します。 tfx_bsl
バージョン1.10.0以降を使用します。tfx_bsl.public.beam.run_inference.CreateModelHandler()
を使用してモデルハンドラーを作成します。apache_beam.ml.inference.base.RunInference
変換でモデルハンドラーを使用します。Apache Beamとtfx-bslでTensorFlowモデルを実行する方法を示すこのノートブックを参照してください。
- モデル入力が
カスタムモデルの使用
サポートされているフレームワークのいずれかで指定されていないモデルを使用する場合、RunInference APIは、任意のカスタム機械学習モデルを使用できるように柔軟に設計されています。モデルを読み込んで推論を実行するために使用するロジックを使用して、独自のModelHandler
またはKeyedModelHandler
を作成するだけで済みます。
簡単な例はこのノートブックにあります。 load_model
メソッドは、一般的なspaCy
パッケージを使用してモデルを読み込む方法を示し、run_inference
は、 examplesのバッチで推論を実行する方法を示しています。.
RunInferenceパターン
このセクションでは、推論パイプラインをよりシンプルに、より堅牢に、そしてより効率的にするために使用できるパターンとベストプラクティスを提案します。
キー付きModelHandlerオブジェクトの使用
examplesにキーが添付されている場合は、KeyedModelHandler
をModelHandler
オブジェクトでラップします。
from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler(PytorchModelHandlerTensor(...))
with pipeline as p:
data = p | beam.Create([
('img1', torch.tensor([[1,2,3],[4,5,6],...])),
('img2', torch.tensor([[1,2,3],[4,5,6],...])),
('img3', torch.tensor([[1,2,3],[4,5,6],...])),
])
predictions = data | RunInference(keyed_model_handler)
データにキーが付けられているかどうかわからない場合は、MaybeKeyedModelHandler
を使用できます。
また、KeyedModelHandler
を使用して、関連付けられたキーに基づいていくつかの異なるモデルを読み込むこともできます。次の例では、config1
を使用してモデルを読み込みます。そのモデルは、key1
に関連付けられたすべての examplesの推論に使用されます。 config2
を使用して2番目のモデルを読み込みます。そのモデルは、key2
およびkey3
に関連付けられたすべての examplesに使用されます。
from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler([
KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>))
])
with pipeline as p:
data = p | beam.Create([
('key1', torch.tensor([[1,2,3],[4,5,6],...])),
('key2', torch.tensor([[1,2,3],[4,5,6],...])),
('key3', torch.tensor([[1,2,3],[4,5,6],...])),
])
predictions = data | RunInference(keyed_model_handler)
より詳細な例については、ノートブック複数の異なるトレーニング済みモデルでML推論を実行するを参照してください。
複数のモデルを同時にロードすると、メモリ不足エラー(OOM)のリスクが高まります。デフォルトでは、KeyedModelHandler
はメモリにロードされるモデルの数を制限しません同時に。モデルがすべてメモリに収まらない場合、パイプラインはメモリ不足エラーで失敗する可能性があります。この問題を回避するには、max_models_per_worker_hint
パラメーターを使用して、メモリに同時にロードできるモデルの最大数を設定します。
次の例では、SDKワーカープロセスごとに最大2つのモデルを同時にロードします。現在使用されていないモデルはアンロードされます。
mhs = [
KeyModelMapping(['key1'], PytorchModelHandlerTensor(<config1>)),
KeyModelMapping(['key2', 'key3'], PytorchModelHandlerTensor(<config2>)),
KeyModelMapping(['key4'], PytorchModelHandlerTensor(<config3>)),
KeyModelMapping(['key5', 'key6', 'key7'], PytorchModelHandlerTensor(<config4>)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)
特定のマシンに複数のSDKワーカープロセスを持つランナーは、マシンに最大max_models_per_worker_hint * <ワーカープロセス数>
モデルをロードします。
モデルと他の変換からの追加のメモリニーズのために十分なスペースを残してください。モデルがオフロードされた後、メモリがすぐに解放されない可能性があるため、追加のバッファを残すことをお勧めします。
**注**: 多くのモデルがありますが、max_models_per_worker_hint
が小さいと、*メモリのスラッシング*が発生する可能性があります。これは、モデルをメモリに出し入れするために大量の実行時間が使用される場合です。メモリのスラッシングの可能性と影響を軽減するために、分散ランナーを使用している場合は、推論ステップの前にGroupByKey
変換を挿入します。 GroupByKey
変換は、同じキーとモデルを持つ要素が同じワーカーに配置されるようにすることで、スラッシングを削減します。
詳細については、KeyedModelHander
を参照してください。
PredictionResultオブジェクトの使用
Apache Beamで予測を行う場合、出力PCollection
には、入力 examplesのキーと推論の両方が含まれます。これらの両方の項目を出力に含めると、予測を決定した入力を 찾을 수 있습니다。
PredictionResult
オブジェクトは、それぞれexample
とinference
という名前の入力と推論の両方を含むNamedTuple
です。キーが入力データとともにRunInference変換に渡されると、出力PCollection
はキーとPredictionResult
オブジェクトであるTuple[str, PredictionResult]
を返します。パイプラインは、RunInference変換後の手順でPredictionResult
オブジェクトと対話します。
class PostProcessor(beam.DoFn):
def process(self, element: Tuple[str, PredictionResult]):
key, prediction_result = element
inputs = prediction_result.example
predictions = prediction_result.inference
# Post-processing logic
result = ...
yield (key, result)
with pipeline as p:
output = (
p | 'Read' >> beam.ReadFromSource('a_source')
| 'PyTorchRunInference' >> RunInference(<keyed_model_handler>)
| 'ProcessOutput' >> beam.ParDo(PostProcessor()))
このオブジェクトを明示的に使用する必要がある場合は、パイプラインに次の行を含めてオブジェクトをインポートします。
from apache_beam.ml.inference.base import PredictionResult
詳細については、PredictionResult
ドキュメントを参照してください。
自動モデル更新
パイプラインを停止せずにRunInference PTransform
で使用されているモデルを自動的に更新するには、ModelMetadata
サイド入力PCollection
をRunInference入力パラメーターmodel_metadata_pcoll
に渡します。
ModelMetdata
は、次のものを含むNamedTuple
です。
model_id
: モデルの一意の識別子。これは、モデルにアクセスできるファイルパスまたはURLにすることができます。これは、推論のためにモデルをロードするために使用されます。それぞれのModelHandlers
がエラーなしでモデルをロードできるように、URLまたはファイルパスは互換性のある形式である必要があります。たとえば、
PyTorchModelHandler
は、最初に重みとモデルクラスを使用してモデルをロードします。サイド入力を使用してモデルを更新するときに異なるモデルクラスの重みを渡すと、元のモデルクラスの重みが想定されるため、モデルは正しくロードされません。model_name
: モデルの判読可能な名前。この名前を使用して、RunInference変換によって生成されたメトリックでモデルを識別できます。
ユースケース
- MLモデルを自動的に更新するには、
WatchFilePattern
をRunInferencePTransform
へのサイド入力として使用します。詳細については、WatchFilePattern
をサイド入力として使用してRunInferenceでMLモデルを自動更新するを参照してください。
サイド入力PCollection
は、エラーを回避するためにAsSingleton
ビューに従う必要があります。
**注**: メインPCollection
が入力を出力し、サイド入力がまだ入力を受信していない場合、サイド入力が更新されるまでメインPCollection
はバッファされます。これは、AfterCount
やAfterProcessingTime
などのデータ駆動型トリガーを持つグローバルウィンドウ化されたサイド入力で発生する可能性があります。サイド入力が更新されるまで、それぞれのModelHandler
を渡すために使用されるデフォルトまたは初期モデルIDをサイド入力として出力します。
レコードの前処理と後処理
RunInferenceを使用すると、変換に前処理と後処理の操作を追加できます。前処理操作を適用するには、モデルハンドラーでwith_preprocess_fn
を使用します。
inference = pcoll | RunInference(model_handler.with_preprocess_fn(lambda x : do_something(x)))
後処理操作を適用するには、モデルハンドラーでwith_postprocess_fn
を使用します。
inference = pcoll | RunInference(model_handler.with_postprocess_fn(lambda x : do_something_to_result(x)))
複数の前処理および後処理操作を連鎖させることもできます。
inference = pcoll | RunInference(
model_handler.with_preprocess_fn(
lambda x : do_something(x)
).with_preprocess_fn(
lambda x : do_something_else(x)
).with_postprocess_fn(
lambda x : do_something_after_inference(x)
).with_postprocess_fn(
lambda x : do_something_else_after_inference(x)
))
前処理関数は、バッチ処理と推論の前に実行されます。この関数は、入力PCollection
をモデルハンドラーの基本入力タイプにマップします。複数の前処理関数を適用する場合、それらは、最後に適用されたものから最初に適用されたものの順序で元のPCollection
で実行されます。
後処理関数は、推論後に実行されます。この関数は、基本モデルハンドラーの出力タイプを目的の出力タイプにマップします。複数の後処理関数を適用する場合、それらは、最初に適用されたものから最後に適用されたものの順序で元の推論結果で実行されます。
エラー処理
RunInferenceの使用中にエラーを確実に処理するには、*デッドレターキュー*を使用できます。デッドレターキューは、失敗したレコードを別のPCollection
に出力して、さらに処理します。このPCollection
は、分析してストレージシステムに送信し、そこでレビューしてパイプラインに再送信したり、破棄したりできます。 RunInferenceには、デッドレターキューの組み込みサポートがあります。 RunInference変換にwith_exception_handling
を適用することにより、デッドレターキューを使用できます。
main, other = pcoll | RunInference(model_handler).with_exception_handling()
other.failed_inferences | beam.Map(print) # insert logic to handle failed records here
このパターンは、関連する前処理および後処理操作を伴う RunInference 変換にも適用できます。
main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()
other.failed_preprocessing[0] | beam.Map(print) # handles failed preprocess operations, indexed in the order in which they were applied
other.failed_inferences | beam.Map(print) # handles failed inferences
other.failed_postprocessing[0] | beam.Map(print) # handles failed postprocess operations, indexed in the order in which they were applied
Javaパイプラインからの推論の実行
RunInference API は、Apache Beam の複数言語パイプラインフレームワークを通じて、Beam Java SDK バージョン 2.41.0 以降で使用できます。Java ラッパー変換については、RunInference.java を参照してください。試してみるには、Java Sklearn Mnist 分類の例を参照してください。さらに、Beam Java SDK パイプラインからの前処理と後処理と共に RunInference API を使用する複合 Python 変換の例については、Java SDK からの RunInference の使用を参照してください。
カスタム推論
RunInference API は、現在、Natural Language API や Cloud Vision API などを使用したリモート推論呼び出しをサポートしていません。したがって、これらのリモート API を Apache Beam で使用するには、カスタム推論呼び出しを作成する必要があります。Apache Beam ノートブックでのリモート推論は、beam.DoFn
を使用してカスタム リモート推論呼び出しを実装する方法を示しています。実際のプロジェクトでリモート推論を実装する場合は、次の要素を考慮してください。
API クォータと、外部 API にかかる可能性のある高負荷。外部 API への呼び出しを最適化するために、外部リモート API への並列呼び出しを制限するように
PipelineOptions
を構成できます。障害が発生する可能性があることを想定し、障害を特定して、可能な限り適切に処理できるように準備してください。指数バックオフやデッドレターキュー(未処理メッセージキュー)などの手法を使用します。
外部 API で推論を実行する場合は、より効率的な実行を可能にするために、入力をまとめてバッチ処理します。
デプロイ時にパイプラインのパフォーマンスの監視と測定を検討してください。監視により、アプリケーションの状態と正常性に関する洞察を得ることができます。
マルチモデルパイプライン
RunInference 変換を使用して、パイプラインに複数の推論モデルを追加します。マルチモデルパイプラインは、A/B テストや、トークン化、文分割、品詞タグ付け、固有表現抽出、言語検出、相互参照解決などを実行するモデルで構成されるカスケードモデルの構築に役立ちます。詳細については、マルチモデルパイプラインを参照してください。
A/Bパターン
with pipeline as p:
data = p | 'Read' >> beam.ReadFromSource('a_source')
model_a_predictions = data | RunInference(<model_handler_A>)
model_b_predictions = data | RunInference(<model_handler_B>)
ここで、model_handler_A
と model_handler_B
はモデルハンドラーのセットアップコードです。
カスケードパターン
with pipeline as p:
data = p | 'Read' >> beam.ReadFromSource('a_source')
model_a_predictions = data | RunInference(<model_handler_A>)
model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(<model_handler_B>)
ここで、model_handler_A
と model_handler_B
はモデルハンドラーのセットアップコードです。
異なるモデル要件にリソースヒントを使用する
単一のパイプラインで複数のモデルを使用する場合、モデルによってメモリまたはワーカー SKU の要件が異なる場合があります。リソースヒントを使用すると、パイプラインの各ステップの計算リソース要件に関する情報をランナーに提供できます。
たとえば、次のスニペットは、各 RunInference 呼び出しに RAM とハードウェアアクセラレータの要件を指定するためのヒントを追加することで、前のカスケードパターンを拡張しています。
with pipeline as p:
data = p | 'Read' >> beam.ReadFromSource('a_source')
model_a_predictions = data | RunInference(<model_handler_A>).with_resource_hints(min_ram="20GB")
model_b_predictions = model_a_predictions
| beam.Map(some_post_processing)
| RunInference(<model_handler_B>).with_resource_hints(
min_ram="4GB",
accelerator="type:nvidia-tesla-k80;count:1;install-nvidia-driver")
リソースヒントの詳細については、リソースヒントを参照してください。
モデル検証
モデル検証を使用すると、以前に確認されていないデータセットに対してモデルのパフォーマンスをベンチマークできます。選択したメトリクスを抽出し、視覚化を作成し、メタデータをログに記録し、さまざまなモデルのパフォーマンスを比較して、モデルがデプロイの準備ができているかどうかを検証することを最終目標とします。Beam は、パイプライン内で直接 TensorFlow モデルでモデル評価を実行するためのサポートを提供します。
ML モデル評価ページでは、TensorFlow Model Analysis (TFMA) を使用して、モデル評価をパイプラインの一部として統合する方法を示しています。
トラブルシューティング
パイプラインまたはジョブで問題が発生した場合、このセクションでは発生する可能性のある問題と、それらを修正するための提案を示します。
テンソル要素をバッチ処理できない
RunInference は動的バッチ処理を使用します。ただし、RunInference API は異なるサイズのテンソル要素をバッチ処理できないため、RunInference
変換に渡されるサンプルは同じ次元または長さである必要があります。サイズが異なる画像または長さが異なる単語埋め込みを提供すると、次のエラーが発生する可能性があります。
ファイル "/beam/sdks/python/apache_beam/ml/inference/pytorch_inference.py"、232 行目、run_inference で batched_tensors = torch.stack(key_to_tensor_list[key]) RuntimeError: stack は各テンソルのサイズが等しいことを想定していますが、エントリ 0 で [12]、エントリ 1 で [10] が得られました [ 'PyTorchRunInference/ParDo(_RunInferenceDoFn)' の実行中]
この問題を回避するには、同じサイズの要素を使用するか、バッチ処理を無効にします。
オプション 1: 同じサイズの要素を使用する
同じサイズの要素を使用するか、入力のサイズを変更します。コンピュータビジョンアプリケーションの場合は、画像入力をサイズ変更して、同じ次元になるようにします。長さが異なるテキストを持つ自然言語処理 (NLP) アプリケーションの場合は、テキストまたは単語埋め込みのサイズを変更して、同じ長さになるようにします。長さが異なるテキストを扱う場合、サイズ変更は不可能な場合があります。このシナリオでは、バッチ処理を無効にすることができます(オプション 2 を参照)。
オプション 2: バッチ処理を無効にする
ModelHandler で batch_elements_kwargs
関数をオーバーライドし、最大バッチサイズ (max_batch_size
) を 1 に設定することにより、バッチ処理を無効にします: max_batch_size=1
。詳細については、BatchElements PTransform を参照してください。例については、言語モデリングの例を参照してください。
関連リンク
| ![]() |
最終更新日: 2024/10/31
お探しのものはすべて見つかりましたか?
すべて役に立ち、明確でしたか?変更したいことはありますか?お知らせください!