diff --git a/api/core/model_runtime/model_providers/__base/large_language_model.py b/api/core/model_runtime/model_providers/__base/large_language_model.py
index bbbdec61d1..c32ab0879e 100644
--- a/api/core/model_runtime/model_providers/__base/large_language_model.py
+++ b/api/core/model_runtime/model_providers/__base/large_language_model.py
@@ -83,19 +83,21 @@ def _merge_tool_call_delta(
         tool_call.function.arguments += delta.function.arguments
 
 
-def _build_llm_result_from_first_chunk(
+def _build_llm_result_from_chunks(
     model: str,
     prompt_messages: Sequence[PromptMessage],
     chunks: Iterator[LLMResultChunk],
 ) -> LLMResult:
     """
-    Build a single `LLMResult` from the first returned chunk.
+    Build a single `LLMResult` by accumulating all returned chunks.
 
-    This is used for `stream=False` because the plugin side may still implement the response via a chunked stream.
+    Some models only support streaming output (e.g. Qwen3 open-source edition)
+    and the plugin side may still implement the response via a chunked stream,
+    so all chunks must be consumed and concatenated into a single ``LLMResult``.
 
-    Note:
-    This function always drains the `chunks` iterator after reading the first chunk to ensure any underlying
-    streaming resources are released (e.g., HTTP connections owned by the plugin runtime).
+    The ``usage`` is taken from the last chunk that carries it, which is the
+    typical convention for streaming responses (the final chunk contains the
+    aggregated token counts).
     """
     content = ""
     content_list: list[PromptMessageContentUnionTypes] = []
@@ -104,24 +106,27 @@ def _build_llm_result_from_first_chunk(
     tools_calls: list[AssistantPromptMessage.ToolCall] = []
 
     try:
-        first_chunk = next(chunks, None)
-        if first_chunk is not None:
-            if isinstance(first_chunk.delta.message.content, str):
-                content += first_chunk.delta.message.content
-            elif isinstance(first_chunk.delta.message.content, list):
-                content_list.extend(first_chunk.delta.message.content)
+        for chunk in chunks:
+            if isinstance(chunk.delta.message.content, str):
+                content += chunk.delta.message.content
+            elif isinstance(chunk.delta.message.content, list):
+                content_list.extend(chunk.delta.message.content)
 
-            if first_chunk.delta.message.tool_calls:
-                _increase_tool_call(first_chunk.delta.message.tool_calls, tools_calls)
+            if chunk.delta.message.tool_calls:
+                _increase_tool_call(chunk.delta.message.tool_calls, tools_calls)
 
-            usage = first_chunk.delta.usage or LLMUsage.empty_usage()
-            system_fingerprint = first_chunk.system_fingerprint
+            if chunk.delta.usage:
+                usage = chunk.delta.usage
+            if chunk.system_fingerprint:
+                system_fingerprint = chunk.system_fingerprint
+    except Exception:
+        logger.exception("Error while consuming non-stream plugin chunk iterator.")
+        raise
     finally:
-        try:
-            for _ in chunks:
-                pass
-        except Exception:
-            logger.debug("Failed to drain non-stream plugin chunk iterator.", exc_info=True)
+        # Drain any remaining chunks to release underlying streaming resources (e.g. HTTP connections).
+        close = getattr(chunks, "close", None)
+        if callable(close):
+            close()
 
     return LLMResult(
         model=model,
@@ -174,7 +179,7 @@ def _normalize_non_stream_plugin_result(
 ) -> LLMResult:
     if isinstance(result, LLMResult):
         return result
-    return _build_llm_result_from_first_chunk(model=model, prompt_messages=prompt_messages, chunks=result)
+    return _build_llm_result_from_chunks(model=model, prompt_messages=prompt_messages, chunks=result)
 
 
 def _increase_tool_call(
diff --git a/api/tests/unit_tests/core/model_runtime/__base/test_large_language_model_non_stream_parsing.py b/api/tests/unit_tests/core/model_runtime/__base/test_large_language_model_non_stream_parsing.py
index cfdeef6a8d..09d527cb12 100644
--- a/api/tests/unit_tests/core/model_runtime/__base/test_large_language_model_non_stream_parsing.py
+++ b/api/tests/unit_tests/core/model_runtime/__base/test_large_language_model_non_stream_parsing.py
@@ -103,16 +103,16 @@ def test__normalize_non_stream_plugin_result__empty_iterator_defaults():
     assert result.system_fingerprint is None
 
 
-def test__normalize_non_stream_plugin_result__closes_chunk_iterator():
+def test__normalize_non_stream_plugin_result__accumulates_all_chunks():
+    """All chunks are accumulated from the iterator."""
     prompt_messages = [UserPromptMessage(content="hi")]
-    chunk = _make_chunk(content="hello", usage=LLMUsage.empty_usage())
 
     closed: list[bool] = []
 
     def _chunk_iter():
         try:
-            yield chunk
-            yield _make_chunk(content="ignored", usage=LLMUsage.empty_usage())
+            yield _make_chunk(content="hello", usage=LLMUsage.empty_usage())
+            yield _make_chunk(content=" world", usage=LLMUsage.empty_usage())
         finally:
             closed.append(True)
 
@@ -122,5 +122,5 @@ def test__normalize_non_stream_plugin_result__empty_iterator_defaults():
         result=_chunk_iter(),
     )
 
-    assert result.message.content == "hello"
+    assert result.message.content == "hello world"
     assert closed == [True]