Dataflow를 사용하여 Kafka에서 BigQuery로 데이터 쓰기

이 문서에서는 Apache Kafka에서 BigQuery로 스트리밍하는 Dataflow 파이프라인을 만들고 배포하는 방법에 대한 개략적인 안내를 제공합니다.

Apache Kafka는 이벤트 스트리밍을 위한 오픈소스 플랫폼입니다. Kafka는 분산 아키텍처에서 느슨하게 결합된 구성요소 간의 통신을 사용 설정하기 위해 일반적으로 사용됩니다. Dataflow를 사용하여 Kafka에서 이벤트를 읽고, 이를 처리하고, 추가 분석을 위해 BigQuery 테이블에 결과를 기록할 수 있습니다.

Kafka 이벤트를 BigQuery로 읽기

Google은 Kafka-BigQuery 파이프라인을 구성하는 Dataflow 템플릿을 제공합니다. 이 템플릿은 Apache Beam SDK에 제공된 BigQueryIO 커넥터를 사용합니다.

이 템플릿을 사용하려면 다음 단계를 수행합니다.

  1. Google Cloud 또는 다른 위치에 Kafka를 배포합니다.
  2. 네트워킹을 구성합니다.
  3. Identity and Access Management(IAM) 권한을 설정합니다.
  4. 이벤트 데이터를 변환하는 함수를 작성합니다.
  5. BigQuery 출력 테이블을 만듭니다.
  6. Dataflow 템플릿을 배포합니다.

Kafka 배포

Google Cloud 내에서 Compute Engine 가상 머신(VM) 인스턴스에 Kafka 클러스터를 배포하거나 타사 관리형 Kafka 서비스를 사용할 수 있습니다. Google Cloud의 배포 옵션에 대한 자세한 내용은 Apache Kafka란?을 참조하세요. Google Cloud Marketplace에서 타사 Kafka 솔루션을 찾을 수 있습니다.

또는 Google Cloud 외부에 있는 기존 Kafka 클러스터가 있을 수 있습니다. 예를 들어 온프레미스 또는 다른 퍼블릭 클라우드에 배포된 기존 워크로드가 있을 수 있습니다.

네트워킹 구성

기본적으로 Dataflow는 기본 Virtual Private Cloud(VPC) 네트워크 내에서 인스턴스를 실행합니다. Kafka 구성에 따라 Dataflow에 대해 다른 네트워크 및 서브넷을 구성해야 할 수도 있습니다. 자세한 내용은 네트워크 및 서브네트워크 지정을 참조하세요. 네트워크를 구성할 때 Dataflow 작업자 머신이 Kafka 브로커에 도달할 수 있도록 방화벽 규칙을 만듭니다.

VPC 서비스 제어를 사용하는 경우 Kafka 클러스터를 VPC 서비스 제어 경계 내에 배치하거나 승인된 VPN 또는 Cloud Interconnect로 경계를 확장합니다.

Kafka 클러스터가 Google Cloud 외부에 배포된 경우 Dataflow와 Kafka 클러스터 간에 네트워크 연결을 만들어야 합니다. 각각의 장단점에 따라 몇 가지 네트워킹 옵션이 있습니다.

예측 가능한 성능과 안정성을 위해 가장 좋은 옵션은 Dedicated Interconnect이지만, 이 옵션은 제3자가 새로운 회로를 프로비저닝해야 하므로 설정하는 데 오래 걸릴 수 있습니다. 공개 IP 기반 토폴로지에서는 네트워킹 작업이 거의 필요하지 않으므로 빠른 시작이 가능합니다.

다음 두 섹션에서는 이러한 옵션을 자세히 설명합니다.

공유 RFC 1918 주소 공간

Dedicated Interconnect와 IPsec VPN 모두 Virtual Private Cloud(VPC)의 RFC 1918 IP 주소에 직접 액세스하여 Kafka 구성을 단순화할 수 있습니다. VPN 기반 토폴로지를 사용하고 있다면 처리량이 높은 VPN 설정을 고려할 수 있습니다.

기본적으로 Dataflow는 기본 VPC 네트워크의 인스턴스를 실행합니다. Google Cloud의 서브네트워크를 해당 Kafka 클러스터에 연결하는 경로가 Cloud Router에 명시적으로 정의된 비공개 네트워크 토폴로지에서는 Dataflow 인스턴스를 어디에 둘지를 더 자세히 제어해야 합니다. Dataflow를 사용하여 networksubnetwork 실행 매개변수를 구성할 수 있습니다.

Dataflow가 수평 확장을 시도함에 따라 인스턴스를 실행하기에 충분한 IP 주소가 해당 서브네트워크에 있는지 확인합니다. 또한 Dataflow 인스턴스를 실행하기 위한 별도의 네트워크를 만들 때 프로젝트의 모든 가상 머신 간에 TCP 트래픽을 사용 설정하는 방화벽 규칙이 있는지 확인합니다. 기본 네트워크에는 이 방화벽 규칙이 이미 구성되어 있습니다.

공개 IP 주소 공간

이 아키텍처는 전송 계층 보안(TLS)을 사용하여 외부 클라이언트와 Kafka 간의 트래픽을 보호하고 브로커 간 통신에 암호화되지 않은 트래픽을 사용합니다. Kafka 리스너가 내부 및 외부 통신에 모두 사용되는 네트워크 인터페이스에 바인딩되는 경우에는 리스너 구성이 간편합니다. 그러나 많은 경우에 클러스터에 속한 Kafka 브로커의 외부 공지 주소는 Kafka가 사용하는 내부 네트워크 인터페이스와 다릅니다. 이러한 시나리오에서는 advertised.listeners 속성을 사용할 수 있습니다.

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

외부 클라이언트는 'SSL' 채널을 통해 포트 9093으로 연결하고, 내부 클라이언트는 일반 텍스트 채널을 통해 포트 9092로 연결합니다. advertised.listeners에 주소를 지정할 때는 외부 및 내부 트래픽에서 동일한 인스턴스로 확인되는 DNS 이름(이 샘플에서는 kafkabroker-n.mydomain.com)을 사용합니다. 공개 IP 주소는 내부 트래픽에서 확인에 실패할 수 있으므로 사용하지 못할 가능성이 높습니다.

IAM 권한 설정

Dataflow 작업은 2개의 IAM 서비스 계정을 사용합니다.

  • Dataflow 서비스는 Dataflow 서비스 계정을 사용하여 VM 만들기와 같이 Google Cloud 리소스를 조작합니다.
  • Dataflow 작업자 VM은 작업자 서비스 계정을 사용하여 파이프라인의 파일 및 기타 리소스에 액세스합니다. 이 서비스 계정에는 BigQuery 출력 테이블에 대한 쓰기 액세스 권한이 필요합니다. 또한 파이프라인 작업이 참조하는 다른 리소스에 대한 액세스도 필요합니다.

이러한 두 서비스 계정에 적절한 역할이 있는지 확인합니다. 자세한 내용은 Dataflow 보안 및 권한을 참조하세요.

BigQuery용 데이터 변환

Kafka-BigQuery 템플릿은 하나 이상의 Kafka 주제에서 이벤트를 읽고 이를 BigQuery 테이블에 기록하는 파이프라인을 만듭니다. 선택적으로 BigQuery에 기록되기 전 이벤트 데이터를 변환하는 JavaScript 사용자 정의 함수(UDF)를 제공할 수 있습니다.

파이프라인의 출력은 출력 테이블의 스키마와 일치하는 JSON 형식 데이터여야 합니다. Kafka 이벤트 데이터가 이미 JSON 형식이면 일치하는 스키마를 사용하여 BigQuery 테이블을 만들고 이벤트를 BigQuery에 직접 전달할 수 있습니다. 그렇지 않으면 이벤트 데이터를 입력으로 사용하고 BigQuery 테이블과 일치하는 JSON 데이터를 반환하는 UDF를 작성합니다.

예를 들어 이벤트 데이터에 2개 필드가 포함된다고 가정해보세요.

  • name(문자열)
  • customer_id(정수)

Dataflow 파이프라인의 출력은 다음과 비슷할 수 있습니다.

{ "name": "Alice", "customer_id": 1234 }

이벤트 데이터가 아직 JSON 형식이 아니라고 가정하면, 다음과 같이 데이터를 변환하는 UDF를 작성합니다.

// UDF
function process(eventData) {
  var name;
  var customer_id;

  // TODO Parse the event data to extract the name and customer_id fields.

  // Return a JSON payload.
  return JSON.stringify({ name: name, customer_id: customer_id });
}

이 UDF는 이벤트 필터링, 개인 식별 정보(PII) 삭제, 추가 필드로 데이터 강화와 같이 이벤트 데이터에 대해 추가 처리를 수행할 수 있습니다.

템플릿의 UDF 작성에 대한 자세한 내용은 UDF로 Dataflow 템플릿 확장을 참조하세요. JavaScript 파일을 Cloud Storage에 업로드합니다.

BigQuery 출력 테이블 만들기

템플릿을 실행하기 전 BigQuery 출력 테이블을 만듭니다. 테이블 스키마가 파이프라인의 JSON 출력과 호환되어야 합니다. JSON 페이로드의 각 속성에 대해 파이프라인이 동일한 이름의 BigQuery 테이블 열에 값을 기록합니다. JSON에서 누락된 속성은 NULL 값으로 해석됩니다.

이전 예시를 사용하여 BigQuery 테이블에 다음 열이 포함됩니다.

열 이름 데이터 유형
name STRING
customer_id INTEGER

CREATE TABLE SQL 문을 사용하여 테이블을 만듭니다.

CREATE TABLE my_dataset.kafka_events (name STRING, customer_id INTEGER);

또는 JSON 정의 파일을 사용하여 테이블 스키마를 지정할 수 있습니다. 자세한 내용은 BigQuery 문서에서 스키마 지정을 참조하세요.

Dataflow 작업 실행

BigQuery 테이블을 만든 후 Dataflow 템플릿을 실행합니다.

콘솔

Google Cloud 콘솔을 사용하여 Dataflow 작업을 만들려면 다음 단계를 수행합니다.

  1. Google Cloud 콘솔의 Dataflow 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기를 클릭합니다.
  3. 작업 이름 필드에 작업 이름을 입력합니다.
  4. 리전 엔드포인트에 리전을 선택합니다.
  5. 'Kafka-BigQuery' 템플릿을 선택합니다.
  6. 필수 매개변수 아래에 BigQuery 출력 테이블 이름을 입력합니다. 테이블이 이미 존재하고 올바른 스키마가 있어야 합니다.
  7. 선택적 매개변수 표시를 클릭하고 최소한 다음 매개변수의 값을 입력합니다.

    • 입력을 읽어올 Kafka 주제
    • 쉼표로 구분된 Kafka 부트스트랩 서버 목록
    • 서비스 계정 이메일

    필요에 따라 추가 매개변수를 입력합니다. 특히 다음을 지정해야 할 수 있습니다.

    • 네트워킹: 기본 네트워크 이외의 VPC 네트워크를 사용하려면 네트워크 및 서브넷을 지정합니다.
    • UDF: JavaScript UDF를 사용하려면 스크립트의 Cloud Storage 위치와 호출할 JavaScript 함수 이름을 지정합니다.

gcloud

Google Cloud CLI를 사용하여 Dataflow 작업을 만들려면 다음 명령어를 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters inputTopics=KAFKA_TOPICS \
--parameters bootstrapServers=BOOTSTRAP_SERVERS \
--parameters outputTableSpec=OUTPUT_TABLE \
--parameters serviceAccount=IAM_SERVICE_ACCOUNT \
--parameters javascriptTextTransformGcsPath=UDF_SCRIPT_PATH \
--parameters javascriptTextTransformFunctionName=UDF_FUNCTION_NAME \
--network VPC_NETWORK_NAME \
--subnetwork SUBNET_NAME

다음 변수를 바꿉니다.

  • JOB_NAME: 선택한 작업 이름입니다.
  • LOCATION: 작업을 실행할 리전입니다. 리전과 위치에 대한 자세한 내용은 Dataflow 위치를 참조하세요.
  • KAFKA_TOPICS: 읽어올 쉼표로 구분된 Kafka 주제 목록입니다.
  • BOOTSTRAP_SERVERS: 쉼표로 구분된 Kafka 부트스트랩 서버 목록입니다. 예: 127:9092,127.0.0.1:9093
  • OUTPUT_TABLE: PROJECT_ID:DATASET_NAME.TABLE_NAME으로 지정된 BigQuery 출력 테이블입니다. 예를 들면 my_project:dataset1.table1입니다.
  • IAM_SERVICE_ACCOUNT: 선택사항. 작업을 실행할 서비스 계정의 이메일 주소입니다.
  • UDF_SCRIPT_PATH: 선택사항. UDF가 포함된 JavaScript 파일에 대한 Cloud Storage 경로입니다. 예를 들면 gs://your-bucket/your-function.js입니다.
  • UDF_FUNCTION_NAME: 선택사항. UDF로 호출할 JavaScript 함수의 이름입니다.
  • VPC_NETWORK_NAME: 선택사항. 작업자를 할당할 네트워크입니다.
  • SUBNET_NAME: 선택사항. 작업자를 할당할 서브네트워크입니다.

데이터 유형

이 섹션에서는 BigQuery 테이블 스키마에서 여러 데이터 유형을 처리하는 방법을 설명합니다.

내부적으로 JSON 메시지가 TableRow 객체로 변환되고 TableRow 필드 값이 BigQuery 유형으로 변환됩니다.

스칼라 유형

다음 예시에서는 문자열, 숫자, 불리언, 날짜/시간, 간격, 지역 유형을 포함하여 서로 다른 스칼라 데이터 유형을 사용하여 BigQuery 테이블을 만듭니다.

CREATE TABLE  my_dataset.kafka_events (
    string_col STRING,
    integer_col INT64,
    float_col FLOAT64,
    decimal_col DECIMAL,
    bool_col BOOL,
    date_col DATE,
    dt_col DATETIME,
    ts_col TIMESTAMP,
    interval_col INTERVAL,
    geo_col GEOGRAPHY
);

다음은 호환되는 필드가 포함된 JSON 페이로드입니다.

{
  "string_col": "string_val",
  "integer_col": 10,
  "float_col": 3.142,
  "decimal_col": 5.2E11,
  "bool_col": true,
  "date_col": "2022-07-01",
  "dt_col": "2022-07-01 12:00:00.00",
  "ts_col": "2022-07-01T12:00:00.00Z",
  "interval_col": "0-13 370 48:61:61",
  "geo_col": "POINT(1 2)"
}

참고:

  • TIMESTAMP 열의 경우 JavaScript Date.toJSON 메서드를 사용하여 값 형식을 지정할 수 있습니다.
  • GEOGRAPHY 열의 경우 잘 알려진 텍스트(WKT) 또는 문자열로 형식이 지정된 GeoJSON을 사용하여 지역을 지정할 수 있습니다. 자세한 내용은 지리 공간 데이터 로드를 참조하세요.

BigQuery의 데이터 유형에 대한 자세한 내용은 데이터 유형을 참조하세요.

배열

ARRAY 데이터 유형을 사용하여 BigQuery에 배열을 저장할 수 있습니다. 다음 예시에서 JSON 페이로드에는 해당 값이 JSON 배열인 scores라는 속성이 포함되어 있습니다.

{"name":"Emily","scores":[10,7,10,9]}

다음 CREATE TABLE SQL 문은 호환되는 스키마를 사용하여 BigQuery 테이블을 만듭니다.

CREATE TABLE my_dataset.kafka_events (name STRING, scores ARRAY<INTEGER>);

결과 테이블은 다음과 같습니다.

+-------+-------------+
| name  |   scores    |
+-------+-------------+
| Emily | [10,7,10,9] |
+-------+-------------+

구조

BigQuery의 STRUCT 데이터 유형에는 이름 지정된 필드가 순서대로 정렬된 목록이 포함되어 있습니다. STRUCT를 사용하여 일관된 스키마를 따르는 JSON 객체를 보유할 수 있습니다.

다음 예시에서 JSON 페이로드에는 해당 값이 JSON 객체인 val라는 속성이 포함되어 있습니다.

{"name":"Emily","val":{"a":"yes","b":"no"}}

다음 CREATE TABLE SQL 문은 호환되는 스키마를 사용하여 BigQuery 테이블을 만듭니다.

CREATE TABLE my_dataset.kafka_events (name STRING, val STRUCT<a STRING, b STRING>);

결과 테이블은 다음과 같습니다.

+-------+----------------------+
| name  |         val          |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

반구조화된 이벤트 데이터

Kafka 이벤트 데이터가 엄격한 스키마를 따르지 않을 경우 BigQuery에 JSON 데이터 유형(미리보기)으로 저장하는 것이 좋습니다. JSON 데이터를 JSON 데이터 유형으로 저장하면 이벤트 스키마를 미리 정의할 필요가 없습니다. 데이터 수집 후 필드 액세스(점 표기법) 및 배열 액세스 연산자를 사용하여 출력 테이블을 쿼리할 수 있습니다.

먼저 JSON 열을 사용하여 테이블을 만듭니다.

-- Create the BigQuery table
CREATE TABLE my_dataset.kafka_events (event_data JSON);

그런 다음 JSON 객체 내에서 이벤트 페이로드를 래핑하는 JavaScript UDF를 정의합니다.

// UDF
function process(eventData) {
  var json;

  // TODO Convert the event data to JSON.

  return JSON.stringify({ "event_data": json });
}

데이터가 BigQuery에 기록되면 필드 액세스 연산자를 사용하여 개별 필드를 쿼리할 수 있습니다. 예를 들어 다음 쿼리는 각 레코드에 대해 name 필드 값을 반환합니다.

SELECT event_data.name FROM my_dataset1.kafka_events;

BigQuery에서 JSON 사용에 대한 자세한 내용은 Google 표준 SQL의 JSON 데이터 작업을 참조하세요.

오류 및 로깅

파이프라인을 실행할 때 오류가 발생하거나 개별 Kafka 이벤트를 처리하는 동안 오류가 발생할 수 있습니다.

파이프라인 오류 처리에 대한 자세한 내용은 파이프라인 문제 해결 및 디버깅을 참조하세요.

작업이 성공적으로 실행되지만 개별 Kafka 이벤트를 처리할 때 오류가 발생하면 파이프라인 작업이 BigQuery의 테이블에 오류 레코드를 기록합니다. 작업 자체는 실패하지 않고 이벤트 수준 오류가 Dataflow 작업 로그에 오류로 표시되지 않습니다.

파이프라인 작업이 오류 레코드를 저장할 테이블을 자동으로 만듭니다. 기본적으로 테이블 이름은 'output_table_error_records'입니다. 여기서 output_table은 출력 테이블의 이름입니다. 예를 들어 출력 테이블 이름이 kafka_events이면 오류 테이블 이름이 kafka_events_error_records입니다. outputDeadletterTable 템플릿 매개변수를 설정하여 다른 이름을 지정할 수 있습니다.

outputDeadletterTable=my_project:dataset1.errors_table

가능한 오류는 다음과 같습니다.

  • 잘못된 형식의 JSON을 포함하는 직렬화 오류
  • 테이블 스키마 및 JSON 데이터의 불일치로 인한 유형 변환 오류
  • 테이블 스키마에 없는 JSON 데이터의 추가 필드

오류 메시지 예시:

오류 유형 이벤트 데이터 errorMessage
직렬화 오류 'Hello world' json을 테이블 행으로 직렬화할 수 없음: 'Hello world'
유형 변환 오류 {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
알 수 없는 입력란 {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

다음 단계