Dataflow를 사용하여 데이터 가져오기, 내보내기, 수정

Dataflow는 데이터를 변환하고 강화하는 관리형 서비스입니다. Spanner용 Dataflow 커넥터를 사용하면 Dataflow 파이프라인에서 Spanner의 데이터를 읽거나 쓸 수 있으며, 원하는 경우 데이터를 변환하거나 수정할 수 있습니다. Spanner와 다른 Google Cloud 제품 간에 데이터를 전송하는 파이프라인을 만들 수도 있습니다.

Dataflow 커넥터는 Spanner 안팎으로 대량의 데이터를 효율적으로 이동하고, 테이블 이동, JOIN이 필요한 일괄 삭제 등과 같이 Partitioned DML에서 지원되지 않는 데이터베이스에 대한 대규모 변환을 수행하는 데 권장되는 방법입니다. 개별 데이터베이스 작업 시에는 다음과 같은 다른 방법으로 데이터를 가져오고 내보낼 수 있습니다.

  • Google Cloud 콘솔을 사용하여 Spanner의 개별 데이터베이스를 Avro 형식으로 Cloud Storage로 내보냅니다.
  • Google Cloud 콘솔을 사용하여 Cloud Storage로 내보낸 파일에서 Spanner로 데이터베이스를 다시 가져옵니다.
  • REST API 또는 Google Cloud CLI를 사용하여 Spanner와 Cloud Storage 간에 및 그 반대로 내보내기 작업이나 가져오기 작업을 실행합니다(Avro 형식도 사용).

Spanner용 Dataflow 커넥터는 Apache Beam 자바 SDK의 일부이며, 위의 작업을 수행하기 위한 API를 제공합니다. PCollection 객체 및 변환과 같이 아래에서 설명하는 일부 개념에 대한 자세한 내용은 Apache Beam 프로그래밍 가이드를 참조하세요.

Maven 프로젝트에 커넥터 추가

Google Cloud Dataflow 커넥터를 Maven 프로젝트에 추가하려면 beam-sdks-java-io-google-cloud-platform Maven 아티팩트를 pom.xml 파일에 종속 항목으로 추가합니다.

예를 들어 pom.xml 파일이 beam.version을 적절한 버전 번호로 설정했다고 가정하면 다음 종속 항목을 추가합니다.

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Spanner에서 데이터 읽기

Spanner에서 읽으려면 SpannerIO.read() 변환을 적용합니다. SpannerIO.Read 클래스의 메서드를 사용하여 읽기를 구성합니다. 변환을 적용하면 PCollection<Struct>가 반환되며, 이 컬렉션의 각 요소는 읽기 작업에서 반환된 개별 행을 나타냅니다. 원하는 출력에 따라 특정 SQL 쿼리를 사용하거나 사용하지 않고 Spanner에서 읽을 수 있습니다.

SpannerIO.read() 변환을 적용하면 강력한 읽기를 수행하여 일관된 데이터 뷰가 반환됩니다. 달리 지정하지 않는 한 읽기를 시작한 시점에 읽은 결과가 스냅샷으로 촬영됩니다. Spanner가 수행할 수 있는 다양한 읽기 유형에 대한 자세한 내용은 읽기를 참조하세요.

쿼리를 사용하여 데이터 읽기

Spanner에서 특정 데이터 세트를 읽으려면 SpannerIO.Read.withQuery() 메서드로 변환을 구성하여 SQL 쿼리를 지정합니다. 예를 들면 다음과 같습니다.

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

쿼리를 지정하지 않고 데이터 읽기

쿼리를 사용하지 않고 데이터베이스에서 읽으려면 SpannerIO.Read.withTable() 메서드를 사용하여 테이블 이름을 지정하고, SpannerIO.Read.withColumns() 메서드를 사용하여 읽을 열 목록을 지정할 수 있습니다. 예를 들면 다음과 같습니다.

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

읽는 행을 제한하려면 SpannerIO.Read.withKeySet() 메서드를 사용하여 읽을 기본 키 집합을 지정하면 됩니다.

지정된 보조 색인을 사용하여 테이블을 읽을 수도 있습니다. readUsingIndex() API 호출과 마찬가지로 색인에는 쿼리 결과에 나타나는 모든 데이터가 포함되어야 합니다.

이렇게 하려면 이전 예시에 표시된 대로 테이블을 지정하고 SpannerIO.Read.withIndex()를 사용하여 원하는 열 값이 포함된 색인을 지정하세요. 색인에는 변환에서 읽어야 하는 모든 열이 저장되어야 합니다. 기본 테이블의 기본 키는 암시적으로 저장됩니다. 예를 들어 SongsBySongName 색인을 사용하여 Songs 테이블을 읽으려면 다음 코드를 사용합니다.

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

트랜잭션 데이터 비활성 제어

변환은 데이터의 일관된 스냅샷에서 실행되도록 보장됩니다. 데이터 비활성을 제어하려면 SpannerIO.Read.withTimestampBound() 메서드를 사용합니다. 자세한 내용은 트랜잭션을 참조하세요.

동일한 트랜잭션의 여러 테이블에서 읽기

데이터 일관성을 보장하기 위해 동일한 시점에 여러 테이블의 데이터를 읽으려면 단일 트랜잭션으로 모든 읽기를 수행합니다. 이렇게 하려면 createTransaction() 변환을 적용하여 PCollectionView<Transaction> 객체를 만듭니다. 그러면 트랜잭션이 생성됩니다. 결과 뷰는 SpannerIO.Read.withTransaction()을 사용하여 읽기 작업에 전달할 수 있습니다.

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

사용 가능한 모든 테이블에서 데이터 읽기

Spanner 데이터베이스의 사용할 수 있는 모든 테이블에서 데이터를 읽을 수 있습니다.

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

지원되지 않는 쿼리 문제 해결

Dataflow 커넥터는 쿼리 실행 계획의 첫 번째 연산자가 분산 통합인 Spanner SQL 쿼리만 지원합니다. 쿼리를 사용하여 Spanner에서 데이터를 읽으려고 할 때 query does not have a DistributedUnion at the root라는 예외가 발생하면 Spanner의 쿼리 실행 방법 이해의 단계를 따라 Google Cloud 콘솔을 사용하여 쿼리의 실행 계획을 검색하세요.

SQL 쿼리가 지원되지 않으면 쿼리 실행 계획의 첫 번째 연산자로 분산 통합이 포함된 쿼리로 쿼리를 단순화하세요. 쿼리 작동을 막을 가능성이 가장 높은 DISTINCT, GROUP BY, ORDER 연산자와 집계 함수, 테이블 조인을 제거하세요.

쓰기 변형 만들기

Java 파이프라인에 꼭 필요한 경우가 아니라면 newInsertBuilder() 메서드 대신 Mutation 클래스의 newInsertOrUpdateBuilder() 메서드를 사용하세요. Python 파이프라인의 경우 SpannerInsert() 대신 SpannerInsertOrUpdate()를 사용합니다. Dataflow에서는 최소 1회 보장을 제공합니다. 즉, 변형이 여러 번 기록될 수 있습니다. 따라서 INSERT만 변형할 경우 파이프라인 실패를 유발하는 com.google.cloud.spanner.SpannerException: ALREADY_EXISTS 오류가 생성될 수 있습니다. 이 오류를 방지하려면 새 행을 추가하거나 행이 이미 있으면 열 값을 업데이트하는 INSERT_OR_UPDATE 변형을 대신 사용합니다. INSERT_OR_UPDATE 변형은 두 번 이상 적용할 수 있습니다.

Spanner에 쓰기 및 데이터 변환

SpannerIO.write() 변환을 사용하여 입력 행 변형 컬렉션을 실행하면 Dataflow 커넥터로 Spanner에 데이터를 쓸 수 있습니다. Dataflow 커넥터는 효율성을 높이기 위해 변형을 배치로 그룹화합니다.

다음 예시에서는 쓰기 변환을 변형의 PCollection에 적용하는 방법을 보여줍니다.

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

변환이 완료되기 전에 예기치 않게 중지되더라도 이미 적용된 변형은 롤백되지 않습니다.

변형 그룹을 원자적으로 적용

MutationGroup 클래스를 사용하면 하나의 변형 그룹이 원자적으로 함께 적용되도록 할 수 있습니다. MutationGroup의 변형은 동일한 트랜잭션에서의 제출이 보장되지만 트랜잭션이 다시 시도될 수 있습니다.

변형 그룹은 키 공간에서 가깝게 있는 데이터에 영향을 미치는 변형들을 그룹화하는 경우에 최고의 성능을 발휘합니다. Spanner는 상위 테이블에 상위 테이블 데이터와 하위 테이블 데이터를 함께 인터리브하므로, 키 공간에서 데이터가 항상 서로 가깝게 위치합니다. 상위 테이블에 적용되는 하나의 변형과 하위 테이블에 적용되는 추가 변형을 포함하도록 또는 모든 변형이 키 공간에서 서로 가깝게 위치한 데이터를 수정하도록 변형 그룹을 구성하는 것이 좋습니다. Spanner가 상위 및 하위 테이블 데이터를 저장하는 방법에 대한 자세한 내용은 스키마 및 데이터 모델을 참조하세요. 권장되는 테이블 계층 주위에 변형 그룹을 구성하지 않거나 액세스 대상 데이터가 키 공간에서 서로 가깝게 위치하지 않는 경우 Spanner는 2단계 커밋을 수행해야 할 수 있으며, 이로 인해 성능 저하가 발생할 수 있습니다. 자세한 내용은 지역성의 장단점을 참조하세요.

MutationGroup을 사용하려면 SpannerIO.write() 변환을 빌드하고 SpannerIO.Write.grouped() 메서드를 호출합니다. 그러면 MutationGroup 객체의 PCollection에 적용할 수 있는 변환이 반환됩니다.

MutationGroup 생성 시 나열된 첫 번째 변형이 기본 변형이 됩니다. 변형 그룹이 상위 및 하위 테이블 모두에 영향을 미치는 경우, 기본 변형은 상위 테이블에 대한 변형이어야 합니다. 그렇지 않으면 모든 변형을 기본 변형으로 사용할 수 있습니다. Dataflow 커넥터는 변형의 효율적인 배치 작업을 위해 기본 변형을 사용하여 파티션 경계를 결정합니다.

예를 들어 동작을 모니터링하고 검토할 문제가 있는 사용자 동작을 신고하는 애플리케이션이 있다고 가정합니다. 신고된 각 동작에 대해 사용자의 애플리케이션 액세스를 차단하도록 Users 테이블을 업데이트하고 이 이슈를 PendingReviews 테이블에 기록해야 합니다. 두 테이블 모두를 원자적으로 업데이트하려면 MutationGroup을 사용합니다.

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

변형 그룹 생성 시 인수로 제공된 첫 번째 변형이 기본 변형이 됩니다. 이 경우에는 두 테이블이 서로 관련이 없으므로 분명한 기본 변형이 없습니다. 여기서는 userMutation을 먼저 배치하여 기본 변형으로 선택했습니다. 두 변형을 개별적으로 적용하는 것이 더 빠르지만 원자성을 보장하지 않으므로, 이 상황에서는 변형 그룹이 최상의 선택입니다.

다음 단계