224 changes: 219 additions & 5 deletions knowledge_base/knowledge_manager.py
@@ -21,8 +21,104 @@
import chromadb
from chromadb.config import Settings
from itertools import permutations
from pathlib import Path
from langchain.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
import json
import re
from typing import Optional, Dict, Any, Tuple


# DEFINING TOOLS FOR FUNCTION CALLING

@tool
def multiply(a: int, b: int) -> int:
"""Multiplies two numbers together. ONLY use this when the user explicitly asks you to multiply specific numbers or calculate a product."""
return a * b

@tool
def add(a: int, b: int) -> int:
"""Adds two numbers. ONLY use this when the user explicitly asks you to add specific numbers or calculate a sum."""
return a + b

@tool
def climatic_zone_heating_times(zone: str) -> str:
"""Returns heating hours/heating time/riscaldamento hours for Italian climatic zones.
Use this when the user asks about:
- heating hours, heating time, ore di riscaldamento
- climatic zones, zona climatica (A, B, C, D, E, F)
- how many hours, how long can I heat

Examples of queries that should use this tool:
- "How many heating hours for zone E?"
- "What is the heating time for climatic zone C?"
- "ore di riscaldamento zona D"

Args:
zone: The climatic zone letter (A, B, C, D, E, or F)
"""
# This point is reached only after the LLM has already decided to use this tool,
# so we assume the query concerns climatic zones and only need to extract the letter.
# The word boundary prevents inputs like "zona E" from matching the "A" in "zona".
zone_match = re.search(r'\b[A-F]\b', zone.upper())
if not zone_match:
return "Zona climatica non riconosciuta. Specificare una zona tra A, B, C, D, E, o F."

zone_letter = zone_match.group(0)

zone_mapping = {
"A": "6 ore",
"B": "8 ore",
"C": "10 ore",
"D": "12 ore",
"E": "14 ore",
"F": "senza limiti"
}

return zone_mapping.get(zone_letter, "Zona climatica non riconosciuta.")
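# Illustrative invocation (not part of this PR): the word-boundary match lets
# free-form input resolve, e.g.
# climatic_zone_heating_times.invoke({"zone": "zona E"})  # -> "14 ore"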

@tool
def shower_water_consumption(duration_minutes: float) -> Tuple[float, float]:
"""Calculates water consumption/water used/water consumed/liters used during a shower.
Use this when the user asks about:
- water consumption in shower, water used in shower
- how much water, how many liters
- shower duration, shower time
- consumo acqua doccia

Examples of queries that should use this tool:
- "How much water is consumed during a 35 minute shower?"
- "Water usage for 10 minute shower"
- "Liters consumed in a 20 minute shower"

Takes shower duration in minutes, returns lower and upper estimates in liters.
"""
# Typical shower flow rate range, in liters per minute
liters_per_min_lower = 15
liters_per_min_upper = 18

lower_estimate = duration_minutes * liters_per_min_lower
upper_estimate = duration_minutes * liters_per_min_upper

return lower_estimate, upper_estimate
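# Illustrative invocation (not part of this PR):
# shower_water_consumption.invoke({"duration_minutes": 10})  # -> (150, 180)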



tools = [multiply, add, climatic_zone_heating_times, shower_water_consumption]
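# Hypothetical smoke test (not part of this PR): each @tool exposes .name,
# .description and .invoke(), e.g. add.invoke({"a": 2, "b": 3})  # -> 5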

# TOOL_PATTERNS = {
# "multiply": {
# "keywords": ["moltiplica", "multiplica", "prodotto", "per", "multiply", "times", "product"],
# "pattern": r'-?\d+\.?\d*',
# },
# "add": {
# "keywords": ["somma", "aggiungi", "addiziona", "più", "add", "sum", "plus"],
# "pattern": r'-?\d+\.?\d*',
# },
# "climatic_zone_heating_times": {
# "keywords": ["zona climatica", "zona", "riscaldamento", "ore", "heating", "climatic zone", "hours"],
# "pattern": r'\b(zona\s*)?([A-Fa-f])\b',
# }
# }

class KnowledgeManager:
"""A class to manage knowledge base operations."""
embedding: object
@@ -59,8 +155,116 @@ def __init__(self, provider: str, model: str, embedding: str, language: str, kno

self.knowledge_base_path = knowledge_base_path

def _check_and_invoke_tools(self, message: str) -> Tuple[bool, Any, str]:
"""Use the LLM to decide whether a tool should be invoked, execute it, and generate the final response.
Returns (tool_used, raw_tool_result, final_response); the raw result is whatever the tool returned (e.g. a tuple for shower_water_consumption).
"""

print(f"\n===== TOOL CALLING DEBUG START =====")
print(f"Input message: {message}")

# First we build a list of the available tools to include in the prompt for the LLM
tool_list = "\n".join([f"- {t.name}: {t.description}" for t in tools])

# We create a system prompt to guide the LLM in deciding whether or not to use a tool
system_prompt = f"""You are a tool invoking agent. Analyze if the user's query can be answered by ANY of the available tools, even if the wording is different.

Available tools:
{tool_list}

IMPORTANT: Match tools based on MEANING, not exact keywords:
- Questions about shower water/liters → use shower_water_consumption
- Questions about heating hours/time for zones → use climatic_zone_heating_times
- Questions about multiplication/product → use multiply
- Questions about addition/sum → use add

Respond with JSON only:
{{"use_tool": true, "tool_name": "name", "parameters": {{"param": value}}}}
OR
{{"use_tool": false}}

Examples:
"What is 5 times 3?" → {{"use_tool": true, "tool_name": "multiply", "parameters": {{"a": 5, "b": 3}}}}
"zona C" → {{"use_tool": true, "tool_name": "climatic_zone_heating_times", "parameters": {{"zone": "C"}}}}
"How much water for 1 hour shower?" → {{"use_tool": true, "tool_name": "shower_water_consumption", "parameters": {{"duration_minutes": 60}}}}
"Water consumed in 35 minute shower?" → {{"use_tool": true, "tool_name": "shower_water_consumption", "parameters": {{"duration_minutes": 35}}}}
"Heating hours zone E?" → {{"use_tool": true, "tool_name": "climatic_zone_heating_times", "parameters": {{"zone": "E"}}}}
"""

try:
print(f"\nCalling LLM for tool decision...")
# Now we send the LLM both the user message and the system prompt and ask it to decide whether to use a tool
response = self.llm_handler.generate_response(system_prompt, f'Message: "{message}"', False)

print(f"\nRaw LLM response:\n{response}")
print(f"\nResponse type: {type(response)}")

# Strip any markdown code fences from the LLM response so that only the JSON
# payload remains (e.g. {"use_tool": true, "tool_name": "multiply", ...}),
# then parse it into a Python dict so the fields can be accessed
response = response.strip().replace("```json", "").replace("```", "").strip()
print(f"\nCleaned response:\n{response}")

decision = json.loads(response)
print(f"\nParsed decision: {decision}")

# Now we can check the field "use_tool" to see if we need to invoke a tool
if not decision.get("use_tool"):
print(f"Decision: No tool needed")
print(f"===== TOOL CALLING DEBUG END =====\n")
return False, "", ""

# If we are using a tool we extract the tool name and the parameters
tool_name = decision.get("tool_name")
params = decision.get("parameters", {})

print(f"\nTool to invoke: {tool_name}")
print(f"Parameters: {params}")

# Look up the tool the LLM selected and run it via .invoke() to compute the result
tool_result = None
for tool in tools:
if tool.name == tool_name:
print(f"\nInvoking tool: {tool.name}")
tool_result = tool.invoke(params)
print(f"TOOL CALLING: Tool '{tool_name}' used, with result: {tool_result}")
break

# If the LLM named a tool that does not exist in our list, fall back to the normal flow
if tool_result is None:
print(f"TOOL CALLING: Tool not found")
print(f"===== TOOL CALLING DEBUG END =====\n")
return False, "", ""

# Finally, the tool result and the original user message are passed back to the
# LLM to generate a natural-language response for the user
final_response_prompt = f"""You are a helpful assistant. An external function was called to help answer the user's question.

User's question: {message}

Function used: {tool_name}
Function result: {tool_result}

Based on this function result, provide a clear and natural response to the user's question.
Incorporate the tool result into your answer in a helpful way. Try not to make up additional information not contained in the tool result
and don't openly mention that an external function was used to get the answer."""

print(f"\nGenerating final response...")
final_response = self.llm_handler.generate_response(final_response_prompt, "", False)
print(f"\nFinal response: {final_response}")
print(f"===== TOOL CALLING DEBUG END =====\n")

return True, tool_result, final_response

except Exception as e:
print(f"\n!!! EXCEPTION IN TOOL CALLING !!!")
print(f"Tool check failed: {e}")
import traceback
traceback.print_exc()
print(f"===== TOOL CALLING DEBUG END =====\n")
return False, "", ""

def _update_entries(self, existing, new_entries):
# Convert list to dict for fast access
entry_map = {item["id"]: item for item in existing}

# Handle batch updates
@@ -122,15 +326,26 @@ def user_message(self, message: str, user_type: str, house_type: str, region: st

times = []

print(f"\n\n-----User message-----\n{message}")

# If the question is not well-formed
if len(message) < 3:
return wrong_answer_prompt(self.language)


# Translate if needed
if self.language != "English":
message = self.llm_handler.generate_response(translate_chunk(), f"{message}", False)

# Check whether any tool needs to be used. The function returns three values:
# a boolean indicating if a tool was used, the raw tool result, and the final
# LLM-generated answer. The raw result is included in the output so the LLM
# cannot paraphrase it away or omit it from the final answer.
tool_used, tool_raw, tool_final = self._check_and_invoke_tools(message)
if tool_used:
# The "Tool result:" prefix marks the answer as coming from a tool call
return f"Tool result: {tool_raw}\n\n{tool_final}" # This is a FINAL ANSWER

#print(f"\n\n-----English user message-----\n{message}")

# Compute nodes and relationships for the user message.
@@ -487,4 +702,3 @@ def user_message(self, message: str, user_type: str, house_type: str, region: st
answer = wrong_answer_prompt(self.language)

return answer

21 changes: 19 additions & 2 deletions orchestrator/guru.py
@@ -72,16 +72,33 @@ def user_message(self, message: str):# -> str:
else:
return self.llm.generate_response(None, message)

def user_message_stream(self, message: str):
"""
Process a user message and return a response.
Args:
message (str): The user message to process.
Returns:
str: The response from the LLM.
ADDED THE LOGIC TO HANDLE THE TOOL CALLING STREAMING CASE
Now the module will take the response from the tool calling and send it directly to the user
"""
if self.user_knowledge:
kb_response = self.know_base.user_message(message, self.user_type, self.house_type, self.region)

# Check if this is a tool calling result (final answer)
if kb_response.startswith("Tool result:"):
# Extract just the final response part (after the raw tool result)
parts = kb_response.split("\n\n", 1)
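# Illustrative split (hypothetical values): "Tool result: (150, 180)\n\nA 10 minute
# shower uses roughly 150 to 180 liters." -> parts[1] is the user-facing answer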
if len(parts) > 1:
final_answer = parts[1]
else:
final_answer = kb_response

# Stream the final answer directly without another LLM call
yield final_answer
else:
# This is a prompt for the knowledge base, use it as system prompt
yield from self.llm.generate_response_stream(kb_response, message, False)
else:
yield from self.llm.generate_response_stream(None, message)
