12 changes: 12 additions & 0 deletions src/uipath/_cli/cli_eval.py
@@ -19,6 +19,7 @@
from uipath._events._event_bus import EventBus
from uipath._utils._bindings import ResourceOverwritesContext
from uipath.eval._helpers import auto_discover_entrypoint
from uipath.platform.chat import set_llm_concurrency
from uipath.platform.common import UiPathConfig
from uipath.telemetry._track import flush_events
from uipath.tracing import JsonLinesFileExporter, LlmOpsHttpExporter
@@ -106,6 +107,12 @@ def setup_reporting_prereq(no_report: bool) -> bool:
type=click.Path(exists=False),
help="File path where traces will be written in JSONL format",
)
@click.option(
"--max-llm-concurrency",
type=int,
default=20,
help="Maximum concurrent LLM requests (default: 20)",
)
def eval(
entrypoint: str | None,
eval_set: str | None,
@@ -118,6 +125,7 @@
report_coverage: bool,
model_settings_id: str,
trace_file: str | None,
max_llm_concurrency: int,
) -> None:
"""Run an evaluation set against the agent.

@@ -131,7 +139,11 @@
enable_mocker_cache: Enable caching for LLM mocker responses
report_coverage: Report evaluation coverage
model_settings_id: Model settings ID to override agent settings
trace_file: File path where traces will be written in JSONL format
max_llm_concurrency: Maximum concurrent LLM requests
"""
set_llm_concurrency(max_llm_concurrency)

should_register_progress_reporter = setup_reporting_prereq(no_report)

result = Middlewares.next(
4 changes: 4 additions & 0 deletions src/uipath/platform/chat/__init__.py
@@ -30,6 +30,7 @@
ToolParametersDefinition,
ToolPropertyDefinition,
)
from .llm_throttle import get_llm_semaphore, set_llm_concurrency

__all__ = [
# Conversations Service
@@ -39,6 +40,9 @@
"EmbeddingModels",
"UiPathLlmChatService",
"UiPathOpenAIService",
# LLM Throttling
"get_llm_semaphore",
"set_llm_concurrency",
# LLM Gateway Models
"ToolPropertyDefinition",
"ToolParametersDefinition",
46 changes: 25 additions & 21 deletions src/uipath/platform/chat/_llm_gateway_service.py
@@ -31,6 +31,7 @@
ToolChoice,
ToolDefinition,
)
from .llm_throttle import get_llm_semaphore

# Common constants
API_VERSION = "2024-10-21" # Standard API version for OpenAI-compatible endpoints
@@ -189,13 +190,14 @@ async def embeddings(
)
endpoint = Endpoint("/" + endpoint)

response = await self.request_async(
"POST",
endpoint,
json={"input": input},
params={"api-version": API_VERSION},
headers=DEFAULT_LLM_HEADERS,
)
async with get_llm_semaphore():
response = await self.request_async(
"POST",
endpoint,
json={"input": input},
params={"api-version": API_VERSION},
headers=DEFAULT_LLM_HEADERS,
)

return TextEmbedding.model_validate(response.json())

@@ -315,13 +317,14 @@ class Country(BaseModel):
# Use provided dictionary format directly
request_body["response_format"] = response_format

response = await self.request_async(
"POST",
endpoint,
json=request_body,
params={"api-version": API_VERSION},
headers=DEFAULT_LLM_HEADERS,
)
async with get_llm_semaphore():
response = await self.request_async(
"POST",
endpoint,
json=request_body,
params={"api-version": API_VERSION},
headers=DEFAULT_LLM_HEADERS,
)

return ChatCompletion.model_validate(response.json())

@@ -546,13 +549,14 @@ class Country(BaseModel):
"X-UiPath-LlmGateway-NormalizedApi-ModelName": model,
}

response = await self.request_async(
"POST",
endpoint,
json=request_body,
params={"api-version": NORMALIZED_API_VERSION},
headers=headers,
)
async with get_llm_semaphore():
response = await self.request_async(
"POST",
endpoint,
json=request_body,
params={"api-version": NORMALIZED_API_VERSION},
headers=headers,
)

return ChatCompletion.model_validate(response.json())

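The net effect of the three wrapped calls is that the embeddings, completions, and normalized chat endpoints all draw from one shared semaphore, so no more than the configured number of requests is in flight at once. Below is a minimal sketch of that behaviour, using a stand-in coroutine instead of the real request_async call (fake_request, in_flight, and peak are illustrative names, not part of the SDK):

import asyncio

from uipath.platform.chat import get_llm_semaphore, set_llm_concurrency

in_flight = 0  # "requests" currently holding the semaphore
peak = 0       # highest concurrency observed


async def fake_request() -> None:
    # Stand-in for a throttled request_async call; only the gating matters here.
    global in_flight, peak
    async with get_llm_semaphore():
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1


async def main() -> None:
    await asyncio.gather(*(fake_request() for _ in range(100)))
    print(f"peak concurrency: {peak}")  # never exceeds the configured limit


set_llm_concurrency(5)
asyncio.run(main())  # prints "peak concurrency: 5" (or lower)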
49 changes: 49 additions & 0 deletions src/uipath/platform/chat/llm_throttle.py
@@ -0,0 +1,49 @@
"""LLM request throttling utilities.

This module provides concurrency control for LLM API requests to prevent
overwhelming the system with simultaneous calls.
"""

import asyncio

DEFAULT_LLM_CONCURRENCY = 20
_llm_concurrency_limit: int = DEFAULT_LLM_CONCURRENCY
_llm_semaphore: asyncio.Semaphore | None = None
_llm_semaphore_loop: asyncio.AbstractEventLoop | None = None


def get_llm_semaphore() -> asyncio.Semaphore:
"""Get the LLM semaphore, creating with configured limit if not set.

The semaphore is recreated if called from a different event loop than
the one it was originally created in. This prevents "bound to a different
event loop" errors when using multiple asyncio.run() calls.
"""
global _llm_semaphore, _llm_semaphore_loop

loop = asyncio.get_running_loop()

# Recreate semaphore if it doesn't exist or if the event loop changed
if _llm_semaphore is None or _llm_semaphore_loop is not loop:
_llm_semaphore = asyncio.Semaphore(_llm_concurrency_limit)
_llm_semaphore_loop = loop

return _llm_semaphore


def set_llm_concurrency(limit: int) -> None:
"""Set the max concurrent LLM requests. Call before making any LLM calls.

Args:
limit: Maximum number of concurrent LLM requests allowed (must be > 0).

Raises:
ValueError: If limit is less than 1.
"""
if limit < 1:
raise ValueError("LLM concurrency limit must be at least 1")

global _llm_concurrency_limit, _llm_semaphore, _llm_semaphore_loop
_llm_concurrency_limit = limit
_llm_semaphore = None
_llm_semaphore_loop = None
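
A short usage sketch of the new module, assuming the exports added to uipath.platform.chat above; the second asyncio.run() call exercises the loop check in get_llm_semaphore(), which hands back a fresh semaphore instead of one bound to the finished loop:

import asyncio

from uipath.platform.chat import get_llm_semaphore, set_llm_concurrency


async def throttled_task(i: int) -> int:
    # Placeholder for a real LLM call; only the semaphore usage is shown.
    async with get_llm_semaphore():
        await asyncio.sleep(0.01)
        return i


async def run_batch(n: int) -> list[int]:
    return await asyncio.gather(*(throttled_task(i) for i in range(n)))


set_llm_concurrency(10)            # configure before the first LLM call
print(asyncio.run(run_batch(25)))  # first event loop
print(asyncio.run(run_batch(25)))  # second loop: the semaphore is recreated, so
                                   # no "bound to a different event loop" error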