
Conversation

@paxiaatucsdedu (Member)

Problem

The streaming path did not handle double-escaped JSON tool call arguments, while the non-streaming path (convert_oci_tool_call_to_langchain) already did. As a result, tool call arguments were parsed incorrectly in streaming mode.

Solution

Applied the same double-escape handling logic to process_stream_tool_calls in both CohereProvider and GenericProvider:

args = tool_call["function"].get("arguments")
try:
    # Double-escaped arguments (a JSON string that itself encodes JSON)
    # survive the first loads() as a str; the second loads() yields the
    # real dict, which is re-dumped as plain JSON text.
    parsed_args = json.loads(json.loads(args))
    args = json.dumps(parsed_args)
except (json.JSONDecodeError, TypeError):
    # Normal JSON raises TypeError on the second loads(); invalid or empty
    # input raises JSONDecodeError. Either way the original value is kept.
    pass

Logic:

  • Normal JSON ('{"key": "value"}'): first parse succeeds → dict → second parse raises TypeError → keep original

  • Double-escaped JSON ('"{\\"key\\": \\"value\\"}"', a JSON string that itself encodes JSON): first parse → string → second parse → dict → convert back to plain JSON (see the sketch below)

  • Invalid/empty JSON: first parse raises JSONDecodeError → keep original
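
A self-contained sketch of this normalization covering all three cases (the helper name normalize_tool_args is mine, not part of the PR):

import json

def normalize_tool_args(args):
    """Return plain JSON text, unwrapping one level of double-escaping if present.

    Hypothetical helper for illustration; the PR applies this logic inline.
    """
    try:
        parsed_args = json.loads(json.loads(args))
        return json.dumps(parsed_args)
    except (json.JSONDecodeError, TypeError):
        return args

print(normalize_tool_args('{"key": "value"}'))             # unchanged: {"key": "value"}
print(normalize_tool_args('"{\\"key\\": \\"value\\"}"'))   # unwrapped: {"key": "value"}
print(normalize_tool_args('not json'))                      # unchanged: not json
print(normalize_tool_args(None))                            # unchanged: None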

Adds logic to parse tool call arguments that are double-escaped JSON strings in both CohereProvider and GenericProvider. This ensures arguments are correctly deserialized before being passed to tool_call_chunk.
if tool_id:
    tool_call_ids.add(tool_id)

args = tool_call["function"].get("arguments")

It will take more than this to fix the problem.
When LangGraph consumes streaming chunks and builds a tool call, it will fail if the parsed string is not JSON and will create an invalid tool call well before control reaches our code:
https://github.com/langchain-ai/langchain/blob/master/libs/core/langchain_core/messages/ai.py#L508-L522
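
A minimal illustration of that failure mode, assuming current langchain-core behavior (the tool name and argument values here are only examples):

from langchain_core.messages import AIMessageChunk
from langchain_core.messages.tool import tool_call_chunk

# A double-escaped args payload, as it might arrive from the streaming API.
chunk = AIMessageChunk(
    content="",
    tool_call_chunks=[
        tool_call_chunk(
            name="MonthlyVarianceSQL",
            args='"{\\"month\\": \\"08\\", \\"year\\": \\"2025\\"}"',
            id="call_0",
            index=0,
        )
    ],
)
print(chunk.tool_calls)          # [] -- parse_partial_json returns a str, not a dict
print(chunk.invalid_tool_calls)  # the tool call has already been marked invalid here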

One option is to extend AIMessageChunk and override init_tool_calls to do the double parsing ourselves. Make sure you use the new class in this file instead of AIMessageChunk:

import json

from langchain_core.messages import AIMessageChunk
from langchain_core.messages.tool import (
    ToolCallChunk,
    invalid_tool_call as create_invalid_tool_call,
    tool_call as create_tool_call,
    tool_call_chunk as create_tool_call_chunk,
)
from langchain_core.utils.json import parse_partial_json
from pydantic import model_validator
from typing_extensions import Self


class OCIAIMessageChunk(AIMessageChunk):
    @model_validator(mode="after")
    def init_tool_calls(self) -> Self:
        """Initialize tool calls from tool call chunks.

        Returns:
            The values with tool calls initialized.

        Raises:
            ValueError: If the tool call chunks are malformed.
        """
        if not self.tool_call_chunks:
            if self.tool_calls:
                self.tool_call_chunks = [
                    create_tool_call_chunk(
                        name=tc["name"],
                        args=json.dumps(tc["args"]),
                        id=tc["id"],
                        index=None,
                    )
                    for tc in self.tool_calls
                ]
            if self.invalid_tool_calls:
                tool_call_chunks = self.tool_call_chunks
                tool_call_chunks.extend(
                    [
                        create_tool_call_chunk(
                            name=tc["name"], args=tc["args"], id=tc["id"], index=None
                        )
                        for tc in self.invalid_tool_calls
                    ]
                )
                self.tool_call_chunks = tool_call_chunks

            return self
        tool_calls = []
        invalid_tool_calls = []

        def add_chunk_to_invalid_tool_calls(chunk: ToolCallChunk) -> None:
            invalid_tool_calls.append(
                create_invalid_tool_call(
                    name=chunk["name"],
                    args=chunk["args"],
                    id=chunk["id"],
                    error=None,
                )
            )

        for chunk in self.tool_call_chunks:
            try:
                # OCI may stream args as a double-escaped JSON string; if the
                # first parse yields a str instead of a dict, parse once more.
                parsed_args = parse_partial_json(chunk["args"]) if chunk["args"] else {}
                if isinstance(parsed_args, str):
                    parsed_args = parse_partial_json(parsed_args)
                if isinstance(parsed_args, dict):
                    tool_calls.append(
                        create_tool_call(
                            name=chunk["name"] or "",
                            args=parsed_args,
                            id=chunk["id"],
                        )
                    )
                else:
                    add_chunk_to_invalid_tool_calls(chunk)
            except Exception:
                add_chunk_to_invalid_tool_calls(chunk)
        self.tool_calls = tool_calls
        self.invalid_tool_calls = invalid_tool_calls
        return self
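
A hedged sketch of how the new class might be wired into the streaming path (the helper name _to_generation_chunk and its signature are assumptions, not the actual langchain-oci code):

from langchain_core.outputs import ChatGenerationChunk

def _to_generation_chunk(content, tool_call_chunks):
    # Build the streamed message with OCIAIMessageChunk instead of AIMessageChunk,
    # so the double parse above runs when LangGraph aggregates the chunks.
    message = OCIAIMessageChunk(content=content, tool_call_chunks=tool_call_chunks)
    return ChatGenerationChunk(message=message)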

@paxiaatucsdedu (Member, Author)

From my experience, the OCI GenAI endpoint returns either plain JSON or double-escaped JSON. The PR code handles both situations:

  • Normal JSON ('{"key": "value"}'): first parse succeeds → dict → second parse raises TypeError → keep original
  • Double-escaped JSON ('"{\\"key\\": \\"value\\"}"'): first parse → string → second parse → dict → convert back to plain JSON

The args passed on to LangChain are therefore always valid JSON by the time LangChain parses them with:
args_ = parse_partial_json(chunk["args"]) if chunk["args"] else {}
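
A quick check of that claim using langchain-core's parse_partial_json (the example values are made up):

from langchain_core.utils.json import parse_partial_json

normalized = '{"month": "08", "year": "2025"}'  # output of the PR's normalization
args_ = parse_partial_json(normalized) if normalized else {}
print(type(args_))  # <class 'dict'> -- LangChain can build a valid tool call from this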

@kirankumarjoseph (Dec 6, 2025)

I am not able to reproduce this double-escaped JSON response from Llama anymore.
Here is the crux of the test case I used to reproduce it earlier.

from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent


@tool
def MonthlyVarianceSQL(month: str, year: str):
    """
    Fetch variance by customer for a specific month/year. Input: month (str), year (str)
    """
    return {
        "result": [
            {"CUSTOMER_NAME": "EuroTech GmbH", "COLLECTED": 1500000, "FORECAST": 3000000, "VARIANCE": -1500000},
            {"CUSTOMER_NAME": "Acme Corp", "COLLECTED": 3800000, "FORECAST": 5000000, "VARIANCE": -1200000},
            {"CUSTOMER_NAME": "Global Manufacturing", "COLLECTED": 2200000, "FORECAST": 3000000, "VARIANCE": -800000},
            {"CUSTOMER_NAME": "NorthStar Energy", "COLLECTED": 1200000, "FORECAST": 2000000, "VARIANCE": -800000},
            {"CUSTOMER_NAME": "Silverline Retail", "COLLECTED": 1000000, "FORECAST": 1500000, "VARIANCE": -500000},
            {"CUSTOMER_NAME": "Zenith Healthcare", "COLLECTED": 2800000, "FORECAST": 2800000, "VARIANCE": 0},
            {"CUSTOMER_NAME": "Pacific Traders", "COLLECTED": 2000000, "FORECAST": 2000000, "VARIANCE": 0}
        ],
        "query": "SELECT customer_name, SUM(amount) AS collected, SUM(forecast_amount) AS forecast, SUM(amount - forecast_amount) AS variance FROM GOLD.cashflow_actuals_vs_forecast    WHERE period_month = :month AND period_year = :year    GROUP BY customer_name    ORDER BY variance ASC;\n",
        "error": None,
        "executionTimeMs": 0
    }


# OCIAIConf, model_args, and the oci_llm used below come from the reporter's environment.
llm_conf = OCIAIConf(model_provider='generic',
                     compartment_id='ocid1.tenancy.oc1..aaaaaaaa7ayxuw32vjb64hbxtouarftwtwb2uat5x5mf4hu7cvzaesfrebrq',
                     model_args=model_args,
                     endpoint='https://inference.generativeai.us-chicago-1.oci.oraclecloud.com/',
                     model_id='meta.llama-3.3-70b-instruct',
                     auth_type="security_token",
                     auth_profile="DEFAULT",
                     guardrails_config=None)


system_prompt = """You are a finance assistant.
You have access to these tools:
- MonthlyVarianceSQL: fetch per-customer variance for a specific month and year
Question: {input}
Always use the appropriate tool first to get data, then explain in natural language.
"""

from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver()

tools_agent1 = [MonthlyVarianceSQL]
agent = create_react_agent(model=oci_llm, tools=tools_agent1, prompt=system_prompt, debug=False, checkpointer=checkpointer)


agent.astream(input=input, config=config, stream_mode="messages")

First query:
"Explain why cash inflow in August 2025 was below forecast."
Second query:
"Tell me more information about Acme Corp in this period"
