Menulis dari Dataflow ke Apache Iceberg

Untuk menulis dari Dataflow ke Apache Iceberg, gunakan konektor I/O terkelola.

I/O Terkelola mendukung kemampuan berikut untuk Apache Iceberg:

Katalog
  • Hadoop
  • Hive
  • Katalog berbasis REST
  • Metastore BigQuery (memerlukan Apache Beam SDK 2.62.0 atau yang lebih baru jika tidak menggunakan Runner v2)
Kemampuan baca Pembacaan batch
Kemampuan tulis
  • Operasi tulis batch
  • Penulisan streaming
  • Tujuan dinamis
  • Pembuatan tabel dinamis

Untuk tabel BigQuery untuk Apache Iceberg, gunakan konektor BigQueryIO dengan BigQuery Storage API. Tabel harus sudah ada; pembuatan tabel dinamis tidak didukung.

Dependensi

Tambahkan dependensi berikut ke project Anda:

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-managed</artifactId>
  <version>${beam.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-iceberg</artifactId>
  <version>${beam.version}</version>
</dependency>

Konfigurasi

I/O Terkelola menggunakan parameter konfigurasi berikut untuk Apache Iceberg:

Konfigurasi baca dan tulis Jenis data Deskripsi
table string ID tabel Apache Iceberg. Contoh: "db.table1".
catalog_name string Nama katalog. Contoh: "local".
catalog_properties map Peta properti konfigurasi untuk katalog Apache Iceberg. Properti yang diperlukan bergantung pada katalog. Untuk informasi selengkapnya, lihat CatalogUtil dalam dokumentasi Apache Iceberg.
config_properties map Kumpulan properti konfigurasi Hadoop opsional. Untuk informasi selengkapnya, lihat CatalogUtil dalam dokumentasi Apache Iceberg.
Konfigurasi tulis Jenis data Deskripsi
triggering_frequency_seconds bilangan bulat Untuk pipeline operasi tulis streaming, frekuensi saat sink mencoba membuat snapshot, dalam detik.

Tujuan dinamis

I/O Terkelola untuk Apache Iceberg mendukung tujuan dinamis. Daripada menulis ke satu tabel tetap, konektor dapat memilih tabel tujuan secara dinamis berdasarkan nilai kolom dalam data yang masuk.

Untuk menggunakan tujuan dinamis, berikan template untuk parameter konfigurasi table. Untuk informasi selengkapnya, lihat Tujuan dinamis.

Contoh

Contoh berikut menunjukkan cara menggunakan I/O Terkelola untuk menulis ke Apache Iceberg.

Menulis ke tabel Apache Iceberg

Contoh berikut menulis data JSON dalam memori ke tabel Apache Iceberg.

Java

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;
import org.apache.beam.sdk.values.PCollectionRowTuple;

public class ApacheIcebergWrite {
  static final List<String> TABLE_ROWS = Arrays.asList(
      "{\"id\":0, \"name\":\"Alice\"}",
      "{\"id\":1, \"name\":\"Bob\"}",
      "{\"id\":2, \"name\":\"Charles\"}"
  );

  static final String CATALOG_TYPE = "hadoop";

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addStringField("name")
      .addInt64Field("id")
      .build();

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);

    @Description("The name of the table to write to")
    String getTableName();

    void setTableName(String value);
  }

  public static void main(String[] args) {

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    //   --tableName= $TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", CATALOG_TYPE)
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", options.getTableName())
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        .build();

    // Build the pipeline.
    pipeline.apply(Create.of(TABLE_ROWS))
        .apply(JsonToRow.withSchema(SCHEMA))
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    pipeline.run().waitUntilFinish();
  }
}

Menulis dengan tujuan dinamis

Contoh berikut menulis ke berbagai tabel Apache Iceberg berdasarkan kolom dalam data input.

Java

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.JsonToRow;

public class ApacheIcebergDynamicDestinations {

  // The schema for the table rows.
  public static final Schema SCHEMA = new Schema.Builder()
      .addInt64Field("id")
      .addStringField("name")
      .addStringField("airport")
      .build();

  // The data to write to table, formatted as JSON strings.
  static final List<String> TABLE_ROWS = List.of(
      "{\"id\":0, \"name\":\"Alice\", \"airport\": \"ORD\" }",
      "{\"id\":1, \"name\":\"Bob\", \"airport\": \"SYD\" }",
      "{\"id\":2, \"name\":\"Charles\", \"airport\": \"ORD\" }"
  );

  public interface Options extends PipelineOptions {
    @Description("The URI of the Apache Iceberg warehouse location")
    String getWarehouseLocation();

    void setWarehouseLocation(String value);

    @Description("The name of the Apache Iceberg catalog")
    String getCatalogName();

    void setCatalogName(String value);
  }

  // Write JSON data to Apache Iceberg, using dynamic destinations to determine the Iceberg table
  // where Dataflow writes each record. The JSON data contains a field named "airport". The
  // Dataflow pipeline writes to Iceberg tables with the naming pattern "flights-{airport}".
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG \
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    // Configure the Iceberg source I/O
    Map catalogConfig = ImmutableMap.<String, Object>builder()
        .put("warehouse", options.getWarehouseLocation())
        .put("type", "hadoop")
        .build();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("catalog_name", options.getCatalogName())
        .put("catalog_properties", catalogConfig)
        // Route the incoming records based on the value of the "airport" field.
        .put("table", "flights-{airport}")
        // Specify which fields to keep from the input data.
        .put("keep", Arrays.asList("name", "id"))
        .build();

    // Build the pipeline.
    pipeline
        // Read in-memory JSON data.
        .apply(Create.of(TABLE_ROWS))
        // Convert the JSON records to Row objects.
        .apply(JsonToRow.withSchema(SCHEMA))
        // Write each Row to Apache Iceberg.
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));

    // Run the pipeline.
    pipeline.run().waitUntilFinish();
  }
}

Langkah berikutnya