22# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
33from contextlib import contextmanager
44
5+ import numpy as np
56import torch
67
78from vllm .v1 .outputs import (
89 AsyncModelRunnerOutput ,
910 LogprobsTensors ,
1011 ModelRunnerOutput ,
11- SamplerOutput ,
1212)
13+ from vllm .v1 .worker .gpu .sample .output import SamplerOutput
1314
1415
1516class AsyncOutput (AsyncModelRunnerOutput ):
@@ -34,29 +35,18 @@ def __init__(
3435 with torch .cuda .stream (self .copy_stream ):
3536 self .copy_stream .wait_stream (default_stream )
3637
37- # NOTE(woosuk): We must ensure that CPU tensors are not freed
38- # before the device-to-host copy is fully completed. For instance,
39- # operations like
40- # self.sampled_token_np = ...to("cpu", non_blocking=True).numpy()
41- # are unsafe because the underlying CPU tensor can be prematurely freed and
42- # reused by other tensors before the asynchronous copy finishes, potentially
43- # causing race conditions. To prevent this, we delay freeing by holding
44- # references until the copy event signals completion.
45- # Likewise, we also need to keep the reference to the GPU tensors.
46- # This is done by keeping the reference to sampler_output and
47- # model_runner_output.
48- self .sampled_token_ids = sampler_output .sampled_token_ids .to (
49- "cpu" , non_blocking = True
50- )
38+ self .sampled_token_ids = async_copy_to_np (sampler_output .sampled_token_ids )
5139 if sampler_output .logprobs_tensors is not None :
5240 self .logprobs_tensors : LogprobsTensors | None = (
5341 sampler_output .logprobs_tensors .to_cpu_nonblocking ()
5442 )
5543 else :
5644 self .logprobs_tensors = None
57- self .num_sampled_tokens_cpu = num_sampled_tokens .to (
58- "cpu" , non_blocking = True
59- )
45+ if sampler_output .num_nans is not None :
46+ self .num_nans = async_copy_to_np (sampler_output .num_nans )
47+ else :
48+ self .num_nans = None
49+ self .num_sampled_tokens_np = async_copy_to_np (num_sampled_tokens )
6050 self .prompt_logprobs_dict : dict [str , LogprobsTensors | None ] = {}
6151 if self .model_runner_output .prompt_logprobs_dict :
6252 for k , v in self .model_runner_output .prompt_logprobs_dict .items ():
@@ -68,18 +58,25 @@ def __init__(
6858
6959 def get_output (self ) -> ModelRunnerOutput :
7060 self .copy_event .synchronize ()
71- num_sampled_tokens_np = self .num_sampled_tokens_cpu .numpy ()
7261
7362 # NOTE(woosuk): The following code is to ensure compatibility with
7463 # the existing model runner.
7564 # Going forward, we should keep the data structures as NumPy arrays
7665 # rather than Python lists.
7766 sampled_token_ids : list [list [int ]] = self .sampled_token_ids .tolist ()
7867 num_reqs = len (sampled_token_ids )
68+ num_sampled_tokens = self .num_sampled_tokens_np .tolist ()
7969 for i in range (num_reqs ):
80- del sampled_token_ids [i ][num_sampled_tokens_np [i ] :]
70+ del sampled_token_ids [i ][num_sampled_tokens [i ] :]
8171 self .model_runner_output .sampled_token_ids = sampled_token_ids
8272
73+ if self .num_nans is not None :
74+ num_nans = self .num_nans .tolist ()
75+ self .model_runner_output .num_nans_in_logits = {
76+ req_id : num_nans [i ]
77+ for i , req_id in enumerate (self .model_runner_output .req_ids )
78+ }
79+
8380 if self .logprobs_tensors is not None :
8481 self .model_runner_output .logprobs = self .logprobs_tensors .tolists ()
8582 self .model_runner_output .prompt_logprobs_dict = self .prompt_logprobs_dict
@@ -95,3 +92,7 @@ def async_barrier(event: torch.cuda.Event | None):
9592 finally :
9693 if event is not None :
9794 event .record ()
95+
96+
def async_copy_to_np(x: torch.Tensor) -> np.ndarray:
    """Asynchronously copy a tensor to host memory and expose it as a NumPy array.

    The device-to-host copy is issued with ``non_blocking=True``, so callers
    must synchronize (e.g. wait on the recording CUDA event) before reading
    the returned array. The ndarray produced by ``Tensor.numpy()`` shares
    storage with — and holds a reference to — the intermediate CPU tensor,
    which keeps the destination buffer alive until the array is released.
    """
    host_tensor = x.to("cpu", non_blocking=True)
    return host_tensor.numpy()
0 commit comments