Transferencias controladas por eventos desde Cloud Storage

El Servicio de transferencia de almacenamiento puede escuchar las notificaciones de eventos en Google Cloudpara transferir automáticamente los datos que se agregaron o actualizaron en un bucket de Cloud Storage. Obtén más información sobre los beneficios de las transferencias centradas en eventos.

Las transferencias controladas por eventos desde Cloud Storage usan notificaciones de Pub/Sub para saber cuándo se modificaron o agregaron objetos en el bucket de origen. No se detectan las eliminaciones de objetos. Si se borra un objeto en el origen, no se borra el objeto asociado en el bucket de destino.

Las transferencias controladas por eventos siempre usan un bucket de Cloud Storage como destino.

Configura permisos

Además de los permisos requeridos para todos los trabajos de transferencia, las transferencias basadas en eventos requieren el rol Pub/Sub Subscriber.

  1. Busca el nombre del agente de servicio de Servicio de transferencia de almacenamiento para tu proyecto:

    1. Ve a la página de referencia de googleServiceAccounts.get.

      Se abrirá un panel interactivo con el título Prueba este método.

    2. En el panel, en Parámetros de solicitud, ingresa el ID de tu proyecto. El proyecto que especifiques aquí debe ser el proyecto que usas para administrar el Servicio de transferencia de almacenamiento, que puede ser diferente del proyecto del bucket de origen.

    3. Haz clic en Ejecutar.

    El correo electrónico del agente de servicio se muestra como el valor de accountEmail. Copia este valor.

    El correo electrónico del agente de servicio usa el formato project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com.

  2. Otorga el rol Pub/Sub Subscriber al agente de servicio del Servicio de transferencia de almacenamiento.

    Consola de Cloud

    Sigue las instrucciones en Controla el acceso a través de la consola de Google Cloud para otorgar el rol de Pub/Sub Subscriber al servicio de Servicio de transferencia de almacenamiento. El rol se puede otorgar a nivel de tema, suscripción o proyecto.

    gcloud CLI

    Sigue las instrucciones en Cómo establecer una política para agregar la siguiente vinculación:

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
    }

Configura Pub/Sub

  1. Asegúrate de cumplir con los requisitos previos para usar Pub/Sub con Cloud Storage.

  2. Crea una notificación de Pub/Sub para el bucket de Cloud Storage de origen.

    No puedes administrar las notificaciones de Pub/Sub con la Google Cloud consola. En su lugar, usa gcloud CLI o una de las bibliotecas cliente disponibles.

    gcloud storage buckets notifications create gs://SOURCE_BUCKET_NAME --topic=TOPIC_NAME
  3. Crea una suscripción de extracción para el tema. Debes crear una suscripción independiente para cada trabajo de transferencia.

    En el siguiente ejemplo, se muestra el comando de Google Cloud CLI para crear una suscripción de extracción. Para obtener instrucciones sobre la consola y el código de la biblioteca cliente, consulta Cómo crear una suscripción de extracción.

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300

Crear un trabajo de transferencia

Puedes usar la API de REST o la consola de Google Cloud para crear un trabajo de transferencia basado en eventos.

No incluyas información sensible, como información de identificación personal (PII) ni datos de seguridad en el nombre de tu trabajo de transferencia. Los nombres de recursos se pueden propagar a los nombres de otros recursos de Google Cloud y se pueden exponer a los sistemas internos de Google fuera de tu proyecto.

Consola de Cloud

  1. Ve a la página Crear trabajo de transferencia en la Google Cloud consola.

    Ir a Crear trabajo de transferencia

  2. Selecciona Cloud Storage como la fuente y el destino.

  3. En Modo de programación, selecciona Controlado por eventos y haz clic en Siguiente paso.

  4. Selecciona el bucket de origen para esta transferencia.

  5. En la sección Flujo de eventos, ingresa el nombre de la suscripción:

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. De manera opcional, define los filtros que desees y, luego, haz clic en Siguiente paso.

  7. Selecciona el bucket de destino para esta transferencia.

  8. De manera opcional, ingresa una hora de inicio y finalización para la transferencia. Si no especificas una hora, la transferencia comenzará de inmediato y se ejecutará hasta que se detenga manualmente.

  9. Especifica las opciones de transferencia. Puedes obtener más información en la página Crea transferencias.

  10. Haz clic en Create.

Una vez creado, el trabajo de transferencia comienza a ejecutarse y un objeto de escucha de eventos espera las notificaciones en la suscripción de Pub/Sub. La página de detalles del trabajo muestra una operación por hora y, además, incluye detalles sobre los datos transferidos para cada trabajo.

REST

Para crear una transferencia basada en eventos con la API de REST, envía el siguiente objeto JSON al extremo transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "gcsDataSource" {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

Los parámetros eventStreamStartTime y eventStreamExpirationTime son opcionales. Si se omite la hora de inicio, la transferencia comienza de inmediato. Si se omite la hora de finalización, la transferencia continúa hasta que se detiene manualmente.

Bibliotecas cliente

Go

Para obtener información sobre cómo instalar y usar la biblioteca cliente del Servicio de transferencia de almacenamiento, consulta las bibliotecas cliente del Servicio de transferencia de almacenamiento. Si deseas obtener más información, consulta la documentación de referencia de la API de Storage Transfer Service Go.

Para autenticarte en el Servicio de transferencia de almacenamiento, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub topic to subscribe the event driven transfer to.
	// pubSubID := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

Para obtener información sobre cómo instalar y usar la biblioteca cliente del Servicio de transferencia de almacenamiento, consulta las bibliotecas cliente del Servicio de transferencia de almacenamiento. Si deseas obtener más información, consulta la documentación de referencia de la API de Storage Transfer Service Java.

Para autenticarte en el Servicio de transferencia de almacenamiento, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS AWS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The ARN of the PubSub queue to subscribe to
    String sqsQueueArn = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Para obtener información sobre cómo instalar y usar la biblioteca cliente del Servicio de transferencia de almacenamiento, consulta las bibliotecas cliente del Servicio de transferencia de almacenamiento. Si deseas obtener más información, consulta la documentación de referencia de la API de Storage Transfer Service Node.js.

Para autenticarte en el Servicio de transferencia de almacenamiento, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The subscription ID to a Pubsub queue to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

Para obtener información sobre cómo instalar y usar la biblioteca cliente del Servicio de transferencia de almacenamiento, consulta las bibliotecas cliente del Servicio de transferencia de almacenamiento. Si deseas obtener más información, consulta la documentación de referencia de la API de Storage Transfer Service Python.

Para autenticarte en el Servicio de transferencia de almacenamiento, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


from google.cloud import storage_transfer


def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")

Supervisa una transferencia centrada en eventos

Cuando creas una transferencia basada en eventos, el Servicio de transferencia de almacenamiento crea un trabajo de transferencia. Una vez que se alcanza la hora de inicio, comienza a ejecutarse una operación de transferencia y un objeto de escucha de eventos espera las notificaciones de la cola de Pub/Sub.

La operación de transferencia se ejecuta con el estado in progress durante aproximadamente 24 horas. Después de 24 horas, finaliza la operación y comienza una nueva. Se crea una operación nueva cada 24 horas hasta que se alcanza la hora de finalización del trabajo de transferencia o hasta que se detiene el trabajo de forma manual.

Si hay una transferencia de archivos en curso cuando se programa la finalización de la operación, la operación actual seguirá en curso hasta que se haya transferido el archivo por completo. Se inicia una operación nueva, y las dos operaciones se ejecutan de forma simultánea hasta que finaliza la operación anterior. La nueva operación controla los eventos que se detectan durante este período.

Para ver la operación actual y las operaciones completadas, haz lo siguiente:

Google Cloud console

  1. Ve a la página Servicio de transferencia de almacenamiento en la Google Cloud consola.

    Ir al Servicio de transferencia de almacenamiento

  2. En la lista de trabajos, selecciona la pestaña Todos o De nube a nube.

  3. Haz clic en el ID del trabajo de tu transferencia. La columna Modo de programación identifica todas las transferencias basadas en eventos en comparación con las transferencias por lotes.

  4. Selecciona la pestaña Operaciones. Se muestran los detalles de la operación actual y las operaciones completadas se enumeran en la tabla Historial de ejecuciones. Haz clic en cualquier operación completada para obtener detalles adicionales.

gcloud

Para supervisar el progreso de un trabajo en tiempo real, usa gcloud transfer jobs monitor. La respuesta muestra la operación actual, la hora de inicio del trabajo, la cantidad de datos transferidos, los bytes omitidos y los recuentos de errores.

gcloud transfer jobs monitor JOB_NAME

Para recuperar el nombre de la operación actual, haz lo siguiente:

gcloud transfer jobs describe JOB_NAME --format="value(latestOperationName)"

Para enumerar las operaciones actuales y completadas, haz lo siguiente:

gcloud transfer operations list --job-names=JOB_NAME

Para ver los detalles de una operación, sigue estos pasos:

gcloud transfer operations describe OPERATION_NAME