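The Google Cloud to Neo4j template lets you import a dataset into a Neo4j database through a Dataflow job, sourcing data from CSV files hosted in Cloud Storage buckets. It also lets you manipulate and transform the data at various steps of the import. You can use the template for both first-time imports and incremental imports.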
Pipeline requirements
- A running Neo4j instance
- A Cloud Storage bucket
- A dataset to import, in the form of CSV files
- A job specification file to use
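Create a job specification file
The job specification file consists of a JSON object with the following sections:
- config: global flags affecting how the import is performed.
- sources: data source definitions (relational).
- targets: data target definitions (graph: nodes/relationships).
- actions: pre-load and post-load actions.
For more information, see Create a job specification file in the Neo4j documentation.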
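As a rough illustration of the four sections listed above, the following sketch assembles a skeleton job specification as a JSON string. The class and method names (`JobSpecSkeleton`, `build`) are hypothetical. The shapes used here (arrays for `sources` and `actions`, an object for `targets`) are assumptions, and the real schema may require further fields, so check the Neo4j documentation before relying on it. The `reset_db` flag is the one the template source code reads from `config`.

```java
// Hypothetical helper: assembles a skeleton job specification as a JSON string.
final class JobSpecSkeleton {

  // Returns a JSON object containing the four sections named in the text.
  // Assumptions: "sources" and "actions" are arrays and "targets" is an object.
  // The sections are left empty on purpose; a real specification must fill them in.
  static String build(boolean resetDb) {
    return String.join(
        "\n",
        "{",
        "  \"config\": { \"reset_db\": " + resetDb + " },",
        "  \"sources\": [],",
        "  \"targets\": {},",
        "  \"actions\": []",
        "}");
  }

  public static void main(String[] args) {
    // Print the skeleton so it can be saved to a file and edited by hand.
    System.out.println(build(false));
  }
}
```

The skeleton is only a starting point for editing; it is not expected to pass the template's validation as-is.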
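Template parameters
Required parameters
- jobSpecUri: The path to the job specification file, which contains the configuration for source and target metadata.
Optional parameters
- neo4jConnectionUri: The path to the Neo4j connection metadata JSON file.
- neo4jConnectionSecretId: The secret ID for the Neo4j connection metadata. You can use it as an alternative to the Cloud Storage path option (neo4jConnectionUri).
- optionsJson: Options JSON. Use runtime tokens. (Example: {token1:value1,token2:value2}). Defaults to empty.
- readQuery: Overrides the SQL query. Defaults to empty.
- inputFilePattern: Overrides the text file pattern (Example: gs://your-bucket/path/*.json). Defaults to empty.
- disabledAlgorithms: Comma-separated algorithms to disable. If this value is set to none, no algorithm is disabled. Use this parameter with caution, because the algorithms disabled by default might have vulnerabilities or performance issues. (Example: SSLv3, RC4).
- extraFilesToStage: Comma-separated Cloud Storage paths or Secret Manager secrets for files to stage in the worker. These files are saved in the /extra_files directory in each worker. (Example: gs://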
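The two connection parameters are alternatives. In the template source code, `readConnectionSettings` checks `neo4jConnectionSecretId` first and falls back to `neo4jConnectionUri` only when the secret ID is empty. The sketch below mirrors that precedence with plain strings. `ConnectionSource` and `resolve` are hypothetical names, and the exception thrown when neither value is set is an assumption for illustration, not the template's actual validation.

```java
// Hypothetical helper mirroring the precedence used when reading connection settings.
final class ConnectionSource {

  // Returns a label describing which parameter would supply the connection metadata.
  static String resolve(String secretId, String connectionUri) {
    // A non-empty secret ID wins over the file path.
    if (secretId != null && !secretId.isEmpty()) {
      return "secret:" + secretId;
    }
    if (connectionUri != null && !connectionUri.isEmpty()) {
      return "file:" + connectionUri;
    }
    // Assumption: treat the case where neither parameter is set as an error.
    throw new IllegalArgumentException(
        "Provide either neo4jConnectionSecretId or neo4jConnectionUri");
  }

  public static void main(String[] args) {
    // Both values are made-up examples.
    System.out.println(resolve("my-secret-id", "gs://my-bucket/neo4j-connection.json"));
    System.out.println(resolve("", "gs://my-bucket/neo4j-connection.json"));
  }
}
```

If you set both parameters, expect the secret to be used and the file path to be ignored.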
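Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Google Cloud to Neo4j template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.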
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Google_Cloud_to_Neo4j \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters jobSpecUri=JOB_SPEC_URI,neo4jConnectionUri=NEO4J_CONNECTION_URI
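Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest: uses the latest version of the template, which is available in the non-dated parent folder in the bucket (gs://dataflow-templates-REGION_NAME/latest/)
  - a version name, such as 2023-09-12-00_RC00: uses a specific version of the template, which is nested in the corresponding dated parent folder in the bucket (gs://dataflow-templates-REGION_NAME/)
- REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
- JOB_SPEC_URI: the path to the job specification file
- NEO4J_CONNECTION_URI: the path to the Neo4j connection metadata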
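The template location passed to `--template-file-gcs-location` is built from the region and version placeholders. A minimal sketch of that substitution follows; `TemplateLocation` and `of` are hypothetical names, and the path pattern is the one shown in the command above.

```java
// Hypothetical helper: fills REGION_NAME and VERSION into the template path pattern.
final class TemplateLocation {

  // Builds gs://dataflow-templates-REGION_NAME/VERSION/flex/Google_Cloud_to_Neo4j.
  static String of(String regionName, String version) {
    return String.format(
        "gs://dataflow-templates-%s/%s/flex/Google_Cloud_to_Neo4j", regionName, version);
  }

  public static void main(String[] args) {
    // Latest version in us-central1.
    System.out.println(of("us-central1", "latest"));
    // A pinned version, using the example version name from the text.
    System.out.println(of("us-central1", "2023-09-12-00_RC00"));
  }
}
```

Pinning a dated version instead of `latest` keeps repeated runs on the same template build.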
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "jobSpecUri": "JOB_SPEC_URI",
      "neo4jConnectionUri": "NEO4J_CONNECTION_URI"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Google_Cloud_to_Neo4j",
    "environment": { "maxWorkers": "10" }
  }
}
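Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest: uses the latest version of the template, which is available in the non-dated parent folder in the bucket (gs://dataflow-templates-LOCATION/latest/)
  - a version name, such as 2023-09-12-00_RC00: uses a specific version of the template, which is nested in the corresponding dated parent folder in the bucket (gs://dataflow-templates-LOCATION/)
- LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
- JOB_SPEC_URI: the path to the job specification file
- NEO4J_CONNECTION_URI: the path to the Neo4j connection metadata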
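The launch request can also be assembled programmatically. The sketch below builds, but does not send, the POST request with the JDK's `java.net.http` API (Java 11 or later). `LaunchRequest`, `body`, and `build` are hypothetical names. Passing an OAuth access token as a `Bearer` header is an assumption about how you authenticate, and the values are inserted into the JSON verbatim, so they must not contain characters that need escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: assembles the flexTemplates:launch request without sending it.
final class LaunchRequest {

  // Builds the JSON body from the documented placeholders.
  static String body(
      String jobName, String jobSpecUri, String connectionUri, String location, String version) {
    return String.format(
        "{\"launchParameter\": {"
            + "\"jobName\": \"%s\", "
            + "\"parameters\": {\"jobSpecUri\": \"%s\", \"neo4jConnectionUri\": \"%s\"}, "
            + "\"containerSpecGcsPath\": "
            + "\"gs://dataflow-templates-%s/%s/flex/Google_Cloud_to_Neo4j\", "
            + "\"environment\": {\"maxWorkers\": \"10\"}}}",
        jobName, jobSpecUri, connectionUri, location, version);
  }

  // Builds the POST request for the given project and location.
  static HttpRequest build(String projectId, String location, String accessToken, String body) {
    URI uri =
        URI.create(
            String.format(
                "https://dataflow.googleapis.com/v1b3/projects/%s/locations/%s/flexTemplates:launch",
                projectId, location));
    return HttpRequest.newBuilder(uri)
        .header("Authorization", "Bearer " + accessToken) // assumption: OAuth bearer token
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  public static void main(String[] args) {
    // All values below are made-up examples.
    String json =
        body(
            "neo4j-import",
            "gs://my-bucket/job-spec.json",
            "gs://my-bucket/neo4j-connection.json",
            "us-central1",
            "latest");
    HttpRequest request = build("my-project", "us-central1", "ACCESS_TOKEN", json);
    System.out.println(request.method() + " " + request.uri());
    System.out.println(json);
  }
}
```

To submit the request, pass it to `HttpClient.send` with a real access token.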
Template source code
Java
/*
* Copyright (C) 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.neo4j.templates;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.Template.AdditionalDocumentationBlock;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.neo4j.actions.ActionDoFnFactory;
import com.google.cloud.teleport.v2.neo4j.actions.ActionPreloadFactory;
import com.google.cloud.teleport.v2.neo4j.actions.preload.PreloadAction;
import com.google.cloud.teleport.v2.neo4j.database.Neo4jConnection;
import com.google.cloud.teleport.v2.neo4j.model.InputValidator;
import com.google.cloud.teleport.v2.neo4j.model.Json;
import com.google.cloud.teleport.v2.neo4j.model.Json.ParsingResult;
import com.google.cloud.teleport.v2.neo4j.model.connection.ConnectionParams;
import com.google.cloud.teleport.v2.neo4j.model.enums.ArtifactType;
import com.google.cloud.teleport.v2.neo4j.model.helpers.JobSpecMapper;
import com.google.cloud.teleport.v2.neo4j.model.helpers.OptionsParamsMapper;
import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetQuerySpec;
import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetQuerySpec.TargetQuerySpecBuilder;
import com.google.cloud.teleport.v2.neo4j.model.helpers.TargetSequence;
import com.google.cloud.teleport.v2.neo4j.model.job.ActionContext;
import com.google.cloud.teleport.v2.neo4j.model.job.OptionsParams;
import com.google.cloud.teleport.v2.neo4j.options.Neo4jFlexTemplateOptions;
import com.google.cloud.teleport.v2.neo4j.providers.Provider;
import com.google.cloud.teleport.v2.neo4j.providers.ProviderFactory;
import com.google.cloud.teleport.v2.neo4j.transforms.Neo4jRowWriterTransform;
import com.google.cloud.teleport.v2.neo4j.utils.BeamBlock;
import com.google.cloud.teleport.v2.neo4j.utils.FileSystemUtils;
import com.google.cloud.teleport.v2.neo4j.utils.ModelUtils;
import com.google.cloud.teleport.v2.neo4j.utils.ProcessingCoder;
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.neo4j.importer.v1.Configuration;
import org.neo4j.importer.v1.ImportSpecification;
import org.neo4j.importer.v1.actions.Action;
import org.neo4j.importer.v1.actions.ActionStage;
import org.neo4j.importer.v1.sources.Source;
import org.neo4j.importer.v1.targets.CustomQueryTarget;
import org.neo4j.importer.v1.targets.NodeTarget;
import org.neo4j.importer.v1.targets.RelationshipTarget;
import org.neo4j.importer.v1.targets.Target;
import org.neo4j.importer.v1.targets.TargetType;
import org.neo4j.importer.v1.targets.Targets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Dataflow template which reads Google Cloud data (Text, BigQuery) and writes it to Neo4j.
*
* <p>In case of BigQuery, the source data can be either a table or a SQL query.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-neo4j/README_Google_Cloud_to_Neo4j.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Google_Cloud_to_Neo4j",
category = TemplateCategory.BATCH,
displayName = "Google Cloud to Neo4j",
description =
"The Google Cloud to Neo4j template lets you import a dataset into a Neo4j database through a Dataflow job, "
+ "sourcing data from CSV files hosted in Google Cloud Storage buckets. It also lets you manipulate and transform the data "
+ "at various steps of the import. You can use the template for both first-time imports and incremental imports.",
optionsClass = Neo4jFlexTemplateOptions.class,
flexContainerName = "googlecloud-to-neo4j",
contactInformation = "https://support.neo4j.com/",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/google-cloud-to-neo4j",
requirements = {
"A running Neo4j instance",
"A Google Cloud Storage bucket",
"A dataset to import, in the form of CSV files",
"A job specification file to use"
},
additionalDocumentation = {
@AdditionalDocumentationBlock(
name = "Create a job specification file",
content = {
"The job specification file consists of a JSON object with the following sections:\n"
+ "- `config` - global flags affecting how the import is performed.\n"
+ "- `sources` - data source definitions (relational).\n"
+ "- `targets` - data target definitions (graph: nodes/relationships/custom queries).\n"
+ "- `actions` - pre/post-load actions.\n"
+ "For more information, see <a href=\"https://neo4j.com/docs/dataflow-google-cloud/job-specification/\" class=\"external\">Create a job specification file</a> in the Neo4j documentation."
})
},
preview = true)
public class GoogleCloudToNeo4j {
private static final Logger LOG = LoggerFactory.getLogger(GoogleCloudToNeo4j.class);
private final OptionsParams optionsParams;
private final ConnectionParams neo4jConnection;
private final ImportSpecification importSpecification;
private final Configuration globalSettings;
private final Pipeline pipeline;
private final String templateVersion;
private final TargetSequence targetSequence = new TargetSequence();
/**
* Main class for template. Initializes job using run-time on pipelineOptions.
*
* @param pipelineOptions framework supplied arguments
*/
public GoogleCloudToNeo4j(Neo4jFlexTemplateOptions pipelineOptions) {
////////////////////////////
// Job name gets a date on it when running within the container, but not with DirectRunner
// final String jobName = pipelineOptions.getJobName() + "-" + System.currentTimeMillis();
// pipelineOptions.setJobName(jobName);
// Set pipeline options
this.pipeline = Pipeline.create(pipelineOptions);
FileSystems.setDefaultPipelineOptions(pipelineOptions);
this.optionsParams = OptionsParamsMapper.fromPipelineOptions(pipelineOptions);
// Validate pipeline
processValidations(
"Errors found validating pipeline options: ",
InputValidator.validateNeo4jPipelineOptions(pipelineOptions));
this.templateVersion = readTemplateVersion(pipelineOptions);
String neo4jConnectionJson = readConnectionSettings(pipelineOptions);
ParsingResult parsingResult = InputValidator.validateNeo4jConnection(neo4jConnectionJson);
if (!parsingResult.isSuccessful()) {
processValidations(
"Errors found validating Neo4j connection: ",
parsingResult.formatErrors("Could not validate connection JSON"));
}
this.neo4jConnection = Json.map(parsingResult, ConnectionParams.class);
this.importSpecification = JobSpecMapper.parse(pipelineOptions.getJobSpecUri(), optionsParams);
globalSettings = importSpecification.getConfiguration();
///////////////////////////////////
// Source specific validations
for (Source source : importSpecification.getSources()) {
// get provider implementation for source
Provider providerImpl = ProviderFactory.of(source, targetSequence);
providerImpl.configure(optionsParams);
}
}
private static String readTemplateVersion(Neo4jFlexTemplateOptions options) {
Map<String, String> labels = options.as(DataflowPipelineOptions.class).getLabels();
String defaultVersion = "UNKNOWN";
if (labels == null) {
return defaultVersion;
}
return labels.getOrDefault("goog-dataflow-provided-template-version", defaultVersion);
}
private static String readConnectionSettings(Neo4jFlexTemplateOptions options) {
String secretId = options.getNeo4jConnectionSecretId();
if (StringUtils.isNotEmpty(secretId)) {
return SecretManagerUtils.getSecret(secretId);
}
String uri = options.getNeo4jConnectionUri();
try {
return FileSystemUtils.getPathContents(uri);
} catch (Exception e) {
throw new RuntimeException(
String.format("Unable to read Neo4j configuration at URI %s: ", uri), e);
}
}
/**
* Runs a pipeline which reads data from various sources and writes it to Neo4j.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
Neo4jFlexTemplateOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Neo4jFlexTemplateOptions.class);
// Allow users to supply their own list of disabled algorithms if necessary:
// apply the default list only when no value was provided
if (StringUtils.isBlank(options.getDisabledAlgorithms())) {
options.setDisabledAlgorithms(
"SSLv3, RC4, DES, MD5withRSA, DH keySize < 1024, EC keySize < 224, 3DES_EDE_CBC, anon,"
+ " NULL");
}
LOG.info("Job: {}", options.getJobSpecUri());
GoogleCloudToNeo4j template = new GoogleCloudToNeo4j(options);
template.run();
}
/** Raises RuntimeExceptions for validation errors. */
private void processValidations(String description, List<String> validationMessages) {
StringBuilder sb = new StringBuilder();
if (!validationMessages.isEmpty()) {
for (String msg : validationMessages) {
sb.append(msg);
sb.append(System.lineSeparator());
}
throw new RuntimeException(description + " " + sb);
}
}
public void run() {
try (Neo4jConnection directConnect =
new Neo4jConnection(this.neo4jConnection, this.templateVersion)) {
boolean resetDb = globalSettings.get(Boolean.class, "reset_db").orElse(false);
if (!resetDb) {
directConnect.verifyConnectivity();
} else {
directConnect.resetDatabase();
}
}
////////////////////////////
// If an action transformation has no upstream PCollection, it will use this default context
PCollection<Row> defaultActionContext =
pipeline.apply(
"Default Context",
Create.empty(TypeDescriptor.of(Row.class)).withCoder(ProcessingCoder.of()));
var processingQueue = new BeamBlock(defaultActionContext);
runPreloadActions(findActionsByStage(ActionStage.START).collect(toList()));
Map<ActionStage, List<PCollection<?>>> preActionRows =
findActionsByStages(
Set.of(
ActionStage.PRE_NODES, ActionStage.PRE_RELATIONSHIPS, ActionStage.PRE_QUERIES))
.map(action -> Map.entry(action.getStage(), runAction(action, defaultActionContext)))
.collect(
groupingBy(
Entry::getKey, mapping(Entry::getValue, Collectors.<PCollection<?>>toList())));
var sourceRows = new ArrayList<PCollection<?>>(importSpecification.getSources().size());
var targetRows = new HashMap<TargetType, List<PCollection<?>>>(targetCount());
var allActiveNodeTargets =
importSpecification.getTargets().getNodes().stream()
.filter(Target::isActive)
.collect(toList());
////////////////////////////
// Process sources
for (var source : importSpecification.getSources()) {
// get provider implementation for source
Provider provider = ProviderFactory.of(source, targetSequence);
provider.configure(optionsParams);
PCollection<Row> sourceMetadata =
pipeline.apply(
String.format("Metadata for source %s", source.getName()), provider.queryMetadata());
sourceRows.add(sourceMetadata);
Schema sourceBeamSchema = sourceMetadata.getSchema();
processingQueue.addToQueue(
ArtifactType.source, false, source.getName(), defaultActionContext, sourceMetadata);
PCollection<Row> nullableSourceBeamRows = null;
////////////////////////////
// Optimization: if single source query, reuse this PCollection rather than write it again
boolean targetsHaveTransforms = ModelUtils.targetsHaveTransforms(importSpecification, source);
if (!targetsHaveTransforms || !provider.supportsSqlPushDown()) {
nullableSourceBeamRows =
pipeline
.apply("Query " + source.getName(), provider.querySourceBeamRows(sourceBeamSchema))
.setRowSchema(sourceBeamSchema);
}
String sourceName = source.getName();
////////////////////////////
// Optimization: if we're not mixing nodes and edges, then run in parallel
// For relationship updates, max workers should be max 2. This parameter is job configurable.
////////////////////////////
// No optimization possible so write nodes then edges.
// Write node targets
List<NodeTarget> nodeTargets =
getActiveTargetsBySourceAndType(importSpecification, sourceName, TargetType.NODE);
for (NodeTarget target : nodeTargets) {
TargetQuerySpec targetQuerySpec =
new TargetQuerySpecBuilder()
.sourceBeamSchema(sourceBeamSchema)
.nullableSourceRows(nullableSourceBeamRows)
.target(target)
.build();
String nodeStepDescription =
targetSequence.getSequenceNumber(target)
+ ": "
+ source.getName()
+ "->"
+ target.getName()
+ " nodes";
PCollection<Row> preInsertBeamRows =
pipeline.apply(
"Query " + nodeStepDescription, provider.queryTargetBeamRows(targetQuerySpec));
List<PCollection<?>> dependencies =
new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_NODES, List.of()));
dependencies.add(
processingQueue.waitOnCollections(target.getDependencies(), nodeStepDescription));
PCollection<Row> blockingReturn =
preInsertBeamRows
.apply(
"** Unblocking "
+ nodeStepDescription
+ "(after "
+ String.join(", ", target.getDependencies())
+ " and pre-nodes actions)",
Wait.on(dependencies))
.setCoder(preInsertBeamRows.getCoder())
.apply(
"Writing " + nodeStepDescription,
new Neo4jRowWriterTransform(
importSpecification,
neo4jConnection,
templateVersion,
targetSequence,
target))
.setCoder(preInsertBeamRows.getCoder());
targetRows
.computeIfAbsent(TargetType.NODE, (type) -> new ArrayList<>(nodeTargets.size()))
.add(blockingReturn);
processingQueue.addToQueue(
ArtifactType.node, false, target.getName(), blockingReturn, preInsertBeamRows);
}
////////////////////////////
// Write relationship targets
List<RelationshipTarget> relationshipTargets =
getActiveTargetsBySourceAndType(importSpecification, sourceName, TargetType.RELATIONSHIP);
for (var target : relationshipTargets) {
var targetQuerySpec =
new TargetQuerySpecBuilder()
.nullableSourceRows(nullableSourceBeamRows)
.sourceBeamSchema(sourceBeamSchema)
.target(target)
.startNodeTarget(
findNodeTargetByName(allActiveNodeTargets, target.getStartNodeReference()))
.endNodeTarget(
findNodeTargetByName(allActiveNodeTargets, target.getEndNodeReference()))
.build();
PCollection<Row> preInsertBeamRows;
String relationshipStepDescription =
targetSequence.getSequenceNumber(target)
+ ": "
+ source.getName()
+ "->"
+ target.getName()
+ " edges";
if (ModelUtils.targetHasTransforms(target)) {
preInsertBeamRows =
pipeline.apply(
"Query " + relationshipStepDescription,
provider.queryTargetBeamRows(targetQuerySpec));
} else {
preInsertBeamRows = nullableSourceBeamRows;
}
List<PCollection<?>> dependencies =
new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_RELATIONSHIPS, List.of()));
Set<String> dependencyNames = new LinkedHashSet<>(target.getDependencies());
dependencyNames.add(target.getStartNodeReference());
dependencyNames.add(target.getEndNodeReference());
dependencies.add(
processingQueue.waitOnCollections(dependencyNames, relationshipStepDescription));
PCollection<Row> blockingReturn =
preInsertBeamRows
.apply(
"** Unblocking "
+ relationshipStepDescription
+ "(after "
+ String.join(", ", dependencyNames)
+ " and pre-relationships actions)",
Wait.on(dependencies))
.setCoder(preInsertBeamRows.getCoder())
.apply(
"Writing " + relationshipStepDescription,
new Neo4jRowWriterTransform(
importSpecification,
neo4jConnection,
templateVersion,
targetSequence,
target))
.setCoder(preInsertBeamRows.getCoder());
targetRows
.computeIfAbsent(
TargetType.RELATIONSHIP, (type) -> new ArrayList<>(relationshipTargets.size()))
.add(blockingReturn);
// serialize relationships
processingQueue.addToQueue(
ArtifactType.edge, false, target.getName(), blockingReturn, preInsertBeamRows);
}
////////////////////////////
// Custom query targets
List<CustomQueryTarget> customQueryTargets =
getActiveTargetsBySourceAndType(importSpecification, sourceName, TargetType.QUERY);
for (Target target : customQueryTargets) {
String customQueryStepDescription =
targetSequence.getSequenceNumber(target)
+ ": "
+ source.getName()
+ "->"
+ target.getName()
+ " (custom query)";
List<PCollection<?>> dependencies =
new ArrayList<>(preActionRows.getOrDefault(ActionStage.PRE_QUERIES, List.of()));
dependencies.add(
processingQueue.waitOnCollections(
target.getDependencies(), customQueryStepDescription));
PCollection<Row> blockingReturn =
nullableSourceBeamRows
.apply(
"** Unblocking "
+ customQueryStepDescription
+ "(after "
+ String.join(", ", target.getDependencies())
+ ")",
Wait.on(dependencies))
.setCoder(nullableSourceBeamRows.getCoder())
.apply(
"Writing " + customQueryStepDescription,
new Neo4jRowWriterTransform(
importSpecification,
neo4jConnection,
templateVersion,
targetSequence,
target))
.setCoder(nullableSourceBeamRows.getCoder());
targetRows
.computeIfAbsent(TargetType.QUERY, (type) -> new ArrayList<>(customQueryTargets.size()))
.add(blockingReturn);
processingQueue.addToQueue(
ArtifactType.custom_query,
false,
target.getName(),
blockingReturn,
nullableSourceBeamRows);
}
}
// Process POST-* actions, gather outputs and run END actions
List<PCollection<?>> endActionDependencies =
findActionsByStage(ActionStage.POST_SOURCES)
.map(action -> runAction(action, defaultActionContext, sourceRows))
.collect(Collectors.toCollection(ArrayList::new));
endActionDependencies.addAll(
findActionsByStage(ActionStage.POST_NODES)
.map(
action ->
runAction(
action,
defaultActionContext,
targetRows.getOrDefault(TargetType.NODE, List.of())))
.collect(toList()));
endActionDependencies.addAll(
findActionsByStage(ActionStage.POST_RELATIONSHIPS)
.map(
action ->
runAction(
action,
defaultActionContext,
targetRows.getOrDefault(TargetType.RELATIONSHIP, List.of())))
.collect(toList()));
endActionDependencies.addAll(
findActionsByStage(ActionStage.POST_QUERIES)
.map(
action ->
runAction(
action,
defaultActionContext,
targetRows.getOrDefault(TargetType.QUERY, List.of())))
.collect(toList()));
findActionsByStage(ActionStage.END)
.map(action -> runAction(action, defaultActionContext, endActionDependencies))
.forEach(GoogleCloudToNeo4j::noOp);
// For a Dataflow Flex Template, do NOT waitUntilFinish().
pipeline.run();
}
private PCollection<Row> runAction(Action action, PCollection<Row> defaultActionContext) {
return runAction(action, defaultActionContext, List.of());
}
private PCollection<Row> runAction(
Action action, PCollection<Row> defaultActionContext, List<PCollection<?>> dependencies) {
var actionName = action.getName();
return pipeline
.apply(String.format("** Setup %s", actionName), Create.of(1))
.apply(
String.format("** Wait on %s dependencies", action.getStage()), Wait.on(dependencies))
.setCoder(VarIntCoder.of())
.apply(
String.format("Running action %s", actionName),
ParDo.of(ActionDoFnFactory.of(newActionContext(action))))
.setCoder(defaultActionContext.getCoder());
}
private Stream<Action> findActionsByStage(ActionStage stage) {
return findActionsByStages(Set.of(stage));
}
private Stream<Action> findActionsByStages(Set<ActionStage> stages) {
return importSpecification.getActions().stream()
.filter(action -> stages.contains(action.getStage()));
}
private void runPreloadActions(List<Action> actions) {
for (Action action : actions) {
LOG.debug("Executing START action: {}", action.getName());
// Get targeted execution context
ActionContext context = new ActionContext(action, neo4jConnection, templateVersion);
PreloadAction actionImpl = ActionPreloadFactory.of(action, context);
List<String> msgs = actionImpl.execute();
for (String msg : msgs) {
LOG.info("START action {} output: {}", action.getName(), msg);
}
}
}
@NotNull
private ActionContext newActionContext(Action action) {
return new ActionContext(action, this.neo4jConnection, this.templateVersion);
}
private static NodeTarget findNodeTargetByName(List<NodeTarget> nodes, String reference) {
return nodes.stream()
.filter(target -> reference.equals(target.getName()))
.findFirst()
.orElseThrow(
() -> new IllegalArgumentException("Could not find active node target: " + reference));
}
@SuppressWarnings("unchecked")
private <T extends Target> List<T> getActiveTargetsBySourceAndType(
ImportSpecification importSpecification, String sourceName, TargetType targetType) {
Targets targets = importSpecification.getTargets();
return targets.getAll().stream()
.filter(
target ->
target.getTargetType() == targetType
&& target.isActive()
&& sourceName.equals(target.getSource()))
.map(target -> (T) target)
.collect(toList());
}
private static <T> void noOp(T item) {}
private int targetCount() {
var targets = this.importSpecification.getTargets();
return targets.getNodes().size()
+ targets.getRelationships().size()
+ targets.getCustomQueries().size();
}
}
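In the listing, a relationship target waits on its declared dependencies plus its start and end node targets, collected in a `LinkedHashSet` so that duplicates are dropped and insertion order is kept. The following standalone sketch isolates that step. `RelationshipDependencies` and `of` are hypothetical names, and the target names in `main` are made-up examples.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper isolating how relationship dependency names are collected.
final class RelationshipDependencies {

  // Declared dependencies come first, followed by the start and end node targets.
  // LinkedHashSet drops duplicates while preserving insertion order.
  static Set<String> of(List<String> declared, String startNode, String endNode) {
    Set<String> names = new LinkedHashSet<>(declared);
    names.add(startNode);
    names.add(endNode);
    return names;
  }

  public static void main(String[] args) {
    // "Person" is both a declared dependency and the start node, so it appears once.
    System.out.println(of(List.of("cleanup", "Person"), "Person", "Movie"));
    // prints [cleanup, Person, Movie]
  }
}
```

Because the node targets are always part of the set, a relationship write does not start until both endpoints have been written, even if the job specification declares no dependencies for it.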
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.