Die Vorlage „Pub/Sub Proto für BigQuery“ ist eine Streamingpipeline, die Proto-Daten aus einem Pub/Sub-Abo in eine BigQuery-Tabelle schreibt.
Alle Fehler beim Schreiben in die BigQuery-Tabelle werden in ein Pub/Sub-Thema für nicht verarbeitete Datensätze gestreamt.
Sie können eine benutzerdefinierte JavaScript-Funktion (UDF) zum Transformieren von Daten bereitstellen. Fehler während der Ausführung der UDF können entweder an ein separates Pub/Sub-Thema oder an dasselbe nicht verarbeitete Thema wie die BigQuery-Fehler gesendet werden.
Bevor Sie eine Dataflow-Pipeline für dieses Szenario ausführen, sollten Sie prüfen, ob ein Pub/Sub-BigQuery-Abo mit einer nutzerdefinierten Funktion Ihren Anforderungen entspricht.
Pipelineanforderungen
- Das Pub/Sub-Eingabeabo muss vorhanden sein.
- Die Schemadatei für die Proto-Einträge muss in Cloud Storage hinterlegt sein.
- Das Pub/Sub-Ausgabethema muss vorhanden sein.
- Das BigQuery-Ausgabe-Dataset muss vorhanden sein.
- Wenn die BigQuery-Tabelle vorhanden ist, muss sie ein Schema haben, das mit den Proto-Daten unabhängig vom
createDisposition
-Wert übereinstimmt.
Vorlagenparameter
Erforderliche Parameter
- protoSchemaPath (Cloud Storage-Pfad zur Proto-Schemadatei): Cloud Storage-Pfad zu einer eigenständigen Deskriptor-Set-Datei. Beispiel: gs://MyBucket/schema.pb.
schema.pb
kann generiert werden, indem Sie--descriptor_set_out=schema.pb
demprotoc
-Befehl hinzufügen, mit dem die Protos kompiliert werden. Mit dem Flag--include_imports
kann sichergestellt werden, dass die Datei unabhängig ist. - fullMessageName (Vollständiger Proto-Nachrichtenname): Der vollständige Nachrichtenname (z. B. package.name.MessageName). Wenn die Nachricht in einer anderen Nachricht verschachtelt ist, geben Sie alle Nachrichten mit dem Trennzeichen „.“ an (z. B. package.name.OuterMessage.InnerMessage). „package.name“ sollte aus der
package
-Anweisung und nicht aus derjava_package
-Anweisung stammen. - inputSubscription (Pub/Sub-Eingabeabo): Das Pub/Sub-Abo, aus dem die Eingabe gelesen werden soll, im Format „projects/your-project-id/subscriptions/your-subscription-name“ (Beispiel: projects/your-project-id/subscriptions/your-subscription-name).
- outputTableSpec (BigQuery-Ausgabetabelle): Der Speicherort der BigQuery-Tabelle, in die die Ausgabe geschrieben werden soll. Der Name muss das Format
<project>:<dataset>.<table_name>
haben. Das Schema der Tabelle muss mit Eingabeobjekten übereinstimmen. - outputTopic (Pub/Sub-Ausgabethema): Der Name des Themas, in dem Daten veröffentlicht werden sollen, im Format „projects/your-project-id/topics/your-topic-name“ (Beispiel: projects/your-project-id/topics/your-topic-name).
Optionale Parameter
- preserveProtoFieldNames (Proto-Feldnamen beibehalten): Flag, mit dem gesteuert wird, ob Proto-Feldnamen beibehalten oder in „lowerCamelCase“ konvertiert werden sollen. Wenn die Tabelle bereits vorhanden ist, sollte dies auf dem Schema der Tabelle basieren. Andernfalls werden die Spaltennamen der erstellten Tabelle festgelegt. „True“, um die Proto-Schreibweise mit Unterstrichen beizubehalten. Bei „false“ werden Felder in „lowerCamelCase“ umgewandelt. (Standardeinstellung: false).
- bigQueryTableSchemaPath (BigQuery-Tabellenschemapfad): Cloud Storage-Pfad zur BigQuery-JSON-Schemadatei. Wenn dies nicht festgelegt ist, wird das Schema aus dem Proto-Schema abgeleitet. (Beispiel: gs://MyBucket/bq_schema.json).
- udfOutputTopic (Pub/Sub-Ausgabethema für UDF-Fehler): Ein optionales Ausgabethema, an das UDF-Fehler gesendet werden können. Wenn diese Option nicht festgelegt ist, werden Fehler in dasselbe Thema wie die BigQuery-Fehler geschrieben. Beispiel: projects/your-project-id/topics/your-topic-name.
- writeDisposition (WriteDisposition für BigQuery): BigQuery-WriteDisposition. Beispiele: WRITE_APPEND, WRITE_EMPTY oder WRITE_TRUNCATE. Die Standardeinstellung ist WRITE_APPEND.
- createDisposition (CreateDisposition für BigQuery): BigQuery-CreateDisposition. Beispiel: CREATE_IF_NEEDED, CREATE_NEVER. Die Standardeinstellung ist CREATE_IF_NEEDED.
- javascriptTextTransformGcsPath (Cloud Storage-Pfad zur JavaScript-UDF-Quelle): Das Cloud Storage-Pfadmuster für den JavaScript-Code, der Ihre benutzerdefinierten Funktionen enthält. Beispiel: gs://Ihr-Bucket/Ihre-Funktion.js.
- javascriptTextTransformFunctionName (UDF-JavaScript-Funktionsname): Der Name der Funktion, die aus Ihrer JavaScript-Datei aufgerufen werden soll. Verwenden Sie nur Buchstaben, Ziffern und Unterstriche. (Beispiel: „transform“ oder „transform_udf1“).
- javascriptTextTransformReloadIntervalMinutes (Intervall für automatisches Neuladen von JavaScript-UDFs (Minuten)): Definieren Sie das Intervall, in dem die Worker möglicherweise nach JavaScript-UDF-Änderungen suchen, um die Dateien neu zu laden. Die Standardeinstellung ist 0.
- useStorageWriteApi (BigQuery Storage Write API verwenden): Wenn „true“, verwendet die Pipeline beim Schreiben der Daten in BigQuery die Storage Write API (siehe https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api). Der Standardwert ist „false“. Wenn Sie die Storage Write API im genau einmaligen Modus verwenden, müssen Sie die folgenden Parameter festlegen: „Anzahl der Streams für die BigQuery Storage Write API“ und „Triggerhäufigkeit in Sekunden für die BigQuery Storage Write API“. Wenn Sie den Dataflow-Modus „Mindestens einmal“ aktivieren oder den Parameter „useStorageWriteApiAtLeastOnce“ auf „true“ setzen, müssen Sie die Anzahl der Streams oder die Triggerhäufigkeit nicht festlegen.
- useStorageWriteApiAtLeastOnce (Mindestens einmal-Semantik in der BigQuery Storage Write API verwenden): Dieser Parameter wird nur wirksam, wenn „BigQuery Storage Write API verwenden“ aktiviert ist. Wenn diese Option aktiviert ist, wird für die Storage Write API die "Mindestens einmal"-Semantik verwendet. Andernfalls wird die "Genau einmal"-Semantik verwendet. Die Standardeinstellung ist "false".
- numStorageWriteApiStreams (Anzahl der Streams für die BigQuery Storage Write API): Die Anzahl der Streams definiert die Parallelität der Write-Transformation von BigQueryIO und entspricht ungefähr der Anzahl der Streams der Storage Write API, die von der Pipeline verwendet werden. Die empfohlenen Werte finden Sie unter https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api. Die Standardeinstellung ist 0.
- storageWriteApiTriggeringFrequencySec (Triggerhäufigkeit in Sekunden für die BigQuery Storage Write API): Die Triggerhäufigkeit legt fest, wie schnell die Daten für Abfragen in BigQuery sichtbar sind. Die empfohlenen Werte finden Sie unter https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api.
Benutzerdefinierte Funktion
Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.
Funktionsspezifikation
UDFs haben die folgende Spezifikation:
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub Proto to BigQuery templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \ --parameters \ schemaPath=SCHEMA_PATH,\ fullMessageName=PROTO_MESSAGE_NAME,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=UNPROCESSED_TOPIC
Ersetzen Sie Folgendes:
JOB_NAME
: ein eindeutiger Jobname Ihrer WahlREGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
SCHEMA_PATH
: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B.gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: der Proto-Nachrichtenname (z. B.package.name.MessageName
)SUBSCRIPTION_NAME
: der Name des Pub/Sub-EingabeabosBIGQUERY_TABLE
: der Name der BigQuery-AusgabetabelleUNPROCESSED_TOPIC
: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex", "parameters": { "schemaPath": "SCHEMA_PATH", "fullMessageName": "PROTO_MESSAGE_NAME", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "UNPROCESSED_TOPIC" } } }
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlLOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
VERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
SCHEMA_PATH
: der Cloud Storage-Pfad zur Avro-Schemadatei (z. B.gs://MyBucket/file.pb
)PROTO_MESSAGE_NAME
: der Proto-Nachrichtenname (z. B.package.name.MessageName
)SUBSCRIPTION_NAME
: der Name des Pub/Sub-EingabeabosBIGQUERY_TABLE
: der Name der BigQuery-AusgabetabelleUNPROCESSED_TOPIC
: Das Pub/Sub-Thema, das für die Warteschlange für nicht verarbeitete Datensätze verwendet werden soll
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.