Von Dataflow nach BigQuery schreiben

In diesem Dokument wird beschrieben, wie Sie Daten aus Dataflow in BigQuery schreiben.

Übersicht

In den meisten Anwendungsfällen sollten Sie Managed I/O verwenden, um in BigQuery zu schreiben. Managed I/O bietet Funktionen wie automatische Upgrades und eine einheitliche Konfigurations-API. Beim Schreiben in BigQuery wählt Managed I/O automatisch die beste Schreibmethode für Batch- oder Streamingjobs aus.

Wenn Sie eine erweiterte Leistungsoptimierung benötigen, sollten Sie den BigQueryIO-Connector verwenden. Weitere Informationen finden Sie in diesem Dokument unter BigQueryIO-Connector verwenden.

Leistung

Die folgende Tabelle enthält Leistungsmesswerte für verschiedene Arbeitslasten. Diese Arbeitslasten wurden auf einem e2-standard2-Worker mit dem Apache Beam SDK 2.49.0 für Java ausgeführt. Runner v2 wurde nicht verwendet.

100 Mio. Datensätze | 1 KB | 1 Spalte Durchsatz (Byte) Durchsatz (Elemente)
Schreiben im Speicher 55 Mbit/s 54.000 Elemente pro Sekunde
Avro-Ladevorgang 78 Mbit/s 77.000 Elemente pro Sekunde
Json Load 54 Mbit/s 53.000 Elemente pro Sekunde

Diese Messwerte basieren auf einfachen Batch-Pipelines. Sie dienen zum Vergleich der Leistung zwischen E/A-Anschlüssen und sind nicht unbedingt repräsentativ für reale Pipelines. Die Leistung der Dataflow-Pipeline ist komplex und eine Funktion des VM-Typs, der verarbeiteten Daten, der Leistung externer Quellen und Senken sowie des Nutzercodes. Die Messwerte basieren auf der Ausführung des Java SDK und sind nicht repräsentativ für die Leistungsmerkmale anderer Sprach-SDKs. Weitere Informationen finden Sie unter Beam E/A-Leistung.

BigQueryIO-Connector verwenden

Der BigQuery-E/A-Connector unterstützt die folgenden Methoden zum Schreiben in BigQuery:

  • STORAGE_WRITE_API. In diesem Modus führt der Connector direkte Schreibvorgänge in BigQuery-Speicher über die BigQuery Storage Write API aus. Die Storage Write API kombiniert die Streamingaufnahme und das Laden im Batch in einer einzigen Hochleistungs-API. Dieser Modus stellt eine "Genau einmal"-Semantik sicher.
  • STORAGE_API_AT_LEAST_ONCE. Dieser Modus verwendet auch die Storage Write API, stellt aber eine "Mindestens einmal"-Semantik bereit. Dieser Modus führt bei den meisten Pipelines zu einer geringeren Latenz. Doppelte Schreibvorgänge sind jedoch möglich.
  • FILE_LOADS. In diesem Modus schreibt der Connector die Eingabedaten in Staging-Dateien in Cloud Storage. Anschließend wird ein BigQuery-Ladejob ausgeführt, um die Daten in BigQuery zu laden. Der Modus ist die Standardeinstellung für begrenzte PCollections, die am häufigsten in Batch-Pipelines vorkommen.
  • STREAMING_INSERTS. In diesem Modus verwendet der Connector die Legacy-Streaming-API. Dieser Modus ist der Standard für unbegrenzte PCollections, wird aber für neue Projekte nicht empfohlen.

Beachten Sie bei der Auswahl einer Schreibmethode folgende Punkte:

  • Für Streamingjobs sollten Sie STORAGE_WRITE_API oder STORAGE_API_AT_LEAST_ONCE verwenden, da diese Modi direkt in BigQuery-Speicher schreiben, ohne Zwischen-Staging-Dateien zu verwenden.
  • Wenn Sie die Pipeline im "Mindestens einmal"-Streamingmodus ausführen, legen Sie den Schreibmodus auf STORAGE_API_AT_LEAST_ONCE fest. Diese Einstellung ist effizienter und entspricht der Semantik des „Mindestens einmal“-Streamingmodus.
  • Für Dateiladevorgänge und die Storage Write API gelten unterschiedliche Kontingente und Limits.
  • Für Ladejobs wird entweder der freigegebene BigQuery-Slot-Pool oder reservierte Slots verwendet. Wenn Sie reservierte Slots verwenden möchten, führen Sie den Ladejob in einem Projekt mit einer Reservierungszuweisung vom Typ PIPELINE aus. Ladejobs sind kostenlos, wenn Sie den freigegebenen BigQuery-Slot-Pool verwenden. BigQuery gibt jedoch keine Garantien für verfügbare Kapazitäten des gemeinsamen Pools. Weitere Informationen finden Sie unter Einführung in Reservierungen.

Parallelität

  • Bei FILE_LOADS und STORAGE_WRITE_API in Streaming-Pipelines teilt der Connector die Daten in eine Reihe von Dateien oder Streams auf. Im Allgemeinen empfehlen wir, withAutoSharding aufzurufen, um die automatische Fragmentierung zu aktivieren.

  • Bei FILE_LOADS in Batchpipelines schreibt der Connector Daten in partitionierte Dateien, die dann parallel in BigQuery geladen werden.

  • Bei STORAGE_WRITE_API in Batch-Pipelines erstellt jeder Worker einen oder mehrere Streams, die in BigQuery geschrieben werden. Die Anzahl der Streams wird durch die Gesamtzahl der Fragmente bestimmt.

  • Für STORAGE_API_AT_LEAST_ONCE gibt es einen einzigen Standard-Schreibstream. Mehrere Worker hängen Daten an diesen Stream an.

Best Practices

  • Für die Storage Write API gelten Kontingentlimits. Der Connector berücksichtigt diese Grenzwerte für die meisten Pipelines. In einigen Szenarien können die verfügbaren Storage Write API-Streams jedoch erschöpft sein. Dieses Problem kann beispielsweise in einer Pipeline auftreten, die automatische Fragmentierung und Autoskalierung mit einer großen Anzahl von Zielen verwendet, insbesondere bei lang laufenden Jobs mit stark variablen Arbeitslasten. Wenn dieses Problem auftritt, sollten Sie STORAGE_WRITE_API_AT_LEAST_ONCE verwenden, um es zu vermeiden.

  • Verwenden Sie Google Cloud -Messwerte, um die Kontingentnutzung der Storage Write API zu überwachen.

  • Beim Laden von Dateien ist Avro in der Regel schneller als JSON. Wenn Sie Avro verwenden möchten, rufen Sie withAvroFormatFunction auf.

  • Standardmäßig werden Ladejobs im selben Projekt wie der Dataflow-Job ausgeführt. Wenn Sie ein anderes Projekt angeben möchten, rufen Sie withLoadJobProjectId auf.

  • Wenn Sie das Java SDK verwenden, sollten Sie eine Klasse erstellen, die das Schema der BigQuery-Tabelle darstellt. Rufen Sie dann useBeamSchema in Ihrer Pipeline auf, um automatisch zwischen Apache Beam-Row- und BigQuery-TableRow-Typen zu konvertieren. Ein Beispiel für eine Schemaklasse finden Sie unter ExampleModel.java.

  • Wenn Sie Tabellen mit komplexen Schemas laden, die Tausende von Feldern enthalten, sollten Sie vielleicht withMaxBytesPerPartition aufrufen, um für jeden Ladejob eine kleinere maximale Größe festzulegen.

  • Standardmäßig verwendet BigQueryIO Storage Write API-Einstellungen, die für die meisten Pipelines angemessen sind. Wenn Leistungsprobleme auftreten, können Sie die Pipelineoptionen verwenden, um diese Einstellungen zu optimieren. Weitere Informationen finden Sie in der Apache Beam-Dokumentation unter Storage Write API optimieren.

Streamingpipelines

Die folgenden Empfehlungen gelten für Streamingpipelines.

  • Für Streamingpipelines empfehlen wir die Verwendung der Storage Write API (STORAGE_WRITE_API oder STORAGE_API_AT_LEAST_ONCE).

  • Eine Streamingpipeline kann Dateiladevorgänge verwenden, dieser Ansatz hat jedoch Nachteile:

    • Zum Schreiben der Dateien ist Windowing erforderlich. Sie können das globale Fenster nicht verwenden.
    • BigQuery lädt Dateien nach dem Best-Effort-Prinzip, wenn der freigegebene Slot-Pool verwendet wird. Es kann eine erhebliche Verzögerung zwischen dem Schreiben eines Datensatzes und seiner Verfügbarkeit in BigQuery geben.
    • Wenn ein Ladejob fehlschlägt, z. B. aufgrund fehlerhafter Daten oder Schemaabweichungen, schlägt die gesamte Pipeline fehl.
  • Verwenden Sie nach Möglichkeit STORAGE_WRITE_API_AT_LEAST_ONCE. Dieser Modus kann dazu führen, dass doppelte Datensätze in BigQuery geschrieben werden, ist jedoch günstiger und besser skalierbar als STORAGE_WRITE_API.

  • Im Allgemeinen sollten Sie STREAMING_INSERTS nicht verwenden. Streaming-Insert-Anweisungen sind teurer als die Storage Write API und bieten auch keine gute Leistung.

  • Durch die Fragmentierung von Daten kann die Leistung in Streaming-Pipelines verbessert werden. Für die meisten Pipelines ist die automatische Fragmentierung ein guter Ausgangspunkt. Sie können die Fragmentierung jedoch so optimieren:

  • Wenn Sie Streaming-Insert-Anweisungen verwenden, empfehlen wir, retryTransientErrors als Wiederholungsrichtlinie festzulegen.

Batchpipelines

Die folgenden Empfehlungen gelten für Batchpipelines.

  • Für die meisten großen Batchpipelines empfehlen wir, es zuerst mit FILE_LOADS zu versuchen. Eine Batchpipeline kann STORAGE_WRITE_API verwenden. Allerdings wird sie wahrscheinlich bei großen Datenmengen (mehr als 1.000 vCPUs) oder bei gleichzeitig ausgeführten Pipelines Kontingentlimits überschreiten. Apache Beam drosselt die maximale Anzahl von Schreibstreams für STORAGE_WRITE_API-Batchjobs nicht. Dadurch erreicht der Job schließlich die BigQuery Storage API-Limits.

  • Wenn Sie FILE_LOADS verwenden, können Sie entweder den freigegebenen BigQuery-Slot-Pool oder Ihren Pool reservierter Slots erschöpfen. Wenn diese Art von Fehlern auftritt, versuchen Sie die folgenden Ansätze:

    • Reduzieren Sie die maximale Anzahl von Workern oder die Worker-Größe für den Job.
    • Erwerben Sie mehr reservierte Slots.
    • Geeignete Methoden: STORAGE_WRITE_API
  • Kleine bis mittelgroße Pipelines (< 1.000 vCPUs) können von STORAGE_WRITE_API profitieren. Bei diesen kleineren Jobs sollten Sie STORAGE_WRITE_API verwenden, wenn Sie eine Warteschlange für unzustellbare Nachrichten benötigen oder wenn der freigegebene Slot-Pool FILE_LOADS nicht ausreicht.

  • Wenn Sie doppelte Daten tolerieren können, sollten Sie die Verwendung von STORAGE_WRITE_API_AT_LEAST_ONCE in Betracht ziehen. Dieser Modus kann dazu führen, dass doppelte Datensätze in BigQuery geschrieben werden, ist jedoch möglicherweise günstiger als die Option STORAGE_WRITE_API.

  • Unterschiedliche Schreibmodi können je nach den Eigenschaften Ihrer Pipeline unterschiedlich funktionieren. Experimentieren Sie, um den besten Schreibmodus für Ihre Arbeitslast zu ermitteln.

Fehler auf Zeilenebene behandeln

In diesem Abschnitt wird beschrieben, wie Sie mögliche Fehler auf Zeilenebene behandeln können, z. B. aufgrund falsch formatierter Eingabedaten oder wegen Schemaabweichungen.

Bei der Storage Write API werden alle Zeilen, die nicht geschrieben werden können, in eine separate PCollection eingefügt. Rufen Sie zum Abrufen dieser Sammlung getFailedStorageApiInserts für das WriteResult-Objekt auf. Ein Beispiel für diesen Ansatz finden Sie unter Daten in BigQuery streamen.

Es empfiehlt sich, die Fehler zur späteren Verarbeitung an eine Dead-Letter-Warteschlange oder -Tabelle zu senden. Weitere Informationen zu diesem Muster finden Sie unter BigQueryIO-Muster für unzustellbare Nachrichten.

Bei FILE_LOADS schlägt der Ladejob fehl und die Pipeline löst eine Laufzeitausnahme aus, wenn beim Laden der Daten ein Fehler auftritt. Sie können den Fehler in den Dataflow-Logs oder im BigQuery-Jobverlauf ansehen. Der E/O-Connector gibt keine Informationen zu einzelnen fehlgeschlagenen Zeilen zurück.

Weitere Informationen zur Fehlerbehebung finden Sie unter BigQuery-Connector-Fehler.

Beispiele

Die folgenden Beispiele zeigen, wie Sie mit Dataflow in BigQuery schreiben. In diesen Beispielen wird der Connector BigQueryIO verwendet.

In vorhandene Tabellen schreiben

Im folgenden Beispiel wird eine Batchpipeline erstellt, die einen PCollection<MyData> in BigQuery schreibt, wobei MyData ein benutzerdefinierter Datentyp ist.

Die Methode BigQueryIO.write() gibt einen BigQueryIO.Write<T>-Typ zurück, der zum Konfigurieren des Schreibvorgangs verwendet wird. Weitere Informationen finden Sie in der Apache Beam-Dokumentation unter In Tabellen schreiben. In diesem Codebeispiel werden Daten in eine vorhandene Tabelle (CREATE_NEVER) geschrieben und die neuen Zeilen an die Tabelle angehängt (WRITE_APPEND).

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

In neue oder vorhandene Tabellen schreiben

Im folgenden Beispiel wird eine neue Tabelle erstellt, wenn die Zieltabelle nicht vorhanden ist. Dazu wird die Erstellungsanordnung auf CREATE_IF_NEEDED gesetzt. Wenn Sie diese Option verwenden, müssen Sie ein Tabellenschema angeben. Der Connector verwendet dieses Schema, wenn neue Tabellen erstellt werden.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

Daten zu BigQuery streamen

Im folgenden Beispiel wird gezeigt, wie Sie Daten mit einer "Genau einmal"-Semantik streamen. Dazu setzen Sie den Schreibmodus auf STORAGE_WRITE_API.

Nicht alle Streamingpipelines erfordern eine "Genau einmal"-Semantik. Beispielsweise können Sie Duplikate eventuell aus der Zieltabelle manuell entfernen. Wenn für Ihr Szenario doppelt vorhandene Datensätze akzeptabel sind, sollten Sie die "Mindestens einmal"-Semantik verwenden. Dazu setzen Sie die Schreibmethode auf STORAGE_API_AT_LEAST_ONCE. Diese Methode ist in der Regel effizienter und führt bei den meisten Pipelines zu einer geringeren Latenz.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

Nächste Schritte