Chatbot
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

381 lines
14 KiB

import asyncio
from typing import Any, Dict, List, Optional, Tuple

from pydantic import BaseModel, Extra, Field, root_validator

from langchain.base_language import BaseLanguageModel
from langchain.chains.llm import LLMChain
from langchain.memory.chat_memory import BaseChatMemory, BaseMemory
from langchain.memory.chat_message_histories.in_memory import ChatMessageHistory
from langchain.memory.prompt import SUMMARY_PROMPT
from langchain.memory.utils import get_prompt_input_key
from langchain.prompts.base import BasePromptTemplate
from langchain.schema import (
    AIMessage,
    BaseChatMessageHistory,
    BaseMemory,
    BaseMessage,
    ChatMessage,
    HumanMessage,
    SystemMessage,
    get_buffer_string,
)

from ..utilities.messages import Message
class ChatMessageHistoryCustom(BaseChatMessageHistory, BaseModel):
    """In-memory chat transcript that accepts human, AI, system, and generic
    chat messages.

    Messages are kept in insertion order in ``messages``; ``clear`` discards
    the whole transcript.
    """

    messages: List[BaseMessage] = []

    def _push(self, msg: BaseMessage) -> None:
        # Internal: append a single message to the transcript.
        self.messages.append(msg)

    def add_user_message(self, message: str) -> None:
        """Record a message spoken by the human."""
        self._push(HumanMessage(content=message))

    def add_ai_message(self, message: str) -> None:
        """Record a message spoken by the AI."""
        self._push(AIMessage(content=message))

    def add_system_message(self, message: str) -> None:
        """Record a system-level message."""
        self._push(SystemMessage(content=message))

    def add_chat_message(self, message: str) -> None:
        """Record a generic chat message.

        NOTE(review): ``ChatMessage`` normally requires a ``role`` argument,
        which is not supplied here — confirm this path is actually exercised.
        """
        self._push(ChatMessage(content=message))

    def clear(self) -> None:
        """Drop every stored message."""
        self.messages = []
class CustomMemory(BaseMemory):
    """Conversation memory with a token-bounded buffer and a rolling summary.

    Two histories are kept: ``chat_memory`` (pruned so its token count stays
    under ``max_len``) and ``chat_memory_day`` (appended to in parallel, never
    pruned here, reset only by :meth:`clear`).  When the main buffer
    overflows, the oldest messages are popped and folded into
    ``moving_summary_buffer`` via the LLM.
    """

    human_prefix: str = "Human"
    ai_prefix: str = "AI"
    memory_key: str = "history"  #: :meta private:
    chat_memory: BaseChatMessageHistory = Field(default_factory=ChatMessageHistoryCustom)
    chat_memory_day: BaseChatMessageHistory = Field(default_factory=ChatMessageHistoryCustom)
    output_key: Optional[str] = None
    input_key: Optional[str] = None
    return_messages: bool = False
    max_len: int = 1200  # prune once the buffer exceeds this many tokens
    min_len: int = 200   # prune the buffer down to roughly this many tokens
    moving_summary_buffer: str = ""
    # event_ids of messages already folded into the summary (bounded to 100).
    # Fix: was an unannotated class attribute; annotate so pydantic treats it
    # as a proper per-instance field.
    last_message_ids_summarized: List[Any] = Field(default_factory=list)
    llm: BaseLanguageModel
    summary_prompt: BasePromptTemplate = SUMMARY_PROMPT

    def _get_input_output(
        self, inputs: Dict[str, Any], outputs: Dict[str, str]
    ) -> Tuple[str, str]:
        """Resolve the single (input, output) string pair for this turn.

        Raises:
            ValueError: if ``output_key`` is unset and ``outputs`` does not
                contain exactly one entry.
        """
        if self.input_key is None:
            prompt_input_key = get_prompt_input_key(inputs, self.memory_variables)
        else:
            prompt_input_key = self.input_key
        if self.output_key is None:
            if len(outputs) != 1:
                raise ValueError(f"One output key expected, got {outputs.keys()}")
            output_key = list(outputs.keys())[0]
        else:
            output_key = self.output_key
        return inputs[prompt_input_key], outputs[output_key]

    def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
        """Append the turn to both histories, then prune the main buffer.

        Fix: the original inline prune loop called ``self.predict_new_summary``,
        which does not exist on this class (only ``apredict_new_summary`` is
        defined), and could pop past an empty buffer.  Delegate to
        :meth:`prune_memory`, which guards both cases.
        """
        input_str, output_str = self._get_input_output(inputs, outputs)
        self.chat_memory.add_user_message(input_str)
        self.chat_memory.add_ai_message(output_str)
        self.chat_memory_day.add_user_message(input_str)
        self.chat_memory_day.add_ai_message(output_str)
        # NOTE: blocks the current thread until summarization finishes; only
        # call from synchronous code with no event loop already running.
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.prune_memory(self.max_len))

    async def prune_memory(self, max_len: int) -> None:
        """If the buffer exceeds ``max_len`` tokens, summarize the overflow.

        Oldest messages are popped until the buffer is at or below
        ``min(min_len, max_len)`` tokens (or empty), then folded into
        ``moving_summary_buffer``.  Event ids of summarized messages are
        remembered (bounded to the last 100).
        """
        buffer = self.chat_memory.messages
        curr_buffer_length = self.llm.get_num_tokens_from_messages(buffer)
        if curr_buffer_length > max_len:
            pruned_memory = []
            while curr_buffer_length > min(self.min_len, self.max_len) and len(buffer) > 0:
                pruned_memory.append(buffer.pop(0))
                curr_buffer_length = self.llm.get_num_tokens_from_messages(buffer)
            self.moving_summary_buffer = await self.apredict_new_summary(
                pruned_memory, self.moving_summary_buffer
            )
            for m in pruned_memory:
                if "event_id" in m.additional_kwargs:
                    self.last_message_ids_summarized.append(m.additional_kwargs["event_id"])
            self.last_message_ids_summarized = self.last_message_ids_summarized[-100:]

    async def asave_context(self, input_msg: BaseMessage, output_msg: BaseMessage) -> None:
        """Append pre-built messages to both histories and prune asynchronously."""
        self.chat_memory.messages.append(input_msg)
        self.chat_memory.messages.append(output_msg)
        self.chat_memory_day.messages.append(input_msg)
        self.chat_memory_day.messages.append(output_msg)
        await self.prune_memory(self.max_len)

    def clear(self) -> None:
        """Clear both histories and the rolling summary."""
        self.chat_memory.clear()
        self.chat_memory_day.clear()
        self.moving_summary_buffer = ""

    def get_buffer_string(
        self, messages: List[BaseMessage], human_prefix: str = "Human", ai_prefix: str = "AI"
    ) -> str:
        """Render messages as ``role: content`` lines.

        A per-message ``user_name`` in ``additional_kwargs`` overrides the
        role label derived from the message type.

        Raises:
            ValueError: for message types other than Human/AI/System/Chat.
        """
        string_messages = []
        for m in messages:
            if isinstance(m, HumanMessage):
                role = human_prefix
            elif isinstance(m, AIMessage):
                role = ai_prefix
            elif isinstance(m, SystemMessage):
                role = "System"
            elif isinstance(m, ChatMessage):
                role = m.role
            else:
                raise ValueError(f"Got unsupported message type: {m}")
            if "user_name" in m.additional_kwargs:
                role = m.additional_kwargs["user_name"]
            string_messages.append(f"{role}: {m.content}")
        return "\n".join(string_messages)

    @property
    def buffer(self) -> Any:
        """Main history: raw messages or a rendered string per ``return_messages``."""
        if self.return_messages:
            return self.chat_memory.messages
        return self.get_buffer_string(
            self.chat_memory.messages,
            human_prefix=self.human_prefix,
            ai_prefix=self.ai_prefix,
        )

    @property
    def buffer_day(self) -> Any:
        """Unpruned day history: raw messages or a rendered string."""
        if self.return_messages:
            return self.chat_memory_day.messages
        return self.get_buffer_string(
            self.chat_memory_day.messages,
            human_prefix=self.human_prefix,
            ai_prefix=self.ai_prefix,
        )

    @property
    def memory_variables(self) -> List[str]:
        """Will always return list of memory variables.

        :meta private:
        """
        return [self.memory_key]

    def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Return the history buffer keyed by ``memory_key``."""
        return {self.memory_key: self.buffer}

    async def apredict_new_summary(
        self, messages: List[BaseMessage], existing_summary: str
    ) -> str:
        """Fold ``messages`` into ``existing_summary`` via the summary chain."""
        new_lines = self.get_buffer_string(
            messages,
            human_prefix=self.human_prefix,
            ai_prefix=self.ai_prefix,
        )
        chain = LLMChain(llm=self.llm, prompt=self.summary_prompt, verbose=True)
        await asyncio.sleep(0)  # yield control before the long-running LLM call
        # NOTE(review): the stock SUMMARY_PROMPT expects a ``new_lines``
        # variable; ``chat_history`` only works with a custom prompt — confirm
        # against the prompt actually configured.
        output = await chain.apredict(summary=existing_summary, chat_history=new_lines)
        return output.strip()
class ChangeNamesMemory(BaseMemory):
    """Read-only view over another memory that rewrites speaker names.

    ``replace_human_chat_names`` and ``replace_ai_chat_names`` map substrings
    to replacements; they are applied only to the role label of human and AI
    messages respectively, never to the message content.  Saving and clearing
    are deliberate no-ops so the wrapped memory is never mutated.
    """

    memory: BaseMemory
    replace_ai_chat_names: Dict[str, str]
    replace_human_chat_names: Dict[str, str]

    def get_buffer_string(self, messages: List[BaseMessage]) -> str:
        """Render messages as ``role: content`` lines with names rewritten."""
        lines = []
        for msg in messages:
            # Pick the base label and the substitution table by message type.
            if isinstance(msg, HumanMessage):
                label = self.memory.human_prefix
                substitutions = self.replace_human_chat_names
            elif isinstance(msg, AIMessage):
                label = self.memory.ai_prefix
                substitutions = self.replace_ai_chat_names
            elif isinstance(msg, SystemMessage):
                label = "System"
                substitutions = {}
            elif isinstance(msg, ChatMessage):
                label = msg.role
                substitutions = {}
            else:
                raise ValueError(f"Got unsupported message type: {msg}")
            # A per-message user_name overrides the generic label, and is
            # itself subject to the substitutions.
            if "user_name" in msg.additional_kwargs:
                label = msg.additional_kwargs["user_name"]
            for old, new in substitutions.items():
                label = label.replace(old, new)
            lines.append(f"{label}: {msg.content}")
        return "\n".join(lines)

    @property
    def buffer(self) -> Any:
        """The wrapped memory's messages, raw or rendered per its flag."""
        messages = self.memory.chat_memory.messages
        if self.memory.return_messages:
            return messages
        return self.get_buffer_string(messages)

    @property
    def memory_variables(self) -> List[str]:
        """Delegate to the wrapped memory's variables."""
        return self.memory.memory_variables

    def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, str]:
        """Return the rewritten buffer under the wrapped memory's key."""
        return {self.memory.memory_key: self.buffer}

    def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
        """Nothing should be saved or changed."""
        pass

    def clear(self) -> None:
        """Nothing to clear; the wrapped memory is left untouched."""
        pass
class ChatMessageHistoryMessage(BaseModel):
    """Chat history that stores project-level ``Message`` objects verbatim.

    All four ``add_*`` methods are identical: each simply appends the given
    ``Message`` to ``messages`` — the role distinction exists only to mirror
    the ``BaseChatMessageHistory`` interface.
    """

    #messages: List[Message] = []
    # NOTE(review): untyped pydantic field — the annotated version above was
    # commented out, presumably to avoid validation of ``Message``; confirm
    # before restoring the annotation.
    messages = []

    def add_user_message(self, message: Message) -> None:
        # Append a user-authored Message unchanged.
        self.messages.append(message)

    def add_ai_message(self, message: Message) -> None:
        # Append an AI-authored Message unchanged.
        self.messages.append(message)

    def add_system_message(self, message: Message) -> None:
        # Append a system Message unchanged.
        self.messages.append(message)

    def add_chat_message(self, message: Message) -> None:
        # Append a generic chat Message unchanged.
        self.messages.append(message)

    def clear(self) -> None:
        # Drop the whole transcript.
        self.messages = []
class TestMemory(BaseMemory):
    """Buffer memory that stores project ``Message`` objects and renders them
    as ``user_name: message`` lines.
    """

    human_prefix: str = "Human"
    ai_prefix: str = "AI"
    # Fix: annotation now matches the default factory (was ChatMessageHistory,
    # which ChatMessageHistoryMessage does not subclass).
    chat_memory: ChatMessageHistoryMessage = Field(default_factory=ChatMessageHistoryMessage)
    output_key: Optional[str] = None
    input_key: Optional[str] = None
    memory_key: str = "history"  #: :meta private:

    @property
    def memory_variables(self) -> List[str]:
        """Will always return list of memory variables.

        :meta private:
        """
        return [self.memory_key]

    def _get_input_output(
        self, inputs: Dict[str, Any], outputs: Dict[str, str]
    ) -> Tuple[str, str]:
        """Resolve the (input, output) string pair for this turn.

        Fix: ``save_context`` called this helper but it was never defined on
        this class; implementation mirrors ``CustomMemory._get_input_output``.
        """
        if self.input_key is None:
            prompt_input_key = get_prompt_input_key(inputs, self.memory_variables)
        else:
            prompt_input_key = self.input_key
        if self.output_key is None:
            if len(outputs) != 1:
                raise ValueError(f"One output key expected, got {outputs.keys()}")
            output_key = list(outputs.keys())[0]
        else:
            output_key = self.output_key
        return inputs[prompt_input_key], outputs[output_key]

    def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Return the rendered history buffer.

        Fix: the original referenced ``chat_memory`` without ``self.`` and a
        ``message`` variable that did not exist (the loop variable was ``m``)
        — both NameErrors.
        """
        string_messages = []
        # NOTE(review): assumes each stored item exposes ``user_name`` and
        # ``message`` attributes (project Message type) — str entries appended
        # by save_context would fail here; confirm intended usage.
        for m in self.chat_memory.messages:
            string_messages.append(f"{m.user_name}: {m.message}")
        return {self.memory_key: "\n".join(string_messages)}

    def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
        """Save context from this conversation to buffer."""
        input_str, output_str = self._get_input_output(inputs, outputs)
        self.chat_memory.add_user_message(input_str)
        self.chat_memory.add_ai_message(output_str)

    def clear(self) -> None:
        """Clear memory contents."""
        self.chat_memory.clear()
class BotConversationSummaryBufferWindowMemory(BaseChatMemory):
    """Conversation memory with a token-limited buffer and rolling summary.

    Once the buffer exceeds ``max_token_limit`` tokens, the oldest messages
    are popped until it is back under ``min_token_limit`` and are condensed
    into ``moving_summary_buffer`` via ``summary_prompt``.
    """

    human_prefix: str = "Human"
    ai_prefix: str = "AI"
    # Key under which the history is exposed to the prompt.
    memory_key: str = "history"  #: :meta private:
    max_token_limit: int = 1200
    min_token_limit: int = 200
    moving_summary_buffer: str = ""
    llm: BaseLanguageModel
    summary_prompt: BasePromptTemplate = SUMMARY_PROMPT

    @property
    def buffer(self) -> List[BaseMessage]:
        """Raw message list currently held in memory."""
        return self.chat_memory.messages

    @property
    def memory_variables(self) -> List[str]:
        """Will always return list of memory variables.

        :meta private:
        """
        return [self.memory_key]

    def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, str]:
        """Return the history, rendered to a string unless messages are wanted."""
        history: Any = self.buffer
        if not self.return_messages:
            history = get_buffer_string(
                history,
                human_prefix=self.human_prefix,
                ai_prefix=self.ai_prefix,
            )
        return {self.memory_key: history}

    def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
        """Save context from this conversation to buffer, pruning on overflow."""
        super().save_context(inputs, outputs)
        messages = self.chat_memory.messages
        token_count = self.llm.get_num_tokens_from_messages(messages)
        if token_count <= self.max_token_limit:
            return
        # Over budget: peel messages off the front until under the floor,
        # then fold them into the rolling summary.
        overflow = []
        while token_count > self.min_token_limit:
            overflow.append(messages.pop(0))
            token_count = self.llm.get_num_tokens_from_messages(messages)
        self.moving_summary_buffer = self.predict_new_summary(
            overflow, self.moving_summary_buffer
        )

    def clear(self) -> None:
        """Clear memory contents, including the rolling summary."""
        super().clear()
        self.moving_summary_buffer = ""

    def predict_new_summary(
        self, messages: List[BaseMessage], existing_summary: str
    ) -> str:
        """Fold ``messages`` into ``existing_summary`` using the summary chain."""
        transcript = get_buffer_string(
            messages,
            human_prefix=self.human_prefix,
            ai_prefix=self.ai_prefix,
        )
        summarizer = LLMChain(llm=self.llm, prompt=self.summary_prompt)
        return summarizer.predict(summary=existing_summary, new_lines=transcript)