가끔 보자, 하늘.

OpenAI speech to speech 샘플 본문

개발 이야기/개발 및 서비스

OpenAI speech to speech 샘플

가온아 2025. 5. 21. 09:00

급히 데모를 하나 만들게 있어서 PC에서 OpenAI speech to speech를 구현하면서 진행된 작업을 간단히 정리했습니다. 변화가 꽤 있어서 그런지 AI들이 제대로 코드를 많들지 못해 정리해 봅니다. 공식 메뉴얼은 이 링크를 참고하세요


설정 순서는 다음과 같은 순서로 설정합니다.

1. 초기화 과정 : 오디오 입력/출력 스트림 초기화 -> websocket 연결 -> 세션 초기화

2-1. 마이크 입력 처리 : input_stream.read -> encode base64 -> websocket.send

2-2. 사운드 출력 처리  : receive websocket ->

  2-2-1. response.audio.delta 확인 -> decode base64 -> output_stream.write 

   2-2-2. response.done 확인-> 음성 데이터 전송 완료. 텍스트 전송

2025년 5월 현재 기준으로 작성된 코드이고 버전에 따라 다를 수 있습니다. 참고하세요.

초기 설정은 아래와 같이 합니다.

 

.
.

async def run_s2s():
	# OpenAI Realtime API WebSocket URI
    # model명을 url에 명시합니다.
    uri = "wss://api.openai.com/v1/realtime?model=gpt-4o-mini-realtime-preview" 
    headers = {
        "Authorization": f"Bearer {API_KEY}", # API_KEY는 OpenAI API 키를 넣어주세요.
        "OpenAI-Beta": "realtime=v1" # OpenAI Realtime API 베타 헤더
    }

    try:
        # 오디오 스트림 초기화
        input_stream = audio.open(format=FORMAT,
                                  channels=CHANNELS,
                                  rate=RATE,
                                  input=True,
                                  frames_per_buffer=CHUNK)
        print("마이크 입력 스트림이 열렸습니다.")

        output_stream = audio.open(format=FORMAT,
                                   channels=CHANNELS,
                                   rate=RATE,
                                   output=True,
                                   frames_per_buffer=CHUNK)
        print("스피커 출력 스트림이 열렸습니다.")

		# websocket 연결합니다.
        async with websockets.connect(
            uri, 
            additional_headers=headers, 
            ping_interval=None, # 클라이언트 자동 ping 비활성화
            ping_timeout=75 # timeout 60초. 60초 이하로 설정하면 timeout되어 대화가 중단됩니다.
        ) as websocket:
            print("WebSocket 연결 성공.")

            session_init_message = {
                "event_id": "event_123", # 
                "type": "session.update",
                "session": {
                    "modalities": ["text","audio"],
                    "instructions": "You are a helpful assistant.", # 필요한 instruction을 설정하세요.
                    "voice": "alloy", # nova, echo, fable, onyx, nova, shimmer 등으로 설정 변경 가능합니다.
                    "input_audio_format": "pcm16",
                    "output_audio_format": "pcm16",
                    "input_audio_transcription": {
                        "model": "whisper-1"
                    },
                    "turn_detection": {
                        "type": "server_vad",
                        "threshold": 0.5,
                        "prefix_padding_ms": 300,
                        "silence_duration_ms": 500,
                        "create_response": True
                    },
                    "tools": [
                        {
                            "type": "function",
                            "name": "get_weather",
                            "description": "Get the current weather...",
                            "parameters": {
                                "type": "object",
                                "properties": {
                                    "location": { "type": "string" }
                                },
                                "required": ["location"]
                            }
                        }
                    ],
                    "tool_choice": "auto",
                    "temperature": 0.8,
                    "max_response_output_tokens": "inf"
                }
            }            
            print(f"세션 초기화 요청 전송 중: {json.dumps(session_init_message)}")
            await websocket.send(json.dumps(session_init_message))
            
            # 세션 초기화 응답 수신
            init_response_raw = await websocket.recv()
            init_response = json.loads(init_response_raw)
            print(f"세션 초기화 응답 수신: {init_response}")

            if init_response.get("type") == "session.created":
                print(f"세션 생성 성공. Session ID: {init_response.get('session').get('id')}")
            elif init_response.get("type") == "error":
                print(f"세션 생성 오류: {init_response.get('message')}")
                return
            else:
                print(f"예상치 못한 세션 초기화 응답: {init_response}")
                return
                
            #  음성을 입력받아 전달합니다.
            send_task = asyncio.create_task(send_audio())
            # 전달받은 음성을 출력합니다.
            receive_task = asyncio.create_task(receive_responses())

			# asyncio.wait는 나열된 비동기 작업을 끝날때까지 대기합니다. 
            # FIRST_COMPLETED 은 두 작업 하라나도 먼저 끝나면 바로 반환됩니다. 
            done, pending = await asyncio.wait(
                [send_task, receive_task],
                return_when=asyncio.FIRST_COMPLETED,
            )

            for task in pending:
                task.cancel()

    except websockets.exceptions.InvalidURI:
        print(f"오류: 잘못된 WebSocket URI입니다: {uri}")
    except websockets.exceptions.InvalidHandshake as e:
        print(f"WebSocket 핸드셰이크 실패: {e}. API 키 또는 'OpenAI-Beta' 헤더를 확인하세요.")
    except ConnectionRefusedError:
        print("오류: WebSocket 연결이 거부되었습니다. 서버 상태 및 네트워크를 확인하세요.")
    except Exception as e:
        print(f"처리 중 예외 발생: {e}")
    finally:
        print("스트림 및 오디오 리소스 정리 중...")
        if input_stream:
            if input_stream.is_active():
                input_stream.stop_stream()
            input_stream.close()
            print("마이크 입력 스트림이 닫혔습니다.")

        if output_stream:
            if output_stream.is_active():
                output_stream.stop_stream()
            output_stream.close()
            print("스피커 출력 스트림이 닫혔습니다.")

        if audio:
            audio.terminate()
            print("PyAudio가 종료되었습니다.")
        print("정리 완료.")

if __name__ == "__main__":
    try:
        asyncio.run(send_receive())
    except KeyboardInterrupt:
        print("\n 프로그램이 사용자에 의해 중단되었습니다.")
    finally:
        print("프로그램 종료.")

아래는 마이크 입력 및 응답 요청 함수입니다. 

async def send_audio():
    try:
        while True:
            data = input_stream.read(CHUNK, exception_on_overflow=False)
            encoded_data = base64.b64encode(data).decode('utf-8') # base64로 인코딩해서 전달합니다.
            audio_input_message = {
                "type": "input_audio_buffer.append",
                "audio": encoded_data
            }
            await websocket.send(json.dumps(audio_input_message))
            await asyncio.sleep(0.01) 
    except websockets.exceptions.ConnectionClosed:
        print("오디오 전송 중 WebSocket 연결이 닫혔습니다.")
    except Exception as e:
        print(f"오디오 전송 중 오류 발생: {e}")
    finally:
        print("오디오 전송 루프 종료.")

응답을 받아 처리하는 함수는 다음과 같습니다. 이 부분에서 메세지가 구글링을 하거나 ai에게 물어봐도 너무 다르게 나와서 혼동스러웠네요. 역시 그래도 공식 메뉴얼이 맞긴 한데 순차적으로 정리된게 아니라 여기저기 정보가 흩어져 있어 좀 해매긴 했네요.

async def receive_responses():
    try:
        async for message_raw in websocket:
            try:
                response_data = json.loads(message_raw)
            except json.JSONDecodeError as e:
                print(f"수신 메시지 JSON 디코딩 오류: {e} - 원시 메시지: {message_raw}")
                continue

            # 응답으로 전송된 음성 메세지가 모두 전달된 후 마지막으로 전달되는 응답
            if response_data.get("type") == "response.done": 
                resp = response_data.get("response")
                if resp:
                    print(f"resp 응답 완료! {json.dumps(resp, indent=2)}")
                else:
                    print("응답 완료 메시지에 응답 데이터 없음.")

                resp2 = resp.get("output")
                if resp2 and len(resp2) > 0:
                    content = resp2[0].get("content")
                    if content:
                        transcript = resp2[0].get("content")[0].get("transcript")
                        if transcript:
                            print(f"수신 완료 텍스트  : {transcript}")
                    else:
                        print("응답 데이터에 텍스트 없음.")
            # 응답으로 음성 메세지가 전송 중
            elif response_data.get("type") == "response.audio.delta":
                audio_b64 = response_data.get("delta")
                print("response.audio.delta 수신됨.")
                if audio_b64:
                    audio_content = base64.b64decode(audio_b64)
                    output_stream.write(audio_content)
                else:
                    print("응답 데이터에 오디오 없음.")
            .
            .
            .
            elif response_data.get("type") == "session.terminated":
                print(f"세션 종료됨: {response_data.get('reason', '이유 명시 안됨')}")
                break 

            elif response_data.get("type") == "error":
                error_message = response_data.get('message')
                print(f"API 오류 수신: {error_message}")
                print(f"전체 오류 응답: {json.dumps(response_data, indent=2)}") # 전체 오류 메시지 출력
                break # 오류 발생 시 수신 루프 종료
    except websockets.exceptions.ConnectionClosedOK:
        print("WebSocket 연결이 정상적으로 닫혔습니다 (수신 측).")
    except websockets.exceptions.ConnectionClosedError as e:
        print(f"WebSocket 연결 오류로 닫혔습니다 (수신 측): {e}")
    except json.JSONDecodeError as e:
        print(f"수신 메시지 JSON 디코딩 오류: {e} - 메시지: {message_raw}")
    except Exception as e:
        print(f"응답 처리 중 오류 발생: {e}")
    finally:
        print("응답 수신 루프 종료.")

응답 메세지 이외에도 timeout 관련된 이슈들이 좀 있었습니다. speech to speech로 몇 마디 주고 받다보면 timeout 에러 발생하더군요.

WebSocket 연결 오류로 닫혔습니다 (수신 측): received 1011 (internal error) keepalive ping timeout; then sent 1011 (internal error) keepalive ping timeout

처음에는 클라이언트 ping도 설정했었는데 계속 연결이 끝어져 서버의 ping이 안와서 timeout 이 발생한 듯 했습니다. websockets 라이브러리에서 설정된 시간대로 자동 처리 되는거라 서버측의 ping이 45초가 아닐 듯 했습니다. 

찾다보니 공식 문서에서는 45초로 명시되어 있는데 speech to speech 관련 문서에는 명시되어 있지는 않았습니다. 그래서 여러 값을 바꾸다보니 60초 이상으로 하니 문제가 발생하지 않더군요. 그러다 gemini 2.5 pro에 물어보니 ( 그 전에는 chatgpt와 github copilot에만 요청했었..) 60초라고 답변을 하네요. -_-a

에고... 여러모로 조금 귀찮긴 했습니다. 

풀 코드는 이 링크(https://github.com/blackwitch/example_openai_speech2speech)를 참고하세요. 이 예제는 응답 중 사용자 개입은 처리하지 않았습니다. 필요하신분은 수정해서 사용 부탁드려요.

그럼 좋은 하루 되세요. ^^

반응형