diff --git a/pyproject.toml b/pyproject.toml index ef9a7342..b2d7eacb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -109,3 +109,4 @@ hint = [ [project.scripts] agentlab-assistant = "agentlab.ui_assistant:main" agentlab-xray = "agentlab.analyze.agent_xray:main" +agentlab-mentor = "agentlab.agents.hitl_agent.launch_hint_ui:main" diff --git a/src/agentlab/agents/agent_utils.py b/src/agentlab/agents/agent_utils.py index 29219d2d..179a94d2 100644 --- a/src/agentlab/agents/agent_utils.py +++ b/src/agentlab/agents/agent_utils.py @@ -1,6 +1,11 @@ +import copy + from PIL import Image, ImageDraw from playwright.sync_api import Page +from agentlab.analyze import overlay_utils +from agentlab.llm.llm_utils import img_to_base_64 + def draw_mouse_pointer(image: Image.Image, x: int, y: int) -> Image.Image: """ @@ -128,3 +133,24 @@ def zoom_webpage(page: Page, zoom_factor: float = 1.5): page.evaluate(f"document.documentElement.style.zoom='{zoom_factor*100}%'") return page + + +def overlay_action(obs, action): + """Overlays actions on screenshot in-place""" + act_img = copy.deepcopy(obs["screenshot"]) + act_img = Image.fromarray(act_img) + + new_obs_properties = copy.deepcopy(obs["extra_element_properties"]) + import os + + if os.getenv("AGENTLAB_USE_RETINA"): + # HACK: divide everything by 2 in the obs + # TODO: make this more robust by changing login in annotate_action directly (or maybe in the obs section?) + for key, value in new_obs_properties.items(): + try: + new_obs_properties[key]["bbox"] = [elem / 2 for elem in value["bbox"]] + except: + pass + + overlay_utils.annotate_action(act_img, action, properties=new_obs_properties) + return img_to_base_64(act_img) diff --git a/src/agentlab/agents/hitl_agent/base_multi_candidate_agent.py b/src/agentlab/agents/hitl_agent/base_multi_candidate_agent.py new file mode 100644 index 00000000..81a0db08 --- /dev/null +++ b/src/agentlab/agents/hitl_agent/base_multi_candidate_agent.py @@ -0,0 +1,50 @@ +from typing_extensions import Protocol + +from agentlab.agents.agent_args import AgentArgs + + +class MultiCandidateAgent(Protocol): + """ + Protocol for agents that generate multiple candidates for get_action. + + This protocol defines the contract for agents that can generate + multiple candidate actions and allow selection of one of them for execution. + """ + + def get_candidate_generations( + self, obs: dict, hint: list[str] | None = None, n_candidates: int = 3 + ) -> "list[dict]": + """ + Generate multiple candidate actions for the given observation. + + You can pass extra info in agent_info to update internal state of the + agent based on the selected candidate. Your internal state management + should be robust to multiple calls to the get_candidate_generations method + in a single step. + + Args: + obs: The current observation dictionary containing environment state + hint: Optional list of hint strings to guide candidate generation + n_candidates: Number of candidate actions to generate + """ + ... + + def update_agent_state_from_selected_candidate(self, output: dict): + """ + Update the agent's internal state based on the selected candidate. + This can include any memory or planning updates. + + Args: + output: The selected candidate action dictionary + """ + pass + + +class MultiCandidateAgentArgs(AgentArgs): + def make_agent(self) -> MultiCandidateAgent: ... 
+ + def __post_init__(self): + """Prefix subagent name with 'MC-'.""" + super().__post_init__() + if hasattr(self, "agent_name") and self.agent_name: + self.agent_name = "MC-" + self.agent_name diff --git a/src/agentlab/agents/hitl_agent/generic_human_guided_agent.py b/src/agentlab/agents/hitl_agent/generic_human_guided_agent.py new file mode 100644 index 00000000..e8d31688 --- /dev/null +++ b/src/agentlab/agents/hitl_agent/generic_human_guided_agent.py @@ -0,0 +1,362 @@ +import base64 +import copy +import io +import re +from dataclasses import Field, asdict, dataclass +from typing import Dict, List + +import bgym +import numpy as np +from browsergym.experiments.agent import AgentInfo +from PIL import Image + +from agentlab.agents import dynamic_prompting as dp +from agentlab.agents.agent_utils import overlay_action +from agentlab.agents.generic_agent.generic_agent import GenericAgent, GenericAgentArgs +from agentlab.agents.generic_agent.generic_agent_prompt import MainPrompt +from agentlab.agents.hitl_agent.hint_labelling import ( + HintLabeling, + HintLabelingInputs, +) +from agentlab.llm.llm_utils import ( + Discussion, + HumanMessage, + SystemMessage, + img_to_base_64, +) +from agentlab.llm.tracking import cost_tracker_decorator + + +class CandidatesGeneration(dp.PromptElement): + # Ask for multiple alternatives; each candidate must contain and . + def __init__(self, hint: list[str] | None = None, n_candidates=3) -> None: + self.hint = hint + self.n_candidates = n_candidates + self.hint_prompt = "\n".join(f"{i}. {c}" for i, c in enumerate(hint, 1)) if hint else "" + super().__init__(True) + self._prompt = [ + dict( + type="text", + text=f""" + You are a web agent. Propose {self.n_candidates} alternative next steps for the current page. + {('Use the Hints:' + self.hint_prompt) if self.hint else ""}\n + Return EACH candidate wrapped as numbered tags: + ... + ... + + Inside every candidate you MUST include: + ...why this action is appropriate now... + ...ONE atomic, executable action string... + + Do not include any extra text outside the candidate tags. + Use this format: + + Explain why Candidate One is chosen + Candidate One Action + + + + Explain why Candidate Two is chosen + Candidate Two Action + + # Example + + The login button is visible and proceeding will reveal the auth form. + click(role="button", name="Log in") + + + + User might need to enter email first; the email field is focused and visible. + fill(bid="a112", text="user@example.com") + + """, + ) + ] + + # Regex patterns for numbered candidates only + _NUM_BLOCK = re.compile( + r"<\s*candidate[_ ]generation[_ ](?P[0-9]+)\s*>(?P.*?)<\s*/\s*candidate[_ ]generation[_ ](?P=idx)\s*>", + flags=re.IGNORECASE | re.DOTALL, + ) + _THINK_PATTERN = re.compile( + r"<\s*think\s*>(?P.*?)<\s*/\s*think\s*>", + flags=re.IGNORECASE | re.DOTALL, + ) + _ACTION_PATTERN = re.compile( + r"<\s*action\s*>(?P.*?)<\s*/\s*action\s*>", + flags=re.IGNORECASE | re.DOTALL, + ) + + def _parse_answer(self, text_answer: str) -> Dict[str, Dict[str, str]]: + """Extract up to n_candidates candidates, using numbered tags only. + + Args: + text_answer: The text response containing candidate generation tags. + + Returns: + Dictionary mapping candidate names to their think and action content. 
+ Format: {"candidate_generation_1": {"think": "...", "action": "..."}, ...} + """ + result = { + f"candidate_generation_{i+1}": {"think": "", "action": ""} + for i in range(self.n_candidates) + } + + if not isinstance(text_answer, str): + return result + + matches: List[re.Match] = list(self._NUM_BLOCK.finditer(text_answer)) + # Sort by numeric index + matches_sorted = sorted(matches, key=lambda m: int(m.group("idx"))) + for i, m in enumerate(matches_sorted[: self.n_candidates]): + body = m.group("body").strip() + think_m = self._THINK_PATTERN.search(body) + action_m = self._ACTION_PATTERN.search(body) + result[f"candidate_generation_{i+1}"] = { + "think": (think_m.group("think").strip() if think_m else ""), + "action": (action_m.group("action").strip() if action_m else ""), + } + + return result + + +@dataclass +class MultipleProposalGenericAgentArgs(GenericAgentArgs): + + def make_agent(self): + return MultipleProposalGenericAgent( + chat_model_args=self.chat_model_args, flags=self.flags, max_retry=self.max_retry + ) + + def __post_init__(self): + """Prefix subagent name with 'HITL-'.""" + super().__post_init__() + if hasattr(self, "agent_name") and self.agent_name: + self.agent_name = "HITL-" + self.agent_name + + +class MultipleProposalGenericAgent(GenericAgent): + + def __init__( + self, + chat_model_args, + flags, + max_retry: int = 4, + ): + super().__init__(chat_model_args, flags, max_retry) + self.ui = None # Single HintLabeling instance + + def get_candidate_generation( + self, + sys_prompt: SystemMessage, + human_prompt: HumanMessage, + hint: list[str] | None = None, + n_candidates=3, + ) -> tuple[Dict[str, Dict[str, str]], Discussion]: + + cg = CandidatesGeneration(hint=hint, n_candidates=n_candidates) + candidates_prompt = HumanMessage(cg.prompt) + chat_messages = Discussion([sys_prompt, human_prompt, candidates_prompt]) + output = self.chat_llm(chat_messages) + candidates = cg._parse_answer(output["content"]) + self.step_n_human_intervention_rounds += 1 + msg_to_add_to_xray = Discussion([sys_prompt, human_prompt]) + + return candidates, msg_to_add_to_xray + + @cost_tracker_decorator + def get_action(self, obs): + # reset vars + step_hint = [] + self.step_n_human_intervention_rounds = 0 + self.obs_history.append(obs) + main_prompt = MainPrompt( + action_set=self.action_set, + obs_history=self.obs_history, + actions=self.actions, + memories=self.memories, + thoughts=self.thoughts, + previous_plan=self.plan, + step=self.plan_step, + flags=self.flags, + ) + + max_prompt_tokens, max_trunc_itr = self._get_maxes() + + system_prompt = SystemMessage(dp.SystemPrompt().prompt) + + human_prompt = dp.fit_tokens( + shrinkable=main_prompt, + max_prompt_tokens=max_prompt_tokens, + model_name=self.chat_model_args.model_name, + max_iterations=max_trunc_itr, + additional_prompts=system_prompt, + ) + # Initialize UI once outside the loop + if self.ui is None: + self.ui = HintLabeling(headless=False) + # Show initial waiting state + initial_inputs = HintLabelingInputs( + goal=( + obs.get("goal_object", [{}])[0].get("text", "") + if obs.get("goal_object") + else "" + ), + error_feedback="", + screenshot=(img_to_base_64(obs["screenshot"]) if "screenshot" in obs else ""), + screenshots=[], # no overlay screenshots yet + axtree=obs.get("axtree_txt", ""), + hints=[], + suggestions=[], # no suggestions yet + ) + self.ui.update_context(initial_inputs) + + # Generate first candidates + candidates, chat_messages = self.get_candidate_generation( + sys_prompt=system_prompt, + human_prompt=human_prompt, + 
hint=step_hint if step_hint else None, + ) + suggestions = [ + { + "id": key.split("_")[-1], + "action": candidate["action"], + "think": candidate["think"], + } + for key, candidate in candidates.items() + ] + # List of Images as base64 - create overlay screenshots for each suggestion + screenshots = [overlay_action(obs, choice["action"]) for choice in suggestions] + + while True: + try: + hint_labeling_inputs = HintLabelingInputs( + goal=( + obs.get("goal_object", [{}])[0].get("text", "") + if obs.get("goal_object") + else "" + ), + error_feedback=obs.get("last_action_error", ""), + screenshot=(img_to_base_64(obs["screenshot"]) if "screenshot" in obs else ""), + screenshots=screenshots, # list of overlay screenshots for hover + axtree=obs.get("axtree_txt", ""), + hints=step_hint, + suggestions=suggestions, + ) + + self.ui.update_context(hint_labeling_inputs) + response = self.ui.wait_for_response(timeout=600) + + if response["type"] == "reprompt": + new_hints = response["payload"].get("hints", []) + step_hint = list(new_hints) if isinstance(new_hints, list) else step_hint + candidates, chat_messages = self.get_candidate_generation( + sys_prompt=system_prompt, + human_prompt=human_prompt, + hint=step_hint if step_hint else None, + ) + suggestions = [ + { + "id": key.split("_")[-1], + "action": candidate["action"], + "think": candidate["think"], + } + for key, candidate in candidates.items() + ] + # Regenerate screenshots for new suggestions + screenshots = [overlay_action(obs, choice["action"]) for choice in suggestions] + # Continue the loop to show new suggestions + elif response["type"] == "step": + selected_action = response["payload"]["action"] + choice_idx = None + for i, candidate in enumerate(suggestions, 1): + if candidate["action"] == selected_action: + choice_idx = i + break + if choice_idx is None: + choice_idx = 1 + ans_dict = candidates[f"candidate_generation_{choice_idx}"] + break + else: + ans_dict = candidates["candidate_generation_1"] + break + + except KeyboardInterrupt: + print("User cancelled the operation") + if self.ui: + self.ui.close() + raise + except Exception as e: + print(f"Error in human intervention UI: {e}") + if self.ui: + self.ui.close() + self.ui = None + # Raise exception instead of falling back to console input + raise RuntimeError(f"Human intervention UI failed: {e}") from e + + # TODO: Refactor as discussed with ALAC. + stats = self.chat_llm.get_stats() + self.plan = ans_dict.get("plan", self.plan) + self.plan_step = ans_dict.get("step", self.plan_step) + self.actions.append(ans_dict["action"]) + self.memories.append(ans_dict.get("memory", None)) + self.thoughts.append(ans_dict.get("think", None)) + agent_info = AgentInfo( + think=ans_dict.get("think", None), + chat_messages=chat_messages, + stats=stats, + extra_info={ + "chat_model_args": asdict(self.chat_model_args), + "step_hints": step_hint, + "n_human_intervention_rounds": self.step_n_human_intervention_rounds, + "candidates": candidates, + "suggestions": suggestions, + }, + ) + return ans_dict["action"], agent_info + + +def get_base_agent(llm_config): + """Creates and returns a MultipleProposalGenericAgentArgs instance with + specified LLM configuration from CHAT_MODEL_ARGS_DICT. + + Args: + llm_config: The LLM configuration key to use from CHAT_MODEL_ARGS_DICT. + + Returns: + MultipleProposalGenericAgentArgs: Configured agent arguments instance. 
+ """ + + from agentlab.agents.generic_agent.tmlr_config import BASE_FLAGS + from agentlab.llm.llm_configs import CHAT_MODEL_ARGS_DICT + + return MultipleProposalGenericAgentArgs( + chat_model_args=CHAT_MODEL_ARGS_DICT[llm_config], + flags=BASE_FLAGS, + ) + + +HUMAN_GUIDED_GENERIC_AGENT = get_base_agent("openai/gpt-5-mini-2025-08-07") + +if __name__ == "__main__": + import logging + + from agentlab.agents.hitl_agent.generic_human_guided_agent import ( + HUMAN_GUIDED_GENERIC_AGENT, + ) + from agentlab.experiments.study import Study + + agent_configs = [HUMAN_GUIDED_GENERIC_AGENT] + benchmark = bgym.DEFAULT_BENCHMARKS["miniwob"]() + benchmark = benchmark.subset_from_glob("task_name", "*book*") + benchmark.env_args_list = benchmark.env_args_list[3:4] + + for env_args in benchmark.env_args_list: + env_args.max_steps = 100 # max human steps + env_args.headless = True + + Study(agent_configs, benchmark, logging_level=logging.WARNING).run( + n_jobs=1, + parallel_backend="sequential", + n_relaunch=1, + ) diff --git a/src/agentlab/agents/hitl_agent/hint_labelling.py b/src/agentlab/agents/hitl_agent/hint_labelling.py new file mode 100644 index 00000000..f1120f02 --- /dev/null +++ b/src/agentlab/agents/hitl_agent/hint_labelling.py @@ -0,0 +1,166 @@ +import json +import logging +from importlib import resources +from queue import Queue +from typing import Dict, List, Optional + +import playwright.sync_api +from browsergym.core import _get_global_playwright +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + +HINT_LABELING_DIR = resources.files("agentlab.agents.hitl_agent.hint_labelling_ui_files") + + +class HintLabelingInputs(BaseModel): + goal: str + error_feedback: str = "" + screenshot: str # base64 screenshot (original/current) + screenshots: List[str] = Field(default_factory=list) # list of base64 screenshots for hover + axtree: str + hints: List[str] = Field(default_factory=list) + suggestions: List[Dict[str, str]] = Field(default_factory=list) + + +class HintLabeling: + def __init__(self, headless: bool, *args, **kwargs): + pw_opt = _get_global_playwright() + pw: playwright.sync_api.Playwright = pw_opt # type: ignore[assignment] + self.browser = pw.chromium.launch(headless=headless) + self.context = self.browser.new_context( + no_viewport=True, + ) + self.page = self.context.new_page() + self._resp_queue = Queue() + + self.page.route("**/api/reprompt", self._route_reprompt) + self.page.route("**/api/submit", self._route_submit) + self.page.set_content(get_hint_labeling_ui(HINT_LABELING_DIR)) + + # internal state + self._context = None + self._running = False + + def _route_reprompt( + self, route: playwright.sync_api.Route, request: playwright.sync_api.Request + ): + logger.info("Route hit: %s %s", request.method, request.url) + try: + body = json.loads(request.post_data or "{}") + except Exception: + body = {} + # enqueue output 1 (reprompt) + hints = body.get("hints") + if not isinstance(hints, list): + # Back-compat: accept single 'hint' string + h = body.get("hint") + hints = [h] if isinstance(h, str) and h.strip() else [] + msg = {"type": "reprompt", "payload": {"hints": hints}} + self._resp_queue.put(msg) + # Respond something minimal so UI doesn’t break; it will be refreshed by a later update_context() + route.fulfill( + status=200, + content_type="application/json", + body=json.dumps({"suggestions": []}), + ) + + def _route_submit(self, route: playwright.sync_api.Route, request: playwright.sync_api.Request): + logger.info("Route hit: %s %s", request.method, 
request.url) + try: + body = json.loads(request.post_data or "{}") + except Exception: + body = {} + # Map UI payload -> your step shape + msg = { + "type": "step", + "payload": { + "think": body.get("think", ""), + "action": body.get("action", ""), + }, + } + self._resp_queue.put(msg) + # UI expects 200 JSON; we can optionally send new suggestions here too. + route.fulfill( + status=200, + content_type="application/json", + body=json.dumps({"suggestions": []}), + ) + + def _to_ui_bootstrap(self, ctx: HintLabelingInputs) -> dict: + return { + "goal": ctx.goal, + "error_feedback": ctx.error_feedback, + "screenshot": ctx.screenshot, + "screenshots": ctx.screenshots, # list of screenshots for hover + "axtree": ctx.axtree, + "hints": ctx.hints, + "suggestions": ctx.suggestions, + } + + def update_context(self, context: HintLabelingInputs): + self._context = context + ui_payload = self._to_ui_bootstrap(context) + # call JS function with arg (no string concat) + self.page.evaluate("(d) => updateContext(d)", ui_payload) + + def wait_for_response(self, timeout: Optional[float] = 600) -> dict: + """ + Wait until the page makes a request to /api/reprompt or /api/submit, + then parse the request body and return it in your schema. + + Args: + timeout (Optional[float]): Maximum time to wait for the request in seconds. If None or 0, + waits indefinitely. Defaults to 600 seconds. + + Returns: + dict: A dictionary containing the parsed response with 'type' and 'payload' keys. + For /api/reprompt: {'type': 'reprompt', 'payload': {'hints': list[str]}} + For /api/submit: {'type': 'step', 'payload': {'think': str, 'action': str}} + + """ + logger.info("Waiting for response from Hint Labeling UI...") + + def is_api(req: playwright.sync_api.Request) -> bool: + u = req.url + return ( + u.endswith("/api/reprompt") or u.endswith("/api/submit") + ) and req.method == "POST" + + # This pumps Playwright internally; no busy waiting. + with self.page.expect_request( + is_api, timeout=(timeout * 1000 if timeout else 0) + ) as req_info: + req = req_info.value + + body_text = req.post_data or "{}" + try: + body = json.loads(body_text) + except Exception as e: + print("JSON parse error:", e) + body = {} + + if req.url.endswith("/api/reprompt"): + hints = body.get("hints") + if not isinstance(hints, list): + h = body.get("hint") + hints = [h] if isinstance(h, str) and h.strip() else [] + msg = {"type": "reprompt", "payload": {"hints": hints}} + else: + msg = { + "type": "step", + "payload": {"think": body.get("think", ""), "action": body.get("action", "")}, + } + + logger.info("Response received: %s", msg) + return msg + + def close(self): + self.context.close() + self.browser.close() + + +def get_hint_labeling_ui(hint_labeling_dir) -> str: + with open(hint_labeling_dir / "hint_labeling_ui.html", "r") as file: + hint_labeling_html = file.read() + return hint_labeling_html diff --git a/src/agentlab/agents/hitl_agent/hint_labelling_ui_files/hint_labeling_ui.html b/src/agentlab/agents/hitl_agent/hint_labelling_ui_files/hint_labeling_ui.html new file mode 100644 index 00000000..a2c7b540 --- /dev/null +++ b/src/agentlab/agents/hitl_agent/hint_labelling_ui_files/hint_labeling_ui.html @@ -0,0 +1,703 @@ + + + + + + + Agent Reprompt UI + + + +
[Markup omitted: the 703-line hint_labeling_ui.html implements the "Agent Reprompt UI" — panels for Goal and Error Feedback, a screenshot viewer with per-suggestion overlay previews, an editable Hints list, and a Suggestions list; its script exposes updateContext(data) and POSTs the operator's choice to /api/reprompt or /api/submit.]
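The JSON bodies this page is expected to POST mirror what _route_reprompt and _route_submit parse in hint_labelling.py above. A minimal sketch of both payloads (the hint, think, and action values are illustrative only):

import json

# POST /api/reprompt -> HintLabeling.wait_for_response() returns
#   {"type": "reprompt", "payload": {"hints": [...]}}
reprompt_body = json.dumps({"hints": ["Use the date picker instead of typing the date"]})

# POST /api/submit -> wait_for_response() returns
#   {"type": "step", "payload": {"think": "...", "action": "..."}}
submit_body = json.dumps({"think": "The Book button is now visible.", "action": 'click(bid="a42")'})

# _route_reprompt also accepts the legacy single-hint body {"hint": "..."}.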
+ + + + \ No newline at end of file diff --git a/src/agentlab/agents/hitl_agent/hitl_agent.py b/src/agentlab/agents/hitl_agent/hitl_agent.py new file mode 100644 index 00000000..9b84793b --- /dev/null +++ b/src/agentlab/agents/hitl_agent/hitl_agent.py @@ -0,0 +1,205 @@ +from dataclasses import dataclass +from typing import Optional + +import bgym +import playwright +from browsergym.experiments.agent import Agent + +from agentlab.agents.agent_args import AgentArgs +from agentlab.agents.agent_utils import overlay_action +from agentlab.agents.hitl_agent.base_multi_candidate_agent import MultiCandidateAgent +from agentlab.agents.hitl_agent.hint_labelling import ( + HintLabeling, + HintLabelingInputs, +) +from agentlab.llm.llm_utils import img_to_base_64 +from agentlab.llm.tracking import cost_tracker_decorator + + +class HumanInTheLoopAgent(Agent): + + def __init__( + self, + subagent_args, # Type: any object with MultiCandidateAgent interface + ): + self.subagent: MultiCandidateAgent = subagent_args.make_agent() + super().__init__() + self.ui = None + + @cost_tracker_decorator + def get_action(self, obs): + # reset vars + step_n_human_intervention_rounds = 0 + step_hint = [] + + # Initialize UI once outside the loop + if self.ui is None: + self.ui = HintLabeling(headless=False) + # Show initial waiting state + initial_inputs = HintLabelingInputs( + goal=( + obs.get("goal_object", [{}])[0].get("text", "") + if obs.get("goal_object") + else "" + ), + error_feedback="", + screenshot=(img_to_base_64(obs["screenshot"]) if "screenshot" in obs else ""), + screenshots=[], # no overlay screenshots yet + axtree=obs.get("axtree_txt", ""), + hints=[], + suggestions=[], # no suggestions yet + ) + self.ui.update_context(initial_inputs) + + # Generate first candidates + candidates = self.subagent.get_candidate_generations(obs, hint=None, n_candidates=3) + step_n_human_intervention_rounds += 1 + suggestions = [{"action": c["action"], "think": c["agent_info"].think} for c in candidates] + # List of Images as base64 - create overlay screenshots for each suggested action + screenshots = [overlay_action(obs, choice["action"]) for choice in suggestions] + + while True: + try: + hint_labeling_inputs = HintLabelingInputs( + goal=( + obs.get("goal_object", [{}])[0].get("text", "") + if obs.get("goal_object") + else "" + ), + error_feedback=obs.get("last_action_error", ""), + screenshot=(img_to_base_64(obs["screenshot"]) if "screenshot" in obs else ""), + screenshots=screenshots, # list of overlay screenshots for hover + axtree=obs.get("axtree_txt", ""), + hints=step_hint, + suggestions=suggestions, + ) + + self.ui.update_context(hint_labeling_inputs) + response = self.ui.wait_for_response(timeout=600) + + if response["type"] == "reprompt": + new_hints = response["payload"].get("hints", []) + # Replace with the new list from UI, or extend if needed + step_hint = list(new_hints) if isinstance(new_hints, list) else step_hint + candidates = self.subagent.get_candidate_generations( + obs, hint=step_hint if step_hint else None, n_candidates=3 + ) + step_n_human_intervention_rounds += 1 + suggestions = [ + {"action": c["action"], "think": c["agent_info"].think} for c in candidates + ] + screenshots = [overlay_action(obs, choice["action"]) for choice in suggestions] + + elif response["type"] == "step": + selected_action = response["payload"]["action"] + choice_idx = None + for i, candidate in enumerate(suggestions): + if candidate["action"] == selected_action: + choice_idx = i + break + selected_candidate = 
candidates[choice_idx] + self.subagent.update_agent_state_from_selected_candidate(selected_candidate) + action = selected_candidate["action"] + agent_info = selected_candidate["agent_info"] + return action, agent_info + + except KeyboardInterrupt: + print("User cancelled the operation") + if self.ui: + self.ui.close() + raise + except playwright.sync_api.TimeoutError: + # Handle timeout specifically: fall back to first candidate + print("UI timeout; falling back to first candidate.") + selected_candidate = candidates[0] + self.subagent.update_agent_state_from_selected_candidate(selected_candidate) + action = selected_candidate["action"] + agent_info = selected_candidate["agent_info"] + return action, agent_info + except Exception as e: + print(f"Error in human intervention UI: {e}") + if self.ui: + self.ui.close() + self.ui = None + # Raise exception instead of falling back to console input + raise RuntimeError(f"Human intervention UI failed: {e}") from e + + +@dataclass +class HumanInTheLoopAgentArgs(AgentArgs): + subagent_args: Optional[AgentArgs] = None # args for the underlying multiple proposal agent + + def make_agent(self): + assert self.subagent_args is not None + return HumanInTheLoopAgent(subagent_args=self.subagent_args) + + def __post_init__(self): + """Prefix subagent name with 'HITL-'.""" + super().__post_init__() + if self.subagent_args and self.subagent_args.agent_name: + self.agent_name = "HITL-" + self.subagent_args.agent_name + + def set_benchmark(self, benchmark, demo_mode): + """Delegate set_benchmark to the subagent if it has the method.""" + if hasattr(self.subagent_args, "set_benchmark"): + self.subagent_args.set_benchmark(benchmark, demo_mode) + + def set_reproducibility_mode(self): + """Delegate set_reproducibility_mode to the subagent if it has the method.""" + if hasattr(self.subagent_args, "set_reproducibility_mode"): + self.subagent_args.set_reproducibility_mode() + + +def get_base_human_in_the_loop_genericagent(llm_config): + """ + Create a base human-in-the-loop generic agent configuration using the key from CHAT_MODEL_ARGS_DICT. + + This function creates a HumanInTheLoopAgentArgs instance with a MultiCandidateGenericAgent + as the subagent, configured with the specified LLM configuration and base flags. + + Args: + llm_config (str): The LLM configuration key to use from CHAT_MODEL_ARGS_DICT. + + Returns: + HumanInTheLoopAgentArgs: Configured human-in-the-loop agent arguments with + a multi-candidate generic agent as the subagent. 
+ """ + from agentlab.agents.generic_agent.tmlr_config import BASE_FLAGS + from agentlab.agents.hitl_agent.hitl_agent import HumanInTheLoopAgentArgs + from agentlab.agents.hitl_agent.multi_candidate_generic_agent import ( + MultiCandidateGenericAgentArgs, + ) + from agentlab.llm.llm_configs import CHAT_MODEL_ARGS_DICT + + return HumanInTheLoopAgentArgs( + subagent_args=MultiCandidateGenericAgentArgs( + chat_model_args=CHAT_MODEL_ARGS_DICT[llm_config], + flags=BASE_FLAGS, + ) + ) + + +HUMAN_GUIDED_GENERIC_AGENT = get_base_human_in_the_loop_genericagent("openai/gpt-5-mini-2025-08-07") + +if __name__ == "__main__": + import logging + + from agentlab.agents.hitl_agent.hitl_agent import ( + HUMAN_GUIDED_GENERIC_AGENT, + ) + from agentlab.experiments.study import Study + + agent_configs = [HUMAN_GUIDED_GENERIC_AGENT] + benchmark = bgym.DEFAULT_BENCHMARKS["miniwob"]() + benchmark = benchmark.subset_from_glob("task_name", "*book*") + benchmark.env_args_list = benchmark.env_args_list[2:3] + + for env_args in benchmark.env_args_list: + env_args.max_steps = 100 # max human steps + env_args.headless = False + + Study(agent_configs, benchmark, logging_level=logging.WARNING).run( + n_jobs=1, + parallel_backend="sequential", + n_relaunch=1, + ) diff --git a/src/agentlab/agents/hitl_agent/launch_hint_ui.py b/src/agentlab/agents/hitl_agent/launch_hint_ui.py new file mode 100644 index 00000000..df2e9dbc --- /dev/null +++ b/src/agentlab/agents/hitl_agent/launch_hint_ui.py @@ -0,0 +1,176 @@ +""" +Console launcher for the Human-in-the-Loop Generic Agent UI. + +Usage (installed entry point): + agentlab-mentor --benchmark miniwob --task-name miniwob.book-flight --seed 123 --no-headless + +This will run a Study with the MultipleProposalGenericAgent and the selected task. +""" + +from __future__ import annotations + +import argparse +import logging +from pathlib import Path + +import bgym + +from agentlab.agents.hitl_agent.generic_human_guided_agent import get_base_agent +from agentlab.experiments.exp_utils import RESULTS_DIR +from agentlab.experiments.study import Study + + +def build_benchmark(benchmark_name: str, task_name: str, seed: int, headless: bool): + # Instantiate benchmark by name using BrowserGym registry + try: + benchmark = bgym.DEFAULT_BENCHMARKS[benchmark_name.lower()]() + except KeyError as e: + choices = ", ".join(sorted(bgym.DEFAULT_BENCHMARKS.keys())) + raise SystemExit(f"Unknown benchmark '{benchmark_name}'. Choose one of: {choices}") from e + + filtered_env_args = [ + env_args for env_args in benchmark.env_args_list if env_args.task_name == task_name + ] + if not filtered_env_args: + raise SystemExit(f'No tasks found matching "{task_name}"') + filtered_env_args = filtered_env_args[:1] # take the first one + benchmark.env_args_list = filtered_env_args + + # Reasonable defaults for interactive UI + for env_args in benchmark.env_args_list: + env_args.task_seed = seed + env_args.max_steps = env_args.max_steps or 200 + env_args.headless = headless + + return benchmark + + +def extract_hints_from_experiment_trace(exp_dir): + """Extracts hints from every step of each episode in a exp_dir and returns a df with each row containing a hint. + + Args: + exp_dir: Path-like to a study/experiment directory whose results should be scanned. + + Returns: + pandas.DataFrame: One row per hint with metadata columns. 
+ """ + import pandas as pd + + from agentlab.analyze import inspect_results + from agentlab.experiments.exp_utils import RESULTS_DIR + from agentlab.experiments.loop import ExpResult + + output = [] + # Use provided exp_dir if set; otherwise default to <$AGENTLAB_EXP_ROOT>/agentlab_mentor + result_df = inspect_results.load_result_df(exp_dir or (RESULTS_DIR / "agentlab_mentor")) + if result_df is None: + # No results to parse; return empty dataframe with expected columns + return pd.DataFrame( + columns=[ + "exp_id", + "agent_name", + "benchmark", + "task_name", + "episode_reward", + "hint", + ] + ) + result_df = result_df.reset_index() + for _, row in result_df.iterrows(): + result = ExpResult(row.exp_dir) + episode = result.steps_info + episode_reward = max([step.reward for step in episode]) + for step_info in episode: + step_hints = step_info.agent_info.get("extra_info", {}).get("step_hints", None) + if step_hints: + for hint in step_hints: + output.append( + { + "exp_id": row["exp_id"], + "agent_name": row["agent.agent_name"], + "benchmark": row["env.task_name"].split(".")[0], + "task_name": row["env.task_name"], + "episode_reward": episode_reward, + "hint": hint, + } + ) + output = pd.DataFrame(output) + output = output.dropna() + return output + + +def parse_args(): + p = argparse.ArgumentParser(description="Run HITL Generic Agent UI on a benchmark task") + p.add_argument( + "--benchmark", + required=False, + help="Benchmark name as registered in BrowserGym, e.g., miniwob, workarena_l1, webarena, visualwebarena", + ) + p.add_argument( + "--task-name", + dest="task_name", + required=False, + help="Exact task name within the benchmark (e.g., 'miniwob.book-flight')", + ) + p.add_argument( + "--seed", + type=int, + required=False, + help="Task seed to use for the selected task.", + ) + p.add_argument( + "--llm-config", + dest="llm_config", + default="openai/gpt-5-mini-2025-08-07", + help="LLM configuration to use for the agent (e.g., 'azure/gpt-5-mini-2025-08-07').", + ) + p.add_argument( + "--headless", + action=argparse.BooleanOptionalAction, + default=True, + help="Run the browser headless (default: True). Use --no-headless to show the browser.", + ) + p.add_argument( + "--download-hints", + nargs="?", + const="extracted_hints.csv", + required=False, + default=None, + metavar="[OUTPUT_CSV]", + help=( + "Extract hints from the default study directory and save to OUTPUT_CSV. " + "If OUTPUT_CSV is omitted, saves to 'extracted_hints.csv'. When provided, other args are ignored." + ), + ) + return p.parse_args() + + +def main(): + args = parse_args() + save_dir = RESULTS_DIR / "agentlab_mentor" + if args.download_hints: + df = extract_hints_from_experiment_trace(save_dir) + out_path = Path(args.download_hints) + out_path.parent.mkdir(parents=True, exist_ok=True) + df.to_csv(out_path, index=False) + print(str(out_path)) + return + # Validate required args only when not downloading hints + if not args.benchmark or not args.task_name or args.seed is None: + raise SystemExit( + "--benchmark, --task-name, and --seed are required unless using --download-hints" + ) + benchmark = build_benchmark(args.benchmark, args.task_name, args.seed, args.headless) + agent_configs = [get_base_agent(args.llm_config)] + # study is needed to run the 'set_benchmark' method which sets appropriate agent parameters. 
+ study = Study(agent_args=agent_configs, benchmark=benchmark, logging_level=logging.WARNING) + study.run( + n_jobs=1, + parallel_backend="sequential", + n_relaunch=1, + exp_root=save_dir, + ) + + +if __name__ == "__main__": + main() diff --git a/src/agentlab/agents/hitl_agent/multi_candidate_generic_agent.py b/src/agentlab/agents/hitl_agent/multi_candidate_generic_agent.py new file mode 100644 index 00000000..e4e53b7a --- /dev/null +++ b/src/agentlab/agents/hitl_agent/multi_candidate_generic_agent.py @@ -0,0 +1,225 @@ +import re +from dataclasses import asdict, dataclass +from typing import Dict, List + +from browsergym.experiments.agent import AgentInfo + +from agentlab.agents import dynamic_prompting as dp +from agentlab.agents.generic_agent.generic_agent import GenericAgent, GenericAgentArgs +from agentlab.agents.generic_agent.generic_agent_prompt import MainPrompt +from agentlab.llm.llm_utils import Discussion, HumanMessage, SystemMessage + + +class CandidatesGeneration(dp.PromptElement): + # Ask for multiple alternatives; each candidate must contain and . + def __init__(self, hint: list[str] | None = None, n_candidates=3) -> None: + self.hint = hint + self.n_candidates = n_candidates + self.hint_prompt = "\n".join(f"{i}. {c}" for i, c in enumerate(hint, 1)) if hint else "" + super().__init__(True) + self._prompt = [ + dict( + type="text", + text=f""" + You are a web agent. Propose {self.n_candidates} alternative next steps for the current page. + {('Use the Hints:' + self.hint_prompt) if self.hint else ""}\n + Return EACH candidate wrapped as numbered tags: + ... + ... + + Inside every candidate you MUST include: + ...why this action is appropriate now... + ...ONE atomic, executable action string... + + Do not include any extra text outside the candidate tags. + Use this format: + + Explain why Candidate One is chosen + Candidate One Action + + + + Explain why Candidate Two is chosen + Candidate Two Action + + # Example + + The login button is visible and proceeding will reveal the auth form. + click(role="button", name="Log in") + + + + User might need to enter email first; the email field is focused and visible. + fill(bid="a112", text="user@example.com") + + """, + ) + ] + + # Regex patterns for numbered candidates only + _NUM_BLOCK = re.compile( + r"<\s*candidate[_ ]generation[_ ](?P[0-9]+)\s*>(?P.*?)<\s*/\s*candidate[_ ]generation[_ ](?P=idx)\s*>", + flags=re.IGNORECASE | re.DOTALL, + ) + _THINK_PATTERN = re.compile( + r"<\s*think\s*>(?P.*?)<\s*/\s*think\s*>", + flags=re.IGNORECASE | re.DOTALL, + ) + _ACTION_PATTERN = re.compile( + r"<\s*action\s*>(?P.*?)<\s*/\s*action\s*>", + flags=re.IGNORECASE | re.DOTALL, + ) + + def _parse_answer(self, text_answer: str) -> Dict[str, Dict[str, str]]: + """Extract up to n_candidates candidates, using numbered tags only. + + Args: + text_answer: The text response containing candidate generation tags. + + Returns: + Dictionary mapping candidate names to their think and action content. 
+ Format: {"candidate_generation_1": {"think": "...", "action": "..."}, ...} + """ + result = { + f"candidate_generation_{i+1}": {"think": "", "action": ""} + for i in range(self.n_candidates) + } + + if not isinstance(text_answer, str): + return result + + matches: List[re.Match] = list(self._NUM_BLOCK.finditer(text_answer)) + # Sort by numeric index + matches_sorted = sorted(matches, key=lambda m: int(m.group("idx"))) + for i, m in enumerate(matches_sorted[: self.n_candidates]): + body = m.group("body").strip() + think_m = self._THINK_PATTERN.search(body) + action_m = self._ACTION_PATTERN.search(body) + result[f"candidate_generation_{i+1}"] = { + "think": (think_m.group("think").strip() if think_m else ""), + "action": (action_m.group("action").strip() if action_m else ""), + } + + return result + + +class MultiCandidateGenericAgent(GenericAgent): + + def __init__( + self, + chat_model_args, + flags, + max_retry: int = 4, + ): + super().__init__(chat_model_args, flags, max_retry) + + def get_candidate_generations( + self, + obs, + hint: list[str] | None = None, + n_candidates=3, + ) -> list[dict]: + # Append obs to history only if it's not already the last entry + # Important to handle cases when get_candidate_generation is called multiple times in a single step. + if not self.obs_history or self.obs_history[-1] is not obs: + self.obs_history.append(obs) + + main_prompt = MainPrompt( + action_set=self.action_set, + obs_history=self.obs_history, + actions=self.actions, + memories=self.memories, + thoughts=self.thoughts, + previous_plan=self.plan, + step=self.plan_step, + flags=self.flags, + ) + max_prompt_tokens, max_trunc_itr = self._get_maxes() + + system_prompt = SystemMessage(dp.SystemPrompt().prompt) + + human_prompt = dp.fit_tokens( + shrinkable=main_prompt, + max_prompt_tokens=max_prompt_tokens, + model_name=self.chat_model_args.model_name, + max_iterations=max_trunc_itr, + additional_prompts=system_prompt, + ) + + cg = CandidatesGeneration(hint=hint, n_candidates=n_candidates) + candidates_prompt = HumanMessage(cg.prompt) + chat_messages = Discussion([system_prompt, human_prompt, candidates_prompt]) + output = self.chat_llm(chat_messages) + candidates = cg._parse_answer(output["content"]) + # Not adding the generate candidate prompt to xray. + msg_to_add_to_xray = Discussion([system_prompt, human_prompt]) + suggestions = [ + { + "action": candidate["action"], + "think": candidate["think"], + } + for key, candidate in candidates.items() + ] + output = [] + for candidate in suggestions: + agent_info = AgentInfo( + think=candidate.get("think", None), + chat_messages=msg_to_add_to_xray, + stats=self.chat_llm.get_stats(), + extra_info={ + "chat_model_args": asdict(self.chat_model_args), + "think": candidate.get("think", None), + "plan": candidate.get("plan", None), + "step": candidate.get("step", None), + "memory": candidate.get("memory", None), + }, + ) + output.append({"action": candidate["action"], "agent_info": agent_info}) + + return output + + def update_agent_state_from_selected_candidate(self, output): + """Updates the agent's internal state based on the selected candidate from human feedback. + + Args: + output: Dictionary containing 'action' and 'agent_info' keys from selected candidate. 
+ """ + action, agent_info = output["action"], output["agent_info"] + self.plan = agent_info.extra_info.get("plan", self.plan) + self.plan_step = agent_info.extra_info.get("step", self.plan_step) + self.memories.append(agent_info.extra_info.get("memory", None)) + self.thoughts.append(agent_info.extra_info.get("think", None)) + self.actions.append(action) + + def get_action(self, obs): + """Generates multiple candidates and always returns the first one. + This allows to use this agent as a drop-in replacement for a single-candidate agent. + + Args: + obs: The observation from the environment. + + Returns: + tuple: A tuple containing (action, agent_info). + """ + candidates = self.get_candidate_generations(obs, hint=None, n_candidates=2) + selection = candidates[0] # always select the first option. + self.update_agent_state_from_selected_candidate(selection) + action, agent_info = selection["action"], selection["agent_info"] + + return action, agent_info + + +@dataclass +class MultiCandidateGenericAgentArgs(GenericAgentArgs): + def make_agent(self): + return MultiCandidateGenericAgent( + chat_model_args=self.chat_model_args, + flags=self.flags, + max_retry=self.max_retry, + ) + + def __post_init__(self): + """Prefix subagent name with 'MC-'.""" + super().__post_init__() + if hasattr(self, "agent_name") and self.agent_name: + self.agent_name = "MC-" + self.agent_name diff --git a/src/agentlab/analyze/agent_xray.py b/src/agentlab/analyze/agent_xray.py index 8accbfd6..6dbec117 100644 --- a/src/agentlab/analyze/agent_xray.py +++ b/src/agentlab/analyze/agent_xray.py @@ -818,6 +818,18 @@ def update_agent_info_html(): s1, action_str = get_screenshot(info, info.step, False) s2, action_str = get_screenshot(info, info.step + 1, False) agent_info = info.exp_result.steps_info[info.step].agent_info + # Minimal: show step_hints if present + hints = ( + agent_info.get("step_hints") + or agent_info.get("hints") + or agent_info.get("extra_info", {}).get("step_hints") + ) + if hints: + if not isinstance(hints, (list, tuple)): + hints = [hints] + items = "".join(f"
<div>• {html.escape(str(h))}</div>" for h in hints) + hints_html = f"<div><h4>Step Hints</h4>{items}</div>
    " + return _page_to_iframe(hints_html), s1, s2 page = agent_info.get("html_page", ["No Agent Info"]) if page is None: page = """Fill up html_page attribute in AgentInfo to display here.""" diff --git a/src/agentlab/llm/llm_utils.py b/src/agentlab/llm/llm_utils.py index 10013b72..2bc83d43 100644 --- a/src/agentlab/llm/llm_utils.py +++ b/src/agentlab/llm/llm_utils.py @@ -727,6 +727,16 @@ def image_to_png_base64_url(image: np.ndarray | Image.Image): return f"data:image/png;base64,{image_base64}" +def img_to_base_64(image: Image.Image | np.ndarray) -> str: + """Converts a PIL Image or NumPy array to a base64-encoded string.""" + if isinstance(image, np.ndarray): + image = Image.fromarray(image) + buffer = io.BytesIO() + image.save(buffer, format="PNG") + b64_str = base64.b64encode(buffer.getvalue()).decode("utf-8") + return b64_str + + class BaseMessage(dict): def __init__(self, role: str, content: Union[str, list[dict]], **kwargs): allowed_attrs = {"log_probs"}