From 1ca28b4d457eeecb50ab13eb3e59e52612a28aa9 Mon Sep 17 00:00:00 2001 From: niushengxiao Date: Wed, 4 Feb 2026 15:46:59 +0800 Subject: [PATCH 1/2] fix: fix mem leak --- lightllm/common/req_manager.py | 26 ++-- lightllm/server/core/objs/req.py | 10 ++ lightllm/server/core/objs/shm_array.py | 6 + lightllm/server/detokenization/manager.py | 1 + lightllm/server/httpserver/manager.py | 2 + .../router/dynamic_prompt/radix_cache.py | 11 +- .../server/router/model_infer/infer_batch.py | 1 + .../mode_backend/generic_post_process.py | 17 ++- test/benchmark/service/benchmark_qps.py | 136 +++++++++++++----- 9 files changed, 146 insertions(+), 64 deletions(-) diff --git a/lightllm/common/req_manager.py b/lightllm/common/req_manager.py index 40c8aa993e..33bdca4475 100644 --- a/lightllm/common/req_manager.py +++ b/lightllm/common/req_manager.py @@ -7,6 +7,7 @@ from lightllm.common.basemodel.triton_kernel.gen_sampling_params import update_req_to_token_id_counter from lightllm.utils.envs_utils import enable_env_vars, get_env_start_args from lightllm.utils.config_utils import get_vocab_size +from lightllm.server.router.model_infer.pin_mem_manager import g_pin_mem_manager logger = init_logger(__name__) @@ -155,7 +156,11 @@ def init_req_sampling_params(self, req): else: self.req_to_out_token_id_counter[req.req_idx].fill_(0) if req.sampling_param.shm_param.input_penalty and req.need_out_token_id_statistics: - prompt_ids = torch.from_numpy(req.shm_req.get_prompt_ids_numpy()).pin_memory().cuda(non_blocking=True) + prompt_ids = g_pin_mem_manager.gen_from_list( + key="prompt_ids_for_penalty", + data=req.shm_req.get_prompt_ids_numpy(), + dtype=torch.int32, + ).cuda(non_blocking=True) token_id_counter( prompt_ids=prompt_ids, out_token_id_counter=self.req_to_out_token_id_counter[req.req_idx] ) @@ -214,22 +219,13 @@ def gen_cpu_out_token_counter_sampling_params(self, req_objs: List): cum_sum_len += len(id_to_count) p_cumsum_seq_len.append(cum_sum_len) - from lightllm.server.router.model_infer.pin_mem_manager import g_pin_mem_manager - - p_token_ids_tensor = g_pin_mem_manager.alloc_pin_tensor( - key="p_token_ids", size=len(p_token_ids), dtype=torch.int32 - ) - p_token_ids_tensor.numpy()[:] = p_token_ids - - p_token_counts_tensor = g_pin_mem_manager.alloc_pin_tensor( - key="p_token_counts", size=len(p_token_counts), dtype=torch.int32 + p_token_ids_tensor = g_pin_mem_manager.gen_from_list(key="p_token_ids", data=p_token_ids, dtype=torch.int32) + p_token_counts_tensor = g_pin_mem_manager.gen_from_list( + key="p_token_counts", data=p_token_counts, dtype=torch.int32 ) - p_token_counts_tensor.numpy()[:] = p_token_counts - - p_cumsum_seq_len_tensor = g_pin_mem_manager.alloc_pin_tensor( - key="p_cumsum_seq_len", size=len(p_cumsum_seq_len), dtype=torch.int32 + p_cumsum_seq_len_tensor = g_pin_mem_manager.gen_from_list( + key="p_cumsum_seq_len", data=p_cumsum_seq_len, dtype=torch.int32 ) - p_cumsum_seq_len_tensor.numpy()[:] = p_cumsum_seq_len return ( p_token_ids_tensor.cuda(non_blocking=True), diff --git a/lightllm/server/core/objs/req.py b/lightllm/server/core/objs/req.py index f489aac9c2..40dc6cd98e 100644 --- a/lightllm/server/core/objs/req.py +++ b/lightllm/server/core/objs/req.py @@ -227,6 +227,16 @@ def link_logprobs_shm_array(self): self.shm_logprobs.link_shm() return + def release_shm_arrays(self): + """释放共享内存连接,防止内存泄露(仅 detach,不 unlink)""" + if hasattr(self, "shm_prompt_ids") and self.shm_prompt_ids is not None: + self.shm_prompt_ids.detach_shm() + self.shm_prompt_ids = None + if hasattr(self, "shm_logprobs") and 
self.shm_logprobs is not None: + self.shm_logprobs.detach_shm() + self.shm_logprobs = None + return + def get_prompt_ids(self): return self.shm_prompt_ids.arr[: self.input_len].tolist() diff --git a/lightllm/server/core/objs/shm_array.py b/lightllm/server/core/objs/shm_array.py index c5ad512c6b..0759e36816 100644 --- a/lightllm/server/core/objs/shm_array.py +++ b/lightllm/server/core/objs/shm_array.py @@ -32,3 +32,9 @@ def close_shm(self): self.shm.unlink() self.shm = None self.arr = None + + def detach_shm(self): + if self.shm is not None: + self.shm.close() + self.shm = None + self.arr = None diff --git a/lightllm/server/detokenization/manager.py b/lightllm/server/detokenization/manager.py index 389171ba8a..f9027bb6eb 100644 --- a/lightllm/server/detokenization/manager.py +++ b/lightllm/server/detokenization/manager.py @@ -161,6 +161,7 @@ def remove_finished_reqs(self): for decode_req in finished_reqs: decode_req.req.can_released_mark = True logger.info(f"detoken release req id {decode_req.req.request_id}") + decode_req.req.release_shm_arrays() self.shm_req_manager.put_back_req_obj(decode_req.req) self.req_id_to_out.pop(decode_req.request_id, None) return diff --git a/lightllm/server/httpserver/manager.py b/lightllm/server/httpserver/manager.py index 212e037e90..353d97d779 100644 --- a/lightllm/server/httpserver/manager.py +++ b/lightllm/server/httpserver/manager.py @@ -686,6 +686,8 @@ async def recycle_resource_loop(self): for req_status in release_req_status: self.req_id_to_out_inf.pop(req_status.group_req_objs.group_req_id, None) for req in req_status.group_req_objs.shm_req_objs: + req.shm_prompt_ids.close_shm() + req.shm_logprobs.close_shm() await self.shm_req_manager.async_put_back_req_obj(req) await self.shm_req_manager.async_release_req_index(req.index_in_shm_mem) await self._release_multimodal_resources(req_status.group_req_objs.multimodal_params) diff --git a/lightllm/server/router/dynamic_prompt/radix_cache.py b/lightllm/server/router/dynamic_prompt/radix_cache.py index c517748984..2d2c820c29 100644 --- a/lightllm/server/router/dynamic_prompt/radix_cache.py +++ b/lightllm/server/router/dynamic_prompt/radix_cache.py @@ -38,8 +38,8 @@ def split_node(self, prefix_len): split_parent_node = TreeNode() split_parent_node.parent = self.parent split_parent_node.parent.children[self.token_id_key[0].item()] = split_parent_node - split_parent_node.token_id_key = self.token_id_key[0:prefix_len] - split_parent_node.token_mem_index_value = self.token_mem_index_value[0:prefix_len] + split_parent_node.token_id_key = self.token_id_key[0:prefix_len].clone() + split_parent_node.token_mem_index_value = self.token_mem_index_value[0:prefix_len].clone() split_parent_node.children = {} split_parent_node.children[self.token_id_key[prefix_len].item()] = self split_parent_node.ref_counter = self.ref_counter @@ -58,8 +58,8 @@ def split_node(self, prefix_len): def add_and_return_new_child(self, token_id_key, token_mem_index_value): child = TreeNode() - child.token_id_key = token_id_key - child.token_mem_index_value = token_mem_index_value + child.token_id_key = token_id_key.clone() + child.token_mem_index_value = token_mem_index_value.clone() first_token_key = child.token_id_key[0].item() assert first_token_key not in self.children.keys() self.children[first_token_key] = child @@ -241,7 +241,8 @@ def match_prefix(self, key, update_refs=False): value = torch.zeros((0,), device="cpu", dtype=self._value_dtype) return tree_node, len(value), value else: - self.dec_node_ref_counter(self.root_node) + if 
update_refs: + self.dec_node_ref_counter(self.root_node) return None, 0, None def _match_prefix_helper( diff --git a/lightllm/server/router/model_infer/infer_batch.py b/lightllm/server/router/model_infer/infer_batch.py index 4b8b3c538f..dc7b4d186b 100644 --- a/lightllm/server/router/model_infer/infer_batch.py +++ b/lightllm/server/router/model_infer/infer_batch.py @@ -160,6 +160,7 @@ def _filter(self, finished_request_ids: List[int]): free_req_index.append(req.req_idx) # logger.info(f"infer release req id {req.shm_req.request_id}") req.shm_req.shm_infer_released = True + req.shm_req.release_shm_arrays() self.shm_req_manager.put_back_req_obj(req.shm_req) free_token_index = custom_cat(free_token_index) diff --git a/lightllm/server/router/model_infer/mode_backend/generic_post_process.py b/lightllm/server/router/model_infer/mode_backend/generic_post_process.py index e2ccf290e8..ca3901ebd0 100644 --- a/lightllm/server/router/model_infer/mode_backend/generic_post_process.py +++ b/lightllm/server/router/model_infer/mode_backend/generic_post_process.py @@ -3,6 +3,7 @@ from lightllm.common.basemodel.triton_kernel.apply_penalty import apply_penalty from lightllm.common.basemodel.triton_kernel.apply_penalty_gpu_cache import apply_penalty_gpu_cache from lightllm.server.router.model_infer.infer_batch import InferReq, g_infer_context +from lightllm.server.router.model_infer.pin_mem_manager import g_pin_mem_manager from lightllm.utils.envs_utils import get_env_start_args @@ -16,7 +17,7 @@ def sample(logits: torch.Tensor, reqs: List[InferReq], eos_id: List[int] = [2]): b_mask_eos_reqs, is_all_greedy, ) = _get_post_sample_tensors(reqs) - eos_ids = torch.tensor(eos_id, dtype=torch.int32, device="cpu", pin_memory=True).cuda(non_blocking=True) + eos_ids = g_pin_mem_manager.gen_from_list(key="eos_ids", data=eos_id, dtype=torch.int32).cuda(non_blocking=True) sampling_params_manager = g_infer_context.req_manager.req_sampling_params_manager @@ -128,12 +129,14 @@ def _get_post_sample_tensors(reqs: List[InferReq]): is_all_greedy = False req_idxes.append(req_obj.req_idx) - req_idxes_cpu = torch.tensor(req_idxes, dtype=torch.int32, device="cpu", pin_memory=True) - temperatures_cpu = torch.tensor(temperatures, dtype=torch.float, device="cpu", pin_memory=True) - top_ps_cpu = torch.tensor(top_ps, dtype=torch.float, device="cpu", pin_memory=True) - top_ks_cpu = torch.tensor(top_ks, dtype=torch.int32, device="cpu", pin_memory=True) - length_penalty_param_cpu = torch.tensor(length_penalty_param, dtype=torch.int32, device="cpu", pin_memory=True) - mask_eos_reqs_cpu = torch.tensor(mask_eos_reqs, dtype=torch.bool, device="cpu", pin_memory=True) + req_idxes_cpu = g_pin_mem_manager.gen_from_list(key="req_idxes", data=req_idxes, dtype=torch.int32) + temperatures_cpu = g_pin_mem_manager.gen_from_list(key="temperatures", data=temperatures, dtype=torch.float32) + top_ps_cpu = g_pin_mem_manager.gen_from_list(key="top_ps", data=top_ps, dtype=torch.float32) + top_ks_cpu = g_pin_mem_manager.gen_from_list(key="top_ks", data=top_ks, dtype=torch.int32) + length_penalty_param_cpu = g_pin_mem_manager.gen_from_list( + key="length_penalty_param", data=length_penalty_param, dtype=torch.int32 + ) + mask_eos_reqs_cpu = g_pin_mem_manager.gen_from_list(key="mask_eos_reqs", data=mask_eos_reqs, dtype=torch.bool) return ( req_idxes_cpu.cuda(non_blocking=True), diff --git a/test/benchmark/service/benchmark_qps.py b/test/benchmark/service/benchmark_qps.py index abf312ee21..8249ae2c49 100644 --- a/test/benchmark/service/benchmark_qps.py +++ 
b/test/benchmark/service/benchmark_qps.py @@ -103,6 +103,10 @@ def get_custom_input_data(data_path, output_len, tokenizer, range_ratio): model_name = [] +# Minimal fix: one retry on transient network errors. +_DEFAULT_RETRY = 1 + + async def async_post_stream_openai(url, prompt, max_new_tokens, session): try: text_input, input_len = prompt @@ -116,21 +120,34 @@ async def async_post_stream_openai(url, prompt, max_new_tokens, session): "best_of": 1, } headers = {"Content-Type": "application/json"} - used_time = [] - start_time = time.time() - last_time = start_time - async with session.post(url, headers=headers, json=data) as response: - if response.status != 200: - return [] - - async for line in response.content: - line = line.strip() - if line: - current_time = time.time() - elapsed_time = current_time - last_time - used_time.append(elapsed_time) - last_time = current_time - return used_time, input_len + + for attempt in range(_DEFAULT_RETRY + 1): + used_time = [] + start_time = time.time() + last_time = start_time + try: + async with session.post(url, headers=headers, json=data) as response: + if response.status != 200: + return [] + + try: + async for line in response.content: + line = line.strip() + if line: + current_time = time.time() + elapsed_time = current_time - last_time + used_time.append(elapsed_time) + last_time = current_time + except Exception: + # server may disconnect mid-stream; keep partial timings if any. + pass + + if used_time or attempt >= _DEFAULT_RETRY: + return used_time, input_len + except Exception as e: + if attempt >= _DEFAULT_RETRY: + print(e) + return [] except Exception as e: print(e) pass @@ -149,21 +166,33 @@ async def async_post_stream_lightllm(url, prompt, max_new_tokens, session): }, } headers = {"Content-Type": "application/json"} - used_time = [] - start_time = time.time() - last_time = start_time - async with session.post(url, headers=headers, json=data) as response: - if response.status != 200: - return [] - - async for line in response.content: - if line and line.startswith(b"data:"): - # print(line) - current_time = time.time() - elapsed_time = current_time - last_time - used_time.append(elapsed_time) - last_time = current_time - return used_time, input_len + + for attempt in range(_DEFAULT_RETRY + 1): + used_time = [] + start_time = time.time() + last_time = start_time + try: + async with session.post(url, headers=headers, json=data) as response: + if response.status != 200: + return [] + + try: + async for line in response.content: + if line and line.startswith(b"data:"): + current_time = time.time() + elapsed_time = current_time - last_time + used_time.append(elapsed_time) + last_time = current_time + except Exception: + # server may disconnect mid-stream; keep partial timings if any. 
+ pass + + if used_time or attempt >= _DEFAULT_RETRY: + return used_time, input_len + except Exception as e: + if attempt >= _DEFAULT_RETRY: + print(e) + return [] except Exception as e: print(e) pass @@ -187,6 +216,7 @@ async def continuous_sender( while not stop_send.is_set(): if not continuous_send and sent_count[0] >= max_count: break + prompt = prompts[prompt_index % len(prompts)] max_tokens = max_new_tokens[prompt_index % len(max_new_tokens)] @@ -212,18 +242,42 @@ async def response_collector( force_terminate, pending_tasks, ): + # 单个请求在 collector 侧的最大等待时间,避免网络异常导致永久卡住 + task_timeout_s = 600 try: while True: try: task = await asyncio.wait_for(request_queue.get(), timeout=1.0) - result, input_len = await task - request_queue.task_done() - assert result is not None - if len(result) >= 1 and not stop_send.is_set(): - results.append((result, input_len)) + result = None + input_len = 0 + try: + try: + result_tuple = await asyncio.wait_for(task, timeout=task_timeout_s) + except asyncio.TimeoutError: + print("\nError collecting response: task timeout") + if not task.done(): + task.cancel() + result_tuple = None + + if isinstance(result_tuple, tuple) and len(result_tuple) == 2: + result, input_len = result_tuple + else: + result = None + input_len = 0 + except Exception as e: + print(f"\nError collecting response: {e}") + finally: + # 确保队列不会因为 continue/exception 而永久积压 + request_queue.task_done() + + # 无论成功失败都推进计数,避免等待 remaining responses 时卡死 current_count = counter[0] + 1 counter[0] = current_count print(f"\rfinished_reqs:{current_count} / target_reqs:{reqs_num} / sent_reqs:{sent_count[0]}", end="") + + if result is not None: + if len(result) >= 1 and not stop_send.is_set(): + results.append((result, input_len)) if len(results) >= reqs_num and not stop_send.is_set(): end_time[0] = time.time() print("\nReached target number of responses") @@ -245,6 +299,7 @@ async def response_collector( continue except Exception as e: print(f"\nError collecting response: {e}") + continue finally: if force_terminate: for task in pending_tasks: @@ -253,7 +308,15 @@ async def response_collector( async def run_continuous_benchmark( - async_task, url, prompts, max_new_tokens, reqs_num, num_clients, input_qps, force_terminate, continuous_send + async_task, + url, + prompts, + max_new_tokens, + reqs_num, + num_clients, + input_qps, + force_terminate, + continuous_send, ): request_queue = asyncio.Queue() stop_event = asyncio.Event() @@ -414,7 +477,6 @@ def main(): ) ) loop.close() - print(len(results)) first_token_time = [] decode_token_time = [] request_time = [] From e47689f39b156742a527d0866cfea607b8fc8dde Mon Sep 17 00:00:00 2001 From: hiworldwzj <30762946+hiworldwzj@users.noreply.github.com> Date: Wed, 4 Feb 2026 17:08:48 +0800 Subject: [PATCH 2/2] fix --- lightllm/server/core/objs/req.py | 10 ---------- lightllm/server/core/objs/shm_array.py | 6 ------ lightllm/server/detokenization/manager.py | 1 - lightllm/server/httpserver/manager.py | 2 -- lightllm/server/router/dynamic_prompt/radix_cache.py | 8 ++++---- lightllm/server/router/model_infer/infer_batch.py | 1 - 6 files changed, 4 insertions(+), 24 deletions(-) diff --git a/lightllm/server/core/objs/req.py b/lightllm/server/core/objs/req.py index 40dc6cd98e..f489aac9c2 100644 --- a/lightllm/server/core/objs/req.py +++ b/lightllm/server/core/objs/req.py @@ -227,16 +227,6 @@ def link_logprobs_shm_array(self): self.shm_logprobs.link_shm() return - def release_shm_arrays(self): - """释放共享内存连接,防止内存泄露(仅 detach,不 unlink)""" - if hasattr(self, "shm_prompt_ids") and 
self.shm_prompt_ids is not None: - self.shm_prompt_ids.detach_shm() - self.shm_prompt_ids = None - if hasattr(self, "shm_logprobs") and self.shm_logprobs is not None: - self.shm_logprobs.detach_shm() - self.shm_logprobs = None - return - def get_prompt_ids(self): return self.shm_prompt_ids.arr[: self.input_len].tolist() diff --git a/lightllm/server/core/objs/shm_array.py b/lightllm/server/core/objs/shm_array.py index 0759e36816..c5ad512c6b 100644 --- a/lightllm/server/core/objs/shm_array.py +++ b/lightllm/server/core/objs/shm_array.py @@ -32,9 +32,3 @@ def close_shm(self): self.shm.unlink() self.shm = None self.arr = None - - def detach_shm(self): - if self.shm is not None: - self.shm.close() - self.shm = None - self.arr = None diff --git a/lightllm/server/detokenization/manager.py b/lightllm/server/detokenization/manager.py index f9027bb6eb..389171ba8a 100644 --- a/lightllm/server/detokenization/manager.py +++ b/lightllm/server/detokenization/manager.py @@ -161,7 +161,6 @@ def remove_finished_reqs(self): for decode_req in finished_reqs: decode_req.req.can_released_mark = True logger.info(f"detoken release req id {decode_req.req.request_id}") - decode_req.req.release_shm_arrays() self.shm_req_manager.put_back_req_obj(decode_req.req) self.req_id_to_out.pop(decode_req.request_id, None) return diff --git a/lightllm/server/httpserver/manager.py b/lightllm/server/httpserver/manager.py index 353d97d779..212e037e90 100644 --- a/lightllm/server/httpserver/manager.py +++ b/lightllm/server/httpserver/manager.py @@ -686,8 +686,6 @@ async def recycle_resource_loop(self): for req_status in release_req_status: self.req_id_to_out_inf.pop(req_status.group_req_objs.group_req_id, None) for req in req_status.group_req_objs.shm_req_objs: - req.shm_prompt_ids.close_shm() - req.shm_logprobs.close_shm() await self.shm_req_manager.async_put_back_req_obj(req) await self.shm_req_manager.async_release_req_index(req.index_in_shm_mem) await self._release_multimodal_resources(req_status.group_req_objs.multimodal_params) diff --git a/lightllm/server/router/dynamic_prompt/radix_cache.py b/lightllm/server/router/dynamic_prompt/radix_cache.py index 2d2c820c29..88b099459b 100644 --- a/lightllm/server/router/dynamic_prompt/radix_cache.py +++ b/lightllm/server/router/dynamic_prompt/radix_cache.py @@ -38,8 +38,8 @@ def split_node(self, prefix_len): split_parent_node = TreeNode() split_parent_node.parent = self.parent split_parent_node.parent.children[self.token_id_key[0].item()] = split_parent_node - split_parent_node.token_id_key = self.token_id_key[0:prefix_len].clone() - split_parent_node.token_mem_index_value = self.token_mem_index_value[0:prefix_len].clone() + split_parent_node.token_id_key = self.token_id_key[0:prefix_len] + split_parent_node.token_mem_index_value = self.token_mem_index_value[0:prefix_len] split_parent_node.children = {} split_parent_node.children[self.token_id_key[prefix_len].item()] = self split_parent_node.ref_counter = self.ref_counter @@ -58,8 +58,8 @@ def split_node(self, prefix_len): def add_and_return_new_child(self, token_id_key, token_mem_index_value): child = TreeNode() - child.token_id_key = token_id_key.clone() - child.token_mem_index_value = token_mem_index_value.clone() + child.token_id_key = token_id_key + child.token_mem_index_value = token_mem_index_value first_token_key = child.token_id_key[0].item() assert first_token_key not in self.children.keys() self.children[first_token_key] = child diff --git a/lightllm/server/router/model_infer/infer_batch.py 
b/lightllm/server/router/model_infer/infer_batch.py index dc7b4d186b..4b8b3c538f 100644 --- a/lightllm/server/router/model_infer/infer_batch.py +++ b/lightllm/server/router/model_infer/infer_batch.py @@ -160,7 +160,6 @@ def _filter(self, finished_request_ids: List[int]): free_req_index.append(req.req_idx) # logger.info(f"infer release req id {req.shm_req.request_id}") req.shm_req.shm_infer_released = True - req.shm_req.release_shm_arrays() self.shm_req_manager.put_back_req_obj(req.shm_req) free_token_index = custom_cat(free_token_index)
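
Editor's addendum (illustrative, not part of the patches above):

The recurring change across req_manager.py and generic_post_process.py is to stop building a fresh page-locked tensor on every sampling step (torch.tensor(..., pin_memory=True) / .pin_memory()) and instead fetch a cached pinned buffer from g_pin_mem_manager.gen_from_list(key, data, dtype) before the asynchronous .cuda(non_blocking=True) copy. The sketch below shows one way such a keyed pinned-buffer cache could look. It is an illustration of the pattern only, not LightLLM's actual PinMemManager: the class name, growth policy, and non-CUDA fallback are assumptions.

    import torch


    class PinMemManagerSketch:
        """Keyed cache of reusable pinned CPU buffers (illustrative only)."""

        def __init__(self):
            self._buffers = {}  # key -> pinned CPU tensor, grown on demand

        def gen_from_list(self, key, data, dtype):
            size = len(data)
            buf = self._buffers.get(key)
            if buf is None or buf.dtype != dtype or buf.numel() < size:
                # Grow with headroom so steady-state steps rarely reallocate.
                capacity = max(size, 1024)
                # Pinned allocation needs a CUDA build; fall back to pageable memory otherwise.
                buf = torch.empty(capacity, dtype=dtype, pin_memory=torch.cuda.is_available())
                self._buffers[key] = buf
            view = buf[:size]
            view.copy_(torch.as_tensor(data, dtype=dtype))
            return view


    # Usage mirroring the patched call sites: fill the cached pinned buffer,
    # then start an asynchronous host-to-device copy.
    manager = PinMemManagerSketch()
    req_idxes_cpu = manager.gen_from_list("req_idxes", [0, 1, 2], torch.int32)
    if torch.cuda.is_available():
        req_idxes_gpu = req_idxes_cpu.cuda(non_blocking=True)

Reusing one buffer per call-site key bounds pinned host memory at the largest batch seen so far, which is what removes the per-step page-locked allocations that the first patch identifies as the leak.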