Trasferimenti basati su eventi da Cloud Storage

Storage Transfer Service può ascoltare le notifiche di eventi in Google Cloud per trasferire automaticamente i dati aggiunti o aggiornati in un bucket Cloud Storage. Scopri di più sui vantaggi dei trasferimenti basati su eventi.

I trasferimenti basati su eventi da Cloud Storage utilizzano le notifiche Pub/Sub per sapere quando gli oggetti nel bucket di origine sono stati modificati o aggiunti. Le eliminazioni di oggetti non vengono rilevate; l'eliminazione di un oggetto nell'origine non elimina l'oggetto associato nel bucket di destinazione.

I trasferimenti basati su eventi utilizzano sempre un bucket Cloud Storage come destinazione.

Configura autorizzazioni

Oltre alle autorizzazioni richieste per tutti i job di trasferimento, i trasferimenti basati su eventi richiedono il ruolo Pub/Sub Subscriber.

  1. Trova il nome dell'agente di servizio Storage Transfer Service per il tuo progetto:

    1. Vai alla pagina di riferimento di googleServiceAccounts.get.

      Si apre un riquadro interattivo intitolato Prova questo metodo.

    2. Nel riquadro, in Parametri della richiesta, inserisci il tuo ID progetto. Il progetto che specifichi qui deve essere quello che utilizzi per gestire Storage Transfer Service, che potrebbe essere diverso dal progetto del bucket di origine.

    3. Fai clic su Esegui.

    L'email dell'agente di servizio viene restituita come valore di accountEmail. Copia questo valore.

    L'email dell'agente di servizio utilizza il formato project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com.

  2. Concedi il ruolo Pub/Sub Subscriber all'agente di servizio Storage Transfer Service.

    console Cloud

    Segui le istruzioni riportate in Controllo dell'accesso tramite la console per concedere il ruolo Pub/Sub Subscriber al servizio di trasferimento di Storage. Google Cloud Il ruolo può essere concesso a livello di argomento, sottoscrizione o progetto.

    gcloud CLI

    Segui le istruzioni riportate in Impostare un criterio per aggiungere il seguente binding:

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
    }

Configurare Pub/Sub

  1. Assicurati di aver soddisfatto i prerequisiti per l'utilizzo di Pub/Sub con Cloud Storage.

  2. Crea una notifica Pub/Sub per il bucket Cloud Storage di origine.

    Non puoi gestire le notifiche Pub/Sub con la console Google Cloud . Utilizza gcloud CLI o una delle librerie client disponibili.

    gcloud storage buckets notifications create gs://SOURCE_BUCKET_NAME --topic=TOPIC_NAME
  3. Crea una sottoscrizione pull per l'argomento. Devi creare un abbonamento separato per ogni job di trasferimento.

    Il seguente esempio mostra il comando Google Cloud CLI per creare una sottoscrizione pull. Per le istruzioni della console e il codice della libreria client, vedi Creare un abbonamento pull.

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300

Creare un job di trasferimento

Puoi utilizzare l'API REST o la console Google Cloud per creare un job di trasferimento basato sugli eventi.

Non includere informazioni sensibili come quelle che consentono l'identificazione personale (PII) o dati di sicurezza nel nome del job di trasferimento. I nomi delle risorse potrebbero essere propagato ai nomi di altre risorse Google Cloud e potrebbero essere esposti a sistemi interni di Google al di fuori del tuo progetto.

console Cloud

  1. Vai alla pagina Crea job di trasferimento nella Google Cloud console.

    Vai a Crea job di trasferimento

  2. Seleziona Cloud Storage sia come origine che come destinazione.

  3. Come Modalità di pianificazione, seleziona Basata sugli eventi e fai clic su Passaggio successivo.

  4. Seleziona il bucket di origine per questo trasferimento.

  5. Nella sezione Stream di eventi, inserisci il nome dell'abbonamento:

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. (Facoltativo) Definisci i filtri, poi fai clic su Passaggio successivo.

  7. Seleziona il bucket di destinazione per questo trasferimento.

  8. (Facoltativo) Inserisci un'ora di inizio e di fine per il trasferimento. Se non specifichi un orario, il trasferimento inizierà immediatamente e verrà eseguito fino a quando non verrà interrotto manualmente.

  9. Specifica le opzioni di trasferimento. Per maggiori informazioni, consulta la pagina Crea trasferimenti.

  10. Fai clic su Crea.

Una volta creato, il job di trasferimento inizia a essere eseguito e un listener di eventi attende le notifiche sulla sottoscrizione Pub/Sub. La pagina dei dettagli del job mostra un'operazione ogni ora e include i dettagli sui dati trasferiti per ogni job.

REST

Per creare un trasferimento basato su eventi utilizzando l'API REST, invia il seguente oggetto JSON all'endpoint transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "gcsDataSource" {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTime e eventStreamExpirationTime sono facoltativi. Se l'ora di inizio viene omessa, il trasferimento inizia immediatamente; se l'ora di fine viene omessa, il trasferimento continua fino all'interruzione manuale.

Librerie client

Go

Per scoprire come installare e utilizzare la libreria client per Storage Transfer Service, consulta Librerie client di Storage Transfer Service. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Go di Storage Transfer Service.

Per eseguire l'autenticazione in Storage Transfer Service, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub topic to subscribe the event driven transfer to.
	// pubSubID := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

Per scoprire come installare e utilizzare la libreria client per Storage Transfer Service, consulta Librerie client di Storage Transfer Service. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Java di Storage Transfer Service.

Per eseguire l'autenticazione in Storage Transfer Service, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS AWS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The ARN of the PubSub queue to subscribe to
    String sqsQueueArn = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Per scoprire come installare e utilizzare la libreria client per Storage Transfer Service, consulta Librerie client di Storage Transfer Service. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Node.js di Storage Transfer Service.

Per eseguire l'autenticazione in Storage Transfer Service, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The subscription ID to a Pubsub queue to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

Per scoprire come installare e utilizzare la libreria client per Storage Transfer Service, consulta Librerie client di Storage Transfer Service. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Python di Storage Transfer Service.

Per eseguire l'autenticazione in Storage Transfer Service, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.


from google.cloud import storage_transfer


def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")

Monitorare un trasferimento basato su eventi

Quando crei un trasferimento basato sugli eventi, Storage Transfer Service crea un job di trasferimento. Una volta raggiunto l'orario di inizio, viene avviata un'operazione di trasferimento e un listener di eventi attende le notifiche dalla coda Pub/Sub.

L'operazione di trasferimento viene eseguita, con lo stato in progress, per circa 24 ore. Dopo 24 ore, l'operazione termina e ne inizia una nuova. Viene creata una nuova operazione ogni 24 ore fino al raggiungimento dell'ora di fine del job di trasferimento o fino all'interruzione manuale del job.

Se è in corso un trasferimento di file al termine dell'operazione pianificata, l'operazione corrente rimane in corso finché il file non è stato completamente trasferito. Viene avviata una nuova operazione e le due operazioni vengono eseguite contemporaneamente fino al termine della vecchia operazione. Gli eventi rilevati durante questo periodo vengono gestiti dalla nuova operazione.

Per visualizzare l'operazione corrente e quelle completate:

Google Cloud console

  1. Vai alla pagina Storage Transfer Service nella Google Cloud console.

    Vai a Storage Transfer Service

  2. Nell'elenco dei job, seleziona la scheda Tutti o Da cloud a cloud.

  3. Fai clic sull'ID job per il trasferimento. La colonna Modalità di pianificazione identifica tutti i trasferimenti basati su eventi rispetto ai trasferimenti batch.

  4. Seleziona la scheda Operazioni. I dettagli vengono mostrati per l'operazione corrente e le operazioni completate sono elencate nella tabella Cronologia esecuzioni. Fai clic su qualsiasi operazione completata per ulteriori dettagli.

gcloud

Per monitorare l'avanzamento di un job in tempo reale, utilizza gcloud transfer jobs monitor. La risposta mostra l'operazione corrente, l'ora di inizio del job, la quantità di dati trasferiti, i byte ignorati e i conteggi degli errori.

gcloud transfer jobs monitor JOB_NAME

Per recuperare il nome dell'operazione corrente:

gcloud transfer jobs describe JOB_NAME --format="value(latestOperationName)"

Per elencare le operazioni attuali e completate:

gcloud transfer operations list --job-names=JOB_NAME

Per visualizzare i dettagli di un'operazione:

gcloud transfer operations describe OPERATION_NAME