組み込み I/O 変換

Web API I/O コネクタ

Beam SDK には、REST や gRPC などの Web API との読み取りと書き込みをサポートする RequestResponseIO という組み込み変換が含まれています。

以下の説明は Java SDK に焦点を当てています。Python の例は今後追加される予定です。トラッカーの問題を参照してください:#30422。さらに、Go SDK のサポートはまだ利用できません。トラッカーの問題を参照してください:#30423

RequestResponseIO の機能

この変換が提供する機能には、次のものがあります。

このガイドは現在、上記の最初の 2 つの箇条書き、最小限のコード要件とエラー処理に焦点を当てています。将来的には、追加機能の例を示すように拡張される可能性があります。追加のリソースへのリンクを以下に示します。

追加のリソース

始める前に

RequestResponseIO を使用するには、Gradlebuild.gradle(.kts) ファイルまたは Mavenpom.xml ファイルに依存関係を追加します。利用可能なバージョンについては Maven Central を参照してください。

以下は、Beam BOM と Beam コアなどの関連する依存関係を build.gradle(.kts) ファイルに追加する例を示しています。

// Apache Beam BOM
// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-bom
implementation("org.apache.beam:beam-sdks-java-bom:2.60.0")

// Beam Core SDK
// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-core
implementation("org.apache.beam:beam-sdks-java-core")

// RequestResponseIO dependency
// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-io-rrio
implementation("org.apache.beam:beam-sdks-java-io-rrio")

Maven を使用する場合、pom.xml ファイルにアーティファクトの依存関係を追加します。

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-rrio</artifactId>
    <version>2.60.0</version>
</dependency>

RequestResponseIO の基本

最小限のコード

Web API から読み取るまたはWeb API に書き込むために必要な最小限のコードは次のとおりです。

  1. Caller の実装。
  2. RequestResponseIO のインスタンス化。

発信者の実装

Caller は、1 つのメソッドオーバーライドのみが必要です。call メソッドで、API と対話し、リクエストをレスポンスに変換します。この変換の DoFn は、その DoFn.ProcessElement メソッド内でこのメソッドを呼び出します。変換は、失敗したリクエストの繰り返しと指数バックオフ(後述)を含むその他すべてを処理します。

// MyCaller invokes a Web API with MyRequest and returns the resulting MyResponse.
class MyCaller<MyRequest, MyResponse> implements Caller<MyRequest, MyResponse> {

    @Override
    public MyResponse call(MyRequest request) throws UserCodeExecutionException {

        // Do something with request and return the response.

    }

}

RequestResponseIO のインスタンス化

RequestResponseIO の使用は、以下のように簡単です。前述のように、最小限で 2 つの параметр が必要です。Caller と、期待されるレスポンスの Coder です。(注:Beam Coder の概念が初めての場合は、この件に関する Apache Beam プログラミングガイド を参照してください。このガイドには、以下の例もあります。)

RequestResponseIO 変換は、失敗と成功したレスポンスの PCollection をバンドルする Result を返します。Beam では、これを 追加出力 パターンと呼び、通常は少しの定型コードが必要ですが、変換によって処理されます。変換を使用して、Result::getFailuresResult::getResponses を介して成功と失敗の PCollection を取得できます。

以下は、変換がパイプラインでどのように機能するかを示す簡略化されたスニペットです。

// Step 1. Define the Coder for the response.
Coder<MyResponse> responseCoder = ...

// Step 2. Build the request PCollection.
PCollection<MyRequest> requests = ...

// Step 3. Instantiate the RequestResponseIO with the Caller and Coder and apply it to the request PCollection.
Result<MyResponse> result = requests.apply(RequestResponseIO.of(new MyCaller(), responseCoder));

// Step 4a. Do something with the responses.
result.getResponses().apply( ... );

// Step 4b. Apply failures to a dead letter sink.
result.getFailures().apply( ... );

RequestResponseIO は、各リクエストに対して Caller を呼び出すために必要なその他すべてを処理します。生の HTTP 呼び出しを行うか、クライアントコードを使用するかに関係なく、Caller 内で何を行うかは関係ありません。このガイドでは、後でこの設計のテストにおける利点について説明します。

API 呼び出しの繰り返しと失敗

前述のように、RequestResponseIO は、Caller からの結果である成功と失敗の両方の PCollection をバンドルする Result を返します。このセクションでは、失敗の処理と、バックオフによる API 呼び出しの繰り返しに関する詳細について説明します。

失敗の処理

失敗は、ログ変換または後で分析とトラブルシューティングのためにエラーをダウンストリームシンクに保存する変換に適用できる ApiIOError PCollection です。

ApiIOError は既に Beam スキーマにマップされているため、Beam の既存のほとんどの I/O コネクタと互換性があります。(注:Beam スキーマの概念が初めての場合は、Beam プログラミングガイド を参照してください。)たとえば、レコードを TableRow に最初に変換することなく、以下に示すように、分析とトラブルシューティングのために ApiIOError レコードを BigQuery に簡単に送信できます。

static void writeFailuresToBigQuery(
    PCollection<ApiIOError> failures,
    TableReference tableReference,
    BigQueryIO.Write.CreateDisposition createDisposition,
    BigQueryIO.Write.WriteDisposition writeDisposition) {

  // PCollection<ApiIOError> failures = ...
  // TableReference tableReference = ...
  // BigQueryIO.Write.CreateDisposition createDisposition = ...
  // BigQueryIO.Write.WriteDisposition writeDisposition = ...

  failures.apply(
      "Dead letter",
      BigQueryIO.<ApiIOError>write()
          .useBeamSchema()
          .to(tableReference)
          .withCreateDisposition(createDisposition)
          .withWriteDisposition(writeDisposition));
}

API 呼び出しの繰り返しとバックオフ

失敗`PCollection`に出力する前に、この変換処理は、規定された指数バックオフの後、**特定のエラー**に対して再試行を行います。変換処理にバックオフによる再試行を指示するには、`Caller`が特定のエラーをスローする必要があります。UserCodeExecutionExceptionをスローすると、エラーはすぐに`ApiIOError` `PCollection`に出力されます。

`RequestResponseIO`は、`Caller`が以下のエラーをスローした場合、バックオフを伴う再試行を試みます。

規定回数を超えて再試行が行われた場合、エラーは失敗`PCollection`に出力されます。

テスト

`RequestResponseIO`は`Caller`の実装内容を気にしないため、テストがより容易になります。一部のテストでは、実際のAPIへの直接呼び出しに依存する代わりに(外部リソースに依存することになります)、テストロジックに従ってレスポンスを返すか、例外をスローする`Caller`のバージョンを実装するだけで済みます。例えば、特定のレスポンス(空のレコードなど)に対するパイプラインの下流ステップをテストしたい場合、次のように簡単に実行できます。Beamパイプラインのテストに関する詳細は、Beamプログラミングガイドを参照してください。

@Test
void givenEmptyResponse_thenExpectSomething() {
    // Test expects PTransform underTest should do something as a result of empty records, for example.
    PTransform<Iterable<String>, ?> underTest = ...

    PCollection<String> requests = pipeline.apply(Create.of("aRequest"));
    IterableCoder<String> coder = IterableCoder.of(StringUtf8Coder.of());
    Result<Iterable<String>> result = requests.apply(RequestResponseIO.of(new MockEmptyIterableResponse()), coder);

    PAssert.that(result.getResponses().apply(underTest)).containsInAnyOrder(...)

    pipeline.run();
}

// MockEmptyIterableResponse simulates when there are no results from the API.
class MockEmptyIterableResponse<String, Iterable<String>> implements Caller<String, Iterable<String>> {
@Override
    public Iterable<String> call(String request) throws UserCodeExecutionException {
        return Collections.emptyList();
    }
}

実践的な例

以下は、エンドツーエンドのBeamパイプラインで統合する2つの例を示しています。このパイプラインの目的は、画像をダウンロードし、Vertex AI上のGeminiを使用して画像の内容を認識することです。

この例は、現在のAI/MLソリューションに取って代わるものではありません。BeamとAI/MLの使用方法の詳細については、AI/MLパイプラインの開始方法を参照してください。

HTTP 呼び出しの直接操作

まず、画像をダウンロードする必要があります。そのためには、画像URLへのHTTP呼び出しを行い、そのコンテンツを`PCollection`に出力して、Gemini APIで使用できるようにする必要があります。この例自体の価値は、`RequestResponseIO`を使用して生のHTTPリクエストを行う方法を示している点にあります。

発信者の定義

`ImageRequest`を受け取り`ImageResponse`を返す`Caller`である`HttpImageClient`を実装します。

デモ目的で、この例ではKVを使用して、`KV`を含む返された`ImageResponse`に生のURLを保持しています。

省略されたスニペット

以下は、重要な部分を示した`HttpImageClient`の省略バージョンです。

class HttpImageClient implements Caller<KV<String, ImageRequest>, KV<String, ImageResponse>> {

    private static final HttpRequestFactory REQUEST_FACTORY =
        new NetHttpTransport().createRequestFactory();

    @Override
    public KV<String, ImageResponse> call(KV<String, ImageRequest> requestKV) throws UserCodeExecutionException {

        ImageRequest request = requestKV.getValue();
        GenericUrl url = new GenericUrl(request.getImageUrl());
        HttpRequest imageRequest = REQUEST_FACTORY.buildGetRequest(url);
        HttpResponse response = imageRequest.execute();

        return KV.of(
            requestKV.getKey(),
            ImageResponse
                .builder()
                // Build ImageResponse from HttpResponse
                .build()
        );
    }

}
完全な例

HTTPレスポンスコードに基づいて様々な例外をスローする様子を示した完全な実装を以下に示します。

/**
 * Implements {@link Caller} to process an {@link ImageRequest} into an {@link ImageResponse} by
 * invoking the HTTP request.
 */
class HttpImageClient implements Caller<KV<String, ImageRequest>, KV<String, ImageResponse>> {

  private static final int STATUS_TOO_MANY_REQUESTS = 429;
  private static final int STATUS_TIMEOUT = 408;
  private static final HttpRequestFactory REQUEST_FACTORY =
      new NetHttpTransport().createRequestFactory();

  static HttpImageClient of() {
    return new HttpImageClient();
  }

  /**
   * Invokes an HTTP Get request from the {@param request}, returning an {@link ImageResponse}
   * containing the image data.
   */
  @Override
  public KV<String, ImageResponse> call(KV<String, ImageRequest> requestKV)
      throws UserCodeExecutionException {

    String key = requestKV.getKey();
    ImageRequest request = requestKV.getValue();
    Preconditions.checkArgument(request != null);
    GenericUrl url = new GenericUrl(request.getImageUrl());

    try {
      HttpRequest imageRequest = REQUEST_FACTORY.buildGetRequest(url);
      HttpResponse response = imageRequest.execute();

      if (response.getStatusCode() >= 500) {
        // Tells transform to repeat the request.
        throw new UserCodeRemoteSystemException(response.getStatusMessage());
      }

      if (response.getStatusCode() >= 400) {

        switch (response.getStatusCode()) {
          case STATUS_TOO_MANY_REQUESTS:
            // Tells transform to repeat the request.
            throw new UserCodeQuotaException(response.getStatusMessage());

          case STATUS_TIMEOUT:
            // Tells transform to repeat the request.
            throw new UserCodeTimeoutException(response.getStatusMessage());

          default:
            // Tells the tranform to emit immediately into failure PCollection.
            throw new UserCodeExecutionException(response.getStatusMessage());
        }
      }

      InputStream is = response.getContent();
      byte[] bytes = ByteStreams.toByteArray(is);

      return KV.of(
          key,
          ImageResponse.builder()
              .setMimeType(request.getMimeType())
              .setData(ByteString.copyFrom(bytes))
              .build());

    } catch (IOException e) {

      // Tells the tranform to emit immediately into failure PCollection.
      throw new UserCodeExecutionException(e);
    }
  }
}

リクエストの定義

`ImageRequest`は、上記の例で定義されている、画像を取得するHTTP呼び出しを呼び出すために`HttpImageClient`に提供するカスタムリクエストです。この例ではGoogle AutoValueを使用していますが、`String`、`Double`などの固有のJavaクラスを含む、任意のBeam `PCollection`で使用するカスタムの`Serializable` Javaクラスを使用できます。便宜上、この例では`@DefaultSchema(AutoValueSchema.class)`を使用することで、そのゲッターに基づいてカスタムタイプをBeamスキーマに自動的にマッピングできます。

/** An HTTP request for an image. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class ImageRequest implements Serializable {

  static final TypeDescriptor<ImageRequest> TYPE = TypeDescriptor.of(ImageRequest.class);
  private static final Map<String, String> EXT_MIMETYPE_MAP =
      ImmutableMap.of(
          "jpg", "image/jpeg",
          "jpeg", "image/jpeg",
          "png", "image/png");

  /** Derive the MIME type of the image from the url based on its extension. */
  private static String mimeTypeOf(String url) {
    String ext = FileNameUtils.getExtension(url);
    if (!EXT_MIMETYPE_MAP.containsKey(ext)) {
      throw new IllegalArgumentException(
          String.format("could not map extension to mimetype: ext %s of url: %s", ext, url));
    }
    return EXT_MIMETYPE_MAP.get(ext);
  }

  static Builder builder() {
    return new AutoValue_ImageRequest.Builder();
  }

  /** Build an {@link ImageRequest} from a {@param url}. */
  static ImageRequest of(String url) {
    return builder().setImageUrl(url).setMimeType(mimeTypeOf(url)).build();
  }

  /** The URL of the image request. */
  abstract String getImageUrl();

  /** The MIME type of the image request. */
  abstract String getMimeType();

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setImageUrl(String value);

    abstract Builder setMimeType(String value);

    abstract ImageRequest build();
  }
}

レスポンスの定義

`ImageResponse`は、上記の例で定義されている、リモートサーバーを画像URLで呼び出した結果として画像データを含む`HttpImageClient`から返すカスタムレスポンスです。繰り返しますが、この例ではGoogle AutoValueを使用していますが、`String`、`Double`などの固有のJavaクラスを含む、任意のBeam `PCollection`で使用するカスタムの`Serializable` Javaクラスを使用できます。

/** An HTTP response of an image request. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class ImageResponse implements Serializable {

  static Builder builder() {
    return new AutoValue_ImageResponse.Builder();
  }

  /** The MIME type of the response payload. */
  abstract String getMimeType();

  /** The payload of the response containing the image data. */
  abstract ByteString getData();

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setMimeType(String value);

    abstract Builder setData(ByteString value);

    abstract ImageResponse build();
  }
}

レスポンスコーダーの定義

`RequestResponseIO`は、レスポンスのCoderを2番目の必須パラメーターとして必要とします(下記の例を参照)。Beam Coderの詳細については、Beamプログラミングガイドを参照してください。

/** A {@link CustomCoder} of an {@link ImageResponse}. */
class ImageResponseCoder extends CustomCoder<ImageResponse> {
  public static ImageResponseCoder of() {
    return new ImageResponseCoder();
  }

  private static final Coder<byte[]> BYTE_ARRAY_CODER = ByteArrayCoder.of();
  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();

  @Override
  public void encode(ImageResponse value, OutputStream outStream)
      throws CoderException, IOException {
    BYTE_ARRAY_CODER.encode(value.getData().toByteArray(), outStream);
    STRING_CODER.encode(value.getMimeType(), outStream);
  }

  @Override
  public ImageResponse decode(InputStream inStream) throws CoderException, IOException {
    byte[] data = BYTE_ARRAY_CODER.decode(inStream);
    String mimeType = STRING_CODER.decode(inStream);
    return ImageResponse.builder().setData(ByteString.copyFrom(data)).setMimeType(mimeType).build();
  }
}

URL から画像データの取得

以下は、エンドツーエンドのパイプラインですべてを統合する方法の例を示しています。画像URLのリストから、`HttpImageClient` `Caller`実装を使用してインスタンス化された`RequestResponseIO`に適用される`ImageRequest`の`PCollection`を構築します。

`Result`の`getFailures`ゲッターからアクセスできる失敗はすべて、ログに出力されます。既に説明したように、これらの失敗をデータベースまたはファイルシステムに書き込むことができます。

  /** Example demonstrating downloading a list of image URLs using {@link RequestResponseIO}. */
  static void readFromGetEndpointExample(List<String> urls, Pipeline pipeline) {
    //        Pipeline pipeline = Pipeline.create();
    //        List<String> urls = ImmutableList.of(
    //                "https://storage.googleapis.com/generativeai-downloads/images/cake.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/chocolate.png",
    //                "https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/factory.png",
    //                "https://storage.googleapis.com/generativeai-downloads/images/scones.jpg"
    //        );

    // Step 1: Convert the list of URLs to a PCollection of ImageRequests.
    PCollection<KV<String, ImageRequest>> requests = Images.requestsOf(urls, pipeline);

    // Step 2: RequestResponseIO requires a Coder as its second parameter.
    KvCoder<String, ImageResponse> responseCoder =
        KvCoder.of(StringUtf8Coder.of(), ImageResponseCoder.of());

    // Step 3: Process ImageRequests using RequestResponseIO instantiated from the Caller
    // implementation and the expected PCollection response Coder.
    Result<KV<String, ImageResponse>> result =
        requests.apply(
            ImageResponse.class.getSimpleName(),
            RequestResponseIO.of(HttpImageClient.of(), responseCoder));

    // Step 4: Log any failures to stderr.
    result.getFailures().apply("logErrors", Log.errorOf());

    // Step 5: Log output to stdout.
    Images.displayOf(result.getResponses()).apply("logResponses", Log.infoOf());
  }

以下のパイプライン出力は、ダウンロードされた画像の概要、そのURL、MIMEタイプ、サイズを表示します。

KV{https://storage.googleapis.com/generativeai-downloads/images/factory.png, mimeType=image/png, size=23130}
KV{https://storage.googleapis.com/generativeai-downloads/images/scones.jpg, mimeType=image/jpeg, size=394671}
KV{https://storage.googleapis.com/generativeai-downloads/images/cake.jpg, mimeType=image/jpeg, size=253809}
KV{https://storage.googleapis.com/generativeai-downloads/images/chocolate.png, mimeType=image/png, size=29375}
KV{https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg, mimeType=image/jpeg, size=207281}
KV{https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg, mimeType=image/jpeg, size=1121752}

API クライアントコードの使用

最後の例では、HTTPリクエストを直接呼び出す方法を示しました。しかし、`Caller`実装内で使用するクライアントコードを提供するAPIサービスもあります。Beam内でクライアントコードを使用すると、シリアル化という独自の課題が発生します。さらに、一部のクライアントコードでは、設定と終了に関して明示的な処理が必要です。

`RequestResponseIO`は、これらのシナリオのために`SetupTeardown`という追加のインターフェースを処理できます。

SetupTeardownインターフェースには、setupメソッドとteardownメソッドの2つのメソッドしかありません。

interface SetupTeardown {
    void setup() throws UserCodeExecutionException;
    void teardown() throws UserCodeExecutionException;
}

この変換処理は、それぞれ`DoFn`の@Setupメソッドと@Teardownメソッド内でこれらのsetupメソッドとteardownメソッドを呼び出します。

この変換処理は、前述のガイドで説明したように、スローされた例外にも依存するバックオフによる再試行も同様に処理します。

SetupTeardown を使用した発信者の定義

以下は、`SetupTeardown`インターフェースの使用方法に加えて、必要な`Caller`を使用して、Vertex AI Gemini JavaクライアントをBeamパイプラインで動作するように適合させた例です。上記の単純なHTTP例よりもボイラープレートコードが少し多くなります。

省略されたスニペット

重要な部分を抜粋した省略されたスニペットを以下に示します。

`setup`メソッドは、`GeminiAIClient`が`VertexAI`と`GenerativeModel`をインスタンス化し、`teardown`中に`VertexAI`を閉じる場所です。最後に、その`call`メソッドは上記のHTTP例と似ており、リクエストを受け取り、それを利用してAPIを呼び出し、レスポンスを返します。

class GeminiAIClient implements
    Caller<KV<String, GenerateContentRequest>, KV<String, GenerateContentResponse>>,
    SetupTeardown {

    @Override
    public KV<String, GenerateContentResponse> call(KV<String, GenerateContentRequest> requestKV)
    throws UserCodeExecutionException {
        GenerateContentResponse response = client.generateContent(request.getContentsList());
        return KV.of(requestKV.getKey(), response);
    }

    @Override
    public void setup() throws UserCodeExecutionException {
        vertexAI = new VertexAI(getProjectId(), getLocation());
        client = new GenerativeModel(getModelName(), vertexAI);
    }

    @Override
    public void teardown() throws UserCodeExecutionException {
        vertexAI.close();
    }
}
完全な例

完全な例を以下に示します。この例では重要な点として、`com.google.cloud.vertexai.VertexAI`と`com.google.cloud.vertexai.generativeai.GenerativeModel`はシリアライズ可能ではないため、`transient`でインスタンス化する必要があります。Javaプロジェクトでhttps://checkerframework.org/を使用していない場合、`@MonotonicNonNull`は無視できます。

/**
 * Example {@link Caller} and {@link SetupTeardown} implementation for use with {@link
 * RequestResponseIO} to process Gemini AI {@link GenerateContentRequest}s into {@link
 * GenerateContentResponse}s.
 */
@AutoValue
abstract class GeminiAIClient
    implements Caller<KV<String, GenerateContentRequest>, KV<String, GenerateContentResponse>>,
        SetupTeardown {

  static Builder builder() {
    return new AutoValue_GeminiAIClient.Builder();
  }

  static final String MODEL_GEMINI_PRO = "gemini-pro";
  static final String MODEL_GEMINI_PRO_VISION = "gemini-pro-vision";

  private transient @MonotonicNonNull VertexAI vertexAI;
  private transient @MonotonicNonNull GenerativeModel client;

  @Override
  public KV<String, GenerateContentResponse> call(KV<String, GenerateContentRequest> requestKV)
      throws UserCodeExecutionException {

    String key = requestKV.getKey();
    GenerateContentRequest request = requestKV.getValue();

    if (request == null) {
      throw new UserCodeExecutionException("request is empty");
    }

    if (request.getContentsList().isEmpty()) {
      throw new UserCodeExecutionException("contentsList is empty");
    }

    try {

      GenerateContentResponse response =
          checkStateNotNull(client).generateContent(request.getContentsList());

      return KV.of(key, response);

    } catch (IOException e) {
      throw new UserCodeExecutionException(e);
    }
  }

  @Override
  public void setup() throws UserCodeExecutionException {
    vertexAI = new VertexAI(getProjectId(), getLocation());
    client = new GenerativeModel(getModelName(), vertexAI);
  }

  @Override
  public void teardown() throws UserCodeExecutionException {
    if (vertexAI != null) {
      vertexAI.close();
    }
  }

  abstract String getModelName();

  abstract String getProjectId();

  abstract String getLocation();

  @AutoValue.Builder
  abstract static class Builder {

    abstract Builder setModelName(String name);

    abstract Optional<String> getModelName();

    abstract Builder setProjectId(String value);

    abstract Builder setLocation(String value);

    abstract GeminiAIClient autoBuild();

    final GeminiAIClient build() {
      if (!getModelName().isPresent()) {
        setModelName(MODEL_GEMINI_PRO);
      }
      return autoBuild();
    }
  }

Gemini AI に画像の識別を依頼する

では、画像の取得に関する前の例をこのGemini AIクライアントと組み合わせて、画像の識別を依頼してみましょう。

以下は、以前見た例ですが、便利なメソッドにカプセル化されています。URLの`List`を受け取り、画像データを含む`ImageResponse`の`PCollection`を返します。

  /**
   * Processes a list of raw image URLs into a {@link ImageResponse} {@link PCollection} using
   * {@link RequestResponseIO}. The resulting {@link KV#getKey} is the original image URL.
   */
  static Result<KV<String, ImageResponse>> imagesOf(List<String> urls, Pipeline pipeline) {

    Coder<KV<String, ImageResponse>> kvCoder = KvCoder.of(STRING_CODER, ImageResponseCoder.of());

    return requestsOf(urls, pipeline)
        .apply(
            ImageResponse.class.getSimpleName(),
            RequestResponseIO.of(HttpImageClient.of(), kvCoder));
  }

次に、`ImageResponse`を`GenerateContentRequest`の`PCollection`に変換します。

    // PCollection<KV<Struct, ImageResponse>> imagesKV = ...

    return imagesKV
        .apply(
            stepName,
            MapElements.into(requestKVType)
                .via(
                    kv -> {
                      String key = kv.getKey();
                      ImageResponse safeResponse = checkStateNotNull(kv.getValue());
                      ByteString data = safeResponse.getData();
                      return buildAIRequest(key, prompt, data, safeResponse.getMimeType());
                    }))
        .setCoder(kvCoder);

最後に、上記の`GeminiAIClient`を使用してインスタンス化された`RequestResponseIO`に`GenerateContentRequest`の`PCollection`を適用します。`RequestResponseIO.of`ではなく、`RequestResponseIO.ofCallerAndSetupTeardown`を使用していることに注意してください。`ofCallerAndSetupTeardown`メソッドは、コンパイラに`Caller`インターフェースと`SetupTeardown`インターフェースの両方の実装を提供していることを伝えるだけです。

    //    PCollection<KV<Struct, GenerateContentRequest>> requestKV = ...
    //    GeminiAIClient client =
    //            GeminiAIClient.builder()
    //                    .setProjectId(options.getProject())
    //                    .setLocation(options.getLocation())
    //                    .setModelName(MODEL_GEMINI_PRO_VISION)
    //                    .build();

    return requestKV.apply(
        "Ask Gemini AI", RequestResponseIO.ofCallerAndSetupTeardown(client, responseCoder));

完全なエンドツーエンドパイプラインを以下に示します。

  /** Demonstrates using Gemini AI to identify a images, acquired from their URLs. */
  static void whatIsThisImage(List<String> urls, GeminiAIOptions options) {
    //        GeminiAIOptions options = PipelineOptionsFactory.create().as(GeminiAIOptions.class);
    //        options.setLocation("us-central1");
    //        options.setProjectId("your-google-cloud-project-id");
    //
    //
    //        List<String> urls = ImmutableList.of(
    //                "https://storage.googleapis.com/generativeai-downloads/images/cake.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/chocolate.png",
    //                "https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg",
    //                "https://storage.googleapis.com/generativeai-downloads/images/factory.png",
    //                "https://storage.googleapis.com/generativeai-downloads/images/scones.jpg"
    //        );

    // Step 1: Instantiate GeminiAIClient, the Caller and SetupTeardown implementation.
    GeminiAIClient client =
        GeminiAIClient.builder()
            .setProjectId(options.getProject())
            .setLocation(options.getLocation())
            .setModelName(MODEL_GEMINI_PRO_VISION)
            .build();

    Pipeline pipeline = Pipeline.create(options);

    // Step 2: Download the images from the list of urls.
    Result<KV<String, ImageResponse>> getImagesResult = Images.imagesOf(urls, pipeline);

    // Step 3: Log any image download errors.
    getImagesResult.getFailures().apply("Log get images errors", Log.errorOf());

    // Step 4: Build Gemini AI requests from the download image data with the prompt 'What is this
    // picture?'.
    PCollection<KV<String, GenerateContentRequest>> requests =
        buildAIRequests("Identify Image", "What is this picture?", getImagesResult.getResponses());

    // Step 5: Using RequestResponseIO, ask Gemini AI 'What is this picture?' for each downloaded
    // image.
    Result<KV<String, GenerateContentResponse>> responses = askAI(client, requests);

    // Step 6: Log any Gemini AI errors.
    responses.getFailures().apply("Log AI errors", Log.errorOf());

    // Step 7: Log the result of Gemini AI's image recognition.
    responses.getResponses().apply("Log AI answers", Log.infoOf());

    pipeline.run();
  }

以下は、完全なパイプラインを実行したときの出力の略で、Gemini AIが画像を識別した結果を示しています。

KV{https://storage.googleapis.com/generativeai-downloads/images/chocolate.png, candidates {
    content {
        role: "model"
        parts {
            text: " This is a picture of a chocolate bar."
    }
}

KV{https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg, candidates {
    content {
        role: "model"
        parts {
            text: " The picture is a dog walking application form. It has two sections, one for information
                    about the dog and one for information about the owner. The dog\'s name is Fido,
                    he is a Cavoodle, and he is black and tan. He is 3 years old and has a friendly
                    temperament. The owner\'s name is Mark, and his phone number is 0491570006. He would
                    like Fido to be walked once a week on Tuesdays and Thursdays in the morning."
        }
    }
}

KV{https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg
    content {
        role: "model"
        parts {
            text: " The picture shows a basket of croissants. Croissants are a type of pastry that is made
                    from a yeast-based dough that is rolled and folded several times in the rising process.
                    The result is a light, flaky pastry that is often served with butter, jam, or chocolate.
                    Croissants are a popular breakfast food and can also be used as a dessert or snack."
        }
    }
}