The managed I/O connector is an Apache Beam transform that provides a common API for creating sources and sinks.
Overview
You create the managed I/O connector using Apache Beam code, just like any other I/O connector. You specify a source or sink to instantiate and pass in a set of configuration parameters. You can also put the configuration parameters into a YAML file and provide a URL to the file.
On the backend, Dataflow treats the managed I/O connector as a service, which allows Dataflow to manage runtime operations for the connector. You can then focus on the business logic in your pipeline, rather than managing these details.
For more information about the managed I/O API, see Managed in the Apache Beam Java SDK documentation.
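The following Java sketch shows the general shape of a managed read. The Apache Iceberg table and catalog values are placeholders for this illustration; the full set of configuration parameters for each connector is listed later on this page.

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class ManagedReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Configuration is a map of key-value pairs; the keys depend on the
    // source or sink. These Iceberg values are placeholders.
    Map<String, Object> config = Map.<String, Object>of(
        "table", "db.table1",
        "catalog_name", "local");

    // Instantiate a managed source and read it into a PCollection of rows.
    PCollection<Row> rows =
        pipeline
            .apply(Managed.read(Managed.ICEBERG).withConfig(config))
            .getSinglePCollection();

    // Alternatively, point the transform at a YAML configuration file:
    // Managed.read(Managed.ICEBERG).withConfigUrl("gs://my-bucket/config.yaml");

    pipeline.run();
  }
}
```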
Dynamic destinations
For some sinks, the managed I/O connector can dynamically select a destination based on field values in the incoming records.
To use dynamic destinations, provide a template string for the destination. The template string can include field names within curly brackets, such as "tables.{field1}". At runtime, the connector substitutes the value of the field for each incoming record, to determine the destination for that record.

For example, suppose your data has a field named airport. You could set the destination to "flights.{airport}". If airport=SFO, the record is written to flights.SFO. For nested fields, use dot notation. For example: {top.middle.nested}.
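As an illustration, the following sketch configures an Apache Iceberg sink whose table is chosen per record from the airport field. The catalog name and field names are assumptions for the example, not values from the official samples.

```java
import java.util.Map;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class DynamicDestinationSketch {
  // Writes each record to an Iceberg table derived from its "airport" field,
  // for example flights.SFO when airport=SFO. Catalog details are minimal
  // placeholders; supply catalog_properties as your catalog requires.
  static void writeByAirport(PCollection<Row> flights) {
    Map<String, Object> config = Map.<String, Object>of(
        "table", "flights.{airport}",  // template resolved per record at runtime
        "catalog_name", "local");

    flights.apply(Managed.write(Managed.ICEBERG).withConfig(config));
  }
}
```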
Filtering
You might want to filter out certain fields before they are written to the destination table. For sinks that support dynamic destinations, you can use the drop, keep, or only parameter for this purpose. These parameters let you include destination metadata in the input records, without writing the metadata to the destination.
You can set at most one of these parameters for a given sink.
Configuration parameter | Data type | Description
---|---|---
drop | list of strings | A list of field names to drop before writing to the destination.
keep | list of strings | A list of field names to keep when writing to the destination. Other fields are dropped.
only | string | The name of exactly one field to keep when writing to the destination. All other fields are dropped. This field must be of row type.
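For example, the following sketch combines a dynamic destination with the drop parameter so that the routing field is never written to the table. The field and catalog names are placeholders.

```java
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class FilteredWriteSketch {
  // Routes each record by its "airport" field, but drops that field before
  // writing so it does not appear in the destination table.
  static void write(PCollection<Row> flights) {
    Map<String, Object> config = Map.<String, Object>of(
        "table", "flights.{airport}",
        "catalog_name", "local",
        "drop", List.of("airport"));

    flights.apply(Managed.write(Managed.ICEBERG).withConfig(config));
  }
}
```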
Apache Iceberg
Managed I/O supports the following capabilities for Apache Iceberg:
Catalog | Batch read | Batch write | Streaming write | Dynamic table creation | Dynamic destinations
---|---|---|---|---|---
Hadoop | Supported | Supported | Supported | Supported | Supported
Hive | Supported | Supported | Supported | Supported | Supported
REST-based catalogs | Supported | Supported | Supported | Supported | Supported
For BigQuery tables for Apache Iceberg, use the BigQueryIO connector with the BigQuery Storage API. The table must already exist; dynamic table creation is not supported.
Managed I/O uses the following configuration parameters for Apache Iceberg:
Read and write configuration | Data type | Description
---|---|---
table | string | The identifier of the Apache Iceberg table. Example: "db.table1".
catalog_name | string | The name of the catalog. Example: "local".
catalog_properties | map | A map of configuration properties for the Apache Iceberg catalog. The required properties depend on the catalog. For more information, see CatalogUtil in the Apache Iceberg documentation.
config_properties | map | An optional set of Hadoop configuration properties. For more information, see CatalogUtil in the Apache Iceberg documentation.

Write configuration | Data type | Description
---|---|---
triggering_frequency_seconds | integer | For streaming write pipelines, the frequency at which the sink attempts to produce snapshots, in seconds.
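The following sketch shows how these parameters might be assembled for a batch read from a Hadoop catalog. The catalog properties and warehouse path are placeholder values; the exact properties depend on your catalog.

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class IcebergReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholder Hadoop-catalog properties; see CatalogUtil in the Apache
    // Iceberg documentation for the properties your catalog requires.
    Map<String, String> catalogProperties = Map.of(
        "type", "hadoop",
        "warehouse", "gs://my-bucket/warehouse");

    Map<String, Object> config = Map.<String, Object>of(
        "table", "db.table1",
        "catalog_name", "local",
        "catalog_properties", catalogProperties);

    // Read the Iceberg table into a PCollection of rows.
    PCollection<Row> rows =
        pipeline
            .apply(Managed.read(Managed.ICEBERG).withConfig(config))
            .getSinglePCollection();

    pipeline.run();
  }
}
```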
For more information, including code examples, see the Apache Iceberg topics in this documentation.
Apache Kafka
Managed I/O uses the following configuration parameters for Apache Kafka:
Read and write configuration | Data type | Description
---|---|---
bootstrap_servers | string | Required. A comma-separated list of Kafka bootstrap servers. Example: localhost:9092.
topic | string | Required. The Kafka topic to read or write.
file_descriptor_path | string | The path to a protocol buffer file descriptor set. Applies only if data_format is "PROTO".
data_format | string | The format of the messages. Supported values: "AVRO", "JSON", "PROTO", "RAW". The default value is "RAW", which reads or writes the raw bytes of the message payload.
message_name | string | The name of the protocol buffer message. Required if data_format is "PROTO".
schema | string | The Kafka message schema. The expected schema type depends on the data format. For read pipelines, this parameter is ignored if confluent_schema_registry_url is specified.
Read configuration | Data type | Description
---|---|---
auto_offset_reset_config | string | Specifies the behavior when there is no initial offset or the current offset no longer exists on the Kafka server. Supported values: "earliest", "latest". The default value is "latest".
confluent_schema_registry_subject | string | The subject of a Confluent schema registry. Required if confluent_schema_registry_url is specified.
confluent_schema_registry_url | string | The URL of a Confluent schema registry. If specified, the schema parameter is ignored.
consumer_config_updates | map | Sets configuration parameters for the Kafka consumer. For more information, see Consumer configs in the Kafka documentation. You can use this parameter to customize the Kafka consumer.
max_read_time_seconds | integer | The maximum read time, in seconds. This option produces a bounded PCollection and is mainly intended for testing or other non-production scenarios.
Write configuration | Data type | Description
---|---|---
producer_config_updates | map | Sets configuration parameters for the Kafka producer. For more information, see Producer configs in the Kafka documentation. You can use this parameter to customize the Kafka producer.
To read Avro or JSON messages, you must specify a message schema. To set a schema directly, use the schema parameter. To provide the schema through a Confluent schema registry, set the confluent_schema_registry_url and confluent_schema_registry_subject parameters.

To read or write Protocol Buffer messages, either specify a message schema or set the file_descriptor_path parameter.
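As a rough sketch, the following pipeline reads JSON messages by setting the schema parameter directly. The broker addresses, topic, and JSON schema string are assumptions for this example.

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class KafkaReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholder broker list, topic, and JSON schema for the message payload.
    Map<String, Object> config = Map.<String, Object>of(
        "bootstrap_servers", "broker1:9092,broker2:9092",
        "topic", "my_topic",
        "data_format", "JSON",
        "schema",
            "{\"type\":\"object\",\"properties\":"
                + "{\"id\":{\"type\":\"integer\"},\"name\":{\"type\":\"string\"}}}");

    // Read Kafka messages as rows that match the supplied schema.
    PCollection<Row> messages =
        pipeline
            .apply(Managed.read(Managed.KAFKA).withConfig(config))
            .getSinglePCollection();

    pipeline.run();
  }
}
```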
BigQuery
Requires Apache Beam SDK 2.61.0 or later.
Managed I/O supports the following capabilities for BigQuery:
- Dynamic table creation
- Dynamic destinations
- For reads, the connector uses the BigQuery Storage Read API.
- For writes, the connector uses the following BigQuery methods:
  - If the source is unbounded, the connector performs direct writes to BigQuery, by using the BigQuery Storage Write API in exactly-once mode.
  - If the source is bounded, the connector uses BigQuery file loads.
Managed I/O uses the following configuration parameters for BigQuery:
Read and write configuration | Data type | Description
---|---|---
table | string | The BigQuery table to read or write. Format as "PROJECT.DATASET.TABLE". Example: "my_project.dataset1.table1".
kms_key | string | Specifies a Cloud Key Management Service (Cloud KMS) key to encrypt the BigQuery table when writing, or to encrypt any temporary tables created during reads.

Read configuration | Data type | Description
---|---|---
fields | list of strings | A list of columns to read from the table. This parameter allows efficient reads when a table contains many columns.
query | string | A SQL query to read from. If specified, the connector runs the query on BigQuery and reads the query results.
row_restriction | string | A predicate that filters data on the server side. Example: "age > 18".

Write configuration | Data type | Description
---|---|---
triggering_frequency | integer | For unbounded sources, specifies the frequency at which file writes are triggered, in seconds.
For reads, you must specify either table or query. For writes, you must specify table.
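For instance, a bounded read from a single table might look like the following sketch. The project, dataset, table, and filter values are placeholders.

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class BigQueryReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholder table name; row_restriction filters rows on the server side.
    Map<String, Object> config = Map.<String, Object>of(
        "table", "my_project.dataset1.table1",
        "row_restriction", "age > 18");

    // Read the table through the BigQuery Storage Read API.
    PCollection<Row> rows =
        pipeline
            .apply(Managed.read(Managed.BIGQUERY).withConfig(config))
            .getSinglePCollection();

    pipeline.run();
  }
}
```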