Datenqualitätsaufgaben verwenden

In diesem Dokument wird beschrieben, wie Sie in Dataplex Universal Catalog Datenqualitätsaufgaben erstellen, mit denen Sie Datenqualitätsprüfungen für Ihre integrierten und externen BigQuery-Tabellen planen und ausführen können.

Weitere Informationen finden Sie unter Datenqualitätsaufgaben – Übersicht.

Hinweis

In diesem Dokument wird davon ausgegangen, dass Sie bereits einen Dataplex Universal Catalog-Lake haben, in dem Sie die Datenqualitätsaufgabe erstellen können.

Google-APIs und ‑Dienste aktivieren

  1. Aktivieren Sie die Dataproc API.

    API aktivieren

  2. Aktivieren Sie den privaten Google-Zugriff für Ihr Netzwerk und Subnetzwerk. Für das Netzwerk, das Sie für Datenqualitätsaufgaben in Dataplex Universal Catalog verwenden möchten, muss der private Google-Zugriff aktiviert sein. Wenn Sie beim Erstellen der Datenqualitätsaufgabe in Dataplex Universal Catalog kein Netzwerk oder Subnetzwerk angeben, verwendet Dataplex Universal Catalog das Standardsubnetz. In diesem Fall müssen Sie den privaten Google-Zugriff für das Standardsubnetz aktivieren.

Spezifikationsdatei erstellen

Dataplex Universal Catalog verwendet Open-Source-CloudDQ als Treiberprogramm. Die Anforderungen an die Datenqualitätsprüfung in Dataplex Universal Catalog werden in CloudDQ-YAML-Spezifikationsdateien definiert.

Als Eingabe für die Datenqualitätsaufgabe können Sie eine einzelne YAML-Datei oder ein einzelnes ZIP-Archiv mit einer oder mehreren YAML-Dateien verwenden. Es wird empfohlen, die Anforderungen an die Datenqualitätsprüfung in separaten YAML-Spezifikationsdateien zu erfassen, mit einer Datei pro Abschnitt.

So bereiten Sie eine Spezifikationsdatei vor:

  1. Erstellen Sie eine oder mehrere CloudDQ-YAML-Spezifikationsdateien, in denen die Anforderungen an die Datenqualitätsprüfung definiert werden. Weitere Informationen zur erforderlichen Syntax finden Sie im Abschnitt Spezifikationsdatei in diesem Dokument.

    Speichern Sie die YAML-Spezifikationsdatei im .yml- oder .yaml-Format. Wenn Sie mehrere YAML-Spezifikationsdateien erstellen, speichern Sie alle in einem einzigen ZIP-Archiv.

  2. Cloud Storage-Bucket erstellen
  3. Laden Sie die Spezifikationsdatei in den Cloud Storage-Bucket hoch.

Spezifikationsdatei

Ihre CloudDQ-YAML-Spezifikationsdatei muss folgende Abschnitte enthalten:

  • Regeln (definiert im rules-YAML-Knoten der obersten Ebene): Eine Liste der auszuführenden Regeln. Sie können diese Regeln aus vordefinierten Regeltypen wie NOT_NULL und REGEX erstellen oder sie mit benutzerdefinierten SQL-Anweisungen wie CUSTOM_SQL_EXPR und CUSTOM_SQL_STATEMENT erweitern. Die Anweisung CUSTOM_SQL_EXPR kennzeichnet alle Zeilen, die custom_sql_expr mit False als Fehler ausgewertet hat. Die Anweisung CUSTOM_SQL_STATEMENT kennzeichnet alle Werte, die von der gesamten Anweisung als Fehler zurückgegeben werden.

  • Zeilenfilter (definiert im row_filters-YAML-Knoten der obersten Ebene): SQL-Ausdrücke, die einen booleschen Wert zurückgeben, der Filter definiert, um eine Teilmenge der Daten aus der zugrunde liegenden Entität zur Prüfung abzurufen.

  • Regelbindungen (definiert im rule_bindings-YAML-Knoten der obersten Ebene): Definiert rules und rule filters für die Tabellen.

  • Regeldimensionen (definiert im rule_dimensions-YAML-Knoten der obersten Ebene): Definiert die Liste der zulässigen Dimensionen für Datenqualitätsregeln, die in einer Regel im entsprechenden dimension-Feld definiert werden können.

    Beispiel:

    rule_dimensions:
      - consistency
      - correctness
      - duplication
      - completeness
      - conformance

    Das Feld dimension ist für eine Regel optional. Der Abschnitt zu den Regeldimensionen ist obligatorisch, wenn dimension in einer beliebigen Regel aufgeführt ist.

Weitere Informationen finden Sie im CloudDQ-Referenzhandbuch und in den beispielhaften Spezifikationsdateien.

Dataset zum Speichern der Ergebnisse erstellen

  • Erstellen Sie ein BigQuery-Dataset, um die Ergebnisse zu speichern.

    Das Dataset muss sich in derselben Region wie die Tabellen befinden, für die Sie die Datenqualitätsaufgabe ausführen.

    Dataplex Universal Catalog verwendet dieses Dataset und erstellt oder verwendet eine Tabelle Ihrer Wahl, um die Ergebnisse zu speichern.

Dienstkonto erstellen

Erstellen Sie ein Dienstkonto mit den folgenden IAM-Rollen (Identity and Access Management) und ‑Berechtigungen:

Erweiterte Einstellungen verwenden

Diese Schritte sind optional:

  • BigQuery führt Datenqualitätsprüfungen standardmäßig im aktuellen Projekt aus. Sie können aber auch ein anderes Projekt auswählen. Verwenden Sie dazu das Argument --gcp_project_id TASK_ARGS für das Attribut --execution-args der Aufgabe.

  • Wenn sich die zum Ausführen von BigQuery-Abfragen angegebene Projekt-ID von dem Projekt unterscheidet, in dem das Dienstkonto (durch --execution-service-account angegeben) erstellt wird, muss die Organisationsrichtlinie, die die projektübergreifende Dienstkontonutzung deaktiviert (iam.disableServiceAccountCreation), deaktiviert sein. Achten Sie außerdem darauf, dass das Dienstkonto auf den BigQuery-Jobzeitplan in dem Projekt zugreifen kann, in dem BigQuery-Abfragen ausgeführt werden.

Beschränkungen

Alle Tabellen, die für eine bestimmte Datenqualitätsaufgabe angegeben sind, müssen zur selben Google Cloud-Region gehören.

Zeitplan für eine Datenqualitätsaufgabe festlegen

Console

  1. Rufen Sie in der Google Cloud Console die Dataplex Universal Catalog-Seite Verarbeiten auf.

    Zur Seite Verarbeiten

  2. Klicken Sie auf  Aufgabe erstellen.
  3. Klicken Sie auf der Karte Datenqualität prüfen auf Aufgabe erstellen.
  4. Wählen Sie als Dataplex-Lake Ihren Lake aus.
  5. Geben Sie als ID eine ID ein.
  6. Führen Sie im Abschnitt Spezifikation der Datenqualität folgende Schritte aus:
    1. Klicken Sie im Feld GCS-Datei auswählen auf Durchsuchen.
    2. Wählen Sie Ihren Cloud Storage-Bucket aus.

    3. Klicken Sie auf Auswählen.

  7. Führen Sie im Abschnitt Ergebnistabelle folgende Schritte aus:

    1. Klicken Sie im Feld BigQuery-Dataset auswählen auf Durchsuchen.

    2. Wählen Sie das BigQuery-Dataset aus, in dem die Prüfungsergebnisse gespeichert werden sollen.

    3. Klicken Sie auf Auswählen.

    4. Geben Sie im Feld BigQuery-Tabelle den Namen der Tabelle ein, in der die Ergebnisse gespeichert werden sollen. Wenn die Tabelle nicht vorhanden ist, wird sie von Dataplex Universal Catalog erstellt. Verwenden Sie nicht den Namen dq_summary, da er für interne Verarbeitungsaufgaben reserviert ist.

  8. Wählen Sie im Abschnitt Dienstkonto ein Dienstkonto aus dem Menü Nutzerdienstkonto aus.

  9. Klicken Sie auf Weiter.

  10. Konfigurieren Sie im Abschnitt Zeitplan festlegen den Zeitplan für das Ausführen der Datenqualitätsaufgabe.

  11. Klicken Sie auf Erstellen.

gcloud-CLI

Das folgende Beispiel zeigt die Ausführung einer Datenqualitätsaufgabe, bei der der gcloud CLI-Befehl für Dataplex Universal Catalog-Aufgaben verwendet wird:

export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH"

# Google Cloud project where the Dataplex Universal Catalog task is created.
export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT"

# Google Cloud region for the Dataplex Universal Catalog lake.
export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID"

# Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region.
export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}"

# The Dataplex Universal Catalog lake where your task is created.
export DATAPLEX_LAKE_ID="DATAPLEX_LAKE_ID"

# The service account used for running the task. Ensure that this service account
has sufficient IAM permissions on your project, including
BigQuery Data Editor, BigQuery Job User,
Dataplex Universal Catalog Editor, Dataproc Worker, and Service
Usage Consumer.
export DATAPLEX_TASK_SERVICE_ACCOUNT="DATAPLEX_TASK_SERVICE_ACCOUNT"

# If you want to use a different dataset for storing the intermediate data quality summary results
and the BigQuery views associated with each rule binding, use the following:
export CLOUDDQ_BIGQUERY_DATASET="CLOUDDQ_BIGQUERY_DATASET"

# The BigQuery dataset where the final results of the data quality checks are stored.
This could be the same as CLOUDDQ_BIGQUERY_DATASET.
export TARGET_BQ_DATASET="TARGET_BQ_DATASET"

# The BigQuery table where the final results of the data quality checks are stored.
export TARGET_BQ_TABLE="TARGET_BQ_TABLE"

# The unique identifier for the task.
export TASK_ID="TASK_ID"

gcloud dataplex tasks create \
    --location="${DATAPLEX_REGION_ID}" \
    --lake="${DATAPLEX_LAKE_ID}" \
    --trigger-type=ON_DEMAND \
    --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \
    --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \
    --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \
    --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \
    "$TASK_ID"
Parameter Beschreibung
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH Der Cloud Storage-Pfad zu Ihrer YAML-Konfigurationseingabe für die Datenqualitätsaufgabe. Sie können eine einzelne YAML-Datei im Format .yml oder .yaml oder ein ZIP-Archiv mit mehreren YAML-Dateien haben.
GOOGLE_CLOUD_PROJECT Das Google Cloud -Projekt, in dem die Dataplex Universal Catalog-Aufgabe und die BigQuery-Jobs erstellt werden.
DATAPLEX_REGION_ID Die Region des Dataplex Universal Catalog-Lake, in der die Datenqualitätsaufgabe erstellt wird.
SERVICE_ACCOUNT Das Dienstkonto, das zum Ausführen der Aufgabe verwendet wird. Dieses Dienstkonto muss die erforderlichen IAM-Berechtigungen haben, wie im Abschnitt Hinweis beschrieben.

Für --execution-args müssen die folgenden Argumente als positionierte Argumente in dieser Reihenfolge übergeben werden:

Argument Beschreibung
clouddq-executable.zip Vorkompilierte ausführbare Datei, die über spark-file-uris aus einem öffentlichen Cloud Storage-Bucket übergeben wurde.
ALL Alle Regelbindungen werden ausgeführt. Alternativ können Sie bestimmte Regelbindungen als durch Kommas getrennte Liste angeben. Beispiel: RULE_1,RULE_2.
gcp-project-id Projekt-ID, unter der die BigQuery-Abfragen ausgeführt werden.
gcp-region-id Region zum Ausführen der BigQuery-Jobs zur Prüfung der Datenqualität. Diese Region sollte mit der Region für gcp-bq-dataset-id und target_bigquery_summary_table übereinstimmen.
gcp-bq-dataset-id BigQuery-Dataset, in dem die rule_binding-Ansichten und die zusammenfassenden Zwischenergebnisse der Datenqualitätsprüfung gespeichert werden.
target-bigquery-summary-table Tabellen-ID-Referenz der BigQuery-Tabelle, in der die Endergebnisse der Datenqualitätsprüfungen gespeichert werden. Verwenden Sie nicht den ID-Wert dq_summary, da er für interne Verarbeitungsaufgaben reserviert ist.
--summary_to_stdout (Optional) Wenn dieses Flag übergeben wird, werden alle Zellen mit Prüfungsergebnissen, die bei der letzten Ausführung in der Tabelle dq_summary erstellt wurden, als JSON-Einträge in Cloud Logging und stdout protokolliert.

API

  1. Ersetzen Sie Folgendes:

    PROJECT_ID = "Your Dataplex Universal Catalog Project ID"
    REGION = "Your Dataplex Universal Catalog lake region"
    LAKE_ID = "Your Dataplex Universal Catalog lake ID"
    SERVICE_ACC = "Your service account used for reading the data"
    DATAPLEX_TASK_ID = "Unique task ID for the data quality task"
    BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification"
    GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project"
    GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional
    GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results"
    TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
  2. Senden Sie eine HTTP-POST-Anfrage:
    POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID}
    {
    "spark": {
        "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py",
        "file_uris": [  f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip",
                        f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum",
                        f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip"
                    ]
    },
    "execution_spec": {
        "args": {
            "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}"
        },
        "service_account": "SERVICE_ACC"
    },
    "trigger_spec": {
    "type": "ON_DEMAND"
    },
    "description": "${DATAPLEX_TASK_DESCRIPTION}"
    }

Beispiel-Airflow-DAG für Datenqualitätsaufgabe in Dataplex Universal Catalog

Geplante Datenqualitätsaufgabe überwachen

Hier erfahren Sie, wie Sie Ihre Aufgabe im Blick behalten.

Ergebnisse aufrufen

Die Ergebnisse der Datenqualitätsprüfungen werden im BigQuery-Dataset und in der Übersichtstabelle gespeichert, die Sie angegeben haben, wie unter Dataset zum Speichern der Ergebnisse erstellen beschrieben. Die Übersichtstabelle enthält eine Ausgabezusammenfassung für die verschiedenen Kombinationen aus Regelbindung und Regel je Prüfung. Die Ausgabe in der Übersichtstabelle umfasst die folgenden Informationen:

Spaltenname Beschreibung
dataplex_lake (String) ID des Dataplex Universal Catalog-Lake, der die zu prüfende Tabelle enthält.
dataplex_zone (String) ID der Dataplex Universal Catalog-Zone, die die zu prüfende Tabelle enthält.
dataplex_asset_id (String) ID des Dataplex Universal Catalog-Assets, das die zu prüfende Tabelle enthält.
execution_ts (Zeitstempel) Zeitstempel für die Ausführung der Prüfungsabfrage.
rule_binding_id (String) ID der Regelbindung, für die Prüfungsergebnisse gemeldet werden.
rule_id (String) ID der Regel in der Regelbindung, für die Prüfungsergebnisse gemeldet werden.
dimension (String) Datenqualitätsdimension der rule_id. Dieser Wert kann nur einer der Werte sein, die im rule_dimensions-YAML-Knoten angegeben sind.
table_id (String) ID der Entität, für die Prüfungsergebnisse gemeldet werden. Diese ID wird unter dem entity-Parameter der jeweiligen Regelbindung angegeben.
column_id (String) ID der Spalte, für die Prüfungsergebnisse gemeldet werden. Diese ID wird unter dem column-Parameter der jeweiligen Regelbindung angegeben.
last_modified (Zeitstempel) Zeitstempel der letzten Änderung der zu prüfenden table_id.
metadata_json_string (String) Schlüssel/Wert-Paare des Inhalts des Metadatenparameters, der in der Regelbindung oder während der Datenqualitätsprüfung angegeben wurde.
configs_hashsum (String) Hash-Summe des JSON-Dokuments, das die Regelbindung und alle zugehörigen Regeln, Regelbindungen, Zeilenfilter und Entitätskonfigurationen enthält. configs_hashsum ermöglicht das Tracking, wenn der Inhalt einer rule_binding-ID oder einer der referenzierten Konfigurationen geändert wurde.
dq_run_id (String) Eindeutige ID des Eintrags.
invocation_id (String) ID der Datenqualitätsprüfung. Alle Datenqualitätszusammenfassungen, die innerhalb derselben Datenqualitätsprüfung generiert wurden, haben dieselbe invocation_id.
progress_watermark (Boolesch) Legt fest, ob dieser Eintrag von der Datenqualitätsprüfung berücksichtigt wird, um den Höchstwert für die inkrementelle Prüfung zu festzustellen. Bei FALSE wird der entsprechende Eintrag beim Festlegen des Höchstwerts ignoriert. Diese Information ist nützlich, wenn Sie testweise Datenqualitätsprüfungen ausführen, die den Höchstwert nicht erhöhen sollen. Dataplex Universal Catalog füllt dieses Feld standardmäßig mit TRUE aus. Der Wert kann jedoch überschrieben werden, wenn das Argument --progress_watermark den Wert FALSE hat.
rows_validated (Ganzzahl) Gesamtzahl der Einträge, die nach dem Anwenden von row_filters und möglichen Höchstwertfiltern auf die Spalte incremental_time_filter_column_id geprüft wurden, falls angegeben.
complex_rule_validation_errors_count (Gleitkommazahl) Anzahl der Zeilen, die von einer CUSTOM_SQL_STATEMENT-Regel zurückgegeben werden.
complex_rule_validation_success_flag (Boolesch) Erfolgsstatus von CUSTOM_SQL_STATEMENT-Regeln.
success_count (Ganzzahl) Gesamtzahl der Einträge, die die Prüfung bestanden haben. Dieses Feld ist für CUSTOM_SQL_STATEMENT-Regeln auf NULL gesetzt.
success_percentage (Gleitkommazahl) Prozentsatz der geprüften Einträge, die die Prüfung bestanden haben. Dieses Feld ist für CUSTOM_SQL_STATEMENT-Regeln auf NULL gesetzt.
failed_count (Ganzzahl) Gesamtzahl der Einträge, die die Prüfung nicht bestanden haben. Dieses Feld ist für CUSTOM_SQL_STATEMENT-Regeln auf NULL gesetzt.
failed_percentage (Gleitkommazahl) Prozentsatz der geprüften Einträge, die die Prüfung nicht bestanden haben. Dieses Feld ist für CUSTOM_SQL_STATEMENT-Regeln auf NULL gesetzt.
null_count (Ganzzahl) Gesamtzahl der Einträge, die während der Prüfung den Wert Null zurückgegeben haben. Dieses Feld ist für NOT_NULL- und CUSTOM_SQL_STATEMENT-Regeln auf NULL gesetzt.
null_percentage (Gleitkommazahl) Prozentsatz der geprüften Einträge, die während der Prüfung den Wert Null zurückgegeben haben. Dieses Feld ist für NOT_NULL- und CUSTOM_SQL_STATEMENT-Regeln auf NULL gesetzt.
failed_records_query Für jede Regel, die fehlschlägt, wird in dieser Spalte eine Abfrage gespeichert, mit der Sie die fehlgeschlagenen Einträge abrufen können. Weitere Informationen finden Sie in diesem Dokument unter Fehler in fehlgeschlagenen Regeln mit failed_records_query beheben.

Für BigQuery-Entitäten wird für jede rule_binding eine Ansicht erstellt, die die SQL-Prüfungslogik der letzten Ausführung enthält. Sie finden diese Ansichten im BigQuery-Dataset, das im Argument --gcp-bq-dataset-id angegeben ist.

Kostenoptimierung

Datenqualitätsaufgaben werden als BigQuery-Jobs in Ihrem Projekt ausgeführt. Damit die Kosten für die Ausführung von Datenqualitätsjobs unter Kontrolle bleiben, zahlen Sie die BigQuery-Preise für das Projekt, in dem Ihre BigQuery-Jobs ausgeführt werden. Weitere Informationen finden Sie unter Einführung in die Arbeitslastverwaltung.

Inkrementelle Prüfungen

Oft werden Tabellen regelmäßig um neue Partitionen (neue Zeilen) ergänzt. Wenn Sie alte Partitionen nicht bei jeder Ausführung neu prüfen möchten, können Sie inkrementelle Prüfungen verwenden.

Für inkrementelle Prüfungen muss in Ihrer Tabelle eine Spalte vom Typ TIMESTAMP oder DATETIME vorhanden sein, in der der Spaltenwert monoton ansteigt. Sie können die Spalten verwenden, nach denen Ihre BigQuery-Tabelle partitioniert ist.

Falls Sie eine inkrementelle Prüfung festlegen möchten, geben Sie einen Wert für incremental_time_filter_column_id=TIMESTAMP/DATETIME type column als Teil einer Regelbindung an.

Wenn Sie eine Spalte angeben, werden in der Datenqualitätsaufgabe nur Zeilen mit einem TIMESTAMP-Wert berücksichtigt, der größer ist als der Zeitstempel der letzten Datenqualitätsaufgabe, die ausgeführt wurde.

Beispielhafte Spezifikationsdateien

Wenn Sie diese Beispiele verwenden möchten, erstellen Sie ein BigQuery-Dataset namens sales. Erstellen Sie dann eine Faktentabelle mit dem Namen sales_orders und fügen Sie Beispieldaten hinzu, indem Sie mit den folgenden GoogleSQL-Anweisungen eine Abfrage ausführen:

CREATE OR REPLACE TABLE sales.sales_orders
(
 id STRING NOT NULL,
 last_modified_timestamp TIMESTAMP,
 customer_id STRING,
 item_id STRING,
 amount NUMERIC,
 transaction_currency STRING
);

INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")

Beispiel 1

Im folgenden Codebeispiel werden Datenqualitätsprüfungen zum Prüfen dieser Werte erstellt:

  • amount: Die Werte sind null oder positive Zahlen.
  • item_id: Ein alphanumerischer String mit fünf alphabetischen Zeichen, gefolgt von 15 Ziffern.
  • transaction_currency: Ein zulässiger Währungstyp, wie durch eine statische Liste definiert. In der statischen Liste dieses Beispiels werden als Währungstypen GBP und JPY zugelassen. Diese Prüfung gilt nur für Zeilen, die als international markiert sind.
# The following `NONE` row filter is required.
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 # This filters for rows marked as international (INTNL).
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can apply to multiple tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

# Rule bindings associate rules to columns within tables.
rule_bindings:
  TRANSACTION_AMOUNT_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

  TRANSACTION_VALID_ITEM_ID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

  TRANSACTION_CURRENCY_VALID:
   entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Ersetzen Sie Folgendes:

  • PROJECT_ID: Ihre Projekt-ID.
  • DATASET_ID: Die Dataset-ID.

Beispiel 2

Wenn die zu prüfende Tabelle Teil eines Dataplex Universal Catalog-Lake ist, können Sie die Tabellen mit der Lake- oder Zonennotation angeben. So können Sie Ergebnisse nach Lake oder Zone aggregieren. Sie können beispielsweise eine Punktzahl auf Zonenebene vergeben.

Wenn Sie dieses Beispiel verwenden möchten, erstellen Sie einen Dataplex Universal Catalog-Lake mit der Lake-ID operations und der Zonen-ID procurement. Fügen Sie dann die Tabelle sales_orders als Asset zur Zone hinzu.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}
rule_bindings:
 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Ersetzen Sie Folgendes:

  • PROJECT_ID: Ihre Projekt-ID.
  • REGION_ID: Die Regions-ID des Dataplex Universal Catalog-Lake, in dem sich die Tabelle befindet, zum Beispiel us-central1.

Beispiel 3

In diesem Beispiel wird Beispiel 2 um eine benutzerdefinierte SQL-Prüfung erweitert, um festzustellen, ob die ID-Werte eindeutig sind.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   rule_ids:
     - VALID_CURRENCY_ID

Beispiel 4

In diesem Beispiel wird Beispiel 3 durch das Hinzufügen inkrementeller Prüfungen mit der Spalte last_modified_timestamp erweitert. Sie können inkrementelle Prüfungen für eine oder mehrere Regelbindungen hinzufügen.

# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
 dataplex:
   projects: PROJECT_ID
   locations: REGION_ID
   lakes: operations
   zones: procurement

# You have to define a NONE row filter
row_filters:
 NONE:
   filter_sql_expr: |-
      True
 INTERNATIONAL_ITEMS:
   filter_sql_expr: |-
      REGEXP_CONTAINS(item_id, 'INTNL')

# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
  - consistency
  - correctness
  - duplication
  - completeness
  - conformance
  - integrity

# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
 NO_DUPLICATES_IN_COLUMN_GROUPS:
   rule_type: CUSTOM_SQL_STATEMENT
   dimension: duplication
   params:
     custom_sql_arguments:
       - column_names
     custom_sql_statement: |-
       select a.*
       from data a
       inner join (
         select
           $column_names
         from data
         group by $column_names
         having count(*) > 1
       ) duplicates
       using ($column_names)

 VALUE_ZERO_OR_POSITIVE:
   rule_type: CUSTOM_SQL_EXPR
   dimension: correctness
   params:
     custom_sql_expr: |-
       $column >= 0

 VALID_ITEM_ID:
   rule_type: REGEX
   dimension: conformance
   params:
     pattern: |-
       [A-Z]{5}[0-9]{15}

 VALID_CURRENCY_ID:
   rule_type: CUSTOM_SQL_EXPR
   dimension: integrity
   params:
     custom_sql_expr: |-
      $column in ('GBP', 'JPY')

#rule bindings associate rules to {table, column}

rule_bindings:
 TRANSACTIONS_UNIQUE:
   entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
   column_id: id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - NO_DUPLICATES_IN_COLUMN_GROUPS:
         column_names: "id"

 TRANSACTION_AMOUNT_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
   column_id: amount
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALUE_ZERO_OR_POSITIVE

 TRANSACTION_VALID_ITEM_ID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: item_id
   row_filter_id: NONE
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_ITEM_ID

 TRANSACTION_CURRENCY_VALID:
   entity_uri: dataplex://zones/procurement/entities/sales_orders
   column_id: transaction_currency
   row_filter_id: INTERNATIONAL_ITEMS
   incremental_time_filter_column_id: last_modified_timestamp
   rule_ids:
     - VALID_CURRENCY_ID

Fehler in fehlgeschlagenen Regeln mit failed_records_query beheben

Für jede Regel, die fehlschlägt, wird in der Übersichtstabelle eine Abfrage in der Spalte failed_records_query gespeichert, mit der Sie die fehlgeschlagenen Einträge abrufen können.

Zum Debuggen können Sie auch reference columns in Ihrer YAML-Datei verwenden. Damit können Sie die Ausgabe von failed_records_query mit den Originaldaten zusammenführen, um den gesamten Eintrag zu erhalten. Sie können beispielsweise eine primary_key-Spalte oder eine zusammengesetzte primary_key-Spalte als Referenzspalte angeben.

Referenzspalten angeben

Wenn Sie Referenzspalten generieren möchten, können Sie Ihrer YAML-Spezifikation Folgendes hinzufügen:

  1. Den Abschnitt reference_columns. Darin können Sie einen oder mehrere Referenzspaltensätze erstellen, wobei jeder Satz eine oder mehrere Spalten angibt.

  2. Den Abschnitt rule_bindings. In diesem Abschnitt können Sie einer Regelbindung eine Zeile hinzufügen, in der eine Referenzspalten-ID (reference_columns_id) für die Regeln in dieser Regelbindung angegeben wird. Dabei sollte es sich um einen der im Abschnitt reference_columns angegebenen Referenzspaltensätze handeln.

In der folgenden YAML-Datei wird beispielsweise ein reference_columns-Abschnitt angegeben und es werden die drei Spalten id, last_modified_timestamp und item_id als Teil des Satzes ORDER_DETAILS_REFERENCE_COLUMNS definiert. Im folgenden Beispiel wird die Tabelle sales_orders verwendet.

reference_columns:
  ORDER_DETAILS_REFERENCE_COLUMNS:
    include_reference_columns:
      - id
      - last_modified_timestamp
      - item_id
rules:
  VALUE_ZERO_OR_POSITIVE:
  rule_type: CUSTOM_SQL_EXPR
  params:
    custom_sql_expr: |-

row_filters:
NONE:
  filter_sql_expr: |-
      True

rule_bindings:
TRANSACTION_AMOUNT_VALID:
  entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
  column_id: amount
  row_filter_id: NONE
  reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
  rule_ids:
    - VALUE_ZERO_OR_POSITIVE

Abfrage fehlgeschlagener Einträge verwenden

Bei der Abfrage fehlgeschlagener Einträge wird für jeden Eintrag, in dem eine Regel fehlgeschlagen ist, eine Zeile generiert. Sie enthält den Spaltennamen und den Wert, die den Fehler ausgelöst haben, sowie die Werte für die Referenzspalten. Außerdem beinhaltet sie Metadaten, mit denen Sie sich auf die Ausführung der Datenqualitätsaufgabe beziehen können.

Sie sehen hier ein Beispiel der Ausgabe einer Abfrage fehlgeschlagener Einträge für die YAML-Datei, die unter Referenzspalten angeben beschrieben wird. Es wird ein Fehler für die Spalte amount und ein fehlgeschlagener Wert von -10 angezeigt. Außerdem wird der entsprechende Wert für die Referenzspalte erfasst.

_dq_validation_invocation_id _dq_validation_rule_binding_id _dq_validation_rule_id _dq_validation_column_id _dq_validation_column_value _dq_validation_dimension _dq_validation_simple_rule_row_is_valid _dq_validation_complex_rule_validation_errors_count _dq_validation_complex_rule_validation_success_flag id last_modified_timestamp item_id
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 TRANSACTION_AMOUNT_VALID VALUE_ZERO_OR_POSITIVE amount -10 FALSE order1 2022-01-22T02:30:06.321Z bad_item_id

Abfragen fehlgeschlagener Einträge für CUSTOM_SQL_STATEMENT-Regeln verwenden

Für CUSTOM_SQL_STATEMENT-Regeln enthalten Abfragen fehlgeschlagener Einträge die Spalte custom_sql_statement_validation_errors. Die Spalte custom_sql_statement_validation_errors ist eine verschachtelte Spalte mit Feldern, die der Ausgabe Ihrer SQL-Anweisung entsprechen. Referenzspalten sind nicht Teil der Abfragen fehlgeschlagener Einträge für CUSTOM_SQL_STATEMENT-Regeln.

Ihre CUSTOM_SQL_STATEMENT-Regel könnte beispielsweise so aussehen:

rules:
  TEST_RULE:
    rule_type: CUSTOM_SQL_STATEMENT
    custom_sql_arguments:
      - existing_id
      - replacement_id
    params:
     CUSTOM_SQL_STATEMENT: |-
       (SELECT product_name, product_key FROM data
       where $existing_id != $replacement_id)
Die Ergebnisse für dieses Beispiel enthalten eine oder mehrere Zeilen für die Spalte custom_sql_statement_validation_errors – eine Zeile pro Ergebnis mit existing_id!=replacement_id.

Wenn der Inhalt einer Zelle in dieser Spalte in JSON wiedergegeben wird, sieht er so aus:

{
  "custom_sql_statement_valdation_errors" :{
    "product_name"="abc"
    "product_key"="12345678"
    "_rule_binding_id"="your_rule_binding"
  }
}

Sie können diese Ergebnisse mit einem verschachtelten Verweis wie join on custom_sql_statement_valdation_errors.product_key mit der Originaltabelle verknüpfen.

Nächste Schritte