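# NOTE(review): the next three lines look like web-page extraction residue
# rather than module content -- confirm and drop if so.
# Spaces:
# Runtime error
# Runtime error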
"""Base interface that all chains should implement.""" | |
import json | |
from abc import ABC, abstractmethod | |
from pathlib import Path | |
from typing import Any, Dict, List, Optional, Union | |
import yaml | |
from pydantic import BaseModel, Field, validator | |
import langchain | |
from langchain.callbacks import get_callback_manager | |
from langchain.callbacks.base import BaseCallbackManager | |
from langchain.schema import BaseMemory | |
def _get_verbosity() -> bool:
    """Return the current value of the module-level ``langchain.verbose`` flag.

    Read at call time (it is used as a ``default_factory``), so a change to
    the global setting is picked up by chains created afterwards.
    """
    global_verbose = langchain.verbose
    return global_verbose
class Chain(BaseModel, ABC):
    """Base interface that all chains should implement.

    Subclasses supply ``input_keys``, ``output_keys`` and ``_call``. On top of
    those, this class validates inputs and outputs, merges in and updates the
    optional ``memory``, notifies the ``callback_manager`` on start, end and
    error, and can serialize the chain to JSON or YAML.
    """

    # Optional memory. When set, its variables are merged into the inputs
    # before each call and the inputs/outputs are saved to it afterwards.
    memory: Optional[BaseMemory] = None
    # Notified on chain start, end and error. Declared with ``exclude=True``.
    callback_manager: BaseCallbackManager = Field(
        default_factory=get_callback_manager, exclude=True
    )
    verbose: bool = Field(
        default_factory=_get_verbosity
    )  # Whether to print the response text

    class Config:
        """Configuration for this pydantic object."""

        arbitrary_types_allowed = True

    @property
    def _chain_type(self) -> str:
        """Type tag stored under ``"_type"`` by ``dict()``.

        Raises:
            NotImplementedError: Always, in this base class; chain types that
                support saving are expected to override it.
        """
        raise NotImplementedError("Saving not supported for this chain type.")

    # ``pre=True`` so that ``None`` is replaced before the field's type is
    # checked; ``always=True`` so the validator also runs for omitted values.
    @validator("callback_manager", pre=True, always=True)
    def set_callback_manager(
        cls, callback_manager: Optional[BaseCallbackManager]
    ) -> BaseCallbackManager:
        """If callback manager is None, set it.

        This allows users to pass in None as callback manager, which is a nice UX.
        """
        return callback_manager or get_callback_manager()

    @validator("verbose", pre=True, always=True)
    def set_verbose(cls, verbose: Optional[bool]) -> bool:
        """If verbose is None, set it.

        This allows users to pass in None as verbose to access the global setting.
        """
        if verbose is None:
            return _get_verbosity()
        return verbose

    @property
    @abstractmethod
    def input_keys(self) -> List[str]:
        """Input keys this chain expects."""

    @property
    @abstractmethod
    def output_keys(self) -> List[str]:
        """Output keys this chain expects."""

    def _validate_inputs(self, inputs: Dict[str, str]) -> None:
        """Check that all inputs are present.

        Raises:
            ValueError: If any key in ``input_keys`` is missing from ``inputs``.
        """
        missing_keys = set(self.input_keys).difference(inputs)
        if missing_keys:
            raise ValueError(f"Missing some input keys: {missing_keys}")

    def _validate_outputs(self, outputs: Dict[str, str]) -> None:
        """Check that the outputs contain exactly the expected keys.

        Raises:
            ValueError: If the keys of ``outputs`` differ from ``output_keys``.
        """
        if set(outputs) != set(self.output_keys):
            raise ValueError(
                f"Did not get output keys that were expected. "
                f"Got: {set(outputs)}. Expected: {set(self.output_keys)}."
            )

    @abstractmethod
    def _call(self, inputs: Dict[str, str]) -> Dict[str, str]:
        """Run the logic of this chain and return the output."""

    async def _acall(self, inputs: Dict[str, str]) -> Dict[str, str]:
        """Run the logic of this chain and return the output."""
        raise NotImplementedError("Async call not supported for this chain type.")

    def __call__(
        self, inputs: Union[Dict[str, Any], Any], return_only_outputs: bool = False
    ) -> Dict[str, Any]:
        """Run the logic of this chain and add to output if desired.

        Args:
            inputs: Dictionary of inputs, or single input if chain expects
                only one param.
            return_only_outputs: boolean for whether to return only outputs in the
                response. If True, only new keys generated by this chain will be
                returned. If False, both input keys and new keys generated by this
                chain will be returned. Defaults to False.
        """
        inputs = self.prep_inputs(inputs)
        self.callback_manager.on_chain_start(
            {"name": self.__class__.__name__},
            inputs,
            verbose=self.verbose,
        )
        try:
            outputs = self._call(inputs)
        except (KeyboardInterrupt, Exception) as e:
            # Report the failure (including Ctrl-C), then re-raise unchanged.
            self.callback_manager.on_chain_error(e, verbose=self.verbose)
            raise
        self.callback_manager.on_chain_end(outputs, verbose=self.verbose)
        return self.prep_outputs(inputs, outputs, return_only_outputs)

    async def acall(
        self, inputs: Union[Dict[str, Any], Any], return_only_outputs: bool = False
    ) -> Dict[str, Any]:
        """Run the logic of this chain and add to output if desired.

        Async counterpart of ``__call__``: callback hooks are awaited only
        when ``callback_manager.is_async`` is true, otherwise called directly.

        Args:
            inputs: Dictionary of inputs, or single input if chain expects
                only one param.
            return_only_outputs: boolean for whether to return only outputs in the
                response. If True, only new keys generated by this chain will be
                returned. If False, both input keys and new keys generated by this
                chain will be returned. Defaults to False.
        """
        inputs = self.prep_inputs(inputs)
        if self.callback_manager.is_async:
            await self.callback_manager.on_chain_start(
                {"name": self.__class__.__name__},
                inputs,
                verbose=self.verbose,
            )
        else:
            self.callback_manager.on_chain_start(
                {"name": self.__class__.__name__},
                inputs,
                verbose=self.verbose,
            )
        try:
            outputs = await self._acall(inputs)
        except (KeyboardInterrupt, Exception) as e:
            if self.callback_manager.is_async:
                await self.callback_manager.on_chain_error(e, verbose=self.verbose)
            else:
                self.callback_manager.on_chain_error(e, verbose=self.verbose)
            raise
        if self.callback_manager.is_async:
            await self.callback_manager.on_chain_end(outputs, verbose=self.verbose)
        else:
            self.callback_manager.on_chain_end(outputs, verbose=self.verbose)
        return self.prep_outputs(inputs, outputs, return_only_outputs)

    def prep_outputs(
        self,
        inputs: Dict[str, str],
        outputs: Dict[str, str],
        return_only_outputs: bool = False,
    ) -> Dict[str, str]:
        """Validate and prep outputs.

        Validates the output keys, saves the inputs/outputs to memory when a
        memory is set, and returns either the outputs alone or the inputs
        merged with the outputs (outputs win on key collisions).
        """
        self._validate_outputs(outputs)
        if self.memory is not None:
            self.memory.save_context(inputs, outputs)
        if return_only_outputs:
            return outputs
        return {**inputs, **outputs}

    def prep_inputs(self, inputs: Union[Dict[str, Any], Any]) -> Dict[str, str]:
        """Validate and prep inputs.

        A non-dict input is wrapped under the single input key that is not
        provided by memory. Memory variables, if any, are then merged in and
        the result is checked against ``input_keys``.

        Raises:
            ValueError: If a non-dict input is given but the chain does not
                have exactly one unfilled input key, or if required input
                keys are missing after preparation.
        """
        if not isinstance(inputs, dict):
            _input_keys = set(self.input_keys)
            if self.memory is not None:
                # If there are multiple input keys, but some get set by memory so that
                # only one is not set, we can still figure out which key it is.
                _input_keys = _input_keys.difference(self.memory.memory_variables)
            if len(_input_keys) != 1:
                raise ValueError(
                    f"A single string input was passed in, but this chain expects "
                    f"multiple inputs ({_input_keys}). When a chain expects "
                    f"multiple inputs, please call it by passing in a dictionary, "
                    "eg `chain({'foo': 1, 'bar': 2})`"
                )
            inputs = {list(_input_keys)[0]: inputs}
        if self.memory is not None:
            external_context = self.memory.load_memory_variables(inputs)
            inputs = dict(inputs, **external_context)
        self._validate_inputs(inputs)
        return inputs

    def apply(self, input_list: List[Dict[str, Any]]) -> List[Dict[str, str]]:
        """Call the chain on all inputs in the list."""
        return [self(inputs) for inputs in input_list]

    def run(self, *args: str, **kwargs: str) -> str:
        """Run the chain as text in, text out or multiple variables, text out.

        Accepts either exactly one positional argument or keyword arguments
        only, and returns the value of the chain's single output key.

        Raises:
            ValueError: If the chain does not have exactly one output key, if
                more than one positional argument is given, or if positional
                and keyword arguments are mixed (or both are absent).
        """
        if len(self.output_keys) != 1:
            raise ValueError(
                f"`run` not supported when there is not exactly "
                f"one output key. Got {self.output_keys}."
            )

        if args and not kwargs:
            if len(args) != 1:
                raise ValueError("`run` supports only one positional argument.")
            return self(args[0])[self.output_keys[0]]

        if kwargs and not args:
            return self(kwargs)[self.output_keys[0]]

        raise ValueError(
            f"`run` supported with either positional arguments or keyword arguments"
            f" but not both. Got args: {args} and kwargs: {kwargs}."
        )

    async def arun(self, *args: str, **kwargs: str) -> str:
        """Run the chain as text in, text out or multiple variables, text out.

        Async counterpart of ``run``; it goes through ``acall`` and has the
        same argument rules and error conditions.
        """
        if len(self.output_keys) != 1:
            raise ValueError(
                f"`run` not supported when there is not exactly "
                f"one output key. Got {self.output_keys}."
            )

        if args and not kwargs:
            if len(args) != 1:
                raise ValueError("`run` supports only one positional argument.")
            return (await self.acall(args[0]))[self.output_keys[0]]

        if kwargs and not args:
            return (await self.acall(kwargs))[self.output_keys[0]]

        raise ValueError(
            f"`run` supported with either positional arguments or keyword arguments"
            f" but not both. Got args: {args} and kwargs: {kwargs}."
        )

    def dict(self, **kwargs: Any) -> Dict:
        """Return dictionary representation of chain.

        Args:
            **kwargs: Forwarded unchanged to the parent ``dict`` method.

        Returns:
            The parent's dictionary with an extra ``"_type"`` entry holding
            ``_chain_type``.

        Raises:
            ValueError: If the chain has a memory attached.
        """
        if self.memory is not None:
            raise ValueError("Saving of memory is not yet supported.")
        _dict = super().dict(**kwargs)
        _dict["_type"] = self._chain_type
        return _dict

    def save(self, file_path: Union[Path, str]) -> None:
        """Save the chain.

        Args:
            file_path: Path to file to save the chain to.

        Raises:
            ValueError: If the chain cannot be serialized, or if the file
                suffix is neither ``.json`` nor ``.yaml``.

        Example:
        .. code-block:: python

            chain.save(file_path="path/chain.yaml")
        """
        save_path = Path(file_path)

        # Serialize and check the suffix first, so that a failed save does
        # not leave freshly created directories behind.
        chain_dict = self.dict()
        suffix = save_path.suffix
        if suffix not in (".json", ".yaml"):
            raise ValueError(f"{save_path} must be json or yaml")

        save_path.parent.mkdir(parents=True, exist_ok=True)
        with open(save_path, "w") as f:
            if suffix == ".json":
                json.dump(chain_dict, f, indent=4)
            else:
                yaml.dump(chain_dict, f, default_flow_style=False)