Apache Kafka to Apache Kafka テンプレートは、Apache Kafka ソースからバイトとしてデータを取り込み、Apache Kafka シンクに書き込むストリーミング パイプラインを作成します。
パイプラインの要件
- Apache Kafka ソーストピックが存在している。
- Apache Kafka ソースおよびシンク ブローカー サーバーが動作していて、Dataflow ワーカーマシンから到達可能である。
- Google Cloud Managed Service for Apache Kafka をソースまたはシンクとして使用している場合は、テンプレートを起動する前にトピックが存在している必要がある。
Kafka メッセージ形式
Apache Kafka ソース メッセージは、バイトとして読み取られ、Apache Kafka シンクに書き込まれます。
認証
Apache Kafka to Apache Kafka テンプレートは、Kafka ブローカーに対する SASL / PLAIN 認証と TLS 認証をサポートしています。
テンプレートのパラメータ
必須パラメータ
- readBootstrapServerAndTopic : 入力を読み取る Kafka ブートストラップ サーバーおよびトピック。(例: localhost:9092;topic1,topic2)。
- kafkaReadAuthenticationMode: Kafka クラスタで使用する認証モード。認証なしの場合は NONE、SASL/PLAIN のユーザー名とパスワードの場合は SASL_PLAIN、証明書ベースの認証の場合は TLS を使用します。APPLICATION_DEFAULT_CREDENTIALS は、アプリケーションのデフォルト認証情報を使用して Google Cloud Apache Kafka for BigQuery で認証できるため、Google Cloud Apache Kafka for BigQuery クラスタにのみ使用する必要があります。
- writeBootstrapServerAndTopic: 出力を書き込む Kafka トピック。
- kafkaWriteAuthenticationMethod: Kafka クラスタで使用する認証モード。認証なしの場合は NONE、SASL / PLAIN のユーザー名とパスワードの場合は SASL_PLAIN、証明書ベースの認証の場合は TLS を使用します。デフォルトは APPLICATION_DEFAULT_CREDENTIALS です。
オプション パラメータ
- enableCommitOffsets: 処理済みメッセージのオフセットを Kafka に commit します。有効にすると、パイプライン再開時のメッセージの処理のギャップや重複を最小限に抑えることができます。コンシューマー グループ ID を指定する必要があります。デフォルトは false です。
- consumerGroupId: このパイプラインが属するコンシューマー グループの固有識別子。Kafka へのオフセット commit が有効な場合は必須です。デフォルトは空です。
- kafkaReadOffset: commit されたオフセットが存在しない場合にメッセージを読み始めるポイント。最も古いメッセージから始まり、最新のメッセージが最後になります。デフォルトは latest です。
- kafkaReadUsernameSecretId: SASL_PLAIN 認証で使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレットの ID(例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>)。デフォルトは空です。
- kafkaReadPasswordSecretId: SASL_PLAIN 認証で使用する Kafka パスワードを含む Google Cloud Secret Manager のシークレットの ID例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。
- kafkaReadKeystoreLocation: Kafka クラスタで認証を行う際に使用する TLS 証明書と秘密鍵を含む Java KeyStore(JKS)ファイルの Google Cloud Storage パス例: gs://your-bucket/keystore.jks。
- kafkaReadTruststoreLocation: Kafka ブローカー ID を確認するための信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
- kafkaReadTruststorePasswordSecretId: Kafka TLS 認証用に Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager シークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。
- kafkaReadKeystorePasswordSecretId: Kafka TLS 認証用に Java KeyStore(JKS)ファイルにアクセスするためのパスワードが含まれる Google Cloud Secret Manager シークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。
- kafkaReadKeyPasswordSecretId: Kafka TLS 認証用の Java KeyStore(JKS)ファイル内の秘密鍵にアクセスするためのパスワードを含む Google Cloud Secret Manager シークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。
- kafkaWriteUsernameSecretId: Kafka クラスタの宛先との SASL_PLAIN 認証に使用する Kafka ユーザー名を含む Google Cloud Secret Manager のシークレットの ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。
- kafkaWritePasswordSecretId: Kafka クラスタの宛先との SASL_PLAIN 認証に使用する Kafka パスワードを含む Google Cloud Secret Manager シークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。デフォルトは空です。
- kafkaWriteKeystoreLocation: Kafka クラスタの宛先で認証を行うための TLS 証明書と秘密鍵を含む Java KeyStore(JKS)ファイルの Google Cloud Storage パス。例: gs://
- kafkaWriteTruststoreLocation: 宛先の Kafka ブローカーの ID 検証に使用する信頼された証明書を含む Java TrustStore(JKS)ファイルの Google Cloud Storage パス。
- kafkaWriteTruststorePasswordSecretId: 宛先 Kafka クラスタでの TLS 認証で Java TrustStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager シークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。
- kafkaWriteKeystorePasswordSecretId: 宛先 Kafka クラスタでの TLS 認証で Java KeyStore(JKS)ファイルにアクセスするためのパスワードを含む Google Cloud Secret Manager シークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。
- kafkaWriteKeyPasswordSecretId: 宛先 Kafka クラスタでの TLS 認証で Java KeyStore(JKS)ファイル内の秘密鍵にアクセスするためのパスワードを含む Google Cloud Secret Manager シークレット ID。例: projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>。
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ the Kafka to Cloud Storage template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- (省略可)1 回限りの処理から 1 回以上のストリーミング モードに切り替えるには、[1 回以上] を選択します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \ --parameters \ outputTableSpec=BIGQUERY_TABLE,\ inputTopics=KAFKA_TOPICS,\ javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\ javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\ bootstrapServers=KAFKA_SERVER_ADDRESSES
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクトの IDJOB_NAME
: 一意の任意のジョブ名REGION_NAME
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
BIGQUERY_TABLE
: 実際の Cloud Storage テーブル名KAFKA_TOPICS
: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping
をご覧ください。PATH_TO_JAVASCRIPT_UDF_FILE
: 使用する JavaScript ユーザー定義関数(UDF)を定義する.js
ファイルの Cloud Storage URI(例:gs://my-bucket/my-udfs/my_file.js
)JAVASCRIPT_FUNCTION
: 使用する JavaScript ユーザー定義関数(UDF)の名前たとえば、JavaScript 関数が
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名はmyTransform
です。JavaScript UDF の例については、UDF の例をご覧ください。KAFKA_SERVER_ADDRESSES
: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーからアクセス可能なポート番号を付加する必要があります。例:35.70.252.199:9092
。複数のアドレスを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping
をご覧ください。
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "outputTableSpec": "BIGQUERY_TABLE", "inputTopics": "KAFKA_TOPICS", "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE", "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION", "bootstrapServers": "KAFKA_SERVER_ADDRESSES" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka", } }
次のように置き換えます。
PROJECT_ID
: Dataflow ジョブを実行する Google Cloud プロジェクトの IDJOB_NAME
: 一意の任意のジョブ名LOCATION
: Dataflow ジョブをデプロイするリージョン(例:us-central1
)VERSION
: 使用するテンプレートのバージョン使用できる値は次のとおりです。
latest
: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。- バージョン名(例:
2023-09-12-00_RC00
)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
BIGQUERY_TABLE
: 実際の Cloud Storage テーブル名KAFKA_TOPICS
: Apache Kakfa トピックリスト。複数のトピックを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping
をご覧ください。PATH_TO_JAVASCRIPT_UDF_FILE
: 使用する JavaScript ユーザー定義関数(UDF)を定義する.js
ファイルの Cloud Storage URI(例:gs://my-bucket/my-udfs/my_file.js
)JAVASCRIPT_FUNCTION
: 使用する JavaScript ユーザー定義関数(UDF)の名前たとえば、JavaScript 関数が
myTransform(inJson) { /*...do stuff...*/ }
の場合、関数名はmyTransform
です。JavaScript UDF の例については、UDF の例をご覧ください。KAFKA_SERVER_ADDRESSES
: Apache Kafka ブローカー サーバーの IP アドレスリスト。各 IP アドレスには、サーバーからアクセス可能なポート番号を付加する必要があります。例:35.70.252.199:9092
。複数のアドレスを指定する場合は、カンマをエスケープする必要があります。gcloud topic escaping
をご覧ください。
詳細については、Dataflow で Kafka から Cloud Storage にデータを書き込むをご覧ください。
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。