The Datastream to Spanner template is a streaming pipeline that reads Datastream events from a Cloud Storage bucket and writes them to a Spanner database. It is intended for migrating data from Datastream sources to Spanner. Specify the gcsPubSubSubscription parameter to read data from Pub/Sub notifications, or provide the inputFilePattern parameter to read data directly from files in Cloud Storage.
All tables required for the migration must exist in the destination Spanner database before you run the template. Therefore, schema migration from the source database to the destination Spanner database must be completed before data migration. Data can exist in the tables prior to migration. This template does not propagate Datastream schema changes to the Spanner database.
Data consistency is guaranteed only at the end of the migration, when all data has been written to Spanner. To store ordering information for each record written to Spanner, this template creates an additional table (called a shadow table) for each table in the Spanner database. This is used to ensure consistency at the end of the migration. The shadow tables are not deleted after the migration, and you can use them for validation purposes when the migration ends.
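To see which shadow tables were created and inspect them for validation, one option is to query the database's information schema. The following is a minimal sketch, assuming the default shadow_ prefix and a GoogleSQL-dialect database; substitute your own instance and database names.

gcloud spanner databases execute-sql CLOUDSPANNER_DATABASE \
    --instance=CLOUDSPANNER_INSTANCE \
    --sql="SELECT table_name FROM information_schema.tables WHERE STARTS_WITH(table_name, 'shadow_')"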
Any errors that occur during operation, such as schema mismatches, malformed JSON files, or errors resulting from executing transforms, are recorded in an error queue. The error queue is a Cloud Storage folder that stores all the Datastream events that encountered errors, along with the error reason in text format. Errors can be transient or permanent and are stored in the appropriate Cloud Storage folders in the error queue. Transient errors are retried automatically, while permanent errors are not. In the case of permanent errors, you have the option of correcting the change events and moving them to the retryable bucket while the template is running.
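The correction workflow below is only a sketch: it assumes the default dead letter queue layout, with a severe/ folder for permanent errors and a retry/ folder for retryable ones under your deadLetterQueueDirectory, and the bucket and file names are placeholders.

# List change events that failed with permanent errors.
gcloud storage ls gs://MY_BUCKET/dlq/severe/
# Download a failed event, correct it locally, then copy it into the retry folder
# so that the running template picks it up again.
gcloud storage cp gs://MY_BUCKET/dlq/severe/failed-event.json .
# ...edit failed-event.json to fix the change event...
gcloud storage cp failed-event.json gs://MY_BUCKET/dlq/retry/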
Pipeline requirements
- A Datastream stream in the Running or Not started state.
- A Cloud Storage bucket where the Datastream events are replicated.
- A Spanner database with existing tables. These tables can be empty or contain data.
Template parameters
Required parameters
- instanceId: The Spanner instance where the changes are replicated.
- databaseId: The Spanner database where the changes are replicated.
Optional parameters
- inputFilePattern: The Cloud Storage file location that contains the Datastream files to replicate. Typically, this is the root path for a stream. Support for this feature has been disabled; use it only to retry entries that land in the severe DLQ.
- inputFileFormat: The format of the output files produced by Datastream, for example avro or json. Defaults to avro.
- sessionFilePath: Session file path in Cloud Storage that contains mapping information from HarbourBridge.
- projectId: The Spanner project ID.
- spannerHost: The Cloud Spanner endpoint to call in the template. For example, https://batch-spanner.googleapis.com. Defaults to https://batch-spanner.googleapis.com.
- gcsPubSubSubscription: The Pub/Sub subscription being used in a Cloud Storage notification policy. For the name, use the format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
- streamName: The name or template for the stream to poll for schema information and source type.
- shadowTablePrefix: The prefix used to name shadow tables. Default: shadow_.
- shouldCreateShadowTables: This flag indicates whether shadow tables must be created in the Cloud Spanner database. Defaults to true.
- rfcStartDateTime: The starting DateTime used to fetch from Cloud Storage (https://tools.ietf.org/html/rfc3339). Defaults to 1970-01-01T00:00:00.00Z.
- fileReadConcurrency: The number of concurrent DataStream files to read. Defaults to 30.
- deadLetterQueueDirectory: The file path used when storing the error queue output. The default file path is a directory under the Dataflow job's temp location.
- dlqRetryMinutes: The number of minutes between dead letter queue retries. Defaults to 10.
- dlqMaxRetryCount: The maximum number of times temporary errors can be retried through the DLQ. Defaults to 500.
- dataStreamRootUrl: The Datastream API root URL. Defaults to https://datastream.googleapis.com/.
- datastreamSourceType: The type of source database that Datastream connects to, for example mysql or oracle. Must be set when testing without an actual running Datastream.
- roundJsonDecimals: If set, this flag rounds the decimal values in JSON columns to a number that can be stored without loss of precision. Defaults to false.
- runMode: The run mode type, either regular or retryDLQ. Defaults to regular.
- transformationContextFilePath: Transformation context file path in Cloud Storage, used to populate data used in transformations performed during the migration. For example, the shard ID to database name mapping that identifies the database from which a row was migrated.
- directoryWatchDurationInMinutes: The duration for which the pipeline should keep polling a directory in GCS. Datastream output files are arranged in a directory structure that depicts the timestamp of the event grouped by minutes. This parameter should be approximately equal to the maximum delay that can occur between an event occurring in the source database and the same event being written to GCS by Datastream. 99.9 percentile = 10 minutes. Defaults to 10.
- spannerPriority: The request priority for Cloud Spanner calls. The value must be one of HIGH, MEDIUM, or LOW. Defaults to HIGH.
- dlqGcsPubSubSubscription: The Pub/Sub subscription being used in a Cloud Storage notification policy for the DLQ retry directory when running in regular mode. For the name, use the format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. When set, deadLetterQueueDirectory and dlqRetryMinutes are ignored.
- transformationJarPath: Custom JAR file location in Cloud Storage for the file that contains the custom transformation logic for processing records in forward migration. Defaults to empty.
- transformationClassName: Fully qualified class name that has the custom transformation logic. This field is mandatory if transformationJarPath is specified. Defaults to empty.
- transformationCustomParameters: String containing any custom parameters to be passed to the custom transformation class. Defaults to empty.
- filteredEventsDirectory: The file path for storing events filtered via custom transformation. The default is a directory under the Dataflow job's temp location. The default value is sufficient under most conditions.
- shardingContextFilePath: Sharding context file path in Cloud Storage, used to populate the shard ID in the Spanner database for each source shard. It is of the format Map<stream_name, Map<db_name, shard_id>>. An illustrative sketch of such a file follows this list.
- tableOverrides: The table name overrides from source to Spanner, written in the following format: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]. For example, [{Singers, Vocalists}, {Albums, Records}] maps the Singers table to Vocalists and the Albums table to Records. Defaults to empty.
- columnOverrides: The column name overrides from source to Spanner, written in the following format: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]. Note that the SourceTableName should remain the same in both the source and Spanner pair; to override table names, use tableOverrides. For example, [{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}] maps SingerName to TalentName and AlbumName to RecordName in the Singers and Albums tables, respectively. Defaults to empty.
- schemaOverridesFilePath: A file that specifies the table and column name overrides from source to Spanner. Defaults to empty.
- shadowTableSpannerDatabaseId: Optional separate database for shadow tables. If not specified, shadow tables are created in the main database. If specified, make sure to also specify shadowTableSpannerInstanceId. Defaults to empty.
- shadowTableSpannerInstanceId: Optional separate instance for shadow tables. If not specified, shadow tables are created in the main instance. If specified, make sure to also specify shadowTableSpannerDatabaseId. Defaults to empty.
- failureInjectionParameter: Failure injection parameter. Only used for testing. Defaults to empty.
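The following is a purely illustrative sketch of a shardingContextFilePath file, written from the Map<stream_name, Map<db_name, shard_id>> shape described above. The top-level key, stream name, database names, and shard IDs are all assumptions; confirm the exact schema against the Spanner migration tooling documentation before relying on it.

# Hypothetical sharding context file; all names below are illustrative.
cat > sharding-context.json <<'EOF'
{
  "streamToDbAndShardMap": {
    "my-datastream-stream": {
      "shard_db_01": "shard-01",
      "shard_db_02": "shard-02"
    }
  }
}
EOF
# Upload it so the template can read it, then pass the gs:// path as shardingContextFilePath.
gcloud storage cp sharding-context.json gs://MY_BUCKET/config/sharding-context.json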
Run the template
Console
- Go to the Dataflow Create job from template page. Go to Create job from template
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Cloud Datastream to Spanner template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- GCS_FILE_PATH: the Cloud Storage path used to store Datastream events. For example: gs://bucket/path/to/data/
- STREAM_NAME: your Datastream stream name
- CLOUDSPANNER_INSTANCE: your Spanner instance
- CLOUDSPANNER_DATABASE: your Spanner database
- DLQ: the Cloud Storage path for the error queue directory
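For illustration, a fully substituted command might look like the following. All names (project, region, stream, subscription, instance, database, and bucket) are placeholders, and this variant uses the Pub/Sub notification path (gcsPubSubSubscription) described at the start of this page instead of inputFilePattern:

gcloud dataflow flex-template run datastream-to-spanner-job \
    --project=my-project \
    --region=us-central1 \
    --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Cloud_Datastream_to_Spanner \
    --parameters \
streamName=projects/my-project/locations/us-central1/streams/my-stream,\
gcsPubSubSubscription=projects/my-project/subscriptions/datastream-gcs-sub,\
instanceId=my-spanner-instance,\
databaseId=my-spanner-database,\
deadLetterQueueDirectory=gs://my-bucket/dlq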
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
    "parameters": {
      "inputFilePattern": "GCS_FILE_PATH",
      "streamName": "STREAM_NAME",
      "instanceId": "CLOUDSPANNER_INSTANCE",
      "databaseId": "CLOUDSPANNER_DATABASE",
      "deadLetterQueueDirectory": "DLQ"
    }
  }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- GCS_FILE_PATH: the Cloud Storage path used to store Datastream events. For example: gs://bucket/path/to/data/
- STREAM_NAME: your Datastream stream name
- CLOUDSPANNER_INSTANCE: your Spanner instance
- CLOUDSPANNER_DATABASE: your Spanner database
- DLQ: the Cloud Storage path for the error queue directory
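As one way to send this request, the sketch below uses curl with an access token from the gcloud CLI and assumes the JSON body above has been saved to a local file named request.json:

curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d @request.json \
    "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch"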
Template source code
Java
/*
* Copyright (C) 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.services.datastream.v1.model.SourceConfig;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager;
import com.google.cloud.teleport.v2.cdc.dlq.PubSubNotifiedDlqIO;
import com.google.cloud.teleport.v2.cdc.dlq.StringDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.datastream.sources.DataStreamIO;
import com.google.cloud.teleport.v2.datastream.utils.DataStreamClient;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.schema.ISchemaOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.NoopSchemaOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaFileOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaStringOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.shard.ShardingContext;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.TransformationContext;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardingContextReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.TransformationContextReader;
import com.google.cloud.teleport.v2.templates.DataStreamToSpanner.Options;
import com.google.cloud.teleport.v2.templates.constants.DatastreamToSpannerConstants;
import com.google.cloud.teleport.v2.templates.datastream.DatastreamConstants;
import com.google.cloud.teleport.v2.templates.spanner.ProcessInformationSchema;
import com.google.cloud.teleport.v2.templates.transform.ChangeEventTransformerDoFn;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerServiceFactoryImpl;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This pipeline ingests DataStream data from GCS as events. The events are written to Cloud
* Spanner.
*
* <p>NOTE: Future versions will support: Pub/Sub, GCS, or Kafka as per DataStream
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/datastream-to-spanner/README_Cloud_Datastream_to_Spanner.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Cloud_Datastream_to_Spanner",
category = TemplateCategory.STREAMING,
displayName = "Datastream to Cloud Spanner",
description = {
"The Datastream to Cloud Spanner template is a streaming pipeline that reads <a"
+ " href=\"https://cloud.google.com/datastream/docs\">Datastream</a> events from a Cloud"
+ " Storage bucket and writes them to a Cloud Spanner database. It is intended for data"
+ " migration from Datastream sources to Cloud Spanner.\n",
"All tables required for migration must exist in the destination Cloud Spanner database prior"
+ " to template execution. Hence schema migration from a source database to destination"
+ " Cloud Spanner must be completed prior to data migration. Data can exist in the tables"
+ " prior to migration. This template does not propagate Datastream schema changes to the"
+ " Cloud Spanner database.\n",
"Data consistency is guaranteed only at the end of migration when all data has been written"
+ " to Cloud Spanner. To store ordering information for each record written to Cloud"
+ " Spanner, this template creates an additional table (called a shadow table) for each"
+ " table in the Cloud Spanner database. This is used to ensure consistency at the end of"
+ " migration. The shadow tables are not deleted after migration and can be used for"
+ " validation purposes at the end of migration.\n",
"Any errors that occur during operation, such as schema mismatches, malformed JSON files, or"
+ " errors resulting from executing transforms, are recorded in an error queue. The error"
+ " queue is a Cloud Storage folder which stores all the Datastream events that had"
+ " encountered errors along with the error reason in text format. The errors can be"
+ " transient or permanent and are stored in appropriate Cloud Storage folders in the"
+ " error queue. The transient errors are retried automatically while the permanent"
+ " errors are not. In case of permanent errors, you have the option of making"
+ " corrections to the change events and moving them to the retriable bucket while the"
+ " template is running."
},
optionsClass = Options.class,
flexContainerName = "datastream-to-spanner",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/datastream-to-cloud-spanner",
contactInformation = "https://cloud.google.com/support",
requirements = {
"A Datastream stream in Running or Not started state.",
"A Cloud Storage bucket where Datastream events are replicated.",
"A Cloud Spanner database with existing tables. These tables can be empty or contain data.",
},
streaming = true,
supportsAtLeastOnce = true)
public class DataStreamToSpanner {
private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSpanner.class);
private static final String AVRO_SUFFIX = "avro";
private static final String JSON_SUFFIX = "json";
/**
* Options supported by the pipeline.
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions, StreamingOptions, DataflowPipelineWorkerPoolOptions {
@TemplateParameter.GcsReadFile(
order = 1,
groupName = "Source",
optional = true,
description =
"File location for Datastream file output in Cloud Storage. Support for this feature has been disabled.",
helpText =
"The Cloud Storage file location that contains the Datastream files to replicate. Typically, "
+ "this is the root path for a stream. Support for this feature has been disabled."
+ " Please use this feature only for retrying entries that land in severe DLQ.")
String getInputFilePattern();
void setInputFilePattern(String value);
@TemplateParameter.Enum(
order = 2,
enumOptions = {@TemplateEnumOption("avro"), @TemplateEnumOption("json")},
optional = true,
description = "Datastream output file format (avro/json).",
helpText =
"The format of the output file produced by Datastream. For example `avro,json`. Defaults to `avro`.")
@Default.String("avro")
String getInputFileFormat();
void setInputFileFormat(String value);
@TemplateParameter.GcsReadFile(
order = 3,
optional = true,
description = "Session File Path in Cloud Storage",
helpText =
"Session file path in Cloud Storage that contains mapping information from"
+ " HarbourBridge")
String getSessionFilePath();
void setSessionFilePath(String value);
@TemplateParameter.Text(
order = 4,
groupName = "Target",
description = "Cloud Spanner Instance Id.",
helpText = "The Spanner instance where the changes are replicated.")
String getInstanceId();
void setInstanceId(String value);
@TemplateParameter.Text(
order = 5,
groupName = "Target",
description = "Cloud Spanner Database Id.",
helpText = "The Spanner database where the changes are replicated.")
String getDatabaseId();
void setDatabaseId(String value);
@TemplateParameter.ProjectId(
order = 6,
groupName = "Target",
optional = true,
description = "Cloud Spanner Project Id.",
helpText = "The Spanner project ID.")
String getProjectId();
void setProjectId(String projectId);
@TemplateParameter.Text(
order = 7,
groupName = "Target",
optional = true,
description = "The Cloud Spanner Endpoint to call",
helpText = "The Cloud Spanner endpoint to call in the template.",
example = "https://batch-spanner.googleapis.com")
@Default.String("https://batch-spanner.googleapis.com")
String getSpannerHost();
void setSpannerHost(String value);
@TemplateParameter.PubsubSubscription(
order = 8,
optional = true,
description = "The Pub/Sub subscription being used in a Cloud Storage notification policy.",
helpText =
"The Pub/Sub subscription being used in a Cloud Storage notification policy. For the name,"
+ " use the format `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>`.")
String getGcsPubSubSubscription();
void setGcsPubSubSubscription(String value);
@TemplateParameter.Text(
order = 9,
groupName = "Source",
optional = true,
description = "Datastream stream name.",
helpText =
"The name or template for the stream to poll for schema information and source type.")
String getStreamName();
void setStreamName(String value);
@TemplateParameter.Text(
order = 10,
optional = true,
description = "Cloud Spanner shadow table prefix.",
helpText = "The prefix used to name shadow tables. Default: `shadow_`.")
@Default.String("shadow_")
String getShadowTablePrefix();
void setShadowTablePrefix(String value);
@TemplateParameter.Boolean(
order = 11,
optional = true,
description = "If true, create shadow tables in Cloud Spanner.",
helpText =
"This flag indicates whether shadow tables must be created in Cloud Spanner database.")
@Default.Boolean(true)
Boolean getShouldCreateShadowTables();
void setShouldCreateShadowTables(Boolean value);
@TemplateParameter.DateTime(
order = 12,
optional = true,
description =
"The starting DateTime used to fetch from Cloud Storage "
+ "(https://tools.ietf.org/html/rfc3339).",
helpText =
"The starting DateTime used to fetch from Cloud Storage "
+ "(https://tools.ietf.org/html/rfc3339).")
@Default.String("1970-01-01T00:00:00.00Z")
String getRfcStartDateTime();
void setRfcStartDateTime(String value);
@TemplateParameter.Integer(
order = 13,
optional = true,
description = "File read concurrency",
helpText = "The number of concurrent DataStream files to read.")
@Default.Integer(30)
Integer getFileReadConcurrency();
void setFileReadConcurrency(Integer value);
@TemplateParameter.Text(
order = 14,
optional = true,
description = "Dead letter queue directory.",
helpText =
"The file path used when storing the error queue output. "
+ "The default file path is a directory under the Dataflow job's temp location.")
@Default.String("")
String getDeadLetterQueueDirectory();
void setDeadLetterQueueDirectory(String value);
@TemplateParameter.Integer(
order = 15,
optional = true,
description = "Dead letter queue retry minutes",
helpText = "The number of minutes between dead letter queue retries. Defaults to `10`.")
@Default.Integer(10)
Integer getDlqRetryMinutes();
void setDlqRetryMinutes(Integer value);
@TemplateParameter.Integer(
order = 16,
optional = true,
description = "Dead letter queue maximum retry count",
helpText =
"The max number of times temporary errors can be retried through DLQ. Defaults to `500`.")
@Default.Integer(500)
Integer getDlqMaxRetryCount();
void setDlqMaxRetryCount(Integer value);
// DataStream API Root Url (only used for testing)
@TemplateParameter.Text(
order = 17,
optional = true,
description = "Datastream API Root URL (only required for testing)",
helpText = "Datastream API Root URL.")
@Default.String("https://datastream.googleapis.com/")
String getDataStreamRootUrl();
void setDataStreamRootUrl(String value);
@TemplateParameter.Text(
order = 18,
optional = true,
description = "Datastream source type (only required for testing)",
helpText =
"This is the type of source database that Datastream connects to. Example -"
+ " mysql/oracle. Need to be set when testing without an actual running"
+ " Datastream.")
String getDatastreamSourceType();
void setDatastreamSourceType(String value);
@TemplateParameter.Boolean(
order = 19,
optional = true,
description =
"If true, rounds the decimal values in json columns to a number that can be stored"
+ " without loss of precision.",
helpText =
"This flag if set, rounds the decimal values in json columns to a number that can be"
+ " stored without loss of precision.")
@Default.Boolean(false)
Boolean getRoundJsonDecimals();
void setRoundJsonDecimals(Boolean value);
@TemplateParameter.Enum(
order = 20,
optional = true,
description = "Run mode - currently supported are : regular or retryDLQ",
enumOptions = {@TemplateEnumOption("regular"), @TemplateEnumOption("retryDLQ")},
helpText = "This is the run mode type, whether regular or with retryDLQ.")
@Default.String("regular")
String getRunMode();
void setRunMode(String value);
@TemplateParameter.GcsReadFile(
order = 21,
optional = true,
helpText =
"Transformation context file path in cloud storage used to populate data used in"
+ " transformations performed during migrations Eg: The shard id to db name to"
+ " identify the db from which a row was migrated",
description = "Transformation context file path in cloud storage")
String getTransformationContextFilePath();
void setTransformationContextFilePath(String value);
@TemplateParameter.Integer(
order = 22,
optional = true,
description = "Directory watch duration in minutes. Default: 10 minutes",
helpText =
"The Duration for which the pipeline should keep polling a directory in GCS. Datastream"
+ "output files are arranged in a directory structure which depicts the timestamp "
+ "of the event grouped by minutes. This parameter should be approximately equal to"
+ "maximum delay which could occur between event occurring in source database and "
+ "the same event being written to GCS by Datastream. 99.9 percentile = 10 minutes")
@Default.Integer(10)
Integer getDirectoryWatchDurationInMinutes();
void setDirectoryWatchDurationInMinutes(Integer value);
@TemplateParameter.Enum(
order = 23,
enumOptions = {
@TemplateEnumOption("LOW"),
@TemplateEnumOption("MEDIUM"),
@TemplateEnumOption("HIGH")
},
optional = true,
description = "Priority for Spanner RPC invocations",
helpText =
"The request priority for Cloud Spanner calls. The value must be one of:"
+ " [`HIGH`,`MEDIUM`,`LOW`]. Defaults to `HIGH`.")
@Default.Enum("HIGH")
RpcPriority getSpannerPriority();
void setSpannerPriority(RpcPriority value);
@TemplateParameter.PubsubSubscription(
order = 24,
optional = true,
description =
"The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
+ " retry directory when running in regular mode.",
helpText =
"The Pub/Sub subscription being used in a Cloud Storage notification policy for DLQ"
+ " retry directory when running in regular mode. For the name, use the format"
+ " `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>`. When set, the"
+ " deadLetterQueueDirectory and dlqRetryMinutes are ignored.")
String getDlqGcsPubSubSubscription();
void setDlqGcsPubSubSubscription(String value);
@TemplateParameter.GcsReadFile(
order = 25,
optional = true,
description = "Custom jar location in Cloud Storage",
helpText =
"Custom JAR file location in Cloud Storage for the file that contains the custom transformation logic for processing records"
+ " in forward migration.")
@Default.String("")
String getTransformationJarPath();
void setTransformationJarPath(String value);
@TemplateParameter.Text(
order = 26,
optional = true,
description = "Custom class name",
helpText =
"Fully qualified class name having the custom transformation logic. It is a"
+ " mandatory field in case transformationJarPath is specified")
@Default.String("")
String getTransformationClassName();
void setTransformationClassName(String value);
@TemplateParameter.Text(
order = 27,
optional = true,
description = "Custom parameters for transformation",
helpText =
"String containing any custom parameters to be passed to the custom transformation class.")
@Default.String("")
String getTransformationCustomParameters();
void setTransformationCustomParameters(String value);
@TemplateParameter.Text(
order = 28,
optional = true,
description = "Filtered events directory",
helpText =
"This is the file path to store the events filtered via custom transformation. Default is a directory"
+ " under the Dataflow job's temp location. The default value is enough under most"
+ " conditions.")
@Default.String("")
String getFilteredEventsDirectory();
void setFilteredEventsDirectory(String value);
@TemplateParameter.GcsReadFile(
order = 29,
optional = true,
helpText =
"Sharding context file path in cloud storage is used to populate the shard id in spanner database for each source shard."
+ "It is of the format Map<stream_name, Map<db_name, shard_id>>",
description = "Sharding context file path in cloud storage")
String getShardingContextFilePath();
void setShardingContextFilePath(String value);
@TemplateParameter.Text(
order = 30,
optional = true,
description = "Table name overrides from source to spanner",
regexes =
"^\\[([[:space:]]*\\{[[:space:]]*[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
example = "[{Singers, Vocalists}, {Albums, Records}]",
helpText =
"These are the table name overrides from source to spanner. They are written in the"
+ "following format: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]"
+ "This example shows mapping Singers table to Vocalists and Albums table to Records.")
@Default.String("")
String getTableOverrides();
void setTableOverrides(String value);
@TemplateParameter.Text(
order = 31,
optional = true,
regexes =
"^\\[([[:space:]]*\\{[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
description = "Column name overrides from source to spanner",
example =
"[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]",
helpText =
"These are the column name overrides from source to spanner. They are written in the"
+ "following format: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]"
+ "Note that the SourceTableName should remain the same in both the source and spanner pair. To override table names, use tableOverrides."
+ "The example shows mapping SingerName to TalentName and AlbumName to RecordName in Singers and Albums table respectively.")
@Default.String("")
String getColumnOverrides();
void setColumnOverrides(String value);
@TemplateParameter.Text(
order = 32,
optional = true,
description = "File based overrides from source to spanner",
helpText =
"A file which specifies the table and the column name overrides from source to spanner.")
@Default.String("")
String getSchemaOverridesFilePath();
void setSchemaOverridesFilePath(String value);
@TemplateParameter.Text(
order = 33,
optional = true,
groupName = "Target",
description = "Cloud Spanner Shadow Table Instance Id.",
helpText =
"Optional separate instance for shadow tables. If not specified, shadow tables will be created in the main instance. If specified, ensure shadowTableSpannerDatabaseId is specified as well.")
@Default.String("")
String getShadowTableSpannerInstanceId();
void setShadowTableSpannerInstanceId(String value);
@TemplateParameter.Text(
order = 33,
optional = true,
groupName = "Target",
description = "Cloud Spanner Shadow Table Database Id.",
helpText =
"Optional separate database for shadow tables. If not specified, shadow tables will be created in the main database. If specified, ensure shadowTableSpannerInstanceId is specified as well.")
@Default.String("")
String getShadowTableSpannerDatabaseId();
void setShadowTableSpannerDatabaseId(String value);
@TemplateParameter.Text(
order = 34,
optional = true,
description = "Failure injection parameter",
helpText = "Failure injection parameter. Only used for testing.")
@Default.String("")
String getFailureInjectionParameter();
void setFailureInjectionParameter(String value);
}
private static void validateSourceType(Options options) {
boolean isRetryMode = "retryDLQ".equals(options.getRunMode());
if (isRetryMode) {
// retry mode does not read from Datastream
return;
}
String sourceType = getSourceType(options);
if (!DatastreamConstants.SUPPORTED_DATASTREAM_SOURCES.contains(sourceType)) {
throw new IllegalArgumentException(
"Unsupported source type found: "
+ sourceType
+ ". Specify one of the following source types: "
+ DatastreamConstants.SUPPORTED_DATASTREAM_SOURCES);
}
options.setDatastreamSourceType(sourceType);
}
static String getSourceType(Options options) {
if (options.getDatastreamSourceType() != null) {
return options.getDatastreamSourceType();
}
if (options.getStreamName() == null) {
throw new IllegalArgumentException("Stream name cannot be empty.");
}
GcpOptions gcpOptions = options.as(GcpOptions.class);
DataStreamClient datastreamClient;
SourceConfig sourceConfig;
try {
datastreamClient = new DataStreamClient(gcpOptions.getGcpCredential());
sourceConfig = datastreamClient.getSourceConnectionProfile(options.getStreamName());
} catch (IOException e) {
LOG.error("IOException Occurred: DataStreamClient failed initialization.");
throw new IllegalArgumentException("Unable to initialize DatastreamClient: " + e);
}
// TODO: use getPostgresSourceConfig() instead of an else once SourceConfig.java is updated.
if (sourceConfig.getMysqlSourceConfig() != null) {
return DatastreamConstants.MYSQL_SOURCE_TYPE;
} else if (sourceConfig.getOracleSourceConfig() != null) {
return DatastreamConstants.ORACLE_SOURCE_TYPE;
} else {
return DatastreamConstants.POSTGRES_SOURCE_TYPE;
}
// LOG.error("Source Connection Profile Type Not Supported");
// throw new IllegalArgumentException("Unsupported source connection profile type in
// Datastream");
}
/**
* Main entry point for executing the pipeline.
*
* @param args The command-line arguments to the pipeline.
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
LOG.info("Starting DataStream to Cloud Spanner");
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
validateSourceType(options);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
public static PipelineResult run(Options options) {
/*
* Stages:
* 1) Ingest and Normalize Data to FailsafeElement with JSON Strings
* 2) Write JSON Strings to Cloud Spanner
* 3) Write Failures to GCS Dead Letter Queue
*/
Pipeline pipeline = Pipeline.create(options);
DeadLetterQueueManager dlqManager = buildDlqManager(options);
// Ingest session file into schema object.
Schema schema = SessionFileReader.read(options.getSessionFilePath());
/*
* Stage 1: Ingest/Normalize Data to FailsafeElement with JSON Strings and
* read Cloud Spanner information schema.
* a) Prepare spanner config and process information schema
* b) Read DataStream data from GCS into JSON String FailsafeElements
* c) Reconsume Dead Letter Queue data from GCS into JSON String FailsafeElements
* d) Flatten DataStream and DLQ Streams
*/
// Prepare Spanner config
SpannerConfig spannerConfig =
SpannerConfig.create()
.withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId()))
.withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
.withInstanceId(ValueProvider.StaticValueProvider.of(options.getInstanceId()))
.withDatabaseId(ValueProvider.StaticValueProvider.of(options.getDatabaseId()))
.withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()))
.withCommitRetrySettings(
RetrySettings.newBuilder()
.setTotalTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setInitialRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
.setRetryDelayMultiplier(1)
.setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
.setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setRpcTimeoutMultiplier(1)
.setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setMaxAttempts(1)
.build());
SpannerConfig shadowTableSpannerConfig = getShadowTableSpannerConfig(options);
/* Process information schema
* 1) Read information schema from destination Cloud Spanner database
* 2) Check if shadow tables are present and create if necessary
* 3) Return new information schema
*/
PCollectionTuple ddlTuple =
pipeline.apply(
"Process Information Schema",
new ProcessInformationSchema(
spannerConfig,
shadowTableSpannerConfig,
options.getShouldCreateShadowTables(),
options.getShadowTablePrefix(),
options.getDatastreamSourceType()));
PCollectionView<Ddl> ddlView =
ddlTuple
.get(ProcessInformationSchema.MAIN_DDL_TAG)
.apply("Cloud Spanner Main DDL as view", View.asSingleton());
PCollectionView<Ddl> shadowTableDdlView =
ddlTuple
.get(ProcessInformationSchema.SHADOW_TABLE_DDL_TAG)
.apply("Cloud Spanner shadow tables DDL as view", View.asSingleton());
PCollection<FailsafeElement<String, String>> jsonRecords = null;
// Elements sent to the Dead Letter Queue are to be reconsumed.
// A DLQManager is to be created using PipelineOptions, and it is in charge
// of building pieces of the DLQ.
PCollectionTuple reconsumedElements = null;
boolean isRegularMode = "regular".equals(options.getRunMode());
if (isRegularMode && (!Strings.isNullOrEmpty(options.getDlqGcsPubSubSubscription()))) {
reconsumedElements =
dlqManager.getReconsumerDataTransformForFiles(
pipeline.apply(
"Read retry from PubSub",
new PubSubNotifiedDlqIO(
options.getDlqGcsPubSubSubscription(),
// file paths to ignore when re-consuming for retry
new ArrayList<String>(
Arrays.asList("/severe/", "/tmp_retry", "/tmp_severe/", ".temp")))));
} else {
reconsumedElements =
dlqManager.getReconsumerDataTransform(
pipeline.apply(dlqManager.dlqReconsumer(options.getDlqRetryMinutes())));
}
PCollection<FailsafeElement<String, String>> dlqJsonRecords =
reconsumedElements
.get(DeadLetterQueueManager.RETRYABLE_ERRORS)
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
if (isRegularMode) {
LOG.info("Regular Datastream flow");
PCollection<FailsafeElement<String, String>> datastreamJsonRecords =
pipeline.apply(
new DataStreamIO(
options.getStreamName(),
options.getInputFilePattern(),
options.getInputFileFormat(),
options.getGcsPubSubSubscription(),
options.getRfcStartDateTime())
.withFileReadConcurrency(options.getFileReadConcurrency())
.withoutDatastreamRecordsReshuffle()
.withDirectoryWatchDuration(
Duration.standardMinutes(options.getDirectoryWatchDurationInMinutes())));
int maxNumWorkers = options.getMaxNumWorkers() != 0 ? options.getMaxNumWorkers() : 1;
jsonRecords =
PCollectionList.of(datastreamJsonRecords)
.and(dlqJsonRecords)
.apply(Flatten.pCollections())
.apply(
"Reshuffle",
Reshuffle.<FailsafeElement<String, String>>viaRandomKey()
.withNumBuckets(
maxNumWorkers * DatastreamToSpannerConstants.MAX_DOFN_PER_WORKER));
} else {
LOG.info("DLQ retry flow");
jsonRecords =
PCollectionList.of(dlqJsonRecords)
.apply(Flatten.pCollections())
.apply("Reshuffle", Reshuffle.viaRandomKey());
}
/*
* Stage 2: Transform records
*/
// Ingest transformation context file into memory.
TransformationContext transformationContext =
TransformationContextReader.getTransformationContext(
options.getTransformationContextFilePath());
// Ingest sharding context file into memory.
ShardingContext shardingContext =
ShardingContextReader.getShardingContext(options.getShardingContextFilePath());
CustomTransformation customTransformation =
CustomTransformation.builder(
options.getTransformationJarPath(), options.getTransformationClassName())
.setCustomParameters(options.getTransformationCustomParameters())
.build();
// Create the overrides mapping.
ISchemaOverridesParser schemaOverridesParser = configureSchemaOverrides(options);
ChangeEventTransformerDoFn changeEventTransformerDoFn =
ChangeEventTransformerDoFn.create(
schema,
schemaOverridesParser,
transformationContext,
shardingContext,
options.getDatastreamSourceType(),
customTransformation,
options.getRoundJsonDecimals(),
ddlView,
spannerConfig);
PCollectionTuple transformedRecords =
jsonRecords.apply(
"Apply Transformation to events",
ParDo.of(changeEventTransformerDoFn)
.withSideInputs(ddlView)
.withOutputTags(
DatastreamToSpannerConstants.TRANSFORMED_EVENT_TAG,
TupleTagList.of(
Arrays.asList(
DatastreamToSpannerConstants.FILTERED_EVENT_TAG,
DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))));
/*
* Stage 3: Write filtered records to GCS
*/
String tempLocation =
options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
? options.as(DataflowPipelineOptions.class).getTempLocation()
: options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
String filterEventsDirectory =
options.getFilteredEventsDirectory().isEmpty()
? tempLocation + "filteredEvents/"
: options.getFilteredEventsDirectory();
LOG.info("Filtered events directory: {}", filterEventsDirectory);
transformedRecords
.get(DatastreamToSpannerConstants.FILTERED_EVENT_TAG)
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(
"Write Filtered Events To GCS",
TextIO.write().to(filterEventsDirectory).withSuffix(".json").withWindowedWrites());
spannerConfig =
SpannerServiceFactoryImpl.createSpannerService(
spannerConfig, options.getFailureInjectionParameter());
/*
* Stage 4: Write transformed records to Cloud Spanner
*/
SpannerTransactionWriter.Result spannerWriteResults =
transformedRecords
.get(DatastreamToSpannerConstants.TRANSFORMED_EVENT_TAG)
.apply(
"Write events to Cloud Spanner",
new SpannerTransactionWriter(
spannerConfig,
shadowTableSpannerConfig,
ddlView,
shadowTableDdlView,
options.getShadowTablePrefix(),
options.getDatastreamSourceType(),
isRegularMode));
/*
* Stage 5: Write failures to GCS Dead Letter Queue
* a) Retryable errors are written to retry GCS Dead letter queue
* b) Severe errors are written to severe GCS Dead letter queue
*/
// We will write only the original payload from the failsafe event to the DLQ. We are doing
// that in
// StringDeadLetterQueueSanitizer.
spannerWriteResults
.retryableErrors()
.apply(
"DLQ: Write retryable Failures to GCS",
MapElements.via(new StringDeadLetterQueueSanitizer()))
.setCoder(StringUtf8Coder.of())
.apply(
"Write To DLQ",
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(dlqManager.getRetryDlqDirectoryWithDateTime())
.withTmpDirectory(options.getDeadLetterQueueDirectory() + "/tmp_retry/")
.setIncludePaneInfo(true)
.build());
PCollection<FailsafeElement<String, String>> dlqErrorRecords =
reconsumedElements
.get(DeadLetterQueueManager.PERMANENT_ERRORS)
.setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
// TODO: Write errors from transformer and spanner writer into separate folders
PCollection<FailsafeElement<String, String>> permanentErrors =
PCollectionList.of(dlqErrorRecords)
.and(spannerWriteResults.permanentErrors())
.and(transformedRecords.get(DatastreamToSpannerConstants.PERMANENT_ERROR_TAG))
.apply(Flatten.pCollections());
// increment the metrics
permanentErrors
.apply("Update metrics", ParDo.of(new MetricUpdaterDoFn(isRegularMode)))
.apply(
"DLQ: Write Severe errors to GCS",
MapElements.via(new StringDeadLetterQueueSanitizer()))
.setCoder(StringUtf8Coder.of())
.apply(
"Write To DLQ",
DLQWriteTransform.WriteDLQ.newBuilder()
.withDlqDirectory(dlqManager.getSevereDlqDirectoryWithDateTime())
.withTmpDirectory((options).getDeadLetterQueueDirectory() + "/tmp_severe/")
.setIncludePaneInfo(true)
.build());
// Execute the pipeline and return the result.
return pipeline.run();
}
static SpannerConfig getShadowTableSpannerConfig(Options options) {
// Validate shadow table Spanner config - both instance and database must be specified together
String shadowTableSpannerInstanceId = options.getShadowTableSpannerInstanceId();
String shadowTableSpannerDatabaseId = options.getShadowTableSpannerDatabaseId();
LOG.info(
"Input Shadow table db - instance {} and database {}",
shadowTableSpannerInstanceId,
shadowTableSpannerDatabaseId);
if ((Strings.isNullOrEmpty(shadowTableSpannerInstanceId)
&& !Strings.isNullOrEmpty(shadowTableSpannerDatabaseId))
|| (!Strings.isNullOrEmpty(shadowTableSpannerInstanceId)
&& Strings.isNullOrEmpty(shadowTableSpannerDatabaseId))) {
throw new IllegalArgumentException(
"Both shadowTableSpannerInstanceId and shadowTableSpannerDatabaseId must be specified together");
}
// If not specified, use main instance and main database values. The shadow table database
// stores the shadow tables and by default, is the same as the main database for backward
// compatibility.
if (Strings.isNullOrEmpty(shadowTableSpannerInstanceId)
&& Strings.isNullOrEmpty(shadowTableSpannerDatabaseId)) {
shadowTableSpannerInstanceId = options.getInstanceId();
shadowTableSpannerDatabaseId = options.getDatabaseId();
LOG.info(
"Overwrote shadow table instance - {} and db- {}",
shadowTableSpannerInstanceId,
shadowTableSpannerDatabaseId);
}
return SpannerConfig.create()
.withProjectId(ValueProvider.StaticValueProvider.of(options.getProjectId()))
.withHost(ValueProvider.StaticValueProvider.of(options.getSpannerHost()))
.withInstanceId(ValueProvider.StaticValueProvider.of(shadowTableSpannerInstanceId))
.withDatabaseId(ValueProvider.StaticValueProvider.of(shadowTableSpannerDatabaseId))
.withRpcPriority(ValueProvider.StaticValueProvider.of(options.getSpannerPriority()))
.withCommitRetrySettings(
RetrySettings.newBuilder()
.setTotalTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setInitialRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
.setRetryDelayMultiplier(1)
.setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(0))
.setInitialRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setRpcTimeoutMultiplier(1)
.setMaxRpcTimeout(org.threeten.bp.Duration.ofMinutes(4))
.setMaxAttempts(1)
.build());
}
private static DeadLetterQueueManager buildDlqManager(Options options) {
String tempLocation =
options.as(DataflowPipelineOptions.class).getTempLocation().endsWith("/")
? options.as(DataflowPipelineOptions.class).getTempLocation()
: options.as(DataflowPipelineOptions.class).getTempLocation() + "/";
String dlqDirectory =
options.getDeadLetterQueueDirectory().isEmpty()
? tempLocation + "dlq/"
: options.getDeadLetterQueueDirectory();
LOG.info("Dead-letter queue directory: {}", dlqDirectory);
options.setDeadLetterQueueDirectory(dlqDirectory);
if ("regular".equals(options.getRunMode())) {
return DeadLetterQueueManager.create(dlqDirectory, options.getDlqMaxRetryCount());
} else {
String retryDlqUri =
FileSystems.matchNewResource(dlqDirectory, true)
.resolve("severe", StandardResolveOptions.RESOLVE_DIRECTORY)
.toString();
LOG.info("Dead-letter retry directory: {}", retryDlqUri);
return DeadLetterQueueManager.create(dlqDirectory, retryDlqUri, 0);
}
}
static ISchemaOverridesParser configureSchemaOverrides(Options options) {
// incorrect configuration
if (!options.getSchemaOverridesFilePath().isEmpty()
&& (!options.getTableOverrides().isEmpty() || !options.getColumnOverrides().isEmpty())) {
throw new IllegalArgumentException(
"Only one of file based or string based overrides must be configured! Please correct the configuration and re-run the job");
}
// string based overrides
if (!options.getTableOverrides().isEmpty() || !options.getColumnOverrides().isEmpty()) {
Map<String, String> userOptionsOverrides = new HashMap<>();
if (!options.getTableOverrides().isEmpty()) {
userOptionsOverrides.put("tableOverrides", options.getTableOverrides());
}
if (!options.getColumnOverrides().isEmpty()) {
userOptionsOverrides.put("columnOverrides", options.getColumnOverrides());
}
return new SchemaStringOverridesParser(userOptionsOverrides);
}
// file based overrides
if (!options.getSchemaOverridesFilePath().isEmpty()) {
return new SchemaFileOverridesParser(options.getSchemaOverridesFilePath());
}
// no overrides
return new NoopSchemaOverridesParser();
}
}
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.