PTransform スタイルガイド

新しい再利用可能なPTransformを作成する際のスタイルガイド。

言語に依存しない考慮事項

一貫性

既存の慣習と整合性を保つ

PTransformの公開 vs その他

Beamパイプラインで人々が使用するライブラリ(サードパーティシステムへのコネクタ、機械学習アルゴリズムなど)を開発したいと考えています。どのように公開する必要がありますか?

実施事項

実施しないこと

命名

実施事項

実施しないこと

設定

設定と入力コレクションの区別

公開するパラメータ

実施事項

実施しないこと

エラー処理

変換設定エラー

早期にエラーを検出します。エラーは次の段階で検出できます。

例:

ランタイムエラーとデータ整合性

何よりもデータ整合性を優先します。データの損失または破損を隠蔽しないでください。データの損失を防ぐことができない場合は、失敗します。

実施事項

実施しないこと

パフォーマンス

多くのランナーは、`ParDo`の連鎖を、各入力要素につき少量から中程度の要素を出力する、または要素ごとの処理が比較的安価な`ParDo`の場合(例:Dataflowの「融合」)、パフォーマンスを向上させる方法で最適化しますが、これらの仮定が満たされない場合は並列化を制限します。その場合、「融合ブレーク」(`Reshuffle.of()`)が必要になる可能性があり、`ParDo`の出力`PCollection`の処理の並列化が向上します。

ドキュメント

変換の構成方法(コード例を示す)、および入力に関する期待する保証または出力に関する提供する保証を、Beamモデルを考慮して文書化します。例:

ロギング

変換のユーザーが遭遇する可能性のある異常な状況を予測します。デバッグに十分な情報をログに記録しますが、ログのボリュームを制限します。これはすべてのプログラムに適用されるアドバイスですが、データボリュームが膨大で実行が分散されている場合は特に重要です。

実施事項

実施しないこと

テスト

データ処理は複雑で、多くの特殊なケースがあり、デバッグが困難です。これは、パイプラインの実行に時間がかかるため、出力が正しいかどうかを確認するのが難しく、デバッガーを接続できず、多くの場合、データの大量のため、希望するほど多くログに記録できないためです。そのため、テストは特に重要です。

変換のランタイム動作のテスト

変換の構築と検証のテスト

変換の構築と検証のためのコードは通常自明であり、ほとんどが定型的なものです。ただし、その中の小さな間違いやタイプミスは、重大な結果(例:ユーザーが設定したプロパティを無視する)をもたらす可能性があるため、テストする必要があります。しかし、過剰な量の自明なテストは、保守が難しく、変換が十分にテストされているという誤った印象を与える可能性があります。

実施事項

実施しないこと

互換性

実施事項

実施しないこと

Java固有の考慮事項

以下のほとんどのプラクティスの良い例は、`JdbcIO`と`MongoDbIO`です。

API

入力と出力PCollectionの型の選択

可能な限り、変換の性質に固有の型を使用してください。必要に応じて、独自の型から変換`DoFn`を使用してラップできます。例:DatastoreコネクタはDatastore`Entity`型を使用する必要があります。MongoDbコネクタは、JSONの文字列表現ではなく、Mongo`Document`型を使用する必要があります。

それが不可能な場合があります(例:JDBCはBeamと互換性のある(Coderでエンコード可能な)「JDBCレコード」データ型を提供しません)。その場合、変換固有の型とBeamと互換性のある型間の変換を行う関数をユーザーに提供させます(例:`JdbcIO`と`MongoDbGridFSIO`を参照)。

変換が、まだJavaクラスが存在しない複合型を論理的に返す必要がある場合、適切な名前のフィールドを持つ新しいPOJOクラスを作成します。汎用タプルクラスまたはKVは、フィールドが正当にキーと値である場合を除いて使用しないでください。

複数の出力コレクションを持つ変換

変換が複数のコレクションを返す必要がある場合、それはPTransform<..., PCollectionTuple>でなければならず、各コレクションに対してgetBlahTag()メソッドを公開する必要があります。

例えば、PCollection<Foo>PCollection<Bar>を返す場合は、TupleTag<Foo> getFooTag()TupleTag<Bar> getBarTag()を公開します。

例:

public class MyTransform extends PTransform<..., PCollectionTuple> {
  private final TupleTag<Moo> mooTag = new TupleTag<Moo>() {};
  private final TupleTag<Blah> blahTag = new TupleTag<Blah>() {};
  ...
  PCollectionTuple expand(... input) {
    ...
    PCollection<Moo> moo = ...;
    PCollection<Blah> blah = ...;
    return PCollectionTuple.of(mooTag, moo)
                           .and(blahTag, blah);
  }

  public TupleTag<Moo> getMooTag() {
    return mooTag;
  }

  public TupleTag<Blah> getBlahTag() {
    return blahTag;
  }
  ...
}

設定のためのFluentビルダー

変換クラスを不変にし、変更された不変オブジェクトを生成するメソッドを使用します。AutoValueを使用します。AutoValueは、ビルダーヘルパークラスを提供できます。プリミティブ型(例:int)を除き、デフォルト値がないか、デフォルト値がnullであるクラス型の引数をマークするには、@Nullableを使用します。

@AutoValue
public abstract static class MyTransform extends PTransform<...> {
  int getMoo();
  @Nullable abstract String getBlah();

  abstract Builder toBuilder();

  @AutoValue.Builder
  abstract static class Builder {
    abstract Builder setMoo(int moo);
    abstract Builder setBlah(String blah);

    abstract MyTransform build();
  }
  ...
}
ファクトリメソッド

引数のない単一の静的ファクトリメソッドを、包含クラス内(「変換ファミリの梱包」を参照)または変換クラス自体に用意します。

public class Thumbs {
  public static Twiddle twiddle() {
    return new AutoValue_Thumbs_Twiddle.Builder().build();
  }

  public abstract static class Twiddle extends PTransform<...> { ... }
}

// or:
public abstract static class TwiddleThumbs extends PTransform<...> {
  public static TwiddleThumbs create() {
    return new AutoValue_Thumbs_Twiddle.Builder().build();
  }
  ...
}

例外:変換に非常に重要な単一の引数がある場合、ファクトリメソッドofを呼び出し、引数をファクトリメソッドの引数に入れます。例:ParDo.of(DoFn).withAllowedLateness()

パラメータを設定するためのFluentビルダーメソッド

それらをwithBlah()と呼びます。すべてのビルダーメソッドは、まったく同じ型を返す必要があります。それがパラメータ化された(ジェネリックな)型である場合、型パラメータの値は同じです。

withBlah()メソッドをキーワード引数の順序付けられていない集合として扱います。結果は、withFoo()withBar()を呼び出す順序に依存してはなりません(例:withBar()はfooの現在の値を読み取ってはなりません)。

withBlahメソッドの影響を文書化します。このメソッドをいつ使用するべきか、どのような値が許可されているか、デフォルト値は何であるか、値を変更することの影響は何であるか。

/**
 * Returns a new {@link TwiddleThumbs} transform with moo set
 * to the given value.
 *
 * <p>Valid values are 0 (inclusive) to 100 (exclusive). The default is 42.
 *
 * <p>Higher values generally improve throughput, but increase chance
 * of spontaneous combustion.
 */
public Twiddle withMoo(int moo) {
  checkArgument(moo >= 0 && moo < 100,
      "Thumbs.Twiddle.withMoo() called with an invalid moo of %s. "
      + "Valid values are 0 (inclusive) to 100 (exclusive)",
      moo);
  return toBuilder().setMoo(moo).build();
}
パラメータのデフォルト値

ファクトリメソッドで指定します(ファクトリメソッドはデフォルト値を持つオブジェクトを返します)。

public class Thumbs {
  public static Twiddle twiddle() {
    return new AutoValue_Thumbs_Twiddle.Builder().setMoo(42).build();
  }
  ...
}
複数のパラメータを再利用可能なオブジェクトにパッケージ化

変換のいくつかのパラメータが非常に密接に論理的に結合されている場合、それらをコンテナオブジェクトにカプセル化することが理にかなう場合があります。このコンテナオブジェクトにも同じガイドラインを使用します(不変にする、ビルダーを含むAutoValueを使用する、withBlah()メソッドを文書化するなど)。例として、JdbcIO.DataSourceConfigurationを参照してください。

型パラメータを持つ変換

すべての型パラメータは、ファクトリメソッドで明示的に指定する必要があります。ビルダーメソッド(withBlah())は型を変更してはなりません。

public class Thumbs {
  public static Twiddle<T> twiddle() {
    return new AutoValue_Thumbs_Twiddle.Builder<T>().build();
  }

  @AutoValue
  public abstract static class Twiddle<T>
       extends PTransform<PCollection<Foo>, PCollection<Bar<T>>> {
    
    @Nullable abstract Bar<T> getBar();

    abstract Builder<T> toBuilder();

    @AutoValue.Builder
    abstract static class Builder<T> {
      
      abstract Builder<T> setBar(Bar<T> bar);

      abstract Twiddle<T> build();
    }
    
  }
}

// User code:
Thumbs.Twiddle<String> twiddle = Thumbs.<String>twiddle();
// Or:
PCollection<Bar<String>> bars = foos.apply(Thumbs.<String>twiddle()  );

例外:変換に最も重要なパラメータが1つあり、そのパラメータが型Tに依存する場合、それをファクトリメソッドに直接入れる方が優先されます。例:Combine.globally(SerializableFunction<Iterable<V>,V>)。これにより、Javaの型推論が向上し、ユーザーは型パラメータを明示的に指定する必要がなくなります。

変換に2つ以上の型パラメータがある場合、またはパラメータの意味があまり明確でない場合、型パラメータをSomethingTのように命名します。例:分類アルゴリズムを実装し、各入力要素にラベルを割り当てるPTransformは、Classify<InputT, LabelT>として型指定される場合があります。

ユーザー指定の動作の注入

変換に、ユーザーコードによってカスタマイズされる動作の側面がある場合、次のように決定します。

実施事項

実施しないこと

変換群のパッケージ化

密接に関連する変換のファミリー(例:異なる方法で同じシステムと対話する、または同じ上位タスクの異なる実装を提供する)を開発する場合は、最上位クラスを名前空間として使用し、個々のユースケースに対応する変換を返す複数のファクトリメソッドを使用します。

コンテナクラスはprivateコンストラクタを持たなければならないため、直接インスタンス化することはできません。

FooIOレベルで共通のものを文書化し、各ファクトリメソッドを個別に文書化します。

/** Transforms for clustering data. */
public class Cluster {
  // Force use of static factory methods.
  private Cluster() {}

  /** Returns a new {@link UsingKMeans} transform. */
  public static UsingKMeans usingKMeans() { ... }
  public static Hierarchically hierarchically() { ... }

  /** Clusters data using the K-Means algorithm. */
  public static class UsingKMeans extends PTransform<...> { ... }
  public static class Hierarchically extends PTransform<...> { ... }
}

public class FooIO {
  // Force use of static factory methods.
  private FooIO() {}

  public static Read read() { ... }
  ...

  public static class Read extends PTransform<...> { ... }
  public static class Write extends PTransform<...> { ... }
  public static class Delete extends PTransform<...> { ... }
  public static class Mutate extends PTransform<...> { ... }
}

互換性のないAPIを持つ複数のバージョンをサポートする場合は、バージョンも名前空間のようなクラスとして使用し、異なるAPIバージョンの実装を異なるファイルに配置します。

// FooIO.java
public class FooIO {
  // Force use of static factory methods.
  private FooIO() {}

  public static FooV1 v1() { return new FooV1(); }
  public static FooV2 v2() { return new FooV2(); }
}

// FooV1.java
public class FooV1 {
  // Force use of static factory methods outside the package.
  FooV1() {}
  public static Read read() { ... }
  public static class Read extends PTransform<...> { ... }
}

// FooV2.java
public static class FooV2 {
  // Force use of static factory methods outside the package.
  FooV2() {}
  public static Read read() { ... }

  public static class Read extends PTransform<...> { ... }
}

動作

不変性

シリアライゼーション

DoFnPTransformCombineFnなどのインスタンスはシリアライズされます。シリアライズされたデータの量を最小限に抑えます。シリアライズしたくないフィールドをtransientとしてマークします。可能な限りクラスをstaticにします(インスタンスが包含クラスインスタンスをキャプチャしてシリアライズしないようにするため)。注:場合によっては、匿名クラスを使用できないことを意味します。

検証

@AutoValue
public abstract class TwiddleThumbs
    extends PTransform<PCollection<Foo>, PCollection<Bar>> {
  abstract int getMoo();
  abstract String getBoo();

  ...
  // Validating individual parameters
  public TwiddleThumbs withMoo(int moo) {
    checkArgument(
        moo >= 0 && moo < 100,
        "Moo must be between 0 (inclusive) and 100 (exclusive), but was: %s",
        moo);
    return toBuilder().setMoo(moo).build();
  }

  public TwiddleThumbs withBoo(String boo) {
    checkArgument(boo != null, "Boo can not be null");
    checkArgument(!boo.isEmpty(), "Boo can not be empty");
    return toBuilder().setBoo(boo).build();
  }

  @Override
  public void validate(PipelineOptions options) {
    int woo = options.as(TwiddleThumbsOptions.class).getWoo();
    checkArgument(
       woo > getMoo(),
      "Woo (%s) must be smaller than moo (%s)",
      woo, getMoo());
  }

  @Override
  public PCollection<Bar> expand(PCollection<Foo> input) {
    // Validating that a required parameter is present
    checkArgument(getBoo() != null, "Must specify boo");

    // Validating a combination of parameters
    checkArgument(
        getMoo() == 0 || getBoo() == null,
        "Must specify at most one of moo or boo, but was: moo = %s, boo = %s",
        getMoo(), getBoo());

    ...
  }
}

コーダー

Coderは、Beamランナーが中間データを具体化するか、必要に応じてワーカー間で送信する方法です。Coderの特定のバイナリエンコーディングは、そのプライベートな実装の詳細となることを意図しているため、Coderを汎用APIとしてバイナリ形式の解析や書き込みに使用しないでください。

型のデフォルトコーダーを提供する

すべての新しいデータ型に対してデフォルトのCoderを提供します。@DefaultCoderアノテーションまたは@AutoServiceでアノテーションされたCoderProviderRegistrarクラスを使用します。例については、SDKでのこれらのクラスの使用例を参照してください。パフォーマンスが重要でない場合は、SerializableCoderまたはAvroCoderを使用できます。そうでない場合は、効率的なカスタムコーダーを開発します(具体的な型にはAtomicCoderを、ジェネリック型にはStructuredCoderをサブクラス化します)。

出力コレクションにコーダーを設定する

PTransformによって作成されたすべてのPCollection(出力コレクションと中間コレクションの両方)には、Coderが設定されている必要があります。ユーザーが.setCoder()を呼び出して、PTransformによって生成されたPCollectionのコーダーを「修正」する必要はありません(実際、Beamは最終的にsetCoderを非推奨にする予定です)。場合によっては、コーダーの推論でこれを実現できます。他の場合では、変換はコレクションに対してsetCoderを明示的に呼び出す必要があります。

コレクションが具体的な型である場合、その型には通常、対応するコーダーがあります。SerializableCoderなどの汎用コーダーではなく、特定の最も効率的なコーダー(例:文字列にはStringUtf8Coder.of()、バイト配列にはByteArrayCoder.of()など)を使用します。

コレクションの型にジェネリック型変数が含まれる場合、状況はより複雑になります。