Remote inference in Apache Beam

Run in Google Colab View source on GitHub

This example demonstrates how to implement a custom inference call in Apache Beam by using the Google Cloud Vision API.

The prefered way to run inference in Apache Beam is by using the RunInference API. The RunInference API enables you to run models as part of your pipeline in a way that is optimized for machine learning inference. To reduce the number of steps in your pipeline, RunInference supports features like batching. For more infomation about the RunInference API, review the RunInference API.

This notebook creates a custom model handler to make remote inference calls by using the Cloud Vision API. To make remote inference calls to Vertex AI, use the Vertex AI model handler JSON.

Run the Cloud Vision API

You can use the Cloud Vision API to retrieve labels that describe an image. For example, the following image shows a cat with possible labels.

cat-with-labels.png

To run the Google Cloud Vision API on a large set of images, Apache Beam is the ideal tool to handle the workflow. This example demonstates how to retrieve image labels with this API on a small set of images.

The example follows these steps:

  • Read the images.
  • Send the images to an external API to run inference by using the RunInference PTransform.
  • Postprocess the results of your API.

To optimize the calls to the external API, limit the parallel calls to the external remote API by configuring pipeline options. In Apache Beam, each runner provides options to handle the parallelism. The following list includes two examples:

For information about other runners, see the Beam capability matrix

Before you begin

Download and install the dependencies.

!pip install --upgrade pip
!pip install protobuf==3.19.4
!pip install apache-beam[interactive,gcp]>=2.65.0
!pip install google-cloud-vision==3.1.1
!pip install requests

# To use the newly installed version, restart the runtime.
exit()

To use the Cloud Vision API, authenticate with Google Cloud.

# Follow the steps to configure your Google Cloup setup.
gcloud init
gcloud auth application-default login

Run remote inference on Cloud Vision API

This section shows how to run remote inference on the Cloud Vision API.

Download and install Apache Beam and the required modules.

import io
import requests

from google.cloud import vision
from google.cloud.vision_v1.types import Feature
import apache_beam as beam
from apache_beam.ml.inference.base import RemoteModelHandler
from apache_beam.ml.inference.base import RunInference

This example uses images from the MSCoco dataset as a list of image URLs. This data is used as the pipeline input.

image_urls = [
    "http://farm3.staticflickr.com/2824/10213933686_6936eb402b_z.jpg",
    "http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg",
    "http://farm8.staticflickr.com/7003/6528937031_10e1ce0960_z.jpg",
    "http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg",
    "http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg",
    "http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg",
    "http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg",
]

def read_image(image_url):
  """Read image from url and return image_url, image bytes"""
  response = requests.get(image_url)
  image_bytes = io.BytesIO(response.content).read()
  return image_url, image_bytes

Create a custom model handler

In order to implement remote inference, create a custom model handler. Use the run_inference method to implement the model call and to return its results.

When you run remote inference, prepare to encounter, identify, and handle failure as gracefully as possible. We recommend using the following techniques:

  • Exponential backoff: Retry failed remote calls with exponentially growing pauses between retries. Using exponential backoff ensures that failures don't lead to an overwhelming number of retries in quick succession. The RemoteModelHandler base class handles this logic, with the retry_fn argument determining which errors are retryable. For this example we will always retry.

  • Dead-letter queues: Route failed inferences to a separate PCollection without failing the whole transform. Continue execution without failing the job (batch jobs' default behavior) or retrying indefinitely (streaming jobs' default behavior). This is provided through the with_exception_handling() option for RunInference. This produces tagged outputs for the failed inferences which can be handled separately from successful ones. You can then run custom pipeline logic on the dead-letter (unprocessed messages) queue to log the failure, send an alert, and push the failed message to temporary storage so that it can eventually be reprocessed.

def _always_retry(exception: Exception) -> bool:
  return True

class CloudVisionModelHandler(RemoteModelHandler):
  def __init__(self):
    """DoFn that accepts a batch of images as bytearray
    and sends that batch to the Cloud Vision API for remote inference
    """
    super().__init__(namespace="CloudVisionModelHandler", retry_filter=_always_retry)
  def create_client(self):
    """Initiate the Google Vision API client."""
    client = vision.ImageAnnotatorClient()
    return client

  def request(self, batch, model, inference_args):
    feature = Feature()
    feature.type_ = Feature.Type.LABEL_DETECTION

    # The list of image_urls
    image_urls = [image_url for (image_url, image_bytes) in batch]

    # Create a batch request for all images in the batch.
    images = [vision.Image(content=image_bytes) for (image_url, image_bytes) in batch]
    image_requests = [vision.AnnotateImageRequest(image=image, features=[feature]) for image in images]
    batch_image_request = vision.BatchAnnotateImagesRequest(requests=image_requests)

    # Send the batch request to the remote endpoint.
    responses = model.batch_annotate_images(request=batch_image_request).responses

    return list(zip(image_urls, responses))

Manage batching

When you run inference with your model, either in Apache Beam or in an external API, batch your input to increase the efficiency of the model execution. The RunInference PTransform automatically manages batching by using the BatchElements transform to dynamically group elements together into batches based on the throughput of the pipeline.

If you are designing your own API endpoint, make sure that it can handle batches.

Create the pipeline

This section demonstrates how to chain the pipeline steps together to complete the following tasks:

  • Read data.

  • Transform the data to fit the model input.

  • Run inference with a custom Cloud Vision model handler.

  • Process and display the results.

with beam.Pipeline() as pipeline:
  main, failed = (pipeline | "Create inputs" >> beam.Create(image_urls)
                | "Read images" >> beam.Map(read_image)
                | "Inference" >> RunInference(model_handler=CloudVisionModelHandler()).with_exception_handling()
  )
  _ = main | "Print image_url and annotation" >> beam.Map(print)
  _ = failed.failed_inferences | "Print failed inferences" >> beam.Map(print)
('http://farm3.staticflickr.com/2824/10213933686_6936eb402b_z.jpg', label_annotations {
  mid: "/m/04_sv"
  description: "Motorcycle"
  score: 0.9922548
  topicality: 0.14033242
}
label_annotations {
  mid: "/m/01prls"
  description: "Land vehicle"
  score: 0.99086833
  topicality: 0.0029524593
}
label_annotations {
  mid: "/m/0768fx"
  description: "Automotive lighting"
  score: 0.9853215
  topicality: 0.002913047
}
label_annotations {
  mid: "/m/07yv9"
  description: "Vehicle"
  score: 0.98517245
  topicality: 0.010408105
}
label_annotations {
  mid: "/m/043g5f"
  description: "Fuel tank"
  score: 0.9823826
  topicality: 0.01933147
}
label_annotations {
  mid: "/m/012f08"
  description: "Motor vehicle"
  score: 0.97732854
  topicality: 0.0009314301
}
label_annotations {
  mid: "/m/0h9mv"
  description: "Tire"
  score: 0.9735299
  topicality: 0.0020883244
}
label_annotations {
  mid: "/m/083wq"
  description: "Wheel"
  score: 0.9715105
  topicality: 0.0028435893
}
label_annotations {
  mid: "/m/0h8pb3l"
  description: "Automotive Tire"
  score: 0.96993804
  topicality: 5.827098e-05
}
label_annotations {
  mid: "/m/0h8ls87"
  description: "Automotive Exterior"
  score: 0.9641536
  topicality: 0.00045098987
}
)
('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations {
  mid: "/m/02w3_ws"
  description: "Personal care"
  score: 0.853392
  topicality: 0.00013828959
}
label_annotations {
  mid: "/m/02pkr5"
  description: "Plumbing fixture"
  score: 0.8383083
  topicality: 0.012253191
}
label_annotations {
  mid: "/m/0b_zf"
  description: "Plumbing"
  score: 0.726803
  topicality: 0.016276756
}
label_annotations {
  mid: "/m/01j2bj"
  description: "Bathroom"
  score: 0.72486097
  topicality: 0.35419264
}
label_annotations {
  mid: "/m/02jz0l"
  description: "Tap"
  score: 0.6317307
  topicality: 0.00705197
}
label_annotations {
  mid: "/m/0130jx"
  description: "Sink"
  score: 0.5732167
  topicality: 0.07520393
}
label_annotations {
  mid: "/m/054_l"
  description: "Mirror"
  score: 0.5680867
  topicality: 0.08497098
}
label_annotations {
  mid: "/m/0h8lr5r"
  description: "Bathroom Sink"
  score: 0.557554
  topicality: 0.007725588
}
label_annotations {
  mid: "/m/03jvk"
  description: "Household hardware"
  score: 0.5140049
  topicality: 0.00064662547
}
)
('http://farm8.staticflickr.com/7003/6528937031_10e1ce0960_z.jpg', error {
  code: 3
  message: "Bad image data."
}
)
('http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg', error {
  code: 3
  message: "Bad image data."
}
)
('http://farm6.staticflickr.com/5207/5304302785_7b5f763190_z.jpg', error {
  code: 3
  message: "Bad image data."
}
)
('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations {
  mid: "/m/02w3_ws"
  description: "Personal care"
  score: 0.853392
  topicality: 0.00013828959
}
label_annotations {
  mid: "/m/02pkr5"
  description: "Plumbing fixture"
  score: 0.8383083
  topicality: 0.012253191
}
label_annotations {
  mid: "/m/0b_zf"
  description: "Plumbing"
  score: 0.726803
  topicality: 0.016276756
}
label_annotations {
  mid: "/m/01j2bj"
  description: "Bathroom"
  score: 0.72486097
  topicality: 0.35419264
}
label_annotations {
  mid: "/m/02jz0l"
  description: "Tap"
  score: 0.6317307
  topicality: 0.00705197
}
label_annotations {
  mid: "/m/0130jx"
  description: "Sink"
  score: 0.5732167
  topicality: 0.07520393
}
label_annotations {
  mid: "/m/054_l"
  description: "Mirror"
  score: 0.5680867
  topicality: 0.08497098
}
label_annotations {
  mid: "/m/0h8lr5r"
  description: "Bathroom Sink"
  score: 0.557554
  topicality: 0.007725588
}
label_annotations {
  mid: "/m/03jvk"
  description: "Household hardware"
  score: 0.5140049
  topicality: 0.00064662547
}
)
('http://farm8.staticflickr.com/7026/6388965173_92664a0d78_z.jpg', label_annotations {
  mid: "/m/02w3_ws"
  description: "Personal care"
  score: 0.853392
  topicality: 0.00013828959
}
label_annotations {
  mid: "/m/02pkr5"
  description: "Plumbing fixture"
  score: 0.8383083
  topicality: 0.012253191
}
label_annotations {
  mid: "/m/0b_zf"
  description: "Plumbing"
  score: 0.726803
  topicality: 0.016276756
}
label_annotations {
  mid: "/m/01j2bj"
  description: "Bathroom"
  score: 0.72486097
  topicality: 0.35419264
}
label_annotations {
  mid: "/m/02jz0l"
  description: "Tap"
  score: 0.6317307
  topicality: 0.00705197
}
label_annotations {
  mid: "/m/0130jx"
  description: "Sink"
  score: 0.5732167
  topicality: 0.07520393
}
label_annotations {
  mid: "/m/054_l"
  description: "Mirror"
  score: 0.5680867
  topicality: 0.08497098
}
label_annotations {
  mid: "/m/0h8lr5r"
  description: "Bathroom Sink"
  score: 0.557554
  topicality: 0.007725588
}
label_annotations {
  mid: "/m/03jvk"
  description: "Household hardware"
  score: 0.5140049
  topicality: 0.00064662547
}
)

Monitor the pipeline

Because monitoring can provide insight into the status and health of the application, consider monitoring and measuring pipeline performance. For information about the available tracking metrics, see RunInference Metrics.