Ereignisgesteuerte Übertragungen von AWS S3

Der Storage Transfer Service kann Ereignisbenachrichtigungen in AWS empfangen, um Daten, die am Speicherort der Quelle hinzugefügt oder aktualisiert wurden, automatisch in einen Cloud Storage-Bucket zu übertragen. Weitere Informationen zu den Vorteilen ereignisgesteuerter Übertragungen

Bei ereignisgesteuerten Übertragungen werden Amazon S3-Ereignisbenachrichtigungen an Amazon SQS gesendet, um zu erfahren, wann Objekte im Quell-Bucket geändert oder hinzugefügt wurden. Das Löschen von Objekten wird nicht erkannt. Wenn Sie ein Objekt in der Quelle löschen, wird das zugehörige Objekt im Ziel-Bucket nicht gelöscht.

SQS-Warteschlange erstellen

  1. Rufen Sie in der AWS Console die Seite Simple Queue Service auf.

  2. Klicken Sie auf Warteschlange erstellen.

  3. Geben Sie einen Namen für diese Warteschlange ein.

  4. Wählen Sie im Bereich Zugriffsrichtlinie die Option Erweitert aus. Ein JSON-Objekt wird so angezeigt:

     {
        "Version": "2008-10-17",
        "Id": "__default_policy_ID",
        "Statement": [
          {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
              "AWS": "01234567890"
            },
            "Action": [
              "SQS:*"
            ],
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
          }
        ]
      }
      

    Die Werte von AWS und Resource sind für jedes Projekt eindeutig.

  5. Kopieren Sie die Werte für AWS und Resource aus der angezeigten JSON-Datei in das folgende JSON-Snippet:

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "RESOURCE",
          "Condition": {
            "StringEquals": {
              "aws:SourceAccount": "AWS"
            },
            "ArnLike": {
              "aws:SourceArn": "S3_BUCKET_ARN"
            }
          }
        }
      ]
    }

    Die Werte der Platzhalter im obigen JSON-Code haben folgendes Format:

    • AWS ist ein numerischer Wert, der Ihr Amazon Web Services-Projekt darstellt. Beispiel: "aws:SourceAccount": "1234567890".
    • RESOURCE ist eine Amazon Resource Number (ARN), die diese Warteschlange identifiziert. Beispiel: "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
    • S3_BUCKET_ARN ist ein ARN, der den Quell-Bucket identifiziert. Beispiel: "aws:SourceArn": "arn:aws:s3:::example-aws-bucket" Sie finden die ARN eines Buckets auf der Seite mit den Bucket-Details in der AWS-Konsole auf dem Tab Properties (Eigenschaften).
  6. Ersetzen Sie die im Abschnitt Zugriffsrichtlinie angezeigte JSON durch die aktualisierte JSON-Datei oben.

  7. Klicken Sie auf Warteschlange erstellen.

Notieren Sie sich den Amazon Resource Name (ARN) der Warteschlange. Die ARN hat folgendes Format:

arn:aws:sqs:us-east-1:1234567890:event-queue"

Benachrichtigungen für Ihren S3-Bucket aktivieren

  1. Rufen Sie in der AWS Console die Seite S3 auf.

  2. Wählen Sie in der Liste Buckets den Quell-Bucket aus.

  3. Wählen Sie den Tab Eigenschaften aus.

  4. Klicken Sie im Bereich Terminbenachrichtigungen auf Terminbenachrichtigung erstellen.

  5. Geben Sie einen Namen für dieses Ereignis an.

  6. Wählen Sie im Bereich Ereignistypen die Option Alle Ereignisse vom Typ „Objekt erstellt“ aus.

  7. Wählen Sie als Ziel SQS-Warteschlange und dann die Warteschlange aus, die Sie für diese Übertragung erstellt haben.

  8. Klicken Sie auf Änderungen speichern.

Berechtigungen konfigurieren

Folgen Sie der Anleitung unter Zugriff auf eine Quelle konfigurieren: Amazon S3, um entweder eine Zugriffsschlüssel-ID und einen geheimen Schlüssel oder eine Rolle für föderierte Identitäten zu erstellen.

Ersetzen Sie das JSON-Objekt für benutzerdefinierte Berechtigungen durch Folgendes:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::S3_BUCKET_NAME",
                "arn:aws:s3:::S3_BUCKET_NAME/*",
                "AWS_QUEUE_ARN"
            ]
        }
    ]
}

Notieren Sie sich nach dem Erstellen die folgenden Informationen:

  • Notieren Sie sich für einen Nutzer die Zugriffsschlüssel-ID und den geheimen Schlüssel.
  • Notieren Sie sich für eine Federated Identity-Rolle den Amazon Resource Name (ARN), der das Format arn:aws:iam::AWS_ACCOUNT:role/ROLE_NAME hat.

Übertragungsjob erstellen

Sie können einen ereignisbasierten Übertragungsjob mit der REST API oder der Google Cloud Console erstellen.

Cloud Console

  1. Rufen Sie in der Google Cloud Console die Seite Übertragungsjob erstellen auf.

    Gehen Sie zu Übertragungsjob erstellen.

  2. Wählen Sie als Quelltyp Amazon S3 und als Ziel Cloud Storage aus.

  3. Wählen Sie als Planungsmodus die Option Ereignisgesteuert aus und klicken Sie auf Weiterer Schritt.

  4. Geben Sie den Namen Ihres S3-Buckets ein. Der Bucket-Name ist der Name, der in der AWS Management Console angezeigt wird. Beispiel: my-aws-bucket

  5. Wählen Sie die Authentifizierungsmethode aus und geben Sie die erforderlichen Informationen ein, die Sie im vorherigen Abschnitt erstellt und notiert haben.

  6. Geben Sie den ARN der zuvor erstellten Amazon SQS-Warteschlange ein. Sie hat das folgende Format:

    arn:aws:sqs:us-east-1:1234567890:event-queue"
    
  7. Optional: Definieren Sie Filter und klicken Sie dann auf Vorgehen.

  8. Wählen Sie den Ziel-Cloud Storage-Bucket und optional den Pfad aus.

  9. Optional: Geben Sie eine Start- und Endzeit für die Übertragung ein. Wenn Sie keine Uhrzeit angeben, beginnt die Übertragung sofort und läuft, bis sie manuell beendet wird.

  10. Geben Sie gegebenenfalls Übertragungsoptionen an. Weitere Informationen finden Sie auf der Seite Übertragungen erstellen.

  11. Klicken Sie auf Erstellen.

Nach der Erstellung wird der Übertragungsjob gestartet und ein Ereignisempfänger wartet auf Benachrichtigungen in der SQS-Warteschlange. Auf der Seite mit den Jobdetails wird jede Stunde eine Operation angezeigt. Außerdem sind Details zu den für jeden Job übertragenen Daten enthalten.

REST

Wenn Sie eine ereignisgesteuerte Übertragung mit der REST API erstellen möchten, senden Sie das folgende JSON-Objekt an den Endpunkt transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "awsS3DataSource" {
      "bucketName": "AWS_SOURCE_NAME",
      "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTime und eventStreamExpirationTime sind optional. Wenn die Startzeit weggelassen wird, beginnt die Übertragung sofort. Wenn die Endzeit weggelassen wird, wird die Übertragung fortgesetzt, bis sie manuell beendet wird.

Clientbibliotheken

Go

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Go API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Java API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Node.js API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Python API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")