Ereignisgesteuerte Übertragungen aus Cloud Storage

Storage Transfer Service kann auf Ereignisbenachrichtigungen in Google Cloudwarten, um automatisch Daten zu übertragen, die in einem Cloud Storage-Bucket hinzugefügt oder aktualisiert wurden. Weitere Informationen zu den Vorteilen von ereignisgesteuerten Übertragungen

Bei ereignisgesteuerten Übertragungen aus Cloud Storage werden Pub/Sub-Benachrichtigungen verwendet, um zu erfahren, wann Objekte im Quell-Bucket geändert oder hinzugefügt wurden. Das Löschen von Objekten wird nicht erkannt. Wenn Sie ein Objekt in der Quelle löschen, wird das zugehörige Objekt im Ziel-Bucket nicht gelöscht.

Bei ereignisgesteuerten Übertragungen wird immer ein Cloud Storage-Bucket als Ziel verwendet.

Berechtigungen konfigurieren

Zusätzlich zu den Berechtigungen, die für alle Übertragungsjobs erforderlich sind, ist für ereignisgesteuerte Übertragungen die Rolle Pub/Sub Subscriber erforderlich.

  1. So finden Sie den Namen des Storage Transfer Service-Dienst-Agents für Ihr Projekt:

    1. Rufen Sie die Referenzseite googleServiceAccounts.get auf.

      Es wird ein interaktives Steuerfeld mit dem Titel Diese Methode testen geöffnet.

    2. Geben Sie im Steuerfeld unter Anfrageparameter Ihre Projekt-ID ein. Das hier angegebene Projekt muss das Projekt sein, das Sie zum Verwalten des Storage Transfer Service verwenden. Dieses kann sich vom Projekt des Quell-Buckets unterscheiden.

    3. Klicken Sie auf Ausführen.

    Die E-Mail-Adresse des Dienst-Agents wird als Wert von accountEmail zurückgegeben. Kopieren Sie diesen Wert.

    Die E-Mail des Dienst-Agents hat das Format project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com.

  2. Weisen Sie dem Storage Transfer Service-Dienst-Agent die Rolle Pub/Sub Subscriber zu.

    Cloud Console

    Folgen Sie der Anleitung unter Zugriff über die Google Cloud -Konsole steuern, um dem Storage Transfer Service die Rolle Pub/Sub Subscriber zuzuweisen. Die Rolle kann auf Themen-, Abo- oder Projektebene gewährt werden.

    gcloud CLI

    Folgen Sie der Anleitung unter Richtlinie festlegen, um die folgende Bindung hinzuzufügen:

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
    }

Pub/Sub konfigurieren

  1. Achten Sie darauf, dass Sie die Voraussetzungen für die Verwendung von Pub/Sub mit Cloud Storage erfüllt haben.

  2. Erstellen Sie eine Pub/Sub-Benachrichtigung für den Cloud Storage-Quellbucket.

    Sie können Pub/Sub-Benachrichtigungen nicht mit der Google Cloud Console verwalten. Verwenden Sie stattdessen die gcloud CLI oder eine der verfügbaren Clientbibliotheken.

    gcloud storage buckets notifications create gs://SOURCE_BUCKET_NAME --topic=TOPIC_NAME
  3. Erstellen Sie ein Pull-Abo für das Thema. Sie müssen für jeden Übertragungsjob ein separates Abo erstellen.

    Das folgende Beispiel zeigt den Google Cloud CLI-Befehl zum Erstellen eines Pull-Abos. Eine Anleitung für die Konsole und Clientbibliotheks-Code finden Sie unter Pull-Abo erstellen.

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300

Übertragungsjob erstellen

Sie können die REST API oder die Google Cloud -Konsole verwenden, um einen ereignisbasierten Übertragungsjob zu erstellen.

Der Name des Übertragungsjobs darf keine vertraulichen Informationen wie personenidentifizierbare Informationen (PII) oder Sicherheitsdaten enthalten. Ressourcennamen können an die Namen anderer Google Cloud Ressourcen weitergegeben und für Google-interne Systeme außerhalb Ihres Projekts verfügbar gemacht werden.

Cloud Console

  1. Rufen Sie in der Google Cloud -Console die Seite Übertragungsjob erstellen auf.

    Übertragungsjob erstellen

  2. Wählen Sie Cloud Storage als Quelle und Ziel aus.

  3. Wählen Sie als Planungsmodus die Option Ereignisgesteuert aus und klicken Sie auf Nächster Schritt.

  4. Wählen Sie den Quell-Bucket für diesen Transfer aus.

  5. Geben Sie im Bereich Eventstream den Namen des Abos ein:

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. Optional: Definieren Sie Filter und klicken Sie dann auf Nächster Schritt.

  7. Wählen Sie den Ziel-Bucket für diese Übertragung aus.

  8. Optional: Geben Sie eine Start- und Endzeit für die Übertragung ein. Wenn Sie keine Zeit angeben, beginnt die Übertragung sofort und wird ausgeführt, bis sie manuell beendet wird.

  9. Geben Sie Übertragungsoptionen an. Weitere Informationen finden Sie auf der Seite Übertragungen erstellen.

  10. Klicken Sie auf Erstellen.

Nachdem der Übertragungsjob erstellt wurde, wird er ausgeführt und ein Event-Listener wartet auf Benachrichtigungen zum Pub/Sub-Abo. Auf der Seite mit den Jobdetails wird für jede Stunde ein Vorgang angezeigt. Außerdem sind Details zu den für jeden Job übertragenen Daten enthalten.

REST

Senden Sie das folgende JSON-Objekt an den Endpunkt transferJobs.create, um eine ereignisgesteuerte Übertragung mit der REST API zu erstellen:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "gcsDataSource" {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

Die Parameter eventStreamStartTime und eventStreamExpirationTime sind optional. Wenn die Startzeit weggelassen wird, beginnt die Übertragung sofort. Wenn die Endzeit weggelassen wird, wird die Übertragung fortgesetzt, bis sie manuell beendet wird.

Clientbibliotheken

Go

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Go API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub topic to subscribe the event driven transfer to.
	// pubSubID := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Java API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS AWS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The ARN of the PubSub queue to subscribe to
    String sqsQueueArn = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Node.js API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The subscription ID to a Pubsub queue to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

Informationen zum Installieren und Verwenden der Clientbibliothek für Storage Transfer Service finden Sie unter Storage Transfer Service-Clientbibliotheken. Weitere Informationen finden Sie in der Referenzdokumentation zur Storage Transfer Service Python API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich beim Storage Transfer Service zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


from google.cloud import storage_transfer


def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")

Ereignisgesteuerte Übertragung überwachen

Wenn Sie eine ereignisgesteuerte Übertragung erstellen, wird im Storage Transfer Service ein Übertragungsjob erstellt. Sobald die Startzeit erreicht ist, wird ein Übertragungsvorgang ausgeführt und ein Event-Listener wartet auf Benachrichtigungen aus der Pub/Sub-Warteschlange.

Der Übertragungsvorgang wird mit dem Status in progress etwa 24 Stunden lang ausgeführt. Nach 24 Stunden wird der Vorgang abgeschlossen und ein neuer Vorgang beginnt. Alle 24 Stunden wird ein neuer Vorgang erstellt, bis die Endzeit des Übertragungsjobs erreicht ist oder der Job manuell beendet wird.

Wenn eine Dateiübertragung läuft, wenn der Vorgang abgeschlossen sein soll, wird der aktuelle Vorgang fortgesetzt, bis die Datei vollständig übertragen wurde. Ein neuer Vorgang wird gestartet und die beiden Vorgänge werden gleichzeitig ausgeführt, bis der alte Vorgang abgeschlossen ist. Alle Ereignisse, die in diesem Zeitraum erkannt werden, werden vom neuen Vorgang verarbeitet.

So rufen Sie den aktuellen Vorgang und alle abgeschlossenen Vorgänge auf:

Google Cloud console

  1. Rufen Sie in der Google Cloud -Console die Seite Storage Transfer Service auf.

    Storage Transfer Service aufrufen

  2. Wählen Sie in der Jobliste entweder den Tab Alle oder Cloud-zu-Cloud aus.

  3. Klicken Sie auf die Job-ID für die Übertragung. In der Spalte Planungsmodus werden alle ereignisgesteuerten Übertragungen im Vergleich zu Batchübertragungen angegeben.

  4. Wählen Sie den Tab Vorgänge aus. Details zum aktuellen Vorgang werden angezeigt und abgeschlossene Vorgänge sind in der Tabelle Ausführungsverlauf aufgeführt. Klicken Sie auf einen abgeschlossenen Vorgang, um weitere Details zu sehen.

gcloud

Verwenden Sie gcloud transfer jobs monitor, um den Fortschritt eines Jobs in Echtzeit zu überwachen. Die Antwort enthält den aktuellen Vorgang, die Startzeit des Jobs, die Menge der übertragenen Daten, die Anzahl der übersprungenen Bytes und die Anzahl der Fehler.

gcloud transfer jobs monitor JOB_NAME

So rufen Sie den aktuellen Vorgangsnamen ab:

gcloud transfer jobs describe JOB_NAME --format="value(latestOperationName)"

So listen Sie aktuelle und abgeschlossene Vorgänge auf:

gcloud transfer operations list --job-names=JOB_NAME

So rufen Sie Details zu einem Vorgang auf:

gcloud transfer operations describe OPERATION_NAME