diff --git a/examples/tracing/openai/openai_tracing.ipynb b/examples/tracing/openai/openai_tracing.ipynb index a79bae1f..fc5cad02 100644 --- a/examples/tracing/openai/openai_tracing.ipynb +++ b/examples/tracing/openai/openai_tracing.ipynb @@ -41,6 +41,7 @@ "import os\n", "\n", "import openai\n", + "from pydantic import BaseModel\n", "\n", "# OpenAI env variables\n", "os.environ[\"OPENAI_API_KEY\"] = \"YOUR_OPENAI_API_KEY_HERE\"\n", @@ -86,6 +87,14 @@ "That's it! Now you can continue using the traced OpenAI client normally. The data is automatically published to Openlayer and you can start creating tests around it!" ] }, + { + "cell_type": "markdown", + "id": "fb5ebdad", + "metadata": {}, + "source": [ + "### 3.1 Chat Completions API" + ] + }, { "cell_type": "code", "execution_count": null, @@ -104,12 +113,54 @@ "id": "abaf6987-c257-4f0d-96e7-3739b24c7206", "metadata": {}, "outputs": [], - "source": [] + "source": [ + "class Person(BaseModel):\n", + " name: str\n", + " age: int\n", + " occupation: str\n", + "\n", + "# Parse method automatically returns structured Pydantic object\n", + "completion = openai_client.chat.completions.parse(\n", + " model=\"gpt-4o\",\n", + " messages=[\n", + " {\"role\": \"user\", \"content\": \"Extract: John Doe is 30 years old and works as a software engineer\"}\n", + " ],\n", + " response_format=Person,\n", + ")\n", + "\n", + "completion.choices[0].message.parsed" + ] + }, + { + "cell_type": "markdown", + "id": "4e6fb396", + "metadata": {}, + "source": [ + "### 3.2 Responses API" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "21369c42", + "metadata": {}, + "outputs": [], + "source": [ + "# Responses API - new unified interface with enhanced metadata\n", + "response = openai_client.responses.create(\n", + " model=\"gpt-4o-mini\",\n", + " input=\"What is 3 + 3?\",\n", + " max_output_tokens=50\n", + ")\n", + "\n", + "# Response is automatically traced\n", + "response" + ] } ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "base", "language": "python", "name": "python3" }, @@ -123,7 +174,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.18" + "version": "3.12.7" } }, "nbformat": 4, diff --git a/src/openlayer/lib/integrations/async_openai_tracer.py b/src/openlayer/lib/integrations/async_openai_tracer.py index f670fa16..799d64e9 100644 --- a/src/openlayer/lib/integrations/async_openai_tracer.py +++ b/src/openlayer/lib/integrations/async_openai_tracer.py @@ -8,6 +8,7 @@ try: import openai + HAVE_OPENAI = True except ImportError: HAVE_OPENAI = False @@ -20,6 +21,12 @@ create_trace_args, add_to_trace, parse_non_streaming_output_data, + # Import Responses API helper functions + extract_responses_chunk_data, + extract_responses_inputs, + parse_responses_output_data, + extract_responses_usage, + get_responses_model_parameters, ) logger = logging.getLogger(__name__) @@ -28,20 +35,24 @@ def trace_async_openai( client: Union["openai.AsyncOpenAI", "openai.AsyncAzureOpenAI"], ) -> Union["openai.AsyncOpenAI", "openai.AsyncAzureOpenAI"]: - """Patch the AsyncOpenAI or AsyncAzureOpenAI client to trace chat completions. - - The following information is collected for each chat completion: - - start_time: The time when the completion was requested. - - end_time: The time when the completion was received. - - latency: The time it took to generate the completion. - - tokens: The total number of tokens used to generate the completion. 
- - prompt_tokens: The number of tokens in the prompt. - - completion_tokens: The number of tokens in the completion. - - model: The model used to generate the completion. + """Patch the AsyncOpenAI or AsyncAzureOpenAI client to trace chat completions and responses. + + This function patches both the Chat Completions API (client.chat.completions.create) + and the Responses API (client.responses.create) to provide comprehensive tracing + for both APIs while maintaining backward compatibility. + + The following information is collected for each completion/response: + - start_time: The time when the completion/response was requested. + - end_time: The time when the completion/response was received. + - latency: The time it took to generate the completion/response. + - tokens: The total number of tokens used to generate the completion/response. + - prompt_tokens: The number of tokens in the prompt/input. + - completion_tokens: The number of tokens in the completion/output. + - model: The model used to generate the completion/response. - model_parameters: The parameters used to configure the model. - raw_output: The raw output of the model. - - inputs: The inputs used to generate the completion. - - metadata: Additional metadata about the completion. For example, the time it + - inputs: The inputs used to generate the completion/response. + - metadata: Additional metadata about the completion/response. For example, the time it took to generate the first token, when streaming. Parameters @@ -55,15 +66,15 @@ def trace_async_openai( The patched AsyncOpenAI client. """ if not HAVE_OPENAI: - raise ImportError( - "OpenAI library is not installed. Please install it with: pip install openai" - ) - + raise ImportError("OpenAI library is not installed. Please install it with: pip install openai") + is_azure_openai = isinstance(client, openai.AsyncAzureOpenAI) - create_func = client.chat.completions.create - @wraps(create_func) - async def traced_create_func(*args, **kwargs): + # Patch Chat Completions API + chat_create_func = client.chat.completions.create + + @wraps(chat_create_func) + async def traced_chat_create_func(*args, **kwargs): inference_id = kwargs.pop("inference_id", None) stream = kwargs.get("stream", False) @@ -71,19 +82,51 @@ async def traced_create_func(*args, **kwargs): return handle_async_streaming_create( *args, **kwargs, - create_func=create_func, + create_func=chat_create_func, inference_id=inference_id, is_azure_openai=is_azure_openai, + api_type="chat_completions", ) return await handle_async_non_streaming_create( *args, **kwargs, - create_func=create_func, + create_func=chat_create_func, inference_id=inference_id, is_azure_openai=is_azure_openai, + api_type="chat_completions", ) - client.chat.completions.create = traced_create_func + client.chat.completions.create = traced_chat_create_func + + # Patch Responses API (if available) + if hasattr(client, "responses"): + responses_create_func = client.responses.create + + @wraps(responses_create_func) + async def traced_responses_create_func(*args, **kwargs): + inference_id = kwargs.pop("inference_id", None) + stream = kwargs.get("stream", False) + + if stream: + return handle_async_responses_streaming_create( + *args, + **kwargs, + create_func=responses_create_func, + inference_id=inference_id, + is_azure_openai=is_azure_openai, + ) + return await handle_async_responses_non_streaming_create( + *args, + **kwargs, + create_func=responses_create_func, + inference_id=inference_id, + is_azure_openai=is_azure_openai, + ) + + 
client.responses.create = traced_responses_create_func + else: + logger.debug("Responses API not available in this AsyncOpenAI client version") + return client @@ -92,6 +135,7 @@ async def handle_async_streaming_create( *args, is_azure_openai: bool = False, inference_id: Optional[str] = None, + api_type: str = "chat_completions", **kwargs, ) -> AsyncIterator[Any]: """Handles the create method when streaming is enabled. @@ -142,16 +186,12 @@ async def handle_async_streaming_create( if delta.function_call.name: collected_function_call["name"] += delta.function_call.name if delta.function_call.arguments: - collected_function_call[ - "arguments" - ] += delta.function_call.arguments + collected_function_call["arguments"] += delta.function_call.arguments elif delta.tool_calls: if delta.tool_calls[0].function.name: collected_function_call["name"] += delta.tool_calls[0].function.name if delta.tool_calls[0].function.arguments: - collected_function_call["arguments"] += delta.tool_calls[ - 0 - ].function.arguments + collected_function_call["arguments"] += delta.tool_calls[0].function.arguments yield chunk @@ -163,15 +203,11 @@ async def handle_async_streaming_create( finally: # Try to add step to the trace try: - collected_output_data = [ - message for message in collected_output_data if message is not None - ] + collected_output_data = [message for message in collected_output_data if message is not None] if collected_output_data: output_data = "".join(collected_output_data) else: - collected_function_call["arguments"] = json.loads( - collected_function_call["arguments"] - ) + collected_function_call["arguments"] = json.loads(collected_function_call["arguments"]) output_data = collected_function_call trace_args = create_trace_args( @@ -186,13 +222,7 @@ async def handle_async_streaming_create( model_parameters=get_model_parameters(kwargs), raw_output=raw_outputs, id=inference_id, - metadata={ - "timeToFirstToken": ( - (first_token_time - start_time) * 1000 - if first_token_time - else None - ) - }, + metadata={"timeToFirstToken": ((first_token_time - start_time) * 1000 if first_token_time else None)}, ) add_to_trace( **trace_args, @@ -212,8 +242,9 @@ async def handle_async_non_streaming_create( *args, is_azure_openai: bool = False, inference_id: Optional[str] = None, + api_type: str = "chat_completions", **kwargs, -) -> "openai.types.chat.chat_completion.ChatCompletion": +) -> Union["openai.types.chat.chat_completion.ChatCompletion", Any]: """Handles the create method when streaming is disabled. Parameters @@ -257,8 +288,181 @@ async def handle_async_non_streaming_create( ) # pylint: disable=broad-except except Exception as e: - logger.error( - "Failed to trace the create chat completion request with Openlayer. %s", e + logger.error("Failed to trace the create chat completion request with Openlayer. %s", e) + + return response + + +# -------------------------------- Async Responses API Handlers -------------------------------- # + + +async def handle_async_responses_streaming_create( + create_func: callable, + *args, + is_azure_openai: bool = False, + inference_id: Optional[str] = None, + **kwargs, +) -> AsyncIterator[Any]: + """Handles the Responses API create method when streaming is enabled (async version). + + Parameters + ---------- + create_func : callable + The Responses API create method to handle. 
+ is_azure_openai : bool, optional + Whether the client is an Azure OpenAI client, by default False + inference_id : Optional[str], optional + A user-generated inference id, by default None + + Returns + ------- + AsyncIterator[Any] + An async generator that yields the chunks of the response stream. + """ + chunks = await create_func(*args, **kwargs) + + # Create and return a new async generator that processes chunks + collected_output_data = [] + collected_function_call = { + "name": "", + "arguments": "", + } + raw_outputs = [] + start_time = time.time() + end_time = None + first_token_time = None + num_of_completion_tokens = None + latency = None + + try: + i = 0 + async for chunk in chunks: + raw_outputs.append(chunk.model_dump() if hasattr(chunk, "model_dump") else str(chunk)) + if i == 0: + first_token_time = time.time() + if i > 0: + num_of_completion_tokens = i + 1 + i += 1 + + # Handle different types of ResponseStreamEvent + chunk_data = extract_responses_chunk_data(chunk) + + if chunk_data.get("content"): + collected_output_data.append(chunk_data["content"]) + elif chunk_data.get("function_call"): + func_call = chunk_data["function_call"] + if func_call.get("name"): + collected_function_call["name"] += func_call["name"] + if func_call.get("arguments"): + collected_function_call["arguments"] += func_call["arguments"] + + yield chunk + + end_time = time.time() + latency = (end_time - start_time) * 1000 + # pylint: disable=broad-except + except Exception as e: + logger.error("Failed yield chunk. %s", e) + finally: + # Try to add step to the trace + try: + collected_output_data = [message for message in collected_output_data if message is not None] + if collected_output_data: + output_data = "".join(collected_output_data) + else: + if collected_function_call["arguments"]: + try: + collected_function_call["arguments"] = json.loads(collected_function_call["arguments"]) + except json.JSONDecodeError: + # Keep as string if not valid JSON + pass + output_data = collected_function_call + + trace_args = create_trace_args( + end_time=end_time, + inputs=extract_responses_inputs(kwargs), + output=output_data, + latency=latency, + tokens=num_of_completion_tokens, + prompt_tokens=0, + completion_tokens=num_of_completion_tokens, + model=kwargs.get("model", "unknown"), + model_parameters=get_responses_model_parameters(kwargs), + raw_output=raw_outputs, + id=inference_id, + metadata={ + "timeToFirstToken": ((first_token_time - start_time) * 1000 if first_token_time else None), + "api_type": "responses", + }, + ) + add_to_trace( + **trace_args, + is_azure_openai=is_azure_openai, + api_type="responses", + ) + + # pylint: disable=broad-except + except Exception as e: + logger.error( + "Failed to trace the Responses API request with Openlayer. %s", + e, + ) + + +async def handle_async_responses_non_streaming_create( + create_func: callable, + *args, + is_azure_openai: bool = False, + inference_id: Optional[str] = None, + **kwargs, +) -> Any: + """Handles the Responses API create method when streaming is disabled (async version). + + Parameters + ---------- + create_func : callable + The Responses API create method to handle. + is_azure_openai : bool, optional + Whether the client is an Azure OpenAI client, by default False + inference_id : Optional[str], optional + A user-generated inference id, by default None + + Returns + ------- + Any + The response object. 
+ """ + start_time = time.time() + response = await create_func(*args, **kwargs) + end_time = time.time() + + # Try to add step to the trace + try: + output_data = parse_responses_output_data(response) + usage_data = extract_responses_usage(response) + + trace_args = create_trace_args( + end_time=end_time, + inputs=extract_responses_inputs(kwargs), + output=output_data, + latency=(end_time - start_time) * 1000, + tokens=usage_data.get("total_tokens", 0), + prompt_tokens=usage_data.get("prompt_tokens", 0), + completion_tokens=usage_data.get("completion_tokens", 0), + model=getattr(response, "model", kwargs.get("model", "unknown")), + model_parameters=get_responses_model_parameters(kwargs), + raw_output=response.model_dump() if hasattr(response, "model_dump") else str(response), + id=inference_id, + metadata={"api_type": "responses"}, ) + add_to_trace( + is_azure_openai=is_azure_openai, + api_type="responses", + **trace_args, + ) + # pylint: disable=broad-except + except Exception as e: + logger.error("Failed to trace the Responses API request with Openlayer. %s", e) + return response diff --git a/src/openlayer/lib/integrations/openai_tracer.py b/src/openlayer/lib/integrations/openai_tracer.py index 0c787aa2..e0ad0774 100644 --- a/src/openlayer/lib/integrations/openai_tracer.py +++ b/src/openlayer/lib/integrations/openai_tracer.py @@ -8,6 +8,7 @@ try: import openai + HAVE_OPENAI = True except ImportError: HAVE_OPENAI = False @@ -23,20 +24,24 @@ def trace_openai( client: Union["openai.OpenAI", "openai.AzureOpenAI"], ) -> Union["openai.OpenAI", "openai.AzureOpenAI"]: - """Patch the OpenAI or AzureOpenAI client to trace chat completions. - - The following information is collected for each chat completion: - - start_time: The time when the completion was requested. - - end_time: The time when the completion was received. - - latency: The time it took to generate the completion. - - tokens: The total number of tokens used to generate the completion. - - prompt_tokens: The number of tokens in the prompt. - - completion_tokens: The number of tokens in the completion. - - model: The model used to generate the completion. + """Patch the OpenAI or AzureOpenAI client to trace chat completions and responses. + + This function patches both the Chat Completions API (client.chat.completions.create) + and the Responses API (client.responses.create) to provide comprehensive tracing + for both APIs while maintaining backward compatibility. + + The following information is collected for each completion/response: + - start_time: The time when the completion/response was requested. + - end_time: The time when the completion/response was received. + - latency: The time it took to generate the completion/response. + - tokens: The total number of tokens used to generate the completion/response. + - prompt_tokens: The number of tokens in the prompt/input. + - completion_tokens: The number of tokens in the completion/output. + - model: The model used to generate the completion/response. - model_parameters: The parameters used to configure the model. - raw_output: The raw output of the model. - - inputs: The inputs used to generate the completion. - - metadata: Additional metadata about the completion. For example, the time it + - inputs: The inputs used to generate the completion/response. + - metadata: Additional metadata about the completion/response. For example, the time it took to generate the first token, when streaming. Parameters @@ -50,15 +55,15 @@ def trace_openai( The patched OpenAI client. 
""" if not HAVE_OPENAI: - raise ImportError( - "OpenAI library is not installed. Please install it with: pip install openai" - ) - + raise ImportError("OpenAI library is not installed. Please install it with: pip install openai") + is_azure_openai = isinstance(client, openai.AzureOpenAI) - create_func = client.chat.completions.create - @wraps(create_func) - def traced_create_func(*args, **kwargs): + # Patch Chat Completions API + chat_create_func = client.chat.completions.create + + @wraps(chat_create_func) + def traced_chat_create_func(*args, **kwargs): inference_id = kwargs.pop("inference_id", None) stream = kwargs.get("stream", False) @@ -66,19 +71,51 @@ def traced_create_func(*args, **kwargs): return handle_streaming_create( *args, **kwargs, - create_func=create_func, + create_func=chat_create_func, inference_id=inference_id, is_azure_openai=is_azure_openai, + api_type="chat_completions", ) return handle_non_streaming_create( *args, **kwargs, - create_func=create_func, + create_func=chat_create_func, inference_id=inference_id, is_azure_openai=is_azure_openai, + api_type="chat_completions", ) - client.chat.completions.create = traced_create_func + client.chat.completions.create = traced_chat_create_func + + # Patch Responses API (if available) + if hasattr(client, "responses"): + responses_create_func = client.responses.create + + @wraps(responses_create_func) + def traced_responses_create_func(*args, **kwargs): + inference_id = kwargs.pop("inference_id", None) + stream = kwargs.get("stream", False) + + if stream: + return handle_responses_streaming_create( + *args, + **kwargs, + create_func=responses_create_func, + inference_id=inference_id, + is_azure_openai=is_azure_openai, + ) + return handle_responses_non_streaming_create( + *args, + **kwargs, + create_func=responses_create_func, + inference_id=inference_id, + is_azure_openai=is_azure_openai, + ) + + client.responses.create = traced_responses_create_func + else: + logger.debug("Responses API not available in this OpenAI client version") + return client @@ -87,6 +124,7 @@ def handle_streaming_create( *args, is_azure_openai: bool = False, inference_id: Optional[str] = None, + api_type: str = "chat_completions", **kwargs, ) -> Iterator[Any]: """Handles the create method when streaming is enabled. 
@@ -149,16 +187,12 @@ def stream_chunks( if delta.function_call.name: collected_function_call["name"] += delta.function_call.name if delta.function_call.arguments: - collected_function_call[ - "arguments" - ] += delta.function_call.arguments + collected_function_call["arguments"] += delta.function_call.arguments elif delta.tool_calls: if delta.tool_calls[0].function.name: collected_function_call["name"] += delta.tool_calls[0].function.name if delta.tool_calls[0].function.arguments: - collected_function_call["arguments"] += delta.tool_calls[ - 0 - ].function.arguments + collected_function_call["arguments"] += delta.tool_calls[0].function.arguments yield chunk end_time = time.time() @@ -169,15 +203,11 @@ def stream_chunks( finally: # Try to add step to the trace try: - collected_output_data = [ - message for message in collected_output_data if message is not None - ] + collected_output_data = [message for message in collected_output_data if message is not None] if collected_output_data: output_data = "".join(collected_output_data) else: - collected_function_call["arguments"] = json.loads( - collected_function_call["arguments"] - ) + collected_function_call["arguments"] = json.loads(collected_function_call["arguments"]) output_data = collected_function_call trace_args = create_trace_args( @@ -192,13 +222,7 @@ def stream_chunks( model_parameters=get_model_parameters(kwargs), raw_output=raw_outputs, id=inference_id, - metadata={ - "timeToFirstToken": ( - (first_token_time - start_time) * 1000 - if first_token_time - else None - ) - }, + metadata={"timeToFirstToken": ((first_token_time - start_time) * 1000 if first_token_time else None)}, ) add_to_trace( **trace_args, @@ -263,16 +287,23 @@ def create_trace_args( return trace_args -def add_to_trace(is_azure_openai: bool = False, **kwargs) -> None: - """Add a chat completion step to the trace.""" - if is_azure_openai: - tracer.add_chat_completion_step_to_trace( - **kwargs, name="Azure OpenAI Chat Completion", provider="Azure" - ) +def add_to_trace(is_azure_openai: bool = False, api_type: str = "chat_completions", **kwargs) -> None: + """Add a chat completion or responses step to the trace.""" + # Remove api_type from kwargs to avoid passing it to the tracer + kwargs.pop("api_type", None) + + if api_type == "responses": + # Handle Responses API tracing + if is_azure_openai: + tracer.add_chat_completion_step_to_trace(**kwargs, name="Azure OpenAI Response", provider="Azure") + else: + tracer.add_chat_completion_step_to_trace(**kwargs, name="OpenAI Response", provider="OpenAI") else: - tracer.add_chat_completion_step_to_trace( - **kwargs, name="OpenAI Chat Completion", provider="OpenAI" - ) + # Handle Chat Completions API tracing (default behavior) + if is_azure_openai: + tracer.add_chat_completion_step_to_trace(**kwargs, name="Azure OpenAI Chat Completion", provider="Azure") + else: + tracer.add_chat_completion_step_to_trace(**kwargs, name="OpenAI Chat Completion", provider="OpenAI") def handle_non_streaming_create( @@ -280,8 +311,9 @@ def handle_non_streaming_create( *args, is_azure_openai: bool = False, inference_id: Optional[str] = None, + api_type: str = "chat_completions", **kwargs, -) -> "openai.types.chat.chat_completion.ChatCompletion": +) -> Union["openai.types.chat.chat_completion.ChatCompletion", Any]: """Handles the create method when streaming is disabled. 
Parameters @@ -325,13 +357,403 @@ def handle_non_streaming_create( ) # pylint: disable=broad-except except Exception as e: - logger.error( - "Failed to trace the create chat completion request with Openlayer. %s", e + logger.error("Failed to trace the create chat completion request with Openlayer. %s", e) + + return response + + +# -------------------------------- Responses API Handlers -------------------------------- # + + +def handle_responses_streaming_create( + create_func: callable, + *args, + is_azure_openai: bool = False, + inference_id: Optional[str] = None, + **kwargs, +) -> Iterator[Any]: + """Handles the Responses API create method when streaming is enabled. + + Parameters + ---------- + create_func : callable + The Responses API create method to handle. + is_azure_openai : bool, optional + Whether the client is an Azure OpenAI client, by default False + inference_id : Optional[str], optional + A user-generated inference id, by default None + + Returns + ------- + Iterator[Any] + A generator that yields the chunks of the response stream. + """ + chunks = create_func(*args, **kwargs) + return stream_responses_chunks( + chunks=chunks, + kwargs=kwargs, + inference_id=inference_id, + is_azure_openai=is_azure_openai, + ) + + +def stream_responses_chunks( + chunks: Iterator[Any], + kwargs: Dict[str, any], + is_azure_openai: bool = False, + inference_id: Optional[str] = None, +): + """Streams the chunks of the Responses API and traces the response.""" + collected_output_data = [] + collected_function_call = { + "name": "", + "arguments": "", + } + raw_outputs = [] + start_time = time.time() + end_time = None + first_token_time = None + num_of_completion_tokens = None + latency = None + + try: + i = 0 + for i, chunk in enumerate(chunks): + raw_outputs.append(chunk.model_dump() if hasattr(chunk, "model_dump") else str(chunk)) + if i == 0: + first_token_time = time.time() + if i > 0: + num_of_completion_tokens = i + 1 + + # Handle different types of ResponseStreamEvent + chunk_data = extract_responses_chunk_data(chunk) + + if chunk_data.get("content"): + collected_output_data.append(chunk_data["content"]) + elif chunk_data.get("function_call"): + func_call = chunk_data["function_call"] + if func_call.get("name"): + collected_function_call["name"] += func_call["name"] + if func_call.get("arguments"): + collected_function_call["arguments"] += func_call["arguments"] + + yield chunk + + end_time = time.time() + latency = (end_time - start_time) * 1000 + # pylint: disable=broad-except + except Exception as e: + logger.error("Failed yield chunk. 
%s", e) + finally: + # Try to add step to the trace + try: + collected_output_data = [message for message in collected_output_data if message is not None] + if collected_output_data: + output_data = "".join(collected_output_data) + else: + if collected_function_call["arguments"]: + try: + collected_function_call["arguments"] = json.loads(collected_function_call["arguments"]) + except json.JSONDecodeError: + # Keep as string if not valid JSON + pass + output_data = collected_function_call + + trace_args = create_trace_args( + end_time=end_time, + inputs=extract_responses_inputs(kwargs), + output=output_data, + latency=latency, + tokens=num_of_completion_tokens, + prompt_tokens=0, + completion_tokens=num_of_completion_tokens, + model=kwargs.get("model", "unknown"), + model_parameters=get_responses_model_parameters(kwargs), + raw_output=raw_outputs, + id=inference_id, + metadata={ + "timeToFirstToken": ((first_token_time - start_time) * 1000 if first_token_time else None) + }, + ) + add_to_trace( + **trace_args, + is_azure_openai=is_azure_openai, + api_type="responses", + ) + + # pylint: disable=broad-except + except Exception as e: + logger.error( + "Failed to trace the Responses API request with Openlayer. %s", + e, + ) + + +def handle_responses_non_streaming_create( + create_func: callable, + *args, + is_azure_openai: bool = False, + inference_id: Optional[str] = None, + **kwargs, +) -> Any: + """Handles the Responses API create method when streaming is disabled. + + Parameters + ---------- + create_func : callable + The Responses API create method to handle. + is_azure_openai : bool, optional + Whether the client is an Azure OpenAI client, by default False + inference_id : Optional[str], optional + A user-generated inference id, by default None + + Returns + ------- + Any + The response object. + """ + start_time = time.time() + response = create_func(*args, **kwargs) + end_time = time.time() + + # Try to add step to the trace + try: + output_data = parse_responses_output_data(response) + usage_data = extract_responses_usage(response) + + trace_args = create_trace_args( + end_time=end_time, + inputs=extract_responses_inputs(kwargs), + output=output_data, + latency=(end_time - start_time) * 1000, + tokens=usage_data.get("total_tokens", 0), + prompt_tokens=usage_data.get("prompt_tokens", 0), + completion_tokens=usage_data.get("completion_tokens", 0), + model=getattr(response, "model", kwargs.get("model", "unknown")), + model_parameters=get_responses_model_parameters(kwargs), + raw_output=response.model_dump() if hasattr(response, "model_dump") else str(response), + id=inference_id, ) + add_to_trace( + is_azure_openai=is_azure_openai, + api_type="responses", + **trace_args, + ) + # pylint: disable=broad-except + except Exception as e: + logger.error("Failed to trace the Responses API request with Openlayer. %s", e) + return response +# -------------------------------- Responses API Helper Functions -------------------------------- # + + +def extract_responses_chunk_data(chunk: Any) -> Dict[str, Any]: + """Extract content and function call data from a ResponseStreamEvent chunk. 
+ + Args: + chunk: A ResponseStreamEvent object + + Returns: + Dictionary with content and/or function_call data + """ + result = {} + + try: + # Handle different types of response stream events + chunk_type = getattr(chunk, "type", None) + + if chunk_type == "response.text.delta": + # Text content delta + if hasattr(chunk, "delta") and hasattr(chunk.delta, "text"): + result["content"] = chunk.delta.text + elif chunk_type == "response.function_call.arguments.delta": + # Function call arguments delta + if hasattr(chunk, "delta"): + result["function_call"] = {"arguments": chunk.delta} + elif chunk_type == "response.function_call.name": + # Function call name + if hasattr(chunk, "name"): + result["function_call"] = {"name": chunk.name} + elif hasattr(chunk, "choices") and chunk.choices: + # Fallback to chat-style format if available + choice = chunk.choices[0] + if hasattr(choice, "delta"): + delta = choice.delta + if hasattr(delta, "content") and delta.content: + result["content"] = delta.content + elif hasattr(delta, "function_call"): + func_call = {} + if hasattr(delta.function_call, "name") and delta.function_call.name: + func_call["name"] = delta.function_call.name + if hasattr(delta.function_call, "arguments") and delta.function_call.arguments: + func_call["arguments"] = delta.function_call.arguments + if func_call: + result["function_call"] = func_call + + except Exception as e: + logger.debug("Could not extract chunk data from ResponseStreamEvent: %s", e) + + return result + + +def extract_responses_inputs(kwargs: Dict[str, Any]) -> Dict[str, Any]: + """Extract inputs from Responses API parameters. + + Formats the input as a messages array similar to Chat Completions API format: + {"prompt": [{"role": "user", "content": "..."}]} + + Args: + kwargs: The parameters passed to the Responses API + + Returns: + Dictionary with prompt as a messages array + """ + messages = [] + + # Handle different input formats for Responses API + if "conversation" in kwargs: + # Conversation is already in messages format + conversation = kwargs["conversation"] + if isinstance(conversation, list): + messages = conversation + else: + # Single message, wrap it + messages = [{"role": "user", "content": str(conversation)}] + else: + # Build messages array from available parameters + if "instructions" in kwargs: + messages.append({"role": "system", "content": kwargs["instructions"]}) + + if "input" in kwargs: + messages.append({"role": "user", "content": kwargs["input"]}) + elif "prompt" in kwargs: + messages.append({"role": "user", "content": kwargs["prompt"]}) + + # If no user message was added, create a fallback + if not any(msg.get("role") == "user" for msg in messages): + if messages: + # Only system message, add empty user message + messages.append({"role": "user", "content": ""}) + else: + # No messages at all, add placeholder + messages.append({"role": "user", "content": "No input provided"}) + + return {"prompt": messages} + + +def parse_responses_output_data(response: Any) -> Union[str, Dict[str, Any], None]: + """Parses the output data from a Responses API response. 
+ + Args: + response: The Response object from the Responses API + + Returns: + The parsed output data + """ + try: + # Handle Response object structure - check for output first (Responses API structure) + if hasattr(response, "output") and response.output: + if isinstance(response.output, list) and response.output: + # Handle list of output messages + first_output = response.output[0] + if hasattr(first_output, "content") and first_output.content: + # Extract text from content list + if isinstance(first_output.content, list) and first_output.content: + text_content = first_output.content[0] + if hasattr(text_content, "text"): + return text_content.text.strip() + elif hasattr(first_output.content, "text"): + return first_output.content.text.strip() + else: + return str(first_output.content).strip() + elif hasattr(first_output, "text"): + return first_output.text.strip() + elif hasattr(response.output, "text"): + return response.output.text.strip() + elif hasattr(response.output, "content"): + return str(response.output.content).strip() + + # Handle Chat Completions style structure (fallback) + if hasattr(response, "choices") and response.choices: + choice = response.choices[0] + if hasattr(choice, "message"): + message = choice.message + if hasattr(message, "content") and message.content: + return message.content.strip() + elif hasattr(message, "function_call"): + return { + "name": message.function_call.name, + "arguments": json.loads(message.function_call.arguments) + if message.function_call.arguments + else {}, + } + elif hasattr(message, "tool_calls") and message.tool_calls: + tool_call = message.tool_calls[0] + return { + "name": tool_call.function.name, + "arguments": json.loads(tool_call.function.arguments) if tool_call.function.arguments else {}, + } + + # Handle direct text response + if hasattr(response, "text") and response.text: + return response.text.strip() + + except Exception as e: + logger.debug("Could not parse Responses API output data: %s", e) + + return None + + +def extract_responses_usage(response: Any) -> Dict[str, int]: + """Extract token usage from a Responses API response. 
+ + Args: + response: The Response object from the Responses API + + Returns: + Dictionary with token usage information + """ + usage = {"total_tokens": 0, "prompt_tokens": 0, "completion_tokens": 0} + + try: + if hasattr(response, "usage"): + usage_obj = response.usage + # Handle ResponseUsage object with different attribute names + usage["total_tokens"] = getattr(usage_obj, "total_tokens", 0) + # ResponseUsage uses 'input_tokens' instead of 'prompt_tokens' + usage["prompt_tokens"] = getattr(usage_obj, "input_tokens", getattr(usage_obj, "prompt_tokens", 0)) + # ResponseUsage uses 'output_tokens' instead of 'completion_tokens' + usage["completion_tokens"] = getattr(usage_obj, "output_tokens", getattr(usage_obj, "completion_tokens", 0)) + elif hasattr(response, "token_usage"): + # Alternative usage attribute name + usage_obj = response.token_usage + usage["total_tokens"] = getattr(usage_obj, "total_tokens", 0) + usage["prompt_tokens"] = getattr(usage_obj, "input_tokens", getattr(usage_obj, "prompt_tokens", 0)) + usage["completion_tokens"] = getattr(usage_obj, "output_tokens", getattr(usage_obj, "completion_tokens", 0)) + except Exception as e: + logger.debug("Could not extract token usage from Responses API response: %s", e) + + return usage + + +def get_responses_model_parameters(kwargs: Dict[str, Any]) -> Dict[str, Any]: + """Gets the model parameters from Responses API kwargs.""" + return { + "max_output_tokens": kwargs.get("max_output_tokens"), + "temperature": kwargs.get("temperature", 1), + "top_p": kwargs.get("top_p", 1), + "reasoning": kwargs.get("reasoning"), + "parallel_tool_calls": kwargs.get("parallel_tool_calls"), + "max_tool_calls": kwargs.get("max_tool_calls"), + "background": kwargs.get("background"), + "truncation": kwargs.get("truncation"), + "include": kwargs.get("include"), + } + + def parse_non_streaming_output_data( response: "openai.types.chat.chat_completion.ChatCompletion", ) -> Union[str, Dict[str, Any], None]: @@ -369,18 +791,14 @@ def parse_non_streaming_output_data( # --------------------------- OpenAI Assistants API -------------------------- # -def trace_openai_assistant_thread_run( - client: "openai.OpenAI", run: "openai.types.beta.threads.run.Run" -) -> None: +def trace_openai_assistant_thread_run(client: "openai.OpenAI", run: "openai.types.beta.threads.run.Run") -> None: """Trace a run from an OpenAI assistant. Once the run is completed, the thread data is published to Openlayer, along with the latency, and number of tokens used.""" if not HAVE_OPENAI: - raise ImportError( - "OpenAI library is not installed. Please install it with: pip install openai" - ) - + raise ImportError("OpenAI library is not installed. Please install it with: pip install openai") + _type_check_run(run) # Do nothing if the run is not completed @@ -393,9 +811,7 @@ def trace_openai_assistant_thread_run( metadata = _extract_run_metadata(run) # Convert thread to prompt - messages = client.beta.threads.messages.list( - thread_id=run.thread_id, order="asc" - ) + messages = client.beta.threads.messages.list(thread_id=run.thread_id, order="asc") prompt = _thread_messages_to_prompt(messages) # Add step to the trace @@ -410,7 +826,7 @@ def trace_openai_assistant_thread_run( # pylint: disable=broad-except except Exception as e: - print(f"Failed to monitor run. {e}") # noqa: T201 + logger.error("Failed to monitor run. %s", e) def _type_check_run(run: "openai.types.beta.threads.run.Run") -> None:
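
End-to-end, the patched clients are called exactly like unpatched ones. A minimal usage sketch (illustration only, mirroring the notebook cells in this diff; the import path follows the module location shown above and assumes the package's public re-exports are unchanged):

import openai
from openlayer.lib.integrations.openai_tracer import trace_openai

client = trace_openai(openai.OpenAI())

# Chat Completions API: routed through traced_chat_create_func
chat_completion = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "What is 3 + 3?"}],
)

# Responses API: routed through traced_responses_create_func, patched only when
# the installed openai version exposes client.responses
response = client.responses.create(
    model="gpt-4o-mini",
    input="What is 3 + 3?",
    max_output_tokens=50,
)

trace_async_openai applies the same patching to AsyncOpenAI and AsyncAzureOpenAI clients, using the awaitable handlers defined in async_openai_tracer.py.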