Building an ML vision analytics solution with Dataflow and Cloud Vision API

Last reviewed 2021-02-10 UTC

In this tutorial, you'll learn how to deploy a Dataflow pipeline to process large-scale image files with Cloud Vision. Dataflow stores the results in BigQuery so that you can use them to train BigQuery ML pre-built models.

The Dataflow pipeline you create in the tutorial can handle images in large quantities. It's only limited by your Vision quota. You can increase your Vision quota based on your scale requirements.

The tutorial is intended for data engineers and data scientists. It assumes you have basic knowledge of building Dataflow pipelines using Apache Beam's Java SDK, BigQuery Standard SQL, and basic shell scripting. It also assumes that you are familiar with Vision.

Objectives

  • Create an image metadata ingestion pipeline with Pub/Sub notifications for Cloud Storage.
  • Use Dataflow to deploy a real-time vision analytics pipeline.
  • Use Vision to analyze images for a set of feature types.
  • Analyze and train data with BigQuery ML.

Costs

In this document, you use billable components of Google Cloud, including Cloud Storage, Pub/Sub, Dataflow, Vision, and BigQuery.

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. In the Google Cloud console, activate Cloud Shell.

  4. In Cloud Shell, enable the Dataflow, Container Registry, and Vision APIs.

    gcloud services enable dataflow.googleapis.com \
    containerregistry.googleapis.com vision.googleapis.com
    
  5. Set some environment variables. Replace REGION with one of the available Dataflow regions, for example, us-central1.

    export PROJECT=$(gcloud config get-value project)
    export REGION=REGION
    
  6. Clone the tutorial's Git repository:

    git clone https://github.com/GoogleCloudPlatform/dataflow-vision-analytics.git
    
  7. Go to the repository's root folder:

    cd dataflow-vision-analytics
    

Reference architecture

The following diagram illustrates the flow of the system that you build in this tutorial.

Workflow diagram showing the flow of information for ingest/trigger, process, and store.

As shown in the diagram, the flow is as follows:

  1. Clients upload image files to a Cloud Storage bucket.

  2. For each file upload, the system automatically publishes a notification message to a Pub/Sub topic.

  3. For each new notification, the Dataflow pipeline does the following:

    1. Reads file metadata from the Pub/Sub message.
    2. Sends each image to the Vision API for annotation processing.
    3. Stores all annotations in a BigQuery table for further analysis.
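
The three per-notification steps above can be sketched in plain Python. This is an in-memory illustration only, not the tutorial's Java pipeline: `handle_notification` and `fake_annotate` are hypothetical names, the annotator is a stub that stands in for the Vision API call, and the row layout is simplified.

```python
import json
from typing import Callable, Dict, List


def handle_notification(message_data: str,
                        annotate: Callable[[str], List[Dict]]) -> List[Dict]:
    """Turn one Cloud Storage notification into rows ready to be stored."""
    # Step 1: read the file metadata from the Pub/Sub message payload.
    metadata = json.loads(message_data)
    gcs_uri = f"gs://{metadata['bucket']}/{metadata['name']}"
    # Step 2: send the image reference for annotation (stubbed in this sketch).
    annotations = annotate(gcs_uri)
    # Step 3: shape one output row per annotation.
    return [{"gcs_uri": gcs_uri, **annotation} for annotation in annotations]


def fake_annotate(gcs_uri: str) -> List[Dict]:
    # Hypothetical stub: a real pipeline would call the Vision API here.
    return [{"description": "Landmark", "score": 0.9}]


rows = handle_notification(
    json.dumps({"bucket": "my-images", "name": "st_basils.jpeg"}),
    fake_annotate,
)
print(rows)
```

Passing the annotator in as a function keeps the sketch testable without any Google Cloud credentials.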

Creating a Pub/Sub notification for Cloud Storage

In this section, you create a Pub/Sub notification for Cloud Storage. This notification publishes metadata for the image file that is uploaded in the bucket. Based on the metadata, the Dataflow pipeline starts processing the request.

  1. In Cloud Shell, create a Pub/Sub topic:

    export GCS_NOTIFICATION_TOPIC="gcs-notification-topic"
    gcloud pubsub topics create ${GCS_NOTIFICATION_TOPIC}
    
  2. Create a Pub/Sub subscription for the topic:

    export GCS_NOTIFICATION_SUBSCRIPTION="gcs-notification-subscription"
    gcloud pubsub subscriptions create ${GCS_NOTIFICATION_SUBSCRIPTION} \
      --topic=${GCS_NOTIFICATION_TOPIC}
    
  3. Create a bucket to store the input image files:

    export IMAGE_BUCKET=${PROJECT}-images
    gsutil mb -c standard -l ${REGION} gs://${IMAGE_BUCKET}
    
  4. Create a Pub/Sub notification for the bucket:

    gsutil notification create -t ${GCS_NOTIFICATION_TOPIC} \
      -f json gs://${IMAGE_BUCKET}
    

Now that you have configured notifications, the system sends a Pub/Sub message to the topic that you created. This action occurs every time you upload a file to the bucket.
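
To make the notification contents concrete, the following sketch shows one way a subscriber could turn a message into a `gs://` URI. It relies on the `eventType` message attribute and on the `bucket`, `name`, and `contentType` fields of the JSON payload. The function name is hypothetical, and the image-only filter is an assumption of this sketch, not something the tutorial's pipeline is known to do.

```python
import json
from typing import Dict, Optional


def image_uri_from_notification(attributes: Dict[str, str],
                                data: bytes) -> Optional[str]:
    """Return the gs:// URI for a newly uploaded image, or None to skip it."""
    # Cloud Storage sets eventType to OBJECT_FINALIZE when an upload completes.
    if attributes.get("eventType") != "OBJECT_FINALIZE":
        return None
    # With the JSON payload format, the message body is the object's metadata.
    obj = json.loads(data.decode("utf-8"))
    # Assumption of this sketch: ignore objects that are not images.
    if not obj.get("contentType", "").startswith("image/"):
        return None
    return f"gs://{obj['bucket']}/{obj['name']}"


# Sample message with made-up bucket and object names.
sample_attributes = {"eventType": "OBJECT_FINALIZE"}
sample_data = json.dumps({
    "bucket": "my-project-images",
    "name": "bali.jpeg",
    "contentType": "image/jpeg",
}).encode("utf-8")
print(image_uri_from_notification(sample_attributes, sample_data))
```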

Creating a BigQuery dataset

In this section, you create a BigQuery dataset to store the results output by the Dataflow pipeline. The pipeline automatically creates tables in this dataset based on the Vision feature types that you request.

  • In Cloud Shell, create a BigQuery dataset:

    export BIGQUERY_DATASET="vision_analytics"
    bq mk -d --location=US ${BIGQUERY_DATASET}
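
As a rough illustration of how feature types relate to tables, the following sketch maps a comma-separated feature list to table names. Five of the names are taken from the sample output shown later in this tutorial. The `logo_annotation` name is an assumption that follows the same pattern, and `tables_for` is a hypothetical helper, not part of the pipeline.

```python
from typing import Dict, List

# Table names for five features come from the tutorial's sample output.
FEATURE_TABLES: Dict[str, str] = {
    "LABEL_DETECTION": "label_annotation",
    "LANDMARK_DETECTION": "landmark_annotation",
    "LOGO_DETECTION": "logo_annotation",  # assumed name, not shown in the output
    "FACE_DETECTION": "face_annotation",
    "IMAGE_PROPERTIES": "image_properties",
    "CROP_HINTS": "crop_hint_annotation",
}


def tables_for(features: str) -> List[str]:
    """Map a comma-separated feature list to the tables it would populate."""
    names = [name.strip() for name in features.split(",") if name.strip()]
    unknown = [name for name in names if name not in FEATURE_TABLES]
    if unknown:
        raise ValueError(f"Unsupported feature types: {unknown}")
    return [FEATURE_TABLES[name] for name in names]


print(tables_for("LABEL_DETECTION,LANDMARK_DETECTION"))
```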
    

Creating a Dataflow Flex Template

In this section, you build the Apache Beam pipeline code and then run the pipeline as a Dataflow job by using a Dataflow Flex Template.

  1. In Cloud Shell, build the Apache Beam pipeline's code:

    gradle build
    
  2. Create a Docker image for the Dataflow Flex Template:

    gcloud auth configure-docker
    gradle jib \
      --image=gcr.io/${PROJECT}/dataflow-vision-analytics:latest
    
  3. Create a Cloud Storage bucket to store the Dataflow Flex Template:

    export DATAFLOW_TEMPLATE_BUCKET=${PROJECT}-dataflow-template-config
    gsutil mb -c standard -l ${REGION} \
      gs://${DATAFLOW_TEMPLATE_BUCKET}
    
  4. Upload the template's JSON configuration file to the bucket:

    cat << EOF | gsutil cp - gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json
    {
      "image": "gcr.io/${PROJECT}/dataflow-vision-analytics:latest",
      "sdk_info": {"language": "JAVA"}
    }
    EOF
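
If you prefer to generate the template configuration from a script, a minimal sketch could look like the following. It produces the same two fields as the heredoc above. The function name and its default arguments are assumptions of this sketch.

```python
import json


def flex_template_spec(project: str,
                       image_name: str = "dataflow-vision-analytics",
                       tag: str = "latest") -> str:
    """Render the Flex Template configuration as a JSON string."""
    spec = {
        # Container image that was pushed by the gradle jib step.
        "image": f"gcr.io/{project}/{image_name}:{tag}",
        # The pipeline in this tutorial uses the Java SDK.
        "sdk_info": {"language": "JAVA"},
    }
    return json.dumps(spec, indent=2)


# "my-project" is a placeholder project ID.
print(flex_template_spec("my-project"))
```

Building the document with `json.dumps` avoids quoting mistakes that are easy to make when editing JSON inside a shell heredoc.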
    

Running the Dataflow pipeline for a set of Vision features

The parameters listed in the following table are specific to this Dataflow pipeline.

Refer to the Dataflow documentation for the complete list of standard Dataflow execution parameters.

Parameter | Description
--- | ---
windowInterval | The window time interval (in seconds) for outputting results to BigQuery and Pub/Sub. The default is 5.
batchSize | The number of images to include in a request to the Vision API. The default is 1. You can increase it to a maximum of 16.
subscriberId | The ID of the Pub/Sub subscription that receives input Cloud Storage notifications.
keyRange | A parameter that can improve processing performance for large datasets. A higher value means increased parallelism among workers. The default is 1.
visionApiProjectId | The project ID to use for the Vision API.
datasetName | The reference of the output BigQuery dataset.
features | A comma-separated list of image-processing features.
labelAnnotationTable, landmarkAnnotationTable, logoAnnotationTable, faceAnnotationTable, imagePropertiesTable, cropHintAnnotationTable, errorLogTable | String parameters with table names for various annotations. Default values are provided for each table.

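
To illustrate what batchSize controls, the following sketch groups image URIs into batches of at most 16, so that each batch corresponds to one Vision API request. It is a simplified, non-streaming illustration with hypothetical names. A streaming pipeline can also emit partial batches when a window closes, so the real request count can be higher than this sketch suggests.

```python
from typing import Iterable, Iterator, List

MAX_BATCH_SIZE = 16  # upper bound for batchSize given in the parameter table


def batch_uris(uris: Iterable[str], batch_size: int = 1) -> Iterator[List[str]]:
    """Yield lists of at most batch_size URIs, one list per request."""
    if not 1 <= batch_size <= MAX_BATCH_SIZE:
        raise ValueError(f"batch_size must be between 1 and {MAX_BATCH_SIZE}")
    batch: List[str] = []
    for uri in uris:
        batch.append(uri)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # Emit the final, possibly smaller, batch.
        yield batch


# 35 made-up image URIs produce two full batches and one partial batch.
uris = [f"gs://my-images/img_{i}.jpeg" for i in range(35)]
batches = list(batch_uris(uris, batch_size=16))
print([len(b) for b in batches])  # [16, 16, 3]
```
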
  1. In Cloud Shell, define a job name for the Dataflow pipeline:

    export JOB_NAME=vision-analytics-pipeline-1
    
  2. Create a file with parameters for the Dataflow pipeline:

    PARAMETERS=params.yaml
    cat << EOF > ${PARAMETERS}
    --parameters:
      autoscalingAlgorithm: THROUGHPUT_BASED
      enableStreamingEngine: "true"
      subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION}
      visionApiProjectId: ${PROJECT}
      features: IMAGE_PROPERTIES,LABEL_DETECTION,LANDMARK_DETECTION,LOGO_DETECTION,CROP_HINTS,FACE_DETECTION
      datasetName: ${BIGQUERY_DATASET}
    EOF
    
  3. Run the Dataflow pipeline to process images for these feature types: IMAGE_PROPERTIES, LABEL_DETECTION, LANDMARK_DETECTION, LOGO_DETECTION, CROP_HINTS, FACE_DETECTION.

    gcloud dataflow flex-template run ${JOB_NAME} \
    --project=${PROJECT} \
    --region=${REGION} \
    --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \
    --flags-file ${PARAMETERS}
    

    This command uses the parameters listed in the preceding table.

  4. Retrieve the ID of the running Dataflow job:

    JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --region ${REGION} --format "value(id)" --status active)
    
  5. Display the URL of the Dataflow job's web page:

    echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
    
  6. Open the displayed URL in a new browser tab. After a few seconds, the graph for the Dataflow job appears:

    Workflow diagram for the Dataflow job.

    The Dataflow pipeline is now running and waiting to receive input notifications from Pub/Sub.

  7. In Cloud Shell, trigger the Dataflow pipeline by uploading some test files into the input bucket:

    gsutil cp gs://df-vision-ai-test-data/bali.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/faces.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/bubble.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/setagaya.jpeg gs://${IMAGE_BUCKET}
    gsutil cp gs://df-vision-ai-test-data/st_basils.jpeg gs://${IMAGE_BUCKET}
    
  8. In the Google Cloud console, review the custom counters (in the right panel of the Dataflow job page) and verify that the pipeline processed all five images:

    List of images returned from the file upload.

  9. In Cloud Shell, validate that the tables were automatically created:

    bq query --use_legacy_sql=false "select table_name, table_type from \
    ${BIGQUERY_DATASET}.INFORMATION_SCHEMA.TABLES"
    

    The output is as follows:

    +----------------------+------------+
    |      table_name      | table_type |
    +----------------------+------------+
    | face_annotation      | BASE TABLE |
    | label_annotation     | BASE TABLE |
    | crop_hint_annotation | BASE TABLE |
    | landmark_annotation  | BASE TABLE |
    | image_properties     | BASE TABLE |
    +----------------------+------------+
    
  10. View the schema for the landmark_annotation table. When the LANDMARK_DETECTION feature is requested, this table captures the attributes returned from the API call.

    bq show --schema --format=prettyjson ${BIGQUERY_DATASET}.landmark_annotation
    

    The output is as follows:

    [
      {
        "mode": "REQUIRED",
        "name": "gcs_uri",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "mid",
        "type": "STRING"
      },
      {
        "mode": "REQUIRED",
        "name": "description",
        "type": "STRING"
      },
      {
        "mode": "REQUIRED",
        "name": "score",
        "type": "FLOAT"
      },
      {
        "fields": [
          {
            "fields": [
              {
                "mode": "REQUIRED",
                "name": "x",
                "type": "FLOAT"
              },
              {
                "mode": "REQUIRED",
                "name": "y",
                "type": "FLOAT"
              }
            ],
            "mode": "REPEATED",
            "name": "vertices",
            "type": "RECORD"
          }
        ],
        "mode": "NULLABLE",
        "name": "bounding_poly",
        "type": "RECORD"
      },
      {
        "mode": "REPEATED",
        "name": "locations",
        "type": "GEOGRAPHY"
      },
      {
        "mode": "REQUIRED",
        "name": "transaction_timestamp",
        "type": "TIMESTAMP"
      }
    ]
    
  11. Stop the pipeline:

    gcloud dataflow jobs drain ${JOB_ID} \
    --region ${REGION}
    

    Although there are no more Pub/Sub notifications to process, the streaming pipeline you created continues to run until you enter this command.
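
One way to read the landmark_annotation schema shown earlier in this section is to check which fields a row must always contain. The sketch below copies the top-level field modes from that schema output and reports any REQUIRED field that is missing from a row. The helper name and the example row values are made up for illustration.

```python
from typing import Any, Dict, List

# Top-level field modes copied from the landmark_annotation schema output.
LANDMARK_FIELD_MODES: Dict[str, str] = {
    "gcs_uri": "REQUIRED",
    "mid": "NULLABLE",
    "description": "REQUIRED",
    "score": "REQUIRED",
    "bounding_poly": "NULLABLE",
    "locations": "REPEATED",
    "transaction_timestamp": "REQUIRED",
}


def missing_required(row: Dict[str, Any]) -> List[str]:
    """Return the names of REQUIRED fields that are absent or None in row."""
    return [
        name
        for name, mode in LANDMARK_FIELD_MODES.items()
        if mode == "REQUIRED" and row.get(name) is None
    ]


# Illustrative values only; they are not taken from real pipeline output.
example_row = {
    "gcs_uri": "gs://my-images/st_basils.jpeg",
    "description": "Saint Basil's Cathedral",
    "score": 0.75,
    "locations": ["POINT(37.6231 55.7525)"],  # WKT order: longitude latitude
    "transaction_timestamp": "2021-02-10T00:00:00Z",
}
print(missing_required(example_row))  # []
print(missing_required({"gcs_uri": "gs://my-images/x.jpeg"}))
```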

Analyzing a Flickr30K dataset

In this section, you analyze the Flickr30K dataset for label and landmark detection.

  1. In Cloud Shell, define a new job name:

    export JOB_NAME=vision-analytics-pipeline-2
    
  2. Change the Dataflow pipeline parameters to optimize the pipeline for a large dataset. The batchSize and keyRange values are increased to allow higher throughput. Dataflow scales the number of workers as needed:

    cat <<EOF > ${PARAMETERS}
    --parameters:
      autoscalingAlgorithm: THROUGHPUT_BASED
      enableStreamingEngine: "true"
      subscriberId: projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION}
      visionApiProjectId: ${PROJECT}
      features: LABEL_DETECTION,LANDMARK_DETECTION
      datasetName: ${BIGQUERY_DATASET}
      batchSize: "16"
      windowInterval: "5"
      keyRange: "2"
    EOF
    
  3. Run the pipeline:

    gcloud dataflow flex-template run ${JOB_NAME} \
    --project=${PROJECT} \
    --region=${REGION} \
    --template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_vision_analytics.json \
    --flags-file ${PARAMETERS}
    
  4. Upload the dataset into the input bucket:

    gsutil -m cp gs://df-vision-ai-test-data/* gs://${IMAGE_BUCKET}
    
  5. Retrieve the ID of the running Dataflow job:

    JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" --region ${REGION} --format "value(id)" --status active)
    
  6. Display the URL of the Dataflow job's web page:

    echo "https://console.cloud.google.com/dataflow/jobs/${REGION}/${JOB_ID}"
    
  7. Open the displayed URL in a new browser tab.

  8. In the Google Cloud console, check the custom counters in the Dataflow job to verify that all files are processed. All the files normally process in less than 30 minutes.

  9. Filter by custom counters under Process Annotations.

    The output is as follows:

    List of counters returned after you filter by custom counters. Shows the counter name, the value, and the step.

    The processedFiles metric (31,935) closely matches the total number of images that were uploaded to the bucket (31,936). However, the numberOfRequests metric (1,997) is much lower than the number of files that went through the pipeline. The difference arises because the pipeline batches up to 16 files per request, as shown in the values of the batchSizeDistribution_* metrics.

  10. Shut down the pipeline:

    JOB_ID=$(gcloud dataflow jobs list --filter "name:${JOB_NAME}" \
      --region ${REGION} \
      --format "value(id)" \
      --status active)
    gcloud dataflow jobs drain ${JOB_ID} \
      --region ${REGION}
    
  11. In the Google Cloud console, go to the BigQuery Query editor page.

  12. Find the most likely label for each file:

    SELECT
      SPLIT(gcs_uri,'/')[OFFSET(3)] file,
      description,
      score
    FROM (
      SELECT
        gcs_uri,
        description,
        score,
        ROW_NUMBER() OVER (PARTITION BY gcs_uri ORDER BY score DESC) AS row_num
      FROM
         `vision_analytics.label_annotation`)
    WHERE
      row_num = 1
    ORDER BY
      gcs_uri DESC
    

    The output is as follows. You can see from the response that Landmark is the most likely description for the st_basils.jpeg file.

    List of image file names, descriptions, and scores.

  13. Find the top 10 labels and their max scores:

    SELECT
      description,
      COUNT(*) AS found,
      MAX(score) AS max_score
    FROM
      `vision_analytics.label_annotation`
    GROUP BY
      description
    ORDER BY
      found DESC
    LIMIT 10
    

    The final output looks similar to the following:

    List of the top 10 labels found. It includes the description, the number of times found, and a max score.

  14. Find the top 10 popular landmarks:

    SELECT
      description,
      COUNT(*) AS count,
      MAX(score) AS max_score
    FROM
      `vision_analytics.landmark_annotation`
    WHERE
      LENGTH(description)>0
    GROUP BY
      description
    ORDER BY
      count DESC
    LIMIT 10
    

    The output is as follows. You can see from the response that Times Square appears to be the most popular destination.

    List of the top 10 most popular landmarks returned by the query. Includes the description, count, and max score.

  15. Find any image that has a waterfall:

    SELECT
      SPLIT(gcs_uri,'/')[OFFSET(3)] file,
      description,
      score
    FROM
      `vision_analytics.landmark_annotation`
    WHERE
      LOWER(description) LIKE '%fall%'
    ORDER BY score DESC
    

    The output is as follows. It only contains images of waterfalls.

    List of waterfalls. Includes the file name, description, and score.

  16. Find images of landmarks within 3 kilometers of the Colosseum in Rome (the ST_GEOGPOINT function uses the Colosseum's longitude and latitude):

    WITH
      landmarksWithDistances AS (
      SELECT
        gcs_uri,
        description,
        location,
        ST_DISTANCE(location, ST_GEOGPOINT(12.492231, 41.890222)) AS distance_in_meters
      FROM
        `vision_analytics.landmark_annotation` landmarks
      CROSS JOIN
        UNNEST(landmarks.locations) AS location )
    SELECT
      SPLIT(gcs_uri,"/")[OFFSET(3)] file,
      description,
      ROUND(distance_in_meters) distance_in_meters,
      location,
      CONCAT("https://storage.cloud.google.com/", SUBSTR(gcs_uri, 6)) AS image_url
    FROM
      landmarksWithDistances
    WHERE
      distance_in_meters < 3000
    ORDER BY
      distance_in_meters
    LIMIT
      100
    

    The output is as follows. You can see that several popular destinations appear in these images:

    List of all images within 3km of the Colosseum in Rome. Includes file name, description, distance in meters from the Colosseum, and the location.

    The same image can contain multiple locations for the same landmark. As described in the Vision API documentation, multiple LocationInfo elements can be present because one location can indicate the location of the scene in the image, and another location can indicate where the image was taken. Location information is usually present for landmarks.

    You can visualize the data in BigQuery Geo Viz by pasting in the previous query. When you select a point on the map, you see its details. The image_url attribute contains a link to the image file, which you can open in a browser.

    Map of locations and their distance from the Colosseum.
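
To build intuition for the Colosseum query, the following sketch reproduces its two computed columns outside BigQuery. It approximates the distance with the haversine formula on a sphere, so results can differ slightly from ST_DISTANCE, and it derives the browser URL the same way the query does with SUBSTR(gcs_uri, 6). The Earth radius constant, the helper names, and the sample coordinates are assumptions of this sketch.

```python
import math

COLOSSEUM = (12.492231, 41.890222)  # (longitude, latitude), as in the query
EARTH_RADIUS_M = 6_371_008.8  # mean Earth radius; an assumption of this sketch


def haversine_m(lng1: float, lat1: float, lng2: float, lat2: float) -> float:
    """Approximate great-circle distance in meters between two points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lng2 - lng1)
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def within_radius(lng: float, lat: float, radius_m: float = 3000) -> bool:
    """Mirror the query's WHERE distance_in_meters < 3000 filter."""
    return haversine_m(lng, lat, *COLOSSEUM) < radius_m


def image_url(gcs_uri: str) -> str:
    """Drop the gs:// prefix and prepend the browser-accessible host."""
    return "https://storage.cloud.google.com/" + gcs_uri[len("gs://"):]


# Approximate coordinates chosen for illustration.
print(within_radius(12.4853, 41.8925))    # a point near the Roman Forum
print(within_radius(-73.9855, 40.7580))   # a point near Times Square
print(image_url("gs://my-images/rome.jpeg"))
```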

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the Google Cloud project

The easiest way to eliminate billing is to delete the Google Cloud project you created for the tutorial.

  1. In the Google Cloud console, go to the Manage resources page.