啟用事件導向函式重試功能

本文說明如何為事件驅動的 Cloud Run 函式啟用重試功能。自動重試功能不適用於 HTTP 函式。

事件導向函式無法完成的原因

在極少數情況下,函式可能會因內部錯誤而提早結束,根據預設,函式可能會也可能不會自動重試。

更常見的是,事件驅動函式可能會因函式程式碼本身擲回的錯誤而無法順利完成。發生這種情況的原因可能包括:

  • 函式包含錯誤且執行階段擲回例外狀況。
  • 函式無法連上服務端點,或在嘗試連線時逾時。
  • 這個函式會刻意擲回例外狀況 (例如,當參數驗證失敗時)。
  • Node.js 函式會傳回已遭拒絕的承諾,或將非 null 值傳遞至回呼。

在上述任何情況下,函式都會停止執行並傳回錯誤。產生訊息的事件觸發事件具有重試政策,您可以自訂這些政策,以滿足函式的需求。

重試的語意

Cloud Run 函式會為事件來源發出的每個事件,提供至少一次的事件導向函式執行作業。重試的設定方式取決於您建立函式的做法:

  • Google Cloud 控制台或透過 Cloud Run Admin API 建立的函式,需要另外建立及管理事件觸發事件。觸發事件具有預設重試行為,您可以根據函式的需要自訂這些行為。
  • 使用 Cloud Functions v2 API 建立的函式會隱含建立必要的事件觸發條件,例如 Pub/Sub 主題或 Eventarc 觸發條件。根據預設,這些觸發事件的重試功能會停用,但您可以使用 Cloud Functions v2 API 重新啟用。

使用 Cloud Run 建立的事件驅動函式

在 Google Cloud 控制台或使用 Cloud Run 管理員 API 建立的函式,需要分別建立及管理事件觸發事件。強烈建議您查看每種觸發事件類型的預設行為:

  • Eventarc 重試政策的預設訊息保留時間為 24 小時,並採用指數輪詢方式的延遲機制。請參閱 Eventarc 說明文件,瞭解如何重試事件
  • Pub/Sub 預設會為所有訂閱項目使用「立即重新傳送」政策。請參閱 Pub/Sub 說明文件,瞭解如何處理訊息失敗重試要求

使用 Cloud Functions v2 API 建立的事件導向函式

使用 Cloud Functions 第 2 版 API 建立的函式,例如使用 Cloud Functions gcloud CLI、REST API 或 Terraform,會代您建立及管理事件觸發事件。根據預設,如果函式叫用作業因錯誤而終止,系統就不會再次叫用該函式,並且會捨棄事件。在事件驅動函式上啟用重試功能後,Cloud Run 函式會重試失敗的函式叫用,直到成功完成或重試時間窗口到期為止。

如果未為函式啟用重試功能 (這是預設值),函式一律會回報已成功執行,且記錄中可能會顯示 200 OK 回應碼。即使函式發生錯誤,也會發生這種情況。為清楚指出函式遇到錯誤時的情況,請務必適當地回報錯誤

啟用或停用重試

如要啟用或停用重試,您可以使用 Google Cloud CLI。根據預設,重試功能會停用。

透過 Google Cloud CLI 設定重試

如要使用 Google Cloud CLI 啟用重試功能,請在部署函式時加入 --retry 標記:

gcloud functions deploy FUNCTION_NAME --retry FLAGS...

如要停用重試功能,請在沒有 --retry 標記的情況下重新部署函式:

gcloud functions deploy FUNCTION_NAME FLAGS...

重試視窗

重試時段會在 24 小時後失效。Cloud Run 函式會使用指數輪詢策略重試新建立的事件導向函式,輪詢間隔會以 10 到 600 秒的間隔遞增。

最佳做法

本節將說明使用重試的最佳做法。

使用重試來處理暫時性錯誤

由於函式會持續重試,直到執行成功為止,因此在啟用重試功能前,請先透過測試從程式碼中排除錯誤 (例如錯誤)。重試最適合用於處理間歇性或暫時性失敗,因為這類失敗在重試時很可能會解決,例如不穩定的服務端點或逾時。

設定結束條件以避免無限的重試循環

最佳做法是在使用重試時,保護函式避免持續迴圈。方法是在函式開始處理前加入明確的結束條件。請注意,只有在函式順利啟動且能夠評估結束條件時,這項技巧才會生效。

一種簡單又有效的方法,就是捨棄時間戳記超過特定時間的事件。這有助於在失敗持續發生或持續時間比預期長時,避免執行過多作業。

例如,下面的程式碼片段會捨棄 10 秒之前的所有事件:

Node.js

const functions = require('@google-cloud/functions-framework');

/**
 * Cloud Event Function that only executes within
 * a certain time period after the triggering event
 *
 * @param {object} event The Cloud Functions event.
 * @param {function} callback The callback function.
 */
functions.cloudEvent('avoidInfiniteRetries', (event, callback) => {
  const eventAge = Date.now() - Date.parse(event.time);
  const eventMaxAge = 10000;

  // Ignore events that are too old
  if (eventAge > eventMaxAge) {
    console.log(`Dropping event ${event} with age ${eventAge} ms.`);
    callback();
    return;
  }

  // Do what the function is supposed to do
  console.log(`Processing event ${event} with age ${eventAge} ms.`);

  // Retry failed function executions
  const failed = false;
  if (failed) {
    callback('some error');
  } else {
    callback();
  }
});

Python

from datetime import datetime, timezone

# The 'python-dateutil' package must be included in requirements.txt.
from dateutil import parser

import functions_framework


@functions_framework.cloud_event
def avoid_infinite_retries(cloud_event):
    """Cloud Event Function that only executes within a certain
    time period after the triggering event.

    Args:
        cloud_event: The cloud event associated with the current trigger
    Returns:
        None; output is written to Stackdriver Logging
    """
    timestamp = cloud_event["time"]

    event_time = parser.parse(timestamp)
    event_age = (datetime.now(timezone.utc) - event_time).total_seconds()
    event_age_ms = event_age * 1000

    # Ignore events that are too old
    max_age_ms = 10000
    if event_age_ms > max_age_ms:
        print("Dropped {} (age {}ms)".format(cloud_event["id"], event_age_ms))
        return "Timeout"

    # Do what the function is supposed to do
    print("Processed {} (age {}ms)".format(cloud_event["id"], event_age_ms))
    return  # To retry the execution, raise an exception here

Go


// Package tips contains tips for writing Cloud Functions in Go.
package tips

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/GoogleCloudPlatform/functions-framework-go/functions"
	"github.com/cloudevents/sdk-go/v2/event"
)

func init() {
	functions.CloudEvent("FiniteRetryPubSub", FiniteRetryPubSub)
}

// MessagePublishedData contains the full Pub/Sub message
// See the documentation for more details:
// https://cloud.google.com/eventarc/docs/cloudevents#pubsub
type MessagePublishedData struct {
	Message PubSubMessage
}

// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
	Data []byte `json:"data"`
}

// FiniteRetryPubSub demonstrates how to avoid inifinite retries.
func FiniteRetryPubSub(ctx context.Context, e event.Event) error {
	var msg MessagePublishedData
	if err := e.DataAs(&msg); err != nil {
		return fmt.Errorf("event.DataAs: %w", err)
	}

	// Ignore events that are too old.
	expiration := e.Time().Add(10 * time.Second)
	if time.Now().After(expiration) {
		log.Printf("event timeout: halting retries for expired event '%q'", e.ID())
		return nil
	}

	// Add your message processing logic.
	return processTheMessage(msg)
}

Java


import com.google.cloud.functions.CloudEventsFunction;
import io.cloudevents.CloudEvent;
import java.time.Duration;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.logging.Logger;

public class RetryTimeout implements CloudEventsFunction {
  private static final Logger logger = Logger.getLogger(RetryTimeout.class.getName());
  private static final long MAX_EVENT_AGE = 10_000;

  /**
   * Cloud Event Function that only executes within
   * a certain time period after the triggering event
   */
  @Override
  public void accept(CloudEvent event) throws Exception {
    ZonedDateTime utcNow = ZonedDateTime.now(ZoneOffset.UTC);
    ZonedDateTime timestamp = event.getTime().atZoneSameInstant(ZoneOffset.UTC);

    long eventAge = Duration.between(timestamp, utcNow).toMillis();

    // Ignore events that are too old
    if (eventAge > MAX_EVENT_AGE) {
      logger.info(String.format("Dropping event with timestamp %s.", timestamp));
      return;
    }

    // Process events that are recent enough
    // To retry this invocation, throw an exception here
    logger.info(String.format("Processing event with timestamp %s.", timestamp));
  }
}

C#

using CloudNative.CloudEvents;
using Google.Cloud.Functions.Framework;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace TimeBoundedRetries;

public class Function : ICloudEventFunction<MessagePublishedData>
{
    private static readonly TimeSpan MaxEventAge = TimeSpan.FromSeconds(10);
    private readonly ILogger _logger;

    // Note: for additional testability, use an injectable clock abstraction.
    public Function(ILogger<Function> logger) =>
        _logger = logger;

    public Task HandleAsync(CloudEvent cloudEvent, MessagePublishedData data, CancellationToken cancellationToken)
    {
        string textData = data.Message.TextData;

        DateTimeOffset utcNow = DateTimeOffset.UtcNow;

        // Every PubSub CloudEvent will contain a timestamp.
        DateTimeOffset timestamp = cloudEvent.Time.Value;
        DateTimeOffset expiry = timestamp + MaxEventAge;

        // Ignore events that are too old.
        if (utcNow > expiry)
        {
            _logger.LogInformation("Dropping PubSub message '{text}'", textData);
            return Task.CompletedTask;
        }

        // Process events that are recent enough.
        // If this processing throws an exception, the message will be retried until either
        // processing succeeds or the event becomes too old and is dropped by the code above.
        _logger.LogInformation("Processing PubSub message '{text}'", textData);
        return Task.CompletedTask;
    }
}

Ruby

require "functions_framework"

FunctionsFramework.cloud_event "avoid_infinite_retries" do |event|
  # Use the event timestamp to determine the event age.
  event_age_secs = Time.now - event.time.to_time
  event_age_ms = (event_age_secs * 1000).to_i

  max_age_ms = 10_000
  if event_age_ms > max_age_ms
    # Ignore events that are too old.
    logger.info "Dropped #{event.id} (age #{event_age_ms}ms)"

  else
    # Do what the function is supposed to do.
    logger.info "Handling #{event.id} (age #{event_age_ms}ms)..."
    failed = true

    # Raise an exception to signal failure and trigger a retry.
    raise "I failed!" if failed
  end
end

PHP


/**
 * This function shows an example method for avoiding infinite retries in
 * Google Cloud Functions. By default, functions configured to automatically
 * retry execution on failure will be retried indefinitely - causing an
 * infinite loop. To avoid this, we stop retrying executions (by not throwing
 * exceptions) for any events that are older than a predefined threshold.
 */

use Google\CloudFunctions\CloudEvent;

function avoidInfiniteRetries(CloudEvent $event): void
{
    $log = fopen(getenv('LOGGER_OUTPUT') ?: 'php://stderr', 'wb');

    $eventId = $event->getId();

    // The maximum age of events to process.
    $maxAge = 10; // 10 seconds

    // The age of the event being processed.
    $eventAge = time() - strtotime($event->getTime());

    // Ignore events that are too old
    if ($eventAge > $maxAge) {
        fwrite($log, 'Dropping event ' . $eventId . ' with age ' . $eventAge . ' seconds' . PHP_EOL);
        return;
    }

    // Do what the function is supposed to do
    fwrite($log, 'Processing event: ' . $eventId . ' with age ' . $eventAge . ' seconds' . PHP_EOL);

    // infinite_retries failed function executions
    $failed = true;
    if ($failed) {
        throw new Exception('Event ' . $eventId . ' failed; retrying...');
    }
}

區分可重試的函式和致命錯誤

如果函式已啟用重試功能,任何未處理的錯誤都會觸發重試。請確認程式碼會擷取任何不應導致重試的錯誤。

Node.js

const functions = require('@google-cloud/functions-framework');

/**
 * Register a Cloud Event Function that demonstrates
 * how to toggle retries using a promise
 *
 * @param {object} event The Cloud Event for the function trigger.
 */
functions.cloudEvent('retryPromise', cloudEvent => {
  // The Pub/Sub event payload is passed as the CloudEvent's data payload.
  // See the documentation for more details:
  // https://cloud.google.com/eventarc/docs/cloudevents#pubsub
  const base64PubsubMessage = cloudEvent.data.message.data;
  const jsonString = Buffer.from(base64PubsubMessage, 'base64').toString();

  const tryAgain = JSON.parse(jsonString).retry;

  if (tryAgain) {
    throw new Error('Retrying...');
  } else {
    console.error('Not retrying...');
    return Promise.resolve();
  }
});

/**
 * Cloud Event Function that demonstrates
 * how to toggle retries using a callback
 *
 * @param {object} event The Cloud Event for the function trigger.
 * @param {function} callback The callback function.
 */
functions.cloudEvent('retryCallback', (cloudEvent, callback) => {
  // The Pub/Sub event payload is passed as the CloudEvent's data payload.
  // See the documentation for more details:
  // https://cloud.google.com/eventarc/docs/cloudevents#pubsub
  const base64PubsubMessage = cloudEvent.data.message.data;
  const jsonString = Buffer.from(base64PubsubMessage, 'base64').toString();

  const tryAgain = JSON.parse(jsonString).retry;
  const err = new Error('Error!');

  if (tryAgain) {
    console.error('Retrying:', err);
    callback(err);
  } else {
    console.error('Not retrying:', err);
    callback();
  }
});

Python

import base64
import json

import functions_framework
from google.cloud import error_reporting


error_client = error_reporting.Client()


@functions_framework.cloud_event
def retry_or_not(cloud_event):
    """Cloud Event Function that demonstrates how to toggle retries.

    Args:
        cloud_event: The cloud event with a Pub/Sub data payload
    Returns:
        None; output is written to Stackdriver Logging
    """

    # The Pub/Sub event payload is passed as the CloudEvent's data payload.
    # See the documentation for more details:
    # https://cloud.google.com/eventarc/docs/cloudevents#pubsub
    encoded_pubsub_message = cloud_event.data["message"]["data"]

    # Retry based on a user-defined parameter
    try_again = json.loads(base64.b64decode(encoded_pubsub_message).decode())["retry"]

    try:
        raise RuntimeError("I failed you")
    except RuntimeError:
        error_client.report_exception()
        if try_again:
            raise  # Raise the exception and try again
        else:
            pass  # Swallow the exception and don't retry

Go

package tips

import (
	"context"
	"errors"
	"fmt"
	"log"

	"github.com/GoogleCloudPlatform/functions-framework-go/functions"
	"github.com/cloudevents/sdk-go/v2/event"
)

func init() {
	functions.CloudEvent("RetryPubSub", RetryPubSub)
}

// MessagePublishedData contains the full Pub/Sub message
// See the documentation for more details:
// https://cloud.google.com/eventarc/docs/cloudevents#pubsub
type MessagePublishedData struct {
	Message PubSubMessage
}

// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
	Data []byte `json:"data"`
}

// RetryPubSub demonstrates how to toggle using retries.
func RetryPubSub(ctx context.Context, e event.Event) error {
	var msg MessagePublishedData
	if err := e.DataAs(&msg); err != nil {
		return fmt.Errorf("event.DataAs: %w", err)
	}

	name := string(msg.Message.Data)
	if name == "" {
		name = "World"
	}

	// A misconfigured client will stay broken until the function is redeployed.
	client, err := MisconfiguredDataClient()
	if err != nil {
		log.Printf("MisconfiguredDataClient (retry denied):  %v", err)
		// A nil return indicates that the function does not need a retry.
		return nil
	}

	// Runtime error might be resolved with a new attempt.
	if err = FailedWriteOperation(client, name); err != nil {
		log.Printf("FailedWriteOperation (retry expected): %v", err)
		// A non-nil return indicates that a retry is needed.
		return err
	}

	return nil
}

Java


import com.google.cloud.functions.CloudEventsFunction;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import functions.eventpojos.PubSubBody;
import io.cloudevents.CloudEvent;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.logging.Logger;

public class RetryPubSub implements CloudEventsFunction {
  private static final Logger logger = Logger.getLogger(RetryPubSub.class.getName());

  // Use Gson (https://github.com/google/gson) to parse JSON content.
  private static final Gson gson = new Gson();

  @Override
  public void accept(CloudEvent event) throws Exception {
    if (event.getData() == null) {
      logger.warning("No data found in event!");
      return;
    }

    // Extract Cloud Event data and convert to PubSubBody
    String cloudEventData = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
    PubSubBody body = gson.fromJson(cloudEventData, PubSubBody.class);

    String encodedData = body.getMessage().getData();
    String decodedData =
        new String(Base64.getDecoder().decode(encodedData), StandardCharsets.UTF_8);

    // Retrieve and decode PubSubMessage data into a JsonElement.
    // Function is expecting a user-supplied JSON message which determines whether
    // to retry or not.
    JsonElement jsonPubSubMessageElement = gson.fromJson(decodedData, JsonElement.class);

    boolean retry = false;
    // Get the value of the "retry" JSON parameter, if one exists
    if (jsonPubSubMessageElement != null && jsonPubSubMessageElement.isJsonObject()) {
      JsonObject jsonPubSubMessageObject = jsonPubSubMessageElement.getAsJsonObject();

      if (jsonPubSubMessageObject.has("retry")
          && jsonPubSubMessageObject.get("retry").getAsBoolean()) {
        retry = true;
      }
    }

    // Retry if appropriate
    if (retry) {
      // Throwing an exception causes the execution to be retried
      throw new RuntimeException("Retrying...");
    } else {
      logger.info("Not retrying...");
    }
  }
}

C#

using CloudNative.CloudEvents;
using Google.Cloud.Functions.Framework;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.Extensions.Logging;
using System;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace Retry;

public class Function : ICloudEventFunction<MessagePublishedData>
{
    private readonly ILogger _logger;

    public Function(ILogger<Function> logger) =>
        _logger = logger;

    public Task HandleAsync(CloudEvent cloudEvent, MessagePublishedData data, CancellationToken cancellationToken)
    {
        bool retry = false;
        string text = data.Message?.TextData;

        // Get the value of the "retry" JSON parameter, if one exists.
        if (!string.IsNullOrEmpty(text))
        {
            JsonElement element = JsonSerializer.Deserialize<JsonElement>(data.Message.TextData);

            retry = element.TryGetProperty("retry", out var property) &&
                property.ValueKind == JsonValueKind.True;
        }

        // Throwing an exception causes the execution to be retried.
        if (retry)
        {
            throw new InvalidOperationException("Retrying...");
        }
        else
        {
            _logger.LogInformation("Not retrying...");
        }
        return Task.CompletedTask;
    }
}

Ruby

require "functions_framework"

FunctionsFramework.cloud_event "retry_or_not" do |event|
  try_again = event.data["retry"]

  begin
    # Simulate a failure
    raise "I failed!"
  rescue RuntimeError => e
    logger.warn "Caught an error: #{e}"
    if try_again
      # Raise an exception to return a 500 and trigger a retry.
      logger.info "Trying again..."
      raise ex
    else
      # Return normally to end processing of this event.
      logger.info "Giving up."
    end
  end
end

PHP


use Google\CloudFunctions\CloudEvent;

function tipsRetry(CloudEvent $event): void
{
    $cloudEventData = $event->getData();
    $pubSubData = $cloudEventData['message']['data'];

    $json = json_decode(base64_decode($pubSubData), true);

    // Determine whether to retry the invocation based on a parameter
    $tryAgain = $json['some_parameter'];

    if ($tryAgain) {
        /**
         * Functions with automatic retries enabled should throw exceptions to
         * indicate intermittent failures that a retry might fix. In this
         * case, a thrown exception will cause the original function
         * invocation to be re-sent.
         */
        throw new Exception('Intermittent failure occurred; retrying...');
    }

    /**
     * If a function with retries enabled encounters a non-retriable
     * failure, it should return *without* throwing an exception.
     */
    $log = fopen(getenv('LOGGER_OUTPUT') ?: 'php://stderr', 'wb');
    fwrite($log, 'Not retrying' . PHP_EOL);
}

讓可重試的事件導向函式具備冪等性

可重試的事件導向函式必須是冪等函式。以下是讓這類函式不重複的一般準則:

  • 許多外部 API (例如 Stripe) 都允許您以參數的形式提供冪等性鍵。如果您使用的是這類 API,請使用事件 ID 做為不重複性鍵。
  • 冪等性與至少一次遞送相容,因為這可確保重試作業的安全性。因此,撰寫可靠程式碼的一般最佳做法,就是結合不重複性和重試。
  • 請確保您的程式碼為內部冪等。例如:
    • 請確認異動可發生多次,且不會影響結果。
    • 在變更狀態之前,查詢交易中的資料庫狀態。
    • 確保所有連帶影響本身也是冪等的。
  • 在函式外部強制執行交易檢查,不受程式碼影響。例如,在某處記錄狀態,記錄特定事件 ID 已處理。
  • 請在頻外處理重複的函式呼叫。例如,您可以使用單獨的清理程序,在重複的函式呼叫後進行清理。

設定重試政策

視 Cloud Run 函式的需要而定,您可能需要直接設定重試政策。這樣一來,您就可以設定下列任意組合:

  • 將重試期間從 7 天縮短至最短 10 分鐘。
  • 變更指數型回退重試策略的最小和最大回退時間。
  • 變更重試策略,以便立即重試。
  • 設定無效信件主題
  • 設定傳送嘗試次數上限和下限。

如要設定重試政策,請按照下列步驟操作:

  1. 編寫 HTTP 函式。
  2. 使用 Pub/Sub API 建立 Pub/Sub 訂閱項目,並將函式的網址指定為目標。

如要進一步瞭解如何直接設定 Pub/Sub,請參閱 Pub/Sub 的處理失敗情況說明文件

後續步驟