Google BigQueryパターン
このページのサンプルは、BigQueryIOで使用する一般的なパターンを示しています。
- Java SDK
- Python SDK
BigQueryIOデッドレターパターン
本番システムでは、BigQueryIOによって処理中にエラーが発生した要素を別のPCollectionに出力して、さらに処理を行うことで、デッドレターパターンを実装することが役立ちます。以下のサンプルはエラーを出力しますが、本番システムでは、後で修正するためにデッドレターテーブルに送信できます。
STREAMING_INSERTS
を使用する場合は、WriteResult
オブジェクトを使用して、BigQueryに挿入できなかったTableRows
を含むPCollection
にアクセスできます。また、withExtendedErrorInfo
プロパティを設定した場合、WriteResult
からPCollection<BigQueryInsertError>
にアクセスできます。PCollection
には、テーブル、データ行、InsertErrors
への参照が含まれます。デッドレターキューに追加されるエラーは、InsertRetryPolicy
によって決定されます。
結果のタプルでは、失敗した挿入にアクセスするためにFailedRows
にアクセスできます。
PipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);
Pipeline p = Pipeline.create(options);
// Create a bug by writing the 2nd value as null. The API will correctly
// throw an error when trying to insert a null value into a REQUIRED field.
WriteResult result =
p.apply(Create.of(1, 2))
.apply(
BigQueryIO.<Integer>write()
.withSchema(
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema()
.setName("num")
.setType("INTEGER")
.setMode("REQUIRED"))))
.to("Test.dummyTable")
.withFormatFunction(x -> new TableRow().set("num", (x == 2) ? null : x))
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
// Forcing the bounded pipeline to use streaming inserts
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
// set the withExtendedErrorInfo property.
.withExtendedErrorInfo()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
result
.getFailedInsertsWithErr()
.apply(
MapElements.into(TypeDescriptors.strings())
.via(
x -> {
System.out.println(" The table was " + x.getTable());
System.out.println(" The row was " + x.getRow());
System.out.println(" The error was " + x.getError());
return "";
}));
p.run();
/* Sample Output From the pipeline:
<p>The table was GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=Test,projectId=<>, tableId=dummyTable}}
<p>The row was GenericData{classInfo=[f], {num=null}}
<p>The error was GenericData{classInfo=[errors, index],{errors=[GenericData{classInfo=[debugInfo, location, message, reason], {debugInfo=,location=, message=Missing required field: Msg_0_CLOUD_QUERY_TABLE.num., reason=invalid}}],index=0}}
*/
}
# Create pipeline.
schema = ({'fields': [{'name': 'a', 'type': 'STRING', 'mode': 'REQUIRED'}]})
pipeline = beam.Pipeline()
errors = (
pipeline | 'Data' >> beam.Create([1, 2])
| 'CreateBrokenData' >>
beam.Map(lambda src: {'a': src} if src == 2 else {'a': None})
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
"<Your Project:Test.dummy_a_table",
schema=schema,
insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND'))
result = (
errors['FailedRows']
| 'PrintErrors' >>
beam.FlatMap(lambda err: print("Error Found {}".format(err))))
最終更新日:2024年10月31日
探しているものが見つかりましたか?
すべて役に立ち、分かりやすかったですか?変更したいことはありますか?お知らせください!