Von Dataflow in Apache Iceberg schreiben

Verwenden Sie den verwalteten E/A-Connector, um Daten aus Dataflow in Apache Iceberg zu schreiben.

Die verwaltete E/A unterstützt die folgenden Funktionen für Apache Iceberg:

Kataloge
  • Hadoop
  • Hive
  • REST-basierte Kataloge
  • BigQuery-Metastore (erfordert Apache Beam SDK 2.62.0 oder höher, wenn nicht Runner v2 verwendet wird)
Lesefunktionen Batchlesevorgang
Schreibfunktionen
  • Batchschreibvorgang
  • Streaming-Schreibvorgang
  • Dynamische Ziele
  • Dynamische Tabellenerstellung

Verwenden Sie für BigQuery-Tabellen für Apache Iceberg den BigQueryIO-Connector mit der BigQuery Storage API. Die Tabelle muss bereits vorhanden sein. Das dynamische Erstellen von Tabellen wird nicht unterstützt.

Abhängigkeiten

Fügen Sie Ihrem Projekt die folgenden Abhängigkeiten hinzu:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

Konfiguration

Für verwaltete E/A werden die folgenden Konfigurationsparameter für Apache Iceberg verwendet:

Konfiguration lesen und schreiben Datentyp Beschreibung
table String Die Kennung der Apache Iceberg-Tabelle. Beispiel: "db.table1".
catalog_name String Der Name des Katalogs. Beispiel: "local".
catalog_properties Karte Eine Zuordnung von Konfigurationseigenschaften für den Apache Iceberg-Katalog. Die erforderlichen Properties hängen vom Katalog ab. Weitere Informationen finden Sie in der Apache Iceberg-Dokumentation unter CatalogUtil.
config_properties Karte Eine optionale Gruppe von Hadoop-Konfigurationseigenschaften. Weitere Informationen finden Sie in der Apache Iceberg-Dokumentation unter CatalogUtil.
Schreibkonfiguration Datentyp Beschreibung
triggering_frequency_seconds integer Für Streaming-Schreibpipelines die Häufigkeit, mit der die Senke versucht, Snapshots zu erstellen, in Sekunden.

Dynamische Ziele

Verwaltete E/A für Apache Iceberg unterstützt dynamische Ziele. Anstatt in eine einzelne feste Tabelle zu schreiben, kann der Connector dynamisch eine Zieltabelle basierend auf Feldwerten in den eingehenden Datensätzen auswählen.

Wenn Sie dynamische Ziele verwenden möchten, geben Sie eine Vorlage für den Konfigurationsparameter table an. Weitere Informationen finden Sie unter Dynamische Ziele.

Beispiele

Die folgenden Beispiele zeigen, wie Sie mit Managed I/O in Apache Iceberg schreiben.

In eine Apache Iceberg-Tabelle schreiben

Im folgenden Beispiel werden In-Memory-JSON-Daten in eine Apache Iceberg-Tabelle geschrieben.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.values.PCollectionRowTuple;

public class ApacheIcebergWrite {
  static final List<String> TABLE_ROWS = Arrays.asList(
      "{\"id\":0, \"name\":\"Alice\"}",
      "{\"id\":1, \"name\":\"Bob\"}",
      "{\"id\":2, \"name\":\"Charles\"}"
  );

  static final String CATALOG_TYPE = "hadoop";

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addStringField("name")
      .addInt64Field("id")
      .build();

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA))
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    pipeline.run().waitUntilFinish();
  }
}

Mit dynamischen Zielen schreiben

Im folgenden Beispiel werden Daten basierend auf einem Feld in den Eingabedaten in verschiedene Apache Iceberg-Tabellen geschrieben.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;

public class ApacheIcebergDynamicDestinations {

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addInt64Field("id")
      .addStringField("name")
      .addStringField("airport")
      .build();

  // The data to write to table, formatted as JSON strings.
  static final List<String> TABLE_ROWS = List.of(
      "{\"id\":0, \"name\":\"Alice\", \"airport\": \"ORD\" }",
      "{\"id\":1, \"name\":\"Bob\", \"airport\": \"SYD\" }",
      "{\"id\":2, \"name\":\"Charles\", \"airport\": \"ORD\" }"
  );

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);
  }

  // Write JSON data to Apache Iceberg, using dynamic destinations to determine the Iceberg table
  // where Dataflow writes each record. The JSON data contains a field named "airport". The
  // Dataflow pipeline writes to Iceberg tables with the naming pattern "flights-{airport}".
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", "hadoop")
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        // Route the incoming records based on the value of the "airport" field.
        .put("table", "flights-{airport}")
        // Specify which fields to keep from the input data.
        .put("keep", Arrays.asList("name", "id"))
        .build();

    // Build the pipeline.
    pipeline
        // Read in-memory JSON data.
        .apply(Create.of(TABLE_ROWS))
        // Convert the JSON records to Row objects.
        .apply(JsonToRow.withSchema(SCHEMA))
        // Write each Row to Apache Iceberg.
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    // Run the pipeline.
    pipeline.run().waitUntilFinish();
  }
}

Nächste Schritte