From 8847199f1a1fd8a442c4601e61063694ab9a8d1f Mon Sep 17 00:00:00 2001
From: Davide Morelli
Date: Thu, 27 Nov 2025 15:08:53 +0100
Subject: [PATCH 1/2] Implement tools for multiplication, addition, and water
 usage

Added tools for mathematical operations and water consumption calculations.
---
 knowledge_base/knowledge_manager.py | 224 +++++++++++++++++++++++++++-
 1 file changed, 219 insertions(+), 5 deletions(-)

diff --git a/knowledge_base/knowledge_manager.py b/knowledge_base/knowledge_manager.py
index 02d77da..ec92a70 100644
--- a/knowledge_base/knowledge_manager.py
+++ b/knowledge_base/knowledge_manager.py
@@ -21,8 +21,104 @@ import chromadb
 from chromadb.config import Settings
 from itertools import permutations
+from langchain.tools import tool
+import json
+import re
+from typing import Any, Tuple
+
+
+# DEFINING TOOLS FOR FUNCTION CALLING
+
+@tool
+def multiply(a: int, b: int) -> int:
+    """Multiplies two numbers together. ONLY use this when the user explicitly asks you to multiply specific numbers or calculate a product."""
+    return a * b
+
+@tool
+def add(a: int, b: int) -> int:
+    """Adds two numbers. ONLY use this when the user explicitly asks you to add specific numbers or calculate a sum."""
+    return a + b
+
+@tool
+def climatic_zone_heating_times(zone: str) -> str:
+    """Returns the daily heating hours (heating time / ore di riscaldamento) for Italian climatic zones.
+    Use this when the user asks about:
+    - heating hours, heating time, ore di riscaldamento
+    - climatic zones, zona climatica (A, B, C, D, E, F)
+    - how many hours, how long can I heat
+
+    Examples of queries that should use this tool:
+    - "How many heating hours for zone E?"
+    - "What is the heating time for climatic zone C?"
+    - "ore di riscaldamento zona D"
+
+    Args:
+        zone: The climatic zone letter (A, B, C, D, E, or F)
+    """
+    # This point is reached only after the LLM has already decided to use this tool,
+    # so we can assume the query is about climatic zones and just extract the letter.
+    zone_match = re.search(r'[A-F]', zone.upper())
+    if not zone_match:
+        return "Zona climatica non riconosciuta. Specificare una zona tra A, B, C, D, E, o F."
+
+    zone_letter = zone_match.group(0)
+
+    zone_mapping = {
+        "A": "6 ore",
+        "B": "8 ore",
+        "C": "10 ore",
+        "D": "12 ore",
+        "E": "14 ore",
+        "F": "senza limiti"
+    }
+
+    return zone_mapping.get(zone_letter, "Zona climatica non riconosciuta.")
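+
+# A minimal sanity check for the zone tool (hypothetical example calls; a
+# LangChain @tool object is invoked with .invoke() and a dict of arguments):
+#   climatic_zone_heating_times.invoke({"zone": "zona E"})  # -> "14 ore"
+#   climatic_zone_heating_times.invoke({"zone": "G"})       # -> "Zona climatica non riconosciuta. ..."
+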
+@tool
+def shower_water_consumption(duration_minutes: float) -> Tuple[float, float]:
+    """Calculates water consumption/water used/water consumed/liters used during a shower.
+    Use this when the user asks about:
+    - water consumption in shower, water used in shower
+    - how much water, how many liters
+    - shower duration, shower time
+    - consumo acqua doccia
+
+    Examples of queries that should use this tool:
+    - "How much water is consumed during a 35 minute shower?"
+    - "Water usage for 10 minute shower"
+    - "Liters consumed in a 20 minute shower"
+
+    Takes the shower duration in minutes and returns lower and upper estimates in liters.
+    """
+    liters_per_min_lower = 15
+    liters_per_min_upper = 18
+
+    lower_estimate = duration_minutes * liters_per_min_lower
+    upper_estimate = duration_minutes * liters_per_min_upper
+
+    return lower_estimate, upper_estimate
+
+tools = [multiply, add, climatic_zone_heating_times, shower_water_consumption]
+
+# Unused keyword-based matching patterns, superseded by the LLM-based tool
+# selection in _check_and_invoke_tools below; kept for reference:
+# TOOL_PATTERNS = {
+#     "multiply": {
+#         "keywords": ["moltiplica", "multiplica", "prodotto", "per", "multiply", "times", "product"],
+#         "pattern": r'-?\d+\.?\d*',
+#     },
+#     "add": {
+#         "keywords": ["somma", "aggiungi", "addiziona", "più", "add", "sum", "plus"],
+#         "pattern": r'-?\d+\.?\d*',
+#     },
+#     "climatic_zone_heating_times": {
+#         "keywords": ["zona climatica", "zona", "riscaldamento", "ore", "heating", "climatic zone", "hours"],
+#         "pattern": r'\b(zona\s*)?([A-Fa-f])\b',
+#     }
+# }
+
 class KnowledgeManager:
     """A class to manage knowledge base operations."""
     embedding: object
@@ -59,8 +155,116 @@ def __init__(self, provider: str, model: str, embedding: str, language: str, kno
 
         self.knowledge_base_path = knowledge_base_path
 
+    def _check_and_invoke_tools(self, message: str) -> Tuple[bool, Any, str]:
+        """Use the LLM to decide whether a tool should be invoked, execute it, and generate the final response."""
+
+        print(f"\n===== TOOL CALLING DEBUG START =====")
+        print(f"Input message: {message}")
+
+        # First, build a list of the available tools to include in the prompt
+        tool_list = "\n".join([f"- {t.name}: {t.description}" for t in tools])
+
+        # System prompt that guides the LLM in deciding whether or not to use a tool
+        system_prompt = f"""You are a tool-invoking agent. Analyze if the user's query can be answered by ANY of the available tools, even if the wording is different.
+
+Available tools:
+{tool_list}
+
+IMPORTANT: Match tools based on MEANING, not exact keywords:
+- Questions about shower water/liters → use shower_water_consumption
+- Questions about heating hours/time for zones → use climatic_zone_heating_times
+- Questions about multiplication/product → use multiply
+- Questions about addition/sum → use add
+
+Respond with JSON only:
+{{"use_tool": true, "tool_name": "name", "parameters": {{"param": value}}}}
+OR
+{{"use_tool": false}}
+
+Examples:
+"What is 5 times 3?" → {{"use_tool": true, "tool_name": "multiply", "parameters": {{"a": 5, "b": 3}}}}
+"zona C" → {{"use_tool": true, "tool_name": "climatic_zone_heating_times", "parameters": {{"zone": "C"}}}}
+"How much water for 1 hour shower?" → {{"use_tool": true, "tool_name": "shower_water_consumption", "parameters": {{"duration_minutes": 60}}}}
+"Water consumed in 35 minute shower?" → {{"use_tool": true, "tool_name": "shower_water_consumption", "parameters": {{"duration_minutes": 35}}}}
+"Heating hours zone E?" → {{"use_tool": true, "tool_name": "climatic_zone_heating_times", "parameters": {{"zone": "E"}}}}
+"""
+
+        try:
+            print(f"\nCalling LLM for tool decision...")
+            # Send the LLM the system prompt plus the user message and ask it to decide whether to use a tool
+            response = self.llm_handler.generate_response(system_prompt, f'Message: "{message}"', False)
+
+            print(f"\nRaw LLM response:\n{response}")
+            print(f"\nResponse type: {type(response)}")
+
+            # Strip any markdown code fences from the LLM reply so that only the JSON
+            # payload remains, e.g. {"use_tool": true, "tool_name": "multiply", ...}
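+            # For instance (hypothetical model output), a raw reply of
+            #   ```json\n{"use_tool": true, "tool_name": "multiply", "parameters": {"a": 5, "b": 3}}\n```
+            # is reduced to the bare JSON object before json.loads() is applied.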
+            # Then parse the cleaned string into a Python dict so the fields can be accessed
+            response = response.strip().replace("```json", "").replace("```", "").strip()
+            print(f"\nCleaned response:\n{response}")
+
+            decision = json.loads(response)
+            print(f"\nParsed decision: {decision}")
+
+            # Check the "use_tool" field to see whether a tool needs to be invoked
+            if not decision.get("use_tool"):
+                print(f"Decision: No tool needed")
+                print(f"===== TOOL CALLING DEBUG END =====\n")
+                return False, "", ""
+
+            # If a tool is to be used, extract the tool name and the parameters
+            tool_name = decision.get("tool_name")
+            params = decision.get("parameters", {})
+
+            print(f"\nTool to invoke: {tool_name}")
+            print(f"Parameters: {params}")
+
+            # Look up the tool the LLM selected and run it via .invoke() to compute the result
+            tool_result = None
+            for t in tools:
+                if t.name == tool_name:
+                    print(f"\nInvoking tool: {t.name}")
+                    tool_result = t.invoke(params)
+                    print(f"TOOL CALLING: Tool '{tool_name}' used, with result: {tool_result}")
+                    break
+
+            # If the requested tool is not among our defined tools, fall back to the normal flow
+            if tool_result is None:
+                print(f"TOOL CALLING: Tool not found")
+                print(f"===== TOOL CALLING DEBUG END =====\n")
+                return False, "", ""
+
+            # Finally, combine the tool result with the original user message and call the
+            # LLM again to generate the final response for the user
+            final_response_prompt = f"""You are a helpful assistant. An external function was called to help answer the user's question.
+
+            User's question: {message}
+
+            Function used: {tool_name}
+            Function result: {tool_result}
+
+            Based on this function result, provide a clear and natural response to the user's question.
+            Incorporate the tool result into your answer in a helpful way. Try not to make up additional information not contained in the tool result
+            and don't openly mention that an external function was used to get the answer."""
+
+            print(f"\nGenerating final response...")
+            final_response = self.llm_handler.generate_response(final_response_prompt, "", False)
+            print(f"\nFinal response: {final_response}")
+            print(f"===== TOOL CALLING DEBUG END =====\n")
+
+            return True, tool_result, final_response
+
+        except Exception as e:
+            print(f"\n!!! EXCEPTION IN TOOL CALLING !!!")
+            print(f"Tool check failed: {e}")
+            import traceback
+            traceback.print_exc()
+            print(f"===== TOOL CALLING DEBUG END =====\n")
+            return False, "", ""
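+
+    # Illustrative outcomes of _check_and_invoke_tools (hypothetical values):
+    #   "What is 5 times 3?"    -> (True, 15, "5 multiplied by 3 is 15.")
+    #   "Tell me about energy." -> (False, "", "")   # no tool matched; normal flow continues
+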
     def _update_entries(self, existing, new_entries):
-        # Convert list to dict for fast access 
         entry_map = {item["id"]: item for item in existing}
 
         # Handle batch updates
@@ -122,15 +326,26 @@ def user_message(self, message: str, user_type: str, house_type: str, region: st
 
         times = []
 
-        #print(f"\n\n-----User message-----\n{message}")
+        print(f"\n\n-----User message-----\n{message}")
 
-        # If the question is not well-formed
         if len(message) < 3:
             return wrong_answer_prompt(self.language)
-        
+
+        # Translate if needed
         if self.language != "English":
             message = self.llm_handler.generate_response(translate_chunk(), f"{message}", False)
 
+        # Check whether any tool should be invoked. The helper returns three values:
+        # a boolean indicating whether a tool was used, the raw tool result, and the
+        # final answer generated by the LLM. The raw result is included in the output
+        # so that the LLM cannot paraphrase it away or omit it from the final answer.
+        tool_used, tool_raw, tool_final = self._check_and_invoke_tools(message)
+        if tool_used:
+            # The "Tool result:" prefix marks this answer as coming from a tool call
+            return f"Tool result: {tool_raw}\n\n{tool_final}"  # This is a FINAL ANSWER
+
         #print(f"\n\n-----English user message-----\n{message}")
 
         # Compute nodes and relationships for the user message.
@@ -487,4 +702,3 @@ def user_message(self, message: str, user_type: str, house_type: str, region: st
             answer = wrong_answer_prompt(self.language)
 
         return answer
-

From 422f465d6c1a4edc8ea7012b72b797294810f16b Mon Sep 17 00:00:00 2001
From: Davide Morelli
Date: Thu, 27 Nov 2025 15:11:58 +0100
Subject: [PATCH 2/2] Enhance user_message_stream to support tool calling

Added logic to handle tool calling in user_message_stream, allowing direct
streaming of final answers to the user.
---
 orchestrator/guru.py | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/orchestrator/guru.py b/orchestrator/guru.py
index b834e88..61f88c1 100644
--- a/orchestrator/guru.py
+++ b/orchestrator/guru.py
@@ -72,16 +72,33 @@ def user_message(self, message: str):# -> str:
         else:
             return self.llm.generate_response(None, message)
 
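+    # Hypothetical example of the contract handled below: when a tool fires,
+    # know_base.user_message() returns a string of the form
+    #   "Tool result: (525, 630)\n\n<natural-language answer>"
+    # (e.g. a 35 minute shower: 35*15 and 35*18 liters), and the streaming
+    # method must surface the answer part directly instead of re-prompting the LLM.
+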
-    def user_message_stream(self, message: str):# -> str:
+    def user_message_stream(self, message: str):
         """
         Process a user message and return a response.
         Args:
             message (str): The user message to process.
         Returns:
             str: The response from the LLM.
+
+        Also handles the tool calling case: when the knowledge base reports that a
+        tool produced a final answer, that answer is streamed directly to the user.
         """
         if self.user_knowledge:
-            yield from self.llm.generate_response_stream(self.know_base.user_message(message, self.user_type, self.house_type, self.region), message, False)
+            kb_response = self.know_base.user_message(message, self.user_type, self.house_type, self.region)
+
+            # Check if this is a tool calling result (a final answer)
+            if kb_response.startswith("Tool result:"):
+                # Extract just the final response part (after the raw tool result)
+                parts = kb_response.split("\n\n", 1)
+                if len(parts) > 1:
+                    final_answer = parts[1]
+                else:
+                    final_answer = kb_response
+
+                # Stream the final answer directly without another LLM call
+                yield final_answer
+            else:
+                # This is a prompt for the knowledge base, so use it as the system prompt
+                yield from self.llm.generate_response_stream(kb_response, message, False)
         else:
             yield from self.llm.generate_response_stream(None, message)
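+
+    # Usage sketch (hypothetical: assumes the enclosing class is constructed as usual
+    # with user_knowledge enabled; the variable name guru is illustrative):
+    #   for chunk in guru.user_message_stream("Water consumed in a 35 minute shower?"):
+    #       print(chunk, end="", flush=True)
+    #   # On a tool hit, a single chunk containing the tool-based final answer is yielded.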