由事件驅動的 Cloud Storage 轉移作業

Storage 移轉服務可以監聽 Google Cloud中的事件通知,自動移轉 Cloud Storage 值區中新增或更新的資料。進一步瞭解事件導向移轉作業的優點。

從 Cloud Storage 進行事件驅動的移轉作業時,系統會使用 Pub/Sub 通知,瞭解來源 bucket 中的物件何時經過修改或新增。系統不會偵測物件刪除作業;在來源刪除物件不會一併刪除目的地值區中的相關聯物件。

事件驅動型移轉一律會使用 Cloud Storage 值區做為目的地。

設定權限

除了所有轉移作業都需要的權限,事件驅動轉移作業還需要 Pub/Sub Subscriber 角色。

  1. 找出專案的 Storage 移轉服務服務代理名稱:

    1. 前往 googleServiceAccounts.get 參考頁面

      系統會開啟互動式面板,標題為「試試這個方法」

    2. 在面板的「要求參數」下方,輸入專案 ID。您在此指定的專案必須是用來管理 Storage 移轉服務的專案,可能與來源值區的專案不同。

    3. 按一下 [Execute] (執行)

    服務專員的電子郵件地址會以 accountEmail 的值傳回。複製這個值。

    服務專員的電子郵件地址格式為 project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com

  2. Pub/Sub Subscriber 角色授予 Storage 移轉服務服務代理。

    Cloud 控制台

    請按照「透過 Google Cloud 控制台控管存取權」一文中的操作說明,將 Pub/Sub Subscriber 角色授予 Storage Transfer Service 服務。您可以在主題、訂閱項目或專案層級授予角色。

    gcloud CLI

    按照「設定政策」中的操作說明,新增下列繫結:

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
    }

設定 Pub/Sub

  1. 請確認您已滿足將 Pub/Sub 與 Cloud Storage 搭配使用的先決條件

  2. 為來源 Cloud Storage bucket 建立 Pub/Sub 通知。

    您無法使用 Google Cloud 控制台管理 Pub/Sub 通知。 請改用 gcloud CLI 或其中一個可用的用戶端程式庫

    gcloud storage buckets notifications create gs://SOURCE_BUCKET_NAME --topic=TOPIC_NAME
  3. 為該主題建立提取訂閱項目。您必須為每項移轉工作建立個別的訂閱項目。

    下列範例顯示用來建立提取訂閱項目的 Google Cloud CLI 指令。如需控制台操作說明和用戶端程式庫程式碼,請參閱「建立提取訂閱項目」。

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300

建立移轉工作

您可以使用 REST API 或 Google Cloud 控制台建立以事件為準的轉移作業。

請勿在轉移作業名稱中加入個人識別資訊 (PII) 或安全性資料等私密資訊。資源名稱可能會傳播至其他 Google Cloud 資源的名稱,並可能向專案外部的 Google 內部系統公開。

Cloud 控制台

  1. 前往 Google Cloud 控制台的「建立移轉工作」頁面。

    前往「建立轉移工作」

  2. 選取「Cloud Storage」做為來源和目的地。

  3. 在「Scheduling mode」中選取「Event-driven」,然後按一下「Next step」

  4. 選取這項轉移作業的來源 bucket。

  5. 在「事件串流」部分,輸入訂閱名稱:

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. 視需要定義篩選器,然後按一下「下一步」

  7. 選取這項轉移作業的目的地 bucket。

  8. 視需要輸入轉移作業的開始和結束時間。如果未指定時間,系統會立即開始移轉,並持續執行直到手動停止為止。

  9. 指定任何轉移選項。詳情請參閱「建立轉移作業」頁面。

  10. 點選「建立」

建立完成後,轉移工作就會開始執行,事件監聽器則會等待 Pub/Sub 訂閱項目的通知。工作詳細資料頁面會顯示每小時的作業,並包含每項工作傳輸的資料詳細資料。

REST

如要使用 REST API 建立事件驅動的轉移作業,請將下列 JSON 物件傳送至 transferJobs.create 端點:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "gcsDataSource" {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTimeeventStreamExpirationTime 為選用欄位。如果省略開始時間,系統會立即開始轉移;如果省略結束時間,系統會持續轉移,直到手動停止為止。

用戶端程式庫

Go

如要瞭解如何安裝及使用 Storage 移轉服務的用戶端程式庫,請參閱這篇文章。 詳情請參閱 Storage 移轉服務 Go API 參考說明文件

如要向 Storage 移轉服務進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub topic to subscribe the event driven transfer to.
	// pubSubID := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

如要瞭解如何安裝及使用 Storage 移轉服務的用戶端程式庫,請參閱這篇文章。 詳情請參閱 Storage 移轉服務 Java API 參考說明文件

如要向 Storage 移轉服務進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS AWS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The ARN of the PubSub queue to subscribe to
    String sqsQueueArn = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

如要瞭解如何安裝及使用 Storage 移轉服務的用戶端程式庫,請參閱這篇文章。 詳情請參閱 Storage 移轉服務 Node.js API 參考說明文件

如要向 Storage 移轉服務進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The subscription ID to a Pubsub queue to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

如要瞭解如何安裝及使用 Storage 移轉服務的用戶端程式庫,請參閱這篇文章。 詳情請參閱 Storage 移轉服務 Python API 參考說明文件

如要向 Storage 移轉服務進行驗證,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。


from google.cloud import storage_transfer


def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")

監控事件導向移轉作業

建立事件驅動型移轉作業時,Storage 移轉服務會建立移轉工作。到達開始時間後,系統就會開始執行轉移作業,而事件監聽器會等待 Pub/Sub 佇列傳送通知。

轉移作業會執行約 24 小時,狀態為 in progress。24 小時後,作業會完成並開始新的作業。 每 24 小時會建立一項新作業,直到轉移工作結束時間為止,或直到工作手動停止。

如果作業排定完成時正在進行檔案傳輸,系統會繼續執行作業,直到檔案傳輸完畢為止。系統會啟動新作業,並同時執行這兩項作業,直到舊作業完成為止。這段期間偵測到的任何事件,都會由新作業處理。

如要查看目前和已完成的作業,請按照下列步驟操作:

Google Cloud 控制台

  1. 前往 Google Cloud 控制台的「Storage Transfer Service」頁面。

    前往 Storage 移轉服務

  2. 在工作清單中,選取「全部」分頁或「雲端對雲端」

  3. 按一下移轉作業的工作 ID。「排程模式」欄會列出所有事件驅動的移轉作業,以及批次移轉作業。

  4. 選取「作業」分頁標籤。系統會顯示目前作業的詳細資料,並在「執行記錄」表格中列出已完成的作業。按一下任何已完成的作業,即可查看其他詳細資料。

gcloud

如要即時監控工作進度,請使用 gcloud transfer jobs monitor。回應會顯示目前的作業、工作開始時間、已轉移的資料量、略過的位元組,以及錯誤計數。

gcloud transfer jobs monitor JOB_NAME

如要擷取目前的作業名稱,請執行下列指令:

gcloud transfer jobs describe JOB_NAME --format="value(latestOperationName)"

如要列出目前和已完成的作業,請執行下列指令:

gcloud transfer operations list --job-names=JOB_NAME

如要查看作業的詳細資料:

gcloud transfer operations describe OPERATION_NAME