使用 Python 版 Cloud 客户端库

本教程提供了一项 Cloud Shell 演示,该演示使用 Python 版 Google Cloud 客户端库以编程方式调用 Dataproc gRPC API 来创建集群并将作业提交到该集群。

以下部分介绍 GitHub GoogleCloudPlatform/python-dataproc 代码库中包含的演示代码操作。

运行 Cloud Shell 演示

点击在 Cloud Shell 中打开 (Open in Cloud Shell) 以运行演示。

在 Cloud Shell 中打开

了解代码

应用默认凭据

本教程中的 Cloud Shell 演示使用 Google Cloud 项目凭据提供身份验证。在本地运行代码时,推荐的做法是使用服务账号凭据对代码进行身份验证。

创建 Dataproc 集群

创建集群时,设置了以下值:

  • 将在其中创建集群的项目
  • 将要创建集群的区域
  • 集群的名称
  • 集群配置,用于指定一个主节点和两个主要工作器

默认配置设置用于其余集群设置。您可以替换默认集群配置设置。例如,您可以添加次要虚拟机(默认值 = 0)或为集群指定非默认 VPC 网络。如需了解详情,请参阅 CreateCluster

def quickstart(project_id, region, cluster_name, gcs_bucket, pyspark_file):
    # Create the cluster client.
    cluster_client = dataproc_v1.ClusterControllerClient(
        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
    )

    # Create the cluster config.
    cluster = {
        "project_id": project_id,
        "cluster_name": cluster_name,
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-2"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-2"},
        },
    }

    # Create the cluster.
    operation = cluster_client.create_cluster(
        request={"project_id": project_id, "region": region, "cluster": cluster}
    )
    result = operation.result()

    print(f"Cluster created successfully: {result.cluster_name}")

提交作业

以下值已设置为提交作业:

  • 将在其中创建集群的项目
  • 将要创建集群的区域
  • 作业配置,用于指定 PySpark 作业的集群名称和 Cloud Storage 文件路径 (URI)

如需了解详情,请参阅 SubmitJob

# Create the job client.
job_client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Create the job config.
job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": f"gs://{gcs_bucket}/{spark_filename}"},
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
response = operation.result()

# Dataproc job output is saved to the Cloud Storage bucket
# allocated to the job. Use regex to obtain the bucket and blob info.
matches = re.match("gs://(.*?)/(.*)", response.driver_output_resource_uri)

output = (
    storage.Client()
    .get_bucket(matches.group(1))
    .blob(f"{matches.group(2)}.000000000")
    .download_as_bytes()
    .decode("utf-8")
)

print(f"Job finished successfully: {output}\r\n")

删除集群

以下值已设置为删除集群:

  • 将在其中创建集群的项目
  • 将要创建集群的区域
  • 集群的名称

如需了解详情,请参阅 DeleteCluster

# Delete the cluster once the job has terminated.
operation = cluster_client.delete_cluster(
    request={
        "project_id": project_id,
        "region": region,
        "cluster_name": cluster_name,
    }
)
operation.result()

print(f"Cluster {cluster_name} successfully deleted.")