This example demonstrates how to implement a custom inference call in Apache Beam by using the Google Cloud Vision API.
The preferred way to run inference in Apache Beam is to use the RunInference API. The RunInference API enables you to run models as part of your pipeline in a way that is optimized for machine learning inference, and it supports features like batching to reduce the number of steps in your pipeline. For more information, see the RunInference API documentation.
This notebook creates a custom model handler to make remote inference calls by using the Cloud Vision API. To make remote inference calls to Vertex AI, use the Vertex AI model handler (`VertexAIModelHandlerJSON`) instead.
Run the Cloud Vision API
You can use the Cloud Vision API to retrieve labels that describe an image. For example, for an image of a cat, the API returns a set of possible labels with confidence scores.
Apache Beam is well suited to running the Google Cloud Vision API over a large set of images. This example demonstrates how to retrieve image labels with the API on a small set of images.
The example follows these steps:
- Read the images.
- Send the images to an external API to run inference by using the `RunInference` PTransform.
- Postprocess the results of the API call.
To avoid overwhelming the external API, limit the number of parallel calls to it by configuring pipeline options. Each Apache Beam runner provides its own options for controlling parallelism. The following list includes two examples:

- With the Direct Runner, use the `direct_num_workers` pipeline option.
- With the Google Cloud Dataflow Runner, use the `max_num_workers` pipeline option.
For information about other runners, see the Beam capability matrix.
Before you begin
Download and install the dependencies.
!pip install --upgrade pip
!pip install protobuf==3.19.4
!pip install 'apache-beam[interactive,gcp]>=2.65.0'
!pip install google-cloud-vision==3.1.1
!pip install requests
# To use the newly installed version, restart the runtime.
exit()
To use the Cloud Vision API, authenticate with Google Cloud.
# Follow the steps to configure your Google Cloud setup.
gcloud init
gcloud auth application-default login
Run remote inference on Cloud Vision API
This section shows how to run remote inference on the Cloud Vision API.
Import Apache Beam and the required modules.
import io
import requests
from google.cloud import vision
from google.cloud.vision_v1.types import Feature
import apache_beam as beam
from apache_beam.ml.inference.base import RemoteModelHandler
from apache_beam.ml.inference.base import RunInference
This example uses images from the MSCoco dataset as a list of image URLs. This data is used as the pipeline input.
image_urls = [
"http://farm3.staticflickr.com/2824/10213933686_6936eb402b_z.jpg",
"http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg",
"http://farm8.staticflickr.com/7003/6528937031_10e1ce0960_z.jpg",
"http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg",
"http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg",
"http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg",
"http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg",
]
def read_image(image_url):
    """Read an image from a URL and return (image_url, image_bytes)."""
    response = requests.get(image_url)
    image_bytes = io.BytesIO(response.content).read()
    return image_url, image_bytes
Create a custom model handler
To implement remote inference, create a custom model handler. Because this example extends the `RemoteModelHandler` base class, implement the `create_client` and `request` methods: `create_client` initializes the connection to the remote service, and `request` makes the model call and returns its results.
When you run remote inference, prepare to encounter, identify, and handle failures as gracefully as possible. We recommend the following techniques:

- Exponential backoff: Retry failed remote calls with exponentially growing pauses between retries. Using exponential backoff ensures that failures don't lead to an overwhelming number of retries in quick succession. The `RemoteModelHandler` base class handles this logic, with the `retry_filter` argument determining which errors are retryable. For this example, we always retry.
- Dead-letter queues: Route failed inferences to a separate `PCollection` without failing the whole transform, so the pipeline continues instead of failing the job (the default behavior for batch jobs) or retrying indefinitely (the default behavior for streaming jobs). Use the `with_exception_handling()` option for RunInference, which produces tagged outputs for the failed inferences so that they can be handled separately from successful ones. You can then run custom pipeline logic on the dead-letter (unprocessed messages) queue to log the failure, send an alert, and push the failed message to temporary storage so that it can eventually be reprocessed.
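In practice, a retry filter is usually more selective than the always-retry filter used below. As a sketch, it might inspect the exception type; the exception classes named here are illustrative, so adapt them to the errors raised by your client library:

```python
def retry_on_transient(exception: Exception) -> bool:
    """Retry only errors that are likely to succeed on a later attempt.

    The tuple of exception types is illustrative; swap in the transient
    error classes that your client library actually raises.
    """
    transient = (TimeoutError, ConnectionError)
    return isinstance(exception, transient)
```

A filter like this would be passed as the `retry_filter` argument when initializing the model handler.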
def _always_retry(exception: Exception) -> bool:
return True
class CloudVisionModelHandler(RemoteModelHandler):
    def __init__(self):
        """Model handler that accepts a batch of (image_url, image_bytes)
        tuples and sends that batch to the Cloud Vision API for remote
        inference.
        """
        super().__init__(namespace="CloudVisionModelHandler", retry_filter=_always_retry)

    def create_client(self):
        """Initiate the Google Vision API client."""
        client = vision.ImageAnnotatorClient()
        return client

    def request(self, batch, model, inference_args):
        feature = Feature()
        feature.type_ = Feature.Type.LABEL_DETECTION

        # The list of image URLs.
        image_urls = [image_url for (image_url, image_bytes) in batch]

        # Create a batch request for all images in the batch.
        images = [vision.Image(content=image_bytes) for (image_url, image_bytes) in batch]
        image_requests = [vision.AnnotateImageRequest(image=image, features=[feature]) for image in images]
        batch_image_request = vision.BatchAnnotateImagesRequest(requests=image_requests)

        # Send the batch request to the remote endpoint.
        responses = model.batch_annotate_images(request=batch_image_request).responses

        return list(zip(image_urls, responses))
Manage batching
When you run inference with your model, either in Apache Beam or in an external API, batch your input to increase the efficiency of the model execution.
The RunInference `PTransform` automatically manages batching by using the `BatchElements` transform to dynamically group elements into batches based on the throughput of the pipeline.
If you are designing your own API endpoint, make sure that it can handle batches.
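The grouping that `BatchElements` performs can be sketched as a simple fixed-size batcher. This is a simplification for illustration only: the real transform also adapts batch size dynamically to pipeline throughput.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def fixed_size_batches(elements: Iterable[T], max_batch_size: int) -> Iterator[List[T]]:
    """Group elements into batches of at most max_batch_size items."""
    batch: List[T] = []
    for element in elements:
        batch.append(element)
        if len(batch) == max_batch_size:
            yield batch
            batch = []
    if batch:
        # Emit the final, possibly smaller batch.
        yield batch
```

Each batch produced this way corresponds to one call to the model handler's `request` method, which is why a single remote call can annotate several images at once.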
Create the pipeline
This section demonstrates how to chain the pipeline steps together to complete the following tasks:
- Read data.
- Transform the data to fit the model input.
- Run inference with a custom Cloud Vision model handler.
- Process and display the results.
with beam.Pipeline() as pipeline:
    main, failed = (pipeline
        | "Create inputs" >> beam.Create(image_urls)
        | "Read images" >> beam.Map(read_image)
        | "Inference" >> RunInference(model_handler=CloudVisionModelHandler()).with_exception_handling()
    )
    _ = main | "Print image_url and annotation" >> beam.Map(print)
    _ = failed.failed_inferences | "Print failed inferences" >> beam.Map(print)
('http://farm3.staticflickr.com/2824/10213933686_6936eb402b_z.jpg', label_annotations { mid: "/m/04_sv" description: "Motorcycle" score: 0.9922548 topicality: 0.14033242 } label_annotations { mid: "/m/01prls" description: "Land vehicle" score: 0.99086833 topicality: 0.0029524593 } label_annotations { mid: "/m/0768fx" description: "Automotive lighting" score: 0.9853215 topicality: 0.002913047 } label_annotations { mid: "/m/07yv9" description: "Vehicle" score: 0.98517245 topicality: 0.010408105 } label_annotations { mid: "/m/043g5f" description: "Fuel tank" score: 0.9823826 topicality: 0.01933147 } label_annotations { mid: "/m/012f08" description: "Motor vehicle" score: 0.97732854 topicality: 0.0009314301 } label_annotations { mid: "/m/0h9mv" description: "Tire" score: 0.9735299 topicality: 0.0020883244 } label_annotations { mid: "/m/083wq" description: "Wheel" score: 0.9715105 topicality: 0.0028435893 } label_annotations { mid: "/m/0h8pb3l" description: "Automotive Tire" score: 0.96993804 topicality: 5.827098e-05 } label_annotations { mid: "/m/0h8ls87" description: "Automotive Exterior" score: 0.9641536 topicality: 0.00045098987 } )
('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations { mid: "/m/02w3_ws" description: "Personal care" score: 0.853392 topicality: 0.00013828959 } label_annotations { mid: "/m/02pkr5" description: "Plumbing fixture" score: 0.8383083 topicality: 0.012253191 } label_annotations { mid: "/m/0b_zf" description: "Plumbing" score: 0.726803 topicality: 0.016276756 } label_annotations { mid: "/m/01j2bj" description: "Bathroom" score: 0.72486097 topicality: 0.35419264 } label_annotations { mid: "/m/02jz0l" description: "Tap" score: 0.6317307 topicality: 0.00705197 } label_annotations { mid: "/m/0130jx" description: "Sink" score: 0.5732167 topicality: 0.07520393 } label_annotations { mid: "/m/054_l" description: "Mirror" score: 0.5680867 topicality: 0.08497098 } label_annotations { mid: "/m/0h8lr5r" description: "Bathroom Sink" score: 0.557554 topicality: 0.007725588 } label_annotations { mid: "/m/03jvk" description: "Household hardware" score: 0.5140049 topicality: 0.00064662547 } )
('http://farm8.staticflickr.com/7003/6528937031_10e1ce0960_z.jpg', error { code: 3 message: "Bad image data." } )
('http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg', error { code: 3 message: "Bad image data." } )
('http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg', error { code: 3 message: "Bad image data." } )
('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations { mid: "/m/02w3_ws" description: "Personal care" score: 0.853392 topicality: 0.00013828959 } label_annotations { mid: "/m/02pkr5" description: "Plumbing fixture" score: 0.8383083 topicality: 0.012253191 } label_annotations { mid: "/m/0b_zf" description: "Plumbing" score: 0.726803 topicality: 0.016276756 } label_annotations { mid: "/m/01j2bj" description: "Bathroom" score: 0.72486097 topicality: 0.35419264 } label_annotations { mid: "/m/02jz0l" description: "Tap" score: 0.6317307 topicality: 0.00705197 } label_annotations { mid: "/m/0130jx" description: "Sink" score: 0.5732167 topicality: 0.07520393 } label_annotations { mid: "/m/054_l" description: "Mirror" score: 0.5680867 topicality: 0.08497098 } label_annotations { mid: "/m/0h8lr5r" description: "Bathroom Sink" score: 0.557554 topicality: 0.007725588 } label_annotations { mid: "/m/03jvk" description: "Household hardware" score: 0.5140049 topicality: 0.00064662547 } )
('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations { mid: "/m/02w3_ws" description: "Personal care" score: 0.853392 topicality: 0.00013828959 } label_annotations { mid: "/m/02pkr5" description: "Plumbing fixture" score: 0.8383083 topicality: 0.012253191 } label_annotations { mid: "/m/0b_zf" description: "Plumbing" score: 0.726803 topicality: 0.016276756 } label_annotations { mid: "/m/01j2bj" description: "Bathroom" score: 0.72486097 topicality: 0.35419264 } label_annotations { mid: "/m/02jz0l" description: "Tap" score: 0.6317307 topicality: 0.00705197 } label_annotations { mid: "/m/0130jx" description: "Sink" score: 0.5732167 topicality: 0.07520393 } label_annotations { mid: "/m/054_l" description: "Mirror" score: 0.5680867 topicality: 0.08497098 } label_annotations { mid: "/m/0h8lr5r" description: "Bathroom Sink" score: 0.557554 topicality: 0.007725588 } label_annotations { mid: "/m/03jvk" description: "Household hardware" score: 0.5140049 topicality: 0.00064662547 } )
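Instead of printing raw responses like those above, you might postprocess each result down to its highest-scoring label descriptions. The following is a sketch; it assumes each response exposes `label_annotations` entries with `description` and `score` fields, as in the output shown:

```python
def top_labels(image_url, response, k=3):
    """Return (image_url, [description, ...]) for the k highest-scoring labels."""
    annotations = sorted(
        response.label_annotations, key=lambda a: a.score, reverse=True)
    return image_url, [a.description for a in annotations[:k]]
```

In the pipeline, a step like this could follow RunInference, for example with something like `beam.MapTuple(top_labels)` in place of the print step.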
Monitor the pipeline
Because monitoring can provide insight into the status and health of the application, consider monitoring and measuring pipeline performance. For information about the available tracking metrics, see RunInference Metrics.