Dokumen ini menjelaskan cara memperbarui tugas streaming yang sedang berlangsung. Anda mungkin ingin memperbarui tugas Dataflow yang ada karena alasan berikut:
- Anda ingin meningkatkan atau menyempurnakan kode pipeline.
- Anda ingin memperbaiki bug dalam kode pipeline.
- Anda ingin memperbarui pipeline untuk menangani perubahan format data, atau untuk memperhitungkan versi atau perubahan lain di sumber data.
- Anda ingin menambal kerentanan keamanan yang terkait dengan Container-Optimized OS untuk semua pekerja Dataflow.
- Anda ingin menskalakan pipeline Apache Beam streaming untuk menggunakan jumlah pekerja yang berbeda.
Anda dapat memperbarui tugas dengan dua cara:
- Pembaruan tugas dalam proses: Untuk tugas streaming yang menggunakan
Streaming Engine, Anda dapat memperbarui opsi tugas
min-num-workers
danmax-num-workers
tanpa menghentikan tugas atau mengubah ID tugas. - Tugas pengganti: Untuk menjalankan kode pipeline yang telah diupdate atau untuk memperbarui opsi tugas yang tidak didukung oleh pembaruan tugas dalam proses, luncurkan tugas baru yang menggantikan tugas yang ada. Untuk memverifikasi apakah tugas penggantian valid, sebelum meluncurkan tugas baru, validasi grafik tugasnya.
Saat Anda memperbarui tugas, layanan Dataflow akan melakukan pemeriksaan kompatibilitas antara tugas yang sedang berjalan dan tugas pengganti potensial. Pemeriksaan kompatibilitas memastikan bahwa hal-hal seperti informasi status perantara dan data yang di-buffer dapat ditransfer dari tugas sebelumnya ke tugas pengganti.
Anda juga dapat menggunakan infrastruktur logging bawaan Apache Beam SDK untuk mencatat informasi saat memperbarui tugas. Untuk mengetahui informasi selengkapnya, lihat
Bekerja dengan log pipeline.
Untuk mengidentifikasi masalah pada kode pipeline, gunakan
tingkat logging DEBUG
.
- Untuk mengetahui petunjuk cara memperbarui tugas streaming yang menggunakan template klasik, lihat Memperbarui tugas streaming template kustom.
- Untuk petunjuk tentang cara memperbarui tugas streaming yang menggunakan Template Flex, ikuti petunjuk gcloud CLI di halaman ini, atau lihat Memperbarui tugas Template Flex.
Pembaruan opsi tugas dalam proses
Untuk tugas streaming yang menggunakan Streaming Engine, Anda dapat memperbarui opsi tugas berikut tanpa menghentikan tugas atau mengubah ID tugas:
min-num-workers
: jumlah minimum instance Compute Engine.max-num-workers
: jumlah maksimum instance Compute Engine.worker-utilization-hint
: the target CPU utilization, in the range [0.1, 0.9]
Untuk pembaruan tugas lainnya, Anda harus mengganti tugas saat ini dengan tugas yang diperbarui. Untuk mengetahui informasi selengkapnya, lihat Meluncurkan tugas penggantian.
Melakukan update saat dalam proses
Untuk melakukan update opsi tugas saat berjalan, lakukan langkah-langkah berikut.
gcloud
Gunakan perintah gcloud dataflow jobs update-options
:
gcloud dataflow jobs update-options \ --region=REGION \ --min-num-workers=MINIMUM_WORKERS \ --max-num-workers=MAXIMUM_WORKERS \ --worker-utilization-hint=TARGET_UTILIZATION \ JOB_ID
Ganti kode berikut:
- REGION: ID region tugas
- MINIMUM_WORKERS: jumlah minimum instance Compute Engine
- MAXIMUM_WORKERS: jumlah maksimum instance Compute Engine
- TARGET_UTILIZATION: nilai dalam rentang [0,1, 0,9]
- JOB_ID: ID tugas yang akan diupdate
Anda juga dapat memperbarui --min-num-workers
, --max-num-workers
, dan
worker-utilization-hint
secara terpisah.
REST
Gunakan
projects.locations.jobs.update
metode:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=MASK { "runtime_updatable_params": { "min_num_workers": MINIMUM_WORKERS, "max_num_workers": MAXIMUM_WORKERS, "worker_utilization_hint": TARGET_UTILIZATION } }
Ganti kode berikut:
- MASK: daftar parameter yang dipisahkan koma yang akan diperbarui, dari
berikut:
runtime_updatable_params.max_num_workers
runtime_updatable_params.min_num_workers
runtime_updatable_params.worker_utilization_hint
- PROJECT_ID: Google Cloud project ID tugas Dataflow
- REGION: ID region tugas
- JOB_ID: ID tugas yang akan diupdate
- MINIMUM_WORKERS: jumlah minimum instance Compute Engine
- MAXIMUM_WORKERS: jumlah maksimum instance Compute Engine
- TARGET_UTILIZATION: nilai dalam rentang [0,1, 0,9]
Anda juga dapat memperbarui min_num_workers
, max_num_workers
, dan worker_utilization_hint
satu per satu.
Tentukan parameter yang akan diperbarui dalam parameter kueri updateMask
, dan sertakan nilai yang diperbarui dalam kolom runtimeUpdatableParams
di isi permintaan. Contoh berikut memperbarui min_num_workers
:
PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": 5 } }
Tugas harus dalam status berjalan agar memenuhi syarat untuk update saat proses berlangsung. Error terjadi jika tugas belum dimulai atau sudah dibatalkan. Demikian pula, jika Anda meluncurkan tugas penggantian, tunggu hingga tugas tersebut mulai berjalan sebelum mengirimkan pembaruan dalam proses ke tugas baru.
Setelah Anda mengirimkan permintaan update, sebaiknya tunggu hingga permintaan selesai sebelum mengirimkan update lain. Lihat log tugas untuk melihat kapan permintaan selesai.
Memvalidasi tugas penggantian
Untuk memverifikasi apakah tugas penggantian valid, sebelum meluncurkan tugas baru, validasi grafik tugasnya. Di Dataflow, grafik tugas adalah representasi grafis pipeline. Dengan memvalidasi grafik tugas, Anda mengurangi risiko pipeline mengalami error atau kegagalan pipeline setelah update. Selain itu, Anda dapat memvalidasi update tanpa perlu menghentikan tugas asli, sehingga tugas tersebut tidak mengalami periode nonaktif.
Untuk memvalidasi grafik tugas Anda, ikuti langkah-langkah untuk
meluncurkan tugas pengganti. Sertakan graph_validate_only
Opsi layanan Dataflow dalam perintah update.
Java
- Teruskan opsi
--update
. - Tetapkan opsi
--jobName
diPipelineOptions
ke nama yang sama dengan tugas yang ingin Anda perbarui. - Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Sertakan opsi layanan
--dataflowServiceOptions=graph_validate_only
. - Jika ada nama transformasi dalam pipeline yang berubah, Anda harus memberikan
pemetaan transformasi dan meneruskannya menggunakan opsi
--transformNameMapping
. - Jika Anda mengirimkan tugas penggantian yang menggunakan Apache Beam SDK versi yang lebih baru, tetapkan
--updateCompatibilityVersion
ke versi Apache Beam SDK yang digunakan dalam tugas asli.
Python
- Teruskan opsi
--update
. - Tetapkan opsi
--job_name
diPipelineOptions
ke nama yang sama dengan tugas yang ingin Anda perbarui. - Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Sertakan opsi layanan
--dataflow_service_options=graph_validate_only
. - Jika ada nama transformasi dalam pipeline yang berubah, Anda harus memberikan
pemetaan transformasi dan meneruskannya menggunakan opsi
--transform_name_mapping
. - Jika Anda mengirimkan tugas penggantian yang menggunakan Apache Beam SDK versi yang lebih baru, tetapkan
--updateCompatibilityVersion
ke versi Apache Beam SDK yang digunakan dalam tugas asli.
Go
- Teruskan opsi
--update
. - Tetapkan opsi
--job_name
ke nama yang sama dengan tugas yang ingin Anda perbarui. - Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Sertakan opsi layanan
--dataflow_service_options=graph_validate_only
. - Jika ada nama transformasi dalam pipeline yang berubah, Anda harus memberikan
pemetaan transformasi dan meneruskannya menggunakan opsi
--transform_name_mapping
.
gcloud
Untuk memvalidasi grafik tugas untuk tugas Template Flex, gunakan perintah
gcloud dataflow flex-template run
dengan opsi additional-experiments
:
- Teruskan opsi
--update
. - Tetapkan JOB_NAME ke nama yang sama dengan tugas yang ingin Anda perbarui.
- Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Sertakan opsi
--additional-experiments=graph_validate_only
. - Jika ada nama transformasi dalam pipeline yang berubah, Anda harus memberikan
pemetaan transformasi dan meneruskannya menggunakan opsi
--transform-name-mappings
.
Contoh:
gcloud dataflow flex-template run JOB_NAME --additional-experiments=graph_validate_only
Ganti JOB_NAME dengan nama tugas yang ingin Anda perbarui.
REST
Gunakan kolom additionalExperiments
di objek
FlexTemplateRuntimeEnvironment
(Template flex) atau
RuntimeEnvironment
.
{
additionalExperiments : ["graph_validate_only"]
...
}
Opsi layanan graph_validate_only
hanya memvalidasi pembaruan pipeline. Jangan gunakan opsi ini saat membuat atau meluncurkan pipeline. Untuk memperbarui pipeline, luncurkan tugas penggantian tanpa opsi layanan graph_validate_only
.
Jika validasi grafik tugas berhasil, status tugas dan log tugas akan menampilkan status berikut:
- Status tugas adalah
JOB_STATE_DONE
. - Di konsol Google Cloud , Status tugas
adalah
Succeeded
. Pesan berikut muncul di log tugas:
Workflow job: JOB_ID succeeded validation. Marking graph_validate_only job as Done.
Jika validasi grafik tugas gagal, status tugas dan log tugas akan menampilkan status berikut:
- Status tugas adalah
JOB_STATE_FAILED
. - Di konsol Google Cloud , Status tugas
adalah
Failed
. - Pesan muncul di log tugas yang menjelaskan error ketidakcocokan. Konten pesan bergantung pada error.
Meluncurkan tugas penggantian
Anda dapat mengganti pekerjaan yang ada karena alasan berikut:
- Untuk menjalankan kode pipeline yang telah diupdate.
- Untuk memperbarui opsi tugas yang tidak mendukung pembaruan saat dalam proses.
Untuk memverifikasi apakah tugas penggantian valid, sebelum meluncurkan tugas baru, validasi grafik tugasnya.
Saat meluncurkan tugas penggantian, tetapkan opsi pipeline berikut untuk melakukan proses update selain opsi reguler tugas:
Java
- Teruskan opsi
--update
. - Tetapkan opsi
--jobName
diPipelineOptions
ke nama yang sama dengan tugas yang ingin Anda perbarui. - Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Jika ada nama transformasi dalam pipeline yang berubah, Anda harus memberikan
pemetaan transformasi dan meneruskannya menggunakan opsi
--transformNameMapping
. - Jika Anda mengirimkan tugas penggantian yang menggunakan Apache Beam SDK versi yang lebih baru, tetapkan
--updateCompatibilityVersion
ke versi Apache Beam SDK yang digunakan dalam tugas asli.
Python
- Teruskan opsi
--update
. - Tetapkan opsi
--job_name
diPipelineOptions
ke nama yang sama dengan tugas yang ingin Anda perbarui. - Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Jika ada nama transformasi dalam pipeline yang berubah, Anda harus memberikan
pemetaan transformasi dan meneruskannya menggunakan opsi
--transform_name_mapping
. - Jika Anda mengirimkan tugas penggantian yang menggunakan Apache Beam SDK versi yang lebih baru, tetapkan
--updateCompatibilityVersion
ke versi Apache Beam SDK yang digunakan dalam tugas asli.
Go
- Teruskan opsi
--update
. - Tetapkan opsi
--job_name
ke nama yang sama dengan tugas yang ingin Anda perbarui. - Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Jika ada nama transformasi dalam pipeline yang berubah, Anda harus memberikan
pemetaan transformasi dan meneruskannya menggunakan opsi
--transform_name_mapping
.
gcloud
Untuk mengupdate tugas Flex Template menggunakan gcloud CLI, gunakan perintah
gcloud dataflow flex-template run
. Memperbarui tugas lain menggunakan gcloud CLI tidak didukung.
- Teruskan opsi
--update
. - Tetapkan JOB_NAME ke nama yang sama dengan tugas yang ingin Anda perbarui.
- Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Jika ada nama transformasi dalam pipeline yang berubah, Anda harus memberikan
pemetaan transformasi dan meneruskannya menggunakan opsi
--transform-name-mappings
.
REST
Petunjuk ini menunjukkan cara memperbarui tugas non-template menggunakan REST API. Untuk menggunakan REST API guna memperbarui tugas template klasik, lihat Memperbarui tugas streaming template kustom. Untuk menggunakan REST API guna memperbarui tugas Template Flex, lihat Memperbarui tugas Template Flex.
Ambil resource
job
untuk tugas yang ingin Anda ganti menggunakan metodeprojects.locations.jobs.get
. Sertakan parameter kueriview
dengan nilaiJOB_VIEW_DESCRIPTION
. MenyertakanJOB_VIEW_DESCRIPTION
membatasi jumlah data dalam respons sehingga permintaan berikutnya tidak melebihi batas ukuran. Jika Anda memerlukan informasi tugas yang lebih mendetail, gunakan nilaiJOB_VIEW_ALL
.GET https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?view=JOB_VIEW_DESCRIPTION
Ganti nilai berikut:
- PROJECT_ID: Google Cloud project ID tugas Dataflow
- REGION: region tugas yang ingin Anda perbarui
- JOB_ID: ID tugas yang ingin Anda perbarui
Untuk memperbarui tugas, gunakan metode
projects.locations.jobs.create
. Dalam isi permintaan, gunakan resourcejob
yang Anda ambil.POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs { "id": JOB_ID, "replaceJobId": JOB_ID, "name": JOB_NAME, "type": "JOB_TYPE_STREAMING", "transformNameMapping": { string: string, ... }, }
Ganti kode berikut:
- JOB_ID: ID tugas yang sama dengan ID tugas yang ingin Anda update.
- JOB_NAME: nama tugas yang sama dengan nama tugas yang ingin Anda perbarui.
Jika ada nama transformasi dalam pipeline yang berubah, Anda harus memberikan pemetaan transformasi dan meneruskannya menggunakan kolom
transformNameMapping
.Opsional: Untuk mengirim permintaan menggunakan curl (Linux, macOS, atau Cloud Shell), simpan permintaan ke file JSON, lalu jalankan perintah berikut:
curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
Ganti FILE_PATH dengan jalur ke file JSON yang berisi isi permintaan.
Tentukan nama tugas penggantian Anda
Java
Saat Anda meluncurkan tugas penggantian, nilai yang Anda teruskan untuk opsi --jobName
harus sama persis dengan nama tugas yang ingin Anda ganti.
Python
Saat Anda meluncurkan tugas penggantian, nilai yang Anda teruskan untuk opsi --job_name
harus sama persis dengan nama tugas yang ingin Anda ganti.
Go
Saat Anda meluncurkan tugas penggantian, nilai yang Anda teruskan untuk opsi --job_name
harus sama persis dengan nama tugas yang ingin Anda ganti.
gcloud
Saat Anda meluncurkan tugas penggantian, JOB_NAME harus sama persis dengan nama tugas yang ingin Anda ganti.
REST
Tetapkan nilai kolom replaceJobId
ke ID tugas yang sama dengan tugas yang ingin
Anda perbarui. Untuk menemukan nilai nama tugas yang benar, pilih tugas sebelumnya di
Antarmuka Pemantauan Dataflow.
Kemudian, di panel samping Info tugas, temukan kolom ID tugas.
Untuk menemukan nilai nama tugas yang benar, pilih tugas sebelumnya di Antarmuka Pemantauan Dataflow. Kemudian, di panel samping Job info, temukan kolom Job name:

Atau, kueri daftar tugas yang ada menggunakan
Antarmuka Command Line Dataflow.
Masukkan perintah gcloud dataflow jobs list
ke jendela shell atau terminal Anda untuk mendapatkan daftar tugas Dataflow di project Google CloudAnda, dan temukan kolom NAME
untuk tugas yang ingin Anda ganti:
JOB_ID NAME TYPE CREATION_TIME STATE REGION 2020-12-28_12_01_09-yourdataflowjobid ps-topic Streaming 2020-12-28 20:01:10 Running us-central1
Membuat pemetaan transformasi
Jika pipeline pengganti mengubah nama transformasi dari nama di pipeline sebelumnya, layanan Dataflow memerlukan pemetaan transformasi. Pemetaan transformasi memetakan transformasi bernama dalam kode pipeline sebelumnya ke nama dalam kode pipeline pengganti.
Java
Teruskan pemetaan menggunakan opsi command line --transformNameMapping
,
menggunakan format umum berikut:
--transformNameMapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Anda hanya perlu memberikan entri pemetaan di --transformNameMapping
untuk
nama transformasi yang telah berubah antara pipeline sebelumnya dan pipeline
pengganti.
Saat menjalankan dengan --transformNameMapping
,
Anda mungkin perlu meng-escape
tanda petik sesuai dengan shell Anda. Misalnya, di Bash:
--transformNameMapping='{"oldTransform1":"newTransform1",...}'
Python
Teruskan pemetaan menggunakan opsi command line --transform_name_mapping
,
menggunakan format umum berikut:
--transform_name_mapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Anda hanya perlu memberikan entri pemetaan di --transform_name_mapping
untuk
nama transformasi yang telah berubah antara pipeline sebelumnya dan pipeline
pengganti.
Saat menjalankan dengan --transform_name_mapping
,
Anda mungkin perlu meng-escape
tanda petik sesuai dengan shell Anda. Misalnya, di Bash:
--transform_name_mapping='{"oldTransform1":"newTransform1",...}'
Go
Teruskan pemetaan menggunakan opsi command line --transform_name_mapping
,
menggunakan format umum berikut:
--transform_name_mapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Anda hanya perlu memberikan entri pemetaan di --transform_name_mapping
untuk
nama transformasi yang telah berubah antara pipeline sebelumnya dan pipeline
pengganti.
Saat menjalankan dengan --transform_name_mapping
,
Anda mungkin perlu meng-escape
tanda petik sesuai dengan shell Anda. Misalnya, di Bash:
--transform_name_mapping='{"oldTransform1":"newTransform1",...}'
gcloud
Teruskan pemetaan menggunakan opsi --transform-name-mappings
, dengan format umum berikut:
--transform-name-mappings= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Anda hanya perlu memberikan entri pemetaan di --transform-name-mappings
untuk
nama transformasi yang telah berubah antara pipeline sebelumnya dan pipeline
pengganti.
Saat menjalankan dengan --transform-name-mappings
,
Anda mungkin perlu meng-escape tanda petik sesuai dengan shell Anda. Misalnya, di Bash:
--transform-name-mappings='{"oldTransform1":"newTransform1",...}'
REST
Teruskan pemetaan menggunakan kolom transformNameMapping
, dengan format umum berikut:
"transformNameMapping": {
oldTransform1: newTransform1,
oldTransform2: newTransform2,
...
}
Anda hanya perlu memberikan entri pemetaan di transformNameMapping
untuk
nama transformasi yang telah berubah antara pipeline sebelumnya dan pipeline
pengganti.
Menentukan nama transformasi
Nama transformasi di setiap instance dalam peta adalah nama yang Anda berikan saat Anda menerapkan transformasi dalam pipeline. Contoh:
Java
.apply("FormatResults", ParDo
.of(new DoFn<KV<String, Long>>, String>() {
...
}
}))
Python
| 'FormatResults' >> beam.ParDo(MyDoFn())
Go
// In Go, this is always the package-qualified name of the DoFn itself.
// For example, if the FormatResults DoFn is in the main package, its name
// is "main.FormatResults".
beam.ParDo(s, FormatResults, results)
Anda juga bisa mendapatkan nama transformasi untuk tugas sebelumnya dengan memeriksa grafik eksekusi tugas di Antarmuka Monitoring Dataflow:

Penamaan transformasi gabungan
Nama transformasi bersifat hierarkis, berdasarkan hierarki transformasi di
pipeline Anda. Jika pipeline Anda memiliki
transformasi gabungan,
transformasi bertingkat diberi nama berdasarkan transformasi yang memuatnya. Misalnya, pipeline Anda berisi transformasi gabungan bernama CountWidgets
, yang berisi transformasi dalam bernama Parse
. Nama lengkap
transformasi Anda adalah CountWidgets/Parse
, dan Anda harus menentukan
nama lengkap tersebut dalam pemetaan transformasi.
Jika pipeline baru Anda memetakan transformasi komposit ke nama yang berbeda, semua transformasi bertingkat juga akan otomatis diganti namanya. Anda harus menentukan nama yang diubah untuk transformasi dalam di pemetaan transformasi.
Memfaktorkan ulang hierarki transformasi
Jika pipeline pengganti menggunakan hierarki transformasi yang berbeda dengan pipeline sebelumnya, Anda harus mendeklarasikan pemetaan secara eksplisit. Anda mungkin memiliki hierarki transformasi yang berbeda karena Anda memfaktorkan ulang transformasi komposit, atau pipeline Anda bergantung pada transformasi komposit dari library yang berubah.
Misalnya, pipeline sebelumnya menerapkan transformasi gabungan, CountWidgets
,
yang berisi transformasi dalam bernama Parse
. Pipeline penggantian
memfaktorkan ulang CountWidgets
, dan menyusun Parse
di dalam transformasi lain bernama
Scan
. Agar pembaruan berhasil, Anda harus memetakan nama transformasi lengkap secara eksplisit di pipeline sebelumnya (CountWidgets/Parse
) ke nama transformasi di pipeline baru (CountWidgets/Scan/Parse
):
Java
--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
Jika Anda menghapus transformasi sepenuhnya di pipeline penggantian, Anda harus
memberikan pemetaan null. Misalkan pipeline penggantian Anda menghapus transformasi
CountWidgets/Parse
sepenuhnya:
--transformNameMapping={"CountWidgets/Parse":""}
Python
--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
Jika Anda menghapus transformasi sepenuhnya di pipeline penggantian, Anda harus
memberikan pemetaan null. Misalkan pipeline penggantian Anda menghapus transformasi
CountWidgets/Parse
sepenuhnya:
--transform_name_mapping={"CountWidgets/Parse":""}
Go
--transform_name_mapping={"CountWidgets/main.Parse":"CountWidgets/Scan/main.Parse"}
Jika Anda menghapus transformasi sepenuhnya di pipeline penggantian, Anda harus
memberikan pemetaan null. Misalkan pipeline penggantian Anda menghapus transformasi
CountWidgets/Parse
sepenuhnya:
--transform_name_mapping={"CountWidgets/main.Parse":""}
gcloud
--transform-name-mappings={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
Jika Anda menghapus transformasi sepenuhnya di pipeline penggantian, Anda harus
memberikan pemetaan null. Misalkan pipeline penggantian Anda menghapus transformasi
CountWidgets/Parse
sepenuhnya:
--transform-name-mappings={"CountWidgets/main.Parse":""}
REST
"transformNameMapping": {
CountWidgets/Parse: CountWidgets/Scan/Parse
}
Jika Anda menghapus transformasi sepenuhnya di pipeline penggantian, Anda harus
memberikan pemetaan null. Misalkan pipeline penggantian Anda menghapus transformasi
CountWidgets/Parse
sepenuhnya:
"transformNameMapping": {
CountWidgets/main.Parse: null
}
Efek penggantian tugas
Saat Anda mengganti tugas yang ada, tugas baru akan menjalankan kode pipeline yang telah diperbarui. Layanan Dataflow mempertahankan nama tugas, tetapi menjalankan tugas pengganti dengan ID Tugas yang diperbarui. Proses ini dapat menyebabkan periode nonaktif saat tugas yang ada berhenti, pemeriksaan kompatibilitas berjalan, dan tugas baru dimulai.
Tugas penggantian mempertahankan item berikut:
- Data status perantara dari tugas sebelumnya. Cache dalam memori tidak disimpan.
- Rekaman data yang di-buffer atau metadata yang saat ini "sedang diproses" dari tugas sebelumnya. Misalnya, beberapa kumpulan data dalam pipeline Anda mungkin di-buffer saat menunggu periode untuk diselesaikan.
- Pembaruan opsi tugas dalam proses yang Anda terapkan ke tugas sebelumnya.
Data status menengah
Data status perantara dari tugas sebelumnya dipertahankan. Data status tidak mencakup cache dalam memori. Jika Anda ingin mempertahankan data cache dalam memori saat memperbarui pipeline, sebagai solusi sementara, refaktorkan pipeline Anda untuk mengonversi cache menjadi data status atau menjadi input samping. Untuk mengetahui informasi selengkapnya tentang penggunaan input samping, lihat Pola input samping dalam dokumentasi Apache Beam.
Pipeline streaming memiliki batas ukuran untuk ValueState
dan untuk input samping.
Akibatnya, jika Anda memiliki cache besar yang ingin dipertahankan, Anda mungkin perlu
menggunakan penyimpanan eksternal, seperti Memorystore atau Bigtable.
Data dalam penerbangan
Data "dalam proses" masih diproses oleh transformasi di pipeline baru Anda. Namun, transformasi tambahan yang Anda tambahkan dalam kode pipeline penggantian mungkin atau mungkin tidak berlaku, bergantung pada tempat data di-buffer. Dalam contoh ini, pipeline yang ada memiliki transformasi berikut:
Java
p.apply("Read", ReadStrings()) .apply("Format", FormatStrings());
Python
p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription) | 'Format' >> FormatStrings()
Go
beam.ParDo(s, ReadStrings) beam.ParDo(s, FormatStrings)
Anda dapat mengganti tugas dengan kode pipeline baru, sebagai berikut:
Java
p.apply("Read", ReadStrings()) .apply("Remove", RemoveStringsStartingWithA()) .apply("Format", FormatStrings());
Python
p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription) | 'Remove' >> RemoveStringsStartingWithA() | 'Format' >> FormatStrings()
Go
beam.ParDo(s, ReadStrings) beam.ParDo(s, RemoveStringsStartingWithA) beam.ParDo(s, FormatStrings)
Meskipun Anda menambahkan transformasi untuk memfilter string yang diawali dengan huruf "A", transformasi berikutnya (FormatStrings
) mungkin masih melihat string dalam buffer atau dalam proses yang diawali dengan "A" yang ditransfer dari tugas sebelumnya.
Mengubah windowing
Anda dapat mengubah strategi windowing dan pemicuan untuk elemen PCollection
di pipeline penggantian, tetapi berhati-hatilah.
Mengubah strategi penentuan jendela atau pemicu tidak akan memengaruhi data yang
sudah di-buffer atau sedang dalam proses.
Sebaiknya Anda hanya mencoba perubahan kecil pada windowing pipeline, seperti mengubah durasi jendela waktu tetap atau geser. Melakukan perubahan besar pada jendela atau pemicu, seperti mengubah algoritma jendela, dapat memberikan hasil yang tidak dapat diprediksi pada output pipeline Anda.
Pemeriksaan kompatibilitas pekerjaan
Saat Anda meluncurkan tugas pengganti, layanan Dataflow akan melakukan pemeriksaan kompatibilitas antara tugas pengganti dan tugas sebelumnya. Jika pemeriksaan kompatibilitas berhasil, tugas sebelumnya akan dihentikan. Tugas pengganti Anda kemudian diluncurkan di layanan Dataflow sambil mempertahankan nama tugas yang sama. Jika pemeriksaan kompatibilitas gagal, tugas sebelumnya akan terus berjalan di layanan Dataflow dan tugas pengganti akan menampilkan error.
Java
Karena batasan, Anda harus menggunakan eksekusi pemblokiran untuk melihat error upaya update yang gagal di konsol atau terminal. Solusi sementara saat ini terdiri dari langkah-langkah berikut:
- Gunakan pipeline.run().waitUntilFinish() dalam kode pipeline Anda.
- Jalankan program pipeline penggantian dengan opsi
--update
. - Tunggu hingga tugas penggantian berhasil melewati pemeriksaan kompatibilitas.
- Keluar dari proses runner pemblokiran dengan mengetik
Ctrl+C
.
Atau, Anda dapat memantau status tugas penggantian di Antarmuka Pemantauan Dataflow. Jika tugas Anda berhasil dimulai, tugas tersebut juga lulus pemeriksaan kompatibilitas.
Python
Karena batasan, Anda harus menggunakan eksekusi pemblokiran untuk melihat error upaya update yang gagal di konsol atau terminal. Solusi sementara saat ini terdiri dari langkah-langkah berikut:
- Gunakan pipeline.run().wait_until_finish() dalam kode pipeline Anda.
- Jalankan program pipeline penggantian dengan opsi
--update
. - Tunggu hingga tugas penggantian berhasil melewati pemeriksaan kompatibilitas.
- Keluar dari proses runner pemblokiran dengan mengetik
Ctrl+C
.
Atau, Anda dapat memantau status tugas penggantian di Antarmuka Pemantauan Dataflow. Jika tugas Anda berhasil dimulai, tugas tersebut juga lulus pemeriksaan kompatibilitas.
Go
Karena batasan, Anda harus menggunakan eksekusi pemblokiran untuk melihat error upaya update yang gagal di konsol atau terminal.
Secara khusus, Anda harus menentukan eksekusi non-blocking menggunakan
flag --execute_async
atau --async
. Solusi sementara saat ini terdiri dari langkah-langkah berikut:
- Jalankan program pipeline penggantian dengan opsi
--update
dan tanpa tanda--execute_async
atau--async
. - Tunggu hingga tugas penggantian berhasil melewati pemeriksaan kompatibilitas.
- Keluar dari proses runner pemblokiran dengan mengetik
Ctrl+C
.
gcloud
Karena batasan, Anda harus menggunakan eksekusi pemblokiran untuk melihat error upaya update yang gagal di konsol atau terminal. Solusi sementara saat ini terdiri dari langkah-langkah berikut:
- Untuk pipeline Java, gunakan pipeline.run().waitUntilFinish() dalam kode pipeline Anda. Untuk pipeline Python, gunakan pipeline.run().wait_until_finish() dalam kode pipeline Anda. Untuk pipeline Go, ikuti langkah-langkah di tab Go.
- Jalankan program pipeline penggantian dengan opsi
--update
. - Tunggu hingga tugas penggantian berhasil melewati pemeriksaan kompatibilitas.
- Keluar dari proses runner pemblokiran dengan mengetik
Ctrl+C
.
REST
Karena batasan, Anda harus menggunakan eksekusi pemblokiran untuk melihat error upaya update yang gagal di konsol atau terminal. Solusi sementara saat ini terdiri dari langkah-langkah berikut:
- Untuk pipeline Java, gunakan pipeline.run().waitUntilFinish() dalam kode pipeline Anda. Untuk pipeline Python, gunakan pipeline.run().wait_until_finish() dalam kode pipeline Anda. Untuk pipeline Go, ikuti langkah-langkah di tab Go.
- Jalankan program pipeline penggantian dengan kolom
replaceJobId
. - Tunggu hingga tugas penggantian berhasil melewati pemeriksaan kompatibilitas.
- Keluar dari proses runner pemblokiran dengan mengetik
Ctrl+C
.
Pemeriksaan kompatibilitas menggunakan pemetaan transformasi yang diberikan untuk memastikan Dataflow dapat mentransfer data status perantara dari langkah-langkah dalam tugas sebelumnya ke tugas pengganti. Pemeriksaan kompatibilitas juga memastikan bahwa PCollection
di pipeline Anda menggunakan Coder yang sama.
Mengubah Coder
dapat menyebabkan pemeriksaan kompatibilitas gagal karena data dalam proses atau rekaman yang di-buffer mungkin tidak diserialisasi dengan benar di pipeline penggantian.
Mencegah gangguan kompatibilitas
Perbedaan tertentu antara pipeline sebelumnya dan pipeline pengganti dapat menyebabkan pemeriksaan kompatibilitas gagal. Perbedaan ini mencakup:
- Mengubah grafik pipeline tanpa memberikan pemetaan. Saat Anda memperbarui tugas, Dataflow akan mencoba mencocokkan transformasi dalam tugas sebelumnya dengan transformasi dalam tugas pengganti. Proses pencocokan ini membantu Dataflow mentransfer data status perantara untuk setiap langkah. Jika Anda mengganti nama atau menghapus langkah apa pun, Anda harus memberikan pemetaan transformasi agar Dataflow dapat mencocokkan data status dengan tepat.
- Mengubah input samping untuk langkah. Menambahkan input samping ke atau menghapusnya dari transformasi dalam pipeline penggantian menyebabkan pemeriksaan kompatibilitas gagal.
- Mengubah Coder untuk langkah. Saat Anda memperbarui tugas, Dataflow akan mempertahankan semua rekaman data yang saat ini di-buffer dan menanganinya dalam tugas pengganti. Misalnya, data yang di-buffer dapat terjadi saat windowing sedang diselesaikan. Jika tugas penggantian menggunakan encoding data yang berbeda atau tidak kompatibel, Dataflow tidak dapat melakukan serialisasi atau deserialisasi rekaman ini.
Menghapus operasi "stateful" dari pipeline Anda. Jika Anda menghapus operasi stateful dari pipeline, tugas pengganti Anda mungkin gagal dalam pemeriksaan kompatibilitas. Dataflow dapat menggabungkan beberapa langkah untuk efisiensi. Jika Anda menghapus operasi yang bergantung pada status dari dalam langkah gabungan, pemeriksaan akan gagal. Operasi stateful meliputi:
- Transformasi yang menghasilkan atau menggunakan input samping.
- Pembacaan I/O.
- Transformasi yang menggunakan status yang dikunci.
- Transformasi yang memiliki penggabungan jendela.
Mengubah variabel
DoFn
stateful. Untuk tugas streaming yang sedang berlangsung, jika pipeline Anda menyertakanDoFn
s stateful, mengubah variabelDoFn
stateful dapat menyebabkan pipeline gagal.Mencoba menjalankan penggantian tugas di zona geografis yang berbeda. Jalankan tugas penggantian di zona yang sama dengan tempat Anda menjalankan tugas sebelumnya.
Memperbarui skema
Apache Beam memungkinkan PCollection
memiliki skema dengan kolom bernama, sehingga Coder eksplisit tidak diperlukan. Jika nama dan jenis kolom untuk skema tertentu
tidak berubah (termasuk kolom bertingkat), skema tersebut tidak menyebabkan
pemeriksaan update gagal. Namun, update mungkin masih diblokir jika segmen lain dari pipeline baru tidak kompatibel.
Mengembangkan skema
Sering kali, skema PCollection
perlu dikembangkan karena persyaratan bisnis yang terus berubah. Layanan Dataflow memungkinkan melakukan perubahan berikut pada skema saat memperbarui pipeline:
- Menambahkan satu atau beberapa kolom baru ke skema, termasuk kolom bertingkat.
- Membuat jenis kolom wajib diisi (non-nullable) menjadi opsional (nullable).
Penghapusan kolom, perubahan nama kolom, atau perubahan jenis kolom tidak diizinkan selama pembaruan.
Meneruskan data tambahan ke operasi ParDo yang ada
Anda dapat meneruskan data tambahan (di luar band) ke operasi ParDo yang ada dengan menggunakan salah satu metode berikut, bergantung pada kasus penggunaan Anda:
- Serialkan informasi sebagai kolom di subclass
DoFn
Anda. - Variabel apa pun yang dirujuk oleh metode dalam
DoFn
anonim akan diserialisasi secara otomatis. - Menghitung data di dalam
DoFn.startBundle()
. - Teruskan data menggunakan
ParDo.withSideInputs
.
Untuk informasi lebih lanjut, lihat halaman berikut:
- Panduan pemrograman Apache Beam: ParDo, khususnya bagian tentang cara membuat DoFn dan input samping.
- Referensi Apache Beam SDK untuk Java: ParDo