import os
import json

import pandas as pd
import gradio as gr
import chromadb
import typing_extensions

from llama_index import (
    VectorStoreIndex,
    download_loader,
    Document,
    ServiceContext,
)
from llama_index.llms import MistralAI
from llama_index.embeddings import MistralAIEmbedding
from llama_index.vector_stores import ChromaVectorStore
from llama_index.storage.storage_context import StorageContext

from utils import departments_list, region_list

title = "Team LFD rotation finder app"
description = "Propose a rotation for a farmer"
placeholder = (
    "Vous pouvez me poser une question sur ce contexte, appuyez sur Entrée pour valider"
)
placeholder_url = "Extract text from this url"

llm_model = "mistral-tiny"
env_api_key = os.environ.get("MISTRAL_API_KEY")
query_engine = None

# Define the LLM and the embedding model
llm = MistralAI(api_key=env_api_key, model=llm_model, temperature=0.05)
embed_model = MistralAIEmbedding(
    model_name="mistral-embed", api_key=env_api_key, max_length=10000
)

# Create the Chroma client and a new collection
db = chromadb.PersistentClient(path="./chroma_db")
chroma_collection = db.get_or_create_collection("quickstart")

# Set up the ChromaVectorStore and load in data
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
service_context = ServiceContext.from_defaults(
    chunk_size=1024, llm=llm, embed_model=embed_model
)

PDFReader = download_loader("PDFReader")
loader = PDFReader()

index = VectorStoreIndex(
    [], service_context=service_context, storage_context=storage_context
)
query_engine = index.as_query_engine(similarity_top_k=5, similarity_threshold=0.8)
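# Illustrative only: once documents are in the collection, the engine can be
# queried directly, e.g. (hypothetical question, not part of the app flow):
#   response = query_engine.query("What is the average wheat yield per hectare in this region?")
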
def create_prompt(farmSize, cultures):
    prompt = f"""
    You are a French agronomical advisor, answering in French. Your task is to give the farmer concise advice, as a table of rotation crops (with a primary suggestion and an alternative one), on what to sow next year and in which proportion. You will be given the farmer's history; the context data provided earlier gives you average yields per hectare by region and by crop, as well as production costs and selling prices. Consider agronomical limitations and advise the farmer so as to maximize profit (maximum yield and revenue: the difference between the selling price and the production cost, multiplied by the yield). There are three possible scenarios: pessimistic (lowest revenue), optimistic (highest revenue) and mean.
    #facts
    The farm area is {farmSize} ha.
    """
    for i, culture in enumerate(cultures):
        prompt += f"Parcel {i+1} most recently grew {culture}. "
    prompt += """Answer in French with a concise table of the crops to grow, parcel by parcel, predicting the gross margin and cost per hectare according to the requested scenario (mean, pessimistic or optimistic; default: mean).
    Réponds en français en formulant un tableau concis avec les cultures que tu veux cultiver et par parcelle, et en prévoyant la marge brute et le coût par hectare selon le scénario demandé (moyen, pessimiste ou optimiste. Par défaut : moyen).
    """
    print(prompt)
    # prompt += "Le scénario choisi est le moyen."
    return prompt

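# Illustrative call (hypothetical values): create_prompt(120.0, ["blé tendre", "colza"])
# returns a prompt describing a 120 ha farm with two parcels and their previous crops.
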
# Structure of the data sent by the form
InputForm = typing_extensions.TypedDict(
    "InputForm",
    {
        "department": str,
        "farmSize": float,
        "benefitsFromCommonAgriculturalPolicy": bool,
        "cultures": list[str],
        "yields": dict[str, float],
    },
)
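# Example payload matching InputForm (values are purely illustrative; the real
# department keys come from utils.departments_list):
#   {
#       "department": "28",
#       "farmSize": 120.0,
#       "benefitsFromCommonAgriculturalPolicy": True,
#       "cultures": ["blé tendre", "colza"],
#       "yields": {"blé tendre": 7.5, "colza": 3.4},
#   }
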
# This function is the API endpoint the web app will use
def find_my_rotation(
    department: str,
    farmSize: float,
    benefitsFromCommonAgriculturalPolicy: bool,
    cultures: list[str],
    yields: dict[str, float],
):
    # Build a text summary of the department's yield statistics and index it
    department_name = departments_list.get(department)
    dpt_yield = pd.read_csv(f"data/departments/{department_name}.csv")
    yield_text = ""
    for i, row in dpt_yield.iterrows():
        yield_text += (
            f"Dans le département de {department_name}, la production de "
            f"{row['Culture'].split('-')[1]} est de {row['mean']} en moyenne par hectare, "
            f"de {row['pessimistic']} par hectare avec un scénario pessimiste et de "
            f"{row['optimistic']} par hectare avec un scénario optimiste. "
        )
    index.insert(Document(text=yield_text))

    # Create the prompt and question the model
    prompt = create_prompt(farmSize, cultures)
    response = query_engine.query(prompt)
    # prompt = 'Traduis cette réponse en français: ' + response.response
    # response = query_engine.query(prompt)
    return response

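# Illustrative direct call (hypothetical department code and crops, not real data):
#   find_my_rotation("28", 120.0, True, ["blé tendre", "colza"], {"blé tendre": 7.5})
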
def get_documents_in_db():
    print("Fetching documents in DB")
    docs = []
    for item in chroma_collection.get(include=["metadatas"])["metadatas"]:
        try:
            docs.append(json.loads(item["_node_content"])["metadata"]["file_name"])
        except (KeyError, TypeError, json.JSONDecodeError):
            # Skip entries whose node metadata has no file name
            pass
    docs = list(set(docs))
    print(f"Found {len(docs)} documents")
    out = "**List of files in db:**\n"
    for d in docs:
        out += " - " + d + "\n"
    return out

def empty_db():
    ids = chroma_collection.get()["ids"]
    if ids:
        chroma_collection.delete(ids)
    return get_documents_in_db()

def load_file(file):
    documents = loader.load_data(file=file)
    for doc in documents:
        index.insert(doc)
    return (
        gr.Textbox(visible=False),
        gr.Textbox(value="Document encoded! You can ask questions", visible=True),
        get_documents_in_db(),
    )

def load_local_data(data_folder):
    for file in os.listdir(data_folder):
        if file.endswith(".pdf"):
            print("Adding file " + file + " to DB")
            documents = loader.load_data(file=data_folder + file)
            for doc in documents:
                index.insert(doc)
        if file.endswith(".txt"):
            print("Adding file " + file + " to DB")
            with open(data_folder + file, "r") as f:
                file_ = f.read()
            index.insert(Document(text=file_))
        if file == "price_by_crop.csv":
            print("Adding file " + file + " to DB")
            prices_text = (
                "The prices of some agricultural crops are given by this CSV. "
                "It contains three scenarios: mean, optimistic, and pessimistic. "
                + str(pd.read_csv(data_folder + file))
            )
            index.insert(Document(text=prices_text))
        if file == "data_cout_production_grandes_cultures_2021_2025.xlsx":
            production_costs = ""
            for _, row in pd.read_excel(data_folder + file).iterrows():
                if row["ANNEE"] == 2024:
                    production_costs += (
                        f"Le coût de production par tonne en moyenne pour {row['CULTURES']} "
                        f"était de {row['MOYENNE']} euros par tonne avec un scénario moyen, "
                        f"{row['QUART INFERIEUR']} pour un scénario optimiste, et "
                        f"{row['QUART SUPERIEUR']} pour un scénario pessimiste. \n"
                    )
            print("Adding file " + file + " to DB")
            index.insert(Document(text=production_costs))

def load_document(input_file):
    file_name = input_file.name.split("/")[-1]
    return gr.Textbox(value=f"Document loaded: {file_name}", visible=True)

with gr.Blocks() as demo:
    gr.Markdown(
        """ # Welcome to Gaia Level 3 Demo
Add a file before interacting with the chat.
This demo lets you upload a PDF file and then ask questions to the Mistral API.
Mistral will answer using the context extracted from your uploaded file.
*The files stay in the database unless there is 48h of inactivity or you rebuild the space.*
"""
    )
gr.Markdown(""" ### 1 / Extract data from PDF """) | |
with gr.Row(): | |
with gr.Column(): | |
input_file = gr.File( | |
label="Load a pdf", | |
file_types=[".pdf"], | |
file_count="single", | |
type="filepath", | |
interactive=True, | |
) | |
file_msg = gr.Textbox( | |
label="Loaded documents:", container=False, visible=False | |
) | |
input_file.upload( | |
fn=load_document, | |
inputs=[ | |
input_file, | |
], | |
outputs=[file_msg], | |
concurrency_limit=20, | |
) | |
load_local_data('data/') | |
load_local_data('data/pdf/') | |
help_msg = gr.Markdown( | |
value="Once the document is loaded, press the Encode button below to add it to the db." | |
) | |
file_btn = gr.Button(value="Encode file ✅", interactive=True) | |
btn_msg = gr.Textbox(container=False, visible=False) | |
with gr.Row(): | |
db_list = gr.Markdown(value=get_documents_in_db) | |
delete_btn = gr.Button(value="Empty db 🗑️", interactive=True, scale=0) | |
file_btn.click( | |
load_file, | |
inputs=[input_file], | |
outputs=[file_msg, btn_msg, db_list], | |
show_progress="full", | |
) | |
delete_btn.click(empty_db, outputs=[db_list], show_progress="minimal") | |
gr.Markdown(""" ### 2 / Ask a question about this context """) | |
chatbot = gr.Chatbot() | |
msg = gr.Textbox(placeholder=placeholder) | |
clear = gr.ClearButton([msg, chatbot]) | |
def respond(message, chat_history): | |
response = query_engine.query(message) | |
chat_history.append((message, str(response))) | |
return chat_history | |
msg.submit(respond, [msg, chatbot], [chatbot]) | |
    # Quick-and-dirty way to expose find_my_rotation as an API endpoint:
    # a hidden button whose click handler the web app calls through the Gradio API.
    invisible_output = gr.Textbox(visible=True)
    invisible_btn = gr.Button(visible=False)
    invisible_btn.click(
        find_my_rotation,
        inputs=[gr.Textbox(), gr.Number(), gr.Checkbox(), gr.List(), gr.List()],
        outputs=[invisible_output],
    )
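    # Hypothetical sketch of how an external client could call the endpoint above,
    # assuming Gradio derives the API name from the function name (untested, values illustrative):
    #   from gradio_client import Client
    #   client = Client("<space-url>")
    #   client.predict("28", 120.0, True, ["blé tendre"], {"blé tendre": 7.5},
    #                  api_name="/find_my_rotation")
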
demo.title = title
demo.launch()