Pub/Sub handler to process Cloud Storage events
Stay organized with collections
Save and categorize content based on your preferences.
This tutorial demonstrates using Cloud Run, Cloud Vision API, and ImageMagick to detect and blur offensive images uploaded to a Cloud Storage bucket.
Explore further
For detailed documentation that includes this code sample, see the following:
Code sample
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.
[[["Easy to understand","easyToUnderstand","thumb-up"],["Solved my problem","solvedMyProblem","thumb-up"],["Other","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Other","otherDown","thumb-down"]],[],[],[],null,["# Pub/Sub handler to process Cloud Storage events\n\nThis tutorial demonstrates using Cloud Run, Cloud Vision API, and ImageMagick to detect and blur offensive images uploaded to a Cloud Storage bucket.\n\nExplore further\n---------------\n\n\nFor detailed documentation that includes this code sample, see the following:\n\n- [Process images from Cloud Storage tutorial](/run/docs/tutorials/image-processing)\n- [Processing images asynchronously](/anthos/run/archive/docs/tutorials/image-processing)\n\nCode sample\n-----------\n\n### Go\n\n\nTo authenticate to Cloud Run, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n\n // Sample image-processing is a Cloud Run service which performs asynchronous processing on images.\n package main\n\n import (\n \t\"encoding/json\"\n \t\"io\"\n \t\"log\"\n \t\"net/http\"\n \t\"os\"\n\n \t\"github.com/GoogleCloudPlatform/golang-samples/run/image-processing/imagemagick\"\n )\n\n func main() {\n \thttp.HandleFunc(\"/\", HelloPubSub)\n \t// Determine port for HTTP service.\n \tport := os.Getenv(\"PORT\")\n \tif port == \"\" {\n \t\tport = \"8080\"\n \t}\n \t// Start HTTP server.\n \tlog.Printf(\"Listening on port %s\", port)\n \tif err := http.ListenAndServe(\":\"+port, nil); err != nil {\n \t\tlog.Fatal(err)\n \t}\n }\n\n // PubSubMessage is the payload of a Pub/Sub event.\n // See the documentation for more details:\n // 
https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage\n type PubSubMessage struct {\n \tMessage struct {\n \t\tData []byte `json:\"data,omitempty\"`\n \t\tID string `json:\"id\"`\n \t} `json:\"message\"`\n \tSubscription string `json:\"subscription\"`\n }\n\n // HelloPubSub receives and processes a Pub/Sub push message.\n func HelloPubSub(w http.ResponseWriter, r *http.Request) {\n \tvar m PubSubMessage\n \tbody, err := io.ReadAll(r.Body)\n \tif err != nil {\n \t\tlog.Printf(\"ioutil.ReadAll: %v\", err)\n \t\thttp.Error(w, \"Bad Request\", http.StatusBadRequest)\n \t\treturn\n \t}\n \tif err := json.Unmarshal(body, &m); err != nil {\n \t\tlog.Printf(\"json.Unmarshal: %v\", err)\n \t\thttp.Error(w, \"Bad Request\", http.StatusBadRequest)\n \t\treturn\n \t}\n\n \tvar e imagemagick.GCSEvent\n \tif err := json.Unmarshal(m.Message.Data, &e); err != nil {\n \t\tlog.Printf(\"json.Unmarshal: %v\", err)\n \t\thttp.Error(w, \"Bad Request\", http.StatusBadRequest)\n \t\treturn\n \t}\n\n \tif e.Name == \"\" || e.Bucket == \"\" {\n \t\tlog.Printf(\"invalid GCSEvent: expected name and bucket\")\n \t\thttp.Error(w, \"Bad Request\", http.StatusBadRequest)\n \t\treturn\n \t}\n\n \tif err := imagemagick.BlurOffensiveImages(r.Context(), e); err != nil {\n \t\tlog.Printf(\"imagemagick.BlurOffensiveImages: %v\", err)\n \t\thttp.Error(w, \"Internal Server Error\", http.StatusInternalServerError)\n \t}\n }\n\n### Java\n\n\nTo authenticate to Cloud Run, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import com.google.gson.JsonObject;\n import com.google.gson.JsonParser;\n import java.util.Base64;\n import org.springframework.http.HttpStatus;\n import org.springframework.http.ResponseEntity;\n import org.springframework.web.bind.annotation.RequestBody;\n import org.springframework.web.bind.annotation.RequestMapping;\n import 
org.springframework.web.bind.annotation.RequestMethod;\n import org.springframework.web.bind.annotation.RestController;\n\n // PubSubController consumes a Pub/Sub message.\n @RestController\n public class PubSubController {\n @RequestMapping(value = \"/\", method = RequestMethod.POST)\n public ResponseEntity\u003cString\u003e receiveMessage(@RequestBody Body body) {\n // Get PubSub message from request body.\n Body.Message message = body.getMessage();\n if (message == null) {\n String msg = \"Bad Request: invalid Pub/Sub message format\";\n System.out.println(msg);\n return new ResponseEntity\u003c\u003e(msg, HttpStatus.BAD_REQUEST);\n }\n\n // Decode the Pub/Sub message.\n String pubSubMessage = message.getData();\n JsonObject data;\n try {\n String decodedMessage = new String(Base64.getDecoder().decode(pubSubMessage));\n data = JsonParser.parseString(decodedMessage).getAsJsonObject();\n } catch (Exception e) {\n String msg = \"Error: Invalid Pub/Sub message: data property is not valid base64 encoded JSON\";\n System.out.println(msg);\n return new ResponseEntity\u003c\u003e(msg, HttpStatus.BAD_REQUEST);\n }\n\n // Validate the message is a Cloud Storage event.\n if (data.get(\"name\") == null || data.get(\"bucket\") == null) {\n String msg = \"Error: Invalid Cloud Storage notification: expected name and bucket properties\";\n System.out.println(msg);\n return new ResponseEntity\u003c\u003e(msg, HttpStatus.BAD_REQUEST);\n }\n\n try {\n ImageMagick.blurOffensiveImages(data);\n } catch (Exception e) {\n String msg = String.format(\"Error: Blurring image: %s\", e.getMessage());\n System.out.println(msg);\n return new ResponseEntity\u003c\u003e(msg, HttpStatus.INTERNAL_SERVER_ERROR);\n }\n return new ResponseEntity\u003c\u003e(HttpStatus.OK);\n }\n }\n\n### Node.js\n\n\nTo authenticate to Cloud Run, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development 
environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n\n const express = require('express');\n const app = express();\n\n // This middleware is available in Express v4.16.0 onwards\n app.use(express.json());\n\n const image = require('./image');\n\n app.post('/', async (req, res) =\u003e {\n if (!req.body) {\n const msg = 'no Pub/Sub message received';\n console.error(`error: ${msg}`);\n res.status(400).send(`Bad Request: ${msg}`);\n return;\n }\n if (!req.body.message || !req.body.message.data) {\n const msg = 'invalid Pub/Sub message format';\n console.error(`error: ${msg}`);\n res.status(400).send(`Bad Request: ${msg}`);\n return;\n }\n\n // Decode the Pub/Sub message.\n const pubSubMessage = req.body.message;\n let data;\n try {\n data = Buffer.from(pubSubMessage.data, 'base64').toString().trim();\n data = JSON.parse(data);\n } catch (err) {\n const msg =\n 'Invalid Pub/Sub message: data property is not valid base64 encoded JSON';\n console.error(`error: ${msg}: ${err}`);\n res.status(400).send(`Bad Request: ${msg}`);\n return;\n }\n\n // Validate the message is a Cloud Storage event.\n if (!data.name || !data.bucket) {\n const msg =\n 'invalid Cloud Storage notification: expected name and bucket properties';\n console.error(`error: ${msg}`);\n res.status(400).send(`Bad Request: ${msg}`);\n return;\n }\n\n try {\n await image.blurOffensiveImages(data);\n res.status(204).send();\n } catch (err) {\n console.error(`error: Blurring image: ${err}`);\n res.status(500).send();\n }\n });\n\n### Python\n\n\nTo authenticate to Cloud Run, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import base64\n import json\n import os\n\n from flask import Flask, request\n\n import image\n\n\n app = Flask(__name__)\n\n\n @app.route(\"/\", methods=[\"POST\"])\n def index():\n \"\"\"Receive and parse Pub/Sub messages 
containing Cloud Storage event data.\"\"\"\n envelope = request.get_json()\n if not envelope:\n msg = \"no Pub/Sub message received\"\n print(f\"error: {msg}\")\n return f\"Bad Request: {msg}\", 400\n\n if not isinstance(envelope, dict) or \"message\" not in envelope:\n msg = \"invalid Pub/Sub message format\"\n print(f\"error: {msg}\")\n return f\"Bad Request: {msg}\", 400\n\n # Decode the Pub/Sub message.\n pubsub_message = envelope[\"message\"]\n\n if isinstance(pubsub_message, dict) and \"data\" in pubsub_message:\n try:\n data = json.loads(base64.b64decode(pubsub_message[\"data\"]).decode())\n\n except Exception as e:\n msg = (\n \"Invalid Pub/Sub message: \"\n \"data property is not valid base64 encoded JSON\"\n )\n print(f\"error: {e}\")\n return f\"Bad Request: {msg}\", 400\n\n # Validate the message is a Cloud Storage event.\n if not data[\"name\"] or not data[\"bucket\"]:\n msg = (\n \"Invalid Cloud Storage notification: \"\n \"expected name and bucket properties\"\n )\n print(f\"error: {msg}\")\n return f\"Bad Request: {msg}\", 400\n\n try:\n image.blur_offensive_images(data)\n return (\"\", 204)\n\n except Exception as e:\n print(f\"error: {e}\")\n return (\"\", 500)\n\n return (\"\", 500)\n\nWhat's next\n-----------\n\n\nTo search and filter code samples for other Google Cloud products, see the\n[Google Cloud sample browser](/docs/samples?product=cloudrun)."]]