Dataproc 選用 Hudi 元件

當您使用選用元件功能建立 Dataproc 叢集時,可以安裝 Hudi 等其他元件。本頁說明如何選擇在 Dataproc 叢集中安裝 Hudi 元件。

在 Dataproc 叢集中安裝 Apache Hudi 元件時,會安裝 Hudi 程式庫,並設定叢集中的 Spark 和 Hive,以便與 Hudi 搭配使用。

相容的 Dataproc 映像檔版本

您可以在使用下列 Dataproc 映像檔版本建立的 Dataproc 叢集中安裝 Hudi 元件:

建立含有 Hudi 叢集的 Dataproc 時,系統會將下列 Spark 和 Hive 屬性設定為與 Hudi 搭配運作。

設定檔 屬性 預設值
/etc/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
spark.executor.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar

安裝元件

請在建立 Dataproc 叢集時安裝 Hudi 元件。

Dataproc 映像檔版本頁面會列出每個 Dataproc 映像檔版本中包含的 Hudi 元件版本。

控制台

  1. 啟用元件。
    • 在 Google Cloud 控制台中,開啟 Dataproc 的「Create a cluster」(建立叢集) 頁面。選取「設定叢集」面板。
    • 在「元件」部分中:
      • 在「選用元件」下方,選取「Hudi」元件。

gcloud 指令

如要建立包含 Hudi 元件的 Dataproc 叢集,請使用指令搭配 --optional-components 標記。

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

更改下列內容:

  • CLUSTER_NAME:必填。新的叢集名稱。
  • REGION:必填。叢集區域
  • DATAPROC_IMAGE:選用。您可以使用這個選用標記,指定非預設的 Dataproc 映像檔版本 (請參閱「預設 Dataproc 映像檔版本」)。
  • PROPERTIES:選用。您可以使用這個選用的標記來設定 Hudi 元件屬性,這些屬性會使用 hudi: 檔案前置字元指定 (例如 properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE)。
    • Hudi 元件版本屬性:您可以選擇指定 dataproc:hudi.version 屬性注意:Dataproc 會設定 Hudi 元件版本,以便與 Dataproc 叢集映像檔版本相容。如果您設定這個屬性,如果指定的版本與叢集映像檔不相容,叢集建立作業就會失敗。
    • Spark 和 Hive 屬性:Dataproc 會在建立叢集時設定 Hudi 相關的 Spark 和 Hive 屬性。您不需要在建立叢集或提交工作時設定這些值。

REST API

您可以透過 Dataproc API 安裝 Hudi 元件,方法是使用 clusters.create 要求中的 SoftwareConfig.Component

提交工作來讀取及寫入 Hudi 資料表

使用 Hudi 元件建立叢集後,您就可以提交可讀取及寫入 Hudi 資料表的 Spark 和 Hive 工作。

gcloud CLI 示例:

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=region \
    JOB_FILE \
    -- JOB_ARGS

PySpark 工作範例

以下 PySpark 檔案會建立、讀取及寫入 Hudi 資料表。

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession


def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)


def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))


def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)


def query_commit_history(spark, table_name, table_uri):
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)


def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)


def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')


main()

下列 gcloud CLI 指令會將 PySpark 範例檔案提交至 Dataproc。

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME

使用 Hudi CLI

Hudi CLI 位於 Dataproc 叢集主要節點的 /usr/lib/hudi/cli/hudi-cli.sh 中。您可以使用 Hudi CLI 查看 Hudi 資料表結構定義、提交內容和統計資料,以及手動執行管理作業,例如排定壓縮作業 (請參閱「使用 hudi-cli」)。

如要啟動 Hudi CLI 並連線至 Hudi 資料表,請按照下列步驟操作:

  1. 使用 SSH 連線至主要節點
  2. 執行 /usr/lib/hudi/cli/hudi-cli.sh。 指令提示會變更為 hudi->
  3. 執行 connect --path gs://my-bucket/my-hudi-table
  4. 執行指令,例如 desc (可用來說明資料表結構定義) 或 commits show (可用來顯示修訂版本記錄)。
  5. 如要停止 CLI 工作階段,請執行 exit

後續步驟