List your Apache Kafka for BigQuery topics

To list the topics in a cluster, you can use the Google Cloud console, the Google Cloud CLI, the client libraries, the Apache Kafka for BigQuery API, or the open source Apache Kafka APIs.
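
For example, with the open source Kafka command-line tools you can list topics with a command similar to the following. This is a sketch: the bootstrap server address and the client.properties file that contains your authentication settings are placeholders that you must supply for your cluster.

kafka-topics.sh --list \
    --bootstrap-server BOOTSTRAP_SERVER_ADDRESS \
    --command-config client.properties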

Required roles and permissions to list your topics

To get the permissions that you need to list your topics, ask your administrator to grant you the Managed Kafka Viewer (roles/managedkafka.viewer) IAM role on your project. For more information about granting roles, see Manage access.

This predefined role contains the permissions required to list your topics. The exact permissions that are required are listed in the following Required permissions section:

Required permissions

The following permissions are required to list your topics:

  • List topics: managedkafka.topics.list
  • Get topic: managedkafka.topics.get

You might also be able to get these permissions with custom roles or other predefined roles.
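
For example, a custom role that grants only these permissions might be created with the Google Cloud CLI. This is a sketch; the role ID kafkaTopicViewer and PROJECT_ID are placeholders.

gcloud iam roles create kafkaTopicViewer \
    --project=PROJECT_ID \
    --title="Kafka topic viewer" \
    --permissions=managedkafka.topics.list,managedkafka.topics.get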

For more information about the Managed Kafka Viewer (roles/managedkafka.viewer) IAM role, see Apache Kafka for BigQuery predefined roles.

List your topics

Console

  1. In the Google Cloud console, go to the Clusters page.

    Go to Clusters

    The clusters that you created in the project are listed.

  2. Click the cluster for which you want to see the topics.

    The cluster details page is displayed. On the Resources tab of this page, the topics in the cluster are listed.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud beta managed-kafka topics list command:

    gcloud beta managed-kafka topics list CLUSTER_ID \
        --location=LOCATION_ID \
        --limit=LIMIT
    

    This command retrieves a list of all topics present within the specified Apache Kafka for BigQuery cluster. You can use optional flags to filter, limit, and sort the output.

    Replace the following:

    • CLUSTER_ID: The name of the cluster whose topics you want to list.

    • LOCATION_ID: The location of the cluster.

    • LIMIT: (Optional) The maximum number of topics to list.
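
    For example, to list up to 10 topics in a cluster named my-cluster in the us-central1 region (illustrative values), you might run:

    gcloud beta managed-kafka topics list my-cluster \
        --location=us-central1 \
        --limit=10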

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/iterator"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func listTopics(w io.Writer, projectID, region, clusterID string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	req := &managedkafkapb.ListTopicsRequest{
		Parent: clusterPath,
	}
	topicIter := client.ListTopics(ctx, req)
	for {
		res, err := topicIter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return fmt.Errorf("topicIter.Next() got err: %w", err)
		}
		fmt.Fprintf(w, "Got topic: %v\n", res)
	}
	return nil
}
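
A minimal way to run this sample, assuming the function above is part of a main package and the placeholder project, region, and cluster values are replaced with your own, might look like the following.

package main

import (
	"log"
	"os"
)

func main() {
	// Placeholder values; replace with your own project, region, and cluster.
	if err := listTopics(os.Stdout, "my-project-id", "us-central1", "my-cluster"); err != nil {
		log.Fatal(err)
	}
}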

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import java.io.IOException;

public class ListTopics {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    listTopics(projectId, region, clusterId);
  }

  public static void listTopics(String projectId, String region, String clusterId)
      throws Exception {
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      ClusterName clusterName = ClusterName.of(projectId, region, clusterId);
      // This operation is being handled synchronously.
      for (Topic topic : managedKafkaClient.listTopics(clusterName).iterateAll()) {
        System.out.println(topic.getAllFields());
      }
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.listTopics got err: %s", e.getMessage());
    }
  }
}

What's next