Spaces:
Sleeping
Sleeping
import streamlit as st | |
import pandas as pd | |
import requests | |
import json | |
# Define your API key and endpoint | |
api_key = 'AIzaSyAQ4dXlOkF8rPC21f6omTS4p6v-uJ2vVIg' | |
url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash-exp:generateContent" | |
headers = {'Content-Type': 'application/json'} | |
# Cache the sentiment analysis function to improve performance | |
def analyze_sentiment(text): | |
""" | |
Analyze the sentiment of the given text using the Gemini API. | |
""" | |
system_prompt = """ | |
You are a Sentiment Analysis Tool (SEA). Analyze the following comments and classify the sentiment of each as positive, neutral, or negative. | |
Return the results in the following format: | |
Comment: <comment> | |
Sentiment: <sentiment> | |
--- | |
Additionally, provide actionable insights into customer satisfaction trends in the following format: | |
### Suggestions for Improvement: | |
- <suggestion 1> | |
- <suggestion 2> | |
""" | |
data = { | |
"contents": [{ | |
"parts": [{"text": f"{system_prompt}\n\n{text}"}] | |
}] | |
} | |
response = requests.post(url, headers=headers, json=data, params={'key': api_key}) | |
if response.status_code == 200: | |
return response.json() | |
else: | |
st.error(f"Request failed with status code {response.status_code}: {response.text}") | |
return None | |
def read_file_content(file, file_type): | |
""" | |
Read the entire content of the file based on its type. | |
""" | |
if file_type == 'csv': | |
df = pd.read_csv(file) | |
text = ' '.join(df.apply(lambda x: ' '.join(x.dropna().astype(str)), axis=1)) | |
elif file_type == 'xlsx': | |
df = pd.read_excel(file) | |
text = ' '.join(df.apply(lambda x: ' '.join(x.dropna().astype(str)), axis=1)) | |
elif file_type == 'json': | |
df = pd.read_json(file) | |
text = ' '.join(df.apply(lambda x: ' '.join(x.dropna().astype(str)), axis=1)) | |
elif file_type == 'txt' or file_type == 'md': | |
text = file.read().decode('utf-8') | |
else: | |
st.error("Unsupported file type.") | |
return None | |
return text | |
def process_large_text(text, chunk_size=5000): | |
""" | |
Split large text into smaller chunks for processing. | |
""" | |
chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)] | |
return chunks | |
def display_sentiment_results(response_text, file_name): | |
""" | |
Display sentiment analysis results for a single file. | |
""" | |
# Parse comments and sentiments | |
results = [] | |
lines = response_text.split('\n') | |
comment = None | |
sentiment = None | |
for line in lines: | |
if line.startswith("Comment:"): | |
comment = line.replace("Comment:", "").strip() | |
elif line.startswith("Sentiment:"): | |
sentiment = line.replace("Sentiment:", "").strip() | |
if comment and sentiment: | |
results.append((comment, sentiment)) | |
comment = None | |
sentiment = None | |
# Display results | |
st.write(f"### Sentiment Analysis Results for **{file_name}**") | |
df_results = pd.DataFrame(results, columns=['Comment/Prompt', 'Sentiment']) | |
st.dataframe(df_results) | |
# Sentiment distribution | |
sentiment_counts = df_results['Sentiment'].value_counts().reset_index() | |
sentiment_counts.columns = ['Sentiment', 'Count'] | |
with st.expander(f"View Sentiment Distribution for {file_name}"): | |
st.bar_chart(sentiment_counts.set_index('Sentiment')) | |
# Suggestions | |
suggestions = [] | |
current_section = None | |
for line in lines: | |
if line.startswith("### Suggestions for Improvement:"): | |
current_section = "Suggestions for Improvement" | |
elif current_section and line.startswith("- "): | |
suggestions.append(line.replace("- ", "").strip()) | |
if suggestions: | |
st.write("### Suggestions for Improvement") | |
for suggestion in suggestions: | |
st.write(f"- {suggestion}") | |
else: | |
st.warning("No suggestions available.") | |
# CSV download | |
output_file = f"sentiment_analysis_results_{file_name}.csv" | |
df_results.to_csv(output_file, index=False) | |
st.download_button( | |
label=f"Download Results for {file_name} as CSV", | |
data=open(output_file, 'rb').read(), | |
file_name=output_file, | |
mime='text/csv', | |
) | |
# Streamlit layout | |
st.set_page_config(page_title="Sentiment Analysis Tool", layout="wide") | |
st.title("Sentiment Analysis Tool (SEA) π¬") | |
st.write("Analyze customer feedback with sentiment classification and actionable insights.") | |
# Sidebar for instructions | |
with st.sidebar: | |
st.header("Instructions π") | |
st.write(""" | |
1. Upload one or more files containing customer feedback in the main area. | |
2. Analyze real-time feedback using the text input box. | |
3. Download sentiment analysis results as CSV files. | |
""") | |
st.write("---") | |
st.header("About") | |
st.write("This app uses the Gemini API for sentiment analysis and provides actionable insights.") | |
# Main layout with tabs | |
tab1, tab2 = st.tabs(["π File Analysis", "βοΈ Real-Time Feedback"]) | |
with tab1: | |
st.write("### Upload one or more files for batch sentiment analysis:") | |
uploaded_files = st.file_uploader("Choose files", type=["csv", "xlsx", "json", "txt", "md"], accept_multiple_files=True) | |
if uploaded_files: | |
for uploaded_file in uploaded_files: | |
file_type = uploaded_file.name.split('.')[-1] | |
with st.spinner(f"Processing {uploaded_file.name}..."): | |
# Read the entire file content | |
text = read_file_content(uploaded_file, file_type) | |
if text: | |
# Process large text in chunks if necessary | |
chunks = process_large_text(text) | |
combined_results = "" | |
for chunk in chunks: | |
sentiment_result = analyze_sentiment(chunk) | |
if sentiment_result: | |
response_text = sentiment_result.get('candidates', [{}])[0].get('content', {}).get('parts', [{}])[0].get('text', '').strip() | |
combined_results += response_text + "\n" | |
if combined_results: | |
display_sentiment_results(combined_results, uploaded_file.name) | |
else: | |
st.error(f"Sentiment analysis failed for {uploaded_file.name}.") | |
with tab2: | |
st.write("### Enter your feedback for real-time analysis:") | |
feedback_input = st.text_area("Enter your feedback:", placeholder="Type your feedback here...") | |
if st.button("Analyze Sentiment"): | |
if feedback_input.strip() == "": | |
st.warning("Please enter some feedback to analyze.") | |
else: | |
with st.spinner("Analyzing sentiment..."): | |
sentiment_result = analyze_sentiment(feedback_input) | |
if sentiment_result: | |
sentiment = sentiment_result.get('candidates', [{}])[0].get('content', {}).get('parts', [{}])[0].get('text', '').strip().lower() | |
if "positive" in sentiment: | |
st.success(f"Sentiment: **Positive** π") | |
elif "neutral" in sentiment: | |
st.info(f"Sentiment: **Neutral** π") | |
elif "negative" in sentiment: | |
st.error(f"Sentiment: **Negative** π ") | |
else: | |
st.warning(f"Sentiment: **Unknown** π€") | |
else: | |
st.error("Sentiment analysis failed.") |