Kafka から Pub/Sub Lite に移行する

このドキュメントは、セルフマネージド Apache Kafka から Pub/Sub Lite への移行を検討している場合に役立ちます。

Pub/Sub Lite の概要

Pub/Sub Lite は、運用コストを低く抑えるために構築された大容量のメッセージング サービスです。Pub/Sub Lite は、ゾーン ストレージとリージョン ストレージに加えて、事前プロビジョニングされた容量を提供します。Pub/Sub Lite では、ゾーン Lite トピックまたはリージョン Lite トピックを選択できます。リージョン Lite トピックは、Pub/Sub トピックと同じ可用性 SLA を提供します。ただし、メッセージ レプリケーションの観点から、Pub/Sub と Pub/Sub Lite には信頼性の違いがあります。

Pub/Sub と Pub/Sub Lite の詳細については、Pub/Sub とはをご覧ください。

Lite でサポートされているリージョンとゾーンの詳細については、Pub/Sub Lite のロケーションをご覧ください。

Pub/Sub Lite の用語

Pub/Sub Lite の主な用語を次に示します。

  • メッセージ。 Pub/Sub Lite サービスを通じて移動するデータ。

  • トピック。メッセージのフィードを示す名前付きリソース。Pub/Sub Lite 内で、ゾーンまたはリージョンの Lite トピックの作成を選択できます。Pub/Sub Lite リージョン トピックでは、単一リージョンの 2 つのゾーンにデータが保存されます。Pub/Sub Lite ゾーン トピックでは、データは 1 つのゾーン内でのみ複製されます。

  • 予約。リージョン内の複数の Lite トピックで共有されるスループット容量の名前付きプール。

  • サブスクリプション。特定の Lite トピックからメッセージを受信することへの関心を表す、名前付きのリソース。サブスクリプションは、単一のトピックにのみ接続する Kafka のコンシューマー グループに似ています。

  • サブスクライバー。Lite トピックと指定したサブスクリプションからメッセージを受信する Pub/Sub Lite のクライアント。サブスクリプションには複数のサブスクライバー クライアントを含めることができます。この場合、メッセージはサブスクライバー クライアント間でロードバランスされます。Kafka では、サブスクライバーはコンシューマーと呼ばれます。

  • パブリッシャー。メッセージを作成して特定の Lite トピックに送信(パブリッシュ)するアプリケーション。トピックは複数のパブリッシャーを持つことができます。 Kafka では、パブリッシャーはプロデューサーと呼ばれます。

Kafka と Pub/Sub Lite の違い

Pub/Sub Lite は概念的に Kafka に似ていますが、データの取り込みに重点を置いた、より幅の狭い API を持つ別のシステムです。これらの違いはストリームの取り込みと処理には重要ではありませんが、その違いが重要となるユースケースがいくつかあります。

データベースとしての Kafka

現在 Pub/Sub Lite では、Kafka とは異なり、トランザクションのパブリッシュやログの圧縮はサポートされていませんが、べき等性はサポートされています。これらの Kafka 機能は、Kafka をメッセージ システムよりもデータベースとして使用する場合に役立ちます。Kafka をデータベースとして使用する場合は、独自の Kafka クラスタを実行するか、Confluent Cloud などのマネージド Kafka ソリューションを使用することを検討してください。どちらのソリューションも使用できない場合は、Cloud Spanner のような水平スケーリング可能なデータベースの使用を検討してください。

Kafka ストリーム

Kafka Streams は、Kafka の上に構築されたデータ処理システムです。コンシューマー クライアントを挿入できますが、すべての管理者オペレーションにアクセスする必要があります。Kafka Streams は、内部メタデータの保存にも Kafka のトランザクション データベース プロパティを使用します。そのため、現時点では Pub/Sub Lite を Kafka ストリーム アプリケーションに使用することはできません。

Apache Beam は、Kafka、Pub/Sub、Pub/Sub Lite と統合された同様のストリーミング データ処理システムです。Beam パイプラインは Dataflow を使用したフルマネージドの方法、または既存の Apache Flink クラスタおよび Apache Spark クラスタで実行できます。

モニタリング

Kafka クライアントはサーバーサイドの指標を読み取ることができます。Pub/Sub Lite では、パブリッシャーとサブスクライバーの動作に関連する指標は、追加の構成をすることなく Cloud Monitoring で管理されます。

容量管理

Kafka トピックの容量は、クラスタの容量によって決まります。 レプリケーション、キー圧縮、バッチ設定によって、Kafka クラスタ上の特定のトピックの処理に必要な容量が決まります。Kafka トピックのスループットは、ブローカーが実行されているマシンの容量によって制限されます。一方、Pub/Sub Lite トピックにはストレージ容量とスループット容量の両方を定義する必要があります。 Pub/Sub Lite のストレージ容量は、トピックの構成可能なプロパティです。スループット容量は、構成された予約の容量と、パーティションごとの固有の上限または構成済み上限に基づいています。

認証とセキュリティ

Apache Kafka は、いくつかのオープン認証メカニズムと暗号化メカニズムをサポートしています。Pub/Sub Lite では、認証は IAM システムに基づいています。保存時と転送中の暗号化により、セキュリティが確保されます。Pub/Sub Lite の認証の詳細については、このドキュメントの後半の移行ワークフロー セクションをご覧ください。

Kafka プロパティを Pub/Sub Lite プロパティにマッピングする

Kafka には、トピック構造、上限、ブローカー プロパティを制御する多くの構成オプションがあります。このセクションでは、データの取り込みに役立つ一般的なプロパティと、Pub/Sub Lite での同等のプロパティについて説明します。Pub/Sub Lite はマネージド システムであるため、多くのブローカー プロパティを考慮する必要はありません。

Topic の構成プロパティ

Kafka プロパティ Pub/Sub Lite プロパティ 説明
retention.bytes パーティションあたりのストレージ Lite トピック内のすべてのパーティションには、同じストレージ容量が構成されています。Lite トピックのストレージ容量は、トピック内のすべてのパーティションのストレージ容量の合計です。
retention.ms メッセージ保持期間。 Lite トピックでメッセージが保存される最長時間。メッセージ保持期間を指定しなかった場合、Lite トピックはストレージ容量を超過するまでメッセージを保存します。
flush.msacks Pub/Sub Lite では構成不可 パブリッシュは、複製されたストレージに保持されることが保証されるまで確認応答されません。
max.message.bytes Pub/Sub Lite では構成不可 Pub/Sub Lite に送信できるメッセージの最大サイズは 3.5 MiB です。メッセージ サイズは繰り返し可能な方法で計算されます。
message.timestamp.type Pub/Sub Lite では構成不可 コンシューマー実装を使用する場合、イベント タイムスタンプが存在する場合はそれが選択されるか、代わりにパブリッシュ タイムスタンプが使用されます。Beam を使用する場合、パブリッシュ タイムスタンプとイベント タイムスタンプの両方を使用できます。

Lite トピックのプロパティの詳細については、Lite トピックのプロパティをご覧ください。

Producer の構成プロパティ

Pub/Sub Lite では Producer ワイヤ プロトコルがサポートされています。一部のプロパティは、Producer の Cloud クライアント ライブラリの動作を変更します。次の表で、一般的なプロパティについて説明します。

Kafka プロパティ Pub/Sub Lite プロパティ 説明
auto.create.topics.enable Pub/Sub Lite では構成不可 Pub/Sub Lite で、1 つのトピックについて、消費者グループとほぼ同じトピックとサブスクリプションを作成します。コンソール、gcloud CLI、API、または Cloud クライアント ライブラリを使用できます。
key.serializervalue.serializer Pub/Sub Lite では構成不可

ワイヤ プロトコルを使用して通信する Kafka Producer または同等のライブラリを使用する場合は必須です。

バッチサイズ Pub/Sub Lite でのサポート バッチ処理がサポートされています。最適なパフォーマンスを得るには、この値の推奨値は 10 MiB です。
linger.ms Pub/Sub Lite でのサポート バッチ処理がサポートされています。最高のパフォーマンスを得るには、この値の推奨値は 50 ミリ秒です。
max.request.size Pub/Sub Lite でのサポート サーバーで、バッチあたり 20 MiB の制限が適用されます。この値は、Kafka クライアントで 20 MiB 未満に設定します。
enable.idempotence Pub/Sub Lite でのサポート
compression.type Pub/Sub Lite ではサポートされていない この値は、明示的に none に設定する必要があります。

Consumer の構成プロパティ

Pub/Sub Lite では Consumer ワイヤ プロトコルがサポートされています。一部のプロパティは、コンシューマの Cloud クライアント ライブラリの動作を変更します。次の表で、一般的なプロパティについて説明します。

Kafka プロパティ 説明
key.deserializervalue.deserializer

ワイヤ プロトコルを使用して通信する Kafka Consumer または同等のライブラリを使用する場合は必須です。

auto.offset.reset この構成はサポートされていません。また、必要もありません。サブスクリプションが作成されると、オフセットのロケーションが必ず定義されます。
message.timestamp.type パブリッシュ タイムスタンプは Pub/Sub Lite から常に利用可能で、パーティション ベースごとに減少しないことが保証されています。イベント タイムスタンプは、パブリッシュ時にメッセージに添付されたかどうかによって、存在する場合と存在しない場合があります。Dataflow を使用する場合、パブリッシュ タイムスタンプとイベント タイムスタンプの両方を同時に使用できます。
max.partition.fetch.bytesmax.poll.records poll() 呼び出しから返されるレコードとバイトの数、および内部取得リクエストから返されるバイト数にソフトリミットを設定します。デフォルトの 1 MiB の「max.partition.fetch.bytes」では、クライアントのスループットが制限される場合があるため、この値を増やすことを検討してください。

Kafka と Pub/Sub Lite の機能を比較する

次の表は、Apache Kafka の機能と Pub/Sub Lite の機能を比較したものです。

機能 Kafka Pub/Sub Lite
メッセージの順序指定
メッセージの重複除去 ○(Dataflow を使用)
プッシュ サブスクリプション × ○(Pub/Sub エクスポートを使用)
トランザクション ×
メッセージ ストレージ 使用可能なマシン ストレージによる制限 無制限
メッセージの再生
ロギングとモニタリング セルフマネージド Cloud Monitoring により自動化
ストリーム処理 ○(Kafka StreamsApache BeamDataproc を使用)。 ○(Beam または Dataproc を使用)。

次の表では、Kafka を使用した自己ホスト型の機能と、Pub/Sub Lite を使用した Google が管理する機能を比較します。

機能 Kafka Pub/Sub Lite
可用性 Kafka を他のロケーションに手動でデプロイする。 世界中にデプロイされています。場所をご覧ください。
障害復旧 独自のバックアップとレプリケーションを設計して維持する。 Google で管理。
インフラストラクチャ管理 仮想マシン(VM)またはマシンを手動でデプロイして運用する。一貫したバージョニングとパッチを維持する。 Google で管理。
キャパシティ プランニング ストレージとコンピューティングのニーズを事前に計画する。 Google で管理。コンピューティングとストレージはいつでも増やすことができます。
サポート なし。 24 時間対応の待機スタッフとサポート。

Kafka と Pub/Sub Lite の費用の比較

Pub/Sub Lite で費用を見積もったり管理したりする方法は、Kafka とは異なります。オンプレミスまたはクラウド上の Kafka クラスタの費用には、マシン、ディスク、ネットワーク形成、受信メッセージ、送信メッセージの費用が含まれます。また、これらのシステムと関連するインフラストラクチャの管理と維持にかかるオーバーヘッド費用も含まれます。Kafka クラスタを管理する場合は、マシンを手動でアップグレードし、クラスタ容量を計画し、広範な計画とテストを含む障害復旧を実装する必要があります。さまざまな費用をすべて集計して、実際の総所有コスト(TCO)を決定する必要があります。

Pub/Sub Lite の料金には、予約費用(パブリッシュされたバイト数、サブスクライブされたバイト数、Kafka プロキシによって処理されるバイト数)とプロビジョニングされたストレージの費用が含まれます。送信メッセージの料金に加えて、予約したリソースに対して正確に料金を支払います。料金計算ツールを使用して、費用を見積もりできます。

移行ワークフロー

トピックを Kafka クラスタから Pub/Sub Lite に移行するには、次の手順を使用します。

Pub/Sub Lite リソースを構成する

  1. 移行するすべてのトピックの予想スループットに対して Pub/Sub Lite 予約を作成します。

    Pub/Sub Lite の料金計算ツールを使用して、既存の Kafka トピックの総スループット指標を計算します。予約の作成方法については、Lite の予約の作成と管理をご覧ください。

  2. Kafka 内の対応するトピックごとに 1 つの Pub/Sub Lite トピックを作成します。

    Lite トピックの作成方法については、Lite トピックを作成して管理するをご覧ください。

  3. Kafka クラスタ内の対応するコンシューマー グループとトピックのペアごとに 1 つの Pub/Sub Lite サブスクリプションを作成します。

    たとえば、topic-atopic-b から使用する consumers という名前のコンシューマー グループの場合、topic-a に接続されたサブスクリプション consumers-a と、topic-b に接続されたサブスクリプション consumers-b を作成します。サブスクリプションを作成する方法については、Lite サブスクリプションを作成して管理するをご覧ください。

Pub/Sub Lite に対して認証する

Kafka クライアントのタイプに応じて、次のいずれかの方法を選択します。

再ビルドしてバージョン 3.1.0 以降を実行している Java ベースの Kafka クライアント

Kafka クライアントを実行しているインスタンス上で再ビルドできるバージョン 3.1.0 以降の Java ベースの Kafka クライアントの場合。

  1. 次の方法で、com.google.cloud:pubsublite-kafka-auth パッケージをインストールします。

  2. com.google.cloud.pubsublite.kafka.ClientParameters.getParams を使用して、Pub/Sub Lite の認証に必要なパラメータを取得します。

    getParams() メソッド(コードサンプルを参照)によって、次の JAAS 構成と SASL 構成が、Pub/Sub Lite への認証に使用するパラメータとして初期化されます。

    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.oauthbearer.token.endpoint.url=http://localhost:14293
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
    

再ビルドせずにバージョン 3.1.0 以降を実行している Java ベースの Kafka クライアント

KIP-768 をサポートする Kafka クライアントの場合、Python サイドカー スクリプトを使用する構成専用の OAUTHBEARER 認証がサポートされます。 これらのバージョンには、2022 年 1 月の Java バージョン 3.1.0 以降が含まれています。

Kafka クライアントを実行しているインスタンスで次の手順を実施します。

  1. Python 3.6 以降をインストールします。

    Python のインストールをご覧ください。

  2. Google 認証パッケージ pip install google-auth をインストールします。

    このライブラリを使用すると、Google API にアクセスするためのさまざまなサーバー間認証メカニズムを簡素化できます。google-auth ページをご覧ください。

  3. kafka_gcp_credentials.py スクリプトを実行します。

    このスクリプトは、ローカル HTTP サーバーを起動し、google.auth.default() を使用して環境のデフォルトの Google Cloud 認証情報を取得します。

    取得された認証情報のプリンシパルには、使用している Google Cloud プロジェクトと接続先のロケーションに対する pubsublite.locations.openKafkaStream 権限が必要です。Pub/Sub Lite パブリッシャー(roles/pubsublite.publisher)ロールと Pub/Sub Lite サブスクライバー(roles/pubsublite.subscriber)ロールには、この必要な権限があります。これらのロールをプリンシパルに追加します。

    認証情報は、Kafka クライアントの SASL/OAUTHBEARER 認証で使用されます。

    Kafka クライアントから Pub/Sub Lite に対して認証を行うには、プロパティに次のパラメータが必要です。

    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.oauthbearer.token.endpoint.url=localhost:14293
    sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
      required clientId="unused" clientSecret="unused" \
      extension_pubsubProject="PROJECT_ID";
    

    PROJECT_ID は、Pub/Sub Lite を実行しているプロジェクトの ID に置き換えます。

再ビルドしない他のすべてのクライアント

その他のすべてのクライアントの場合は、次の手順を行います。

  1. クライアントに使用するサービス アカウントのサービス アカウント キーの JSON ファイルをダウンロードします。

  2. base64-encode を使用してサービス アカウント ファイルをエンコードし、認証文字列として使用します。

    Linux または macOS システムでは、次のように base64 コマンド(多くの場合デフォルトでインストールされています)を使用できます。

    base64 < my_service_account.json > password.txt
    

    次のパラメータを使用して、認証にパスワード ファイルの内容を使用できます。

    Java

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
     username="PROJECT_ID" \
     password="contents of base64 encoded password file";
    

    PROJECT_ID は、Pub/Sub を実行しているプロジェクトの ID に置き換えます。

    librdkafka

    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.username=PROJECT_ID
    sasl.password=contents of base64 encoded password file
    

    PROJECT_ID は、Pub/Sub を実行しているプロジェクトの ID に置き換えます。

Kafka Connect を使用してデータのクローンを作成する

Pub/Sub Lite チームは、Kafka Connect シンクの実装を維持しています。Kafka Connect クラスタを使用して、Kafka トピックから Pub/Sub Lite トピックにデータをコピーするように、この実装を構成できます。

データのコピーを実行するようにコネクタを構成するには、Pub/Sub Group Kafka Connector をご覧ください。

パーティション アフィニティが移行プロセスの影響を受けないようにするには、kafka トピックと Pub/Sub Lite トピックのパーティション数が同じで、pubsublite.ordering.mode プロパティが KAFKA に設定されていることを確認します。これにより、コネクタは、元に公開された Kafka パーティションと同じインデックスを持つ Pub/Sub Lite パーティションにメッセージを転送します。

コンシューマーを移行する

Pub/Sub Lite のリソースモデルは Kafka のリソースモデルとは異なります。特に、サブスクリプションはコンシューマ グループとは異なり、明示的なリソースであり、1 つのトピックにのみ関連付けられます。この違いのため、topic を渡す必要がある Kafka Consumer API 内の場所には、代わりに完全なサブスクリプション パスを渡す必要があります。

Kafka Consumer API を使用して Pub/Sub Lite を操作する場合は、Kafka クライアントの SASL 構成に加えて、次の設定も必要です。

bootstrap.servers=REGION-kafka-pubsub.googleapis.com:443
group.id=unused

REGION は、Pub/Sub Lite サブスクリプションがあるリージョンに置き換えます。

特定のサブスクリプションの最初の Pub/Sub Lite コンシューマ ジョブを開始する前に、管理者のシーク オペレーションを待機することなく開始して、コンシューマの最初のロケーションを設定できます。

コンシューマを起動すると、コンシューマはメッセージ バックログにある現在のオフセットに再接続します。古いクライアントと新しいクライアントの両方を動作の確認に要する期間だけ並列に実行し、古いコンシューマー クライアントを停止します。

プロデューサーを移行する

Kafka Producer API を使用して Pub/Sub Lite を操作する場合、Kafka クライアントの SASL 構成に加えて、プロデューサー パラメータとして次も必要になります。

bootstrap.servers=REGION-kafka-pubsub.googleapis.com:443

REGION は、Pub/Sub Lite トピックが存在するリージョンに置き換えます。

トピックのすべてのコンシューマーを Pub/Sub Lite から読み取るように移行した後、プロデューサーのトラフィックを Pub/Sub Lite に直接書き込むように移行します。

Kafka トピックではなく Pub/Sub Lite トピックに書き込むように、プロデューサー クライアントを段階的に移行します。

プロデューサー クライアントを再起動して、新しい構成を取得します。

Kafka Connect を停止する

Pub/Sub Lite に直接書き込むすべてのプロデューサーを移行した後は、コネクタがデータをコピーすることはありません。

Kafka Connect インスタンスを停止できます。

Kafka 接続をトラブルシューティングする

Kafka クライアントは特注ワイヤー プロトコルを介して通信するため、すべてのリクエストでエラーが発生した場合のエラー メッセージを提供できるわけではありません。メッセージの一部として送信されたエラーコードを信頼します。

クライアントで発生したエラーの詳細を確認するには、org.apache.kafka 接頭辞のロギング レベルを FINEST に設定します。

低スループットとバックログの増加

低スループットとバックログの増加が表示される場合がある理由はいくつかあります。1 つの理由は、容量不足の場合です。

スループット容量は、トピックレベルで構成するか、予約を使用して構成できます。サブスクライブとパブリッシュのスループット容量が不十分に構成されている場合、サブスクライブとパブリッシュの対応するスループットがスロットリングされます。

このスループット エラーは、パブリッシャーの topic/flow_control_status 指標とサブスクライバーの subscription/flow_control_status 指標によって通知されます。この指標は、次の状態を提供します。

  • NO_PARTITION_CAPACITY: このメッセージは、パーティションごとのスループットの上限に到達したことを示します。

  • NO_RESERVATION_CAPACITY: このメッセージは、予約ごとのスループットの上限に到達したことを示します。

トピックまたは予約のパブリッシュとサブスクライブの割り当ての使用率グラフを表示し、使用率が 100% に近づいているかどうかを確認できます。

この問題を解決するには、トピックまたは予約のスループット容量を増やします。

トピックの承認に失敗したエラー メッセージ

Kafka API を使用してパブリッシュするには、Lite サービス エージェントが Pub/Sub Lite トピックにパブリッシュする正しい権限が必要です。

Pub/Sub Lite トピックにパブリッシュする適切な権限がない場合、クライアントでエラー TOPIC_AUTHORIZATION_FAILED が発生します。

この問題を解決するには、プロジェクトの Lite サービス エージェントが認証構成で渡されているかどうかを確認します。

無効なトピックに関するエラー メッセージ

Kafka API を使用してサブスクライブするには、Kafka Consumer API で topic を想定しているすべての場所に完全なサブスクリプション パスを渡す必要があります。

正しくフォーマットされたサブスクリプション パスを渡さない場合、コンシューマ クライアントでエラー INVALID_TOPIC_EXCEPTION が発生します。

予約を使用していない場合に無効なリクエスト

Kafka ワイヤ プロトコル サポートを使用する場合、使用料金を請求するには、すべてのトピックに予約が関連付けられている必要があります。