This example demonstrates how to implement a custom inference call in Apache Beam by using the Google Cloud Vision API.
The preferred way to run inference in Apache Beam is by using the RunInference API. The RunInference API enables you to run models as part of your pipeline in a way that is optimized for machine learning inference. To reduce the number of steps in your pipeline, RunInference supports features like batching. For more information, see the RunInference API documentation.
This notebook creates a custom model handler that makes remote inference calls by using the Cloud Vision API. To make remote inference calls to Vertex AI, use the Vertex AI model handler JSON instead.
Run the Cloud Vision API
You can use the Cloud Vision API to retrieve labels that describe an image. For example, the following image shows a cat with possible labels.
To run the Google Cloud Vision API on a large set of images, Apache Beam is the ideal tool to handle the workflow. This example demonstrates how to retrieve image labels with this API on a small set of images.
The example follows these steps:
- Read the images.
- Send the images to an external API to run inference by using the RunInference PTransform.
- Postprocess the API results.
To optimize the calls to the external API, limit the number of parallel calls to the remote API by configuring pipeline options. In Apache Beam, each runner provides options to control parallelism. The following list includes two examples:
- With the Direct Runner, use the direct_num_workers pipeline option.
- With the Google Cloud Dataflow Runner, use the max_num_workers pipeline option.
For information about other runners, see the Beam capability matrix.
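For example, the following is a minimal sketch that caps Direct Runner parallelism; the specific option values are illustrative assumptions, not requirements:

from apache_beam.options.pipeline_options import PipelineOptions

# Cap the Direct Runner at two subprocess workers so that at most two
# remote API calls run at the same time. (Values chosen for illustration.)
options = PipelineOptions(direct_num_workers=2, direct_running_mode="multi_processing")

# Pass the options when constructing the pipeline:
# with beam.Pipeline(options=options) as pipeline: ...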
Before you begin
Download and install the dependencies.
!pip install --upgrade pip
!pip install protobuf==3.19.4
!pip install 'apache-beam[interactive,gcp]>=2.40.0'
!pip install google-cloud-vision==3.1.1
!pip install requests
# To use the newly installed version, restart the runtime.
exit()
To use the Cloud Vision API, authenticate with Google Cloud.
# Follow the steps to configure your Google Cloud setup.
gcloud init
gcloud auth application-default login
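If you run this notebook in Colab, you can instead authenticate the runtime from within the notebook by using the standard Colab helper:

# Authenticate the Colab runtime with your Google account.
from google.colab import auth
auth.authenticate_user()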
Run remote inference on Cloud Vision API
This section shows how to run remote inference on the Cloud Vision API.
Import Apache Beam and the required modules.
from typing import List
import io
import os
import requests
from google.cloud import vision
from google.cloud.vision_v1.types import Feature
import apache_beam as beam
from apache_beam.ml.inference.base import ModelHandler
from apache_beam.ml.inference.base import RunInference
This example uses images from the MSCoco dataset as a list of image URLs. This data is used as the pipeline input.
image_urls = [
    "http://farm3.staticflickr.com/2824/10213933686_6936eb402b_z.jpg",
    "http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg",
    "http://farm8.staticflickr.com/7003/6528937031_10e1ce0960_z.jpg",
    "http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg",
    "http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg",
    "http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg",
    "http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg",
]
def read_image(image_url):
  """Read image from url and return image_url, image bytes"""
  response = requests.get(image_url)
  image_bytes = io.BytesIO(response.content).read()
  return image_url, image_bytes
Create a custom model handler
To implement remote inference, create a custom model handler. Use the run_inference method to implement the model call and return its results.
When you run remote inference, prepare to encounter, identify, and handle failures as gracefully as possible. We recommend the following techniques:
- Exponential backoff: Retry failed remote calls with exponentially growing pauses between retries. Using exponential backoff ensures that failures don't lead to an overwhelming number of retries in quick succession. (See the sketch after this list.)
- Dead-letter queues: Route failed inferences to a separate PCollection without failing the whole transform. You can continue execution instead of failing the job (the default behavior for batch jobs) or retrying indefinitely (the default behavior for streaming jobs). You can then run custom pipeline logic on the dead-letter (unprocessed messages) queue to log the failure, send an alert, and push the failed message to temporary storage so that it can eventually be reprocessed.
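As an illustration, here is a minimal sketch of exponential backoff around a remote call. The call_with_backoff helper and its parameter values are hypothetical, not part of the Cloud Vision client:

import random
import time

def call_with_backoff(remote_call, max_retries=5, base_delay=1.0):
  """Retry a zero-argument remote call with exponentially growing pauses."""
  for attempt in range(max_retries):
    try:
      return remote_call()
    except Exception:
      if attempt == max_retries - 1:
        raise  # Surface the error after the final retry.
      # Sleep base_delay * 2^attempt seconds, plus jitter so that parallel
      # workers don't retry in lockstep.
      time.sleep(base_delay * (2 ** attempt) + random.uniform(0, 1))

For the dead-letter pattern, recent Beam versions let you route failing elements of a Map or ParDo to a second PCollection, for example (a sketch; the result shape can vary by Beam version):

# `bad` collects the elements that raised an exception, with exception info.
good, bad = pipeline | beam.Create(image_urls) | beam.Map(read_image).with_exception_handling()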
class CloudVisionModelHandler(ModelHandler):
  """DoFn that accepts a batch of images as bytearray
  and sends that batch to the Cloud Vision API for remote inference"""

  def load_model(self):
    """Initiate the Google Vision API client."""
    client = vision.ImageAnnotatorClient()
    return client

  def run_inference(self, batch, model, inference_args=None):
    feature = Feature()
    feature.type_ = Feature.Type.LABEL_DETECTION

    # The list of image_urls in this batch.
    image_urls = [image_url for (image_url, image_bytes) in batch]

    # Create a batch request for all images in the batch.
    images = [vision.Image(content=image_bytes) for (image_url, image_bytes) in batch]
    image_requests = [vision.AnnotateImageRequest(image=image, features=[feature]) for image in images]
    batch_image_request = vision.BatchAnnotateImagesRequest(requests=image_requests)

    # Send the batch request to the remote endpoint.
    responses = model.batch_annotate_images(request=batch_image_request).responses

    return list(zip(image_urls, responses))
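As a quick sanity check outside a pipeline (an illustrative snippet that assumes the Google Cloud authentication above succeeded), you can exercise the handler directly:

# Load the Vision API client and run inference on one downloaded image.
handler = CloudVisionModelHandler()
model = handler.load_model()
print(handler.run_inference([read_image(image_urls[0])], model))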
Manage batching
When you run inference with your model, either in Apache Beam or in an external API, batch your input to increase the efficiency of the model execution.
The RunInference PTransform automatically manages batching by using the BatchElements transform, which dynamically groups elements into batches based on the throughput of the pipeline.
If you are designing your own API endpoint, make sure that it can handle batches.
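If you need to bound batch sizes yourself, for example to stay within a per-request limit of a remote API, a model handler can override the batch_elements_kwargs hook, whose return value is passed to the underlying BatchElements transform. The limit of 16 below is an illustrative assumption, not a documented requirement:

class BoundedBatchModelHandler(CloudVisionModelHandler):
  def batch_elements_kwargs(self):
    # Keep batches between 1 and 16 elements.
    return {"min_batch_size": 1, "max_batch_size": 16}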
Create the pipeline
This section demonstrates how to chain the pipeline steps together to complete the following tasks:
- Read data.
- Transform the data to fit the model input.
- Run inference with a custom Cloud Vision model handler.
- Process and display the results.
with beam.Pipeline() as pipeline:
  _ = (pipeline | "Create inputs" >> beam.Create(image_urls)
                | "Read images" >> beam.Map(read_image)
                | "Inference" >> RunInference(model_handler=CloudVisionModelHandler())
                | "Print image_url and annotation" >> beam.Map(print)
       )
('http://farm3.staticflickr.com/2824/10213933686_6936eb402b_z.jpg', label_annotations { mid: "/m/083wq" description: "Wheel" score: 0.977976143 topicality: 0.977976143 } label_annotations { mid: "/m/0h9mv" description: "Tire" score: 0.977934957 topicality: 0.977934957 } label_annotations { mid: "/m/043g5f" description: "Fuel tank" score: 0.958490431 topicality: 0.958490431 } label_annotations { mid: "/m/05s2s" description: "Plant" score: 0.95674181 topicality: 0.95674181 } label_annotations { mid: "/m/0h8lk_j" description: "Automotive fuel system" score: 0.941456497 topicality: 0.941456497 } label_annotations { mid: "/m/07yv9" description: "Vehicle" score: 0.936428607 topicality: 0.936428607 } label_annotations { mid: "/m/02qwkrn" description: "Vehicle brake" score: 0.905624092 topicality: 0.905624092 } label_annotations { mid: "/m/0h8pb3l" description: "Automotive tire" score: 0.897686064 topicality: 0.897686064 } label_annotations { mid: "/m/0768fx" description: "Automotive lighting" score: 0.897505879 topicality: 0.897505879 } label_annotations { mid: "/m/0h8p7_l" description: "Automotive exhaust" score: 0.877965152 topicality: 0.877965152 } )
('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations { mid: "/m/054_l" description: "Mirror" score: 0.969698846 topicality: 0.969698846 } label_annotations { mid: "/m/02jz0l" description: "Tap" score: 0.962297797 topicality: 0.962297797 } label_annotations { mid: "/m/0h8lr5r" description: "Bathroom sink" score: 0.933002412 topicality: 0.933002412 } label_annotations { mid: "/m/0130jx" description: "Sink" score: 0.930314779 topicality: 0.930314779 } label_annotations { mid: "/m/02pkr5" description: "Plumbing fixture" score: 0.920037031 topicality: 0.920037031 } label_annotations { mid: "/m/02dgv" description: "Door" score: 0.890176594 topicality: 0.890176594 } label_annotations { mid: "/m/09ggk" description: "Purple" score: 0.878831089 topicality: 0.878831089 } label_annotations { mid: "/m/01j2bj" description: "Bathroom" score: 0.866840482 topicality: 0.866840482 } label_annotations { mid: "/m/04wnmd" description: "Fixture" score: 0.862223864 topicality: 0.862223864 } label_annotations { mid: "/m/09qqq" description: "Wall" score: 0.809348285 topicality: 0.809348285 } )
('http://farm8.staticflickr.com/7003/6528937031_10e1ce0960_z.jpg', error { code: 3 message: "Bad image data." } )
('http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg', error { code: 3 message: "Bad image data." } )
('http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg', error { code: 3 message: "Bad image data." } )
('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations { mid: "/m/054_l" description: "Mirror" score: 0.969698846 topicality: 0.969698846 } label_annotations { mid: "/m/02jz0l" description: "Tap" score: 0.962297797 topicality: 0.962297797 } label_annotations { mid: "/m/0h8lr5r" description: "Bathroom sink" score: 0.933002412 topicality: 0.933002412 } label_annotations { mid: "/m/0130jx" description: "Sink" score: 0.930314779 topicality: 0.930314779 } label_annotations { mid: "/m/02pkr5" description: "Plumbing fixture" score: 0.920037031 topicality: 0.920037031 } label_annotations { mid: "/m/02dgv" description: "Door" score: 0.890176594 topicality: 0.890176594 } label_annotations { mid: "/m/09ggk" description: "Purple" score: 0.878831089 topicality: 0.878831089 } label_annotations { mid: "/m/01j2bj" description: "Bathroom" score: 0.866840482 topicality: 0.866840482 } label_annotations { mid: "/m/04wnmd" description: "Fixture" score: 0.862223864 topicality: 0.862223864 } label_annotations { mid: "/m/09qqq" description: "Wall" score: 0.809348285 topicality: 0.809348285 } )
('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations { mid: "/m/054_l" description: "Mirror" score: 0.969698846 topicality: 0.969698846 } label_annotations { mid: "/m/02jz0l" description: "Tap" score: 0.962297797 topicality: 0.962297797 } label_annotations { mid: "/m/0h8lr5r" description: "Bathroom sink" score: 0.933002412 topicality: 0.933002412 } label_annotations { mid: "/m/0130jx" description: "Sink" score: 0.930314779 topicality: 0.930314779 } label_annotations { mid: "/m/02pkr5" description: "Plumbing fixture" score: 0.920037031 topicality: 0.920037031 } label_annotations { mid: "/m/02dgv" description: "Door" score: 0.890176594 topicality: 0.890176594 } label_annotations { mid: "/m/09ggk" description: "Purple" score: 0.878831089 topicality: 0.878831089 } label_annotations { mid: "/m/01j2bj" description: "Bathroom" score: 0.866840482 topicality: 0.866840482 } label_annotations { mid: "/m/04wnmd" description: "Fixture" score: 0.862223864 topicality: 0.862223864 } label_annotations { mid: "/m/09qqq" description: "Wall" score: 0.809348285 topicality: 0.809348285 } )
Monitor the pipeline
Monitoring provides insight into the status and health of the application, so consider monitoring and measuring your pipeline's performance. For information about the available tracking metrics, see RunInference Metrics.
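For example, after the with beam.Pipeline() block above exits, the run result is available as pipeline.result, and you can query the counters that RunInference reports. The metric name num_inferences below is one such counter; the exact set of metrics can vary by Beam version:

from apache_beam.metrics.metric import MetricsFilter

# Query how many inferences the RunInference transform performed.
query_result = pipeline.result.metrics().query(MetricsFilter().with_name("num_inferences"))
for counter in query_result["counters"]:
  print(counter)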