Transfer berbasis peristiwa dari Cloud Storage

Storage Transfer Service dapat memproses notifikasi peristiwa di Google Clouduntuk mentransfer data secara otomatis yang telah ditambahkan atau diperbarui di bucket Cloud Storage. Pelajari lebih lanjut manfaat transfer berbasis peristiwa.

Transfer berbasis peristiwa dari Cloud Storage menggunakan notifikasi Pub/Sub untuk mengetahui kapan objek di bucket sumber telah diubah atau ditambahkan. Penghapusan objek tidak terdeteksi; menghapus objek di sumber tidak akan menghapus objek terkait di bucket tujuan.

Transfer berbasis peristiwa selalu menggunakan bucket Cloud Storage sebagai tujuan.

Konfigurasikan izin

Selain izin yang diperlukan untuk semua tugas transfer, transfer yang diaktifkan oleh peristiwa memerlukan peran Pub/Sub Subscriber.

  1. Temukan nama agen layanan Storage Transfer Service untuk project Anda:

    1. Buka halaman referensi googleServiceAccounts.get.

      Panel interaktif akan terbuka, berjudul Coba metode ini.

    2. Di panel, di bagian Parameter permintaan, masukkan ID project Anda. Project yang Anda tentukan di sini harus berupa project yang Anda gunakan untuk mengelola Storage Transfer Service, yang mungkin berbeda dengan project bucket sumber.

    3. Klik Jalankan.

    Email agen layanan Anda ditampilkan sebagai nilai accountEmail. Salin nilai ini.

    Email agen layanan menggunakan format project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com.

  2. Berikan peran Pub/Sub Subscriber kepada agen layanan Storage Transfer Service.

    Cloud Console

    Ikuti petunjuk di Mengontrol akses melalui konsol Google Cloud untuk memberikan peran Pub/Sub Subscriber ke layanan Storage Transfer Service. Peran dapat diberikan di tingkat topik, langganan, atau project.

    gcloud CLI

    Ikuti petunjuk di Menetapkan kebijakan untuk menambahkan binding berikut:

    {
      "role": "roles/pubsub.subscriber",
      "members": [
        "serviceAccount:project-PROJECT_NUMBER@storage-transfer-service.iam.gserviceaccount.com"
    }

Mengonfigurasi Pub/Sub

  1. Pastikan Anda telah memenuhi Prasyarat untuk menggunakan Pub/Sub dengan Cloud Storage.

  2. Buat notifikasi Pub/Sub untuk bucket Cloud Storage sumber.

    Anda tidak dapat mengelola notifikasi Pub/Sub dengan konsol Google Cloud . Gunakan gcloud CLI atau salah satu library klien yang tersedia sebagai gantinya.

    gcloud storage buckets notifications create gs://SOURCE_BUCKET_NAME --topic=TOPIC_NAME
  3. Buat langganan pull untuk topik. Anda harus membuat langganan terpisah untuk setiap tugas transfer.

    Contoh berikut menunjukkan perintah Google Cloud CLI untuk membuat langganan pull. Untuk mengetahui petunjuk konsol dan kode library klien, lihat Membuat langganan pull.

    gcloud pubsub subscriptions create SUBSCRIPTION_ID --topic=TOPIC_NAME --ack-deadline=300

Membuat tugas transfer

Anda dapat menggunakan REST API atau konsol Google Cloud untuk membuat tugas transfer berbasis peristiwa.

Jangan sertakan informasi sensitif seperti informasi identitas pribadi (PII) atau data keamanan dalam nama tugas transfer Anda. Nama resource dapat disalurkan ke nama resource Google Cloud lain dan dapat diekspos ke sistem internal Google di luar project Anda.

Cloud Console

  1. Buka halaman Create transfer job di konsol Google Cloud .

    Buka Buat tugas transfer

  2. Pilih Cloud Storage sebagai sumber dan tujuan.

  3. Sebagai Scheduling mode, pilih Event-driven, lalu klik Next step.

  4. Pilih bucket sumber untuk transfer ini.

  5. Di bagian Aliran peristiwa, masukkan nama langganan:

    projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID

  6. Secara opsional, tentukan filter apa pun, lalu klik Langkah berikutnya.

  7. Pilih bucket tujuan untuk transfer ini.

  8. Secara opsional, masukkan waktu mulai dan berakhir untuk transfer. Jika Anda tidak menentukan waktu, transfer akan segera dimulai dan akan berjalan hingga dihentikan secara manual.

  9. Tentukan opsi transfer. Informasi selengkapnya tersedia di halaman Membuat transfer.

  10. Klik Buat.

Setelah dibuat, tugas transfer mulai berjalan dan pemroses peristiwa menunggu notifikasi pada langganan Pub/Sub. Halaman detail tugas menampilkan satu operasi setiap jam, dan menyertakan detail data yang ditransfer untuk setiap tugas.

REST

Untuk membuat transfer berbasis peristiwa menggunakan REST API, kirim objek JSON berikut ke endpoint transferJobs.create:

transfer_job {
  "description": "YOUR DESCRIPTION",
  "status": "ENABLED",
  "projectId": "PROJECT_ID",
  "transferSpec" {
    "gcsDataSource" {
      "bucketName": "GCS_SOURCE_NAME"
    },
    "gcsDataSink": {
        "bucketName": "GCS_SINK_NAME"
    }
  }
  "eventStream" {
    "name": "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID",
    "eventStreamStartTime": "2022-12-02T01:00:00+00:00",
    "eventStreamExpirationTime": "2023-01-31T01:00:00+00:00"
  }
}

eventStreamStartTime dan eventStreamExpirationTime bersifat opsional. Jika waktu mulai tidak ditentukan, transfer akan segera dimulai; jika waktu berakhir tidak ditentukan, transfer akan berlanjut hingga dihentikan secara manual.

Library klien

Go

Untuk mempelajari cara menginstal dan menggunakan library klien untuk Storage Transfer Service, lihat library klien Storage Transfer Service. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Go Storage Transfer Service.

Untuk melakukan autentikasi ke Storage Transfer Service, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


func createEventDrivenGCSTransfer(w io.Writer, projectID string, gcsSourceBucket string, gcsSinkBucket string, pubSubId string) (*storagetransferpb.TransferJob, error) {
	// Your Google Cloud Project ID.
	// projectID := "my-project-id"

	// The name of the source GCS bucket.
	// gcsSourceBucket := "my-source-bucket"

	// The name of the GCS bucket to transfer objects to.
	// gcsSinkBucket := "my-sink-bucket"

	// The Pub/Sub topic to subscribe the event driven transfer to.
	// pubSubID := "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID"

	ctx := context.Background()
	client, err := storagetransfer.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("storagetransfer.NewClient: %w", err)
	}
	defer client.Close()

	req := &storagetransferpb.CreateTransferJobRequest{
		TransferJob: &storagetransferpb.TransferJob{
			ProjectId: projectID,
			TransferSpec: &storagetransferpb.TransferSpec{
				DataSource: &storagetransferpb.TransferSpec_GcsDataSource{
					GcsDataSource: &storagetransferpb.GcsData{BucketName: gcsSourceBucket}},
				DataSink: &storagetransferpb.TransferSpec_GcsDataSink{
					GcsDataSink: &storagetransferpb.GcsData{BucketName: gcsSinkBucket}},
			},
			EventStream: &storagetransferpb.EventStream{Name: pubSubId},
			Status:      storagetransferpb.TransferJob_ENABLED,
		},
	}
	resp, err := client.CreateTransferJob(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("failed to create transfer job: %w", err)
	}

	fmt.Fprintf(w, "Created an event driven transfer job from %v to %v subscribed to %v with name %v", gcsSourceBucket, gcsSinkBucket, pubSubId, resp.Name)
	return resp, nil
}

Java

Untuk mempelajari cara menginstal dan menggunakan library klien untuk Storage Transfer Service, lihat library klien Storage Transfer Service. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Java Storage Transfer Service.

Untuk melakukan autentikasi ke Storage Transfer Service, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


import com.google.storagetransfer.v1.proto.StorageTransferServiceClient;
import com.google.storagetransfer.v1.proto.TransferProto;
import com.google.storagetransfer.v1.proto.TransferTypes;

public class CreateEventDrivenGcsTransfer {
  public static void main(String[] args) throws Exception {
    // Your Google Cloud Project ID
    String projectId = "your-project-id";

    // The name of the GCS AWS bucket to transfer data from
    String gcsSourceBucket = "your-gcs-source-bucket";

    // The name of the GCS bucket to transfer data to
    String gcsSinkBucket = "your-gcs-sink-bucket";

    // The ARN of the PubSub queue to subscribe to
    String sqsQueueArn = "projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID";

    createEventDrivenGcsTransfer(projectId, gcsSourceBucket, gcsSinkBucket, sqsQueueArn);
  }

  public static void createEventDrivenGcsTransfer(
      String projectId, String gcsSourceBucket, String gcsSinkBucket, String pubSubId)
      throws Exception {
    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources,
    // or use "try-with-close" statement to do this automatically.
    try (StorageTransferServiceClient storageTransfer = StorageTransferServiceClient.create()) {

      TransferTypes.TransferJob transferJob =
          TransferTypes.TransferJob.newBuilder()
              .setProjectId(projectId)
              .setTransferSpec(
                  TransferTypes.TransferSpec.newBuilder()
                      .setGcsDataSource(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSourceBucket))
                      .setGcsDataSink(
                          TransferTypes.GcsData.newBuilder().setBucketName(gcsSinkBucket)))
              .setStatus(TransferTypes.TransferJob.Status.ENABLED)
              .setEventStream(TransferTypes.EventStream.newBuilder().setName(pubSubId).build())
              .build();

      TransferTypes.TransferJob response =
          storageTransfer.createTransferJob(
              TransferProto.CreateTransferJobRequest.newBuilder()
                  .setTransferJob(transferJob)
                  .build());

      System.out.println(
          "Created a transfer job between from "
              + gcsSourceBucket
              + " to "
              + gcsSinkBucket
              + " subscribed to "
              + pubSubId
              + " with name "
              + response.getName());
    }
  }
}

Node.js

Untuk mempelajari cara menginstal dan menggunakan library klien untuk Storage Transfer Service, lihat library klien Storage Transfer Service. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Storage Transfer Service.

Untuk melakukan autentikasi ke Storage Transfer Service, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


// Imports the Google Cloud client library
const {
  StorageTransferServiceClient,
} = require('@google-cloud/storage-transfer');

/**
 * TODO(developer): Uncomment the following lines before running the sample.
 */
// The ID of the Google Cloud Platform Project that owns the job
// projectId = 'my-project-id'

// Google Cloud Storage source bucket name
// gcsSourceBucket = 'my-gcs-source-bucket'

// Google Cloud Storage destination bucket name
// gcsSinkBucket = 'my-gcs-destination-bucket'

// The subscription ID to a Pubsub queue to track
// pubsubId = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

// Creates a client
const client = new StorageTransferServiceClient();

/**
 * Creates an event driven transfer that tracks a Pubsub subscription.
 */
async function createEventDrivenGcsTransfer() {
  const [transferJob] = await client.createTransferJob({
    transferJob: {
      projectId,
      status: 'ENABLED',
      transferSpec: {
        gcsDataSource: {
          bucketName: gcsSourceBucket,
        },
        gcsDataSink: {
          bucketName: gcsSinkBucket,
        },
      },
      eventStream: {
        name: pubsubId,
      },
    },
  });

  console.log(
    `Created an event driven transfer from '${gcsSourceBucket}' to '${gcsSinkBucket}' with name ${transferJob.name}`
  );
}

createEventDrivenGcsTransfer();

Python

Untuk mempelajari cara menginstal dan menggunakan library klien untuk Storage Transfer Service, lihat library klien Storage Transfer Service. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Python Storage Transfer Service.

Untuk melakukan autentikasi ke Storage Transfer Service, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


from google.cloud import storage_transfer


def create_event_driven_gcs_transfer(
    project_id: str,
    description: str,
    source_bucket: str,
    sink_bucket: str,
    pubsub_id: str,
):
    """Create an event driven transfer between two GCS buckets that tracks a PubSub subscription"""

    client = storage_transfer.StorageTransferServiceClient()

    # The ID of the Google Cloud Platform Project that owns the job
    # project_id = 'my-project-id'

    # A description of this job
    # description = 'Creates an event-driven transfer that tracks a pubsub subscription'

    # Google Cloud Storage source bucket name
    # source_bucket = 'my-gcs-source-bucket'

    # Google Cloud Storage destination bucket name
    # sink_bucket = 'my-gcs-destination-bucket'

    # The Pubsub Subscription ID to track
    # pubsub_id = 'projects/PROJECT_NAME/subscriptions/SUBSCRIPTION_ID'

    transfer_job_request = storage_transfer.CreateTransferJobRequest(
        {
            "transfer_job": {
                "project_id": project_id,
                "description": description,
                "status": storage_transfer.TransferJob.Status.ENABLED,
                "transfer_spec": {
                    "gcs_data_source": {
                        "bucket_name": source_bucket,
                    },
                    "gcs_data_sink": {
                        "bucket_name": sink_bucket,
                    },
                },
                "event_stream": {
                    "name": pubsub_id,
                },
            },
        }
    )

    result = client.create_transfer_job(transfer_job_request)
    print(f"Created transferJob: {result.name}")

Memantau transfer berdasarkan peristiwa

Saat Anda membuat transfer yang diaktifkan oleh peristiwa, Storage Transfer Service akan membuat tugas transfer. Setelah waktu mulai tercapai, operasi transfer mulai berjalan dan pendengar peristiwa menunggu notifikasi dari antrean Pub/Sub.

Operasi transfer berjalan, dengan status in progress, selama sekitar 24 jam. Setelah 24 jam, operasi selesai dan operasi baru dimulai. Operasi baru dibuat setiap 24 jam hingga waktu berakhir tugas transfer tercapai, atau hingga tugas dihentikan secara manual.

Jika transfer file sedang berlangsung saat operasi dijadwalkan selesai, operasi saat ini akan tetap berlangsung hingga file telah ditransfer sepenuhnya. Operasi baru dimulai, dan kedua operasi berjalan secara bersamaan hingga operasi lama selesai. Setiap peristiwa yang terdeteksi selama periode ini akan ditangani oleh operasi baru.

Untuk melihat operasi saat ini dan operasi yang telah selesai:

Google Cloud console

  1. Buka halaman Storage Transfer Service di konsol Google Cloud .

    Buka Storage Transfer Service

  2. Dalam daftar tugas, pilih tab Semua atau Cloud-to-cloud.

  3. Klik ID tugas untuk transfer Anda. Kolom Mode penjadwalan mengidentifikasi semua transfer berbasis peristiwa versus transfer batch.

  4. Pilih tab Operasi. Detail ditampilkan untuk operasi saat ini, dan operasi yang telah selesai dicantumkan dalam tabel Histori dijalankan. Klik operasi yang telah selesai untuk melihat detail tambahan.

gcloud

Untuk memantau progres tugas secara real time, gunakan gcloud transfer jobs monitor. Respons menampilkan operasi saat ini, waktu mulai tugas, jumlah data yang ditransfer, byte yang dilewati, dan jumlah error.

gcloud transfer jobs monitor JOB_NAME

Untuk mengambil nama operasi saat ini:

gcloud transfer jobs describe JOB_NAME --format="value(latestOperationName)"

Untuk mencantumkan operasi saat ini dan yang telah selesai:

gcloud transfer operations list --job-names=JOB_NAME

Untuk melihat detail tentang operasi:

gcloud transfer operations describe OPERATION_NAME