目标
使用 Java、Scala 或 Python 编写简单的 wordcount Spark 作业,然后在 Dataproc 集群上运行该作业。
费用
在本文档中,您将使用 Google Cloud的以下收费组件:
- Compute Engine
- Dataproc
- Cloud Storage
您可使用价格计算器根据您的预计使用情况来估算费用。
准备工作
运行以下步骤为运行本教程中的代码做准备。
设置项目。如有必要,请设置一个启用了 Dataproc、Compute Engine 和 Cloud Storage API 并在本地机器上安装了 Google Cloud CLI 的项目。
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
-
Create a service account:
-
In the Google Cloud console, go to the Create service account page.
Go to Create service account - Select your project.
-
In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
In the Service account description field, enter a description. For example,
Service account for quickstart
. - Click Create and continue.
-
Grant the Project > Owner role to the service account.
To grant the role, find the Select a role list, then select Project > Owner.
- Click Continue.
-
Click Done to finish creating the service account.
Do not close your browser window. You will use it in the next step.
-
-
Create a service account key:
- In the Google Cloud console, click the email address for the service account that you created.
- Click Keys.
- Click Add key, and then click Create new key.
- Click Create. A JSON key file is downloaded to your computer.
- Click Close.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. -
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataproc, Compute Engine, and Cloud Storage APIs.
-
Create a service account:
-
In the Google Cloud console, go to the Create service account page.
Go to Create service account - Select your project.
-
In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.
In the Service account description field, enter a description. For example,
Service account for quickstart
. - Click Create and continue.
-
Grant the Project > Owner role to the service account.
To grant the role, find the Select a role list, then select Project > Owner.
- Click Continue.
-
Click Done to finish creating the service account.
Do not close your browser window. You will use it in the next step.
-
-
Create a service account key:
- In the Google Cloud console, click the email address for the service account that you created.
- Click Keys.
- Click Add key, and then click Create new key.
- Click Create. A JSON key file is downloaded to your computer.
- Click Close.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. -
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
创建 Cloud Storage 存储分区。您需要使用 Cloud Storage 来保存教程数据。如果您没有可用的 Cloud Storage 存储分区,请在项目中创建新存储分区。
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
-
In the Get started section, do the following:
- Enter a globally unique name that meets the bucket naming requirements.
- To add a
bucket label,
expand the Labels section ( ),
click add_box
Add label, and specify a
key
and avalue
for your label.
-
In the Choose where to store your data section, do the following:
- Select a Location type.
- Choose a location where your bucket's data is permanently stored from the Location type drop-down menu.
- If you select the dual-region location type, you can also choose to enable turbo replication by using the relevant checkbox.
- To set up cross-bucket replication, select
Add cross-bucket replication via Storage Transfer Service and
follow these steps:
Set up cross-bucket replication
- In the Bucket menu, select a bucket.
In the Replication settings section, click Configure to configure settings for the replication job.
The Configure cross-bucket replication pane appears.
- To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
- To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
- Click Done.
-
In the Choose how to store your data section, do the following:
- Select a default storage class for the bucket or Autoclass for automatic storage class management of your bucket's data.
- To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
- In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
-
In the Choose how to protect object data section, do the
following:
- Select any of the options under Data protection that you
want to set for your bucket.
- To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
- To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
- To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
- To enable Object Retention Lock, click the Enable object retention checkbox.
- To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
- To choose how your object data will be encrypted, expand the Data encryption section (Data encryption method. ), and select a
- Select any of the options under Data protection that you
want to set for your bucket.
-
In the Get started section, do the following:
- Click Create.
设置本地环境变量。在本地机器上设置环境变量。设置您的 Google Cloud 项目 ID 以及您将在本教程中使用的 Cloud Storage 存储桶的名称。此外,还要提供现有或新 Dataproc 集群的名称和区域。您可以在下一步中创建要用于本教程的集群。
PROJECT=project-id
BUCKET_NAME=bucket-name
CLUSTER=cluster-name
REGION=cluster-region Example: "us-central1"
创建 Dataproc 集群。运行以下命令,在指定的 Compute Engine 地区中创建单节点 Dataproc 集群。
gcloud dataproc clusters create ${CLUSTER} \ --project=${PROJECT} \ --region=${REGION} \ --single-node
将公共数据复制到 Cloud Storage 存储分区。将公开数据莎士比亚作品文本片段复制到 Cloud Storage 存储桶的
input
文件夹中:gcloud storage cp gs://pub/shakespeare/rose.txt \ gs://${BUCKET_NAME}/input/rose.txt
设置 Java (Apache Maven)、Scala (SBT) 或 Python 开发环境。
准备 Spark wordcount 作业
在下面选择一个标签页,按照步骤准备作业软件包或文件以提交到集群。您可以准备以下作业类型之一:
- Java 中的 Spark 作业使用 Apache Maven 创建一个 JAR 软件包
- Scala 中的 Spark 作业使用 SBT 创建一个 JAR 软件包
- Python (PySpark) 中的 Spark 作业
Java
- 将
pom.xml
文件复制到本地机器。 以下pom.xml
文件指定 Scala 和 Spark 库依赖项,它们具有provided
范围,指示 Dataproc 集群将在运行时提供这些库。pom.xml
文件未指定 Cloud Storage 依赖项,因为连接器实现了标准 HDFS 接口。当 Spark 作业访问 Cloud Storage 集群文件时(URI 开头为gs://
的文件),系统会自动使用 Cloud Storage 连接器访问 Cloud Storage 中的文件<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>dataproc.codelab</groupId> <artifactId>word-count</artifactId> <version>1.0</version> <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>Scala version, for example,
2.11.8
</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_Scala major.minor.version, for example,2.11
</artifactId> <version>Spark version, for example,2.3.1
</version> <scope>provided</scope> </dependency> </dependencies> </project> - 将下面列出的
WordCount.java
代码复制到您的本地机器。- 创建一组路径为
src/main/java/dataproc/codelab
的目录:mkdir -p src/main/java/dataproc/codelab
- 将
WordCount.java
复制到您的本地机器的src/main/java/dataproc/codelab
:cp WordCount.java src/main/java/dataproc/codelab
WordCount.java 是 Java 中一个简单的 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。
package dataproc.codelab; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2; public class WordCount { public static void main(String[] args) { if (args.length != 2) { throw new IllegalArgumentException("Exactly 2 arguments are required: <inputUri> <outputUri>"); } String inputPath = args[0]; String outputPath = args[1]; JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("Word Count")); JavaRDD<String> lines = sparkContext.textFile(inputPath); JavaRDD<String> words = lines.flatMap( (String line) -> Arrays.asList(line.split(" ")).iterator() ); JavaPairRDD<String, Integer> wordCounts = words.mapToPair( (String word) -> new Tuple2<>(word, 1) ).reduceByKey( (Integer count1, Integer count2) -> count1 + count2 ); wordCounts.saveAsTextFile(outputPath); } }
- 创建一组路径为
- 构建软件包。
如果构建成功,则系统会创建mvn clean package
target/word-count-1.0.jar
。 - 将软件包暂存到 Cloud Storage。
gcloud storage cp target/word-count-1.0.jar \ gs://${BUCKET_NAME}/java/word-count-1.0.jar
Scala
- 将
build.sbt
文件复制到本地机器。 以下build.sbt
文件指定 Scala 和 Spark 库依赖项,它们具有provided
范围,指示 Dataproc 集群将在运行时提供这些库。build.sbt
文件未指定 Cloud Storage 依赖项,因为连接器实现了标准 HDFS 接口。当 Spark 作业访问 Cloud Storage 集群文件时(URI 开头为gs://
的文件),系统会自动使用 Cloud Storage 连接器访问 Cloud Storage 中的文件scalaVersion := "Scala version, for example,
2.11.8
" name := "word-count" organization := "dataproc.codelab" version := "1.0" libraryDependencies ++= Seq( "org.scala-lang" % "scala-library" % scalaVersion.value % "provided", "org.apache.spark" %% "spark-core" % "Spark version, for example,2.3.1
" % "provided" ) - 将
word-count.scala
复制到本地机器。 这是 Java 中一个简单的 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。package dataproc.codelab import org.apache.spark.SparkContext import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { if (args.length != 2) { throw new IllegalArgumentException( "Exactly 2 arguments are required: <inputPath> <outputPath>") } val inputPath = args(0) val outputPath = args(1) val sc = new SparkContext(new SparkConf().setAppName("Word Count")) val lines = sc.textFile(inputPath) val words = lines.flatMap(line => line.split(" ")) val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) wordCounts.saveAsTextFile(outputPath) } }
- 构建软件包。
如果构建成功,则系统会创建sbt clean package
target/scala-2.11/word-count_2.11-1.0.jar
。 - 将软件包暂存到 Cloud Storage。
gcloud storage cp target/scala-2.11/word-count_2.11-1.0.jar \ gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar
Python
- 将
word-count.py
复制到本地机器。 这是 Python 中一个简单的使用 PySpark 的 Spark 作业,可从 Cloud Storage 读取文本文件、执行字数统计,然后将文本文件结果写入 Cloud Storage。#!/usr/bin/env python import pyspark import sys if len(sys.argv) != 3: raise Exception("Exactly 2 arguments are required: <inputUri> <outputUri>") inputUri=sys.argv[1] outputUri=sys.argv[2] sc = pyspark.SparkContext() lines = sc.textFile(sys.argv[1]) words = lines.flatMap(lambda line: line.split()) wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda count1, count2: count1 + count2) wordCounts.saveAsTextFile(sys.argv[2])
提交作业
运行以下 gcloud
命令,将 Wordcount 作业提交到 Dataproc 集群。
Java
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/java/word-count-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Scala
gcloud dataproc jobs submit spark \ --cluster=${CLUSTER} \ --class=dataproc.codelab.WordCount \ --jars=gs://${BUCKET_NAME}/scala/word-count_2.11-1.0.jar \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
Python
gcloud dataproc jobs submit pyspark word-count.py \ --cluster=${CLUSTER} \ --region=${REGION} \ -- gs://${BUCKET_NAME}/input/ gs://${BUCKET_NAME}/output/
查看输出
作业完成后,运行以下 gcloud CLI 命令以查看字数统计输出。
gcloud storage cat gs://${BUCKET_NAME}/output/*
字数统计输出应类似于以下内容:
(a,2) (call,1) (What's,1) (sweet.,1) (we,1) (as,1) (name?,1) (any,1) (other,1) (rose,1) (smell,1) (name,1) (would,1) (in,1) (which,1) (That,1) (By,1)
清理
完成本教程后,您可以清理您创建的资源,让它们停止使用配额,以免产生费用。以下部分介绍如何删除或关闭这些资源。
删除项目
为了避免产生费用,最简单的方法是删除您为本教程创建的项目。
要删除项目,请执行以下操作:
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
删除 Dataproc 集群
您可能希望只删除项目中的集群,而不是项目。
删除 Cloud Storage 存储分区
Google Cloud 控制台
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
命令行
-
删除存储分区:
gcloud storage buckets delete BUCKET_NAME
后续步骤
- 请参阅 Spark 作业微调提示