スキーマパターン
このページのサンプルは、スキーマを使用した一般的なパターンについて説明しています。スキーマは、特定のプログラミング言語の型とは独立した、Beamレコードの型システムを提供します。複数のJavaクラスが同じスキーマを持つ場合があります(たとえば、Protocol BufferクラスまたはPOJOクラス)。Beamでは、これらの型間をシームレスに変換できます。スキーマは、異なるプログラミング言語API間で型について推論する簡単な方法も提供します。詳細については、スキーマに関するプログラミングガイドセクションを参照してください。
- Java SDK
結合の使用
Beamは、結合条件がフィールドのサブセットの等価性に依存するスキーマ`PCollection`の等結合をサポートしています。
関連する情報を持つ複数のコレクションがあり、それらの構造が既知である場合は、`Join`の使用を検討してください。
たとえば、ユーザーデータを持つ2つの異なるコレクションがあるとします。1つのコレクションには名前とメールアドレスが含まれ、もう1つのコレクションには名前と電話番号が含まれています。名前を共通キーとして、他のデータを関連付けられた値として使用して、2つのコレクションを結合できます。結合後、各名前に関連付けられたすべての情報(メールアドレスと電話番号)を含む1つのコレクションが作成されます。
次の概念例では、2つの入力コレクションを使用して`Join`のメカニズムを示します。
まず、スキーマとユーザーデータを定義します。
// Define Schemas
Schema emailSchema =
Schema.of(
Schema.Field.of("name", Schema.FieldType.STRING),
Schema.Field.of("email", Schema.FieldType.STRING));
Schema phoneSchema =
Schema.of(
Schema.Field.of("name", Schema.FieldType.STRING),
Schema.Field.of("phone", Schema.FieldType.STRING));
// Create User Data Collections
final List<Row> emailUsers =
Arrays.asList(
Row.withSchema(emailSchema).addValue("person1").addValue("person1@example.com").build(),
Row.withSchema(emailSchema).addValue("person2").addValue("person2@example.com").build(),
Row.withSchema(emailSchema).addValue("person3").addValue("person3@example.com").build(),
Row.withSchema(emailSchema).addValue("person4").addValue("person4@example.com").build(),
Row.withSchema(emailSchema)
.addValue("person6")
.addValue("person6@example.com")
.build());
final List<Row> phoneUsers =
Arrays.asList(
Row.withSchema(phoneSchema).addValue("person1").addValue("111-222-3333").build(),
Row.withSchema(phoneSchema).addValue("person2").addValue("222-333-4444").build(),
Row.withSchema(phoneSchema).addValue("person3").addValue("444-333-4444").build(),
Row.withSchema(phoneSchema).addValue("person4").addValue("555-333-4444").build(),
Row.withSchema(phoneSchema).addValue("person5").addValue("777-333-4444").build());
次に、ユーザーデータの`PCollection`を作成し、`Join`を使用して2つの`PCollection`で結合を実行します。
// Create/Read Schema PCollections
PCollection<Row> emailList =
p.apply("CreateEmails", Create.of(emailUsers).withRowSchema(emailSchema));
PCollection<Row> phoneList =
p.apply("CreatePhones", Create.of(phoneUsers).withRowSchema(phoneSchema));
// Perform Join
PCollection<Row> resultRow =
emailList.apply("Apply Join", Join.<Row, Row>innerJoin(phoneList).using("name"));
// Preview Result
resultRow.apply(
"Preview Result",
MapElements.into(TypeDescriptors.strings())
.via(
x -> {
System.out.println(x);
return "";
}));
/* Sample Output From the pipeline:
Row:[Row:[person1, person1@example.com], Row:[person1, 111-222-3333]]
Row:[Row:[person2, person2@example.com], Row:[person2, 222-333-4444]]
Row:[Row:[person4, person4@example.com], Row:[person4, 555-333-4444]]
Row:[Row:[person3, person3@example.com], Row:[person3, 444-333-4444]]
*/
結果の`Row`は`Row:[Row(emailSchema)、Row(phoneSchema)]`型であり、以下のコードスニペットに示すように、目的の形式に変換できます。
PCollection<String> result =
resultRow.apply(
"Format Output",
MapElements.into(TypeDescriptors.strings())
.via(
x -> {
String userInfo =
"Name: "
+ x.getRow(0).getValue("name")
+ " Email: "
+ x.getRow(0).getValue("email")
+ " Phone: "
+ x.getRow(1).getValue("phone");
System.out.println(userInfo);
return userInfo;
}));
/* Sample output From the pipeline
Name: person4 Email: person4@example.com Phone: 555-333-4444
Name: person2 Email: person2@example.com Phone: 222-333-4444
Name: person3 Email: person3@example.com Phone: 444-333-4444
Name: person1 Email: person1@example.com Phone: 111-222-3333
*/
最終更新日:2024/10/31
お探しのものはすべて見つかりましたか?
すべて役に立ち、明確でしたか?変更したいことはありますか?お知らせください!