Mengelola streaming

Di halaman ini, Anda akan mempelajari cara menggunakan Datastream API untuk:

  • Membuat aliran data
  • Mendapatkan informasi tentang aliran dan objek aliran
  • Perbarui aliran dengan memulai, menjeda, melanjutkan, dan mengubahnya, serta dengan memulai dan menghentikan pengisian ulang untuk objek aliran
  • Memulihkan streaming yang gagal secara permanen
  • Mengaktifkan streaming objek besar untuk streaming Oracle
  • Menghapus aliran data

Ada dua cara untuk menggunakan Datastream API. Anda dapat melakukan panggilan REST API atau menggunakan Google Cloud CLI (CLI).

Untuk mengetahui informasi umum tentang penggunaan Google Cloud CLI untuk mengelola aliran Datastream, lihat aliran Datastream gcloud CLI.

Membuat stream

Di bagian ini, Anda akan mempelajari cara membuat aliran yang digunakan untuk mentransfer data dari sumber ke tujuan. Contoh berikut tidak komprehensif, tetapi menyoroti fitur tertentu Datastream. Untuk menangani kasus penggunaan spesifik Anda, gunakan contoh ini bersama dengan dokumentasi referensi API Datastream.

Bagian ini membahas kasus penggunaan berikut:

Contoh 1: Streaming objek tertentu ke BigQuery

Dalam contoh ini, Anda akan mempelajari cara:

  • Streaming dari MySQL ke BigQuery
  • Menyertakan sekumpulan objek dalam aliran
  • Menentukan mode tulis untuk aliran data sebagai append-only
  • Mengisi ulang semua objek yang disertakan dalam streaming

Berikut adalah permintaan untuk menarik semua tabel dari schema1 dan dua tabel tertentu dari schema2: tableA dan tableC. Peristiwa ditulis ke set data di BigQuery.

Permintaan tidak menyertakan parameter customerManagedEncryptionKey, sehingga sistem pengelolaan kunci internal Google Cloud digunakan untuk mengenkripsi data Anda alih-alih CMEK.

Parameter backfillAll yang terkait dengan melakukan pengisian ulang (atau snapshot) historis ditetapkan ke kamus kosong ({}), yang berarti Datastream mengisi ulang data historis dari semua tabel yang disertakan dalam aliran.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud untuk membuat aliran, lihat dokumentasi Google Cloud SDK.

Contoh 2: Mengecualikan objek tertentu dari aliran dengan sumber PostgreSQL

Dalam contoh ini, Anda akan mempelajari cara:

  • Streaming dari PostgreSQL ke BigQuery
  • Mengecualikan objek dari aliran
  • Mengecualikan objek dari pengisian ulang

Kode berikut menunjukkan permintaan untuk membuat aliran yang digunakan untuk mentransfer data dari database PostgreSQL sumber ke BigQuery. Saat membuat aliran dari database PostgreSQL sumber, Anda perlu menentukan dua kolom tambahan khusus PostgreSQL dalam permintaan Anda:

  • replicationSlot: slot replikasi adalah prasyarat untuk mengonfigurasi database PostgreSQL untuk replikasi. Anda perlu membuat slot replikasi untuk setiap aliran.
  • publication: publikasi adalah grup tabel yang ingin Anda replikasi perubahannya. Nama publikasi harus ada di database sebelum memulai streaming. Setidaknya, publikasi harus menyertakan tabel yang ditentukan dalam daftar includeObjects aliran.

Parameter backfillAll yang terkait dengan melakukan pengisian ulang historis (atau snapshot) ditetapkan untuk mengecualikan satu tabel.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud untuk membuat aliran, lihat dokumentasi Google Cloud SDK.

Contoh 3: Menentukan mode penulisan hanya tambah untuk aliran

Saat melakukan streaming ke BigQuery, Anda dapat menentukan mode penulisan: merge atau appendOnly. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi mode penulisan.

Jika Anda tidak menentukan mode penulisan dalam permintaan untuk membuat streaming, mode merge default akan digunakan.

Permintaan berikut menunjukkan cara menentukan mode appendOnly saat Anda membuat streaming MySQL ke BigQuery.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud untuk membuat aliran, lihat dokumentasi Google Cloud SDK.

Contoh 4: Streaming ke tujuan Cloud Storage

Dalam contoh ini, Anda akan mempelajari cara:

  • Streaming dari Oracle ke Cloud Storage
  • Menentukan sekumpulan objek yang akan disertakan dalam aliran data
  • Menentukan CMEK untuk mengenkripsi data dalam penyimpanan

Permintaan berikut menunjukkan cara membuat stream yang menulis peristiwa ke bucket di Cloud Storage.

Dalam contoh permintaan ini, peristiwa ditulis dalam format output JSON, dan file baru dibuat setiap 100 MB atau 30 detik (menggantikan nilai default 50 MB dan 60 detik).

Untuk format JSON, Anda dapat:

  • Sertakan file skema jenis terpadu di jalur. Akibatnya, Datastream menulis dua file ke Cloud Storage: file data JSON dan file skema Avro. File skema memiliki nama yang sama dengan file data, dengan ekstensi .schema.

  • Aktifkan kompresi gzip agar Datastream mengompresi file yang ditulis ke Cloud Storage.

Dengan menggunakan parameter backfillNone, permintaan menentukan bahwa hanya perubahan yang sedang berlangsung yang di-streaming ke tujuan, tanpa pengisian ulang.

Permintaan menentukan parameter kunci enkripsi yang dikelola pelanggan yang memungkinkan Anda mengontrol kunci yang digunakan untuk mengenkripsi data dalam penyimpanan dalam Google Cloud project. Parameter ini mengacu pada CMEK yang digunakan Datastream untuk mengenkripsi data yang di-streaming dari sumber ke tujuan. Tindakan ini juga menentukan key ring untuk CMEK Anda.

Untuk mengetahui informasi selengkapnya tentang key ring, lihat resource Cloud KMS. Untuk mengetahui informasi selengkapnya tentang cara melindungi data Anda menggunakan kunci enkripsi, lihat Cloud Key Management Service (KMS).

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud untuk membuat aliran, lihat dokumentasi Google Cloud SDK.

Contoh 5: Streaming ke tabel yang dikelola BigLake

Dalam contoh ini, Anda akan mempelajari cara mengonfigurasi aliran untuk mereplikasi data dari database MySQL ke tabel Iceberg BigLake dalam mode append-only. Sebelum membuat permintaan, pastikan Anda telah menyelesaikan langkah-langkah berikut:

  • Memiliki bucket Cloud Storage tempat Anda ingin menyimpan data
  • Membuat koneksi resource Cloud
  • Memberi koneksi resource Cloud Anda akses ke bucket Cloud Storage

Kemudian, Anda dapat menggunakan permintaan berikut untuk membuat feed:

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlBigLakeStream
{
  "displayName": "MySQL to BigLake stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlBigLakeCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          {
            "database": "my-mysql-database"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id",
    "bigqueryDestinationConfig": {
      "blmtConfig": {
        "bucket": "my-gcs-bucket-name",
        "rootPath": "my/folder",
        "connectionName": "my-project-id.us-central1.my-bigquery-connection-name",
        "fileFormat": "PARQUET",
        "tableFormat": "ICEBERG"
        },
      "singleTargetDataset": {
        "datasetId": "my-project-id:my-bigquery-dataset-id"
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

datastream streams create mysqlBigLakeStream --location=us-central1
--display-name=mysql-to-bl-stream --source=source --mysql-source-config=mysql_source_config.json
--destination=destination --bigquery-destination-config=bl_config.json
--backfill-none

Isi file konfigurasi sumber mysql_source_config.json:

{"excludeObjects": {}, "includeObjects": {"mysqlDatabases":[{"database":"my-mysql-database"}]}}

Isi file konfigurasi bl_config.json:

{ "blmtConfig": { "bucket": "my-gcs-bucket-name", "rootPath": "my/folder", "connectionName": "my-project-id.us-central1.my-bigquery-connection-name", "fileFormat": "PARQUET", "tableFormat": "ICEBERG" }, "singleTargetDataset": {"datasetId": "my-project-id:my-bigquery-dataset-id"}, "appendOnly": {} }

Terraform

resource "google_datastream_stream" "stream" {
  stream_id    = "mysqlBlStream"
  location     = "us-central1"
  display_name = "MySQL to BigLake stream"

  source_config {
    source_connection_profile = "/projects/myProjectId1/locations/us-central1/streams/mysqlBlCp"
    mysql_source_config {
      include_objects {
        mysql_databases {
          database = "my-mysql-database"
        }
      }
    }
  }

  destination_config {
    destination_connection_profile = "projects/myProjectId1/locations/us-central1/connectionProfiles/my-bq-cp-id"
    bigquery_destination_config {
      single_target_dataset {
        dataset_id = "my-project-id:my-bigquery-dataset-id"
      }
      blmt_config {
        bucket          = "my-gcs-bucket-name"
        table_format    = "ICEBERG"
        file_format     = "PARQUET"
        connection_name = "my-project-id.us-central1.my-bigquery-connection-name"
        root_path       = "my/folder"
      }
      append_only {}
    }
  }

  backfill_none {}
}
    

Memvalidasi definisi aliran data

Sebelum membuat streaming, Anda dapat memvalidasi definisinya. Dengan begitu, Anda dapat memastikan bahwa semua pemeriksaan validasi berhasil, dan bahwa aliran akan berjalan dengan sukses saat dibuat.

Validasi aliran memeriksa:

  • Apakah sumber dikonfigurasi dengan benar untuk memungkinkan Datastream melakukan streaming data dari sumber tersebut.
  • Apakah aliran dapat terhubung ke sumber dan tujuan.
  • Konfigurasi aliran data secara menyeluruh.

Untuk memvalidasi streaming, tambahkan &validate_only=true ke URL sebelum isi permintaan Anda:

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

Setelah membuat permintaan ini, Anda akan melihat pemeriksaan validasi yang dijalankan Datastream untuk sumber dan tujuan Anda, beserta hasil pemeriksaan lulus atau gagal. Untuk setiap pemeriksaan validasi yang tidak lulus, informasi akan muncul mengenai alasan kegagalan dan tindakan yang harus dilakukan untuk memperbaiki masalah tersebut.

Misalnya, Anda memiliki kunci enkripsi yang dikelola pelanggan (CMEK) yang ingin Anda gunakan oleh Datastream untuk mengenkripsi data yang di-streaming dari sumber ke tujuan. Sebagai bagian dari validasi aliran, Datastream akan memverifikasi bahwa kunci ada, dan Datastream memiliki izin untuk menggunakan kunci tersebut. Jika salah satu kondisi ini tidak terpenuhi, saat Anda memvalidasi aliran, pesan error berikut akan ditampilkan:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

Untuk mengatasi masalah ini, pastikan kunci yang Anda berikan ada, dan akun layanan Datastream memiliki izin cloudkms.cryptoKeys.get untuk kunci tersebut.

Setelah melakukan koreksi yang sesuai, ajukan permintaan lagi untuk memastikan semua pemeriksaan validasi berhasil. Untuk contoh sebelumnya, pemeriksaan CMEK_VALIDATE_PERMISSIONS tidak akan lagi menampilkan pesan error, tetapi akan memiliki status PASSED.

Mendapatkan informasi tentang streaming

Kode berikut menunjukkan permintaan untuk mengambil informasi tentang aliran. Informasi ini mencakup:

  • Nama aliran data (ID unik)
  • Nama yang mudah digunakan untuk streaming (nama tampilan)
  • Stempel waktu saat aliran dibuat dan terakhir diperbarui
  • Informasi tentang profil koneksi sumber dan tujuan yang terkait dengan aliran data
  • Status streaming

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

Respons akan muncul seperti berikut:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud untuk mengambil informasi tentang streaming Anda, lihat dokumentasi Google Cloud SDK.

Mencantumkan stream

Kode berikut menunjukkan permintaan untuk mengambil daftar semua aliran dalam project dan lokasi yang ditentukan.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud untuk mengambil informasi tentang semua aliran Anda, lihat dokumentasi Google Cloud SDK.

Mencantumkan objek streaming

Kode berikut menunjukkan permintaan untuk mengambil informasi tentang semua objek aliran.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud untuk mengambil informasi tentang semua objek streaming, lihat dokumentasi Google Cloud SDK.

Daftar objek yang ditampilkan mungkin terlihat seperti berikut:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud untuk mencantumkan objek streaming, lihat dokumentasi Google Cloud SDK.

Mulai streaming

Kode berikut menunjukkan permintaan untuk memulai streaming.

Dengan menggunakan parameter updateMask dalam permintaan, hanya kolom yang Anda tentukan yang harus disertakan dalam isi permintaan. Untuk memulai streaming, ubah nilai di kolom state dari CREATED menjadi RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud untuk memulai streaming, lihat dokumentasi Google Cloud SDK.

Menjeda streaming

Kode berikut menunjukkan permintaan untuk menjeda streaming yang sedang berjalan.

Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask adalah kolom state. Dengan menjeda aliran, Anda mengubah statusnya dari RUNNING menjadi PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud untuk menjeda streaming, lihat dokumentasi Google Cloud SDK.

Melanjutkan streaming

Kode berikut menunjukkan permintaan untuk melanjutkan streaming yang dijeda.

Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask adalah kolom state. Dengan melanjutkan streaming, Anda mengubah statusnya dari PAUSED menjadi RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud untuk melanjutkan streaming, lihat dokumentasi Google Cloud SDK.

Memulihkan streaming

Anda dapat memulihkan streaming yang gagal secara permanen menggunakan metode RunStream. Setiap jenis database sumber memiliki definisi sendiri tentang kemungkinan operasi pemulihan streaming. Untuk mengetahui informasi selengkapnya, lihat Memulihkan streaming.

Memulihkan streaming untuk sumber MySQL atau Oracle

Contoh kode berikut menunjukkan permintaan untuk memulihkan stream untuk sumber MySQL atau Oracle dari berbagai posisi file log:

REST

Memulihkan aliran dari posisi saat ini. Ini adalah opsi default:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Memulihkan streaming dari posisi yang tersedia berikutnya:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

Memulihkan streaming dari posisi terbaru:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

Memulihkan aliran dari posisi tertentu (replikasi berbasis binlog MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Ganti kode berikut:

  • NAME_OF_THE_LOG_FILE: Nama file log tempat Anda ingin memulihkan streaming
  • POSITION: Posisi dalam file log yang ingin Anda gunakan untuk memulihkan stream. Jika Anda tidak memberikan nilai, Datastream akan memulihkan streaming dari awal file.

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

Memulihkan aliran dari posisi tertentu (replikasi berbasis GTID MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

Ganti GTID_SET dengan satu atau beberapa GTID tunggal atau rentang GTID yang ingin Anda pulihkan alirannya.

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:1-3"
      }
    }
  }
}

Memulihkan streaming dari posisi tertentu (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Ganti scn dengan nomor perubahan sistem (SCN) dalam file log redo yang ingin Anda pulihkan alirannya. Kolom ini wajib diisi.

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

Untuk mengetahui informasi selengkapnya tentang opsi pemulihan yang tersedia, lihat Memulihkan aliran.

gcloud

Memulihkan aliran menggunakan gcloud tidak didukung.

Memulihkan aliran untuk sumber PostgreSQL

Contoh kode berikut menunjukkan permintaan untuk memulihkan aliran untuk sumber PostgreSQL. Selama pemulihan, aliran data mulai membaca dari nomor urut log (LSN) pertama di slot replikasi yang dikonfigurasi untuk aliran data.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

Jika Anda ingin mengubah slot replikasi, perbarui aliran dengan nama slot replikasi baru terlebih dahulu:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

Memulihkan aliran menggunakan gcloud tidak didukung.

Memulihkan stream untuk sumber SQL Server

Contoh kode berikut menunjukkan contoh permintaan untuk memulihkan aliran untuk sumber SQL Server.

REST

Memulihkan streaming dari posisi pertama yang tersedia:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

Memulihkan aliran dari nomor urut log yang disukai:

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

Memulihkan aliran menggunakan gcloud tidak didukung.

Memulai atau melanjutkan streaming dari posisi tertentu

Anda dapat memulai streaming atau melanjutkan streaming yang dijeda dari posisi tertentu untuk sumber MySQL dan Oracle. Hal ini mungkin berguna saat Anda ingin melakukan pengisian ulang menggunakan alat eksternal, atau memulai CDC dari posisi yang Anda tunjukkan. Untuk sumber MySQL, Anda perlu menunjukkan posisi binlog atau set GTID, dan untuk sumber Oracle, Anda perlu menunjukkan system change number (SCN) dalam file redo log.

Kode berikut menunjukkan permintaan untuk memulai atau melanjutkan streaming yang sudah dibuat dari posisi tertentu.

Memulai atau melanjutkan streaming dari posisi binlog tertentu (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Ganti kode berikut:

  • NAME_OF_THE_LOG_FILE: Nama file log tempat Anda ingin memulai streaming.
  • POSITION: Posisi dalam file log dari mana Anda ingin memulai streaming. Jika Anda tidak memberikan nilai, Datastream akan mulai membaca dari awal file.

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

Memulai atau melanjutkan streaming dari posisi tertentu menggunakan gcloud tidak didukung. Untuk mengetahui informasi tentang cara menggunakan gcloud untuk memulai atau melanjutkan streaming, lihat dokumentasi Cloud SDK.

Memulai atau melanjutkan streaming dari set GTID tertentu (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "GTID_SET"
      }
    }
  }
}

Ganti GTID_SET dengan satu atau beberapa GTID tunggal atau rentang GTID dari mana Anda ingin memulai atau melanjutkan streaming.

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlGtidPosition": {
        "gtidSet": "22cc56f5-3862-379a-9af5-051c59baef9d:1-561143685:561143688-591036613,b8d7df02-832b-32b9-bec7-2018806b76f6:3-7"
      }
    }
  }
}

gcloud

Memulai atau melanjutkan streaming dari posisi tertentu menggunakan gcloud tidak didukung. Untuk mengetahui informasi tentang cara menggunakan gcloud untuk memulai atau melanjutkan streaming, lihat dokumentasi Cloud SDK.

Mulai atau lanjutkan streaming dari nomor perubahan sistem tertentu dalam file log redo (Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Ganti scn dengan system change number (SCN) dalam file redo log tempat Anda ingin memulai streaming. Kolom ini wajib diisi.

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

Memulai atau melanjutkan streaming dari posisi tertentu menggunakan gcloud tidak didukung. Untuk mengetahui informasi tentang cara menggunakan gcloud untuk memulai streaming, lihat dokumentasi Cloud SDK.

Mengubah streaming

Kode berikut menunjukkan permintaan untuk mengupdate konfigurasi rotasi file stream agar memutar file setiap 75 MB atau 45 detik.

Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask mencakup kolom fileRotationMb dan fileRotationInterval, yang masing-masing diwakili oleh tanda destinationConfig.gcsDestinationConfig.fileRotationMb dan destinationConfig.gcsDestinationConfig.fileRotationInterval.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Kode berikut menunjukkan permintaan untuk menyertakan file skema Unified Types di jalur file yang ditulis Datastream ke Cloud Storage. Akibatnya, Datastream menulis dua file: file data JSON dan file skema Avro.

Untuk contoh ini, kolom yang ditentukan adalah kolom jsonFileFormat, yang diwakili oleh tanda destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

Kode berikut menunjukkan permintaan agar Datastream mereplikasi data yang ada, selain perubahan berkelanjutan pada data, dari database sumber ke tujuan.

Bagian oracleExcludedObjects dari kode menampilkan tabel dan skema yang tidak dapat diisi ulang ke tujuan.

Untuk contoh ini, semua tabel dan skema akan diisi ulang, kecuali tableA di schema3.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud untuk mengubah aliran, lihat dokumentasi Google Cloud SDK.

Mulai pengisian ulang untuk objek streaming

Stream di Datastream dapat mengisi ulang data historis, serta mengalirkan perubahan yang sedang berlangsung ke tujuan. Perubahan yang sedang berlangsung akan selalu di-streaming dari sumber ke tujuan. Namun, Anda dapat menentukan apakah Anda ingin data historis di-streaming.

Jika Anda ingin data historis di-streaming dari sumber ke tujuan, gunakan parameter backfillAll.

Datastream juga memungkinkan Anda melakukan streaming data historis hanya untuk tabel database tertentu. Untuk melakukannya, gunakan parameter backfillAll, dan kecualikan tabel yang tidak Anda inginkan untuk memiliki data historis.

Jika Anda hanya ingin perubahan yang sedang berlangsung di-streaming ke tujuan, gunakan parameter backfillNone. Jika Anda ingin Datastream melakukan streaming snapshot semua data yang ada dari sumber ke tujuan, Anda harus memulai pengisian ulang secara manual untuk objek yang berisi data ini.

Alasan lain untuk memulai pengisian ulang objek adalah jika data tidak sinkron antara sumber dan tujuan. Misalnya, pengguna dapat menghapus data di tujuan secara tidak sengaja, dan data tersebut kini hilang. Dalam hal ini, memulai pengisian ulang untuk objek berfungsi sebagai "mekanisme reset" karena semua data di-streaming ke tujuan dalam satu kali. Akibatnya, data disinkronkan antara sumber dan tujuan.

Sebelum dapat memulai pengisian ulang untuk objek streaming, Anda harus mengambil informasi tentang objek tersebut.

Setiap objek memiliki OBJECT_ID, yang mengidentifikasi objek secara unik. Anda menggunakan OBJECT_ID untuk memulai pengisian ulang untuk streaming.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud untuk memulai pengisian ulang objek aliran Anda, lihat dokumentasi Google Cloud SDK.

Menghentikan pengisian ulang untuk objek streaming

Setelah memulai pengisian ulang untuk objek streaming, Anda dapat menghentikan pengisian ulang untuk objek tersebut. Misalnya, jika pengguna mengubah skema database, skema atau data dapat rusak. Anda tidak ingin skema atau data ini dialirkan ke tujuan, sehingga Anda menghentikan pengisian ulang untuk objek tersebut.

Anda juga dapat menghentikan pengisian ulang untuk objek guna tujuan load balancing. Datastream dapat menjalankan beberapa pengisian ulang secara paralel. Tindakan ini dapat menambah beban pada sumber. Jika beban signifikan, hentikan pengisian ulang untuk setiap objek, lalu mulai pengisian ulang untuk objek, satu per satu.

Sebelum dapat menghentikan pengisian ulang untuk objek streaming, Anda harus membuat permintaan untuk mengambil informasi tentang semua objek streaming. Setiap objek yang ditampilkan memiliki OBJECT_ID, yang mengidentifikasi objek secara unik. Anda menggunakan OBJECT_ID untuk menghentikan pengisian ulang untuk streaming.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud untuk menghentikan pengisian ulang objek stream Anda, lihat dokumentasi Google Cloud SDK.

Mengubah jumlah tugas CDC serentak maksimum

Kode berikut menunjukkan cara menetapkan jumlah tugas pengambilan data perubahan (CDC) serentak maksimum untuk aliran MySQL menjadi 7.

Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask adalah kolom maxConcurrentCdcTasks. Dengan menetapkan nilainya ke 7, Anda mengubah jumlah tugas CDC serentak maksimum dari nilai sebelumnya menjadi 7. Anda dapat menggunakan nilai dari 0 hingga 50 (inklusif). Jika Anda tidak menentukan nilai, atau jika Anda menentukannya sebagai 0, default sistem 5 tugas akan ditetapkan untuk aliran.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud, lihat dokumentasi Google Cloud SDK.

Mengubah jumlah tugas pengisian ulang serentak maksimum

Kode berikut menunjukkan cara menetapkan jumlah maksimum tugas pengisian ulang serentak untuk aliran MySQL menjadi 25.

Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask adalah kolom maxConcurrentBackfillTasks. Dengan menetapkan nilainya ke 25, Anda mengubah jumlah tugas pengisian ulang serentak maksimum dari nilai sebelumnya menjadi 25. Anda dapat menggunakan nilai dari 0 hingga 50 (inklusif). Jika Anda tidak menentukan nilai, atau jika Anda menentukannya sebagai 0, default sistem 16 tugas akan ditetapkan untuk aliran.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud, lihat dokumentasi Google Cloud SDK.

Mengaktifkan streaming objek besar untuk sumber Oracle

Anda dapat mengaktifkan streaming objek besar, seperti objek besar biner (BLOB), objek besar karakter (CLOB), dan objek besar karakter nasional (NCLOB) untuk streaming dengan sumber Oracle. Flag streamLargeObjects memungkinkan Anda menyertakan objek besar dalam aliran baru dan yang sudah ada. Flag disetel di tingkat aliran, Anda tidak perlu menentukan kolom jenis data objek besar.

Contoh berikut menunjukkan cara membuat aliran yang memungkinkan Anda melakukan streaming objek besar.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud untuk memperbarui aliran, lihat dokumentasi Google Cloud SDK.

Menghapus feed

Kode berikut menunjukkan permintaan untuk menghapus aliran.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud untuk menghapus aliran, lihat dokumentasi Google Cloud SDK.

Langkah berikutnya