El Conector de BigQuery para Apache Spark permite que los científicos de datos combinen el poder del motor SQL perfectamente escalable de BigQuery con las capacidades del aprendizaje automático de Apache Spark. En este instructivo, mostramos cómo usar Dataproc, BigQuery y Apache Spark ML para realizar el aprendizaje automático en un conjunto de datos.
Objetivos
Usa la regresión lineal para compilar un modelo de peso de un recién nacido como una función de cinco factores:- semanas de gestación
- edad de la madre
- edad del padre
- aumento de peso de la madre durante el embarazo
- puntuación de Apgar
Usa las siguientes herramientas:
- BigQuery, para preparar la tabla de entrada de regresión lineal, que se escribe en tu proyecto de Google Cloud
- Python, para consultar y administrar datos en BigQuery
- Apache Spark, para acceder a la tabla de regresión lineal resultante
- Spark ML, para compilar y evaluar el modelo
- Trabajo de PySpark de Dataproc para invocar funciones de Spark ML
Costos
En este documento, usarás los siguientes componentes facturables de Google Cloud:
- Compute Engine
- Dataproc
- BigQuery
Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.
Antes de comenzar
Un clúster de Dataproc tiene los componentes de Spark, incluido Spark ML, instalados. Para configurar un clúster de Dataproc y ejecutar el código en este ejemplo, necesitarás hacer (o haber hecho) lo siguiente:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, BigQuery, Compute Engine APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Enable the Dataproc, BigQuery, Compute Engine APIs.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
- Crear un clúster de Dataproc en tu proyecto. Tu clúster debe ejecutar una versión de Dataproc con Spark 2.0 o superior, (incluye las bibliotecas de aprendizaje automático).
Crear un subconjunto de datos de BigQuery natality
En esta sección, creas un conjunto de datos en tu proyecto y, luego, creas una tabla en el conjunto de datos al que copias el subconjunto de datos de tasa de natalidad del conjunto de datos de BigQuery de natalidad disponible públicamente. Más adelante en este instructivo, usarás los datos del subconjunto en esta tabla para predecir en peso del recién nacido en función de la edad de la madre, del padre y las semanas de gestación.
Puedes crear el subconjunto de datos con la consola de Google Cloud o ejecutar una secuencia de comandos de Python en tu máquina local.
Console
Crea un conjunto de datos en tu proyecto.
- Ve a la IU web de BigQuery.
- En el panel de navegación izquierdo, haz clic en el nombre del proyecto y, luego, en CREAR CONJUNTO DE DATOS.
- En el diálogo Crear conjunto de datos, realice lo siguiente:
- Para el ID de conjunto de datos (Dataset ID), ingresa "natality_regression".
- En Ubicación de los datos, puedes seleccionar una ubicación para el conjunto de datos. La ubicación predeterminada es
US multi-region
. Una vez que se crea un conjunto de datos, la ubicación no se puede cambiar. - En Vencimiento predeterminado de la tabla, selecciona una de las siguientes opciones:
- Nunca (predeterminado): Debes borrar la tabla manualmente.
- Cantidad de días: La tabla se borrará después de la cantidad de días especificada a partir del momento de su creación.
- En Encriptación, elige una de las siguientes opciones:
- propiedad de Google y administrada por Google con la tecnología de Google Cloud (predeterminada).
- Clave administrada por el cliente: Consulta la documentación sobre cómo proteger datos con claves de Cloud KMS.
- Haga clic en Crear conjunto de datos.
Ejecuta una consulta en el conjunto de datos público de natalidad y, luego, guarda los resultados de la consulta en una tabla nueva en tu conjunto de datos.
- Copia y pega la siguiente consulta en el Editor de consultas y, luego, haz clic en Ejecutar.
CREATE OR REPLACE TABLE natality_regression.regression_input as SELECT weight_pounds, mother_age, father_age, gestation_weeks, weight_gain_pounds, apgar_5min FROM `bigquery-public-data.samples.natality` WHERE weight_pounds IS NOT NULL AND mother_age IS NOT NULL AND father_age IS NOT NULL AND gestation_weeks IS NOT NULL AND weight_gain_pounds IS NOT NULL AND apgar_5min IS NOT NULL
- Cuando la consulta finalice (en aproximadamente un minuto), los resultados se guardarán como tabla "regression_input" de BigQuery en el conjunto de datos
natality_regression
de tu proyecto.
- Copia y pega la siguiente consulta en el Editor de consultas y, luego, haz clic en Ejecutar.
Python
Antes de probar este ejemplo, sigue las instrucciones de configuración para Python que se encuentran en la guía de inicio rápido de Dataproc sobre el uso de bibliotecas cliente. Si deseas obtener más información, consulta la documentación de referencia de la API de Python de Dataproc.
Para autenticarte en Dataproc, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.
Consulta la documentación sobre cómo configurar el entorno de desarrollo de Python para obtener instrucciones sobre cómo instalar Python y la Biblioteca cliente de Google Cloud para Python (necesaria para ejecutar el código). Se recomienda instalar y usar un
virtualenv
de Python.Copia y pega el código
natality_tutorial.py
que aparece a continuación en un shellpython
en tu máquina local. Presiona la tecla<return>
en el shell para ejecutar el código y crear un conjunto de datos de BigQuery "natality_regression" en tu proyecto predeterminado deGoogle Cloud con una tabla "regression_input" que se propaga con un subconjunto de los datosnatality
públicos.Confirma la creación del conjunto de datos
natality_regression
y la tablaregression_input
.
Ejecuta una regresión lineal
En esta sección, ejecutarás una regresión lineal de PySpark mediante el envío del trabajo al servicio de Dataproc con la consola de Google Cloud o a través del comando gcloud
desde una terminal local.
Console
Copia y pega el siguiente código en un archivo
natality_sparkml.py
nuevo en tu máquina local."""Run a linear regression using Apache Spark ML. In the following PySpark (Spark Python API) code, we take the following actions: * Load a previously created linear regression (BigQuery) input table into our Cloud Dataproc Spark cluster as an RDD (Resilient Distributed Dataset) * Transform the RDD into a Spark Dataframe * Vectorize the features on which the model will be trained * Compute a linear regression using Spark ML """ from pyspark.context import SparkContext from pyspark.ml.linalg import Vectors from pyspark.ml.regression import LinearRegression from pyspark.sql.session import SparkSession # The imports, above, allow us to access SparkML features specific to linear # regression as well as the Vectors types. # Define a function that collects the features of interest # (mother_age, father_age, and gestation_weeks) into a vector. # Package the vector in a tuple containing the label (`weight_pounds`) for that # row. def vector_from_inputs(r): return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]), float(r["father_age"]), float(r["gestation_weeks"]), float(r["weight_gain_pounds"]), float(r["apgar_5min"]))) sc = SparkContext() spark = SparkSession(sc) # Read the data from BigQuery as a Spark Dataframe. natality_data = spark.read.format("bigquery").option( "table", "natality_regression.regression_input").load() # Create a view so that Spark SQL queries can be run against the data. natality_data.createOrReplaceTempView("natality") # As a precaution, run a query in Spark SQL to ensure no NULL values exist. sql_query = """ SELECT * from natality where weight_pounds is not null and mother_age is not null and father_age is not null and gestation_weeks is not null """ clean_data = spark.sql(sql_query) # Create an input DataFrame for Spark ML using the above function. training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label", "features"]) training_data.cache() # Construct a new LinearRegression object and fit the training data. lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal") model = lr.fit(training_data) # Print the model summary. print("Coefficients:" + str(model.coefficients)) print("Intercept:" + str(model.intercept)) print("R^2:" + str(model.summary.r2)) model.summary.residuals.show()
Copia el archivo
natality_sparkml.py
local a un bucket de Cloud Storage en tu proyecto.gcloud storage cp natality_sparkml.py gs://bucket-name
Ejecuta la regresión desde la página Enviar un trabajo de Dataproc.
En el campo Archivo principal de Python, inserta la URI
gs://
del depósito de Cloud Storage donde se encuentra la copia del archivonatality_sparkml.py
.Selecciona
PySpark
como el Tipo de trabajo.Inserta
gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
en el campo Archivos jar. Esto hace que spark-bigquery-connector esté disponible para la aplicación PySpark en el tiempo de ejecución a fin de permitirle leer datos de BigQuery en un DataFrame de Spark.Completa los campos ID de trabajo, Región y Clúster.
Haz clic en Enviar para ejecutar el trabajo en tu clúster.
Cuando el trabajo finalice, el resumen del modelo del resultado de la regresión lineal aparecerá en la ventana de detalles del trabajo de Dataproc.
gcloud
Copia y pega el siguiente código en un archivo
natality_sparkml.py
nuevo en tu máquina local."""Run a linear regression using Apache Spark ML. In the following PySpark (Spark Python API) code, we take the following actions: * Load a previously created linear regression (BigQuery) input table into our Cloud Dataproc Spark cluster as an RDD (Resilient Distributed Dataset) * Transform the RDD into a Spark Dataframe * Vectorize the features on which the model will be trained * Compute a linear regression using Spark ML """ from pyspark.context import SparkContext from pyspark.ml.linalg import Vectors from pyspark.ml.regression import LinearRegression from pyspark.sql.session import SparkSession # The imports, above, allow us to access SparkML features specific to linear # regression as well as the Vectors types. # Define a function that collects the features of interest # (mother_age, father_age, and gestation_weeks) into a vector. # Package the vector in a tuple containing the label (`weight_pounds`) for that # row. def vector_from_inputs(r): return (r["weight_pounds"], Vectors.dense(float(r["mother_age"]), float(r["father_age"]), float(r["gestation_weeks"]), float(r["weight_gain_pounds"]), float(r["apgar_5min"]))) sc = SparkContext() spark = SparkSession(sc) # Read the data from BigQuery as a Spark Dataframe. natality_data = spark.read.format("bigquery").option( "table", "natality_regression.regression_input").load() # Create a view so that Spark SQL queries can be run against the data. natality_data.createOrReplaceTempView("natality") # As a precaution, run a query in Spark SQL to ensure no NULL values exist. sql_query = """ SELECT * from natality where weight_pounds is not null and mother_age is not null and father_age is not null and gestation_weeks is not null """ clean_data = spark.sql(sql_query) # Create an input DataFrame for Spark ML using the above function. training_data = clean_data.rdd.map(vector_from_inputs).toDF(["label", "features"]) training_data.cache() # Construct a new LinearRegression object and fit the training data. lr = LinearRegression(maxIter=5, regParam=0.2, solver="normal") model = lr.fit(training_data) # Print the model summary. print("Coefficients:" + str(model.coefficients)) print("Intercept:" + str(model.intercept)) print("R^2:" + str(model.summary.r2)) model.summary.residuals.show()
Copia el archivo
natality_sparkml.py
local a un bucket de Cloud Storage en tu proyecto.gcloud storage cp natality_sparkml.py gs://bucket-name
Envía el trabajo de Pyspark al servicio de Dataproc mediante la ejecución del comando
gcloud
, que se muestra a continuación, desde una ventana de la terminal en tu máquina local.- El valor de la marca --jars hace que spark-bigquery-connector esté disponible para el trabajo de PySpark en el entorno de ejecución a fin de que pueda leer datos de BigQuery en un DataFrame de Spark.
gcloud dataproc jobs submit pyspark \ gs://your-bucket/natality_sparkml.py \ --cluster=cluster-name \ --region=region \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
- El valor de la marca --jars hace que spark-bigquery-connector esté disponible para el trabajo de PySpark en el entorno de ejecución a fin de que pueda leer datos de BigQuery en un DataFrame de Spark.
Cuando finaliza el trabajo, el resultado de la regresión lineal (resumen del modelo) aparece en la ventana de la terminal.
<<< # Imprime el resumen del modelo. ... print "Coefficients:" + str(model.coefficients) Coefficients:[0.0166657454602,-0.00296751984046,0.235714392936,0.00213002070133,-0.00048577251587] <<< print "Intercept:" + str(model.intercept) Intercept:-2.26130330748 <<< print "R^2:" + str(model.summary.r2) R^2:0.295200579035 <<< model.summary.residuals.show() +--------------------+ | residuals| +--------------------+ | -0.7234737533344147| | -0.985466980630501| | -0.6669710598385468| | 1.4162434829714794| |-0.09373154375186754| |-0.15461747949235072| | 0.32659061654192545| | 1.5053877697929803| | -0.640142797263989| | 1.229530260294963| |-0.03776160295256...| | -0.5160734239126814| | -1.5165972740062887| | 1.3269085258245008| | 1.7604670124710626| | 1.2348130901905972| | 2.318660276655887| | 1.0936947030883175| | 1.0169768511417363| | -1.7744915698181583| +--------------------+ solo se muestran las 20 filas principales.
Limpia
Una vez que completes el instructivo, puedes limpiar los recursos que creaste para que dejen de usar la cuota y generar cargos. En las siguientes secciones, se describe cómo borrar o desactivar estos recursos.
Borra el proyecto
La manera más fácil de eliminar la facturación es borrar el proyecto que creaste para el instructivo.
Para borrar el proyecto, sigue estos pasos:
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
Cómo borrar el clúster de Dataproc
Consulta Borra un clúster.
¿Qué sigue?
- Consulta las sugerencias de ajuste de trabajo de Spark