Event-driven transfers from Cloud Storage

Storage Transfer Service can listen to event notifications in Google Cloud to automatically transfer data that has been added or updated in a Cloud Storage bucket. Learn more about the benefits of event-driven transfers.

Event-driven transfers from Cloud Storage use Pub/Sub notifications to know when objects in the source bucket have been modified or added. Object deletions are not detected; deleting an object at the source does not delete the associated object in the destination bucket.

Configure permissions

  1. Find the name of the Storage Transfer Service service agent for your project:

    1. Go to the googleServiceAccounts.get reference page.

      An interactive panel opens, titled Try this method.

    2. In the panel, under Request parameters, enter your project ID. The project you specify here must be the project you're using to manage Storage Transfer Service, which might be different from the source bucket's project.

    3. Click Execute.

    Your service agent's email is returned as the value of accountEmail. Copy this value.

    The service agent's email uses the format project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com.

  2. Grant the Pub/Sub Subscriber role to the Storage Transfer Service service agent.

    Cloud console

    Follow the instructions in Controlling access through the Google Cloud console to grant the Pub/Sub Subscriber role to the Storage Transfer Service service. The role can be granted at the topic, subscription, or project level.

    gcloud CLI

    Follow the instructions in Setting a policy to add the following binding:

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
    }

Configure Pub/Sub

  1. Make sure that you've satisfied the Prerequisites for using Pub/Sub with Cloud Storage.

  2. Configure Pub/Sub notification for Cloud Storage:

    gcloud storage buckets notifications create gs://BUCKET_NAME --topic=TOPIC_NAME
  3. Create a pull subscription for the topic:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300

Create a transfer job

You can use the REST API or the Google Cloud console to create an event-based transfer job.

Don't include sensitive information such as personally identifiable information (PII) or security data in your transfer job name. Resource names may be propagated to the names of other Google Cloud resources and may be exposed to Google-internal systems outside of your project.

Cloud console

  1. Go to the Create transfer job page in the Google Cloud console.

    Go to Create transfer job

  2. Select Cloud Storage as both the source and the destination.

  3. As the Scheduling mode select Event-driven and click Next step.

  4. Select the source bucket for this transfer.

  5. In the Event stream section, enter the subscription name:

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. Optionally, define any filters, then click Next step.

  7. Select the destination bucket for this transfer.

  8. Optionally, enter a start and end time for the transfer. If you don't specify a time, the transfer will start immediately and will run until manually stopped.

  9. Specify any transfer options. More information is available from the Create transfers page.

  10. Click Create.

Once created, the transfer job starts running and an event listener waits for notifications on the Pub/Sub subscription. The job details page shows one operation each hour, and includes details on data transferred for each job.

REST

To create an event-driven transfer using the REST API, send the following JSON object to the transferJobs.create endpoint:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "gcsDataSource" {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

The eventStreamStartTime and eventStreamExpirationTime are optional. If the start time is omitted, the transfer starts immediately; if the end time is omitted, the transfer continues until manually stopped.

Client libraries

Go

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Go API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub topic to subscribe the event driven transfer to.
	// pubSubID := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Java API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS AWS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The ARN of the PubSub queue to subscribe to
    String sqsQueueArn = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Node.js API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The subscription ID to a Pubsub queue to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

To learn how to install and use the client library for Storage Transfer Service, see Storage Transfer Service client libraries. For more information, see the Storage Transfer Service Python API reference documentation.

To authenticate to Storage Transfer Service, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.


from google.cloud import storage_transfer


def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")