Dataplex Universal Catalog では、カスタムコードの実行のスケジュールを、1 回限りの実行、定期的なスケジュール、オンデマンドのいずれかに設定できます。オンデマンドはプレビュー版であり、API でのみ使用できます。顧客データ変換のスケジュールは、Spark(Java)、PySpark(Spark バージョン 3.2 に限定)、Spark SQL を使用して設定できます。Dataplex Universal Catalog は、サーバーレス Spark 処理と組み込みのサーバーレス スケジューラを使用してコードを実行します。
用語
- タスク
- Dataplex Universal Catalog タスクとは、Dataplex Universal Catalog にスケジュールに沿って実行させる作業を指します。このタスクは、コード、パラメータ、スケジュールをカプセル化します。
- ジョブ
ジョブは、Dataplex Universal Catalog タスクの 1 回の実行を指します。たとえば、タスクのスケジュールが毎日実行されるように設定されている場合、Dataplex Universal Catalog はジョブを毎日作成します。
2023 年 5 月 10 日以降に作成されたジョブの場合、[トリガー] フィールドにジョブの実行トリガーのタイプが表示されます。
ジョブ実行トリガーのタイプは次のとおりです。
RUN_REQUEST:
RunTask
API の呼び出しによってジョブが実行されたことを示します。TASK_CONFIG: タスクの
TriggerSpec
構成が原因でジョブが実行されたことを示します。
スケジュール モード
Dataplex Universal Catalog では、次のスケジュール モードがサポートされています。
- 1 回だけ実行
- このモードは、タスクを 1 回だけ実行するために使用します。すぐに実行することも、後で設定した時刻に実行することもできます。タスクをすぐに実行した場合、実行が始まるまで最大 2 分かかる場合があります。
- スケジュールに従って実行する
- このモードは、タスクを一定の頻度で実行するために使用します。サポートされている繰り返しパターンは、毎日、毎週、毎月、またはカスタムです。
- オンデマンドで実行
このモードは、以前に作成したタスクをオンデマンドで実行するために使用します。オンデマンド実行モードは、
RunTask
API でのみサポートされています。ジョブがオンデマンドで実行されると、Dataplex Universal Catalog は既存のパラメータを使用してジョブを作成します。ジョブを実行するには、ExecutionSpec
引数とラベルを指定します。
始める前に
Dataproc API を有効にします。
ネットワークとサブネットワークでプライベート Google アクセスを有効にします。Dataplex Universal Catalog タスクで使用するネットワークで、プライベート Google アクセスを有効にします。Dataplex Universal Catalog タスクの作成時にネットワークやサブネットワークを指定しない場合、Dataplex Universal Catalog はデフォルトのサブネットワークを使用します。その場合、デフォルトのサブネットワークでプライベート Google アクセスを有効にする必要があります。
サービス アカウントを作成します。Dataplex Universal Catalog タスクをスケジュールするには、サービス アカウントが必要です。サービス アカウントは、タスクを実行するプロジェクトに属している必要があります。このサービス アカウントには次の権限が必要です。
処理中の BigQuery や Cloud Storage データへのアクセス権。
タスクを実行するプロジェクトに対する Dataproc ワーカーロール権限。
タスクがレイクに接続されている Dataproc Metastore インスタンスの読み取りや更新を行う必要がある場合、サービス アカウントには Dataproc Metastore 閲覧者または編集者のロールが必要です。このロールは、Dataplex Universal Catalog レイクが設定されているプロジェクトで付与する必要があります。
タスクが Spark SQL ジョブの場合は、サービス アカウントに Dataplex Universal Catalog デベロッパーのロールが必要です。このロールは、Dataplex Universal Catalog レイクが設定されているプロジェクトで付与する必要があります。
タスクが Spark SQL ジョブの場合は、結果が書き込まれるバケットに対する Cloud Storage 管理者権限が必要です。
Spark SQL タスクとカスタム Spark タスクのスケジュールを設定して実行するには、Dataplex Universal Catalog メタデータ リーダー(
roles/dataplex.metadataReader
)、Dataplex Universal Catalog 閲覧者(roles/dataplex.viewer
)、Dataproc Metastore メタデータ ユーザー(roles/metastore.metadataUser
)の IAM ロールがサービス アカウントに対して付与されている必要があります。
ジョブを送信するユーザーに、サービス アカウントのサービス アカウント ユーザーロール(
roles/iam.serviceAccountUser
)を付与します。手順については、サービス アカウントへのアクセス権を管理するをご覧ください。Dataplex Universal Catalog レイク サービス アカウントに、サービス アカウントを使用する権限を付与します。Dataplex Universal Catalog レイク サービス アカウントは、Google Cloud コンソールの [レイクの詳細] ページで確認できます。
Dataplex Universal Catalog レイクを含むプロジェクトが、タスクが実行されるプロジェクトと異なる場合は、Dataplex Universal Catalog レイクのサービス アカウントに、タスクを実行するプロジェクトの Dataproc 編集者のロールを付与します。
必要なコード アーティファクト(JAR、Python、SQL スクリプト ファイル)またはアーカイブ ファイル(
.jar
、.tar
、.tar.gz
、.tgz
、.zip
)を Cloud Storage パスに配置します。サービス アカウントに、これらのコード アーティファクトを保存する Cloud Storage バケットに対する必要な
storage.objects.get
権限が付与されていることを確認してください。
Spark(Java または Python)タスクをスケジュールする
コンソール
Google Cloud コンソールで、[Dataplex Universal Catalog] ページに移動します。
[プロセス] ビューに移動します。
[タスクを作成] をクリックします。
[カスタム Spark タスクの作成] で、[タスクの作成] をクリックします。
Dataplex Universal Catalog レイクを選択します。
タスク名を指定します。
タスクの [ID] を作成します。
[タスクの構成] セクションの [タイプ] で、[Spark] または [PySpark] を選択します。
関連する引数を入力します。
[サービス アカウント] フィールドに、カスタム Spark タスクが実行できるユーザー サービス アカウントを入力します。
[続行] をクリックします。
省略可: スケジュールを設定する: [1 回実行] または [繰り返し] を選択します。必須フィールドに入力します。
[続行] をクリックします。
省略可: [リソースのカスタマイズ] と [その他の設定の追加] を行います。
[作成] をクリックします。
gcloud
gcloud CLI コマンドを使用して Spark(Java / Python)タスクのスケジュールを設定できます。次の表に、使用する必須パラメータとオプション パラメータを示します。
パラメータ | 説明 |
---|---|
--lake |
Dataplex Universal Catalog サービスのレイクリソースのレイク ID。 |
--location |
Dataplex Universal Catalog サービスのロケーション。 |
--spark-main-class |
ドライバのメインクラス。クラスを含む jar ファイルは、デフォルトの CLASSPATH に存在する必要があります。 |
--spark-main-jar-file-uri |
メインクラスを含む jar ファイルの Cloud Storage URI。 |
--spark-archive-uris |
省略可: 各エグゼキュータの作業ディレクトリに抽出されるアーカイブの Cloud Storage URI。サポートされているファイル形式: .jar 、.tar 、.tar.gz 、.tgz 、.zip |
--spark-file-uris |
省略可: 各エグゼキュータの作業ディレクトリに配置されるファイルの Cloud Storage URI。 |
--batch-executors-count |
省略可: ジョブ エグゼキュータの合計数。デフォルト値は 2 です。 |
--batch-max-executors-count |
省略可: 構成可能なエグゼキュータの最大数。デフォルト値は 1,000 です。batch-max-executors-count が batch-executors-count より大きい場合、Dataplex Universal Catalog は自動スケーリングを有効にします。 |
--container-image-java-jars |
省略可: クラスパスに追加する Java JAR のリスト。有効な入力には、Jar バイナリへの Cloud Storage URI が含まれます。 例: gs://bucket-name/my/path/to/file.jar 。 |
--container-image-properties |
省略可: プロパティキー(prefix:property 形式で指定します)。例: core:hadoop.tmp.dir 。詳細については、クラスタ プロパティをご覧ください。 |
--vpc-network-tags |
省略可: ジョブに適用するネットワーク タグのリスト。 |
--vpc-network-name |
省略可: ジョブが実行される Virtual Private Cloud ネットワーク。デフォルトでは、Dataplex Universal Catalog はプロジェクト内の Default という名前の VPC ネットワークを使用します。--vpc-network-name または --vpc-sub-network-name のいずれか 1 つだけを使用する必要があります。 |
--vpc-sub-network-name |
省略可: ジョブが実行される VPC サブネットワーク。--vpc-sub-network-name または --vpc-network-name のいずれか 1 つだけを使用する必要があります。 |
--trigger-type |
ユーザー指定タスクのトリガータイプ。値は次のいずれかにする必要があります。ON_DEMAND - タスクの作成後 1 回だけタスクが実行されます。RECURRING - タスクはスケジュールに沿って定期的に実行されます。
|
--trigger-start-time |
省略可: タスクの初回実行時刻。形式は、「{year}-{month}-{day}T{hour}:{min}:{sec}Z」で、タイムゾーンは UTC です。たとえば、「2017-01-15T01:30:00Z」は 2017 年 1 月 15 日 01:30 UTC をエンコードしたものです。この値が指定されていない場合、タスクは、送信後(トリガータイプが ON_DEMAND の場合)、または指定されたスケジュール(トリガータイプが RECURRING の場合)で実行されます。 |
--trigger-disabled |
省略可: タスクの実行を止めます。このパラメータは、すでに実行中のタスクをキャンセルするのではなく、RECURRING タスクを一時的に無効にします。 |
--trigger-max-retires |
省略可: 中止するまでの再試行回数。失敗したタスクを再試行しない場合は、値を 0 に設定します。 |
--trigger-schedule |
タスクを定期的に実行するための Cron スケジュール。 |
--description |
省略可: タスクの説明。 |
--display-name |
省略可: タスクの表示名。 |
--labels |
省略可: 追加するラベルの KEY=VALUE ペアのリスト。 |
--execution-args |
省略可: ドライバに渡す引数。引数には、Key-Value ペアを混在させることが可能です。実行時の引数として、Key-Value ペアのカンマ区切りのリストを渡すことができます。位置引数を渡すには、キーを TASK_ARGS に設定し、値をすべての位置引数のカンマ区切り文字列に設定します。カンマ以外の区切り文字を使用する場合は、エスケープをご覧ください。key-value と位置引数が一緒に渡される場合は、TASK_ARGS が最後の引数として渡されます。 |
--execution-service-account |
タスクの実行に使用するサービス アカウント。 |
--max-job-execution-lifetime |
省略可: ジョブ実行の有効期限が切れるまでの最長期間。 |
--container-image |
省略可: ジョブ ランタイム環境のカスタム コンテナ イメージ。指定しない場合は、デフォルトのコンテナ イメージが使用されます。 |
--kms-key |
省略可: 暗号化に使用する Cloud KMS 鍵。形式は次のとおりです。projects/{project_number}/locations/{location_id}/keyRings/{key-ring-name}/cryptoKeys/{key-name} |
Java の例
glcoud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=ON_DEMAND –spark-main-jar-file-uri=<gcs location to java file> --execution-service-account=<service-account-email> --trigger-start-time=<timestamp after which job starts ex. 2099-01-01T00:00:00Z> --labels=key1=value1,key2=value3,key3=value3 --execution-args=arg1=value1,arg2=value3,arg3=value3 <task-id>
PySpark の例
gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=RECURRING --trigger-schedule=<Cron schedule https://en.wikipedia.org/wiki/Cron> --spark-python-script-file=<gcs location to python script> --execution-service-account=<service-account-email> --execution-args=^::^arg1=value1::arg2=value2::TASK_ARGS="pos-arg1, pos-arg2" <task-id>
REST
タスクを作成するには、API Explorer を使用します。
Spark SQL タスクのスケジュールを設定する
gcloud
Spark SQL タスクのスケジュールを設定するには、Spark(Java または Python)タスクをスケジュール設定すると同じ gcloud CLI コマンドを実行しますが、次のパラメータを追加します。
パラメータ | 説明 |
---|---|
--spark-sql-script |
SQL クエリテキスト。spark-sql-script または spark-sql-script-file が必要です。 |
--spark-sql-script-file |
クエリファイルへの参照。この値は、クエリファイルの Cloud Storage URI か、SQL スクリプト コンテンツへのパスです。spark-sql-script または spark-sql-script-file が必要です。 |
--execution-args |
Spark SQL タスクの場合、次の引数は必須であり、位置引数として渡す必要があります。--output_location, <GCS uri of the output directory> --output_format, <output file format> サポートされている形式は、CSV ファイル、JSON ファイル、parquet、orc です。 |
gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --execution-service-account=<service-account-email> --trigger-type=ON_DEMAND --spark-sql-script=<sql-script> --execution-args=^::^TASK_ARGS="--output_location, <gcs folder location>, --output_format, json" <sql-task-id>
REST
タスクを作成するには、API Explorer を使用します。
タスクをモニタリングする
コンソール
Google Cloud コンソールで、[Dataplex Universal Catalog] ページに移動します。
[プロセス] ビューに移動します。
[タスク] タブには、タスク テンプレート タイプでフィルタされたタスクのリストが表示されます。
[名前] 列で、表示するタスクをクリックします。
表示するタスクの [ジョブ ID] をクリックします。
Google Cloud コンソールで Dataproc ページが開き、モニタリングと出力の詳細を表示できます。
gcloud
次の表に、タスクをモニタリングするための gcloud CLI コマンドを一覧表示します。
アクション | gcloud CLI コマンド |
---|---|
タスクの一覧表示 | gcloud dataplex tasks list --project=<project-name> --location=<location> --lake=<lake-id> |
タスクの詳細の表示 | gcloud dataplex tasks describe --project=<project-name> --location=<location> --lake=<lake-id> <task-id> |
タスクのジョブの一覧表示 | gcloud dataplex tasks jobs list --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> |
ジョブの詳細の表示 | gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id> |
Dataplex Universal Catalog は、Dataproc Serverless(バッチ)でジョブを実行します。Dataplex Universal Catalog ジョブの実行ログを表示する手順は、次のとおりです。
Dataproc Serverless(バッチ)のジョブ ID を取得します。次のコマンドを実行します。
gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>
ログを表示します。前のコマンドの実行で取得したジョブ ID を使用して、次のコマンドを実行します。
gcloud beta dataproc batches wait --project=<project-name> --region=<location> <job-id>
REST
スケジュールを管理する
Google Cloud コンソールでは、Dataplex Universal Catalog 内でタスクのスケジュールの編集、タスクの削除、進行中のジョブのキャンセルができます。次の表に、これらのアクションに対する gcloud CLI コマンドを一覧表示します。
アクション | gcloud CLI コマンド |
---|---|
タスクのスケジュールを編集する | gcloud dataplex tasks update --project=<project-name> --location=<location> --lake=<lake-id> --trigger-schedule=<updated-schedule> <task-id> |
タスクを削除する | gcloud dataplex tasks delete --project=<project-name> --location=<location> --lake=<lake-id> <task-id> |
ジョブをキャンセルする | gcloud dataplex tasks jobs cancel --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id> |
次のステップ
- Dataproc テンプレートを確認する。
- 事前ビルド済みのテンプレートを試して、Dataplex Universal Catalog Cloud Storage アセットから BigQuery にデータを段階的に移動する。
- Dataplex Universal Catalog タスクのアラートと通知の設定を確認する。