import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from pydantic import BaseModel, Field

from langchain import LLMChain
from langchain.experimental.generative_agents.memory import GenerativeAgentMemory
from langchain.prompts import PromptTemplate
from langchain.schema import BaseLanguageModel


class GenerativeAgent(BaseModel):
    """A character with memory and innate characteristics."""

    name: str
    """The character's name."""

    age: Optional[int] = None
    """The optional age of the character."""
    traits: str = "N/A"
    """Permanent traits to ascribe to the character."""
    status: str
    """The character's current status."""
    memory: GenerativeAgentMemory
    """The memory object that combines relevance, recency, and 'importance'."""
    llm: BaseLanguageModel
    """The underlying language model."""
    verbose: bool = False
    summary: str = ""  #: :meta private:
    """Stateful self-summary generated via reflection on the character's memory."""

    summary_refresh_seconds: int = 3600  #: :meta private:
    """How frequently to regenerate the summary."""

    last_refreshed: datetime = Field(default_factory=datetime.now)  #: :meta private:
    """The last time the character's summary was regenerated."""

    daily_summaries: List[str] = Field(default_factory=list)  #: :meta private:
    """Summaries of the events in the plans the agent has carried out."""

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    # LLM-related methods
    @staticmethod
    def _parse_list(text: str) -> List[str]:
        """Parse a newline-separated string into a list of strings."""
        lines = re.split(r"\n", text.strip())
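        # Strip any leading enumeration ("1.", "2.", ...) the LLM may prepend.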
        return [re.sub(r"^\s*\d+\.\s*", "", line).strip() for line in lines]

    def chain(self, prompt: PromptTemplate) -> LLMChain:
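        # The agent's memory is attached to every chain, so run-time kwargs such
        # as "queries" and the most-recent-memories token key below are consumed
        # by GenerativeAgentMemory rather than by the prompt itself.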
        return LLMChain(
            llm=self.llm, prompt=prompt, verbose=self.verbose, memory=self.memory
        )

    def _get_entity_from_observation(self, observation: str) -> str:
        prompt = PromptTemplate.from_template(
            "What is the observed entity in the following observation? {observation}"
            + "\nEntity="
        )
        return self.chain(prompt).run(observation=observation).strip()

    def _get_entity_action(self, observation: str, entity_name: str) -> str:
        prompt = PromptTemplate.from_template(
            "What is the {entity} doing in the following observation? {observation}"
            + "\nThe {entity} is"
        )
        return (
            self.chain(prompt).run(entity=entity_name, observation=observation).strip()
        )

    def summarize_related_memories(self, observation: str) -> str:
        """Summarize memories that are most relevant to an observation."""
        prompt = PromptTemplate.from_template(
            """
{q1}?
Context from memory:
{relevant_memories}
Relevant context:
"""
        )
        entity_name = self._get_entity_from_observation(observation)
        entity_action = self._get_entity_action(observation, entity_name)
        q1 = f"What is the relationship between {self.name} and {entity_name}"
        q2 = f"{entity_name} is {entity_action}"
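        # The "queries" kwarg is consumed by GenerativeAgentMemory, which uses it
        # to retrieve the memories that fill the {relevant_memories} slot.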
        return self.chain(prompt=prompt).run(q1=q1, queries=[q1, q2]).strip()

    def _generate_reaction(self, observation: str, suffix: str) -> str:
        """React to a given observation or dialogue act."""
        prompt = PromptTemplate.from_template(
            "{agent_summary_description}"
            + "\nIt is {current_time}."
            + "\n{agent_name}'s status: {agent_status}"
            + "\nSummary of relevant context from {agent_name}'s memory:"
            + "\n{relevant_memories}"
            + "\nMost recent observations: {most_recent_memories}"
            + "\nObservation: {observation}"
            + "\n\n"
            + suffix
        )
        agent_summary_description = self.get_summary()
        relevant_memories_str = self.summarize_related_memories(observation)
        current_time_str = datetime.now().strftime("%B %d, %Y, %I:%M %p")
        kwargs: Dict[str, Any] = dict(
            agent_summary_description=agent_summary_description,
            current_time=current_time_str,
            relevant_memories=relevant_memories_str,
            agent_name=self.name,
            observation=observation,
            agent_status=self.status,
        )
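        # Measure the prompt with the recent-memories slot left empty; the token
        # count is handed to memory so it can fill {most_recent_memories} with as
        # many recent observations as the remaining context window allows.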
        consumed_tokens = self.llm.get_num_tokens(
            prompt.format(most_recent_memories="", **kwargs)
        )
        kwargs[self.memory.most_recent_memories_token_key] = consumed_tokens
        return self.chain(prompt=prompt).run(**kwargs).strip()

    def _clean_response(self, text: str) -> str:
        # re.escape guards against names that contain regex metacharacters.
        return re.sub(rf"^{re.escape(self.name)} ", "", text.strip()).strip()

    def generate_reaction(self, observation: str) -> Tuple[bool, str]:
| 125 | + """React to a given observation.""" |
| 126 | + call_to_action_template = ( |
| 127 | + "Should {agent_name} react to the observation, and if so," |
| 128 | + + " what would be an appropriate reaction? Respond in one line." |
| 129 | + + ' If the action is to engage in dialogue, write:\nSAY: "what to say"' |
| 130 | + + "\notherwise, write:\nREACT: {agent_name}'s reaction (if anything)." |
| 131 | + + "\nEither do nothing, react, or say something but not both.\n\n" |
| 132 | + ) |
| 133 | + full_result = self._generate_reaction(observation, call_to_action_template) |
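        # Only the first line of the completion is parsed; anything beyond it is
        # treated as model rambling and discarded.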
        result = full_result.strip().split("\n")[0]
        # Record the observation and the chosen reaction in the agent's memory.
        self.memory.save_context(
            {},
            {
                self.memory.add_memory_key: f"{self.name} observed "
                f"{observation} and reacted by {result}"
            },
        )
        if "REACT:" in result:
            reaction = self._clean_response(result.split("REACT:")[-1])
            return False, f"{self.name} {reaction}"
        if "SAY:" in result:
            said_value = self._clean_response(result.split("SAY:")[-1])
            return True, f"{self.name} said {said_value}"
        return False, result

    def generate_dialogue_response(self, observation: str) -> Tuple[bool, str]:
| 153 | + """React to a given observation.""" |
| 154 | + call_to_action_template = ( |
| 155 | + "What would {agent_name} say? To end the conversation, write:" |
| 156 | + ' GOODBYE: "what to say". Otherwise to continue the conversation,' |
| 157 | + ' write: SAY: "what to say next"\n\n' |
| 158 | + ) |
| 159 | + full_result = self._generate_reaction(observation, call_to_action_template) |
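        # As in generate_reaction, parse only the first line of the completion.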
        result = full_result.strip().split("\n")[0]
        if "GOODBYE:" in result:
            farewell = self._clean_response(result.split("GOODBYE:")[-1])
            self.memory.save_context(
                {},
                {
                    self.memory.add_memory_key: f"{self.name} observed "
                    f"{observation} and said {farewell}"
                },
            )
            return False, f"{self.name} said {farewell}"
        if "SAY:" in result:
            response_text = self._clean_response(result.split("SAY:")[-1])
            self.memory.save_context(
                {},
                {
                    self.memory.add_memory_key: f"{self.name} observed "
                    f"{observation} and said {response_text}"
                },
            )
            return True, f"{self.name} said {response_text}"
        return False, result

    ######################################################
    # Agent stateful summary methods.                    #
    # Each dialogue or response prompt includes a header #
    # summarizing the agent's self-description. This is  #
    # updated periodically through probing its memories. #
    ######################################################
    def _compute_agent_summary(self) -> str:
        """Summarize the agent's core characteristics from its memories."""
        prompt = PromptTemplate.from_template(
            "How would you summarize {name}'s core characteristics given the"
            + " following statements:\n"
            + "{relevant_memories}"
            + "Do not embellish."
            + "\n\nSummary: "
        )
        # The agent reflects on its own core characteristics.
        return (
            self.chain(prompt)
            .run(name=self.name, queries=[f"{self.name}'s core characteristics"])
            .strip()
        )

    def get_summary(self, force_refresh: bool = False) -> str:
        """Return a descriptive summary of the agent."""
        current_time = datetime.now()
        since_refresh = (current_time - self.last_refreshed).total_seconds()
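        # Recompute the summary when it is empty, stale, or explicitly forced.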
        if (
            not self.summary
            or since_refresh >= self.summary_refresh_seconds
            or force_refresh
        ):
            self.summary = self._compute_agent_summary()
            self.last_refreshed = current_time
        age = self.age if self.age is not None else "N/A"
        return (
            f"Name: {self.name} (age: {age})"
            + f"\nInnate traits: {self.traits}"
            + f"\n{self.summary}"
        )

    def get_full_header(self, force_refresh: bool = False) -> str:
        """Return a full header of the agent's status, summary, and current time."""
        summary = self.get_summary(force_refresh=force_refresh)
        current_time_str = datetime.now().strftime("%B %d, %Y, %I:%M %p")
        return (
            f"{summary}\nIt is {current_time_str}.\n{self.name}'s status: {self.status}"
        )
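
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the class above). It wires up the
# GenerativeAgentMemory this module imports, following the setup used in the
# LangChain generative-agents notebook. The model choice, retriever settings,
# and the character ("Tommie") are assumptions for demonstration, not
# requirements; adapt them to your own stack.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import faiss

    from langchain.chat_models import ChatOpenAI
    from langchain.docstore import InMemoryDocstore
    from langchain.embeddings import OpenAIEmbeddings
    from langchain.retrievers import TimeWeightedVectorStoreRetriever
    from langchain.vectorstores import FAISS

    llm = ChatOpenAI(max_tokens=1500)
    embeddings = OpenAIEmbeddings()
    # 1536 is the dimensionality of OpenAI's text-embedding-ada-002 vectors.
    index = faiss.IndexFlatL2(1536)
    vectorstore = FAISS(embeddings.embed_query, index, InMemoryDocstore({}), {})
    # Time-weighted retrieval scores memories by recency plus "importance".
    retriever = TimeWeightedVectorStoreRetriever(
        vectorstore=vectorstore, other_score_keys=["importance"], k=15
    )
    memory = GenerativeAgentMemory(llm=llm, memory_retriever=retriever)

    agent = GenerativeAgent(
        name="Tommie",  # hypothetical character, as in the notebook
        age=25,
        traits="anxious, likes design, talkative",
        status="looking for a job",
        memory=memory,
        llm=llm,
    )
    print(agent.get_summary(force_refresh=True))
    print(agent.generate_reaction("Tommie sees a friendly dog in the park."))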