Apache Kafka to Kafka テンプレート

Apache Kafka to Apache Kafka テンプレートは、Apache Kafka ソースからバイトとしてデータを取り込み、Apache Kafka シンクに書き込むストリーミング パイプラインを作成します。

パイプラインの要件

  • Apache Kafka ソーストピックが存在している。
  • Apache Kafka ソースおよびシンク ブローカー サーバーが動作していて、Dataflow ワーカーマシンから到達可能である。
  • Google Cloud Managed Service for Apache Kafka をソースまたはシンクとして使用している場合は、テンプレートを起動する前にトピックが存在している必要がある。

Kafka メッセージ形式

Apache Kafka ソース メッセージは、バイトとして読み取られ、Apache Kafka シンクに書き込まれます。

認証

Apache Kafka to Apache Kafka テンプレートは、Kafka ブローカーに対する SASL / PLAIN 認証と TLS 認証をサポートしています。

テンプレートのパラメータ

必須パラメータ

  • readBootstrapServerAndTopic: 入力を読み取る Kafka ブートストラップ サーバーおよびトピック。例: localhost:9092;topic1,topic2
  • kafkaReadAuthenticationMode: Kafka クラスタで使用する認証モード。認証なしの場合は KafkaAuthenticationMethod.NONE、SASL / PLAIN のユーザー名とパスワードの場合は KafkaAuthenticationMethod.SASL_PLAIN、SASL_SCRAM_512 認証の場合は KafkaAuthenticationMethod.SASL_SCRAM_512、証明書ベースの認証の場合は KafkaAuthenticationMethod.TLS を使用します。KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS は Google Cloud Apache Kafka for BigQuery クラスタでのみ使用できます。これにより、アプリケーションのデフォルト認証情報を使用して認証できます。
  • writeBootstrapServerAndTopic: 出力を書き込む Kafka トピック。
  • kafkaWriteAuthenticationMethod: Kafka クラスタで使用する認証モード。認証なしの場合は NONE、SASL / PLAIN のユーザー名とパスワードの場合は SASL_PLAIN、SASL_SCRAM_512 ベースの認証の場合は SASL_SCRAM_512、証明書ベースの認証の場合は TLS を使用します。デフォルトは APPLICATION_DEFAULT_CREDENTIALS です。

オプション パラメータ

  • enableCommitOffsets: 処理済みメッセージのオフセットを Kafka に commit します。有効にすると、パイプライン再開時のメッセージの処理のギャップや重複を最小限に抑えることができます。コンシューマー グループ ID を指定する必要があります。デフォルトは false です。
  • consumerGroupId: このパイプラインが属するコンシューマー グループの固有識別子。Kafka へのオフセット commit が有効な場合は必須です。デフォルトは空です。
  • kafkaReadOffset: commit されたオフセットが存在しない場合にメッセージを読み始めるポイント。最も古いメッセージから始まり、最新のメッセージが最後になります。デフォルトは latest です。
  • kafkaReadUsernameSecretId: SASL_PLAIN 認証で使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。
  • kafkaReadPasswordSecretId: SASL_PLAIN 認証で使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。
  • kafkaReadKeystoreLocation: Kafka クラスタで認証を行う際に使用する TLS 証明書と秘密鍵を含む Java KeyStore(JKS)ファイルの Google Cloud Storage パス。例: gs://your-bucket/keystore.jks
  • kafkaReadTruststoreLocation: Kafka ブローカー ID を確認するための信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
  • kafkaReadTruststorePasswordSecretId: Kafka TLS 認証用に Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeystorePasswordSecretId: Kafka TLS 認証用に Java KeyStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadKeyPasswordSecretId: Kafka TLS 認証用の Java KeyStore(JKS)ファイル内の秘密鍵にアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramUsernameSecretId: SASL_SCRAM 認証で使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramPasswordSecretId: SASL_SCRAM 認証で使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaReadSaslScramTruststoreLocation: Kafka ブローカー ID を確認するための信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
  • kafkaReadSaslScramTruststorePasswordSecretId: Kafka SASL_SCRAM 認証で Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaWriteUsernameSecretId: Kafka クラスタの宛先との SASL_PLAIN 認証に使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。
  • kafkaWritePasswordSecretId: Kafka クラスタの宛先との SASL_PLAIN 認証に使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。
  • kafkaWriteKeystoreLocation: Kafka クラスタの宛先で認証を行うための TLS 証明書と秘密鍵を含む Java KeyStore(JKS)ファイルの Google Cloud Storage パス。例: gs://<BUCKET>/<KEYSTORE>.jks
  • kafkaWriteTruststoreLocation: 宛先の Kafka ブローカーの ID 検証に使用する信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
  • kafkaWriteTruststorePasswordSecretId: 宛先 Kafka クラスタでの TLS 認証で Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaWriteKeystorePasswordSecretId: 宛先 Kafka クラスタでの TLS 認証で Java KeyStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
  • kafkaWriteKeyPasswordSecretId: 宛先 Kafka クラスタでの TLS 認証で Java KeyStore(JKS)ファイル内の秘密鍵にアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Kafka to Cloud Storage template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. (省略可)1 回限りの処理から 1 回以上のストリーミング モードに切り替えるには、[1 回以上] を選択します。
  8. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \
    --parameters \
readBootstrapServerAndTopic=READ_BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
writeBootstrapServerAndTopic=WRITE_BOOTSTRAP_SERVER_AND_TOPIC,\
kafkaWriteAuthenticationMethod=APPLICATION_DEFAULT_CREDENTIALS
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • READ_BOOTSTRAP_SERVER_AND_TOPIC: Apache Kafka のブートストラップ サーバーのアドレスと読み取るトピック
  • WRITE_BOOTSTRAP_SERVER_AND_TOPIC: Apache Kafka ブートストラップ サーバーのアドレスと書き込み先のトピック

    ブートストラップ サーバーのアドレスとトピックの形式は、クラスタのタイプによって異なります。

    • Managed Service for Apache Kafka クラスタ: projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • 外部 Kafka クラスタ: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "readBootstrapServerAndTopic": "READ_BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS",
          "writeBootstrapServerAndTopic": "WRITE_BOOTSTRAP_SERVER_AND_TOPIC",
          "kafkaWriteAuthenticationMethod": "APPLICATION_DEFAULT_CREDENTIALS
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • READ_BOOTSTRAP_SERVER_AND_TOPIC: Apache Kafka のブートストラップ サーバーのアドレスと読み取るトピック
  • WRITE_BOOTSTRAP_SERVER_AND_TOPIC: Apache Kafka ブートストラップ サーバーのアドレスと書き込み先のトピック

    ブートストラップ サーバーのアドレスとトピックの形式は、クラスタのタイプによって異なります。

    • Managed Service for Apache Kafka クラスタ: projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
    • 外部 Kafka クラスタ: BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME

次のステップ