@@ -1290,8 +1290,13 @@ struct server_context_impl {
12901290 res->id_slot = slot.id ;
12911291
12921292 res->index = slot.task ->index ;
1293- res->content = slot.generated_text ;
1294- res->tokens = std::move (slot.generated_tokens );
1293+ // in stream mode, content and tokens are already in last partial chunk
1294+ res->content = slot.task ->params .stream ? " " : slot.generated_text ;
1295+ if (slot.task ->params .stream ) {
1296+ res->tokens = llama_tokens{};
1297+ } else {
1298+ res->tokens = std::move (slot.generated_tokens );
1299+ }
12951300 res->timings = slot.get_timings ();
12961301 res->prompt = slot.task ->tokens .detokenize (ctx, true );
12971302 res->response_fields = std::move (slot.task ->params .response_fields );
@@ -2603,9 +2608,9 @@ static std::unique_ptr<server_res_generator> handle_completions_impl(
26032608 task.params .res_type = res_type;
26042609 task.params .oaicompat_cmpl_id = completion_id;
26052610 task.params .oaicompat_model = ctx_server.model_name ;
2611+ states.emplace_back (task.params .oaicompat_chat_syntax );
26062612
26072613 tasks.push_back (std::move (task));
2608- states.emplace_back (task.params .oaicompat_chat_syntax );
26092614 }
26102615
26112616 rd.post_tasks (std::move (tasks));
@@ -2664,70 +2669,83 @@ static std::unique_ptr<server_res_generator> handle_completions_impl(
26642669 res->status = 200 ;
26652670 res->content_type = " text/event-stream" ;
26662671 res->next = [res_this = res.get (), res_type, &should_stop, states = std::move (states)](std::string & output) mutable -> bool {
2667- if (should_stop ()) {
2668- SRV_DBG (" %s" , " stopping streaming due to should_stop condition\n " );
2669- return false ; // should_stop condition met
2670- }
2671-
2672- if (!res_this->data .empty ()) {
2673- // flush the first chunk
2674- output = std::move (res_this->data );
2675- res_this->data .clear ();
2676- return true ;
2677- }
2678-
2679- server_response_reader & rd = res_this->rd ;
2680-
2681- // check if there is more data
2682- if (!rd.has_next ()) {
2672+ static auto format_error = [](task_response_type res_type, const json & res_json) {
26832673 if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
2684- // Anthropic doesn't send [DONE], message_stop was already sent
2685- output = " " ;
2686- } else if (res_type != TASK_RESPONSE_TYPE_NONE) {
2687- output = " data: [DONE]\n\n " ;
2688- } else {
2689- output = " " ;
2690- }
2691- SRV_DBG (" %s" , " all results received, terminating stream\n " );
2692- return false ; // no more data, terminate
2693- }
2694-
2695- // receive subsequent results
2696- auto result = rd.next (should_stop);
2697- if (result == nullptr ) {
2698- SRV_DBG (" %s" , " stopping streaming due to should_stop condition\n " );
2699- return false ; // should_stop condition met
2700- }
2701-
2702- // send the results
2703- if (result->is_error ()) {
2704- json res_json = result->to_json ();
2705- if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
2706- output = format_anthropic_sse ({
2674+ return format_anthropic_sse ({
27072675 {" event" , " error" },
27082676 {" data" , res_json},
27092677 });
27102678 } else {
2711- output = format_oai_sse (json {{ " error" , res_json }});
2679+ return format_oai_sse (json {{ " error" , res_json }});
27122680 }
2713- SRV_DBG (" %s" , " error received during streaming, terminating stream\n " );
2714- return false ; // terminate on error
2715- } else {
2716- GGML_ASSERT (
2717- dynamic_cast <server_task_result_cmpl_partial*>(result.get ()) != nullptr
2718- || dynamic_cast <server_task_result_cmpl_final*>(result.get ()) != nullptr
2719- );
2720- result->update (states[result->get_index ()]); // update generation state
2721- json res_json = result->to_json ();
2722- if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
2723- output = format_anthropic_sse (res_json);
2681+ };
2682+
2683+ try {
2684+ if (should_stop ()) {
2685+ SRV_DBG (" %s" , " stopping streaming due to should_stop condition\n " );
2686+ return false ; // should_stop condition met
2687+ }
2688+
2689+ if (!res_this->data .empty ()) {
2690+ // flush the first chunk
2691+ output = std::move (res_this->data );
2692+ res_this->data .clear ();
2693+ return true ;
2694+ }
2695+
2696+ server_response_reader & rd = res_this->rd ;
2697+
2698+ // check if there is more data
2699+ if (!rd.has_next ()) {
2700+ if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
2701+ // Anthropic doesn't send [DONE], message_stop was already sent
2702+ output = " " ;
2703+ } else if (res_type != TASK_RESPONSE_TYPE_NONE) {
2704+ output = " data: [DONE]\n\n " ;
2705+ } else {
2706+ output = " " ;
2707+ }
2708+ SRV_DBG (" %s" , " all results received, terminating stream\n " );
2709+ return false ; // no more data, terminate
2710+ }
2711+
2712+ // receive subsequent results
2713+ auto result = rd.next (should_stop);
2714+ if (result == nullptr ) {
2715+ SRV_DBG (" %s" , " stopping streaming due to should_stop condition\n " );
2716+ return false ; // should_stop condition met
2717+ }
2718+
2719+ // send the results
2720+ if (result->is_error ()) {
2721+ json res_json = result->to_json ();
2722+ output = format_error (res_type, res_json);
2723+ SRV_DBG (" %s" , " error received during streaming, terminating stream\n " );
2724+ return false ; // terminate on error
27242725 } else {
2725- output = format_oai_sse (res_json);
2726+ GGML_ASSERT (
2727+ dynamic_cast <server_task_result_cmpl_partial*>(result.get ()) != nullptr
2728+ || dynamic_cast <server_task_result_cmpl_final*>(result.get ()) != nullptr
2729+ );
2730+ result->update (states[result->get_index ()]); // update generation state
2731+ json res_json = result->to_json ();
2732+ if (res_type == TASK_RESPONSE_TYPE_ANTHROPIC) {
2733+ output = format_anthropic_sse (res_json);
2734+ } else {
2735+ output = format_oai_sse (res_json);
2736+ }
27262737 }
2727- }
27282738
2729- // has next data, continue
2730- return true ;
2739+ // has next data, continue
2740+ return true ;
2741+
2742+ } catch (const std::exception & e) {
2743+ json error_json = format_error_response (e.what (), ERROR_TYPE_SERVER);
2744+ output = format_error (res_type, error_json);
2745+
2746+ // terminate on exception
2747+ return false ;
2748+ }
27312749 };
27322750 }
27332751
0 commit comments