設定 Pub/Sub 通知

本文說明如何設定註記事件更新的通知。

容器分析會針對自動掃描發現的安全漏洞和其他中繼資料,透過 Pub/Sub 提供通知。系統建立或更新註記或例項時,就會向每個 API 版本的對應主題發布訊息。請使用您使用的 API 版本的主題。

事前準備

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Enable the Container Analysis API.

    Enable the API

  4. Install the Google Cloud CLI.

  5. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  6. To initialize the gcloud CLI, run the following command:

    gcloud init
  7. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  8. Enable the Container Analysis API.

    Enable the API

  9. Install the Google Cloud CLI.

  10. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  11. To initialize the gcloud CLI, run the following command:

    gcloud init
  12. 瞭解如何為專案中的中繼資料設定存取權控管。如果您只使用由 Artifact Analysis 容器掃描建立的安全漏洞事件中繼資料,請略過這個步驟。
  13. 建立 Pub/Sub 主題

    啟用 Artifact Analysis API 後,Artifact Analysis 會自動建立以下主題 ID 的 Pub/Sub 主題:

    • container-analysis-notes-v1
    • container-analysis-occurrences-v1

    如果不慎刪除或遺失主題,您可以自行加入。舉例來說,如果 Google Cloud貴機構有機構政策限制,要求使用客戶自行管理的加密金鑰 (CMEK) 進行加密,則可能會缺少主題。如果 Pub/Sub API 位於此限制的拒絕清單中,服務就無法自動使用Google-owned and Google-managed encryption keys建立主題。

    如何使用 Google-owned and Google-managed encryption keys建立主題:

    控制台

    1. 前往 Google Cloud 控制台的 Pub/Sub 主題頁面。

      開啟 Pub/Sub 主題頁面

    2. 按一下 [Create Topic] (建立主題)

    3. 輸入主題 ID:

      container-analysis-notes-v1
      

      以便名稱與 URI 相符:

      projects/PROJECT_ID/topics/container-analysis-notes-v1
      

      其中 PROJECT_ID 是您的 Google Cloud 專案 ID

    4. 按一下 [建立]。

    5. 輸入主題 ID:

      container-analysis-occurrences-v1
      

      以便名稱與 URI 相符:

      projects/PROJECT_ID/topics/container-analysis-occurrences-v1
      

    gcloud

    在您的殼層或終端機視窗中執行下列指令:

    gcloud pubsub topics create projects/PROJECT_ID/topics/container-analysis-notes-v1
    gcloud pubsub topics create projects/PROJECT_ID/topics/container-analysis-occurrences-v1
    

    如要進一步瞭解 gcloud pubsub topics 指令,請參閱 topics 說明文件

    如要使用 CMEK 加密功能建立主題,請參閱 Pub/Sub 的主題加密操作說明

    只要系統建立或更新註記或例項,就會向相關主題發布訊息,但您也必須建立 Pub/Sub 訂閱項目,才能監聽事件並接收 Pub/Sub 服務傳送的訊息。

    建立 Pub/Sub 訂閱項目

    如要監聽事件,請建立與主題相關聯的 Pub/Sub 訂閱項目:

    控制台

    1. 前往Google Cloud 控制台的 Pub/Sub 訂閱項目頁面。

      開啟 Pub/Sub 訂閱項目頁面

    2. 按一下 [Create Subscription] (建立訂閱項目)

    3. 輸入訂閱名稱。例如「notes」

    4. 輸入註記主題的 URI:

      projects/PROJECT_ID/topics/container-analysis-notes-v1
      

      其中 PROJECT_ID 是您的 Google Cloud 專案 ID

    5. 按一下 [建立]。

    6. 使用 URI 替例項建立另一個訂閱項目:

      projects/PROJECT_ID/topics/container-analysis-occurrences-v1
      

    gcloud

    如要接收 Pub/Sub 事件,您必須先建立與 container-analysis-occurrences-v1 主題相關聯的訂閱項目:

    gcloud pubsub subscriptions create \
        --topic container-analysis-occurrences-v1 occurrences
    

    您之後可以使用新訂閱提取有關例項的訊息:

    gcloud pubsub subscriptions pull \
        --auto-ack occurrences
    

    Java

    如要瞭解如何安裝及使用 Artifact Analysis 用戶端程式庫,請參閱「Artifact Analysis 用戶端程式庫」。詳情請參閱 Artifact Analysis Java API 參考說明文件

    如要向 Artifact Analysis 驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

    import com.google.cloud.pubsub.v1.AckReplyConsumer;
    import com.google.cloud.pubsub.v1.MessageReceiver;
    import com.google.cloud.pubsub.v1.Subscriber;
    import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
    import com.google.pubsub.v1.ProjectSubscriptionName;
    import com.google.pubsub.v1.PubsubMessage;
    import com.google.pubsub.v1.PushConfig;
    import com.google.pubsub.v1.Subscription;
    import com.google.pubsub.v1.SubscriptionName;
    import com.google.pubsub.v1.TopicName;
    import io.grpc.StatusRuntimeException;
    import java.io.IOException;
    import java.lang.InterruptedException;
    import java.util.concurrent.TimeUnit;
    
    public class Subscriptions {
      // Handle incoming Occurrences using a Cloud Pub/Sub subscription
      public static int pubSub(String subId, long timeoutSeconds, String projectId)
          throws InterruptedException {
        // String subId = "my-occurrence-subscription";
        // long timeoutSeconds = 20;
        // String projectId = "my-project-id";
        Subscriber subscriber = null;
        MessageReceiverExample receiver = new MessageReceiverExample();
    
        try {
          // Subscribe to the requested Pub/Sub channel
          ProjectSubscriptionName subName = ProjectSubscriptionName.of(projectId, subId);
          subscriber = Subscriber.newBuilder(subName, receiver).build();
          subscriber.startAsync().awaitRunning();
          // Sleep to listen for messages
          TimeUnit.SECONDS.sleep(timeoutSeconds);
        } finally {
          // Stop listening to the channel
          if (subscriber != null) {
            subscriber.stopAsync();
          }
        }
        // Print and return the number of Pub/Sub messages received
        System.out.println(receiver.messageCount);
        return receiver.messageCount;
      }
    
      // Custom class to handle incoming Pub/Sub messages
      // In this case, the class will simply log and count each message as it comes in
      static class MessageReceiverExample implements MessageReceiver {
        public int messageCount = 0;
    
        @Override
        public synchronized void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
          // Every time a Pub/Sub message comes in, print it and count it
          System.out.println("Message " + messageCount + ": " + message.getData().toStringUtf8());
          messageCount += 1;
          // Acknowledge the message
          consumer.ack();
        }
      }
    
      // Creates and returns a Pub/Sub subscription object listening to the Occurrence topic
      public static Subscription createOccurrenceSubscription(String subId, String projectId) 
          throws IOException, StatusRuntimeException, InterruptedException {
        // This topic id will automatically receive messages when Occurrences are added or modified
        String topicId = "container-analysis-occurrences-v1";
        TopicName topicName = TopicName.of(projectId, topicId);
        SubscriptionName subName = SubscriptionName.of(projectId, subId);
    
        SubscriptionAdminClient client = SubscriptionAdminClient.create();
        PushConfig config = PushConfig.getDefaultInstance();
        Subscription sub = client.createSubscription(subName, topicName, config, 0);
        return sub;
      }
    }

    Go

    如要瞭解如何安裝及使用 Artifact Analysis 用戶端程式庫,請參閱「Artifact Analysis 用戶端程式庫」。詳情請參閱 Artifact Analysis Go API 參考說明文件

    如要向 Artifact Analysis 驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

    
    import (
    	"context"
    	"fmt"
    	"io"
    	"sync"
    	"time"
    
    	pubsub "cloud.google.com/go/pubsub"
    )
    
    // occurrencePubsub handles incoming Occurrences using a Cloud Pub/Sub subscription.
    func occurrencePubsub(w io.Writer, subscriptionID string, timeout time.Duration, projectID string) (int, error) {
    	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
    	// timeout := time.Duration(20) * time.Second
    	ctx := context.Background()
    
    	var mu sync.Mutex
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return -1, fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	// Subscribe to the requested Pub/Sub channel.
    	sub := client.Subscription(subscriptionID)
    	count := 0
    
    	// Listen to messages for 'timeout' seconds.
    	ctx, cancel := context.WithTimeout(ctx, timeout)
    	defer cancel()
    	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    		mu.Lock()
    		count = count + 1
    		fmt.Fprintf(w, "Message %d: %q\n", count, string(msg.Data))
    		msg.Ack()
    		mu.Unlock()
    	})
    	if err != nil {
    		return -1, fmt.Errorf("sub.Receive: %w", err)
    	}
    	// Print and return the number of Pub/Sub messages received.
    	fmt.Fprintln(w, count)
    	return count, nil
    }
    
    // createOccurrenceSubscription creates a new Pub/Sub subscription object listening to the Occurrence topic.
    func createOccurrenceSubscription(subscriptionID, projectID string) error {
    	// subscriptionID := fmt.Sprintf("my-occurrences-subscription")
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	// This topic id will automatically receive messages when Occurrences are added or modified
    	topicID := "container-analysis-occurrences-v1"
    	topic := client.Topic(topicID)
    	config := pubsub.SubscriptionConfig{Topic: topic}
    	_, err = client.CreateSubscription(ctx, subscriptionID, config)
    	return fmt.Errorf("client.CreateSubscription: %w", err)
    }
    

    Node.js

    如要瞭解如何安裝及使用 Artifact Analysis 用戶端程式庫,請參閱「Artifact Analysis 用戶端程式庫」。詳情請參閱 Artifact Analysis Node.js API 參考說明文件

    如要向 Artifact Analysis 驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

    /**
     * TODO(developer): Uncomment these variables before running the sample
     */
    // const projectId = 'your-project-id', // Your GCP Project ID
    // const subscriptionId = 'my-sub-id', // A user-specified subscription to the 'container-analysis-occurrences-v1' topic
    // const timeoutSeconds = 30 // The number of seconds to listen for the new Pub/Sub Messages
    
    // Import the pubsub library and create a client, topic and subscription
    const {PubSub} = require('@google-cloud/pubsub');
    const pubsub = new PubSub({projectId});
    const subscription = pubsub.subscription(subscriptionId);
    
    // Handle incoming Occurrences using a Cloud Pub/Sub subscription
    let count = 0;
    const messageHandler = message => {
      count++;
      message.ack();
    };
    
    // Listen for new messages until timeout is hit
    subscription.on('message', messageHandler);
    
    setTimeout(() => {
      subscription.removeListener('message', messageHandler);
      console.log(`Polled ${count} occurrences`);
    }, timeoutSeconds * 1000);

    Ruby

    如要瞭解如何安裝及使用 Artifact Analysis 用戶端程式庫,請參閱「Artifact Analysis 用戶端程式庫」。詳情請參閱 Artifact Analysis Ruby API 參考說明文件

    如要向 Artifact Analysis 驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

    # subscription_id = "A user-specified identifier for the new subscription"
    # timeout_seconds = "The number of seconds to listen for new Pub/Sub messages"
    # project_id      = "Your Google Cloud project ID"
    
    require "google/cloud/pubsub"
    
    pubsub = Google::Cloud::Pubsub.new project: project_id
    topic = pubsub.topic "container-analysis-occurrences-v1"
    subscription = topic.subscribe subscription_id
    
    count = 0
    subscriber = subscription.listen do |received_message|
      count += 1
      # Process incoming occurrence here
      puts "Message #{count}: #{received_message.data}"
      received_message.acknowledge!
    end
    subscriber.start
    # Wait for incomming occurrences
    sleep timeout_seconds
    subscriber.stop.wait!
    subscription.delete
    # Print and return the total number of Pub/Sub messages received
    puts "Total Messages Received: #{count}"
    count

    Python

    如要瞭解如何安裝及使用 Artifact Analysis 用戶端程式庫,請參閱「Artifact Analysis 用戶端程式庫」。詳情請參閱 Artifact Analysis Python API 參考說明文件

    如要向 Artifact Analysis 驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定驗證機制」。

    import time
    
    from google.api_core.exceptions import AlreadyExists
    from google.cloud.pubsub import SubscriberClient
    from google.cloud.pubsub_v1.subscriber.message import Message
    
    
    def pubsub(subscription_id: str, timeout_seconds: int, project_id: str) -> int:
        """Respond to incoming occurrences using a Cloud Pub/Sub subscription."""
        # subscription_id := 'my-occurrences-subscription'
        # timeout_seconds = 20
        # project_id = 'my-gcp-project'
    
        client = SubscriberClient()
        subscription_name = client.subscription_path(project_id, subscription_id)
        receiver = MessageReceiver()
        client.subscribe(subscription_name, receiver.pubsub_callback)
    
        # listen for 'timeout' seconds
        for _ in range(timeout_seconds):
            time.sleep(1)
        # print and return the number of pubsub messages received
        print(receiver.msg_count)
        return receiver.msg_count
    
    
    class MessageReceiver:
        """Custom class to handle incoming Pub/Sub messages."""
    
        def __init__(self) -> None:
            # initialize counter to 0 on initialization
            self.msg_count = 0
    
        def pubsub_callback(self, message: Message) -> None:
            # every time a pubsub message comes in, print it and count it
            self.msg_count += 1
            print(f"Message {self.msg_count}: {message.data}")
            message.ack()
    
    
    def create_occurrence_subscription(subscription_id: str, project_id: str) -> bool:
        """Creates a new Pub/Sub subscription object listening to the
        Container Analysis Occurrences topic."""
        # subscription_id := 'my-occurrences-subscription'
        # project_id = 'my-gcp-project'
    
        topic_id = "container-analysis-occurrences-v1"
        client = SubscriberClient()
        topic_name = f"projects/{project_id}/topics/{topic_id}"
        subscription_name = client.subscription_path(project_id, subscription_id)
        success = True
        try:
            client.create_subscription({"name": subscription_name, "topic": topic_name})
        except AlreadyExists:
            # if subscription already exists, do nothing
            pass
        else:
            success = False
        return success
    
    

    訂閱者應用程式只會收到建立訂閱項目後發布至主題的訊息。

    Pub/Sub 酬載採用 JSON 格式,其結構定義如下所示:

    注意:

    {
        "name": "projects/PROJECT_ID/notes/NOTE_ID",
        "kind": "NOTE_KIND",
        "notificationTime": "NOTIFICATION_TIME",
    }

    例項:

    {
        "name": "projects/PROJECT_ID/occurrences/OCCURRENCE_ID",
        "kind": "NOTE_KIND",
        "notificationTime": "NOTIFICATION_TIME",
    }

    其中:

    • NOTE_KINDNoteKind 中的其中一個值。
    • NOTIFICATION_TIME 是採用 RFC 3339 UTC Zulu 格式的時間戳記,精確度達奈秒單位。

    查看詳細資料

    如要進一步瞭解附註或事件,您可以存取「構件分析」中儲存的中繼資料。舉例來說,您可以要求取得特定事件的所有詳細資料。請參閱「調查安全漏洞」中的操作說明。

    後續步驟

    • 如要瞭解如何使用 Artifact Analysis 儲存及管理自訂中繼資料,請參閱「建立自訂附註和事件」一文。

    • 您可以使用認證與安全漏洞掃描,避免在您的部署環境中執行具有已知安全問題的映像檔。如需操作說明,請參閱「使用 Kritis Signer 建立認證」一文。