Transferts basés sur des événements à partir de Cloud Storage

Le service de transfert de stockage peut écouter les notifications d'événements dans Google Cloudpour transférer automatiquement les données qui ont été ajoutées ou mises à jour dans un bucket Cloud Storage. En savoir plus sur les avantages des transferts basés sur des événements

Les transferts basés sur des événements depuis Cloud Storage utilisent des notifications Pub/Sub pour savoir quand des objets du bucket source ont été modifiés ou ajoutés. Les suppressions d'objets ne sont pas détectées. La suppression d'un objet à la source n'entraîne pas la suppression de l'objet associé dans le bucket de destination.

Les transferts basés sur les événements utilisent toujours un bucket Cloud Storage comme destination.

Configurer les autorisations

En plus des autorisations requises pour tous les jobs de transfert, les transferts basés sur les événements nécessitent le rôle Pub/Sub Subscriber.

  1. Recherchez le nom de l'agent de service service de transfert de stockage pour votre projet :

    1. Accédez à la page de référence googleServiceAccounts.get.

      Un panneau interactif s'affiche, intitulé Essayer cette méthode.

    2. Dans le panneau, sous Paramètres des requêtes, saisissez l'ID de votre projet. Le projet que vous spécifiez ici doit être le projet que vous utilisez pour gérer le service de transfert de stockage, qui peut être différent du projet du bucket source.

    3. Cliquez sur Exécuter.

    L'adresse e-mail de votre agent de service est renvoyée en tant que valeur de accountEmail. Copiez cette valeur.

    L'adresse e-mail de l'agent de service utilise le format project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com.

  2. Attribuez le rôle Pub/Sub Subscriber à l'agent de service du service de transfert de stockage.

    Cloud Console

    Suivez les instructions de la section Contrôler l'accès via la console Google Cloud pour accorder le rôle Pub/Sub Subscriber au service de transfert de stockage. Ce rôle peut être attribué au niveau du thème, de l'abonnement ou du projet.

    CLI gcloud

    Suivez les instructions de la section Définir une règle pour ajouter la liaison suivante :

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
    }

Configurer Pub/Sub

  1. Assurez-vous de remplir les conditions préalables pour utiliser Pub/Sub avec Cloud Storage.

  2. Créez une notification Pub/Sub pour le bucket Cloud Storage source.

    Vous ne pouvez pas gérer les notifications Pub/Sub avec la console Google Cloud . Utilisez plutôt gcloud CLI ou l'une des bibliothèques clientes disponibles.

    gcloud storage buckets notifications create gs://SOURCE_BUCKET_NAME --topic=TOPIC_NAME
  3. Créez un abonnement pull pour le sujet. Vous devez créer un abonnement distinct pour chaque tâche de transfert.

    L'exemple suivant montre la commande Google Cloud CLI permettant de créer un abonnement par extraction. Pour obtenir des instructions concernant la console et le code de la bibliothèque cliente, consultez Créer un abonnement pull.

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300

Créer un job de transfert

Vous pouvez utiliser l'API REST ou la console Google Cloud pour créer un job de transfert basé sur des événements.

N'incluez pas d'informations sensibles telles que des informations permettant d'identifier personnellement l'utilisateur ou des données de sécurité dans le nom de votre job de transfert. Les noms de ressources peuvent être propagés aux noms d'autres ressources Google Cloud et peuvent être exposés aux systèmes internes de Google en dehors de votre projet.

Cloud Console

  1. Accédez à la page Créer un job de transfert dans la console Google Cloud .

    Accéder à Créer une tâche de transfert

  2. Sélectionnez Cloud Storage comme source et destination.

  3. Dans Scheduling mode (Mode de planification), sélectionnez Event-driven (Piloté par les événements), puis cliquez sur Next step (Étape suivante).

  4. Sélectionnez le bucket source pour ce transfert.

  5. Dans la section Flux d'événements, saisissez le nom de l'abonnement :

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. Vous pouvez éventuellement définir des filtres, puis cliquer sur Étape suivante.

  7. Sélectionnez le bucket de destination pour ce transfert.

  8. Vous pouvez également saisir une heure de début et de fin pour le transfert. Si vous ne spécifiez pas d'heure, le transfert commencera immédiatement et s'exécutera jusqu'à ce qu'il soit arrêté manuellement.

  9. Spécifiez les options de transfert. Pour en savoir plus, consultez la page Créer des transferts.

  10. Cliquez sur Créer.

Une fois la tâche de transfert créée, elle commence à s'exécuter et un écouteur d'événements attend les notifications sur l'abonnement Pub/Sub. La page des détails du job affiche une opération par heure et inclut des informations sur les données transférées pour chaque job.

REST

Pour créer un transfert basé sur des événements à l'aide de l'API REST, envoyez l'objet JSON suivant au point de terminaison transferJobs.create :

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "gcsDataSource" {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTime et eventStreamExpirationTime sont facultatifs. Si l'heure de début est omise, le transfert commence immédiatement. Si l'heure de fin est omise, le transfert se poursuit jusqu'à ce qu'il soit arrêté manuellement.

Bibliothèques clientes

Go

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez la page Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez la documentation de référence de l'API Go du service de transfert de stockage.

Pour vous authentifier auprès du service de transfert de stockage, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub topic to subscribe the event driven transfer to.
	// pubSubID := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez la page Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez la documentation de référence de l'API Java du service de transfert de stockage.

Pour vous authentifier auprès du service de transfert de stockage, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS AWS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The ARN of the PubSub queue to subscribe to
    String sqsQueueArn = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez la page Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez la documentation de référence de l'API Node.js du service de transfert de stockage.

Pour vous authentifier auprès du service de transfert de stockage, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The subscription ID to a Pubsub queue to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

Pour savoir comment installer et utiliser la bibliothèque cliente pour le service de transfert de stockage, consultez la page Bibliothèques clientes du service de transfert de stockage. Pour en savoir plus, consultez la documentation de référence de l'API Python du service de transfert de stockage.

Pour vous authentifier auprès du service de transfert de stockage, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


from google.cloud import storage_transfer


def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")

Surveiller un transfert basé sur des événements

Lorsque vous créez un transfert basé sur des événements, le service de transfert de stockage crée un job de transfert. Une fois l'heure de début atteinte, une opération de transfert commence à s'exécuter et un écouteur d'événements attend les notifications de la file d'attente Pub/Sub.

L'opération de transfert s'exécute pendant environ 24 heures, avec un état in progress. Au bout de 24 heures, l'opération se termine et une nouvelle commence. Une opération est créée toutes les 24 heures jusqu'à ce que l'heure de fin du job de transfert soit atteinte ou jusqu'à ce que le job soit arrêté manuellement.

Si un transfert de fichier est en cours lorsque l'opération est censée se terminer, l'opération actuelle reste en cours jusqu'à ce que le fichier ait été complètement transféré. Une nouvelle opération est lancée, et les deux opérations s'exécutent simultanément jusqu'à ce que l'ancienne se termine. Tous les événements détectés pendant cette période sont gérés par la nouvelle opération.

Pour afficher l'opération en cours et les opérations terminées :

Console Google Cloud

  1. Accédez à la page Service de transfert de stockage dans la console Google Cloud .

    Accéder au service de transfert de stockage

  2. Dans la liste des jobs, sélectionnez l'onglet Tous ou Cloud à cloud.

  3. Cliquez sur l'ID de job de votre transfert. La colonne Mode de planification identifie tous les transferts déclenchés par un événement par rapport aux transferts par lot.

  4. Sélectionnez l'onglet Opérations. Les détails de l'opération en cours s'affichent, et les opérations terminées sont listées dans le tableau Historique d'exécution. Cliquez sur une opération terminée pour en savoir plus.

gcloud

Pour surveiller la progression d'une tâche en temps réel, utilisez gcloud transfer jobs monitor. La réponse indique l'opération en cours, l'heure de début du job, la quantité de données transférées, les octets ignorés et le nombre d'erreurs.

gcloud transfer jobs monitor JOB_NAME

Pour récupérer le nom de l'opération en cours :

gcloud transfer jobs describe JOB_NAME --format="value(latestOperationName)"

Pour lister les opérations en cours et terminées :

gcloud transfer operations list --job-names=JOB_NAME

Pour afficher les détails d'une opération :

gcloud transfer operations describe OPERATION_NAME