CCAI Transcription

CCAI Transcription lets you convert streaming audio data into transcribed text in real time. Agent Assist makes suggestions based on text, so the audio data must be converted before it can be used. You can also use transcribed streaming audio with CCAI Insights to gather real-time data about agent conversations (topic modeling, for example).

There are two ways to transcribe streaming audio for use with CCAI: using the SIPREC feature, or making gRPC calls with audio data as the payload. This page describes the process of transcribing streaming audio data using gRPC calls.

CCAI Transcription is implemented using Speech-to-Text streaming speech recognition. Speech-to-Text offers multiple recognition models, standard and enhanced. CCAI Transcription is supported at the GA level only when it's used with the enhanced phone call model.

Prerequisites

Create a conversation profile

To create a conversation profile, use the Agent Assist console or call the create method on the ConversationProfile resource directly.

To use CCAI transcription, we recommend that you configure ConversationProfile.stt_config as the default InputAudioConfig when sending audio data in the conversation.
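For example, the following is a minimal sketch of creating such a profile with the google-cloud-dialogflow Python client library. The project ID and display name are placeholders, and the stt_config fields shown assume the v2 SpeechToTextConfig message.

from google.cloud import dialogflow_v2 as dialogflow


def create_conversation_profile(project_id: str, display_name: str):
    """Creates a conversation profile whose stt_config acts as the default
    InputAudioConfig for audio sent in its conversations."""
    client = dialogflow.ConversationProfilesClient()
    profile = dialogflow.ConversationProfile(
        display_name=display_name,
        language_code="en-US",
        stt_config=dialogflow.SpeechToTextConfig(
            # Use the telephony model for CCAI Transcription.
            model="telephony",
        ),
    )
    return client.create_conversation_profile(
        parent=f"projects/{project_id}", conversation_profile=profile
    )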

Get transcripts at conversation runtime

To get transcripts at conversation runtime, you must create participants for the conversation and send audio data for each participant.

Create participants

There are three types of participants. See the reference documentation for more details about their roles. Call the create method on the participant and specify the role. Only an END_USER or a HUMAN_AGENT participant can call StreamingAnalyzeContent, which is necessary to get a transcript.
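As a minimal sketch, assuming the google-cloud-dialogflow Python client library and placeholder project and conversation IDs, creating an END_USER participant might look like this:

from google.cloud import dialogflow_v2 as dialogflow


def create_participant(project_id: str, conversation_id: str, role):
    """Creates a participant (for example, END_USER or HUMAN_AGENT) in a conversation."""
    client = dialogflow.ParticipantsClient()
    conversation_path = f"projects/{project_id}/conversations/{conversation_id}"
    return client.create_participant(
        parent=conversation_path,
        participant=dialogflow.Participant(role=role),
    )


# Only END_USER and HUMAN_AGENT participants can call StreamingAnalyzeContent.
end_user = create_participant(
    "my-project-id", "my-conversation-id", dialogflow.Participant.Role.END_USER
)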

Send audio data and get a transcript

You can use StreamingAnalyzeContent to send a participant's audio to Google and receive transcripts, with the following parameters (a minimal request sketch follows this list):

  • The first request in the stream must be an InputAudioConfig. (Fields configured here override the corresponding settings in ConversationProfile.stt_config.) Don't send any audio input until the second request.

    • audioEncoding must be set to AUDIO_ENCODING_LINEAR_16 or AUDIO_ENCODING_MULAW.
    • model: This is the Speech-to-Text model to use for transcribing the audio. Set this field to telephony. The variant doesn't affect transcription quality, so you can leave Speech model variant unspecified or choose Use best available.
    • singleUtterance should be set to false for the best transcription quality. With singleUtterance set to false, you should not expect END_OF_SINGLE_UTTERANCE; instead, rely on isFinal==true inside StreamingAnalyzeContentResponse.recognition_result to half-close the stream.
    • Optional additional parameters: The following parameters are optional. To gain access to them, contact your Google representative.
      • languageCode: The language_code of the audio. The default value is en-US.
      • alternativeLanguageCodes: Additional languages that might be detected in the audio. Agent Assist uses the language_code field to automatically detect the language at the beginning of the audio and sticks to that language in all subsequent conversation turns. The alternativeLanguageCodes field lets you specify more options for Agent Assist to choose from.
      • phraseSets: The resource name of the Speech-to-Text model adaptation phraseSet. To use model adaptation with CCAI Transcription, first create the phraseSet using the Speech-to-Text API, then specify its resource name here.
  • After you send the second request with an audio payload, you should start receiving StreamingAnalyzeContentResponses from the stream.

    • You can half-close the stream (or stop sending requests in some languages, such as Python) when you see is_final set to true in StreamingAnalyzeContentResponse.recognition_result.
    • After you half-close the stream, the server sends back a response containing the final transcript, along with potential Dialogflow suggestions or Agent Assist suggestions.
  • You can find the final transcript in the following locations:

    • StreamingAnalyzeContentResponse.message.content.
    • If you enable Pub/Sub notifications, you can also see the transcript in Pub/Sub.
  • Start a new stream after the previous stream is closed.

    • Audio re-sending: To maintain StreamingAnalyzeContent transcription quality, re-send the audio data generated between the speech_end_offset of the last response with is_final=true and the start time of the new stream.
  • The following diagram illustrates how the stream works.
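The following is a minimal sketch of this request flow, assuming the google-cloud-dialogflow Python client library, 16 kHz LINEAR16 audio, and an audio_chunks iterator that yields raw audio bytes (such as the microphone generator in the full sample below). The participant_path argument is a placeholder.

from google.cloud import dialogflow_v2 as dialogflow


def stream_transcription(participant_path: str, audio_chunks):
    """Streams audio to StreamingAnalyzeContent and prints final transcripts."""
    client = dialogflow.ParticipantsClient()

    def request_generator():
        # The first request carries only the InputAudioConfig; audio payloads
        # follow from the second request onward.
        yield dialogflow.StreamingAnalyzeContentRequest(
            participant=participant_path,
            audio_config=dialogflow.InputAudioConfig(
                audio_encoding=dialogflow.AudioEncoding.AUDIO_ENCODING_LINEAR_16,
                sample_rate_hertz=16000,
                language_code="en-US",
                model="telephony",
                single_utterance=False,
            ),
        )
        for chunk in audio_chunks:
            yield dialogflow.StreamingAnalyzeContentRequest(input_audio=chunk)

    for response in client.streaming_analyze_content(requests=request_generator()):
        if response.recognition_result.is_final:
            # The full sample below also records speech_end_offset at this point
            # and stops the audio generator (half-close), so that unprocessed
            # audio can be re-sent when the stream restarts.
            print(response.recognition_result.transcript)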

Streaming recognition request code sample

The following code sample illustrates how to send a streaming transcription request:

Python

To authenticate to Agent Assist, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Google Cloud Dialogflow API sample code using the StreamingAnalyzeContent
API.

Also, contact Google to get credentials for this project and set the
credential JSON file location by running:
export GOOGLE_APPLICATION_CREDENTIALS=<cred_json_file_location>

Example usage:
    export GOOGLE_CLOUD_PROJECT='cloud-contact-center-ext-demo'
    export CONVERSATION_PROFILE='FnuBYO8eTBWM8ep1i-eOng'
    export GOOGLE_APPLICATION_CREDENTIALS='/Users/ruogu/Desktop/keys/cloud-contact-center-ext-demo-78798f9f9254.json'
    python streaming_transcription.py

Then start talking in English; you should see the transcription appear as you speak.

Say "Quit" or "Exit" to stop.
"""

import os
import re
import sys

from google.api_core.exceptions import DeadlineExceeded

import pyaudio

from six.moves import queue

import conversation_management
import participant_management

PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")
CONVERSATION_PROFILE_ID = os.getenv("CONVERSATION_PROFILE")

# Audio recording parameters
SAMPLE_RATE = 16000
CHUNK_SIZE = int(SAMPLE_RATE / 10)  # 100ms
RESTART_TIMEOUT = 160  # seconds
MAX_LOOKBACK = 3  # seconds

YELLOW = "\033[0;33m"


class ResumableMicrophoneStream:
    """Opens a recording stream as a generator yielding the audio chunks."""

    def __init__(self, rate, chunk_size):
        self._rate = rate
        self.chunk_size = chunk_size
        self._num_channels = 1
        self._buff = queue.Queue()
        self.is_final = False
        self.closed = True
        # Count the number of times the stream analyze content restarts.
        self.restart_counter = 0
        self.last_start_time = 0
        # End time of the last is_final result, in milliseconds since last_start_time.
        self.is_final_offset = 0
        # Save the audio chunks generated from the start of the audio stream for
        # replay after restart.
        self.audio_input_chunks = []
        self.new_stream = True
        self._audio_interface = pyaudio.PyAudio()
        self._audio_stream = self._audio_interface.open(
            format=pyaudio.paInt16,
            channels=self._num_channels,
            rate=self._rate,
            input=True,
            frames_per_buffer=self.chunk_size,
            # Run the audio stream asynchronously to fill the buffer object.
            # This is necessary so that the input device's buffer doesn't
            # overflow while the calling thread makes network requests, etc.
            stream_callback=self._fill_buffer,
        )

    def __enter__(self):
        self.closed = False
        return self

    def __exit__(self, type, value, traceback):
        self._audio_stream.stop_stream()
        self._audio_stream.close()
        self.closed = True
        # Signal the generator to terminate so that the client's
        # streaming_recognize method will not block the process termination.
        self._buff.put(None)
        self._audio_interface.terminate()

    def _fill_buffer(self, in_data, *args, **kwargs):
        """Continuously collect data from the audio stream, into the buffer in
        chunksize."""

        self._buff.put(in_data)
        return None, pyaudio.paContinue

    def generator(self):
        """Stream Audio from microphone to API and to local buffer"""
        try:
            # Handle restart.
            print("restart generator")
            # Reset is_final so the stream can continue.
            self.is_final = False
            total_processed_time = self.last_start_time + self.is_final_offset
            processed_bytes_length = (
                int(total_processed_time * SAMPLE_RATE * 16 / 8) / 1000
            )
            self.last_start_time = total_processed_time
            # Send out bytes stored in self.audio_input_chunks that come after
            # processed_bytes_length.
            if processed_bytes_length != 0:
                audio_bytes = b"".join(self.audio_input_chunks)
                # Lookback for unprocessed audio data.
                need_to_process_length = min(
                    int(len(audio_bytes) - processed_bytes_length),
                    int(MAX_LOOKBACK * SAMPLE_RATE * 16 / 8),
                )
                # Note that you need an explicit `int` for the slice index.
                need_to_process_bytes = audio_bytes[(-1) * need_to_process_length :]
                yield need_to_process_bytes

            while not self.closed and not self.is_final:
                data = []
                # Use a blocking get() to ensure there's at least one chunk of
                # data, and stop iteration if the chunk is None, indicating the
                # end of the audio stream.
                chunk = self._buff.get()

                if chunk is None:
                    return
                data.append(chunk)
                # Now try to get the rest of the chunks if there are any left in the _buff.
                while True:
                    try:
                        chunk = self._buff.get(block=False)

                        if chunk is None:
                            return
                        data.append(chunk)

                    except queue.Empty:
                        break
                self.audio_input_chunks.extend(data)
                if data:
                    yield b"".join(data)
        finally:
            print("Stop generator")


def main():
    """start bidirectional streaming from microphone input to Dialogflow API"""
    # Create conversation.
    conversation = conversation_management.create_conversation(
        project_id=PROJECT_ID, conversation_profile_id=CONVERSATION_PROFILE_ID
    )

    conversation_id = conversation.name.split("conversations/")[1].rstrip()

    # Create end user participant.
    end_user = participant_management.create_participant(
        project_id=PROJECT_ID, conversation_id=conversation_id, role="END_USER"
    )
    participant_id = end_user.name.split("participants/")[1].rstrip()

    mic_manager = ResumableMicrophoneStream(SAMPLE_RATE, CHUNK_SIZE)
    print(mic_manager.chunk_size)
    sys.stdout.write(YELLOW)
    sys.stdout.write('\nListening, say "Quit" or "Exit" to stop.\n\n')
    sys.stdout.write("End (ms)       Transcript Results/Status\n")
    sys.stdout.write("=====================================================\n")

    with mic_manager as stream:
        while not stream.closed:
            terminate = False
            while not terminate:
                try:
                    print(f"New Streaming Analyze Request: {stream.restart_counter}")
                    stream.restart_counter += 1
                    # Send request to streaming and get response.
                    responses = participant_management.analyze_content_audio_stream(
                        conversation_id=conversation_id,
                        participant_id=participant_id,
                        sample_rate_herz=SAMPLE_RATE,
                        stream=stream,
                        timeout=RESTART_TIMEOUT,
                        language_code="en-US",
                        single_utterance=False,
                    )

                    # Now, print the final transcription responses to user.
                    for response in responses:
                        if response.message:
                            print(response)
                        if response.recognition_result.is_final:
                            print(response)
                            # offset return from recognition_result is relative
                            # to the beginning of audio stream.
                            offset = response.recognition_result.speech_end_offset
                            stream.is_final_offset = int(
                                offset.seconds * 1000 + offset.microseconds / 1000
                            )
                            transcript = response.recognition_result.transcript
                            # Half-close the stream with gRPC (in Python just stop yielding requests)
                            stream.is_final = True
                            # Exit recognition if any of the transcribed phrases could be
                            # one of our keywords.
                            if re.search(r"\b(exit|quit)\b", transcript, re.I):
                                sys.stdout.write(YELLOW)
                                sys.stdout.write("Exiting...\n")
                                terminate = True
                                stream.closed = True
                                break
                except DeadlineExceeded:
                    print("Deadline Exceeded, restarting.")

            if terminate:
                conversation_management.complete_conversation(
                    project_id=PROJECT_ID, conversation_id=conversation_id
                )
                break


if __name__ == "__main__":
    main()
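The sample above imports conversation_management and participant_management helper modules that are not shown on this page. As a rough sketch, assuming the google-cloud-dialogflow Python client library, the conversation_management helpers it calls might look like this:

from google.cloud import dialogflow_v2 as dialogflow


def create_conversation(project_id: str, conversation_profile_id: str):
    """Creates a conversation that uses the given conversation profile."""
    client = dialogflow.ConversationsClient()
    profile_path = (
        f"projects/{project_id}/conversationProfiles/{conversation_profile_id}"
    )
    return client.create_conversation(
        parent=f"projects/{project_id}",
        conversation=dialogflow.Conversation(conversation_profile=profile_path),
    )


def complete_conversation(project_id: str, conversation_id: str):
    """Marks the conversation as completed."""
    client = dialogflow.ConversationsClient()
    return client.complete_conversation(
        name=f"projects/{project_id}/conversations/{conversation_id}"
    )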