annikwag's picture
Update app.py
efd387c verified
import streamlit as st
import pandas as pd
from appStore.prep_data import process_giz_worldwide, remove_duplicates, get_max_end_year
from appStore.prep_utils import create_documents, get_client
from appStore.embed import hybrid_embed_chunks
from appStore.search import hybrid_search
from appStore.region_utils import load_region_data, get_country_name, get_regions
from appStore.tfidf_extraction import extract_top_keywords
from torch import cuda
import json
from datetime import datetime
# get the device to be used eithe gpu or cpu
device = 'cuda' if cuda.is_available() else 'cpu'
st.set_page_config(page_title="SEARCH IATI",layout='wide')
st.title("GIZ Project Database (PROTOTYPE)")
var = st.text_input("Enter Search Query")
# Load the region lookup CSV
region_lookup_path = "docStore/regions_lookup.csv"
region_df = load_region_data(region_lookup_path)
#################### Create the embeddings collection and save ######################
# the steps below need to be performed only once and then commented out any unnecssary compute over-run
##### First we process and create the chunks for relvant data source
#chunks = process_giz_worldwide()
##### Convert to langchain documents
#temp_doc = create_documents(chunks,'chunks')
##### Embed and store docs, check if collection exist then you need to update the collection
collection_name = "giz_worldwide"
#hybrid_embed_chunks(docs= temp_doc, collection_name = collection_name)
################### Hybrid Search ######################################################
client = get_client()
print(client.get_collections())
# Get the maximum end_year across the entire collection
max_end_year = get_max_end_year(client, collection_name)
# Get all unique sub-regions
_, unique_sub_regions = get_regions(region_df)
# Fetch unique country codes and map to country names
@st.cache_data
def get_country_name_and_region_mapping(_client, collection_name, region_df):
results = hybrid_search(_client, "", collection_name)
country_set = set()
for res in results[0] + results[1]:
countries = res.payload.get('metadata', {}).get('countries', "[]")
try:
country_list = json.loads(countries.replace("'", '"'))
# Only add codes of length 2
two_digit_codes = [code.upper() for code in country_list if len(code) == 2]
country_set.update(two_digit_codes)
except json.JSONDecodeError:
pass
# Create a mapping of {CountryName -> ISO2Code} and {ISO2Code -> SubRegion}
country_name_to_code = {}
iso_code_to_sub_region = {}
for code in country_set:
name = get_country_name(code, region_df)
sub_region_row = region_df[region_df['alpha-2'] == code]
sub_region = sub_region_row['sub-region'].values[0] if not sub_region_row.empty else "Not allocated"
country_name_to_code[name] = code
iso_code_to_sub_region[code] = sub_region
return country_name_to_code, iso_code_to_sub_region
# Get country name and region mappings
client = get_client()
country_name_mapping, iso_code_to_sub_region = get_country_name_and_region_mapping(client, collection_name, region_df)
unique_country_names = sorted(country_name_mapping.keys()) # List of country names
# Layout filters in columns
col1, col2, col3, col4 = st.columns([1, 1, 1, 4])
# Region filter
with col1:
region_filter = st.selectbox("Region", ["All/Not allocated"] + sorted(unique_sub_regions)) # Display region names
# Dynamically filter countries based on selected region
if region_filter == "All/Not allocated":
filtered_country_names = unique_country_names # Show all countries if no region is selected
else:
filtered_country_names = [
name for name, code in country_name_mapping.items() if iso_code_to_sub_region.get(code) == region_filter
]
# Country filter
with col2:
country_filter = st.selectbox("Country", ["All/Not allocated"] + filtered_country_names) # Display filtered country names
# Year range slider
with col3:
current_year = datetime.now().year
default_start_year = current_year - 5
# 3) The max_value is now the actual max end_year from your collection
end_year_range = st.slider(
"Project End Year",
min_value=2010,
max_value=max_end_year,
value=(default_start_year, max_end_year),
)
# Checkbox to control whether to show only exact matches
show_exact_matches = st.checkbox("Show only exact matches", value=False)
button = st.button("Refresh Search")
def filter_results(results, country_filter, region_filter, end_year_range):
filtered = []
for r in results:
metadata = r.payload.get('metadata', {})
countries = metadata.get('countries', "[]")
end_year_val = float(metadata.get('end_year', 0))
# Convert countries to a list
try:
c_list = json.loads(countries.replace("'", '"'))
c_list = [code.upper() for code in c_list if len(code) == 2]
except json.JSONDecodeError:
c_list = []
# Translate selected country name to iso2
selected_iso_code = country_name_mapping.get(country_filter, None)
# Check if any country in the metadata matches the selected region
if region_filter != "All/Not allocated":
countries_in_region = [code for code in c_list if iso_code_to_sub_region.get(code) == region_filter]
else:
countries_in_region = c_list
# Filtering
if (
(country_filter == "All/Not allocated" or selected_iso_code in c_list)
and (region_filter == "All/Not allocated" or countries_in_region)
and (end_year_range[0] <= end_year_val <= end_year_range[1])
):
filtered.append(r)
return filtered
if button:
# 1) Use a bigger limit so we get more than 15 results
results = hybrid_search(client, var, collection_name, limit=500) # e.g., 100 or 200
# results is a tuple: (semantic_results, lexical_results)
semantic_all = results[0]
lexical_all = results[1]
# 2) Filter out content < 20 chars (as intermediate fix to problem that e.g. super short paragraphs with few chars get high similarity score)
semantic_all = [
r for r in semantic_all if len(r.payload["page_content"]) >= 70
]
lexical_all = [
r for r in lexical_all if len(r.payload["page_content"]) >= 70
]
# 2) Apply a threshold to SEMANTIC results (score >= 0.3)
semantic_thresholded = [r for r in semantic_all if r.score >= 0.4]
# 2) Filter the entire sets
filtered_semantic = filter_results(semantic_thresholded, country_filter, region_filter, end_year_range)
filtered_lexical = filter_results(lexical_all, country_filter, region_filter, end_year_range)
filtered_semantic_no_dupe = remove_duplicates(filtered_semantic)
filtered_lexical_no_dupe = remove_duplicates(filtered_lexical)
# 3) Now we take the top 15 *after* filtering
# Check user preference
if show_exact_matches:
# 1) Display heading
st.write(f"Showing **Top 15 Lexical Search results** for query: {var}")
# 2) Do a simple substring check (case-insensitive)
# We'll create a new list lexical_substring_filtered
query_substring = var.strip().lower()
lexical_substring_filtered = []
for r in lexical_all:
# page_content in lowercase
page_text_lower = r.payload["page_content"].lower()
# Keep this result only if the query substring is found
if query_substring in page_text_lower:
lexical_substring_filtered.append(r)
# 3) Now apply your region/country/year filter on that new list
filtered_lexical = filter_results(
lexical_substring_filtered, country_filter, region_filter, end_year_range
)
# 4) Remove duplicates
filtered_lexical_no_dupe = remove_duplicates(filtered_lexical)
# 5) If empty after substring + filters + dedupe, show a custom message
if not filtered_lexical_no_dupe:
st.write('No exact matches, consider unchecking "Show only exact matches"')
else:
# 6) Display the first 15 matching results
for res in filtered_lexical_no_dupe[:15]:
project_name = res.payload['metadata'].get('project_name', 'Project Link')
url = res.payload['metadata'].get('url', '#')
st.markdown(f"#### [{project_name}]({url})")
# Snippet logic (80 words)
full_text = res.payload['page_content']
words = full_text.split()
preview_word_count = 80
preview_text = " ".join(words[:preview_word_count])
remainder_text = " ".join(words[preview_word_count:])
st.write(preview_text + ("..." if remainder_text else ""))
# Keywords
top_keywords = extract_top_keywords(full_text, top_n=5)
if top_keywords:
st.markdown(f"_{' · '.join(top_keywords)}_")
# Metadata
metadata = res.payload.get('metadata', {})
countries = metadata.get('countries', "[]")
client_name = metadata.get('client', 'Unknown Client')
start_year = metadata.get('start_year', None)
end_year_ = metadata.get('end_year', None)
try:
c_list = json.loads(countries.replace("'", '"'))
except json.JSONDecodeError:
c_list = []
# Only keep country names if the region lookup (get_country_name)
# returns something different than the raw code.
matched_countries = []
for code in c_list:
if len(code) == 2:
resolved_name = get_country_name(code.upper(), region_df)
# If get_country_name didn't find a match,
# it typically just returns the same code (like "XX").
# We'll consider "successfully looked up" if
# resolved_name != code.upper().
if resolved_name.upper() != code.upper():
matched_countries.append(resolved_name)
# Format the year range
start_year_str = f"{int(round(float(start_year)))}" if start_year else "Unknown"
end_year_str = f"{int(round(float(end_year_)))}" if end_year_ else "Unknown"
# Build the final string
if matched_countries:
# We have at least 1 valid country name
additional_text = (
f"**{', '.join(matched_countries)}**, commissioned by **{client_name}**, "
f"**{start_year_str}-{end_year_str}**"
)
else:
# No valid countries found
additional_text = (
f"Commissioned by **{client_name}**, **{start_year_str}-{end_year_str}**"
)
st.markdown(additional_text)
st.divider()
else:
st.write(f"Showing **Top 15 Semantic Search results** for query: {var}")
if not filtered_semantic_no_dupe:
st.write("No relevant results found.")
else:
# Show the top 15 from filtered_semantic
for res in filtered_semantic_no_dupe[:15]:
project_name = res.payload['metadata'].get('project_name', 'Project Link')
url = res.payload['metadata'].get('url', '#')
st.markdown(f"#### [{project_name}]({url})")
# Snippet logic
full_text = res.payload['page_content']
words = full_text.split()
preview_word_count = 80
preview_text = " ".join(words[:preview_word_count])
remainder_text = " ".join(words[preview_word_count:])
st.write(preview_text + ("..." if remainder_text else ""))
# Keywords
top_keywords = extract_top_keywords(full_text, top_n=5)
if top_keywords:
st.markdown(f"_{' · '.join(top_keywords)}_")
# Metadata
metadata = res.payload.get('metadata', {})
countries = metadata.get('countries', "[]")
client_name = metadata.get('client', 'Unknown Client')
start_year = metadata.get('start_year', None)
end_year_ = metadata.get('end_year', None)
try:
c_list = json.loads(countries.replace("'", '"'))
except json.JSONDecodeError:
c_list = []
# Only keep country names if the region lookup (get_country_name)
# returns something different than the raw code.
matched_countries = []
for code in c_list:
if len(code) == 2:
resolved_name = get_country_name(code.upper(), region_df)
# If get_country_name didn't find a match,
# it typically just returns the same code (like "XX").
# We'll consider "successfully looked up" if
# resolved_name != code.upper().
if resolved_name.upper() != code.upper():
matched_countries.append(resolved_name)
# Format the year range
start_year_str = f"{int(round(float(start_year)))}" if start_year else "Unknown"
end_year_str = f"{int(round(float(end_year_)))}" if end_year_ else "Unknown"
# Build the final string
if matched_countries:
# We have at least 1 valid country name
additional_text = (
f"**{', '.join(matched_countries)}**, commissioned by **{client_name}**, "
f"**{start_year_str}-{end_year_str}**"
)
else:
# No valid countries found
additional_text = (
f"Commissioned by **{client_name}**, **{start_year_str}-{end_year_str}**"
)
st.markdown(additional_text)
st.divider()
# for i in results:
# st.subheader(str(i.metadata['id'])+":"+str(i.metadata['title_main']))
# st.caption(f"Status:{str(i.metadata['status'])}, Country:{str(i.metadata['country_name'])}")
# st.write(i.page_content)
# st.divider()