Crea una canalización de Dataflow con Go
En esta página, se muestra cómo usar el SDK de Apache Beam para Go a fin de compilar un programa que defina una canalización. Luego, deberás ejecutar la canalización de manera local y en el servicio de Dataflow. Para obtener una introducción a la canalización de WordCount, consulta el video Cómo usar WordCount en Apache Beam.
Antes de comenzar
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles.gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, and Cloud Resource Manager APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin
), which contains theserviceusage.services.enable
permission. Learn how to grant roles.gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
Otorga roles a tu cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Reemplaza
PROJECT_ID
con el ID del proyecto. - Reemplaza
PROJECT_NUMBER
por el número del proyecto. Para encontrar el número de tu proyecto, consulta Identifica proyectos o usa el comandogcloud projects describe
. - Reemplaza
SERVICE_ACCOUNT_ROLE
por cada rol individual.
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S
(Estándar). -
Define la ubicación de almacenamiento de la siguiente manera:
US
(Estados Unidos). -
Sustituye
BUCKET_NAME
por un nombre de segmento único. No incluyas información sensible en el nombre del segmento, ya que este espacio de nombres es público y visible para todos los usuarios. - Copia el ID del proyecto de Google Cloud y el nombre del bucket de Cloud Storage. Necesitarás estos valores más adelante en esta guía de inicio rápido.
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Set the storage class to
Configura tu entorno de desarrollo
El SDK de Apache Beam es un modelo de programación de código abierto para canalizaciones de datos. Debes definir una canalización con un programa de Apache Beam y, luego, elegir un ejecutor, como Dataflow, para ejecutar tu canalización.
Te recomendamos que uses la versión más reciente de Go cuando trabajes con el SDK de Apache Beam para Go. Si no tienes instalada la versión más reciente de Go, usa la guía de descarga e instalación de Go para descargar e instalar Go en tu sistema operativo específico.
Para verificar la versión de Go que instalaste, ejecuta el siguiente comando en tu terminal local:
go version
Ejecuta el ejemplo de recuento de palabras de Beam
El SDK de Apache Beam para Go incluye un ejemplo de canalización wordcount
.
El ejemplo de wordcount
realiza lo siguiente:
- Lee un archivo de texto como entrada. De manera predeterminada, lee un archivo de texto ubicado en un bucket de Cloud Storage con el nombre del recurso
gs://dataflow-samples/shakespeare/kinglear.txt
. - Analiza cada línea en palabras.
- Realiza un recuento de frecuencia en las palabras con asignación de token.
Para ejecutar la versión más reciente del ejemplo de wordcount
de Beam en tu máquina local, sigue estos pasos:
Usa el comando
git clone
para clonar el repositorio de GitHubapache/beam
:git clone https://github.com/apache/beam.git
Cambia al directorio
beam/sdks/go
:cd beam/sdks/go
Usa el siguiente comando para ejecutar la canalización:
go run examples/wordcount/wordcount.go \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
La marca
input
especifica el archivo que se leerá y la marcaoutput
especifica el nombre del archivo para el resultado del recuento de frecuencia.
Una vez completada la canalización, ve los resultados de salida:
more outputs*
Para salir, presiona q.
Modifica el código de canalización
La canalización wordcount
de Beam distingue mayúsculas de minúsculas. En los pasos siguientes, se muestra cómo crear tu propio módulo de Go, modificar la canalización de wordcount
para que no distinga mayúsculas de minúsculas, y ejecutarla en Dataflow.
Crea un módulo de Go
Para realizar cambios en el código de la canalización, sigue estos pasos.
Crea un directorio para tu módulo de Go en la ubicación que elijas:
mkdir wordcount
cd wordcount
Crea un módulo de Go. Para este ejemplo, usa
example/dataflow
como la ruta de acceso del módulo.go mod init example/dataflow
Descarga la copia más reciente del código de
wordcount
del repositorio de GitHub de Apache Beam. Coloca este archivo en el directoriowordcount
que creaste.Si usas un sistema operativo que no sea Linux, debes obtener el paquete
unix
de Go. Este paquete es necesario para ejecutar canalizaciones en el servicio de Dataflow.go get -u golang.org/x/sys/unix
Asegúrate de que el archivo
go.mod
coincida con el código fuente del módulo:go mod tidy
Ejecuta la canalización sin modificar
Verifica que la canalización de wordcount
sin modificar se ejecute de forma local.
Desde la terminal, compila y ejecuta la canalización de manera local:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
Observa los resultados de salida:
more outputs*
Para salir, presiona q.
Cambia el código de canalización
Si deseas cambiar la canalización para que no distinga mayúsculas de minúsculas, modifica el código a fin de aplicar la función strings.ToLower
a todas las palabras.
En el editor que prefieras, abre el archivo
wordcount.go
.Examina el bloque
init
(se quitaron los comentarios para mayor claridad):func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() }
Agrega una línea nueva para registrar la función
strings.ToLower
:func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() register.Function1x1(strings.ToLower) }
Examina la función
CountWords
:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) }
Para dejar en minúsculas las palabras, agrega un ParDo que aplique
strings.ToLower
a cada palabra:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Map all letters to lowercase. lowercaseWords := beam.ParDo(s, strings.ToLower, col) // Count the number of times each word occurs. return stats.Count(s, lowercaseWords) }
Guarda el archivo.
Ejecuta la canalización actualizada de forma local
Ejecuta la canalización wordcount
actualizada de forma local y verifica que el resultado haya cambiado.
Compila y ejecuta la canalización
wordcount
modificada:go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
Ve los resultados de la canalización modificada: Todas las palabras deben estar en minúsculas.
more outputs*
Para salir, presiona q.
Ejecuta la canalización en el servicio de Dataflow
Para ejecutar el ejemplo de wordcount
actualizado en el servicio de Dataflow, usa el siguiente comando:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/
Reemplaza lo siguiente:
BUCKET_NAME
: el nombre del bucket de Cloud Storage.PROJECT_ID
: el ID del proyecto de Google Cloud.DATAFLOW_REGION
: la región en la que deseas implementar el trabajo de Dataflow. Por ejemplo,europe-west1
. Para obtener una lista de las ubicaciones disponibles, consulta Ubicaciones de Dataflow. La marca--region
anula la región predeterminada que está configurada en el servidor de metadatos, el cliente local o las variables de entorno.
Ve tus resultados
Puedes ver una lista de tus trabajos de Dataflow en la consola de Google Cloud. En la consola de Google Cloud, ve a la página Trabajos de Dataflow.
En la página Trabajos, se muestran detalles del trabajo wordcount
, incluido un estado En ejecución primero y, luego, Correcto.
Cuando ejecutas una canalización con Dataflow, los resultados se almacenan en un bucket de Cloud Storage. Para ver los resultados, usa la consola de Google Cloud o la terminal local.
Console
Para ver los resultados en la consola de Google Cloud, ve a la página Buckets de Cloud Storage.
En la lista de buckets de tu proyecto, haz clic en el bucket de almacenamiento que creaste antes. Los archivos de salida que creó tu trabajo se muestran en el directorio results
.
Terminal
Consulta los resultados desde tu terminal o mediante Cloud Shell.
Para enumerar los archivos de salida, usa el comando
gcloud storage ls
:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
Reemplaza
BUCKET_NAME
por el nombre del bucket de Cloud Storage de salida especificado.Para ver los resultados en los archivos de salida, usa el comando
gcloud storage cat
:gcloud storage cat gs://BUCKET_NAME/results/outputs*
Limpia
Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que se usaron en esta página, borra el proyecto de Google Cloud que tiene los recursos.
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
Si conservas tu proyecto, revoca los roles que otorgaste a la cuenta de servicio predeterminada de Compute Engine. Ejecuta el siguiente comando una vez para cada uno de los siguientes roles de IAM:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke