Streaming pipeline. Reads data from Spanner change streams and writes the data to a source database.
Template parameters
Parameter | Description |
---|---|
changeStreamName | The name of the Spanner change stream that the pipeline reads from. |
instanceId | The name of the Spanner instance where the change stream is present. |
databaseId | The name of the Spanner database that the change stream monitors. |
spannerProjectId | The name of the Spanner project. |
metadataInstance | The instance that stores the metadata used by the connector to control the consumption of the change stream API data. |
metadataDatabase | The database that stores the metadata used by the connector to control the consumption of the change stream API data. |
sourceShardsFilePath | Path to a Cloud Storage file containing the connection profile information for the source shards. |
startTimestamp | Optional: The starting timestamp for reading changes. Defaults to empty. |
endTimestamp | Optional: The end timestamp for reading changes. If no timestamp is provided, changes are read indefinitely. Defaults to empty. |
shadowTablePrefix | Optional: The prefix used to name shadow tables. Default: shadow_. |
sessionFilePath | Optional: Session file path in Cloud Storage that contains the mapping information from HarbourBridge. |
filtrationMode | Optional: The mode of filtration, which decides how to drop certain records based on a criterion. Supported modes are: none (filter nothing) and forward_migration (filter records written through the forward migration pipeline). Defaults to forward_migration. |
shardingCustomJarPath | Optional: Custom JAR file location in Cloud Storage that contains the customization logic for fetching the shard ID. If you set this parameter, also set the shardingCustomClassName parameter. Defaults to empty. |
shardingCustomClassName | Optional: The fully qualified class name that has the custom shard ID implementation. Required if shardingCustomJarPath is specified. Defaults to empty. |
shardingCustomParameters | Optional: A string containing any custom parameters to be passed to the custom sharding class. Defaults to empty. |
sourceDbTimezoneOffset | Optional: The timezone offset from UTC for the source database. Example value: +10:00. Defaults to +00:00. |
dlqGcsPubSubSubscription | Optional: The Pub/Sub subscription used in a Cloud Storage notification policy for the DLQ retry directory when running in regular mode. The name should be in the format projects/<project-id>/subscriptions/<subscription-name>. When set, deadLetterQueueDirectory and dlqRetryMinutes are ignored. |
skipDirectoryName | Optional: Records skipped from reverse replication are written to this directory. The default directory name is skip. |
maxShardConnections | Optional: The maximum number of connections that a given shard can accept. Defaults to 10000. |
deadLetterQueueDirectory | Optional: The path used when storing the error queue output. The default path is a directory under the Dataflow job's temp location. |
dlqMaxRetryCount | Optional: The maximum number of times temporary errors can be retried through the dead letter queue. Defaults to 500. |
runMode | Optional: The run mode type. Supported values: regular and retryDLQ. Default: regular. Specify retryDLQ to retry only the severe dead letter queue records. |
dlqRetryMinutes | Optional: The number of minutes between dead letter queue retries. Defaults to 10. |
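The file referenced by sourceShardsFilePath is a JSON list of connection profiles, one entry per source shard. The following is a minimal sketch only; the field names (logicalShardId, host, port, user, secretManagerUri, dbName) and all values are illustrative assumptions, not an authoritative schema:

# Illustrative shard file only: field names and values are assumptions.
cat > source-shards.json <<'EOF'
[
  {
    "logicalShardId": "shard-001",
    "host": "10.20.30.40",
    "port": "3306",
    "user": "replication_user",
    "secretManagerUri": "projects/example-project/secrets/shard-001-password/versions/latest",
    "dbName": "orders_db"
  }
]
EOF
# Upload the file so its gs:// path can be passed as sourceShardsFilePath.
gcloud storage cp source-shards.json gs://example-bucket/reverse-replication/source-shards.json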
Run the template
Console
- Go to the Dataflow Create job from template page. Go to Create job from template
- In the Job name field, enter a unique job name.
- Optional: To use a regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Spanner Change Streams to Source Database template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud CLI
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_to_SourceDb \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
changeStreamName=CHANGE_STREAM_NAME,\
instanceId=INSTANCE_ID,\
databaseId=DATABASE_ID,\
spannerProjectId=SPANNER_PROJECT_ID,\
metadataInstance=METADATA_INSTANCE,\
metadataDatabase=METADATA_DATABASE,\
sourceShardsFilePath=SOURCE_SHARDS_FILE_PATH
Replace the following:
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, for example 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
- CHANGE_STREAM_NAME: the name of the change stream to read from
- INSTANCE_ID: the Cloud Spanner instance ID
- DATABASE_ID: the Cloud Spanner database ID
- SPANNER_PROJECT_ID: the Cloud Spanner project ID
- METADATA_INSTANCE: the Cloud Spanner instance used to store metadata when reading from change streams
- METADATA_DATABASE: the Cloud Spanner database used to store metadata when reading from change streams
- SOURCE_SHARDS_FILE_PATH: the Cloud Storage path to the file containing the source shard details
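For illustration, here is a sketch of an invocation with example values substituted (all project, instance, database, and bucket names below are made up), including two of the optional parameters from the table above:

gcloud dataflow flex-template run spanner-to-sourcedb-job \
    --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Spanner_to_SourceDb \
    --project=example-project \
    --region=us-central1 \
    --parameters \
changeStreamName=reverseStream,\
instanceId=spanner-instance,\
databaseId=spanner-db,\
spannerProjectId=example-project,\
metadataInstance=metadata-instance,\
metadataDatabase=metadata-db,\
sourceShardsFilePath=gs://example-bucket/reverse-replication/source-shards.json,\
runMode=regular,\
maxShardConnections=160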
API
To run the template using the REST API, send an HTTP POST request. For more information about the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "changeStreamName": "CHANGE_STREAM_NAME",
      "instanceId": "INSTANCE_ID",
      "databaseId": "DATABASE_ID",
      "spannerProjectId": "SPANNER_PROJECT_ID",
      "metadataInstance": "METADATA_INSTANCE",
      "metadataDatabase": "METADATA_DATABASE",
      "sourceShardsFilePath": "SOURCE_SHARDS_FILE_PATH"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_to_SourceDb",
    "environment": { "maxWorkers": "10" }
  }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, for example 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
- CHANGE_STREAM_NAME: the name of the change stream to read from
- INSTANCE_ID: the Cloud Spanner instance ID
- DATABASE_ID: the Cloud Spanner database ID
- SPANNER_PROJECT_ID: the Cloud Spanner project ID
- METADATA_INSTANCE: the Cloud Spanner instance used to store metadata when reading from change streams
- METADATA_DATABASE: the Cloud Spanner database used to store metadata when reading from change streams
- SOURCE_SHARDS_FILE_PATH: the Cloud Storage path to the file containing the source shard details
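One way to send this request is with curl, as in the following sketch; it assumes the JSON body shown above is saved in a local file named request.json (a hypothetical name) and that the active gcloud account is allowed to launch Dataflow jobs:

# Send the launch request; PROJECT_ID and LOCATION are the same placeholders as above.
curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d @request.json \
  "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"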
Template source code
Java
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.CASSANDRA_SOURCE_TYPE;
import static com.google.cloud.teleport.v2.spanner.migrations.constants.Constants.MYSQL_SOURCE_TYPE;
import com.google.cloud.Timestamp;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.PubSubNotifiedDlqIO;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.metadata.CassandraSourceMetadata;
import com.google.cloud.teleport.v2.spanner.migrations.schema.NameAndCols;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SpannerTable;
import com.google.cloud.teleport.v2.spanner.migrations.shard.CassandraShard;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerSchema;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraConfigFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
import com.google.cloud.teleport.v2.templates.SpannerToSourceDb.Options;
import com.google.cloud.teleport.v2.templates.changestream.TrimmedShardedDataChangeRecord;
import com.google.cloud.teleport.v2.templates.constants.Constants;
import com.google.cloud.teleport.v2.templates.transforms.AssignShardIdFn;
import com.google.cloud.teleport.v2.templates.transforms.ConvertChangeStreamErrorRecordToFailsafeElementFn;
import com.google.cloud.teleport.v2.templates.transforms.ConvertDlqRecordToTrimmedShardedDataChangeRecordFn;
import com.google.cloud.teleport.v2.templates.transforms.FilterRecordsFn;
import com.google.cloud.teleport.v2.templates.transforms.PreprocessRecordsFn;
import com.google.cloud.teleport.v2.templates.transforms.SourceWriterTransform;
import com.google.cloud.teleport.v2.templates.transforms.UpdateDlqMetricsFn;
import com.google.cloud.teleport.v2.templates.utils.CassandraSourceSchemaReader;
import com.google.cloud.teleport.v2.templates.utils.ShadowTableCreator;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** This pipeline reads Spanner Change streams data and writes them to a source DB. */
@Template(
name = "Spanner_to_SourceDb",
category = TemplateCategory.STREAMING,
displayName = "Spanner Change Streams to Source Database",
description =
"Streaming pipeline. Reads data from Spanner Change Streams and"
+ " writes them to a source.",
optionsClass = Options.class,
flexContainerName = "spanner-to-sourcedb",
contactInformation = "https://cloud.google.com/support",
hidden = false,
streaming = true)
public class SpannerToSourceDb {
private static final Logger LOG = LoggerFactory.getLogger(SpannerToSourceDb.class);
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options extends PipelineOptions, StreamingOptions {
@TemplateParameter.Text(
order = 1,
optional = false,
description = "Name of the change stream to read from",
helpText =
"This is the name of the Spanner change stream that the pipeline will read from.")
String getChangeStreamName();
void setChangeStreamName(String value);
@TemplateParameter.Text(
order = 2,
optional = false,
description = "Cloud Spanner Instance Id.",
helpText =
"This is the name of the Cloud Spanner instance where the changestream is present.")
String getInstanceId();
void setInstanceId(String value);
@TemplateParameter.Text(
order = 3,
optional = false,
description = "Cloud Spanner Database Id.",
helpText =
"This is the name of the Cloud Spanner database that the changestream is monitoring")
String getDatabaseId();
void setDatabaseId(String value);
@TemplateParameter.ProjectId(
order = 4,
optional = false,
description = "Cloud Spanner Project Id.",
helpText = "This is the name of the Cloud Spanner project.")
String getSpannerProjectId();
void setSpannerProjectId(String projectId);
@TemplateParameter.Text(
order = 5,
optional = false,
description = "Cloud Spanner Instance to store metadata when reading from changestreams",
helpText =
"This is the instance to store the metadata used by the connector to control the"
+ " consumption of the change stream API data.")
String getMetadataInstance();
void setMetadataInstance(String value);
@TemplateParameter.Text(
order = 6,
optional = false,
description = "Cloud Spanner Database to store metadata when reading from changestreams",
helpText =
"This is the database to store the metadata used by the connector to control the"
+ " consumption of the change stream API data.")
String getMetadataDatabase();
void setMetadataDatabase(String value);
@TemplateParameter.Text(
order = 7,
optional = true,
description = "Changes are read from the given timestamp",
helpText = "Read changes from the given timestamp.")
@Default.String("")
String getStartTimestamp();
void setStartTimestamp(String value);
@TemplateParameter.Text(
order = 8,
optional = true,
description = "Changes are read until the given timestamp",
helpText =
"Read changes until the given timestamp. If no timestamp provided, reads indefinitely.")
@Default.String("")
String getEndTimestamp();
void setEndTimestamp(String value);
@TemplateParameter.Text(
order = 9,
optional = true,
description = "Cloud Spanner shadow table prefix.",
helpText = "The prefix used to name shadow tables. Default: `shadow_`.")
@Default.String("rev_shadow_")
String getShadowTablePrefix();
void setShadowTablePrefix(String value);
@TemplateParameter.GcsReadFile(
order = 10,
optional = false,
description = "Path to GCS file containing the the Source shard details",
helpText = "Path to GCS file containing connection profile info for source shards.")
String getSourceShardsFilePath();
void setSourceShardsFilePath(String value);
@TemplateParameter.GcsReadFile(
order = 11,
optional = true,
description = "Session File Path in Cloud Storage",
helpText =
"Session file path in Cloud Storage that contains mapping information from"
+ " HarbourBridge")
String getSessionFilePath();
void setSessionFilePath(String value);
@TemplateParameter.Enum(
order = 12,
optional = true,
enumOptions = {@TemplateEnumOption("none"), @TemplateEnumOption("forward_migration")},
description = "Filtration mode",
helpText =
"Mode of Filtration, decides how to drop certain records based on a criteria. Currently"
+ " supported modes are: none (filter nothing), forward_migration (filter records"
+ " written via the forward migration pipeline). Defaults to forward_migration.")
@Default.String("forward_migration")
String getFiltrationMode();
void setFiltrationMode(String value);
@TemplateParameter.GcsReadFile(
order = 13,
optional = true,
description = "Custom jar location in Cloud Storage",
helpText =
"Custom jar location in Cloud Storage that contains the customization logic"
+ " for fetching shard id.")
@Default.String("")
String getShardingCustomJarPath();
void setShardingCustomJarPath(String value);
@TemplateParameter.Text(
order = 14,
optional = true,
description = "Custom class name",
helpText =
"Fully qualified class name having the custom shard id implementation. It is a"
+ " mandatory field in case shardingCustomJarPath is specified")
@Default.String("")
String getShardingCustomClassName();
void setShardingCustomClassName(String value);
@TemplateParameter.Text(
order = 15,
optional = true,
description = "Custom sharding logic parameters",
helpText =
"String containing any custom parameters to be passed to the custom sharding class.")
@Default.String("")
String getShardingCustomParameters();
void setShardingCustomParameters(String value);
@TemplateParameter.Text(
order = 16,
optional = true,
description = "SourceDB timezone offset",
helpText =
"This is the timezone offset from UTC for the source database. Example value: +10:00")
@Default.String("+00:00")
String getSourceDbTimezoneOffset();
void setSourceDbTimezoneOffset(String value);
@TemplateParameter.PubsubSubscription(
order = 17,
optional = true,
description =
"The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
+ " retry directory when running in regular mode.",
helpText =
"The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
+ " retry directory when running in regular mode. The name should be in the format"
+ " of projects/<project-id>/subscriptions/<subscription-name>. When set, the"
+ " deadLetterQueueDirectory and dlqRetryMinutes are ignored.")
String getDlqGcsPubSubSubscription();
void setDlqGcsPubSubSubscription(String value);
@TemplateParameter.Text(
order = 18,
optional = true,
description = "Directory name for holding skipped records",
helpText =
"Records skipped from reverse replication are written to this directory. Default"
+ " directory name is skip.")
@Default.String("skip")
String getSkipDirectoryName();
void setSkipDirectoryName(String value);
@TemplateParameter.Long(
order = 19,
optional = true,
description = "Maximum connections per shard.",
helpText = "This will come from shard file eventually.")
@Default.Long(10000)
Long getMaxShardConnections();
void setMaxShardConnections(Long value);
@TemplateParameter.Text(
order = 20,
optional = true,
description = "Dead letter queue directory.",
helpText =
"The file path used when storing the error queue output. "
+ "The default file path is a directory under the Dataflow job's temp location.")
@Default.String("")
String getDeadLetterQueueDirectory();
void setDeadLetterQueueDirectory(String value);
@TemplateParameter.Integer(
order = 21,
optional = true,
description = "Dead letter queue maximum retry count",
helpText =
"The max number of times temporary errors can be retried through DLQ. Defaults to 500.")
@Default.Integer(500)
Integer getDlqMaxRetryCount();
void setDlqMaxRetryCount(Integer value);
@TemplateParameter.Enum(
order = 22,
optional = true,
description = "Run mode - currently supported are : regular or retryDLQ",
enumOptions = {@TemplateEnumOption("regular"), @TemplateEnumOption("retryDLQ")},
helpText =
"This is the run mode type, whether regular or with retryDLQ.Default is regular."
+ " retryDLQ is used to retry the severe DLQ records only.")
@Default.String("regular")
String getRunMode();
void setRunMode(String value);
@TemplateParameter.Integer(
order = 23,
optional = true,
description = "Dead letter queue retry minutes",
helpText = "The number of minutes between dead letter queue retries. Defaults to 10.")
@Default.Integer(10)
Integer getDlqRetryMinutes();
void setDlqRetryMinutes(Integer value);
@TemplateParameter.Enum(
order = 24,
optional = true,
description = "Source database type, ex: mysql",
enumOptions = {@TemplateEnumOption("mysql"), @TemplateEnumOption("cassandra")},
helpText = "The type of source database to reverse replicate to.")
@Default.String("mysql")
String getSourceType();
void setSourceType(String value);
@TemplateParameter.GcsReadFile(
order = 25,
optional = true,
description = "Custom transformation jar location in Cloud Storage",
helpText =
"Custom jar location in Cloud Storage that contains the custom transformation logic for processing records"
+ " in reverse replication.")
@Default.String("")
String getTransformationJarPath();
void setTransformationJarPath(String value);
@TemplateParameter.Text(
order = 26,
optional = true,
description = "Custom class name for transformation",
helpText =
"Fully qualified class name having the custom transformation logic. It is a"
+ " mandatory field in case transformationJarPath is specified")
@Default.String("")
String getTransformationClassName();
void setTransformationClassName(String value);
@TemplateParameter.Text(
order = 27,
optional = true,
description = "Custom parameters for transformation",
helpText =
"String containing any custom parameters to be passed to the custom transformation class.")
@Default.String("")
String getTransformationCustomParameters();
void setTransformationCustomParameters(String value);
@TemplateParameter.Text(
order = 28,
optional = true,
description = "Directory name for holding filtered records",
helpText =
"Records skipped from reverse replication are written to this directory. Default"
+ " directory name is skip.")
@Default.String("filteredEvents")
String getFilterEventsDirectoryName();
void setFilterEventsDirectoryName(String value);
@TemplateParameter.Boolean(
order = 29,
optional = true,
description = "Boolean setting if reverse migration is sharded",
helpText =
"Sets the template to a sharded migration. If source shard template contains more"
+ " than one shard, the value will be set to true. This value defaults to false.")
@Default.Boolean(false)
Boolean getIsShardedMigration();
void setIsShardedMigration(Boolean value);
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
LOG.info("Starting Spanner change streams to sink");
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(options);
pipeline
.getOptions()
.as(DataflowPipelineWorkerPoolOptions.class)
.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
// calculate the max connections per worker
int maxNumWorkers =
pipeline.getOptions().as(DataflowPipelineWorkerPoolOptions.class).getMaxNumWorkers() > 0
? pipeline.getOptions().as(DataflowPipelineWorkerPoolOptions.class).getMaxNumWorkers()
: 1;
int connectionPoolSizePerWorker = (int) (options.getMaxShardConnections() / maxNumWorkers);
if (connectionPoolSizePerWorker < 1) {
// This can happen when the number of workers is more than max.
// This can cause overload on the source database. Error out and let the user know.
LOG.error(
"Max workers {} is more than max shard connections {}, this can lead to more database"
+ " connections than desired",
maxNumWorkers,
options.getMaxShardConnections());
throw new IllegalArgumentException(
"Max Dataflow workers "
+ maxNumWorkers
+ " is more than max per shard connections: "
+ options.getMaxShardConnections()
+ " this can lead to more"
+ " database connections than desired. Either reduce the max allowed workers or"
+ " incease the max shard connections");
}
// Read the session file for Mysql Only
Schema schema =
MYSQL_SOURCE_TYPE.equals(options.getSourceType())
? SessionFileReader.read(
options.getSessionFilePath()) // Read from session file for MYSQL source type
: new Schema();
// Prepare Spanner config
SpannerConfig spannerConfig =
SpannerConfig.create()
.withProjectId(ValueProvider.StaticValueProvider.of(options.getSpannerProjectId()))
.withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
.withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()));
// Create shadow tables
// Note that there is a limit on the number of tables that can be created per DB: 5000.
// If we create shadow tables per shard, there will be an explosion of tables.
// Anyway the shadow table has Spanner PK so no need to again separate by the shard
// Lookup by the Spanner PK should be sufficient.
// Prepare Spanner config
SpannerConfig spannerMetadataConfig =
SpannerConfig.create()
.withProjectId(ValueProvider.StaticValueProvider.of(options.getSpannerProjectId()))
.withInstanceId(ValueProvider.StaticValueProvider.of(options.getMetadataInstance()))
.withDatabaseId(ValueProvider.StaticValueProvider.of(options.getMetadataDatabase()));
ShadowTableCreator shadowTableCreator =
new ShadowTableCreator(
spannerConfig,
spannerMetadataConfig,
SpannerAccessor.getOrCreate(spannerMetadataConfig)
.getDatabaseAdminClient()
.getDatabase(
spannerMetadataConfig.getInstanceId().get(),
spannerMetadataConfig.getDatabaseId().get())
.getDialect(),
options.getShadowTablePrefix());
DataflowPipelineDebugOptions debugOptions = options.as(DataflowPipelineDebugOptions.class);
shadowTableCreator.createShadowTablesInSpanner();
Ddl ddl = SpannerSchema.getInformationSchemaAsDdl(spannerConfig);
List<Shard> shards;
String shardingMode;
if (MYSQL_SOURCE_TYPE.equals(options.getSourceType())) {
ShardFileReader shardFileReader = new ShardFileReader(new SecretManagerAccessorImpl());
shards = shardFileReader.getOrderedShardDetails(options.getSourceShardsFilePath());
shardingMode = Constants.SHARDING_MODE_MULTI_SHARD;
} else {
CassandraConfigFileReader cassandraConfigFileReader = new CassandraConfigFileReader();
shards = cassandraConfigFileReader.getCassandraShard(options.getSourceShardsFilePath());
LOG.info("Cassandra config is: {}", shards.get(0));
shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
}
if (shards.size() == 1 && !options.getIsShardedMigration()) {
shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
Shard shard = shards.get(0);
if (shard.getLogicalShardId() == null) {
shard.setLogicalShardId(Constants.DEFAULT_SHARD_ID);
LOG.info(
"Logical shard id was not found, hence setting it to : " + Constants.DEFAULT_SHARD_ID);
}
}
if (options.getSourceType().equals(CASSANDRA_SOURCE_TYPE)) {
Map<String, SpannerTable> spannerTableMap =
SpannerSchema.convertDDLTableToSpannerTable(ddl.allTables());
Map<String, NameAndCols> spannerTableNameColsMap =
SpannerSchema.convertDDLTableToSpannerNameAndColsTable(ddl.allTables());
try {
CassandraSourceMetadata cassandraSourceMetadata =
new CassandraSourceMetadata.Builder()
.setResultSet(
CassandraSourceSchemaReader.getInformationSchemaAsResultSet(
(CassandraShard) shards.get(0)))
.build();
schema =
new Schema(
spannerTableMap,
null,
cassandraSourceMetadata.getSourceTableMap(),
spannerTableNameColsMap,
cassandraSourceMetadata.getNameAndColsMap());
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
}
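// In regular mode, the pipeline consumes the live change stream plus DLQ retries; in retryDLQ mode it reprocesses only records from the severe DLQ directory.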
boolean isRegularMode = "regular".equals(options.getRunMode());
PCollectionTuple reconsumedElements = null;
DeadLetterQueueManager dlqManager = buildDlqManager(options);
int reshuffleBucketSize =
maxNumWorkers
* (debugOptions.getNumberOfWorkerHarnessThreads() > 0
? debugOptions.getNumberOfWorkerHarnessThreads()
: Constants.DEFAULT_WORKER_HARNESS_THREAD_COUNT);
if (isRegularMode && (!Strings.isNullOrEmpty(options.getDlqGcsPubSubSubscription()))) {
reconsumedElements =
dlqManager.getReconsumerDataTransformForFiles(
pipeline.apply(
"Read retry from PubSub",
new PubSubNotifiedDlqIO(
options.getDlqGcsPubSubSubscription(),
// file paths to ignore when re-consuming for retry
new ArrayList<String>(
Arrays.asList(
"/severe/",
"/tmp_retry",
"/tmp_severe/",
".temp",
"/tmp_skip/",
"/" + options.getSkipDirectoryName())))));
} else {
reconsumedElements =
dlqManager.getReconsumerDataTransform(
pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
}
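// Retryable records surfaced from the DLQ are converted back into trimmed change records so they can rejoin the main processing path.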
PCollection<FailsafeElement<String, String>> dlqJsonStrRecords =
reconsumedElements
.get(DeadLetterQueueManager.RETRYABLE_ERRORS)
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
PCollection<TrimmedShardedDataChangeRecord> dlqRecords =
dlqJsonStrRecords
.apply(
"Convert DLQ records to TrimmedShardedDataChangeRecord",
ParDo.of(new ConvertDlqRecordToTrimmedShardedDataChangeRecordFn()))
.setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class));
PCollection<TrimmedShardedDataChangeRecord> mergedRecords = null;
if (isRegularMode) {
PCollection<TrimmedShardedDataChangeRecord> changeRecordsFromDB =
pipeline
.apply(
getReadChangeStreamDoFn(
options,
spannerConfig)) // This emits PCollection<DataChangeRecord> which is Spanner
// change
// stream data
.apply("Reshuffle", Reshuffle.viaRandomKey())
.apply("Filteration", ParDo.of(new FilterRecordsFn(options.getFiltrationMode())))
.apply("Preprocess", ParDo.of(new PreprocessRecordsFn()))
.setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class));
mergedRecords =
PCollectionList.of(changeRecordsFromDB)
.and(dlqRecords)
.apply("Flatten", Flatten.pCollections())
.setCoder(SerializableCoder.of(TrimmedShardedDataChangeRecord.class));
} else {
mergedRecords = dlqRecords;
}
CustomTransformation customTransformation =
CustomTransformation.builder(
options.getTransformationJarPath(), options.getTransformationClassName())
.setCustomParameters(options.getTransformationCustomParameters())
.build();
SourceWriterTransform.Result sourceWriterOutput =
mergedRecords
.apply(
"AssignShardId", // This emits PCollection<KV<Long,
// TrimmedShardedDataChangeRecord>> which is Spanner change stream data with key as
// PK
// mod
// number of parallelism
ParDo.of(
new AssignShardIdFn(
spannerConfig,
schema,
ddl,
shardingMode,
shards.get(0).getLogicalShardId(),
options.getSkipDirectoryName(),
options.getShardingCustomJarPath(),
options.getShardingCustomClassName(),
options.getShardingCustomParameters(),
options.getMaxShardConnections() * shards.size(),
options.getSourceType()))) // currently assuming that all shards accept the
// same
.setCoder(
KvCoder.of(
VarLongCoder.of(), SerializableCoder.of(TrimmedShardedDataChangeRecord.class)))
.apply("Reshuffle2", Reshuffle.of())
.apply(
"Write to source",
new SourceWriterTransform(
shards,
schema,
spannerMetadataConfig,
options.getSourceDbTimezoneOffset(),
ddl,
options.getShadowTablePrefix(),
options.getSkipDirectoryName(),
connectionPoolSizePerWorker,
options.getSourceType(),
customTransformation));
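// Permanent errors from the DLQ and from the source writer are flattened and written to the severe DLQ directory.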
PCollection<FailsafeElement<String, String>> dlqPermErrorRecords =
reconsumedElements
.get(DeadLetterQueueManager.PERMANENT_ERRORS)
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
PCollection<FailsafeElement<String, String>> permErrorsFromSourceWriter =
sourceWriterOutput
.permanentErrors()
.setCoder(StringUtf8Coder.of())
.apply(
"Reshuffle3", Reshuffle.<String>viaRandomKey().withNumBuckets(reshuffleBucketSize))
.apply(
"Convert permanent errors from source writer to DLQ format",
ParDo.of(new ConvertChangeStreamErrorRecordToFailsafeElementFn()))
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
PCollection<FailsafeElement<String, String>> permanentErrors =
PCollectionList.of(dlqPermErrorRecords)
.and(permErrorsFromSourceWriter)
.apply(Flatten.pCollections())
.apply("Reshuffle", Reshuffle.viaRandomKey());
permanentErrors
.apply("Update DLQ metrics", ParDo.of(new UpdateDlqMetricsFn(isRegularMode)))
.apply(
"DLQ: Write Severe errors to GCS",
MapElements.via(new StringDeadLetterQueueSanitizer()))
.setCoder(StringUtf8Coder.of())
.apply(
"Write To DLQ for severe errors",
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
.withTmpDirectory((options).getDeadLetterQueueDirectory() + "/tmp_severe/")
.setIncludePaneInfo(true)
.build());
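// Retryable errors from the source writer are written to the retry DLQ directory for later reprocessing.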
PCollection<FailsafeElement<String, String>> retryErrors =
sourceWriterOutput
.retryableErrors()
.setCoder(StringUtf8Coder.of())
.apply(
"Reshuffle4", Reshuffle.<String>viaRandomKey().withNumBuckets(reshuffleBucketSize))
.apply(
"Convert retryable errors from source writer to DLQ format",
ParDo.of(new ConvertChangeStreamErrorRecordToFailsafeElementFn()))
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
retryErrors
.apply(
"DLQ: Write retryable Failures to GCS",
MapElements.via(new StringDeadLetterQueueSanitizer()))
.setCoder(StringUtf8Coder.of())
.apply(
"Write To DLQ for retryable errors",
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(dlqManager.getRetryDlqDirectoryWithDateTime())
.withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_retry/")
.setIncludePaneInfo(true)
.build());
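// Records the source writer skipped are written under the configured skip directory.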
PCollection<FailsafeElement<String, String>> skippedRecords =
sourceWriterOutput
.skippedSourceWrites()
.setCoder(StringUtf8Coder.of())
.apply(
"Reshuffle5", Reshuffle.<String>viaRandomKey().withNumBuckets(reshuffleBucketSize))
.apply(
"Convert skipped records from source writer to DLQ format",
ParDo.of(new ConvertChangeStreamErrorRecordToFailsafeElementFn()))
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
skippedRecords
.apply(
"Write skipped records to GCS", MapElements.via(new StringDeadLetterQueueSanitizer()))
.setCoder(StringUtf8Coder.of())
.apply(
"Writing skipped records to GCS",
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(
options.getDeadLetterQueueDirectory() + "/" + options.getSkipDirectoryName())
.withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_skip/")
.setIncludePaneInfo(true)
.build());
return pipeline.run();
}
public static SpannerIO.ReadChangeStream getReadChangeStreamDoFn(
Options options, SpannerConfig spannerConfig) {
Timestamp startTime = Timestamp.now();
if (!options.getStartTimestamp().equals("")) {
startTime = Timestamp.parseTimestamp(options.getStartTimestamp());
}
SpannerIO.ReadChangeStream readChangeStreamDoFn =
SpannerIO.readChangeStream()
.withSpannerConfig(spannerConfig)
.withChangeStreamName(options.getChangeStreamName())
.withMetadataInstance(options.getMetadataInstance())
.withMetadataDatabase(options.getMetadataDatabase())
.withInclusiveStartAt(startTime);
if (!options.getEndTimestamp().equals("")) {
return readChangeStreamDoFn.withInclusiveEndAt(
Timestamp.parseTimestamp(options.getEndTimestamp()));
}
return readChangeStreamDoFn;
}
private static DeadLetterQueueManager buildDlqManager(Options options) {
String tempLocation =
options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
? options.as(DataflowPipelineOptions.class).getTempLocation()
: options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
String dlqDirectory =
options.getDeadLetterQueueDirectory().isEmpty()
? tempLocation + "dlq/"
: options.getDeadLetterQueueDirectory();
LOG.info("Dead-letter queue directory: {}", dlqDirectory);
options.setDeadLetterQueueDirectory(dlqDirectory);
if ("regular".equals(options.getRunMode())) {
return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetryCount());
} else {
String retryDlqUri =
FileSystems.matchNewResource(dlqDirectory, true)
.resolve("severe", StandardResolveOptions.RESOLVE_DIRECTORY)
.toString();
LOG.info("Dead-letter retry directory: {}", retryDlqUri);
return DeadLetterQueueManager.create(dlqDirectory, retryDlqUri, 0);
}
}
}