Event-driven transfers from AWS S3

Storage Transfer Service can listen to event notifications in AWS to automatically transfer data that has been added or updated in the source location to a Cloud Storage bucket. Learn more about the benefits of event-driven transfers.

Event-driven transfers listen for Amazon S3 event notifications sent to Amazon SQS to know when objects in the source bucket have been modified or added. Object deletions are not detected; deleting an object at the source does not delete the associated object in the destination bucket.

Event-driven transfers always use a Cloud Storage bucket as the destination.

Before you begin

Follow the instructions to grant the required permissions on the destination Cloud Storage bucket.

Create an SQS queue

  1. In the AWS console, go to the Simple Queue Service page.

  2. Click Create queue.

  3. Enter a name for the queue.

  4. In the Access policy section, select Advanced. A JSON object is displayed:

     {
        "Version": "2008-10-17",
        "Id": "__default_policy_ID",
        "Statement": [
          {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
              "AWS": "01234567890"
            },
            "Action": [
              "SQS:*"
            ],
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
          }
        ]
      }
      

    The values of AWS and Resource are unique to each account.

  5. Copy your specific values of AWS and Resource from the displayed JSON into the following JSON snippet:

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "RESOURCE",
          "Condition": {
            "StringEquals": {
              "aws:SourceAccount": "AWS"
            },
            "ArnLike": {
              "aws:SourceArn": "S3_BUCKET_ARN"
            }
          }
        }
      ]
    }

    The placeholder values in the preceding JSON use the following formats:

    • AWS is the numeric ID of your Amazon Web Services account. For example: "aws:SourceAccount": "1234567890"
    • RESOURCE is the Amazon Resource Name (ARN) that identifies this queue. For example: "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
    • S3_BUCKET_ARN is the ARN that identifies the source bucket. For example: "aws:SourceArn": "arn:aws:s3:::example-aws-bucket". You can find a bucket's ARN on the Properties tab of the bucket details page in the AWS console.
  6. Replace the JSON shown in the Access policy section with the updated JSON above.

  7. Click Create queue.

Once you're done, note the queue's Amazon Resource Name (ARN). It uses the following format:

arn:aws:sqs:us-east-1:1234567890:event-queue
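
If you prefer to script the queue setup, the following is a minimal sketch using boto3's create_queue, which attaches the access policy from step 5 at creation time. The account ID, region, queue name, and bucket ARN are illustrative placeholders.

import json

import boto3

# Illustrative placeholder values; substitute your own.
account_id = "1234567890"
region = "us-west-2"
queue_name = "event-queue"
queue_arn = f"arn:aws:sqs:{region}:{account_id}:{queue_name}"
bucket_arn = "arn:aws:s3:::example-aws-bucket"

# Access policy allowing S3 to send messages to this queue (see step 5).
policy = {
    "Version": "2012-10-17",
    "Id": "example-ID",
    "Statement": [
        {
            "Sid": "example-statement-ID",
            "Effect": "Allow",
            "Principal": {"Service": "s3.amazonaws.com"},
            "Action": "SQS:SendMessage",
            "Resource": queue_arn,
            "Condition": {
                "StringEquals": {"aws:SourceAccount": account_id},
                "ArnLike": {"aws:SourceArn": bucket_arn},
            },
        }
    ],
}

# create_queue attaches the policy at creation time and returns the queue URL.
sqs = boto3.client("sqs", region_name=region)
response = sqs.create_queue(
    QueueName=queue_name,
    Attributes={"Policy": json.dumps(policy)},
)
print(response["QueueUrl"])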

Enable notifications on the S3 bucket

  1. In the AWS console, go to the S3 page.

  2. In the Buckets list, select the source bucket.

  3. Select the Properties tab.

  4. In the Event notifications section, click Create event notification.

  5. Specify a name for this event.

  6. In the Event types section, select All object create events.

  7. For Destination, select SQS queue, and select the queue you created for this transfer.

  8. Click Save changes.
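
The same configuration can be applied programmatically. This is a minimal sketch using boto3's put_bucket_notification_configuration; the bucket name, notification ID, and queue ARN are illustrative placeholders.

import boto3

s3 = boto3.client("s3")

# Route all object-creation events on the source bucket to the SQS queue.
s3.put_bucket_notification_configuration(
    Bucket="example-aws-bucket",
    NotificationConfiguration={
        "QueueConfigurations": [
            {
                "Id": "storage-transfer-events",  # the event notification name
                "QueueArn": "arn:aws:sqs:us-west-2:1234567890:event-queue",
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    },
)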

Set up permissions

Follow the instructions in Configure access to a source: Amazon S3 to create either an access key ID and secret key, or a federated identity role.

Replace the custom permissions JSON with the following:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::S3_BUCKET_NAME",
                "arn:aws:s3:::S3_BUCKET_NAME/*",
                "AWS_QUEUE_ARN"
            ]
        }
    ]
}

Once created, note the following information:

  • For a user, note the access key ID and secret key.
  • For a federated identity role, note the Amazon Resource Name (ARN), which has the format arn:aws:iam::AWS_ACCOUNT:role/ROLE_NAME.
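
If you manage the IAM user programmatically, the following minimal sketch attaches the custom permissions JSON as an inline policy with boto3. The user name, policy name, and ARNs are hypothetical placeholders; for a federated identity role, put_role_policy would be used instead.

import json

import boto3

iam = boto3.client("iam")

# The custom permissions JSON from above, with placeholders filled in.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket",
            ],
            "Resource": [
                "arn:aws:s3:::example-aws-bucket",
                "arn:aws:s3:::example-aws-bucket/*",
                "arn:aws:sqs:us-west-2:1234567890:event-queue",
            ],
        }
    ],
}

# Attach the policy inline to the transfer user.
iam.put_user_policy(
    UserName="storage-transfer-user",
    PolicyName="storage-transfer-event-driven",
    PolicyDocument=json.dumps(policy),
)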

Create a transfer job

You can create an event-driven transfer using the Google Cloud console or the REST API.

Cloud console

  1. Go to the Create a transfer job page in the Google Cloud console.

    Go to Create a transfer job

  2. Select Amazon S3 as the source type, and Cloud Storage as the destination.

  3. For Scheduling mode, select Event-driven and click Next step.

  4. Enter the S3 bucket name. The bucket name is the name as it appears in the AWS Management Console. For example: my-aws-bucket

  5. Select your authentication method and enter the information you created and noted in the previous section.

  6. Enter the ARN of the Amazon SQS queue you created earlier. It uses the following format:

    arn:aws:sqs:us-east-1:1234567890:event-queue
    
  7. Optionally, define any filters, then click Next step.

  8. Select the destination Cloud Storage bucket and, optionally, a path.

  9. Optionally, enter a start and end time for the transfer. If you don't specify a time, the transfer starts immediately and runs until manually stopped.

  10. Specify any transfer options. See the Create transfers page for more information.

  11. Click Create.

Once created, the transfer job starts running and an event listener waits for notifications on the SQS queue. The job details page shows one operation each hour, and lists details of the data transferred for each job.

REST

To create an event-driven transfer using the REST API, send the following JSON object to the transferJobs.create endpoint:

{
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec": {
    "awsS3DataSource": {
      "bucketName": "AWS_SOURCE_NAME",
      "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth"
    },
    "gcsDataSink": {
      "bucketName": "GCS_SINK_NAME"
    }
  },
  "eventStream": {
    "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTime and eventStreamExpirationTime are optional fields. If the start time is omitted, the transfer starts immediately; if the end time is omitted, the transfer continues until manually stopped.
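
As one way to send this request, the following minimal sketch posts the job body with Application Default Credentials using the google-auth Python library. The description and bucket names are placeholders, matching the JSON above.

import google.auth
from google.auth.transport.requests import AuthorizedSession

# Application Default Credentials with the cloud-platform scope.
credentials, project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
session = AuthorizedSession(credentials)

job = {
    "description": "YOUR DESCRIPTION",
    "status": "ENABLED",
    "projectId": project_id,
    "transferSpec": {
        "awsS3DataSource": {
            "bucketName": "AWS_SOURCE_NAME",
            "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth",
        },
        "gcsDataSink": {"bucketName": "GCS_SINK_NAME"},
    },
    "eventStream": {
        "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    },
}

response = session.post(
    "https://storagetransfer.googleapis.com/v1/transferJobs", json=job
)
response.raise_for_status()
print(response.json()["name"])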

Client libraries

Go

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Go API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


import (
	"context"
	"fmt"
	"io"
	"os"

	storagetransfer "cloud.google.com/go/storagetransfer/apiv1"
	"cloud.google.com/go/storagetransfer/apiv1/storagetransferpb"
)

func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SQS queue to subscribe the event-driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Java API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use a try-with-resources statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Node.js API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Python API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # sqs_queue_arn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")