Apache Kafka to Apache Kafka テンプレートは、Apache Kafka ソースからバイトとしてデータを取り込み、Apache Kafka シンクに書き込むストリーミング パイプラインを作成します。
パイプラインの要件
- Apache Kafka ソーストピックが存在している。
- Apache Kafka ソースおよびシンク ブローカー サーバーが動作していて、Dataflow ワーカーマシンから到達可能である。
- Google Cloud Managed Service for Apache Kafka をソースまたはシンクとして使用している場合は、テンプレートを起動する前にトピックが存在している必要がある。
Kafka メッセージ形式
Apache Kafka ソース メッセージは、バイトとして読み取られ、Apache Kafka シンクに書き込まれます。
認証
Apache Kafka to Apache Kafka テンプレートは、Kafka ブローカーに対する SASL / PLAIN 認証と TLS 認証をサポートしています。
テンプレートのパラメータ
必須パラメータ
- readBootstrapServerAndTopic: 入力を読み取る Kafka ブートストラップ サーバーおよびトピック。例:
localhost:9092;topic1,topic2
- kafkaReadAuthenticationMode: Kafka クラスタで使用する認証モード。認証なしの場合は
KafkaAuthenticationMethod.NONE
、SASL / PLAIN のユーザー名とパスワードの場合はKafkaAuthenticationMethod.SASL_PLAIN
、SASL_SCRAM_512 認証の場合はKafkaAuthenticationMethod.SASL_SCRAM_512
、証明書ベースの認証の場合はKafkaAuthenticationMethod.TLS
を使用します。KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
は Google Cloud Apache Kafka for BigQuery クラスタでのみ使用できます。これにより、アプリケーションのデフォルト認証情報を使用して認証できます。 - writeBootstrapServerAndTopic: 出力を書き込む Kafka トピック。
- kafkaWriteAuthenticationMethod: Kafka クラスタで使用する認証モード。認証なしの場合は NONE、SASL / PLAIN のユーザー名とパスワードの場合は SASL_PLAIN、SASL_SCRAM_512 ベースの認証の場合は SASL_SCRAM_512、証明書ベースの認証の場合は TLS を使用します。デフォルトは APPLICATION_DEFAULT_CREDENTIALS です。
オプション パラメータ
- enableCommitOffsets: 処理済みメッセージのオフセットを Kafka に commit します。有効にすると、パイプライン再開時のメッセージの処理のギャップや重複を最小限に抑えることができます。コンシューマー グループ ID を指定する必要があります。デフォルトは false です。
- consumerGroupId: このパイプラインが属するコンシューマー グループの固有識別子。Kafka へのオフセット commit が有効な場合は必須です。デフォルトは空です。
- kafkaReadOffset: commit されたオフセットが存在しない場合にメッセージを読み始めるポイント。最も古いメッセージから始まり、最新のメッセージが最後になります。デフォルトは latest です。
- kafkaReadUsernameSecretId:
SASL_PLAIN
認証で使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレット ID。例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。デフォルトは空です。 - kafkaReadPasswordSecretId:
SASL_PLAIN
認証で使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレット ID。例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。デフォルトは空です。 - kafkaReadKeystoreLocation: Kafka クラスタで認証を行う際に使用する TLS 証明書と秘密鍵を含む Java KeyStore(JKS)ファイルの Google Cloud Storage パス。例:
gs://your-bucket/keystore.jks
- kafkaReadTruststoreLocation: Kafka ブローカー ID を確認するための信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
- kafkaReadTruststorePasswordSecretId: Kafka TLS 認証用に Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaReadKeystorePasswordSecretId: Kafka TLS 認証用に Java KeyStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaReadKeyPasswordSecretId: Kafka TLS 認証用の Java KeyStore(JKS)ファイル内の秘密鍵にアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaReadSaslScramUsernameSecretId:
SASL_SCRAM
認証で使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレット ID。例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaReadSaslScramPasswordSecretId:
SASL_SCRAM
認証で使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレット ID。例:projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaReadSaslScramTruststoreLocation: Kafka ブローカー ID を確認するための信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
- kafkaReadSaslScramTruststorePasswordSecretId: Kafka SASL_SCRAM 認証で Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaWriteUsernameSecretId: Kafka クラスタの宛先との SASL_PLAIN 認証に使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。デフォルトは空です。 - kafkaWritePasswordSecretId: Kafka クラスタの宛先との SASL_PLAIN 認証に使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。デフォルトは空です。 - kafkaWriteKeystoreLocation: Kafka クラスタの宛先で認証を行うための TLS 証明書と秘密鍵を含む Java KeyStore(JKS)ファイルの Google Cloud Storage パス。例:
gs://<BUCKET>/<KEYSTORE>.jks
- kafkaWriteTruststoreLocation: 宛先の Kafka ブローカーの ID 検証に使用する信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
- kafkaWriteTruststorePasswordSecretId: 宛先 Kafka クラスタでの TLS 認証で Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaWriteKeystorePasswordSecretId: 宛先 Kafka クラスタでの TLS 認証で Java KeyStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
- kafkaWriteKeyPasswordSecretId: 宛先 Kafka クラスタでの TLS 認証で Java KeyStore(JKS)ファイル内の秘密鍵にアクセスするためのパスワードを含む Google Cloud Secret Manager のシークレット ID。例:
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the Kafka to Cloud Storage template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- (省略可)1 回限りの処理から 1 回以上のストリーミング モードに切り替えるには、[1 回以上] を選択します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \ --parameters \ readBootstrapServerAndTopic=READ_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ writeBootstrapServerAndTopic=WRITE_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaWriteAuthenticationMethod=APPLICATION_DEFAULT_CREDENTIALS
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
READ_BOOTSTRAP_SERVER_AND_TOPIC
: Apache Kafka のブートストラップ サーバーのアドレスと読み取るトピックWRITE_BOOTSTRAP_SERVER_AND_TOPIC
: Apache Kafka ブートストラップ サーバーのアドレスと書き込み先のトピックブートストラップ サーバーのアドレスとトピックの形式は、クラスタのタイプによって異なります。
- Managed Service for Apache Kafka クラスタ:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- 外部 Kafka クラスタ:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka クラスタ:
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "READ_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "writeBootstrapServerAndTopic": "WRITE_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaWriteAuthenticationMethod": "APPLICATION_DEFAULT_CREDENTIALS }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクト IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
READ_BOOTSTRAP_SERVER_AND_TOPIC
: Apache Kafka のブートストラップ サーバーのアドレスと読み取るトピックWRITE_BOOTSTRAP_SERVER_AND_TOPIC
: Apache Kafka ブートストラップ サーバーのアドレスと書き込み先のトピックブートストラップ サーバーのアドレスとトピックの形式は、クラスタのタイプによって異なります。
- Managed Service for Apache Kafka クラスタ:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- 外部 Kafka クラスタ:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka クラスタ:
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。