Mit dem Job-Builder können Sie benutzerdefinierte Batch- und Streaming-Dataflow-Jobs erstellen. Sie können Job-Builder-Jobs auch als Apache Beam-YAML-Dateien speichern, um sie freizugeben und wiederzuverwenden.
Neue Pipeline erstellen
So erstellen Sie eine neue Pipeline im Job-Builder:
Rufen Sie in der Google Cloud -Console die Seite Jobs auf.
Klicken Sie auf
Job mit dem Builder erstellen.Geben Sie unter Jobname einen Namen für den Job ein.
Wählen Sie entweder Batch oder Streaming aus.
Wenn Sie Streaming auswählen, wählen Sie einen Fenstermodus aus. Geben Sie dann eine Spezifikation für das Fenster ein:
- Festes Fenster: Geben Sie eine Fenstergröße in Sekunden ein.
- Gleitendes Fenster: Geben Sie eine Fenstergröße und einen Fensterzeitraum in Sekunden ein.
- Sitzungsfenster: Geben Sie eine Sitzungslücke in Sekunden ein.
Weitere Informationen zu Fenstern finden Sie unter Fenster und Fensterfunktionen.
Fügen Sie der Pipeline dann Quellen, Transformationen und Senken hinzu, wie in den folgenden Abschnitten beschrieben.
Quelle zur Pipeline hinzufügen
Eine Pipeline muss mindestens eine Quelle haben. Anfangs ist der Job-Builder mit einer leeren Quelle gefüllt. So konfigurieren Sie die Quelle:
Geben Sie im Feld Quellname einen Namen für die Quelle ein oder verwenden Sie den Standardnamen. Der Name wird im Jobdiagramm angezeigt, wenn Sie den Job ausführen.
Wählen Sie in der Liste Quelltyp den Typ der Datenquelle aus.
Geben Sie je nach Quelltyp zusätzliche Konfigurationsinformationen an. Wenn Sie beispielsweise BigQuery auswählen, geben Sie die Tabelle an, aus der Daten gelesen werden sollen.
Wenn Sie Pub/Sub auswählen, geben Sie ein Nachrichtenschema an. Geben Sie den Namen und den Datentyp der einzelnen Felder ein, die aus Pub/Sub-Nachrichten gelesen werden sollen. In der Pipeline werden alle Felder entfernt, die nicht im Schema angegeben sind.
Optional: Bei einigen Quelltypen können Sie auf Quelldaten in der Vorschau ansehen klicken, um sich eine Vorschau der Quelldaten anzusehen.
Wenn Sie der Pipeline eine weitere Quelle hinzufügen möchten, klicken Sie auf Quelle hinzufügen. Wenn Sie Daten aus mehreren Quellen kombinieren möchten, fügen Sie Ihrer Pipeline eine SQL
- oder Join
-Transformation hinzu.
Der Pipeline eine Transformation hinzufügen
Optional können Sie der Pipeline eine oder mehrere Transformationen hinzufügen. Mit den folgenden Transformationen können Sie Daten aus Quellen und anderen Transformationen bearbeiten, aggregieren oder zusammenführen:
Transformationstyp | Beschreibung | Informationen zu Beam-YAML-Transformationen |
---|---|---|
Filtern (Python) | Datensätze mit einem Python-Ausdruck filtern. | |
SQL-Transformation | Datensätze bearbeiten oder mehrere Eingaben mit einer SQL-Anweisung zusammenführen | |
Felder zuordnen (Python) | Neue Felder oder ganze Datensätze mit Python-Ausdrücken und ‑Funktionen hinzufügen bzw. neu zuordnen. | |
Felder zuordnen (SQL) | Datensatzfelder mit SQL-Ausdrücken hinzufügen oder zuordnen | |
YAML-Transformationen:
|
Alle Transformationen aus dem Beam YAML SDK verwenden. YAML-Transformationskonfiguration: Geben Sie die Konfigurationsparameter für die YAML-Transformation als YAML-Zuordnung an. Die Schlüssel/Wert-Paare werden verwendet, um den Konfigurationsabschnitt der resultierenden Beam-YAML-Transformation zu füllen. Die unterstützten Konfigurationsparameter für jeden Transformationstyp finden Sie in der Beam-YAML-Transformationsdokumentation. Beispielkonfigurationsparameter: Kombinierengroup_by: combine: Beitretentype: equalities: fields: |
|
Log | Logdatensätze in die Worker-Logs des Jobs schreiben. | |
Gruppieren nach |
Datensätze mit Funktionen wie count() und sum() kombinieren.
|
|
Beitreten | Mehrere Eingaben für gleiche Felder verknüpfen. | |
Aufteilen | Datensätze durch Vereinfachen von Arrayfeldern aufteilen. |
So fügen Sie eine Transformation hinzu:
Klicken Sie auf Transformation hinzufügen.
Geben Sie im Feld Transformieren einen Namen für die Transformation ein oder verwenden Sie den Standardnamen. Der Name wird im Jobdiagramm angezeigt, wenn Sie den Job ausführen.
Wählen Sie in der Liste Transformationsart die Art der Transformation aus.
Geben Sie je nach Transformationstyp zusätzliche Konfigurationsinformationen an. Wenn Sie beispielsweise Filter (Python) auswählen, geben Sie einen Python-Ausdruck ein, der als Filter verwendet werden soll.
Wählen Sie den Eingabeschritt für die Transformation aus. Der Eingabeschritt ist die Quelle oder Transformation, deren Ausgabe die Eingabe für diese Transformation liefert.
Senke zur Pipeline hinzufügen
Eine Pipeline muss mindestens ein Ziel haben. Anfangs ist der Job-Builder mit einem leeren Senkenknoten gefüllt. So konfigurieren Sie die Senke:
Geben Sie im Feld Name der Senke einen Namen für die Senke ein oder verwenden Sie den Standardnamen. Der Name wird im Jobdiagramm angezeigt, wenn Sie den Job ausführen.
Wählen Sie in der Liste Sink-Typ den Typ des Sinks aus.
Geben Sie je nach Senkentyp zusätzliche Konfigurationsinformationen an. Wenn Sie beispielsweise die BigQuery-Senke auswählen, wählen Sie die BigQuery-Tabelle aus, in die geschrieben werden soll.
Wählen Sie den Eingabeschritt für die Senke aus. Der Eingabeschritt ist die Quelle oder Transformation, deren Ausgabe die Eingabe für diese Transformation liefert.
Wenn Sie der Pipeline ein weiteres Ziel hinzufügen möchten, klicken Sie auf Ziel hinzufügen.
Pipeline ausführen
Führen Sie die folgenden Schritte aus, um eine Pipeline über den Job-Builder auszuführen:
Optional: Dataflow-Joboptionen festlegen Klicken Sie zum Erweitern des Abschnitts „Dataflow-Optionen“ auf den
Erweiterungspfeil.Klicken Sie auf Job ausführen. Der Job-Builder ruft das Job-Diagramm für den eingereichten Job auf. Mit der Jobgrafik können Sie den Status des Jobs überwachen.
Pipeline vor dem Starten validieren
Bei Pipelines mit komplexer Konfiguration, z. B. Python-Filtern und SQL-Ausdrücken, kann es hilfreich sein, die Pipelinekonfiguration vor dem Start auf Syntaxfehler zu prüfen. Führen Sie die folgenden Schritte aus, um die Pipelinesyntax zu validieren:
- Klicken Sie auf Validieren, um Cloud Shell zu öffnen und den Validierungsdienst zu starten.
- Klicken Sie auf Überprüfung starten.
- Wenn bei der Validierung ein Fehler gefunden wird, wird ein rotes Ausrufezeichen angezeigt.
- Beheben Sie alle erkannten Fehler und bestätigen Sie die Korrekturen, indem Sie auf Validieren klicken. Wenn kein Fehler gefunden wird, wird ein grünes Häkchen angezeigt.
Mit der gcloud CLI ausführen
Sie können Beam YAML-Pipelines auch mit der gcloud CLI ausführen. So führen Sie eine Job-Builder-Pipeline mit der gcloud CLI aus:
Klicken Sie auf YAML speichern, um das Fenster YAML speichern zu öffnen.
Führen Sie eine der folgenden Aktionen aus:
- Wenn Sie in Cloud Storage speichern möchten, geben Sie einen Cloud Storage-Pfad ein und klicken Sie auf Speichern.
- Klicken Sie auf Herunterladen, um eine lokale Datei herunterzuladen.
Führen Sie den folgenden Befehl in der Shell oder im Terminal aus:
gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
Ersetzen Sie
YAML_FILE_PATH
durch den Pfad Ihrer YAML-Datei, entweder lokal oder in Cloud Storage.
Nächste Schritte
- Dataflow-Job-Monitoring-Oberfläche verwenden
- YAML-Jobdefinitionen im Job-Builder speichern und laden
- Weitere Informationen zu Beam YAML