BigQuery Studio 노트북에서 PySpark 코드 실행

이 문서에서는 BigQuery Python 노트북에서 PySpark 코드를 실행하는 방법을 보여줍니다.

시작하기 전에

아직 만들지 않았다면 Google Cloud 프로젝트와 Cloud Storage 버킷을 만듭니다.

  1. 프로젝트 설정

    1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
    2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    3. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

      Go to project selector

    5. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

    6. 사용할 수 있는 버킷이 없는 경우 프로젝트에 Cloud Storage 버킷을 만듭니다.

    7. 메모장 설정하기

      1. 노트북 사용자 인증 정보: 기본적으로 노트북 세션은 사용자 인증 정보를 사용합니다. 세션에 서비스 계정 사용자 인증 정보를 지정하려면 Dataproc Worker (roles/dataproc.worker 역할)가 있어야 합니다. 자세한 내용은 Dataproc Serverless 서비스 계정을 참고하세요.
      2. 노트북 런타임: 다른 런타임을 선택하지 않는 한 노트북은 기본 Vertex 런타임을 사용합니다. 자체 런타임을 정의하려면 Google Cloud 콘솔의 런타임 페이지에서 런타임을 만듭니다.
    8. 가격 책정

      가격 정보는 BigQuery Notebook 런타임 가격 책정을 참고하세요.

      BigQuery Studio Python 노트북 열기

      1. Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.

        BigQuery로 이동

      2. 세부정보 창의 탭 표시줄에서 + 기호 옆에 있는 화살표를 클릭한 다음 Notebook을 클릭합니다.

      BigQuery Studio 노트북에서 Spark 세션 만들기

      BigQuery Studio Python 노트북을 사용하여 Spark Connect 대화형 세션을 만들 수 있습니다. 각 BigQuery Studio 노트북에는 하나의 활성 Dataproc Serverless 세션만 연결할 수 있습니다.

      다음과 같은 방법으로 BigQuery Studio Python 노트북에서 Spark 세션을 만들 수 있습니다.

      • 노트북에서 단일 세션을 구성하고 만듭니다.
      • Spark용 Dataproc Serverless Interactive 세션 템플릿에서 Spark 세션을 구성한 후 템플릿을 사용하여 노트북에서 세션을 구성하고 만듭니다. BigQuery는 템플릿화된 Spark 세션 탭에 설명된 대로 템플릿화된 세션 코딩을 시작하는 데 도움이 되는 Query using Spark 기능을 제공합니다.

      단일 세션

      새 노트북에서 Spark 세션을 만들려면 다음 단계를 따르세요.

      1. 편집기 창의 탭 표시줄에서 + 기호 옆에 있는 화살표 드롭다운을 클릭한 다음 Notebook을 클릭합니다.

        새 노트북을 만드는 '+' 버튼이 있는 BigQuery 인터페이스를 보여주는 스크린샷
      2. 노트북 셀에서 다음 코드를 복사하여 실행하여 기본 Spark 세션을 구성하고 만듭니다.

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      
      import pyspark.sql.connect.functions as f
      
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      다음을 바꿉니다.

      • APP_NAME: 세션의 이름입니다(선택사항).
      • 선택적 세션 설정: Dataproc API Session 설정을 추가하여 세션을 맞춤설정할 수 있습니다. 다음은 몇 가지 예입니다.
        • RuntimeConfig:
          session.runtime.config 옵션을 보여주는 코드 도움말
          • session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
          • session.runtime_config.container_image = path/to/container/image
        • EnvironmentConfig:
          session-environment-config-execution-config 옵션을 보여주는 코드 도움말
          • session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
          • session.environment_config.execution_config.ttl = {"seconds": VALUE}
          • session.environment_config.execution_config.service_account = SERVICE_ACCOUNT

      템플릿이 적용된 Spark 세션

      노트북 셀에 코드를 입력하고 실행하여 기존 Dataproc Serverless 세션 템플릿을 기반으로 Spark 세션을 만들 수 있습니다. 노트북 코드에 제공하는 모든 session 구성 설정은 세션 템플릿에 설정된 동일한 설정을 재정의합니다.

      빠르게 시작하려면 Query using Spark 템플릿을 사용하여 노트북에 Spark 세션 템플릿 코드를 미리 채웁니다.

      1. 편집기 창의 탭 표시줄에서 + 기호 옆에 있는 화살표 드롭다운을 클릭한 다음 Notebook을 클릭합니다.
        새 노트북을 만드는 '+' 버튼이 있는 BigQuery 인터페이스를 보여주는 스크린샷
      2. 템플릿으로 시작에서 Spark를 사용하여 쿼리를 클릭한 다음 템플릿 사용을 클릭하여 노트북에 코드를 삽입합니다.
        템플릿으로 시작하기 위한 BigQuery UI 선택사항
      3. 참고에 설명된 대로 변수를 지정합니다.
      4. 노트북에 삽입된 추가 샘플 코드 셀을 삭제할 수 있습니다.
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      session = Session()
      # Configure the session with an existing session template.
      session_template = "SESSION_TEMPLATE"
      session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}"
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      

      다음을 바꿉니다.

      • PROJECT: Google Cloud 콘솔 대시보드프로젝트 정보 섹션에 나열된 프로젝트 ID입니다.
      • LOCATION: 노트북 세션이 실행되는 Compute Engine 리전입니다. 제공되지 않으면 기본 위치는 노트북을 만드는 VM의 리전입니다.
      • SESSION_TEMPLATE: 기존 Dataproc Serverless 대화형 세션 템플릿의 이름입니다. 세션 구성 설정은 템플릿에서 가져옵니다. 템플릿은 다음 설정도 지정해야 합니다.

        • 런타임 버전 2.3+
        • 노트북 유형: Spark Connect

          예:

          Spark Connect 필수 설정을 보여주는 스크린샷
      • APP_NAME: 세션의 이름입니다(선택사항).

      BigQuery Studio 노트북에서 PySpark 코드 작성 및 실행

      노트북에서 Spark 세션을 만든 후 세션을 사용하여 노트북에서 Spark 노트북 코드를 실행합니다.

      Spark Connect PySpark API 지원: Spark Connect 노트북 세션은 DataFrame, Functions, Column을 비롯한 대부분의 PySpark API를 지원하지만 SparkContext, RDD 및 기타 PySpark API는 지원하지 않습니다. 자세한 내용은 Spark 3.5에서 지원되는 항목을 참고하세요.

      Dataproc 관련 API: Dataproc는 addArtifacts 메서드를 확장하여 PyPI 패키지를 Spark 세션에 동적으로 추가하는 작업을 간소화합니다. 목록은 pip install와 마찬가지로 version-scheme 형식으로 지정할 수 있습니다. 이렇게 하면 Spark Connect 서버에 모든 클러스터 노드에 패키지와 종속 항목을 설치하도록 지시하여 작업자가 UDF에 이를 사용할 수 있도록 합니다.

      textdistancerandom2를 사용하는 UDF가 작업자 노드에서 실행되도록 클러스터에 지정된 textdistance 버전과 호환되는 최신 random2 라이브러리를 설치하는 예시입니다.

      spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
      

      노트북 코드 도움말: BigQuery Studio 노트북은 클래스 또는 메서드 이름 위로 포인터를 가져가면 코드 도움말을 제공하고 코드를 입력할 때 코드 완성 도움말을 제공합니다.

      다음 예에서는 DataprocSparkSession을 입력합니다. 이 클래스 이름 위로 포인터를 가져가면 코드 완성 및 문서 도움말이 표시됩니다.

      코드 문서 및 코드 완성 도움말 예시

      BigQuery Studio 노트북 PySpark 예시

      이 섹션에서는 다음 작업을 실행하는 PySpark 코드가 포함된 BigQuery Studio Python 노트북 예시를 제공합니다.

      • 공개 셰익스피어 데이터 세트를 대상으로 단어 수를 집계합니다.
      • BigLake 메타스토어에 저장된 메타데이터가 포함된 Iceberg 테이블을 만듭니다.

      Wordcount

      다음 Pyspark 예에서는 Spark 세션을 만든 다음 공개 bigquery-public-data.samples.shakespeare 데이터 세트에서 단어 발생 횟수를 셉니다.

      # Basic wordcount example
      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      session = Session()
      
      # Create the Spark session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Run a wordcount on the public Shakespeare dataset.
      df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load()
      words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word"))
      word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word")
      word_counts_df.show()
      

      다음을 바꿉니다.

      • APP_NAME: 세션의 이름입니다(선택사항).

      출력:

      셀 출력에는 wordcount 출력의 샘플이 나열됩니다. Google Cloud 콘솔에서 세션 세부정보를 보려면 대화형 세션 세부정보 보기 링크를 클릭합니다. Spark 세션을 모니터링하려면 세션 세부정보 페이지에서 Spark UI 보기를 클릭합니다.

      콘솔의 세션 세부정보 페이지에 있는 Spark UI 보기 버튼
      Interactive Session Detail View: LINK
      +------------+-----+
      |        word|count|
      +------------+-----+
      |           '|   42|
      |       ''All|    1|
      |     ''Among|    1|
      |       ''And|    1|
      |       ''But|    1|
      |    ''Gamut'|    1|
      |       ''How|    1|
      |        ''Lo|    1|
      |      ''Look|    1|
      |        ''My|    1|
      |       ''Now|    1|
      |         ''O|    1|
      |      ''Od's|    1|
      |       ''The|    1|
      |       ''Tis|    4|
      |      ''When|    1|
      |       ''tis|    1|
      |      ''twas|    1|
      |          'A|   10|
      |'ARTEMIDORUS|    1|
      +------------+-----+
      only showing top 20 rows
      

      Iceberg 테이블

      PySpark 코드를 실행하여 BigLake 메타스토어 메타데이터가 포함된 Iceberg 테이블 만들기

      다음 코드 예에서는 BigLake metastore에 저장된 테이블 메타데이터로 sample_iceberg_table를 만든 다음 테이블을 쿼리합니다.

      from google.cloud.dataproc_spark_connect import DataprocSparkSession
      from google.cloud.dataproc_v1 import Session
      import pyspark.sql.connect.functions as f
      # Create the Dataproc Serverless session.
      session = Session()
      # Set the session configuration for BigLake Metastore with the Iceberg environment.
      project = "PROJECT"
      region = "REGION"
      subnet_name = "SUBNET_NAME"
      location = "LOCATION"
      session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}"
      warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
      catalog = "CATALOG_NAME"
      namespace = "NAMESPACE"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}"
      session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}"
      # Create the Spark Connect session.
      spark = (
         DataprocSparkSession.builder
           .appName("APP_NAME")
           .dataprocSessionConfig(session)
           .getOrCreate()
      )
      # Create the namespace in BigQuery.
      spark.sql(f"USE `{catalog}`;")
      spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;")
      spark.sql(f"USE `{namespace}`;")
      # Create the Iceberg table.
      spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`");
      spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;")
      spark.sql("DESCRIBE sample_iceberg_table;")
      # Insert table data and query the table.
      spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");")
      # Alter table, then query and display table data and schema.
      spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);")
      spark.sql("DESCRIBE sample_iceberg_table;")
      df = spark.sql("SELECT * FROM sample_iceberg_table")
      df.show()
      df.printSchema()
      

      참고:

      • PROJECT: Google Cloud 콘솔 대시보드프로젝트 정보 섹션에 나열된 프로젝트 ID입니다.
      • REGIONSUBNET_NAME: Compute Engine 리전과 세션 리전의 서브넷 이름을 지정합니다. Dataproc Serverless는 지정된 서브넷에 비공개 Google 액세스 (PGA)를 사용 설정합니다.
      • LOCATION: 기본 BigQuery_metastore_config.locationspark.sql.catalog.{catalog}.gcp_locationUS이지만 지원되는 BigQuery 위치는 모두 선택할 수 있습니다.
      • BUCKETWAREHOUSE_DIRECTORY: Iceberg 웨어하우스 디렉터리에 사용되는 Cloud Storage 버킷 및 폴더입니다.
      • CATALOG_NAMENAMESPACE: Iceberg 카탈로그 이름과 네임스페이스가 결합되어 Iceberg 테이블(catalog.namespace.table_name)을 식별합니다.
      • APP_NAME: 세션의 이름입니다(선택사항).

      셀 출력에는 추가된 열이 있는 sample_iceberg_table가 표시되고 Google Cloud 콘솔에 대화형 세션 세부정보 페이지로 연결되는 링크가 표시됩니다. 세션 세부정보 페이지에서 Spark UI 보기를 클릭하여 Spark 세션을 모니터링할 수 있습니다.

      Interactive Session Detail View: LINK
      +---+---------+------------+
      | id|     data|newDoubleCol|
      +---+---------+------------+
      |  1|first row|        NULL|
      +---+---------+------------+
      
      root
       |-- id: integer (nullable = true)
       |-- data: string (nullable = true)
       |-- newDoubleCol: double (nullable = true)
      

      BigQuery에서 테이블 세부정보 보기

      BigQuery에서 Iceberg 테이블 세부정보를 확인하려면 다음 단계를 따르세요.

      1. Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.

        BigQuery로 이동

      2. 프로젝트 리소스 창에서 프로젝트를 클릭한 다음 네임스페이스를 클릭하여 sample_iceberg_table 테이블을 표시합니다. 세부정보 표를 클릭하여 카탈로그 테이블 구성 열기 정보를 확인합니다.

        입력 및 출력 형식은 Iceberg에서 사용하는 표준 Hadoop InputFormatOutputFormat 클래스 형식입니다.

        BigQuery UI에 표시된 Iceberg 테이블 메타데이터

      기타 예

      Pandas DataFrame (df)에서 Spark DataFrame (sdf)를 만듭니다.

      sdf = spark.createDataFrame(df)
      sdf.show()
      

      Spark DataFrames에서 집계를 실행합니다.

      from pyspark.sql import functions as F
      
      sdf.groupby("segment").agg(
         F.mean("total_spend_per_user").alias("avg_order_value"),
         F.approx_count_distinct("user_id").alias("unique_customers")
      ).show()
      

      Spark-BigQuery 커넥터를 사용하여 BigQuery에서 읽습니다.

      spark.conf.set("viewsEnabled","true")
      spark.conf.set("materializationDataset","my-bigquery-dataset")
      
      sdf = spark.read.format('bigquery') \
       .load(query)
      

      Gemini Code Assist로 Spark 코드 작성

      Gemini Code Assist에 노트북에서 PySpark 코드를 생성해 달라고 요청할 수 있습니다. Gemini Code Assist는 관련 BigQuery 및 Dataproc Metastore 테이블과 스키마를 가져와 사용하여 코드 응답을 생성합니다.

      노트북에서 Gemini Code Assist 코드를 생성하려면 다음 단계를 따르세요.

      1. 툴바에서 + 코드를 클릭하여 새 코드 셀을 삽입합니다. 새 코드 셀에 Start coding or generate with AI이 표시됩니다. 생성을 클릭합니다.

      2. 생성 편집기에서 자연어 프롬프트를 입력한 다음 enter 아이콘을 클릭합니다. 프롬프트에 spark 또는 pyspark 키워드를 포함해야 합니다.

        샘플 프롬프트:

        create a spark dataframe from order_items and filter to orders created in 2024
        

        샘플 출력:

        spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items")
        df = spark.sql("SELECT * FROM order_items")
        

      Gemini Code Assist 코드 생성 도움말

      • Gemini Code Assist가 관련 테이블과 스키마를 가져오도록 하려면 Dataproc Metastore 인스턴스에 Data Catalog 동기화를 사용 설정하세요.

      • 사용자 계정에 데이터 카탈로그 쿼리 테이블에 대한 액세스 권한이 있는지 확인합니다. 이렇게 하려면 DataCatalog.Viewer 역할을 할당합니다.

      Spark 세션 종료

      다음 작업 중 하나를 수행하여 BigQuery Studio 노트북에서 Spark Connect 세션을 중지할 수 있습니다.

      • 노트북 셀에서 spark.stop()를 실행합니다.
      • 노트북에서 런타임을 종료합니다.
        1. 런타임 선택기를 클릭한 다음 세션 관리를 클릭합니다.
          세션 선택 관리
        2. 활성 세션 대화상자에서 종료 아이콘을 클릭한 다음 종료를 클릭합니다.
          활성 세션 대화상자에서 세션 선택 종료

      BigQuery Studio 노트북 코드 조정

      다음과 같은 방법으로 BigQuery Studio 노트북 코드를 조정할 수 있습니다.

      Google Cloud 콘솔에서 노트북 코드 예약

      다음과 같은 방법으로 노트북 코드를 예약할 수 있습니다.

      노트북 코드를 Dataproc Serverless 일괄 워크로드로 실행

      BigQuery Studio 노트북 코드를 Dataproc 서버리스 일괄 워크로드로 실행하려면 다음 단계를 완료하세요.

      1. 로컬 터미널 또는 Cloud Shell에서 노트북 코드를 파일로 다운로드합니다.

        1. Google Cloud 콘솔의 BigQuery Studio 페이지에 있는 탐색기 패널에서 노트북을 엽니다.

        2. 파일 메뉴에서 다운로드를 선택한 다음 Download .py를 선택하여 노트북 코드를 다운로드합니다.

          Explorer 페이지의 File(파일) > Download(다운로드) 메뉴
      2. requirements.txt를 생성합니다.

        1. .py 파일을 저장한 디렉터리에 pipreqs를 설치합니다.
          pip install pipreqs
          
        2. pipreqs를 실행하여 requirements.txt를 생성합니다.

          pipreqs filename.py
          

        3. gsutil 도구를 사용하여 로컬 requirements.txt 파일을 Cloud Storage의 버킷에 복사합니다.

          gsutil cp requirements.txt gs://BUCKET/
          
      3. 다운로드한 .py 파일을 수정하여 Spark 세션 코드를 업데이트합니다.

        1. 셸 스크립트 명령어를 삭제하거나 주석 처리합니다.

        2. Spark 세션을 구성하는 코드를 삭제한 다음 구성 매개변수를 일괄 워크로드 제출 매개변수로 지정합니다. (Spark 배치 워크로드 제출 참고)

          예:

          • 코드에서 다음 세션 서브넷 구성 줄을 삭제합니다.

            session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
            

          • 배치 워크로드를 실행할 때 --subnet 플래그를 사용하여 서브넷을 지정합니다.

            gcloud dataproc batches submit pyspark \
            --subnet=SUBNET_NAME
            
        3. 간단한 세션 생성 코드 스니펫을 사용합니다.

          • 간소화하기 전의 다운로드된 노트북 코드 샘플

            from google.cloud.dataproc_spark_connect import DataprocSparkSession
            from google.cloud.dataproc_v1 import Session
            

            session = Session() spark = DataprocSparkSession \     .builder \     .appName("CustomSparkSession")     .dataprocSessionConfig(session) \     .getOrCreate()

          • 단순화 후 일괄 워크로드 코드

            from pyspark.sql import SparkSession
            

            spark = SparkSession \ .builder \ .getOrCreate()

      4. 일괄 워크로드를 실행합니다.

        1. 자세한 내용은 Spark 일괄 워크로드 제출을 참고하세요.

          • requirements.txt 파일이 포함된 Cloud Storage 버킷을 가리키도록 --deps-bucket 플래그를 포함해야 합니다.

            예:

          gcloud dataproc batches submit pyspark FILENAME.py \
              --region=REGION \
              --deps-bucket=BUCKET \
              --version=2.3 
          

          참고:

          • FILENAME: 다운로드하여 수정한 노트북 코드 파일의 이름입니다.
          • REGION: 클러스터가 있는 Compute Engine 리전
          • BUCKET requirements.txt 파일이 포함된 Cloud Storage 버킷의 이름입니다.
          • --version: 배치 워크로드를 실행하기 위해 spark 런타임 버전 2.3이 선택됩니다.
      5. 코드를 커밋합니다.

        1. 일괄 워크로드 코드를 테스트한 후에는 CI/CD 파이프라인의 일부로 GitHub, GitLab, Bitbucket과 같은 git 클라이언트를 사용하여 .ipynb 또는 .py 파일을 저장소에 커밋할 수 있습니다.
      6. Cloud Composer로 일괄 워크로드를 예약합니다.

        1. 자세한 내용은 Cloud Composer로 Dataproc Serverless 워크로드 실행을 참고하세요.

      노트북 오류 문제 해결하기

      Spark 코드가 포함된 셀에서 오류가 발생하면 셀 출력에서 대화형 세션 세부정보 보기 링크를 클릭하여 오류를 해결할 수 있습니다 (단어 수 및 Iceberg 표 예 참고).

      알려진 문제 및 해결 방법

      오류: Python 버전 3.10로 만든 노트북 런타임이 Spark 세션에 연결하려고 하면 PYTHON_VERSION_MISMATCH 오류가 발생할 수 있습니다.

      해결 방법: Python 버전 3.11로 런타임을 다시 만듭니다.

      다음 단계