import streamlit as st import pandas as pd from appStore.prep_data import process_giz_worldwide, remove_duplicates, get_max_end_year from appStore.prep_utils import create_documents, get_client from appStore.embed import hybrid_embed_chunks from appStore.search import hybrid_search from appStore.region_utils import load_region_data, get_country_name, get_regions from appStore.tfidf_extraction import extract_top_keywords from torch import cuda import json from datetime import datetime # get the device to be used eithe gpu or cpu device = 'cuda' if cuda.is_available() else 'cpu' st.set_page_config(page_title="SEARCH IATI",layout='wide') st.title("GIZ Project Database (PROTOTYPE)") var = st.text_input("Enter Search Query") # Load the region lookup CSV region_lookup_path = "docStore/regions_lookup.csv" region_df = load_region_data(region_lookup_path) #################### Create the embeddings collection and save ###################### # the steps below need to be performed only once and then commented out any unnecssary compute over-run ##### First we process and create the chunks for relvant data source #chunks = process_giz_worldwide() ##### Convert to langchain documents #temp_doc = create_documents(chunks,'chunks') ##### Embed and store docs, check if collection exist then you need to update the collection collection_name = "giz_worldwide" #hybrid_embed_chunks(docs= temp_doc, collection_name = collection_name) ################### Hybrid Search ###################################################### client = get_client() print(client.get_collections()) # Get the maximum end_year across the entire collection max_end_year = get_max_end_year(client, collection_name) # Get all unique sub-regions _, unique_sub_regions = get_regions(region_df) # Fetch unique country codes and map to country names @st.cache_data def get_country_name_and_region_mapping(_client, collection_name, region_df): results = hybrid_search(_client, "", collection_name) country_set = set() for res in results[0] + results[1]: countries = res.payload.get('metadata', {}).get('countries', "[]") try: country_list = json.loads(countries.replace("'", '"')) # Only add codes of length 2 two_digit_codes = [code.upper() for code in country_list if len(code) == 2] country_set.update(two_digit_codes) except json.JSONDecodeError: pass # Create a mapping of {CountryName -> ISO2Code} and {ISO2Code -> SubRegion} country_name_to_code = {} iso_code_to_sub_region = {} for code in country_set: name = get_country_name(code, region_df) sub_region_row = region_df[region_df['alpha-2'] == code] sub_region = sub_region_row['sub-region'].values[0] if not sub_region_row.empty else "Not allocated" country_name_to_code[name] = code iso_code_to_sub_region[code] = sub_region return country_name_to_code, iso_code_to_sub_region # Get country name and region mappings client = get_client() country_name_mapping, iso_code_to_sub_region = get_country_name_and_region_mapping(client, collection_name, region_df) unique_country_names = sorted(country_name_mapping.keys()) # List of country names # Layout filters in columns col1, col2, col3, col4 = st.columns([1, 1, 1, 4]) # Region filter with col1: region_filter = st.selectbox("Region", ["All/Not allocated"] + sorted(unique_sub_regions)) # Display region names # Dynamically filter countries based on selected region if region_filter == "All/Not allocated": filtered_country_names = unique_country_names # Show all countries if no region is selected else: filtered_country_names = [ name for name, code in country_name_mapping.items() if iso_code_to_sub_region.get(code) == region_filter ] # Country filter with col2: country_filter = st.selectbox("Country", ["All/Not allocated"] + filtered_country_names) # Display filtered country names # Year range slider with col3: current_year = datetime.now().year default_start_year = current_year - 5 # 3) The max_value is now the actual max end_year from your collection end_year_range = st.slider( "Project End Year", min_value=2010, max_value=max_end_year, value=(default_start_year, max_end_year), ) # Checkbox to control whether to show only exact matches show_exact_matches = st.checkbox("Show only exact matches", value=False) button = st.button("Refresh Search") def filter_results(results, country_filter, region_filter, end_year_range): filtered = [] for r in results: metadata = r.payload.get('metadata', {}) countries = metadata.get('countries', "[]") end_year_val = float(metadata.get('end_year', 0)) # Convert countries to a list try: c_list = json.loads(countries.replace("'", '"')) c_list = [code.upper() for code in c_list if len(code) == 2] except json.JSONDecodeError: c_list = [] # Translate selected country name to iso2 selected_iso_code = country_name_mapping.get(country_filter, None) # Check if any country in the metadata matches the selected region if region_filter != "All/Not allocated": countries_in_region = [code for code in c_list if iso_code_to_sub_region.get(code) == region_filter] else: countries_in_region = c_list # Filtering if ( (country_filter == "All/Not allocated" or selected_iso_code in c_list) and (region_filter == "All/Not allocated" or countries_in_region) and (end_year_range[0] <= end_year_val <= end_year_range[1]) ): filtered.append(r) return filtered if button: # 1) Use a bigger limit so we get more than 15 results results = hybrid_search(client, var, collection_name, limit=500) # e.g., 100 or 200 # results is a tuple: (semantic_results, lexical_results) semantic_all = results[0] lexical_all = results[1] # 2) Filter out content < 20 chars (as intermediate fix to problem that e.g. super short paragraphs with few chars get high similarity score) semantic_all = [ r for r in semantic_all if len(r.payload["page_content"]) >= 70 ] lexical_all = [ r for r in lexical_all if len(r.payload["page_content"]) >= 70 ] # 2) Apply a threshold to SEMANTIC results (score >= 0.3) semantic_thresholded = [r for r in semantic_all if r.score >= 0.4] # 2) Filter the entire sets filtered_semantic = filter_results(semantic_thresholded, country_filter, region_filter, end_year_range) filtered_lexical = filter_results(lexical_all, country_filter, region_filter, end_year_range) filtered_semantic_no_dupe = remove_duplicates(filtered_semantic) filtered_lexical_no_dupe = remove_duplicates(filtered_lexical) # 3) Now we take the top 15 *after* filtering # Check user preference if show_exact_matches: # 1) Display heading st.write(f"Showing **Top 15 Lexical Search results** for query: {var}") # 2) Do a simple substring check (case-insensitive) # We'll create a new list lexical_substring_filtered query_substring = var.strip().lower() lexical_substring_filtered = [] for r in lexical_all: # page_content in lowercase page_text_lower = r.payload["page_content"].lower() # Keep this result only if the query substring is found if query_substring in page_text_lower: lexical_substring_filtered.append(r) # 3) Now apply your region/country/year filter on that new list filtered_lexical = filter_results( lexical_substring_filtered, country_filter, region_filter, end_year_range ) # 4) Remove duplicates filtered_lexical_no_dupe = remove_duplicates(filtered_lexical) # 5) If empty after substring + filters + dedupe, show a custom message if not filtered_lexical_no_dupe: st.write('No exact matches, consider unchecking "Show only exact matches"') else: # 6) Display the first 15 matching results for res in filtered_lexical_no_dupe[:15]: project_name = res.payload['metadata'].get('project_name', 'Project Link') url = res.payload['metadata'].get('url', '#') st.markdown(f"#### [{project_name}]({url})") # Snippet logic (80 words) full_text = res.payload['page_content'] words = full_text.split() preview_word_count = 80 preview_text = " ".join(words[:preview_word_count]) remainder_text = " ".join(words[preview_word_count:]) st.write(preview_text + ("..." if remainder_text else "")) # Keywords top_keywords = extract_top_keywords(full_text, top_n=5) if top_keywords: st.markdown(f"_{' · '.join(top_keywords)}_") # Metadata metadata = res.payload.get('metadata', {}) countries = metadata.get('countries', "[]") client_name = metadata.get('client', 'Unknown Client') start_year = metadata.get('start_year', None) end_year_ = metadata.get('end_year', None) try: c_list = json.loads(countries.replace("'", '"')) except json.JSONDecodeError: c_list = [] # Only keep country names if the region lookup (get_country_name) # returns something different than the raw code. matched_countries = [] for code in c_list: if len(code) == 2: resolved_name = get_country_name(code.upper(), region_df) # If get_country_name didn't find a match, # it typically just returns the same code (like "XX"). # We'll consider "successfully looked up" if # resolved_name != code.upper(). if resolved_name.upper() != code.upper(): matched_countries.append(resolved_name) # Format the year range start_year_str = f"{int(round(float(start_year)))}" if start_year else "Unknown" end_year_str = f"{int(round(float(end_year_)))}" if end_year_ else "Unknown" # Build the final string if matched_countries: # We have at least 1 valid country name additional_text = ( f"**{', '.join(matched_countries)}**, commissioned by **{client_name}**, " f"**{start_year_str}-{end_year_str}**" ) else: # No valid countries found additional_text = ( f"Commissioned by **{client_name}**, **{start_year_str}-{end_year_str}**" ) st.markdown(additional_text) st.divider() else: st.write(f"Showing **Top 15 Semantic Search results** for query: {var}") if not filtered_semantic_no_dupe: st.write("No relevant results found.") else: # Show the top 15 from filtered_semantic for res in filtered_semantic_no_dupe[:15]: project_name = res.payload['metadata'].get('project_name', 'Project Link') url = res.payload['metadata'].get('url', '#') st.markdown(f"#### [{project_name}]({url})") # Snippet logic full_text = res.payload['page_content'] words = full_text.split() preview_word_count = 80 preview_text = " ".join(words[:preview_word_count]) remainder_text = " ".join(words[preview_word_count:]) st.write(preview_text + ("..." if remainder_text else "")) # Keywords top_keywords = extract_top_keywords(full_text, top_n=5) if top_keywords: st.markdown(f"_{' · '.join(top_keywords)}_") # Metadata metadata = res.payload.get('metadata', {}) countries = metadata.get('countries', "[]") client_name = metadata.get('client', 'Unknown Client') start_year = metadata.get('start_year', None) end_year_ = metadata.get('end_year', None) try: c_list = json.loads(countries.replace("'", '"')) except json.JSONDecodeError: c_list = [] # Only keep country names if the region lookup (get_country_name) # returns something different than the raw code. matched_countries = [] for code in c_list: if len(code) == 2: resolved_name = get_country_name(code.upper(), region_df) # If get_country_name didn't find a match, # it typically just returns the same code (like "XX"). # We'll consider "successfully looked up" if # resolved_name != code.upper(). if resolved_name.upper() != code.upper(): matched_countries.append(resolved_name) # Format the year range start_year_str = f"{int(round(float(start_year)))}" if start_year else "Unknown" end_year_str = f"{int(round(float(end_year_)))}" if end_year_ else "Unknown" # Build the final string if matched_countries: # We have at least 1 valid country name additional_text = ( f"**{', '.join(matched_countries)}**, commissioned by **{client_name}**, " f"**{start_year_str}-{end_year_str}**" ) else: # No valid countries found additional_text = ( f"Commissioned by **{client_name}**, **{start_year_str}-{end_year_str}**" ) st.markdown(additional_text) st.divider() # for i in results: # st.subheader(str(i.metadata['id'])+":"+str(i.metadata['title_main'])) # st.caption(f"Status:{str(i.metadata['status'])}, Country:{str(i.metadata['country_name'])}") # st.write(i.page_content) # st.divider()