Create a BigQuery Sink connector

BigQuery Sink connectors let you stream data from your Kafka topics into BigQuery tables. This enables real-time data ingestion and analysis within BigQuery.

Before you begin

Before creating a BigQuery Sink connector, ensure you have the following:

Required roles and permissions

To get the permissions that you need to create a BigQuery Sink connector, ask your administrator to grant you the Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM role on your project. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the permissions required to create a BigQuery Sink connector. To see the exact permissions that are required, expand the Required permissions section:

Required permissions

The following permissions are required to create a BigQuery Sink connector:

  • Create a connector on the parent Connect cluster: managedkafka.connectors.create

You might also be able to get these permissions with custom roles or other predefined roles.

For more information about the Managed Kafka Connector Editor role, see Managed Service for Apache Kafka predefined roles.

If your Managed Service for Apache Kafka cluster is in the same project as the Connect cluster, no further permissions are required. If the cluster is in a different project, refer to Create a Connect Cluster in a different project.

Grant permissions to write to the BigQuery table

The Connect cluster service account, which follows the format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, requires permission to write to the BigQuery table. To grant this permission, grant the BigQuery Data Editor (roles/bigquery.dataEditor) role to the Connect cluster service account on the project that contains the BigQuery table.
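For example, you might grant this role with the gcloud CLI. The following command is a sketch: BQ_PROJECT_ID is the project that contains the BigQuery table and PROJECT_NUMBER is the project number of the project that contains the Connect cluster; both are placeholders for your own values.

gcloud projects add-iam-policy-binding BQ_PROJECT_ID \
    --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com" \
    --role="roles/bigquery.dataEditor"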

How a BigQuery Sink connector works

A BigQuery Sink connector pulls data from one or more Kafka topics, and writes that data to one or more tables within a single BigQuery dataset.

Here's a detailed breakdown of how the BigQuery Sink connector copies data:

  • The connector consumes messages from one or more Kafka topics of the source cluster.

  • The connector moves data to the target BigQuery dataset that is specified in the connector configuration.

  • The connector names tables in the BigQuery dataset based on the following specified configurations:

    • If topic2TableMap is not specified, the connector uses the Kafka topic name as the BigQuery table name.

    • If topic2TableMap is specified, the connector uses that mapping to determine the BigQuery table name for each Kafka topic, as shown in the configuration sketch after this list.

  • The connector creates the tables in the BigQuery dataset based on the following specified configurations:

    • If autoCreateTables is set to true (the default), the connector attempts to create any tables that don't exist.

    • If autoCreateTables is set to false, the connector assumes that all target tables already exist.

  • When autoCreateTables is set to true, the connector supports several additional properties for fine-grained control over how the table is created and configured.

    See the documentation for properties such as sanitizeTopics, sanitizeFieldNames, allBQFieldsNullable, clusteringPartitionFieldNames, convertDoubleSpecialValues, partitionExpirationMs, and timestampPartitionFieldName at the GitHub repository for the connector plugin.

  • A Kafka record has three components: headers, keys, and values. The connector configuration, including the converters specified, dictates how these components are written to the BigQuery table.

    • The configured value converter parses the value field of the Kafka record, and the record's schema is then used to write the record's fields to the BigQuery table columns of the same name. The only supported value converter is org.apache.kafka.connect.json.JsonConverter. How the value field maps to BigQuery columns is governed by the value.converter.schemas.enable property of org.apache.kafka.connect.json.JsonConverter.

    • In addition to the value field, you can map additional data from Kafka, such as metadata and key information, into the BigQuery table by configuring the kafkaDataFieldName and kafkaKeyFieldName properties, respectively. Examples of metadata include the Kafka topic, partition, offset, and insertTime.

  • The connector requires schema information to write data to BigQuery. This schema can come from the Kafka message itself or the existing BigQuery table. For more information about how schemas are used in writing to the BigQuery table, see Schemas for the BigQuery Sink connector.

  • The tasks.max property controls the level of parallelism for the connector. Increasing tasks.max can improve throughput, but the actual parallelism is limited by the number of partitions in the Kafka topics.
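For illustration, the following configuration sketch combines several of the properties described in this list. The topic and table names are hypothetical placeholders, and these properties would be added alongside the rest of your connector configuration.

topics: "orders,clickstream"
topic2TableMap: "orders:orders_table,clickstream:clicks_table"
autoCreateTables: "true"
kafkaDataFieldName: "kafka_metadata"
kafkaKeyFieldName: "kafka_key"
tasks.max: "2"

With topic2TableMap set as shown, records from the orders topic are written to the orders_table table and records from clickstream are written to clicks_table. Without the mapping, the tables would be named orders and clickstream.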

Schemas for the BigQuery Sink connector

The BigQuery Sink connector requires a schema to operate. The schema can be derived from either the destination BigQuery table or the Kafka messages.

You can configure one of the following two schema modes by setting the value.converter.schemas.enable property of the org.apache.kafka.connect.json.JsonConverter converter.

The following are some limitations for schemas in the BigQuery Sink connector:

  • The BigQuery Sink connector does not support using a Schema Registry.

  • Kafka Connect also does not support validation against a remote schema by using Schema Registry.

Message-driven configuration

For message-driven configuration, set value.converter.schemas.enable to true.

In message-driven configuration mode, each Kafka message must include its schema as a JSON schema.

Example Kafka message value:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "field": "user",
        "type": "string",
        "optional": false
      },
      {
        "field": "age",
        "type": "integer",
        "optional": false
      }
    ]
  },
  "payload": {
    "user": "userId",
    "age": 30
  }
}

The connector uses the message schema to write to the BigQuery table. If the table doesn't exist and autoCreateTables is set to true, the connector creates a new BigQuery table based on the message schema.

If you want the connector to update the BigQuery table schema as message schemas change, you must set allowNewBigQueryFields, allowSchemaUnionization, or allowBigQueryRequiredFieldRelaxation to true.
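For example, a message-driven configuration that also lets the connector evolve the table schema might include settings like the following. This is a sketch; combine these properties with the rest of the connector configuration shown later on this page.

value.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable: "true"
autoCreateTables: "true"
allowNewBigQueryFields: "true"
allowBigQueryRequiredFieldRelaxation: "true"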

Table-driven configuration

For table-driven configuration, set value.converter.schemas.enable to false.

In table-driven configuration mode, Kafka record values don't include schemas. Instead, the connector uses the existing BigQuery table schema to write data.

Example Kafka message value:

{
  "user": "userId",
  "age": 30
}

This mode does not support dynamic schema updates based on incoming messages.
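Because the schema comes from the table in this mode, the destination table typically must already exist with a schema that matches the message fields. For example, a table compatible with the preceding message could be created with the bq CLI; the dataset and table names here are placeholders.

bq mk --table BQ_DATASET_ID.users user:STRING,age:INTEGER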

To troubleshoot issues related to table-driven configuration, see Troubleshoot the BigQuery Sink connector.

Properties of a BigQuery Sink connector

When creating a BigQuery Sink connector, you need to specify the following properties:

Connector name

The name or ID of the connector. For guidelines about how to name the resource, see Guidelines to name a Managed Service for Apache Kafka resource. The name is immutable.

Connector plugin

Select BigQuery Sink as the connector plugin in the Google Cloud console. If you don't use the user interface to configure the connector, you must also specify the connector class.

Topics

The Kafka topics from which the connector consumes messages. You can specify one or more topics, or use a regular expression to match multiple topics. For example, topic.* matches all topics whose names start with topic. These topics must exist within the Managed Service for Apache Kafka cluster associated with your Connect cluster.
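If you configure the connector with a configuration file instead of the console, the explicit list and the regular expression forms typically correspond to the standard Kafka Connect topics and topics.regex properties, as in the following sketch. Use one form or the other, not both; the topic names are placeholders.

topics: "orders,payments"

or

topics.regex: "topic.*"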

Dataset

Select or create the BigQuery dataset to store the data.

Optionally, create a BigQuery table within the dataset. By default, the connector writes each topic to a separate table with the same name as the topic name. When doing so, the connector attempts to create new tables if they don't yet exist. To disable this behavior, set autoCreateTables=false in the connector configuration.

Configuration

This section lets you specify additional, connector-specific configuration properties for the BigQuery Sink connector.

Since data in Kafka topics can be in various formats like Avro, JSON, or raw bytes, a key part of the configuration involves specifying converters. Converters translate data from the format used in your Kafka topics into the standardized, internal format of Kafka Connect. The BigQuery Sink connector then takes this internal data and transforms it into the format required by your BigQuery tables before writing it.

For more general information about the role of converters in Kafka Connect, supported converter types, and common configuration options, see converters.
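For example, a typical converter configuration for this connector might look like the following sketch, which uses the only supported value converter, org.apache.kafka.connect.json.JsonConverter, together with a string key converter. Set value.converter.schemas.enable according to the schema mode described earlier on this page.

key.converter: "org.apache.kafka.connect.storage.StringConverter"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable: "false"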

Here are some configs specific to the BigQuery Sink connector:

  • project: Specifies the Google Cloud project ID that contains the default BigQuery dataset.

  • defaultDataset: Specifies the BigQuery dataset where data is written.

  • autoCreateTables: Specifies whether the connector automatically creates BigQuery tables that don't exist.

For a list of the available configuration properties specific to this connector, see the BigQuery Sink connector configs.

Task restart policy

Specify the policy for restarting failed connector tasks. You can choose to not restart failed tasks or restart them after an exponential backoff delay.

Create a BigQuery Sink connector

Before you create a connector, review the documentation for Properties of a BigQuery Sink connector.

Console

  1. In the Google Cloud console, go to the Connect clusters page.

    Go to Connect clusters

  2. Click the Connect cluster for which you want to create the connector.

    The Connect cluster details page is displayed.

  3. Click Create connector.

    The Create Kafka connector page is displayed.

  4. For the connector name, enter a string.

    For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource.

  5. Select BigQuery Sink as the Connector plugin type.

  6. Specify the Topics from which to stream data.

  7. Choose the Dataset to store the data.

  8. (Optional) Configure additional settings in the Configuration section.

  9. Select the Task restart policy.

  10. Click Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Run the gcloud alpha managed-kafka connectors create command:

    gcloud alpha managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE \
        [--labels=LABELS]

    Replace the following:

    • CONNECTOR_ID: The ID or name of the connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.

    • LOCATION: The location where you create the connector. This must be the same location where you created the Connect cluster.

    • CONNECT_CLUSTER_ID: The ID of the Connect cluster where the connector is created.

    • CONFIG_FILE: The path to the YAML configuration file for the BigQuery Sink connector.

      Here is an example of a configuration file for the BigQuery Sink connector:

      name: "BQ_SINK_CONNECTOR_ID"
      project: "GCP_PROJECT_ID"
      topics: "GMK_TOPIC_ID"
      tasks.max: 3"
      connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
      key.converter: "org.apache.kafka.connect.storage.StringConverter"
      value.converter: "org.apache.kafka.connect.json.JsonConverter"
      value.converter.schemas.enable: "false"
      defaultDataset: "BQ_DATASET_ID"

      Replace the following:

      • BQ_SINK_CONNECTOR_ID: The ID or name of the BigQuery Sink connector. For guidelines on how to name a connector, see Guidelines to name a Managed Service for Apache Kafka resource. The name of a connector is immutable.

      • GCP_PROJECT_ID: The ID of the Google Cloud project where your BigQuery dataset resides.

      • GMK_TOPIC_ID: The ID of the Managed Service for Apache Kafka topic from which the data flows to the BigQuery Sink connector.

      • BQ_DATASET_ID: The ID of the BigQuery dataset that acts as the sink for the pipeline.

    • LABELS: (Optional) Labels to associate with the connector, specified as a list of KEY=VALUE pairs. Keys must start with a lowercase character and contain only hyphens (-), underscores (_), lowercase characters, and numbers. Values must contain only hyphens (-), underscores (_), lowercase characters, and numbers. For more information about the format for labels, see Labels.

After you create a connector, you can edit, delete, pause, stop, or restart the connector.
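For example, assuming the same gcloud alpha managed-kafka connectors command group shown earlier, pausing and resuming a connector might look like the following sketch; the exact subcommands available can vary by gcloud CLI version.

gcloud alpha managed-kafka connectors pause CONNECTOR_ID \
    --location=LOCATION \
    --connect-cluster=CONNECT_CLUSTER_ID

gcloud alpha managed-kafka connectors resume CONNECTOR_ID \
    --location=LOCATION \
    --connect-cluster=CONNECT_CLUSTER_ID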

What's next?