Ereignisgesteuerte Übertragungen von AWS S3

Storage Transfer Service kann Ereignisbenachrichtigungen in AWS abrufen, um automatisch Daten zu übertragen, die am Quellspeicherort hinzugefügt oder aktualisiert wurden, in einen Cloud Storage-Bucket. Weitere Informationen zu den Vorteilen von ereignisgesteuerten Übertragungen

Bei ereignisgesteuerten Übertragungen wird auf Amazon S3-Ereignisbenachrichtigungen gewartet, die an Amazon SQS gesendet werden, um zu erfahren, wann Objekte im Quell-Bucket geändert oder hinzugefügt wurden. Das Löschen von Objekten wird nicht erkannt. Wenn Sie ein Objekt in der Quelle löschen, wird das zugehörige Objekt im Ziel-Bucket nicht gelöscht.

Bei ereignisgesteuerten Übertragungen wird immer ein Cloud Storage-Bucket als Ziel verwendet.

Hinweise

Folgen Sie der Anleitung, um die erforderlichen Berechtigungen für Ihren Cloud Storage-Ziel-Bucket zu gewähren:

SQS-Warteschlange erstellen

  1. Rufen Sie in der AWS Console die Seite Simple Queue Service auf.

  2. Klicken Sie auf Warteschlange erstellen.

  3. Geben Sie einen Namen für diese Warteschlange ein.

  4. Wählen Sie im Abschnitt Zugriffsrichtlinie die Option Erweitert aus. Ein JSON-Objekt wird angezeigt:

     {
        "Version": "2008-10-17",
        "Id": "__default_policy_ID",
        "Statement": [
          {
            "Sid": "__owner_statement",
            "Effect": "Allow",
            "Principal": {
              "AWS": "01234567890"
            },
            "Action": [
              "SQS:*"
            ],
            "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
          }
        ]
      }
      

    Die Werte von AWS und Resource sind für jedes Projekt eindeutig.

  5. Kopieren Sie Ihre spezifischen Werte von AWS und Resource aus der angezeigten JSON-Datei in das folgende JSON-Snippet:

    {
      "Version": "2012-10-17",
      "Id": "example-ID",
      "Statement": [
        {
          "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "Service": "s3.amazonaws.com"
          },
          "Action": "SQS:SendMessage",
          "Resource": "RESOURCE",
          "Condition": {
            "StringEquals": {
              "aws:SourceAccount": "AWS"
            },
            "ArnLike": {
              "aws:SourceArn": "S3_BUCKET_ARN"
            }
          }
        }
      ]
    }

    Die Werte der Platzhalter im vorherigen JSON-Code haben das folgende Format:

    • AWS ist ein numerischer Wert, der Ihr Amazon Web Services-Projekt darstellt. Beispiel: "aws:SourceAccount": "1234567890".
    • RESOURCE ist eine Amazon Resource Number (ARN), die diese Warteschlange identifiziert. Beispiel: "Resource": "arn:aws:sqs:us-west-2:01234567890:test"
    • S3_BUCKET_ARN ist ein ARN, der den Quell-Bucket identifiziert. Beispiel: "aws:SourceArn": "arn:aws:s3:::example-aws-bucket" Sie finden den ARN eines Buckets in der AWS-Konsole auf der Detailseite des Buckets auf dem Tab Properties (Eigenschaften).
  6. Ersetzen Sie die JSON-Datei, die im Bereich Zugriffsrichtlinie angezeigt wird, durch die aktualisierte JSON-Datei oben.

  7. Klicken Sie auf Warteschlange erstellen.

Notieren Sie sich nach Abschluss den Amazon Resource Name (ARN) der Warteschlange. Der ARN hat das folgende Format:

arn:aws:sqs:us-east-1:1234567890:event-queue"

Benachrichtigungen für Ihren S3-Bucket aktivieren

  1. Rufen Sie in der AWS Console die Seite S3 auf.

  2. Wählen Sie in der Liste Buckets den Quell-Bucket aus.

  3. Wählen Sie den Tab Eigenschaften aus.

  4. Klicken Sie im Bereich Terminbenachrichtigungen auf Terminbenachrichtigung erstellen.

  5. Geben Sie einen Namen für dieses Ereignis an.

  6. Wählen Sie im Bereich Ereignistypen die Option Alle Ereignisse zum Erstellen von Objekten aus.

  7. Wählen Sie als Ziel die Option SQS-Warteschlange und dann die Warteschlange aus, die Sie für diese Übertragung erstellt haben.

  8. Klicken Sie auf Änderungen speichern.

Berechtigungen konfigurieren

Folgen Sie der Anleitung unter Zugriff auf eine Quelle konfigurieren: Amazon S3, um entweder eine Zugriffsschlüssel-ID und einen geheimen Schlüssel oder eine Rolle für die Federated Identity zu erstellen.

Ersetzen Sie das JSON für benutzerdefinierte Berechtigungen durch Folgendes:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:ReceiveMessage",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::S3_BUCKET_NAME",
                "arn:aws:s3:::S3_BUCKET_NAME/*",
                "AWS_QUEUE_ARN"
            ]
        }
    ]
}

Notieren Sie sich nach dem Erstellen die folgenden Informationen:

  • Notieren Sie sich für einen Nutzer die Zugriffsschlüssel-ID und den geheimen Schlüssel.
  • Notieren Sie sich für eine Federated Identity-Rolle den Amazon Resource Name (ARN) im Format arn:aws:iam::AWS_ACCOUNT:role/ROLE_NAME.

Übertragungsjob erstellen

Sie können die REST API oder die Google Cloud -Konsole verwenden, um einen ereignisbasierten Übertragungsjob zu erstellen.

Cloud Console

  1. Rufen Sie in der Google Cloud -Console die Seite Übertragungsjob erstellen auf.

    Übertragungsjob erstellen

  2. Wählen Sie Amazon S3 als Quelltyp und Cloud Storage als Ziel aus.

  3. Wählen Sie als Planungsmodus die Option Ereignisgesteuert aus und klicken Sie auf Nächster Schritt.

  4. Geben Sie den Namen Ihres S3-Buckets ein. Der Bucket-Name ist der Name, der in der AWS Management Console angezeigt wird. Beispiel: my-aws-bucket

  5. Wählen Sie Ihre Authentifizierungsmethode aus und geben Sie die angeforderten Informationen ein, die Sie im vorherigen Abschnitt erstellt und notiert haben.

  6. Geben Sie den ARN der Amazon SQS-Warteschlange ein, die Sie zuvor erstellt haben. Dabei wird das folgende Format verwendet:

    arn:aws:sqs:us-east-1:1234567890:event-queue"
    
  7. Optional: Definieren Sie Filter und klicken Sie dann auf Nächster Schritt.

  8. Wählen Sie den Cloud Storage-Ziel-Bucket und optional den Pfad aus.

  9. Optional: Geben Sie eine Start- und Endzeit für die Übertragung ein. Wenn Sie keine Zeit angeben, beginnt die Übertragung sofort und wird fortgesetzt, bis sie manuell beendet wird.

  10. Geben Sie Übertragungsoptionen an. Weitere Informationen finden Sie auf der Seite Übertragungen erstellen.

  11. Klicken Sie auf Erstellen.

Nach der Erstellung wird der Übertragungsjob ausgeführt und ein Event-Listener wartet auf Benachrichtigungen in der SQS-Warteschlange. Auf der Seite „Jobdetails“ wird pro Stunde ein Vorgang angezeigt. Sie enthält Details zu den für jeden Job übertragenen Daten.

REST

Senden Sie das folgende JSON-Objekt an den Endpunkt transferJobs.create, um eine ereignisgesteuerte Übertragung mit der REST API zu erstellen:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "awsS3DataSource" {
      "bucketName": "AWS_SOURCE_NAME",
      "roleArn": "arn:aws:iam::1234567891011:role/role_for_federated_auth"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

Die Parameter eventStreamStartTime und eventStreamExpirationTime sind optional. Wenn die Startzeit weggelassen wird, beginnt die Übertragung sofort. Wenn die Endzeit weggelassen wird, wird die Übertragung fortgesetzt, bis sie manuell beendet wird.

Clientbibliotheken

Go

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Go API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


func createEventDrivenAWSTransfer(w io.Writer, projectID string, s3SourceBucket string, gcsSinkBucket string, sqsQueueARN string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source AWS S3 bucket.
	// s3SourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Amazon Resource Name (ARN) of the AWS SNS queue to subscribe the event driven transfer to.
	// sqsQueueARN := "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue"

	// The AWS access key credential, should be accessed via environment variable for security
	awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID")

	// The AWS secret key credential, should be accessed via environment variable for security
	awsSecretKey := os.Getenv("AWS_SECRET_ACCESS_KEY")

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_AwsS3DataSource{
					AwsS3DataSource: &storagetransferpb.AwsS3Data{
						BucketName: s3SourceBucket,
						AwsAccessKey: &storagetransferpb.AwsAccessKey{
							AccessKeyId:     awsAccessKeyID,
							SecretAccessKey: awsSecretKey,
						}},
				},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: sqsQueueARN},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", s3SourceBucket, gcsSinkBucket, sqsQueueARN, resp.Name)
	return resp, nil
}

Java

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Java API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenAwsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the source AWS bucket to transfer data from
    String s3SourceBucket = "yourS3SourceBucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-bucket";

    // The ARN of the SQS queue to subscribe to
    String sqsQueueArn = "arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue";

    createEventDrivenAwsTransfer(projectId, s3SourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenAwsTransfer(
      String projectId, String s3SourceBucket, String gcsSinkBucket, String sqsQueueArn)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      // The ID used to access your AWS account. Should be accessed via environment variable.
      String awsAccessKeyId = System.getenv("AWS_ACCESS_KEY_ID");

      // The Secret Key used to access your AWS account. Should be accessed via environment
      // variable.
      String awsSecretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setAwsS3DataSource(
                          TransferTypes.AwsS3Data.newBuilder()
                              .setBucketName(s3SourceBucket)
                              .setAwsAccessKey(
                                  TransferTypes.AwsAccessKey.newBuilder()
                                      .setAccessKeyId(awsAccessKeyId)
                                      .setSecretAccessKey(awsSecretAccessKey))
                              .build())
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(sqsQueueArn).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job from "
              + s3SourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + sqsQueueArn
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Node.js API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// AWS S3 source bucket name
// s3SourceBucket = 'my-s3-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The ARN of the SQS queue to subscribe to
// sqsQueueArn = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

// AWS Access Key ID. Should be accessed via environment variable for security.
// awsAccessKeyId = 'AKIA...'

// AWS Secret Access Key. Should be accessed via environment variable for security.
// awsSecretAccessKey = 'HEAoMK2.../...ku8'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks an SQS queue.
 */
async function createEventDrivenAwsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        awsS3DataSource: {
          bucketName: s3SourceBucket,
          awsAccessKey: {
            accessKeyId: awsAccessKeyId,
            secretAccessKey: awsSecretAccessKey,
          },
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: sqsQueueArn,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${s3SourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenAwsTransfer();

Python

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Python API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


from google.cloud import storage_transfer


def create_event_driven_aws_transfer(
    project_id: str,
    description: str,
    source_s3_bucket: str,
    sink_gcs_bucket: str,
    sqs_queue_arn: str,
    aws_access_key_id: str,
    aws_secret_access_key: str,
):
    """Create an event driven transfer between two GCS buckets that tracks an AWS SQS queue"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks an SQS queue'

    # AWS S3 source bucket name
    # source_s3_bucket = 'my-s3-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_gcs_bucket = 'my-gcs-destination-bucket'

    # The ARN of the SQS queue to subscribe to
    # pubsub_id = 'arn:aws:sqs:us-east-1:1234567891011:s3-notification-queue'

    # AWS Access Key ID. Should be accessed via environment variable for security purposes.
    # aws_access_key_id = 'AKIA...'

    # AWS Secret Access Key. Should be accessed via environment variable for security purposes.
    # aws_secret_access_key = 'HEAoMK2.../...ku8'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "aws_s3_data_source": {
                        "bucket_name": source_s3_bucket,
                        "aws_access_key": {
                            "access_key_id": aws_access_key_id,
                            "secret_access_key": aws_secret_access_key,
                        },
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_gcs_bucket,
                    },
                },
                "event_stream": {
                    "name": sqs_queue_arn,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")