Google BigQueryパターン

このページのサンプルは、BigQueryIOで使用する一般的なパターンを示しています。

BigQueryIOデッドレターパターン

本番システムでは、BigQueryIOによって処理中にエラーが発生した要素を別のPCollectionに出力して、さらに処理を行うことで、デッドレターパターンを実装することが役立ちます。以下のサンプルはエラーを出力しますが、本番システムでは、後で修正するためにデッドレターテーブルに送信できます。

STREAMING_INSERTSを使用する場合は、WriteResultオブジェクトを使用して、BigQueryに挿入できなかったTableRowsを含むPCollectionにアクセスできます。また、withExtendedErrorInfoプロパティを設定した場合、WriteResultからPCollection<BigQueryInsertError>にアクセスできます。PCollectionには、テーブル、データ行、InsertErrorsへの参照が含まれます。デッドレターキューに追加されるエラーは、InsertRetryPolicyによって決定されます。

結果のタプルでは、失敗した挿入にアクセスするためにFailedRowsにアクセスできます。

      PipelineOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);

      Pipeline p = Pipeline.create(options);

      // Create a bug by writing the 2nd value as null. The API will correctly
      // throw an error when trying to insert a null value into a REQUIRED field.
      WriteResult result =
          p.apply(Create.of(1, 2))
              .apply(
                  BigQueryIO.<Integer>write()
                      .withSchema(
                          new TableSchema()
                              .setFields(
                                  ImmutableList.of(
                                      new TableFieldSchema()
                                          .setName("num")
                                          .setType("INTEGER")
                                          .setMode("REQUIRED"))))
                      .to("Test.dummyTable")
                      .withFormatFunction(x -> new TableRow().set("num", (x == 2) ? null : x))
                      .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                      // Forcing the bounded pipeline to use streaming inserts
                      .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                      // set the withExtendedErrorInfo property.
                      .withExtendedErrorInfo()
                      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

      result
          .getFailedInsertsWithErr()
          .apply(
              MapElements.into(TypeDescriptors.strings())
                  .via(
                      x -> {
                        System.out.println(" The table was " + x.getTable());
                        System.out.println(" The row was " + x.getRow());
                        System.out.println(" The error was " + x.getError());
                        return "";
                      }));
      p.run();

      /*  Sample Output From the pipeline:
       <p>The table was GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=Test,projectId=<>, tableId=dummyTable}}
       <p>The row was GenericData{classInfo=[f], {num=null}}
       <p>The error was GenericData{classInfo=[errors, index],{errors=[GenericData{classInfo=[debugInfo, location, message, reason], {debugInfo=,location=, message=Missing required field: Msg_0_CLOUD_QUERY_TABLE.num., reason=invalid}}],index=0}}
      */
    }
  # Create pipeline.
  schema = ({'fields': [{'name': 'a', 'type': 'STRING', 'mode': 'REQUIRED'}]})

  pipeline = beam.Pipeline()

  errors = (
      pipeline | 'Data' >> beam.Create([1, 2])
      | 'CreateBrokenData' >>
      beam.Map(lambda src: {'a': src} if src == 2 else {'a': None})
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
          "<Your Project:Test.dummy_a_table",
          schema=schema,
          insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
          create_disposition='CREATE_IF_NEEDED',
          write_disposition='WRITE_APPEND'))
  result = (
      errors['FailedRows']
      | 'PrintErrors' >>
      beam.FlatMap(lambda err: print("Error Found {}".format(err))))