Spark と SparkSQL のカスタムタスクのスケジュールを設定する

Dataplex Universal Catalog では、カスタムコードの実行のスケジュールを、1 回限りの実行、定期的なスケジュール、オンデマンドのいずれかに設定できます。オンデマンドはプレビュー版であり、API でのみ使用できます。顧客データ変換のスケジュールは、Spark(Java)、PySpark(Spark バージョン 3.2 に限定)、Spark SQL を使用して設定できます。Dataplex Universal Catalog は、サーバーレス Spark 処理と組み込みのサーバーレス スケジューラを使用してコードを実行します。

用語

タスク
Dataplex Universal Catalog タスクとは、Dataplex Universal Catalog にスケジュールに沿って実行させる作業を指します。このタスクは、コード、パラメータ、スケジュールをカプセル化します。
ジョブ

ジョブは、Dataplex Universal Catalog タスクの 1 回の実行を指します。たとえば、タスクのスケジュールが毎日実行されるように設定されている場合、Dataplex Universal Catalog はジョブを毎日作成します。

2023 年 5 月 10 日以降に作成されたジョブの場合、[トリガー] フィールドにジョブの実行トリガーのタイプが表示されます。

ジョブ実行トリガーのタイプは次のとおりです。

  • RUN_REQUEST: RunTask API の呼び出しによってジョブが実行されたことを示します。

  • TASK_CONFIG: タスクの TriggerSpec 構成が原因でジョブが実行されたことを示します。

スケジュール モード

Dataplex Universal Catalog では、次のスケジュール モードがサポートされています。

1 回だけ実行
このモードは、タスクを 1 回だけ実行するために使用します。すぐに実行することも、後で設定した時刻に実行することもできます。タスクをすぐに実行した場合、実行が始まるまで最大 2 分かかる場合があります。
スケジュールに従って実行する
このモードは、タスクを一定の頻度で実行するために使用します。サポートされている繰り返しパターンは、毎日、毎週、毎月、またはカスタムです。
オンデマンドで実行

このモードは、以前に作成したタスクをオンデマンドで実行するために使用します。オンデマンド実行モードは、RunTask API でのみサポートされています。ジョブがオンデマンドで実行されると、Dataplex Universal Catalog は既存のパラメータを使用してジョブを作成します。ジョブを実行するには、ExecutionSpec 引数とラベルを指定します。

始める前に

  1. Dataproc API を有効にします。

    Dataproc API を有効にする

  2. ネットワークとサブネットワークでプライベート Google アクセスを有効にします。Dataplex Universal Catalog タスクで使用するネットワークで、プライベート Google アクセスを有効にします。Dataplex Universal Catalog タスクの作成時にネットワークやサブネットワークを指定しない場合、Dataplex Universal Catalog はデフォルトのサブネットワークを使用します。その場合、デフォルトのサブネットワークでプライベート Google アクセスを有効にする必要があります。

  3. サービス アカウントを作成します。Dataplex Universal Catalog タスクをスケジュールするには、サービス アカウントが必要です。サービス アカウントは、タスクを実行するプロジェクトに属している必要があります。このサービス アカウントには次の権限が必要です。

    • 処理中の BigQuery や Cloud Storage データへのアクセス権。

    • タスクを実行するプロジェクトに対する Dataproc ワーカーロール権限。

    • タスクがレイクに接続されている Dataproc Metastore インスタンスの読み取りや更新を行う必要がある場合、サービス アカウントには Dataproc Metastore 閲覧者または編集者のロールが必要です。このロールは、Dataplex Universal Catalog レイクが設定されているプロジェクトで付与する必要があります。

    • タスクが Spark SQL ジョブの場合は、サービス アカウントに Dataplex Universal Catalog デベロッパーのロールが必要です。このロールは、Dataplex Universal Catalog レイクが設定されているプロジェクトで付与する必要があります。

    • タスクが Spark SQL ジョブの場合は、結果が書き込まれるバケットに対する Cloud Storage 管理者権限が必要です。

    • Spark SQL タスクとカスタム Spark タスクのスケジュールを設定して実行するには、Dataplex Universal Catalog メタデータ リーダー(roles/dataplex.metadataReader)、Dataplex Universal Catalog 閲覧者(roles/dataplex.viewer)、Dataproc Metastore メタデータ ユーザー(roles/metastore.metadataUser)の IAM ロールがサービス アカウントに対して付与されている必要があります。

  4. ジョブを送信するユーザーに、サービス アカウントのサービス アカウント ユーザーロール(roles/iam.serviceAccountUser)を付与します。手順については、サービス アカウントへのアクセス権を管理するをご覧ください。

  5. Dataplex Universal Catalog レイク サービス アカウントに、サービス アカウントを使用する権限を付与します。Dataplex Universal Catalog レイク サービス アカウントは、Google Cloud コンソールの [レイクの詳細] ページで確認できます。

  6. Dataplex Universal Catalog レイクを含むプロジェクトが、タスクが実行されるプロジェクトと異なる場合は、Dataplex Universal Catalog レイクのサービス アカウントに、タスクを実行するプロジェクトの Dataproc 編集者のロールを付与します。

  7. 必要なコード アーティファクト(JAR、Python、SQL スクリプト ファイル)またはアーカイブ ファイル(.jar.tar.tar.gz.tgz.zip)を Cloud Storage パスに配置します。

  8. サービス アカウントに、これらのコード アーティファクトを保存する Cloud Storage バケットに対する必要な storage.objects.get 権限が付与されていることを確認してください。

Spark(Java または Python)タスクをスケジュールする

コンソール

  1. Google Cloud コンソールで、[Dataplex Universal Catalog] ページに移動します。

    Dataplex Universal Catalog に移動

  2. [プロセス] ビューに移動します。

  3. [タスクを作成] をクリックします。

  4. [カスタム Spark タスクの作成] で、[タスクの作成] をクリックします。

  5. Dataplex Universal Catalog レイクを選択します。

  6. タスク名を指定します。

  7. タスクの [ID] を作成します。

  8. [タスクの構成] セクションの [タイプ] で、[Spark] または [PySpark] を選択します。

  9. 関連する引数を入力します。

  10. [サービス アカウント] フィールドに、カスタム Spark タスクが実行できるユーザー サービス アカウントを入力します。

  11. [続行] をクリックします。

  12. 省略可: スケジュールを設定する: [1 回実行] または [繰り返し] を選択します。必須フィールドに入力します。

  13. [続行] をクリックします。

  14. 省略可: [リソースのカスタマイズ] と [その他の設定の追加] を行います。

  15. [作成] をクリックします。

gcloud

gcloud CLI コマンドを使用して Spark(Java / Python)タスクのスケジュールを設定できます。次の表に、使用する必須パラメータとオプション パラメータを示します。

パラメータ 説明
--lake Dataplex Universal Catalog サービスのレイクリソースのレイク ID。
--location Dataplex Universal Catalog サービスのロケーション。
--spark-main-class ドライバのメインクラス。クラスを含む jar ファイルは、デフォルトの CLASSPATH に存在する必要があります。
--spark-main-jar-file-uri メインクラスを含む jar ファイルの Cloud Storage URI。
--spark-archive-uris 省略可: 各エグゼキュータの作業ディレクトリに抽出されるアーカイブの Cloud Storage URI。サポートされているファイル形式: .jar.tar.tar.gz.tgz.zip
--spark-file-uris 省略可: 各エグゼキュータの作業ディレクトリに配置されるファイルの Cloud Storage URI。
--batch-executors-count 省略可: ジョブ エグゼキュータの合計数。デフォルト値は 2 です。
--batch-max-executors-count 省略可: 構成可能なエグゼキュータの最大数。デフォルト値は 1,000 です。batch-max-executors-countbatch-executors-count より大きい場合、Dataplex Universal Catalog は自動スケーリングを有効にします。
--container-image-java-jars 省略可: クラスパスに追加する Java JAR のリスト。有効な入力には、Jar バイナリへの Cloud Storage URI が含まれます。
例: gs://bucket-name/my/path/to/file.jar
--container-image-properties 省略可: プロパティキー(prefix:property 形式で指定します)。
例: core:hadoop.tmp.dir
詳細については、クラスタ プロパティをご覧ください。
--vpc-network-tags 省略可: ジョブに適用するネットワーク タグのリスト。
--vpc-network-name 省略可: ジョブが実行される Virtual Private Cloud ネットワーク。デフォルトでは、Dataplex Universal Catalog はプロジェクト内の Default という名前の VPC ネットワークを使用します。
--vpc-network-name または --vpc-sub-network-name のいずれか 1 つだけを使用する必要があります。
--vpc-sub-network-name 省略可: ジョブが実行される VPC サブネットワーク。
--vpc-sub-network-name または --vpc-network-name のいずれか 1 つだけを使用する必要があります。
--trigger-type ユーザー指定タスクのトリガータイプ。値は次のいずれかにする必要があります。
ON_DEMAND - タスクの作成後 1 回だけタスクが実行されます。
RECURRING - タスクはスケジュールに沿って定期的に実行されます。
--trigger-start-time 省略可: タスクの初回実行時刻。形式は、「{year}-{month}-{day}T{hour}:{min}:{sec}Z」で、タイムゾーンは UTC です。たとえば、「2017-01-15T01:30:00Z」は 2017 年 1 月 15 日 01:30 UTC をエンコードしたものです。この値が指定されていない場合、タスクは、送信後(トリガータイプが ON_DEMAND の場合)、または指定されたスケジュール(トリガータイプが RECURRING の場合)で実行されます。
--trigger-disabled 省略可: タスクの実行を止めます。このパラメータは、すでに実行中のタスクをキャンセルするのではなく、RECURRING タスクを一時的に無効にします。
--trigger-max-retires 省略可: 中止するまでの再試行回数。失敗したタスクを再試行しない場合は、値を 0 に設定します。
--trigger-schedule タスクを定期的に実行するための Cron スケジュール
--description 省略可: タスクの説明。
--display-name 省略可: タスクの表示名。
--labels 省略可: 追加するラベルの KEY=VALUE ペアのリスト。
--execution-args 省略可: ドライバに渡す引数。引数には、Key-Value ペアを混在させることが可能です。実行時の引数として、Key-Value ペアのカンマ区切りのリストを渡すことができます。位置引数を渡すには、キーを TASK_ARGS に設定し、値をすべての位置引数のカンマ区切り文字列に設定します。カンマ以外の区切り文字を使用する場合は、エスケープをご覧ください。
key-value と位置引数が一緒に渡される場合は、TASK_ARGS が最後の引数として渡されます。
--execution-service-account タスクの実行に使用するサービス アカウント。
--max-job-execution-lifetime 省略可: ジョブ実行の有効期限が切れるまでの最長期間。
--container-image 省略可: ジョブ ランタイム環境のカスタム コンテナ イメージ。指定しない場合は、デフォルトのコンテナ イメージが使用されます。
--kms-key 省略可: 暗号化に使用する Cloud KMS 鍵。形式は次のとおりです。
projects/{project_number}/locations/{location_id}/keyRings/{key-ring-name}/cryptoKeys/{key-name}

Java の例

glcoud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=ON_DEMAND spark-main-jar-file-uri=<gcs location to java file> --execution-service-account=<service-account-email> --trigger-start-time=<timestamp after which job starts ex. 2099-01-01T00:00:00Z> --labels=key1=value1,key2=value3,key3=value3 --execution-args=arg1=value1,arg2=value3,arg3=value3 <task-id>

PySpark の例

gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=RECURRING --trigger-schedule=<Cron schedule https://en.wikipedia.org/wiki/Cron> --spark-python-script-file=<gcs location to python script> --execution-service-account=<service-account-email> --execution-args=^::^arg1=value1::arg2=value2::TASK_ARGS="pos-arg1, pos-arg2" <task-id>

REST

タスクを作成するには、API Explorer を使用します。

Spark SQL タスクのスケジュールを設定する

gcloud

Spark SQL タスクのスケジュールを設定するには、Spark(Java または Python)タスクをスケジュール設定すると同じ gcloud CLI コマンドを実行しますが、次のパラメータを追加します。

パラメータ 説明
--spark-sql-script SQL クエリテキスト。spark-sql-script または spark-sql-script-file が必要です。
--spark-sql-script-file クエリファイルへの参照。この値は、クエリファイルの Cloud Storage URI か、SQL スクリプト コンテンツへのパスです。spark-sql-script または spark-sql-script-file が必要です。
--execution-args Spark SQL タスクの場合、次の引数は必須であり、位置引数として渡す必要があります。
--output_location, <GCS uri of the output directory>
--output_format, <output file format>
サポートされている形式は、CSV ファイル、JSON ファイル、parquet、orc です。
gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --execution-service-account=<service-account-email> --trigger-type=ON_DEMAND --spark-sql-script=<sql-script> --execution-args=^::^TASK_ARGS="--output_location, <gcs folder location>, --output_format, json" <sql-task-id>

REST

タスクを作成するには、API Explorer を使用します。

タスクをモニタリングする

コンソール

  1. Google Cloud コンソールで、[Dataplex Universal Catalog] ページに移動します。

    Dataplex Universal Catalog に移動

  2. [プロセス] ビューに移動します。

  3. [タスク] タブには、タスク テンプレート タイプでフィルタされたタスクのリストが表示されます。

  4. [名前] 列で、表示するタスクをクリックします。

  5. 表示するタスクの [ジョブ ID] をクリックします。

    Google Cloud コンソールで Dataproc ページが開き、モニタリングと出力の詳細を表示できます。

gcloud

次の表に、タスクをモニタリングするための gcloud CLI コマンドを一覧表示します。

アクション gcloud CLI コマンド
タスクの一覧表示 gcloud dataplex tasks list --project=<project-name> --location=<location> --lake=<lake-id>
タスクの詳細の表示 gcloud dataplex tasks describe --project=<project-name> --location=<location> --lake=<lake-id> <task-id>
タスクのジョブの一覧表示 gcloud dataplex tasks jobs list --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id>
ジョブの詳細の表示 gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>

Dataplex Universal Catalog は、Dataproc Serverless(バッチ)でジョブを実行します。Dataplex Universal Catalog ジョブの実行ログを表示する手順は、次のとおりです。

  1. Dataproc Serverless(バッチ)のジョブ ID を取得します。次のコマンドを実行します。

    gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>
    
  2. ログを表示します。前のコマンドの実行で取得したジョブ ID を使用して、次のコマンドを実行します。

    gcloud beta dataproc batches wait --project=<project-name> --region=<location> <job-id>
    

REST

タスクまたはジョブget または list するには、API Explorer を使用します。

スケジュールを管理する

Google Cloud コンソールでは、Dataplex Universal Catalog 内でタスクのスケジュールの編集、タスクの削除、進行中のジョブのキャンセルができます。次の表に、これらのアクションに対する gcloud CLI コマンドを一覧表示します。

アクション gcloud CLI コマンド
タスクのスケジュールを編集する gcloud dataplex tasks update --project=<project-name> --location=<location> --lake=<lake-id> --trigger-schedule=<updated-schedule> <task-id>
タスクを削除する gcloud dataplex tasks delete --project=<project-name> --location=<location> --lake=<lake-id> <task-id>
ジョブをキャンセルする gcloud dataplex tasks jobs cancel --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>

次のステップ