import os, json, re, logging, asyncio, time, requests, pandas as pd, streamlit as st, openai, boto3, uuid, aiohttp, urllib3, random, html, smtplib from datetime import datetime, timedelta from dotenv import load_dotenv from bs4 import BeautifulSoup from googlesearch import search as google_search from fake_useragent import UserAgent from sqlalchemy import func, create_engine, Column, BigInteger, Text, DateTime, ForeignKey, Boolean, JSON, select, text, distinct, and_ from sqlalchemy.orm import declarative_base, sessionmaker, relationship, Session, joinedload from sqlalchemy.exc import SQLAlchemyError from botocore.exceptions import ClientError from tenacity import retry, stop_after_attempt, wait_random_exponential, wait_fixed from email_validator import validate_email, EmailNotValidError from streamlit_option_menu import option_menu from openai import OpenAI from typing import List, Optional from urllib.parse import urlparse, urlencode from streamlit_tags import st_tags import plotly.express as px from requests.adapters import HTTPAdapter from requests.packages.urllib3.util.retry import Retry from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from contextlib import contextmanager #database info DB_HOST = os.getenv("SUPABASE_DB_HOST") DB_NAME = os.getenv("SUPABASE_DB_NAME") DB_USER = os.getenv("SUPABASE_DB_USER") DB_PASSWORD = os.getenv("SUPABASE_DB_PASSWORD") DB_PORT = os.getenv("SUPABASE_DB_PORT") DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) load_dotenv() DB_HOST, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT = map(os.getenv, ["SUPABASE_DB_HOST", "SUPABASE_DB_NAME", "SUPABASE_DB_USER", "SUPABASE_DB_PASSWORD", "SUPABASE_DB_PORT"]) DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" engine = create_engine(DATABASE_URL, pool_size=20, max_overflow=0) SessionLocal, Base = sessionmaker(bind=engine), 
declarative_base() if not all([DB_HOST, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT]): raise ValueError("One or more required database environment variables are not set") class Project(Base): __tablename__ = 'projects' id = Column(BigInteger, primary_key=True) project_name = Column(Text, default="Default Project") created_at = Column(DateTime(timezone=True), server_default=func.now()) campaigns = relationship("Campaign", back_populates="project") knowledge_base = relationship("KnowledgeBase", back_populates="project", uselist=False) class Campaign(Base): __tablename__ = 'campaigns' id = Column(BigInteger, primary_key=True) campaign_name = Column(Text, default="Default Campaign") campaign_type = Column(Text, default="Email") project_id = Column(BigInteger, ForeignKey('projects.id'), default=1) created_at = Column(DateTime(timezone=True), server_default=func.now()) auto_send = Column(Boolean, default=False) loop_automation = Column(Boolean, default=False) ai_customization = Column(Boolean, default=False) max_emails_per_group = Column(BigInteger, default=500) loop_interval = Column(BigInteger, default=60) project = relationship("Project", back_populates="campaigns") email_campaigns = relationship("EmailCampaign", back_populates="campaign") search_terms = relationship("SearchTerm", back_populates="campaign") campaign_leads = relationship("CampaignLead", back_populates="campaign") class CampaignLead(Base): __tablename__ = 'campaign_leads' id = Column(BigInteger, primary_key=True) campaign_id = Column(BigInteger, ForeignKey('campaigns.id')) lead_id = Column(BigInteger, ForeignKey('leads.id')) status = Column(Text) created_at = Column(DateTime(timezone=True), server_default=func.now()) lead = relationship("Lead", back_populates="campaign_leads") campaign = relationship("Campaign", back_populates="campaign_leads") class KnowledgeBase(Base): __tablename__ = 'knowledge_base' id = Column(BigInteger, primary_key=True) project_id = Column(BigInteger, ForeignKey('projects.id'), 
nullable=False) kb_name, kb_bio, kb_values, contact_name, contact_role, contact_email = [Column(Text) for _ in range(6)] company_description, company_mission, company_target_market, company_other = [Column(Text) for _ in range(4)] product_name, product_description, product_target_customer, product_other = [Column(Text) for _ in range(4)] other_context, example_email = Column(Text), Column(Text) created_at = Column(DateTime(timezone=True), server_default=func.now()) updated_at = Column(DateTime(timezone=True), onupdate=func.now()) project = relationship("Project", back_populates="knowledge_base") def to_dict(self): return {attr: getattr(self, attr) for attr in ['kb_name', 'kb_bio', 'kb_values', 'contact_name', 'contact_role', 'contact_email', 'company_description', 'company_mission', 'company_target_market', 'company_other', 'product_name', 'product_description', 'product_target_customer', 'product_other', 'other_context', 'example_email']} # Update the Lead model to remove the domain attribute class Lead(Base): __tablename__ = 'leads' id = Column(BigInteger, primary_key=True) email = Column(Text, unique=True) phone, first_name, last_name, company, job_title = [Column(Text) for _ in range(5)] created_at = Column(DateTime(timezone=True), server_default=func.now()) # Remove the domain column campaign_leads = relationship("CampaignLead", back_populates="lead") lead_sources = relationship("LeadSource", back_populates="lead") email_campaigns = relationship("EmailCampaign", back_populates="lead") class EmailTemplate(Base): __tablename__ = 'email_templates' id = Column(BigInteger, primary_key=True) campaign_id = Column(BigInteger, ForeignKey('campaigns.id')) template_name, subject, body_content = Column(Text), Column(Text), Column(Text) created_at = Column(DateTime(timezone=True), server_default=func.now()) is_ai_customizable = Column(Boolean, default=False) language = Column(Text, default='ES') # Add language column campaign = relationship("Campaign") email_campaigns = 
relationship("EmailCampaign", back_populates="template") class EmailCampaign(Base): __tablename__ = 'email_campaigns' id = Column(BigInteger, primary_key=True) campaign_id = Column(BigInteger, ForeignKey('campaigns.id')) lead_id = Column(BigInteger, ForeignKey('leads.id')) template_id = Column(BigInteger, ForeignKey('email_templates.id')) customized_subject = Column(Text) customized_content = Column(Text) # Make sure this column exists original_subject = Column(Text) original_content = Column(Text) status = Column(Text) engagement_data = Column(JSON) message_id = Column(Text) tracking_id = Column(Text, unique=True) sent_at = Column(DateTime(timezone=True)) ai_customized = Column(Boolean, default=False) opened_at = Column(DateTime(timezone=True)) clicked_at = Column(DateTime(timezone=True)) open_count = Column(BigInteger, default=0) click_count = Column(BigInteger, default=0) campaign = relationship("Campaign", back_populates="email_campaigns") lead = relationship("Lead", back_populates="email_campaigns") template = relationship("EmailTemplate", back_populates="email_campaigns") class OptimizedSearchTerm(Base): __tablename__ = 'optimized_search_terms' id = Column(BigInteger, primary_key=True) original_term_id = Column(BigInteger, ForeignKey('search_terms.id')) term = Column(Text) created_at = Column(DateTime(timezone=True), server_default=func.now()) original_term = relationship("SearchTerm", back_populates="optimized_terms") class SearchTermEffectiveness(Base): __tablename__ = 'search_term_effectiveness' id = Column(BigInteger, primary_key=True) search_term_id = Column(BigInteger, ForeignKey('search_terms.id')) total_results, valid_leads, irrelevant_leads, blogs_found, directories_found = [Column(BigInteger) for _ in range(5)] created_at = Column(DateTime(timezone=True), server_default=func.now()) search_term = relationship("SearchTerm", back_populates="effectiveness") class SearchTermGroup(Base): __tablename__ = 'search_term_groups' id = Column(BigInteger, 
primary_key=True) name, email_template, description = Column(Text), Column(Text), Column(Text) created_at = Column(DateTime(timezone=True), server_default=func.now()) search_terms = relationship("SearchTerm", back_populates="group") class SearchTerm(Base): __tablename__ = 'search_terms' id = Column(BigInteger, primary_key=True) group_id = Column(BigInteger, ForeignKey('search_term_groups.id')) campaign_id = Column(BigInteger, ForeignKey('campaigns.id')) term, category = Column(Text), Column(Text) created_at = Column(DateTime(timezone=True), server_default=func.now()) language = Column(Text, default='ES') # Add language column group = relationship("SearchTermGroup", back_populates="search_terms") campaign = relationship("Campaign", back_populates="search_terms") optimized_terms = relationship("OptimizedSearchTerm", back_populates="original_term") lead_sources = relationship("LeadSource", back_populates="search_term") effectiveness = relationship("SearchTermEffectiveness", back_populates="search_term", uselist=False) class LeadSource(Base): __tablename__ = 'lead_sources' id = Column(BigInteger, primary_key=True) lead_id = Column(BigInteger, ForeignKey('leads.id')) search_term_id = Column(BigInteger, ForeignKey('search_terms.id')) url, domain, page_title, meta_description, scrape_duration = [Column(Text) for _ in range(5)] meta_tags, phone_numbers, content, tags = [Column(Text) for _ in range(4)] http_status = Column(BigInteger) created_at = Column(DateTime(timezone=True), server_default=func.now()) lead = relationship("Lead", back_populates="lead_sources") search_term = relationship("SearchTerm", back_populates="lead_sources") class AIRequestLog(Base): __tablename__ = 'ai_request_logs' id = Column(BigInteger, primary_key=True) function_name, prompt, response, model_used = [Column(Text) for _ in range(4)] created_at = Column(DateTime(timezone=True), server_default=func.now()) lead_id = Column(BigInteger, ForeignKey('leads.id')) email_campaign_id = Column(BigInteger, 
ForeignKey('email_campaigns.id')) lead = relationship("Lead") email_campaign = relationship("EmailCampaign") class AutomationLog(Base): __tablename__ = 'automation_logs' id = Column(BigInteger, primary_key=True) campaign_id = Column(BigInteger, ForeignKey('campaigns.id')) search_term_id = Column(BigInteger, ForeignKey('search_terms.id')) leads_gathered, emails_sent = Column(BigInteger), Column(BigInteger) start_time = Column(DateTime(timezone=True), server_default=func.now()) end_time = Column(DateTime(timezone=True)) status, logs = Column(Text), Column(JSON) campaign = relationship("Campaign") search_term = relationship("SearchTerm") # Replace the existing EmailSettings class with this unified Settings class class Settings(Base): __tablename__ = 'settings' id = Column(BigInteger, primary_key=True) name = Column(Text, nullable=False) setting_type = Column(Text, nullable=False) # 'general', 'email', etc. value = Column(JSON, nullable=False) created_at = Column(DateTime(timezone=True), server_default=func.now()) updated_at = Column(DateTime(timezone=True), onupdate=func.now()) class EmailSettings(Base): __tablename__ = 'email_settings' id = Column(BigInteger, primary_key=True) name = Column(Text, nullable=False) email = Column(Text, nullable=False) provider = Column(Text, nullable=False) smtp_server = Column(Text) smtp_port = Column(BigInteger) smtp_username = Column(Text) smtp_password = Column(Text) aws_access_key_id = Column(Text) aws_secret_access_key = Column(Text) aws_region = Column(Text) DATABASE_URL = os.environ.get("DATABASE_URL") if not DATABASE_URL: raise ValueError("DATABASE_URL not set") engine = create_engine(DATABASE_URL) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) Base.metadata.create_all(bind=engine) @contextmanager def db_session(): session = SessionLocal() try: yield session session.commit() except Exception: session.rollback() raise finally: session.close() def settings_page(): st.title("Settings") with 
db_session() as session: general_settings = session.query(Settings).filter_by(setting_type='general').first() or Settings(name='General Settings', setting_type='general', value={}) st.header("General Settings") with st.form("general_settings_form"): openai_api_key = st.text_input("OpenAI API Key", value=general_settings.value.get('openai_api_key', ''), type="password") openai_api_base = st.text_input("OpenAI API Base URL", value=general_settings.value.get('openai_api_base', 'https://api.openai.com/v1')) openai_model = st.text_input("OpenAI Model", value=general_settings.value.get('openai_model', 'gpt-4o-mini')) if st.form_submit_button("Save General Settings"): general_settings.value = {'openai_api_key': openai_api_key, 'openai_api_base': openai_api_base, 'openai_model': openai_model} session.add(general_settings) session.commit() st.success("General settings saved successfully!") st.header("Email Settings") email_settings = session.query(EmailSettings).all() for setting in email_settings: with st.expander(f"{setting.name} ({setting.email})"): st.write(f"Provider: {setting.provider}") st.write(f"{'SMTP Server: ' + setting.smtp_server if setting.provider == 'smtp' else 'AWS Region: ' + setting.aws_region}") if st.button(f"Delete {setting.name}", key=f"delete_{setting.id}"): session.delete(setting) session.commit() st.success(f"Deleted {setting.name}") st.rerun() edit_id = st.selectbox("Edit existing setting", ["New Setting"] + [f"{s.id}: {s.name}" for s in email_settings]) edit_setting = session.query(EmailSettings).get(int(edit_id.split(":")[0])) if edit_id != "New Setting" else None with st.form("email_setting_form"): name = st.text_input("Name", value=edit_setting.name if edit_setting else "", placeholder="e.g., Company Gmail") email = st.text_input("Email", value=edit_setting.email if edit_setting else "", placeholder="your.email@example.com") provider = st.selectbox("Provider", ["smtp", "ses"], index=0 if edit_setting and edit_setting.provider == "smtp" else 1) 
if provider == "smtp": smtp_server = st.text_input("SMTP Server", value=edit_setting.smtp_server if edit_setting else "", placeholder="smtp.gmail.com") smtp_port = st.number_input("SMTP Port", min_value=1, max_value=65535, value=edit_setting.smtp_port if edit_setting else 587) smtp_username = st.text_input("SMTP Username", value=edit_setting.smtp_username if edit_setting else "", placeholder="your.email@gmail.com") smtp_password = st.text_input("SMTP Password", type="password", value=edit_setting.smtp_password if edit_setting else "", placeholder="Your SMTP password") else: aws_access_key_id = st.text_input("AWS Access Key ID", value=edit_setting.aws_access_key_id if edit_setting else "", placeholder="AKIAIOSFODNN7EXAMPLE") aws_secret_access_key = st.text_input("AWS Secret Access Key", type="password", value=edit_setting.aws_secret_access_key if edit_setting else "", placeholder="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY") aws_region = st.text_input("AWS Region", value=edit_setting.aws_region if edit_setting else "", placeholder="us-west-2") if st.form_submit_button("Save Email Setting"): setting_data = {k: v for k, v in locals().items() if k in ['name', 'email', 'provider', 'smtp_server', 'smtp_port', 'smtp_username', 'smtp_password', 'aws_access_key_id', 'aws_secret_access_key', 'aws_region'] and v is not None} try: if edit_setting: for k, v in setting_data.items(): setattr(edit_setting, k, v) else: new_setting = EmailSettings(**setting_data) session.add(new_setting) session.commit() st.success("Email setting saved successfully!") st.rerun() except Exception as e: st.error(f"Error saving email setting: {str(e)}") session.rollback() def send_email_ses(session, from_email, to_email, subject, body, charset='UTF-8', reply_to=None, ses_client=None): email_settings = session.query(EmailSettings).filter_by(email=from_email).first() if not email_settings: logging.error(f"No email settings found for {from_email}") return None, None tracking_id = str(uuid.uuid4()) 
tracking_pixel_url = f"https://autoclient-email-analytics.trigox.workers.dev/track?{urlencode({'id': tracking_id, 'type': 'open'})}" wrapped_body = wrap_email_body(body) tracked_body = wrapped_body.replace('', f'') soup = BeautifulSoup(tracked_body, 'html.parser') for a in soup.find_all('a', href=True): original_url = a['href'] tracked_url = f"https://autoclient-email-analytics.trigox.workers.dev/track?{urlencode({'id': tracking_id, 'type': 'click', 'url': original_url})}" a['href'] = tracked_url tracked_body = str(soup) try: test_response = requests.get(f"https://autoclient-email-analytics.trigox.workers.dev/test", timeout=5) if test_response.status_code != 200: logging.warning("Analytics worker is down. Using original URLs.") tracked_body = wrapped_body except requests.RequestException: logging.warning("Failed to reach analytics worker. Using original URLs.") tracked_body = wrapped_body try: if email_settings.provider == 'ses': if ses_client is None: aws_session = boto3.Session( aws_access_key_id=email_settings.aws_access_key_id, aws_secret_access_key=email_settings.aws_secret_access_key, region_name=email_settings.aws_region ) ses_client = aws_session.client('ses') response = ses_client.send_email( Source=from_email, Destination={'ToAddresses': [to_email]}, Message={ 'Subject': {'Data': subject, 'Charset': charset}, 'Body': {'Html': {'Data': tracked_body, 'Charset': charset}} }, ReplyToAddresses=[reply_to] if reply_to else [] ) return response, tracking_id elif email_settings.provider == 'smtp': msg = MIMEMultipart() msg['From'] = from_email msg['To'] = to_email msg['Subject'] = subject if reply_to: msg['Reply-To'] = reply_to msg.attach(MIMEText(tracked_body, 'html')) with smtplib.SMTP(email_settings.smtp_server, email_settings.smtp_port) as server: server.starttls() server.login(email_settings.smtp_username, email_settings.smtp_password) server.send_message(msg) return {'MessageId': f'smtp-{uuid.uuid4()}'}, tracking_id else: logging.error(f"Unknown email 
provider: {email_settings.provider}") return None, None except Exception as e: logging.error(f"Error sending email: {str(e)}") return None, None def save_email_campaign(session, lead_email, template_id, status, sent_at, subject, message_id, email_body): try: lead = session.query(Lead).filter_by(email=lead_email).first() if not lead: logging.error(f"Lead with email {lead_email} not found.") return new_campaign = EmailCampaign( lead_id=lead.id, template_id=template_id, status=status, sent_at=sent_at, customized_subject=subject or "No subject", message_id=message_id or f"unknown-{uuid.uuid4()}", customized_content=email_body or "No content", campaign_id=get_active_campaign_id(), tracking_id=str(uuid.uuid4()) ) session.add(new_campaign) session.commit() except Exception as e: logging.error(f"Error saving email campaign: {str(e)}") session.rollback() return None return new_campaign def update_log(log_container, message, level='info'): icon = {'info': '🔵', 'success': '🟢', 'warning': '🟠', 'error': '🔴', 'email_sent': '🟣'}.get(level, '⚪') log_entry = f"{icon} {message}" # Simple console logging without HTML print(f"{icon} {message.split('<')[0]}") # Only print the first part of the message before any HTML tags if 'log_entries' not in st.session_state: st.session_state.log_entries = [] # HTML-formatted log entry for Streamlit display html_log_entry = f"{icon} {message}" st.session_state.log_entries.append(html_log_entry) # Update the Streamlit display with all logs log_html = f"
{'
'.join(st.session_state.log_entries)}
" log_container.markdown(log_html, unsafe_allow_html=True) def optimize_search_term(search_term, language): if language == 'english': return f'"{search_term}" email OR contact OR "get in touch" site:.com' elif language == 'spanish': return f'"{search_term}" correo OR contacto OR "ponte en contacto" site:.es' return search_term def shuffle_keywords(term): words = term.split() random.shuffle(words) return ' '.join(words) def get_domain_from_url(url): return urlparse(url).netloc def is_valid_email(email): pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$' return re.match(pattern, email) is not None def extract_emails_from_html(html_content): pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b' return re.findall(pattern, html_content) def extract_info_from_page(soup): name = soup.find('meta', {'name': 'author'}) name = name['content'] if name else '' company = soup.find('meta', {'property': 'og:site_name'}) company = company['content'] if company else '' job_title = soup.find('meta', {'name': 'job_title'}) job_title = job_title['content'] if job_title else '' return name, company, job_title def manual_search(session, terms, num_results, ignore_previously_fetched=True, optimize_english=False, optimize_spanish=False, shuffle_keywords_option=False, language='ES', enable_email_sending=True, log_container=None, from_email=None, reply_to=None, email_template=None): ua, results, total_leads, domains_processed = UserAgent(), [], 0, set() processed_emails_per_domain = {} # Track processed emails per domain for original_term in terms: try: search_term_id = add_or_get_search_term(session, original_term, get_active_campaign_id()) search_term = shuffle_keywords(original_term) if shuffle_keywords_option else original_term search_term = optimize_search_term(search_term, 'english' if optimize_english else 'spanish') if optimize_english or optimize_spanish else search_term update_log(log_container, f"Searching for '{original_term}' (Used '{search_term}')") for url in 
google_search(search_term, num_results, lang=language): domain = get_domain_from_url(url) if ignore_previously_fetched and domain in domains_processed: update_log(log_container, f"Skipping Previously Fetched: {domain}", 'warning') continue update_log(log_container, f"Fetching: {url}") try: if not url.startswith(('http://', 'https://')): url = 'http://' + url response = requests.get(url, timeout=10, verify=False, headers={'User-Agent': ua.random}) response.raise_for_status() html_content, soup = response.text, BeautifulSoup(response.text, 'html.parser') # Extract all emails from the page emails = extract_emails_from_html(html_content) valid_emails = [email for email in emails if is_valid_email(email)] update_log(log_container, f"Found {len(valid_emails)} valid email(s) on {url}", 'success') if not valid_emails: continue # Extract page info once for all leads from this URL name, company, job_title = extract_info_from_page(soup) page_title = get_page_title(html_content) page_description = get_page_description(html_content) # Initialize set for this domain if not exists if domain not in processed_emails_per_domain: processed_emails_per_domain[domain] = set() # Process each valid email from this URL for email in valid_emails: # Skip if we've already processed this email for this domain if email in processed_emails_per_domain[domain]: continue processed_emails_per_domain[domain].add(email) lead = save_lead(session, email=email, first_name=name, company=company, job_title=job_title, url=url, search_term_id=search_term_id, created_at=datetime.utcnow()) if lead: total_leads += 1 results.append({ 'Email': email, 'URL': url, 'Lead Source': original_term, 'Title': page_title, 'Description': page_description, 'Tags': [], 'Name': name, 'Company': company, 'Job Title': job_title, 'Search Term ID': search_term_id }) update_log(log_container, f"Saved lead: {email}", 'success') if enable_email_sending: if not from_email or not email_template: update_log(log_container, "Email sending 
is enabled but from_email or email_template is not provided", 'error') continue template = session.query(EmailTemplate).filter_by(id=int(email_template.split(":")[0])).first() if not template: update_log(log_container, "Email template not found", 'error') continue wrapped_content = wrap_email_body(template.body_content) response, tracking_id = send_email_ses(session, from_email, email, template.subject, wrapped_content, reply_to=reply_to) if response: update_log(log_container, f"Sent email to: {email}", 'email_sent') save_email_campaign(session, email, template.id, 'Sent', datetime.utcnow(), template.subject, response['MessageId'], wrapped_content) else: update_log(log_container, f"Failed to send email to: {email}", 'error') save_email_campaign(session, email, template.id, 'Failed', datetime.utcnow(), template.subject, None, wrapped_content) # Add domain to processed list after processing all its emails domains_processed.add(domain) except requests.RequestException as e: update_log(log_container, f"Error processing URL {url}: {str(e)}", 'error') except Exception as e: update_log(log_container, f"Error processing term '{original_term}': {str(e)}", 'error') update_log(log_container, f"Total leads found: {total_leads}", 'info') return {"total_leads": total_leads, "results": results} def generate_or_adjust_email_template(prompt, kb_info=None, current_template=None): messages = [ {"role": "system", "content": "You are an AI assistant specializing in creating and refining email templates for marketing campaigns. Always respond with a JSON object containing 'subject' and 'body' keys. 
The 'body' should contain HTML formatted content suitable for insertion into an email body."}, {"role": "user", "content": f"""{'Adjust the following email template based on the given instructions:' if current_template else 'Create an email template based on the following prompt:'} {prompt} {'Current Template:' if current_template else 'Guidelines:'} {current_template if current_template else '1. Focus on benefits to the reader, address potential customer doubts, include clear CTAs, use a natural tone, and be concise.'} Respond with a JSON object containing 'subject' and 'body' keys. The 'body' should contain HTML formatted content suitable for insertion into an email body. Follow these guidelines: 1. Use proper HTML tags for structuring the email content (e.g.,

,

,

, etc.). 2. Include inline CSS for styling where appropriate. 3. Ensure the content is properly structured and easy to read. 4. Include a call-to-action button or link with appropriate styling. 5. Make the design responsive for various screen sizes. 6. Do not include , , or tags. Example structure: {{ "subject": "Your compelling subject line here", "body": "

Welcome!

Your email content here...

Call to Action" }}"""} ] if kb_info: messages.append({"role": "user", "content": f"Consider this knowledge base information: {json.dumps(kb_info)}"}) response = openai_chat_completion(messages, function_name="generate_or_adjust_email_template") if isinstance(response, str): try: return json.loads(response) except json.JSONDecodeError: return { "subject": "AI Generated Subject", "body": f"

{response}

" } elif isinstance(response, dict): return response else: return {"subject": "", "body": "

Failed to generate email content.

"} def fetch_leads_with_sources(session): try: query = session.query(Lead, func.string_agg(LeadSource.url, ', ').label('sources'), func.max(EmailCampaign.sent_at).label('last_contact'), func.string_agg(EmailCampaign.status, ', ').label('email_statuses')).outerjoin(LeadSource).outerjoin(EmailCampaign).group_by(Lead.id) return pd.DataFrame([{**{k: getattr(lead, k) for k in ['id', 'email', 'first_name', 'last_name', 'company', 'job_title', 'created_at']}, 'Source': sources, 'Last Contact': last_contact, 'Last Email Status': email_statuses.split(', ')[-1] if email_statuses else 'Not Contacted', 'Delete': False} for lead, sources, last_contact, email_statuses in query.all()]) except SQLAlchemyError as e: logging.error(f"Database error in fetch_leads_with_sources: {str(e)}") return pd.DataFrame() def fetch_search_terms_with_lead_count(session): query = session.query(SearchTerm.term, func.count(distinct(Lead.id)).label('lead_count'), func.count(distinct(EmailCampaign.id)).label('email_count')).join(LeadSource, SearchTerm.id == LeadSource.search_term_id).join(Lead, LeadSource.lead_id == Lead.id).outerjoin(EmailCampaign, Lead.id == EmailCampaign.lead_id).group_by(SearchTerm.term) return pd.DataFrame(query.all(), columns=['Term', 'Lead Count', 'Email Count']) def add_search_term(session, term, campaign_id): try: new_term = SearchTerm(term=term, campaign_id=campaign_id, created_at=datetime.utcnow()) session.add(new_term) session.commit() return new_term.id except SQLAlchemyError as e: session.rollback() logging.error(f"Error adding search term: {str(e)}") raise def update_search_term_group(session, group_id, updated_terms): try: # Get IDs of selected terms selected_term_ids = [int(term.split(':')[0]) for term in updated_terms] # Update all terms that should be in this group session.query(SearchTerm)\ .filter(SearchTerm.id.in_(selected_term_ids))\ .update({SearchTerm.group_id: group_id}, synchronize_session=False) # Remove group_id from terms that were unselected 
session.query(SearchTerm)\ .filter(SearchTerm.group_id == group_id)\ .filter(~SearchTerm.id.in_(selected_term_ids))\ .update({SearchTerm.group_id: None}, synchronize_session=False) session.commit() except Exception as e: session.rollback() logging.error(f"Error in update_search_term_group: {str(e)}") raise def add_new_search_term(session, new_term, campaign_id, group_for_new_term): try: group_id = None if group_for_new_term != "None": group_id = int(group_for_new_term.split(':')[0]) new_search_term = SearchTerm( term=new_term, campaign_id=campaign_id, group_id=group_id, created_at=datetime.utcnow() ) session.add(new_search_term) session.commit() except Exception as e: session.rollback() logging.error(f"Error adding search term: {str(e)}") raise def ai_group_search_terms(session, ungrouped_terms): existing_groups = session.query(SearchTermGroup).all() prompt = f"Categorize these search terms into existing groups or suggest new ones:\n{', '.join([term.term for term in ungrouped_terms])}\n\nExisting groups: {', '.join([group.name for group in existing_groups])}\n\nRespond with a JSON object: {{group_name: [term1, term2, ...]}}" messages = [{"role": "system", "content": "You're an AI that categorizes search terms for lead generation. 
Be concise and efficient."}, {"role": "user", "content": prompt}] response = openai_chat_completion(messages, function_name="ai_group_search_terms") return response if isinstance(response, dict) else {} def update_search_term_groups(session, grouped_terms): for group_name, terms in grouped_terms.items(): group = session.query(SearchTermGroup).filter_by(name=group_name).first() or SearchTermGroup(name=group_name) if not group.id: session.add(group); session.flush() for term in terms: search_term = session.query(SearchTerm).filter_by(term=term).first() if search_term: search_term.group_id = group.id session.commit() def create_search_term_group(session, group_name): try: session.add(SearchTermGroup(name=group_name)) session.commit() except Exception as e: session.rollback() logging.error(f"Error creating search term group: {str(e)}") def delete_search_term_group(session, group_id): try: group = session.query(SearchTermGroup).get(group_id) if group: session.query(SearchTerm).filter(SearchTerm.group_id == group_id).update({SearchTerm.group_id: None}) session.delete(group) session.commit() except Exception as e: session.rollback() logging.error(f"Error deleting search term group: {str(e)}") def ai_automation_loop(session, log_container, leads_container): automation_logs, total_search_terms, total_emails_sent = [], 0, 0 while st.session_state.get('automation_status', False): try: log_container.info("Starting automation cycle") kb_info = get_knowledge_base_info(session, get_active_project_id()) if not kb_info: log_container.warning("Knowledge Base not found. 
Skipping cycle.") time.sleep(3600) continue base_terms = [term.term for term in session.query(SearchTerm).filter_by(project_id=get_active_project_id()).all()] optimized_terms = generate_optimized_search_terms(session, base_terms, kb_info) st.subheader("Optimized Search Terms") st.write(", ".join(optimized_terms)) total_search_terms = len(optimized_terms) progress_bar = st.progress(0) for idx, term in enumerate(optimized_terms): results = manual_search(session, [term], 10, ignore_previously_fetched=True) new_leads = [] for res in results['results']: lead = save_lead(session, res['Email'], url=res['URL']) if lead: new_leads.append((lead.id, lead.email)) if new_leads: template = session.query(EmailTemplate).filter_by(project_id=get_active_project_id()).first() if template: from_email = kb_info.get('contact_email') or 'hello@indosy.com' reply_to = kb_info.get('contact_email') or 'eugproductions@gmail.com' logs, sent_count = bulk_send_emails(session, template.id, from_email, reply_to, [{'Email': email} for _, email in new_leads]) automation_logs.extend(logs) total_emails_sent += sent_count leads_container.text_area("New Leads Found", "\n".join([email for _, email in new_leads]), height=200) progress_bar.progress((idx + 1) / len(optimized_terms)) st.success(f"Automation cycle completed. 
Total search terms: {total_search_terms}, Total emails sent: {total_emails_sent}") time.sleep(3600) except Exception as e: log_container.error(f"Critical error in automation cycle: {str(e)}") time.sleep(300) log_container.info("Automation stopped") st.session_state.automation_logs = automation_logs st.session_state.total_leads_found = total_search_terms st.session_state.total_emails_sent = total_emails_sent def openai_chat_completion(messages, temperature=0.7, function_name=None, lead_id=None, email_campaign_id=None): with db_session() as session: general_settings = session.query(Settings).filter_by(setting_type='general').first() if not general_settings or 'openai_api_key' not in general_settings.value: st.error("OpenAI API key not set. Please configure it in the settings.") return None client = OpenAI(api_key=general_settings.value['openai_api_key']) model = general_settings.value.get('openai_model', "gpt-4o-mini") try: response = client.chat.completions.create( model=model, messages=messages, temperature=temperature ) result = response.choices[0].message.content with db_session() as session: log_ai_request(session, function_name, messages, result, lead_id, email_campaign_id, model) try: return json.loads(result) except json.JSONDecodeError: return result except Exception as e: st.error(f"Error in OpenAI API call: {str(e)}") with db_session() as session: log_ai_request(session, function_name, messages, str(e), lead_id, email_campaign_id, model) return None def log_ai_request(session, function_name, prompt, response, lead_id=None, email_campaign_id=None, model_used=None): session.add(AIRequestLog( function_name=function_name, prompt=json.dumps(prompt), response=json.dumps(response) if response else None, lead_id=lead_id, email_campaign_id=email_campaign_id, model_used=model_used )) session.commit() def save_lead(session, email, first_name=None, last_name=None, company=None, job_title=None, phone=None, url=None, search_term_id=None, created_at=None): try: 
existing_lead = session.query(Lead).filter_by(email=email).first() if existing_lead: for attr in ['first_name', 'last_name', 'company', 'job_title', 'phone', 'created_at']: if locals()[attr]: setattr(existing_lead, attr, locals()[attr]) lead = existing_lead else: lead = Lead(email=email, first_name=first_name, last_name=last_name, company=company, job_title=job_title, phone=phone, created_at=created_at or datetime.utcnow()) session.add(lead) session.flush() lead_source = LeadSource(lead_id=lead.id, url=url, search_term_id=search_term_id) session.add(lead_source) campaign_lead = CampaignLead(campaign_id=get_active_campaign_id(), lead_id=lead.id, status="Not Contacted", created_at=datetime.utcnow()) session.add(campaign_lead) session.commit() return lead except Exception as e: logging.error(f"Error saving lead: {str(e)}") session.rollback() return None def save_lead_source(session, lead_id, search_term_id, url, http_status, scrape_duration, page_title=None, meta_description=None, content=None, tags=None, phone_numbers=None): session.add(LeadSource(lead_id=lead_id, search_term_id=search_term_id, url=url, http_status=http_status, scrape_duration=scrape_duration, page_title=page_title or get_page_title(url), meta_description=meta_description or get_page_description(url), content=content or extract_visible_text(BeautifulSoup(requests.get(url).text, 'html.parser')), tags=tags, phone_numbers=phone_numbers)) session.commit() def get_page_title(url): try: response = requests.get(url, timeout=10) soup = BeautifulSoup(response.text, 'html.parser') title = soup.title.string if soup.title else "No title found" return title.strip() except Exception as e: logging.error(f"Error getting page title for {url}: {str(e)}") return "Error fetching title" def extract_visible_text(soup): for script in soup(["script", "style"]): script.extract() text = soup.get_text() lines = (line.strip() for line in text.splitlines()) chunks = (phrase.strip() for line in lines for phrase in line.split(" 
")) return ' '.join(chunk for chunk in chunks if chunk) def log_search_term_effectiveness(session, term, total_results, valid_leads, blogs_found, directories_found): session.add(SearchTermEffectiveness(term=term, total_results=total_results, valid_leads=valid_leads, irrelevant_leads=total_results - valid_leads, blogs_found=blogs_found, directories_found=directories_found)) session.commit() get_active_project_id = lambda: st.session_state.get('active_project_id', 1) get_active_campaign_id = lambda: st.session_state.get('active_campaign_id', 1) set_active_project_id = lambda project_id: st.session_state.__setitem__('active_project_id', project_id) set_active_campaign_id = lambda campaign_id: st.session_state.__setitem__('active_campaign_id', campaign_id) def add_or_get_search_term(session, term, campaign_id, created_at=None): search_term = session.query(SearchTerm).filter_by(term=term, campaign_id=campaign_id).first() if not search_term: search_term = SearchTerm(term=term, campaign_id=campaign_id, created_at=created_at or datetime.utcnow()) session.add(search_term) session.commit() session.refresh(search_term) return search_term.id def fetch_campaigns(session): return [f"{camp.id}: {camp.campaign_name}" for camp in session.query(Campaign).all()] def fetch_projects(session): return [f"{project.id}: {project.project_name}" for project in session.query(Project).all()] def fetch_email_templates(session): return [f"{t.id}: {t.template_name}" for t in session.query(EmailTemplate).all()] def create_or_update_email_template(session, template_name, subject, body_content, template_id=None, is_ai_customizable=False, created_at=None, language='ES'): template = session.query(EmailTemplate).filter_by(id=template_id).first() if template_id else EmailTemplate(template_name=template_name, subject=subject, body_content=body_content, is_ai_customizable=is_ai_customizable, campaign_id=get_active_campaign_id(), created_at=created_at or datetime.utcnow()) if template_id: 
template.template_name, template.subject, template.body_content, template.is_ai_customizable = template_name, subject, body_content, is_ai_customizable template.language = language session.add(template) session.commit() return template.id safe_datetime_compare = lambda date1, date2: False if date1 is None or date2 is None else date1 > date2 def fetch_leads(session, template_id, send_option, specific_email, selected_terms, exclude_previously_contacted): try: query = session.query(Lead) if send_option == "Specific Email": query = query.filter(Lead.email == specific_email) elif send_option in ["Leads from Chosen Search Terms", "Leads from Search Term Groups"] and selected_terms: query = query.join(LeadSource).join(SearchTerm).filter(SearchTerm.term.in_(selected_terms)) if exclude_previously_contacted: subquery = session.query(EmailCampaign.lead_id).filter(EmailCampaign.sent_at.isnot(None)).subquery() query = query.outerjoin(subquery, Lead.id == subquery.c.lead_id).filter(subquery.c.lead_id.is_(None)) return [{"Email": lead.email, "ID": lead.id} for lead in query.all()] except Exception as e: logging.error(f"Error fetching leads: {str(e)}") return [] def update_display(container, items, title, item_key): container.markdown( f"""

{title} ({len(items)})

{"".join(f'
{item[item_key]}
' for item in items[-20:])}
""", unsafe_allow_html=True ) def get_domain_from_url(url): return urlparse(url).netloc def manual_search_page(): st.title("Manual Search") with db_session() as session: # Fetch recent searches within the session recent_searches = session.query(SearchTerm).order_by(SearchTerm.created_at.desc()).limit(5).all() # Materialize the terms within the session recent_search_terms = [term.term for term in recent_searches] email_templates = fetch_email_templates(session) email_settings = fetch_email_settings(session) col1, col2 = st.columns([2, 1]) with col1: search_terms = st_tags( label='Enter search terms:', text='Press enter to add more', value=recent_search_terms, suggestions=['software engineer', 'data scientist', 'product manager'], maxtags=10, key='search_terms_input' ) num_results = st.slider("Results per term", 1, 50000, 10) with col2: enable_email_sending = st.checkbox("Enable email sending", value=True) ignore_previously_fetched = st.checkbox("Ignore fetched domains", value=True) shuffle_keywords_option = st.checkbox("Shuffle Keywords", value=True) optimize_english = st.checkbox("Optimize (English)", value=False) optimize_spanish = st.checkbox("Optimize (Spanish)", value=False) language = st.selectbox("Select Language", options=["ES", "EN"], index=0) if enable_email_sending: if not email_templates: st.error("No email templates available. Please create a template first.") return if not email_settings: st.error("No email settings available. Please add email settings first.") return col3, col4 = st.columns(2) with col3: email_template = st.selectbox("Email template", options=email_templates, format_func=lambda x: x.split(":")[1].strip()) with col4: email_setting_option = st.selectbox("From Email", options=email_settings, format_func=lambda x: f"{x['name']} ({x['email']})") if email_setting_option: from_email = email_setting_option['email'] reply_to = st.text_input("Reply To", email_setting_option['email']) else: st.error("No email setting selected. 
Please select an email setting.") return if st.button("Search"): if not search_terms: return st.warning("Enter at least one search term.") progress_bar = st.progress(0) status_text = st.empty() email_status = st.empty() results = [] leads_container = st.empty() leads_found, emails_sent = [], [] # Create a single log container for all search terms log_container = st.empty() for i, term in enumerate(search_terms): status_text.text(f"Searching: '{term}' ({i+1}/{len(search_terms)})") with db_session() as session: term_results = manual_search(session, [term], num_results, ignore_previously_fetched, optimize_english, optimize_spanish, shuffle_keywords_option, language, enable_email_sending, log_container, from_email, reply_to, email_template) results.extend(term_results['results']) leads_found.extend([f"{res['Email']} - {res['Company']}" for res in term_results['results']]) if enable_email_sending: template = session.query(EmailTemplate).filter_by(id=int(email_template.split(":")[0])).first() for result in term_results['results']: if not result or 'Email' not in result or not is_valid_email(result['Email']): status_text.text(f"Skipping invalid result or email: {result.get('Email') if result else 'None'}") continue wrapped_content = wrap_email_body(template.body_content) response, tracking_id = send_email_ses(session, from_email, result['Email'], template.subject, wrapped_content, reply_to=reply_to) if response: save_email_campaign(session, result['Email'], template.id, 'sent', datetime.utcnow(), template.subject, response.get('MessageId', 'Unknown'), template.body_content) emails_sent.append(f"✅ {result['Email']}") status_text.text(f"Email sent to: {result['Email']}") else: save_email_campaign(session, result['Email'], template.id, 'failed', datetime.utcnow(), template.subject, None, template.body_content) emails_sent.append(f"❌ {result['Email']}") status_text.text(f"Failed to send email to: {result['Email']}") leads_container.dataframe(pd.DataFrame({"Leads Found": 
leads_found, "Emails Sent": emails_sent + [""] * (len(leads_found) - len(emails_sent))})) progress_bar.progress((i + 1) / len(search_terms)) # Display final results st.subheader("Search Results") st.dataframe(pd.DataFrame(results)) if enable_email_sending: st.subheader("Email Sending Results") success_rate = sum(1 for email in emails_sent if email.startswith("✅")) / len(emails_sent) if emails_sent else 0 st.metric("Email Sending Success Rate", f"{success_rate:.2%}") st.download_button( label="Download CSV", data=pd.DataFrame(results).to_csv(index=False).encode('utf-8'), file_name="search_results.csv", mime="text/csv", ) # Update other functions that might be accessing detached objects def fetch_search_terms_with_lead_count(session): query = (session.query(SearchTerm.term, func.count(distinct(Lead.id)).label('lead_count'), func.count(distinct(EmailCampaign.id)).label('email_count')) .join(LeadSource, SearchTerm.id == LeadSource.search_term_id) .join(Lead, LeadSource.lead_id == Lead.id) .outerjoin(EmailCampaign, Lead.id == EmailCampaign.lead_id) .group_by(SearchTerm.term)) df = pd.DataFrame(query.all(), columns=['Term', 'Lead Count', 'Email Count']) return df def ai_automation_loop(session, log_container, leads_container): automation_logs, total_search_terms, total_emails_sent = [], 0, 0 while st.session_state.get('automation_status', False): try: log_container.info("Starting automation cycle") kb_info = get_knowledge_base_info(session, get_active_project_id()) if not kb_info: log_container.warning("Knowledge Base not found. 
Skipping cycle.") time.sleep(3600) continue base_terms = [term.term for term in session.query(SearchTerm).filter_by(project_id=get_active_project_id()).all()] optimized_terms = generate_optimized_search_terms(session, base_terms, kb_info) st.subheader("Optimized Search Terms") st.write(", ".join(optimized_terms)) total_search_terms = len(optimized_terms) progress_bar = st.progress(0) for idx, term in enumerate(optimized_terms): results = manual_search(session, [term], 10, ignore_previously_fetched=True) new_leads = [] for res in results['results']: lead = save_lead(session, res['Email'], url=res['URL']) if lead: new_leads.append((lead.id, lead.email)) if new_leads: template = session.query(EmailTemplate).filter_by(project_id=get_active_project_id()).first() if template: from_email = kb_info.get('contact_email') or 'hello@indosy.com' reply_to = kb_info.get('contact_email') or 'eugproductions@gmail.com' logs, sent_count = bulk_send_emails(session, template.id, from_email, reply_to, [{'Email': email} for _, email in new_leads]) automation_logs.extend(logs) total_emails_sent += sent_count leads_container.text_area("New Leads Found", "\n".join([email for _, email in new_leads]), height=200) progress_bar.progress((idx + 1) / len(optimized_terms)) st.success(f"Automation cycle completed. 
Total search terms: {total_search_terms}, Total emails sent: {total_emails_sent}") time.sleep(3600) except Exception as e: log_container.error(f"Critical error in automation cycle: {str(e)}") time.sleep(300) log_container.info("Automation stopped") st.session_state.automation_logs = automation_logs st.session_state.total_leads_found = total_search_terms st.session_state.total_emails_sent = total_emails_sent # Make sure all database operations are performed within a session context def get_knowledge_base_info(session, project_id): kb_info = session.query(KnowledgeBase).filter_by(project_id=project_id).first() return kb_info.to_dict() if kb_info else None def shuffle_keywords(term): words = term.split() random.shuffle(words) return ' '.join(words) def get_page_description(html_content): soup = BeautifulSoup(html_content, 'html.parser') meta_desc = soup.find('meta', attrs={'name': 'description'}) return meta_desc['content'] if meta_desc else "No description found" def is_valid_email(email): if email is None: return False invalid_patterns = [ r".*\.(png|jpg|jpeg|gif|css|js)$", r"^(nr|bootstrap|jquery|core|icon-|noreply)@.*", r"^(email|info|contact|support|hello|hola|hi|salutations|greetings|inquiries|questions)@.*", r"^email@email\.com$", r".*@example\.com$", r".*@.*\.(png|jpg|jpeg|gif|css|js|jpga|PM|HL)$" ] typo_domains = ["gmil.com", "gmal.com", "gmaill.com", "gnail.com"] if any(re.match(pattern, email, re.IGNORECASE) for pattern in invalid_patterns): return False if any(email.lower().endswith(f"@{domain}") for domain in typo_domains): return False try: validate_email(email); return True except EmailNotValidError: return False def remove_invalid_leads(session): invalid_leads = session.query(Lead).filter( ~Lead.email.op('~')(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$') | Lead.email.op('~')(r'.*\.(png|jpg|jpeg|gif|css|js)$') | Lead.email.op('~')(r'^(nr|bootstrap|jquery|core|icon-|noreply)@.*') | Lead.email == 'email@email.com' | Lead.email.like('%@example.com') 
| Lead.email.op('~')(r'.*@.*\.(png|jpg|jpeg|gif|css|js|jpga|PM|HL)$') | Lead.email.like('%@gmil.com') | Lead.email.like('%@gmal.com') | Lead.email.like('%@gmaill.com') | Lead.email.like('%@gnail.com') ).all() for lead in invalid_leads: session.query(LeadSource).filter(LeadSource.lead_id == lead.id).delete() session.delete(lead) session.commit() return len(invalid_leads) def perform_quick_scan(session): with st.spinner("Performing quick scan..."): terms = session.query(SearchTerm).order_by(func.random()).limit(3).all() email_setting = fetch_email_settings(session)[0] if fetch_email_settings(session) else None from_email = email_setting['email'] if email_setting else None reply_to = from_email email_template = session.query(EmailTemplate).first() res = manual_search(session, [term.term for term in terms], 10, True, False, False, True, "EN", True, st.empty(), from_email, reply_to, f"{email_template.id}: {email_template.template_name}" if email_template else None) st.success(f"Quick scan completed! 
Found {len(res['results'])} new leads.") return {"new_leads": len(res['results']), "terms_used": [term.term for term in terms]} def bulk_send_emails(session, template_id, from_email, reply_to, leads, progress_bar=None, status_text=None, results=None, log_container=None): template = session.query(EmailTemplate).filter_by(id=template_id).first() if not template: logging.error(f"Email template with ID {template_id} not found.") return [], 0 email_subject = template.subject email_content = template.body_content logs, sent_count = [], 0 total_leads = len(leads) for index, lead in enumerate(leads): try: validate_email(lead['Email']) response, tracking_id = send_email_ses(session, from_email, lead['Email'], email_subject, email_content, reply_to=reply_to) if response: status = 'sent' message_id = response.get('MessageId', f"sent-{uuid.uuid4()}") sent_count += 1 log_message = f"✅ Email sent to: {lead['Email']}" else: status = 'failed' message_id = f"failed-{uuid.uuid4()}" log_message = f"❌ Failed to send email to: {lead['Email']}" save_email_campaign(session, lead['Email'], template_id, status, datetime.utcnow(), email_subject, message_id, email_content) logs.append(log_message) if progress_bar: progress_bar.progress((index + 1) / total_leads) if status_text: status_text.text(f"Processed {index + 1}/{total_leads} leads") if results is not None: results.append({"Email": lead['Email'], "Status": status}) if log_container: log_container.text(log_message) except EmailNotValidError: log_message = f"❌ Invalid email address: {lead['Email']}" logs.append(log_message) except Exception as e: error_message = f"Error sending email to {lead['Email']}: {str(e)}" logging.error(error_message) save_email_campaign(session, lead['Email'], template_id, 'failed', datetime.utcnow(), email_subject, f"error-{uuid.uuid4()}", email_content) logs.append(f"❌ Error sending email to: {lead['Email']} (Error: {str(e)})") return logs, sent_count def view_campaign_logs(): st.header("Email Logs") with 
db_session() as session: logs = fetch_all_email_logs(session) if logs.empty: st.info("No email logs found.") else: st.write(f"Total emails sent: {len(logs)}") st.write(f"Success rate: {(logs['Status'] == 'sent').mean():.2%}") col1, col2 = st.columns(2) with col1: start_date = st.date_input("Start Date", value=logs['Sent At'].min().date()) with col2: end_date = st.date_input("End Date", value=logs['Sent At'].max().date()) filtered_logs = logs[(logs['Sent At'].dt.date >= start_date) & (logs['Sent At'].dt.date <= end_date)] search_term = st.text_input("Search by email or subject") if search_term: filtered_logs = filtered_logs[filtered_logs['Email'].str.contains(search_term, case=False) | filtered_logs['Subject'].str.contains(search_term, case=False)] col1, col2, col3 = st.columns(3) with col1: st.metric("Emails Sent", len(filtered_logs)) with col2: st.metric("Unique Recipients", filtered_logs['Email'].nunique()) with col3: st.metric("Success Rate", f"{(filtered_logs['Status'] == 'sent').mean():.2%}") daily_counts = filtered_logs.resample('D', on='Sent At')['Email'].count() st.bar_chart(daily_counts) st.subheader("Detailed Email Logs") for _, log in filtered_logs.iterrows(): with st.expander(f"{log['Sent At'].strftime('%Y-%m-%d %H:%M:%S')} - {log['Email']} - {log['Status']}"): st.write(f"**Subject:** {log['Subject']}") st.write(f"**Content Preview:** {log['Content'][:100]}...") if st.button("View Full Email", key=f"view_email_{log['ID']}"): st.components.v1.html(wrap_email_body(log['Content']), height=400, scrolling=True) if log['Status'] != 'sent': st.error(f"Status: {log['Status']}") logs_per_page = 20 total_pages = (len(filtered_logs) - 1) // logs_per_page + 1 page = st.number_input("Page", min_value=1, max_value=total_pages, value=1) start_idx = (page - 1) * logs_per_page end_idx = start_idx + logs_per_page st.table(filtered_logs.iloc[start_idx:end_idx][['Sent At', 'Email', 'Subject', 'Status']]) if st.button("Export Logs to CSV"): csv = 
filtered_logs.to_csv(index=False) st.download_button( label="Download CSV", data=csv, file_name="email_logs.csv", mime="text/csv" ) def fetch_all_email_logs(session): try: email_campaigns = session.query(EmailCampaign).join(Lead).join(EmailTemplate).options(joinedload(EmailCampaign.lead), joinedload(EmailCampaign.template)).order_by(EmailCampaign.sent_at.desc()).all() return pd.DataFrame({ 'ID': [ec.id for ec in email_campaigns], 'Sent At': [ec.sent_at for ec in email_campaigns], 'Email': [ec.lead.email for ec in email_campaigns], 'Template': [ec.template.template_name for ec in email_campaigns], 'Subject': [ec.customized_subject or "No subject" for ec in email_campaigns], 'Content': [ec.customized_content or "No content" for ec in email_campaigns], 'Status': [ec.status for ec in email_campaigns], 'Message ID': [ec.message_id or "No message ID" for ec in email_campaigns], 'Campaign ID': [ec.campaign_id for ec in email_campaigns], 'Lead Name': [f"{ec.lead.first_name or ''} {ec.lead.last_name or ''}".strip() or "Unknown" for ec in email_campaigns], 'Lead Company': [ec.lead.company or "Unknown" for ec in email_campaigns] }) except SQLAlchemyError as e: logging.error(f"Database error in fetch_all_email_logs: {str(e)}") return pd.DataFrame() def update_lead(session, lead_id, updated_data): try: lead = session.query(Lead).filter(Lead.id == lead_id).first() if lead: for key, value in updated_data.items(): setattr(lead, key, value) return True except SQLAlchemyError as e: logging.error(f"Error updating lead {lead_id}: {str(e)}") session.rollback() return False def delete_lead(session, lead_id): try: lead = session.query(Lead).filter(Lead.id == lead_id).first() if lead: session.delete(lead) return True except SQLAlchemyError as e: logging.error(f"Error deleting lead {lead_id}: {str(e)}") session.rollback() return False def is_valid_email(email): try: validate_email(email) return True except EmailNotValidError: return False def view_leads_page(): st.title("Lead Management 
Dashboard") with db_session() as session: if 'leads' not in st.session_state or st.button("Refresh Leads"): st.session_state.leads = fetch_leads_with_sources(session) if not st.session_state.leads.empty: total_leads = len(st.session_state.leads) contacted_leads = len(st.session_state.leads[st.session_state.leads['Last Contact'].notna()]) conversion_rate = (st.session_state.leads['Last Email Status'] == 'sent').mean() st.columns(3)[0].metric("Total Leads", f"{total_leads:,}") st.columns(3)[1].metric("Contacted Leads", f"{contacted_leads:,}") st.columns(3)[2].metric("Conversion Rate", f"{conversion_rate:.2%}") st.subheader("Leads Table") search_term = st.text_input("Search leads by email, name, company, or source") filtered_leads = st.session_state.leads[st.session_state.leads.apply(lambda row: search_term.lower() in str(row).lower(), axis=1)] leads_per_page, page_number = 20, st.number_input("Page", min_value=1, value=1) start_idx, end_idx = (page_number - 1) * leads_per_page, page_number * leads_per_page edited_df = st.data_editor( filtered_leads.iloc[start_idx:end_idx], column_config={ "ID": st.column_config.NumberColumn("ID", disabled=True), "Email": st.column_config.TextColumn("Email"), "First Name": st.column_config.TextColumn("First Name"), "Last Name": st.column_config.TextColumn("Last Name"), "Company": st.column_config.TextColumn("Company"), "Job Title": st.column_config.TextColumn("Job Title"), "Source": st.column_config.TextColumn("Source", disabled=True), "Last Contact": st.column_config.DatetimeColumn("Last Contact", disabled=True), "Last Email Status": st.column_config.TextColumn("Last Email Status", disabled=True), "Delete": st.column_config.CheckboxColumn("Delete") }, disabled=["ID", "Source", "Last Contact", "Last Email Status"], hide_index=True, num_rows="dynamic" ) if st.button("Save Changes", type="primary"): for index, row in edited_df.iterrows(): if row['Delete']: if delete_lead_and_sources(session, row['ID']): st.success(f"Deleted lead: 
{row['Email']}") else: updated_data = {k: row[k] for k in ['Email', 'First Name', 'Last Name', 'Company', 'Job Title']} if update_lead(session, row['ID'], updated_data): st.success(f"Updated lead: {row['Email']}") st.rerun() st.download_button( "Export Filtered Leads to CSV", filtered_leads.to_csv(index=False).encode('utf-8'), "exported_leads.csv", "text/csv" ) st.subheader("Lead Growth") if 'Created At' in st.session_state.leads.columns: lead_growth = st.session_state.leads.groupby(pd.to_datetime(st.session_state.leads['Created At']).dt.to_period('M')).size().cumsum() st.line_chart(lead_growth) else: st.warning("Created At data is not available for lead growth chart.") st.subheader("Email Campaign Performance") email_status_counts = st.session_state.leads['Last Email Status'].value_counts() st.plotly_chart(px.pie( values=email_status_counts.values, names=email_status_counts.index, title="Distribution of Email Statuses" ), use_container_width=True) else: st.info("No leads available. Start by adding some leads to your campaigns.") def fetch_leads_with_sources(session): try: query = session.query(Lead, func.string_agg(LeadSource.url, ', ').label('sources'), func.max(EmailCampaign.sent_at).label('last_contact'), func.string_agg(EmailCampaign.status, ', ').label('email_statuses')).outerjoin(LeadSource).outerjoin(EmailCampaign).group_by(Lead.id) return pd.DataFrame([{**{k: getattr(lead, k) for k in ['id', 'email', 'first_name', 'last_name', 'company', 'job_title', 'created_at']}, 'Source': sources, 'Last Contact': last_contact, 'Last Email Status': email_statuses.split(', ')[-1] if email_statuses else 'Not Contacted', 'Delete': False} for lead, sources, last_contact, email_statuses in query.all()]) except SQLAlchemyError as e: logging.error(f"Database error in fetch_leads_with_sources: {str(e)}") return pd.DataFrame() def delete_lead_and_sources(session, lead_id): try: session.query(LeadSource).filter(LeadSource.lead_id == lead_id).delete() lead = 
session.query(Lead).filter(Lead.id == lead_id).first() if lead: session.delete(lead) return True except SQLAlchemyError as e: logging.error(f"Error deleting lead {lead_id} and its sources: {str(e)}") session.rollback() return False def fetch_search_terms_with_lead_count(session): query = (session.query(SearchTerm.term, func.count(distinct(Lead.id)).label('lead_count'), func.count(distinct(EmailCampaign.id)).label('email_count')) .join(LeadSource, SearchTerm.id == LeadSource.search_term_id) .join(Lead, LeadSource.lead_id == Lead.id) .outerjoin(EmailCampaign, Lead.id == EmailCampaign.lead_id) .group_by(SearchTerm.term)) df = pd.DataFrame(query.all(), columns=['Term', 'Lead Count', 'Email Count']) return df def add_search_term(session, term, campaign_id): try: new_term = SearchTerm(term=term, campaign_id=campaign_id, created_at=datetime.utcnow()) session.add(new_term) session.commit() return new_term.id except SQLAlchemyError as e: session.rollback() logging.error(f"Error adding search term: {str(e)}") raise def get_active_campaign_id(): return st.session_state.get('active_campaign_id', 1) def search_terms_page(): st.title("Search Terms Management") with db_session() as session: # Create new group section with st.expander("Create New Group", expanded=False): new_group_name = st.text_input("New Group Name") if st.button("Create Group"): if new_group_name.strip(): create_search_term_group(session, new_group_name) st.success(f"Group '{new_group_name}' created successfully!") st.rerun() else: st.warning("Please enter a group name") # Manage existing groups groups = session.query(SearchTermGroup).all() if groups: st.subheader("Existing Groups") for group in groups: with st.expander(f"Group: {group.name}", expanded=False): # Get all search terms all_terms = session.query(SearchTerm).filter_by(campaign_id=get_active_campaign_id()).all() # Get terms currently in this group group_terms = [term for term in all_terms if term.group_id == group.id] # Create options for multiselect 
# NOTE(review): the statements from here down to `update_search_term_group`
# are the tail of `search_terms_page()`. Its header and opening lines fall
# before this chunk, and the file's original indentation was lost. These
# statements use `session`, `group`, `groups`, `all_terms` and `group_terms`
# from that enclosing scope.
# Presumably the multiselect and the two group buttons sit inside a per-group
# loop (their widget keys use `group.id`), and the "Add New Search Term" form
# follows that loop (its form key is fixed). Re-indent accordingly when the
# enclosing body is restored. The logic is reproduced unchanged.
# TODO(review): the multiselect has no `key`, so two groups with identical
# options and defaults would produce colliding widget ids.
term_options = [f"{term.id}:{term.term}" for term in all_terms]
default_values = [f"{term.id}:{term.term}" for term in group_terms]
# Display multiselect for terms
selected_terms = st.multiselect(
    "Select terms for this group",
    options=term_options,
    default=default_values,
    format_func=lambda x: x.split(':')[1]
)
if st.button("Update Group", key=f"update_{group.id}"):
    update_search_term_group(session, group.id, selected_terms)
    st.success("Group updated successfully!")
    st.rerun()
if st.button("Delete Group", key=f"delete_{group.id}"):
    delete_search_term_group(session, group.id)
    st.success("Group deleted successfully!")
    st.rerun()
# Add new search terms section
st.subheader("Add New Search Term")
with st.form("add_search_term_form"):
    new_term = st.text_input("New Search Term")
    group_options = ["None"] + [f"{g.id}:{g.name}" for g in groups]
    group_for_new_term = st.selectbox("Assign to Group", options=group_options)
    if st.form_submit_button("Add Term"):
        if new_term.strip():
            add_new_search_term(session, new_term, get_active_campaign_id(), group_for_new_term)
            st.success(f"Term '{new_term}' added successfully!")
            st.rerun()
        else:
            st.warning("Please enter a search term")


def update_search_term_group(session, group_id, updated_terms):
    """Make *updated_terms* the exact membership of search-term group *group_id*.

    Args:
        session: SQLAlchemy session; committed on success, rolled back on error.
        group_id: id of the SearchTermGroup being edited.
        updated_terms: multiselect values of the form ``"<term_id>:<term>"``.

    Raises:
        Exception: any error is logged, the session rolled back, and re-raised.
    """
    try:
        selected_term_ids = [int(term.split(':')[0]) for term in updated_terms]

        # Attach every selected term to this group...
        session.query(SearchTerm)\
            .filter(SearchTerm.id.in_(selected_term_ids))\
            .update({SearchTerm.group_id: group_id}, synchronize_session=False)

        # ...then detach terms that were in the group but are no longer selected.
        session.query(SearchTerm)\
            .filter(SearchTerm.group_id == group_id)\
            .filter(~SearchTerm.id.in_(selected_term_ids))\
            .update({SearchTerm.group_id: None}, synchronize_session=False)

        session.commit()
    except Exception as e:
        session.rollback()
        logging.error(f"Error in update_search_term_group: {str(e)}")
        raise


def add_new_search_term(session, new_term, campaign_id, group_for_new_term):
    """Create a SearchTerm for *campaign_id*, optionally assigned to a group.

    Args:
        session: SQLAlchemy session; committed on success, rolled back on error.
        new_term: the search-term text.
        campaign_id: campaign the term belongs to.
        group_for_new_term: ``"None"`` or a selectbox value ``"<group_id>:<name>"``.

    Raises:
        Exception: any error is logged, the session rolled back, and re-raised.
    """
    try:
        group_id = None
        if group_for_new_term != "None":
            group_id = int(group_for_new_term.split(':')[0])

        new_search_term = SearchTerm(
            term=new_term,
            campaign_id=campaign_id,
            group_id=group_id,
            created_at=datetime.utcnow()
        )
        session.add(new_search_term)
        session.commit()
    except Exception as e:
        session.rollback()
        logging.error(f"Error adding search term: {str(e)}")
        raise


def ai_group_search_terms(session, ungrouped_terms):
    """Ask the AI helper to sort *ungrouped_terms* into existing or new groups.

    Args:
        session: SQLAlchemy session (read-only here).
        ungrouped_terms: SearchTerm objects without a group.

    Returns:
        dict: ``{group_name: [term, ...]}`` as returned by the helper, or ``{}``
        when there is nothing to group or the response is not a dict.
    """
    if not ungrouped_terms:
        return {}
    existing_groups = session.query(SearchTermGroup).all()
    prompt = f"""
Categorize these search terms into existing groups or suggest new ones:
{', '.join([term.term for term in ungrouped_terms])}

Existing groups: {', '.join([group.name for group in existing_groups])}

Respond with a JSON object: {{group_name: [term1, term2, ...]}}
"""
    messages = [
        {"role": "system", "content": "You're an AI that categorizes search terms for lead generation. Be concise and efficient."},
        {"role": "user", "content": prompt}
    ]
    response = openai_chat_completion(messages, function_name="ai_group_search_terms")
    return response if isinstance(response, dict) else {}


def update_search_term_groups(session, grouped_terms):
    """Apply an AI grouping ``{group_name: [term, ...]}``, creating groups as needed.

    Raises:
        Exception: the session is rolled back and the error re-raised.
    """
    try:
        for group_name, terms in grouped_terms.items():
            group = session.query(SearchTermGroup).filter_by(name=group_name).first()
            if not group:
                group = SearchTermGroup(name=group_name)
                session.add(group)
                session.flush()  # assigns group.id before the terms reference it
            for term in terms:
                search_term = session.query(SearchTerm).filter_by(term=term).first()
                if search_term:
                    search_term.group_id = group.id
        session.commit()
    except Exception:
        session.rollback()
        raise


def create_search_term_group(session, group_name):
    """Create a SearchTermGroup named *group_name*; errors are logged, not raised."""
    try:
        new_group = SearchTermGroup(name=group_name)
        session.add(new_group)
        session.commit()
    except Exception as e:
        session.rollback()
        logging.error(f"Error creating search term group: {str(e)}")


def delete_search_term_group(session, group_id):
    """Delete group *group_id*, first detaching its search terms.

    The deletion is committed (previously it was never committed).
    Errors are logged and rolled back, not raised.
    """
    try:
        group = session.get(SearchTermGroup, group_id)
        if group:
            # Detach the group's terms so they survive as ungrouped terms.
            session.query(SearchTerm).filter(SearchTerm.group_id == group_id).update({SearchTerm.group_id: None})
            session.delete(group)
            session.commit()
    except Exception as e:
        session.rollback()
        logging.error(f"Error deleting search term group: {str(e)}")


def _create_email_template(session, name, subject, body):
    """Persist a new EmailTemplate for the active campaign and commit."""
    session.add(EmailTemplate(
        template_name=name,
        subject=subject,
        body_content=body,
        campaign_id=get_active_campaign_id()
    ))
    session.commit()


def email_templates_page():
    """Render the Email Templates page: create (manually or with AI), edit, preview, delete."""
    st.header("Email Templates")
    with db_session() as session:
        templates = session.query(EmailTemplate).all()

        with st.expander("Create New Template", expanded=False):
            new_template_name = st.text_input("Template Name", key="new_template_name")
            use_ai = st.checkbox("Use AI to generate template", key="use_ai")
            if use_ai:
                ai_prompt = st.text_area("Enter prompt for AI template generation", key="ai_prompt")
                use_kb = st.checkbox("Use Knowledge Base information", key="use_kb")
                if st.button("Generate Template", key="generate_ai_template"):
                    with st.spinner("Generating template..."):
                        kb_info = get_knowledge_base_info(session, get_active_project_id()) if use_kb else None
                        generated_template = generate_or_adjust_email_template(ai_prompt, kb_info)
                        new_template_subject = generated_template.get("subject", "AI Generated Subject")
                        new_template_body = generated_template.get("body", "")
                        if new_template_name:
                            _create_email_template(session, new_template_name, new_template_subject, new_template_body)
                            st.success("AI-generated template created and saved!")
                            templates = session.query(EmailTemplate).all()
                        else:
                            st.warning("Please provide a name for the template before generating.")
                    st.subheader("Generated Template")
                    st.text(f"Subject: {new_template_subject}")
                    st.components.v1.html(wrap_email_body(new_template_body), height=400, scrolling=True)
            else:
                new_template_subject = st.text_input("Subject", key="new_template_subject")
                new_template_body = st.text_area("Body Content", height=200, key="new_template_body")
                if st.button("Create Template", key="create_template_button"):
                    if all([new_template_name, new_template_subject, new_template_body]):
                        _create_email_template(session, new_template_name, new_template_subject, new_template_body)
                        st.success("New template created successfully!")
                        templates = session.query(EmailTemplate).all()
                    else:
                        st.warning("Please fill in all fields to create a new template.")

        if not templates:
            st.info("No email templates found. Create a new template to get started.")
            return

        st.subheader("Existing Templates")
        for template in templates:
            with st.expander(f"Template: {template.template_name}", expanded=False):
                col1, col2 = st.columns(2)
                edited_subject = col1.text_input("Subject", value=template.subject, key=f"subject_{template.id}")
                is_ai_customizable = col2.checkbox("AI Customizable", value=template.is_ai_customizable, key=f"ai_{template.id}")
                edited_body = st.text_area("Body Content", value=template.body_content, height=200, key=f"body_{template.id}")

                ai_adjustment_prompt = st.text_area("AI Adjustment Prompt", key=f"ai_prompt_{template.id}", placeholder="E.g., Make it less marketing-like and mention our main features")

                col3, col4 = st.columns(2)
                if col3.button("Apply AI Adjustment", key=f"apply_ai_{template.id}"):
                    with st.spinner("Applying AI adjustment..."):
                        kb_info = get_knowledge_base_info(session, get_active_project_id())
                        adjusted_template = generate_or_adjust_email_template(ai_adjustment_prompt, kb_info, current_template=edited_body)
                        # NOTE(review): the adjustment only affects this run's preview;
                        # "Save Changes" triggers a new run that saves the widget values,
                        # so the adjustment is not persisted unless copied into the fields.
                        edited_subject = adjusted_template.get("subject", edited_subject)
                        edited_body = adjusted_template.get("body", edited_body)
                        st.success("AI adjustment applied. Please review and save changes.")

                if col4.button("Save Changes", key=f"save_{template.id}"):
                    template.subject = edited_subject
                    template.body_content = edited_body
                    template.is_ai_customizable = is_ai_customizable
                    session.commit()
                    st.success("Template updated successfully!")

                st.markdown("### Preview")
                st.text(f"Subject: {edited_subject}")
                st.components.v1.html(wrap_email_body(edited_body), height=400, scrolling=True)

                if st.button("Delete Template", key=f"delete_{template.id}"):
                    session.delete(template)
                    session.commit()
                    st.success("Template deleted successfully!")
                    st.rerun()


def get_email_preview(session, template_id, from_email, reply_to):
    """Return the HTML preview of template *template_id*.

    ``from_email`` and ``reply_to`` are accepted for API compatibility and are
    not used to build the preview.
    """
    template = session.query(EmailTemplate).filter_by(id=template_id).first()
    if template:
        return wrap_email_body(template.body_content)
    return "<p>Template not found</p>"


def fetch_all_search_terms(session):
    """Return every SearchTerm row."""
    return session.query(SearchTerm).all()


def get_knowledge_base_info(session, project_id):
    """Return the project's KnowledgeBase as a dict (via ``to_dict``), or None if absent."""
    kb_info = session.query(KnowledgeBase).filter_by(project_id=project_id).first()
    return kb_info.to_dict() if kb_info else None


def get_email_template_by_name(session, template_name):
    """Return the first EmailTemplate named *template_name*, or None."""
    return session.query(EmailTemplate).filter_by(template_name=template_name).first()


def bulk_send_page():
    """Render the Bulk Email Sending page: choose template, sender and audience, preview, send."""
    st.title("Bulk Email Sending")
    with db_session() as session:
        templates = fetch_email_templates(session)
        email_settings = fetch_email_settings(session)
        if not templates or not email_settings:
            st.error("No email templates or settings available. Please set them up first.")
            return

        # Template options are "<id>: <name>"; split once so names containing ':' survive.
        template_option = st.selectbox("Email Template", options=templates, format_func=lambda x: x.split(":", 1)[1].strip())
        template_id = int(template_option.split(":")[0])
        template = session.query(EmailTemplate).filter_by(id=template_id).first()
        if template is None:
            st.error("Selected email template not found. Please choose another template.")
            return
        # The language column may not exist on every schema; absent means "no filter".
        template_language = getattr(template, "language", None)

        col1, col2 = st.columns(2)
        with col1:
            subject = st.text_input("Subject", value=template.subject)
            email_setting_option = st.selectbox("From Email", options=email_settings, format_func=lambda x: f"{x['name']} ({x['email']})")
            if email_setting_option:
                from_email = email_setting_option['email']
                reply_to = st.text_input("Reply To", email_setting_option['email'])
            else:
                st.error("Selected email setting not found. Please choose a valid email setting.")
                return

        with col2:
            send_option = st.radio("Send to:", ["All Leads", "Specific Email", "Leads from Chosen Search Terms", "Leads from Search Term Groups"])
            specific_email = None
            selected_terms = None
            if send_option == "Specific Email":
                specific_email = st.text_input("Enter email")
            elif send_option == "Leads from Chosen Search Terms":
                search_terms_with_counts = fetch_search_terms_with_lead_count(session)
                lead_counts = dict(zip(search_terms_with_counts['Term'], search_terms_with_counts['Lead Count']))
                # Values stay the raw term text; the count is display-only.
                selected_terms = st.multiselect(
                    "Select Search Terms",
                    options=search_terms_with_counts['Term'].tolist(),
                    format_func=lambda term: f"{term} ({lead_counts.get(term, 0)} leads)"
                )
            elif send_option == "Leads from Search Term Groups":
                groups = fetch_search_term_groups(session)
                selected_groups = st.multiselect("Select Search Term Groups", options=groups)
                if selected_groups:
                    group_ids = [int(group.split(':')[0]) for group in selected_groups]
                    selected_terms = fetch_search_terms_for_groups(session, group_ids)

        exclude_previously_contacted = st.checkbox("Exclude Previously Contacted Domains", value=True)

        st.markdown("### Email Preview")
        st.text(f"From: {from_email}\nReply-To: {reply_to}\nSubject: {subject}")
        st.components.v1.html(get_email_preview(session, template_id, from_email, reply_to), height=600, scrolling=True)

        leads = fetch_leads(session, template_id, send_option, specific_email, selected_terms, exclude_previously_contacted)
        total_leads = len(leads)
        eligible_leads = [lead for lead in leads if lead.get('language', template_language) == template_language]
        contactable_leads = [lead for lead in eligible_leads if not (exclude_previously_contacted and lead.get('domain_contacted', False))]

        st.info(f"Total leads: {total_leads}\n"
                f"Leads matching template language ({template_language}): {len(eligible_leads)}\n"
                f"Leads to be contacted: {len(contactable_leads)}")

        if st.button("Send Emails", type="primary"):
            if not contactable_leads:
                st.warning("No leads found matching the selected criteria.")
                return
            progress_bar = st.progress(0)
            status_text = st.empty()
            results = []
            log_container = st.empty()
            logs, sent_count = bulk_send_emails(
                session, template_id, from_email, reply_to, contactable_leads,
                progress_bar, status_text, results, log_container, subject=subject
            )
            st.success(f"Emails sent successfully to {sent_count} leads.")
            st.subheader("Sending Results")
            results_df = pd.DataFrame(results)
            st.dataframe(results_df)
            if not results_df.empty and 'Status' in results_df:
                success_rate = (results_df['Status'] == 'sent').mean()
                st.metric("Email Sending Success Rate", f"{success_rate:.2%}")


def fetch_leads(session, template_id, send_option, specific_email, selected_terms, exclude_previously_contacted):
    """Return the leads targeted by the bulk-send options as ``[{"Email", "ID"}]``.

    Args:
        session: SQLAlchemy session.
        template_id: accepted for API compatibility; not used for filtering.
        send_option: one of the radio labels used by ``bulk_send_page``.
        specific_email: address used when ``send_option == "Specific Email"``.
        selected_terms: search-term texts for the term/group-based options.
        exclude_previously_contacted: skip leads with any sent EmailCampaign.

    Returns:
        list[dict]: one entry per distinct lead.
        The list is empty when a term/group option has no terms selected, when
        no specific email is given, or when the query fails (the error is logged).
    """
    term_based = send_option in ("Leads from Chosen Search Terms", "Leads from Search Term Groups")
    # Never fall through to "all leads" when the user chose a narrower audience but left it empty.
    if term_based and not selected_terms:
        return []
    if send_option == "Specific Email" and not (specific_email and specific_email.strip()):
        return []
    try:
        query = session.query(Lead)
        if send_option == "Specific Email":
            query = query.filter(Lead.email == specific_email.strip())
        elif term_based:
            query = query.join(LeadSource).join(SearchTerm).filter(SearchTerm.term.in_(selected_terms))

        if exclude_previously_contacted:
            subquery = session.query(EmailCampaign.lead_id).filter(EmailCampaign.sent_at.isnot(None)).subquery()
            query = query.outerjoin(subquery, Lead.id == subquery.c.lead_id).filter(subquery.c.lead_id.is_(None))

        # A lead reached through several sources appears once per source in the join; keep one row.
        leads, seen_ids = [], set()
        for lead in query.all():
            if lead.id not in seen_ids:
                seen_ids.add(lead.id)
                leads.append({"Email": lead.email, "ID": lead.id})
        return leads
    except Exception as e:
        session.rollback()
        logging.error(f"Error fetching leads: {str(e)}")
        return []


def fetch_email_settings(session):
    """Return all EmailSettings as ``[{"id", "name", "email"}]``; ``[]`` on error (logged)."""
    try:
        settings = session.query(EmailSettings).all()
        return [{"id": setting.id, "name": setting.name, "email": setting.email} for setting in settings]
    except Exception as e:
        logging.error(f"Error fetching email settings: {e}")
        return []


def fetch_search_terms_with_lead_count(session):
    """Return a DataFrame of terms with distinct lead and email-campaign counts.

    Columns are ``Term``, ``Lead Count`` and ``Email Count``. Only terms with at
    least one lead source appear, because of the inner joins.
    """
    query = (session.query(SearchTerm.term,
                           func.count(distinct(Lead.id)).label('lead_count'),
                           func.count(distinct(EmailCampaign.id)).label('email_count'))
             .join(LeadSource, SearchTerm.id == LeadSource.search_term_id)
             .join(Lead, LeadSource.lead_id == Lead.id)
             .outerjoin(EmailCampaign, Lead.id == EmailCampaign.lead_id)
             .group_by(SearchTerm.term))
    df = pd.DataFrame(query.all(), columns=['Term', 'Lead Count', 'Email Count'])
    return df


def fetch_search_term_groups(session):
    """Return every group as a ``"<id>: <name>"`` string."""
    return [f"{group.id}: {group.name}" for group in session.query(SearchTermGroup).all()]


def fetch_search_terms_for_groups(session, group_ids):
    """Return the term texts belonging to any of *group_ids*."""
    terms = session.query(SearchTerm).filter(SearchTerm.group_id.in_(group_ids)).all()
    return [term.term for term in terms]


def ai_automation_loop(session, log_container, leads_container):
    """Run search → save leads → email cycles while ``automation_status`` is set.

    Each cycle optimizes the project's search terms and searches each term. It
    saves new leads once each and emails them with the project's first template.
    It then sleeps an hour (five minutes after an error).
    On exit, the logs and totals are written to ``st.session_state``.
    """
    automation_logs, total_search_terms, total_emails_sent, total_leads_found = [], 0, 0, 0
    while st.session_state.get('automation_status', False):
        try:
            log_container.info("Starting automation cycle")
            kb_info = get_knowledge_base_info(session, get_active_project_id())
            if not kb_info:
                log_container.warning("Knowledge Base not found. Skipping cycle.")
                time.sleep(3600)
                continue

            base_terms = [term.term for term in session.query(SearchTerm).filter_by(project_id=get_active_project_id()).all()]
            optimized_terms = generate_optimized_search_terms(session, base_terms, kb_info)

            st.subheader("Optimized Search Terms")
            st.write(", ".join(optimized_terms))

            total_search_terms = len(optimized_terms)
            progress_bar = st.progress(0)
            for idx, term in enumerate(optimized_terms):
                results = manual_search(session, [term], 10, ignore_previously_fetched=True)
                # Save each result exactly once; save_lead returns a falsy value for rejected leads.
                new_leads = []
                for res in results['results']:
                    lead = save_lead(session, res['Email'], url=res['URL'])
                    if lead:
                        new_leads.append((lead.id, res['Email']))
                total_leads_found += len(new_leads)

                if new_leads:
                    template = session.query(EmailTemplate).filter_by(project_id=get_active_project_id()).first()
                    if template:
                        from_email = kb_info.get('contact_email', 'hello@indosy.com')
                        reply_to = kb_info.get('contact_email', 'eugproductions@gmail.com')
                        logs, sent_count = bulk_send_emails(session, template.id, from_email, reply_to, [{'Email': email} for _, email in new_leads])
                        automation_logs.extend(logs)
                        total_emails_sent += sent_count

                    leads_container.text_area("New Leads Found", "\n".join([email for _, email in new_leads]), height=200)

                progress_bar.progress((idx + 1) / len(optimized_terms))

            st.success(f"Automation cycle completed. Total search terms: {total_search_terms}, Total emails sent: {total_emails_sent}")
            time.sleep(3600)
        except Exception as e:
            log_container.error(f"Critical error in automation cycle: {str(e)}")
            time.sleep(300)

    log_container.info("Automation stopped")
    st.session_state.update({"automation_logs": automation_logs, "total_leads_found": total_leads_found, "total_emails_sent": total_emails_sent})


def display_search_results(results, key_suffix):
    """Show search results in one expander, one section per lead.

    Streamlit does not allow expanders to be nested, and ``st.expander`` takes
    no ``key``, so each lead is rendered as a markdown section.
    ``key_suffix`` is kept for API compatibility.
    """
    if not results:
        return st.warning("No results to display.")
    with st.expander("Search Results", expanded=True):
        st.markdown(f"### Total Leads Found: **{len(results)}**")
        for i, res in enumerate(results):
            st.markdown(f"**Lead: {res['Email']}**")
            st.markdown(f"**URL:** [{res['URL']}]({res['URL']})  \n**Title:** {res['Title']}  \n**Description:** {res['Description']}  \n**Tags:** {', '.join(res['Tags'])}  \n**Lead Source:** {res['Lead Source']}  \n**Lead Email:** {res['Email']}")
            if i < len(results) - 1:
                st.markdown("---")


def perform_quick_scan(session):
    """Run ``manual_search`` on three random terms and report how many leads were found."""
    with st.spinner("Performing quick scan..."):
        terms = session.query(SearchTerm).order_by(func.random()).limit(3).all()
        settings = fetch_email_settings(session)
        email_setting = settings[0] if settings else None
        from_email = email_setting['email'] if email_setting else None
        reply_to = from_email
        email_template = session.query(EmailTemplate).first()
        res = manual_search(session, [term.term for term in terms], 10, True, False, False, True, "EN", True, st.empty(), from_email, reply_to, f"{email_template.id}: {email_template.template_name}" if email_template else None)
    st.success(f"Quick scan completed! Found {len(res['results'])} new leads.")
    return {"new_leads": len(res['results']), "terms_used": [term.term for term in terms]}


def generate_optimized_search_terms(session, base_terms, kb_info):
    """Ask the AI helper for an optimized and expanded list of search terms.

    This is the only definition of the function. The knowledge-base context is
    included in the prompt, and the prompt asks for exactly the shape the
    parser reads.

    Args:
        session: accepted for API compatibility; not used.
        base_terms: current search-term texts.
        kb_info: knowledge-base dict (or None), embedded in the prompt as context.

    Returns:
        list[str]: the suggested terms, or ``list(base_terms)`` when the response
        is missing or not in a recognised shape.
    """
    prompt = (
        "Optimize and expand these search terms for lead generation:\n"
        f"{', '.join(base_terms)}\n\n"
        f"Business context: {kb_info}\n\n"
        "Consider:\n1. Relevance to business and target market\n2. Potential for high-quality leads\n"
        "3. Variations and related terms\n4. Industry-specific jargon\n\n"
        'Respond with a JSON object: {"optimized_terms": [term1, term2, ...]}'
    )
    messages = [
        {"role": "system", "content": "You're an AI specializing in optimizing search terms for lead generation. Be concise and effective."},
        {"role": "user", "content": prompt},
    ]
    response = openai_chat_completion(messages, function_name="generate_optimized_search_terms")
    # Accept either the requested object or a bare JSON list.
    terms = response.get('optimized_terms') if isinstance(response, dict) else response
    if isinstance(terms, list):
        cleaned = [str(term).strip() for term in terms if str(term).strip()]
        if cleaned:
            return cleaned
    return list(base_terms)


def fetch_leads_for_search_terms(session, search_term_ids) -> List[Lead]:
    """Return the distinct leads linked (via LeadSource) to any of *search_term_ids*."""
    return session.query(Lead).distinct().join(LeadSource).filter(LeadSource.search_term_id.in_(search_term_ids)).all()


def projects_campaigns_page():
    """Render the Projects and Campaigns page: add projects/campaigns and pick the active ones."""
    with db_session() as session:
        st.header("Projects and Campaigns")
        st.subheader("Add New Project")
        with st.form("add_project_form"):
            project_name = st.text_input("Project Name")
            if st.form_submit_button("Add Project"):
                if project_name.strip():
                    try:
                        session.add(Project(project_name=project_name, created_at=datetime.utcnow()))
                        session.commit()
                        st.success(f"Project '{project_name}' added successfully.")
                    except SQLAlchemyError as e:
                        session.rollback()
                        st.error(f"Error adding project: {str(e)}")
                else:
                    st.warning("Please enter a project name.")

        st.subheader("Existing Projects and Campaigns")
        projects = session.query(Project).all()
        for project in projects:
            with st.expander(f"Project: {project.project_name}"):
                st.info("Campaigns share resources and settings within a project.")
                with st.form(f"add_campaign_form_{project.id}"):
                    campaign_name = st.text_input("Campaign Name", key=f"campaign_name_{project.id}")
                    if st.form_submit_button("Add Campaign"):
                        if campaign_name.strip():
                            try:
                                session.add(Campaign(campaign_name=campaign_name, project_id=project.id, created_at=datetime.utcnow()))
                                session.commit()
                                st.success(f"Campaign '{campaign_name}' added to '{project.project_name}'.")
                            except SQLAlchemyError as e:
                                session.rollback()
                                st.error(f"Error adding campaign: {str(e)}")
                        else:
                            st.warning("Please enter a campaign name.")

                campaigns = session.query(Campaign).filter_by(project_id=project.id).all()
                st.write("Campaigns:" if campaigns else f"No campaigns for {project.project_name} yet.")
                for campaign in campaigns:
                    st.write(f"- {campaign.campaign_name}")

        st.subheader("Set Active Project and Campaign")
        if not projects:
            st.warning("No projects found. Please add a project first.")
            return

        # Select by id so duplicate names cannot make a name lookup ambiguous.
        project_names = {p.id: p.project_name for p in projects}
        active_project_id = st.selectbox("Select Active Project", options=list(project_names), index=0, format_func=lambda pid: project_names[pid])
        active_project = project_names[active_project_id]
        set_active_project_id(active_project_id)

        active_project_campaigns = session.query(Campaign).filter_by(project_id=active_project_id).all()
        if not active_project_campaigns:
            st.warning(f"No campaigns available for {active_project}. Please add a campaign.")
            return
        campaign_names = {c.id: c.campaign_name for c in active_project_campaigns}
        active_campaign_id = st.selectbox("Select Active Campaign", options=list(campaign_names), index=0, format_func=lambda cid: campaign_names[cid])
        active_campaign = campaign_names[active_campaign_id]
        set_active_campaign_id(active_campaign_id)
        st.success(f"Active Project: {active_project}, Active Campaign: {active_campaign}")


def knowledge_base_page():
    """Render the Knowledge Base page: edit and save the KB entry of a chosen project."""
    st.title("Knowledge Base")
    with db_session() as session:
        project_options = fetch_projects(session)
        if not project_options:
            return st.warning("No projects found. Please create a project first.")

        selected_project = st.selectbox("Select Project", options=project_options)
        project_id = int(selected_project.split(":")[0])
        set_active_project_id(project_id)
        kb_entry = session.query(KnowledgeBase).filter_by(project_id=project_id).first()

        with st.form("knowledge_base_form"):
            fields = ['kb_name', 'kb_bio', 'kb_values', 'contact_name', 'contact_role', 'contact_email',
                      'company_description', 'company_mission', 'company_target_market', 'company_other',
                      'product_name', 'product_description', 'product_target_customer', 'product_other',
                      'other_context', 'example_email']
            single_line_fields = {'kb_name', 'contact_name', 'contact_role', 'contact_email', 'product_name'}
            form_data = {}
            for field in fields:
                widget = st.text_input if field in single_line_fields else st.text_area
                # Missing entry or NULL column both start as an empty field.
                form_data[field] = widget(field.replace('_', ' ').title(), value=getattr(kb_entry, field, None) or '')

            if st.form_submit_button("Save Knowledge Base"):
                try:
                    if kb_entry:
                        # Keep the original created_at when editing an existing entry.
                        for k, v in form_data.items():
                            setattr(kb_entry, k, v)
                        kb_entry.project_id = project_id
                    else:
                        session.add(KnowledgeBase(project_id=project_id, created_at=datetime.utcnow(), **form_data))
                    session.commit()
                    st.success("Knowledge Base saved successfully!", icon="✅")
                except Exception as e:
                    session.rollback()
                    st.error(f"An error occurred while saving the Knowledge Base: {str(e)}")


def autoclient_ai_page():
    """Render the AutoclientAI page: KB summary, term optimization, and the automation loop."""
    st.header("AutoclientAI - Automated Lead Generation")
    with st.expander("Knowledge Base Information", expanded=False):
        with db_session() as session:
            kb_info = get_knowledge_base_info(session, get_active_project_id())
            if not kb_info:
                return st.error("Knowledge Base not found for the active project. Please set it up first.")
            st.json(kb_info)

    user_input = st.text_area("Enter additional context or specific goals for lead generation:", help="This information will be used to generate more targeted search terms.")

    if st.button("Generate Optimized Search Terms", key="generate_optimized_terms"):
        with st.spinner("Generating optimized search terms..."):
            with db_session() as session:
                base_terms = [term.term for term in session.query(SearchTerm).filter_by(project_id=get_active_project_id()).all()]
                # Pass the user's extra goals through with the KB context, as the help text promises.
                context = dict(kb_info)
                if user_input.strip():
                    context["additional_context"] = user_input.strip()
                optimized_terms = generate_optimized_search_terms(session, base_terms, context)
        if optimized_terms:
            st.session_state.optimized_terms = optimized_terms
            st.success("Search terms optimized successfully!")
            st.subheader("Optimized Search Terms")
            st.write(", ".join(optimized_terms))
        else:
            st.error("Failed to generate optimized search terms. Please try again.")

    if st.button("Start Automation", key="start_automation"):
        st.session_state.update({"automation_status": True, "automation_logs": [], "total_leads_found": 0, "total_emails_sent": 0})
        st.success("Automation started!")

    if st.session_state.get('automation_status', False):
        st.subheader("Automation in Progress")
        log_container, leads_container = st.empty(), st.empty()
        try:
            with db_session() as session:
                ai_automation_loop(session, log_container, leads_container)
        except Exception as e:
            st.error(f"An error occurred in the automation process: {str(e)}")
            st.session_state.automation_status = False

    if not st.session_state.get('automation_status', False) and st.session_state.get('automation_logs'):
        st.subheader("Automation Results")
        st.metric("Total Leads Found", st.session_state.total_leads_found)
        st.metric("Total Emails Sent", st.session_state.total_emails_sent)
        st.subheader("Automation Logs")
        st.text_area("Logs", "\n".join(st.session_state.automation_logs), height=300)

    if 'email_logs' in st.session_state:
        st.subheader("Email Sending Logs")
        df_logs = pd.DataFrame(st.session_state.email_logs)
        st.dataframe(df_logs)
        if not df_logs.empty and 'Status' in df_logs:
            success_rate = (df_logs['Status'] == 'sent').mean() * 100
            st.metric("Email Sending Success Rate", f"{success_rate:.2f}%")

    st.subheader("Debug Information")
    st.json(st.session_state)
    st.write("Current function:", autoclient_ai_page.__name__)
    st.write("Session state keys:", list(st.session_state.keys()))


def update_search_terms(session, classified_terms):
    """Upsert ``{group: [term, ...]}`` into the active project's search terms and commit."""
    project_id = get_active_project_id()
    for group, terms in classified_terms.items():
        for term in terms:
            existing_term = session.query(SearchTerm).filter_by(term=term, project_id=project_id).first()
            if existing_term:
                existing_term.group = group
            else:
                session.add(SearchTerm(term=term, group=group, project_id=project_id))
    session.commit()


def update_results_display(results_container, results):
    """Render the lead count and the ten most recent leads into *results_container*.

    Emails and URLs come from scraped pages, so they are HTML-escaped before
    being rendered with ``unsafe_allow_html``.
    """
    rows = "".join(
        f'<div><strong>{html.escape(str(res["Email"]))}</strong><br>'
        f'<small>{html.escape(str(res["URL"]))}</small></div>'
        for res in results[-10:]
    )
    results_container.markdown(
        f"<div><h4>Found Leads ({len(results)})</h4>{rows}</div>",
        unsafe_allow_html=True
    )


def automation_control_panel_page():
    """Render the Automation Control Panel: toggle, quick scan, analytics, logs, and the run loop."""
    # Initialize persistent state variables
    if 'automation_running' not in st.session_state:
        st.session_state.automation_running = False
    if 'automation_stats' not in st.session_state:
        st.session_state.automation_stats = {
            'total_leads': 0,
            'emails_sent': 0,
            'last_run': None,
            'current_term': None
        }
    if 'automation_logs' not in st.session_state:
        st.session_state.automation_logs = []

    st.title("Automation Control Panel")

    col1, col2 = st.columns([2, 1])
    with col1:
        status = "Active" if st.session_state.get('automation_status', False) else "Inactive"
        st.metric("Automation Status", status)
    with col2:
        button_text = "Stop Automation" if st.session_state.get('automation_status', False) else "Start Automation"
        if st.button(button_text, use_container_width=True):
            st.session_state.automation_status = not st.session_state.get('automation_status', False)
            if st.session_state.automation_status:
                st.session_state.automation_logs = []
            st.rerun()

    if st.button("Perform Quick Scan", use_container_width=True):
        with st.spinner("Performing quick scan..."):
            try:
                with db_session() as session:
                    new_leads = session.query(Lead).filter(Lead.is_processed == False).count()  # noqa: E712
                    session.query(Lead).filter(Lead.is_processed == False).update({Lead.is_processed: True})  # noqa: E712
                    session.commit()
                    st.success(f"Quick scan completed! Found {new_leads} new leads.")
            except Exception as e:
                st.error(f"An error occurred during quick scan: {str(e)}")

    st.subheader("Real-Time Analytics")
    try:
        with db_session() as session:
            total_leads = session.query(Lead).count()
            emails_sent = session.query(EmailCampaign).count()
            col1, col2 = st.columns(2)
            col1.metric("Total Leads", total_leads)
            col2.metric("Emails Sent", emails_sent)
    except Exception as e:
        st.error(f"An error occurred while displaying analytics: {str(e)}")

    st.subheader("Automation Logs")
    log_container = st.empty()
    update_display(log_container, st.session_state.get('automation_logs', []), "Latest Logs", "log")

    st.subheader("Recently Found Leads")
    leads_container = st.empty()

    if not st.session_state.get('automation_status', False):
        return

    st.info("Automation is currently running in the background.")
    try:
        with db_session() as session:
            while st.session_state.get('automation_status', False):
                project_id = get_active_project_id()
                kb_info = get_knowledge_base_info(session, project_id)
                if not kb_info:
                    st.session_state.automation_logs.append("Knowledge Base not found. Skipping cycle.")
                    time.sleep(3600)
                    continue
                base_terms = [term.term for term in session.query(SearchTerm).filter_by(project_id=project_id).all()]
                optimized_terms = generate_optimized_search_terms(session, base_terms, kb_info)
                new_leads_all = []
                for term in optimized_terms:
                    results = manual_search(session, [term], 10)
                    new_leads = [(res['Email'], res['URL']) for res in results['results'] if save_lead(session, res['Email'], url=res['URL'])]
                    new_leads_all.extend(new_leads)
                    if new_leads:
                        template = session.query(EmailTemplate).filter_by(project_id=project_id).first()
                        if template:
                            from_email = kb_info.get('contact_email') or 'hello@indosy.com'
                            reply_to = kb_info.get('contact_email') or 'eugproductions@gmail.com'
                            logs, sent_count = bulk_send_emails(session, template.id, from_email, reply_to, [{'Email': email} for email, _ in new_leads])
                            st.session_state.automation_logs.extend(logs)
                if new_leads_all:
                    leads_df = pd.DataFrame(new_leads_all, columns=['Email', 'URL'])
                    leads_container.dataframe(leads_df, hide_index=True)
                else:
                    leads_container.info("No new leads found in this cycle.")
                update_display(log_container, st.session_state.get('automation_logs', []), "Latest Logs", "log")
                time.sleep(3600)
    except Exception as e:
        st.error(f"An error occurred in the automation process: {str(e)}")


def update_display(container, items, title, item_type):
    """Render *title* and the last ten *items* into *container*.

    ``container`` is typically an ``st.empty()`` placeholder, which holds a
    single element, so everything is written into one nested container.
    Previously each write replaced the previous one.
    ``item_type`` is accepted for API compatibility and is unused.
    """
    box = container.container()
    box.markdown(f"**{title}**")
    for item in items[-10:]:
        box.text(item)


def get_search_terms(session):
    """Return the term texts of the active project's search terms."""
    return [term.term for term in session.query(SearchTerm).filter_by(project_id=get_active_project_id()).all()]


def get_ai_response(prompt):
    """Return a plain-text AI completion for *prompt* ('' when there is no response).

    The legacy ``openai.Completion`` endpoint and the ``text-davinci-002``
    model this used were removed from the openai>=1.0 SDK (the file imports
    the v1 ``OpenAI`` client). The call therefore goes through the file's
    shared ``openai_chat_completion`` helper instead.
    """
    response = openai_chat_completion([{"role": "user", "content": prompt}], function_name="get_ai_response")
    if response is None:
        return ""
    if isinstance(response, str):
        return response.strip()
    # NOTE(review): assumes non-string helper responses are parsed JSON (dict/list).
    return json.dumps(response)


def bulk_send_emails(session, template_id, from_email, reply_to, leads, progress_bar=None, status_text=None, results=None, log_container=None, subject=None):
    """Send template *template_id* to every lead and record each attempt.

    Args:
        session: SQLAlchemy session used to load the template and save campaigns.
        template_id: EmailTemplate id to send.
        from_email: sender address.
        reply_to: Reply-To address.
        leads: dicts with at least an ``'Email'`` key.
        progress_bar, status_text, log_container: optional Streamlit elements
            that are updated after every lead.
        results: optional list; one ``{"Email", "Status"}`` row is appended per
            lead. Status is ``'sent'``, ``'failed'`` or ``'invalid'``.
        subject: optional subject override; defaults to the template's subject.

    Returns:
        tuple[list[str], int]: the log lines and the number of emails sent.
        ``([], 0)`` is returned if the template does not exist.
    """
    template = session.query(EmailTemplate).filter_by(id=template_id).first()
    if not template:
        logging.error(f"Email template with ID {template_id} not found.")
        return [], 0

    email_subject = subject or template.subject
    email_content = template.body_content

    logs, sent_count = [], 0
    total_leads = len(leads)

    for index, lead in enumerate(leads):
        email = lead['Email']
        try:
            validate_email(email)
            response, tracking_id = send_email_ses(session, from_email, email, email_subject, email_content, reply_to=reply_to)
            if response:
                status = 'sent'
                message_id = response.get('MessageId', f"sent-{uuid.uuid4()}")
                sent_count += 1
                log_message = f"✅ Email sent to: {email}"
            else:
                status = 'failed'
                message_id = f"failed-{uuid.uuid4()}"
                log_message = f"❌ Failed to send email to: {email}"
            save_email_campaign(session, email, template_id, status, datetime.utcnow(), email_subject, message_id, email_content)
        except EmailNotValidError:
            status = 'invalid'
            log_message = f"❌ Invalid email address: {email}"
        except Exception as e:
            status = 'failed'
            logging.error(f"Error sending email to {email}: {str(e)}")
            log_message = f"❌ Error sending email to: {email} (Error: {str(e)})"
            # Recording the failure must not abort the remaining leads.
            try:
                save_email_campaign(session, email, template_id, 'failed', datetime.utcnow(), email_subject, f"error-{uuid.uuid4()}", email_content)
            except Exception as save_error:
                logging.error(f"Could not record failed email to {email}: {save_error}")

        # Report every lead, whatever the outcome, so progress and results stay complete.
        logs.append(log_message)
        if progress_bar:
            progress_bar.progress((index + 1) / total_leads)
        if status_text:
            status_text.text(f"Processed {index + 1}/{total_leads} leads")
        if results is not None:
            results.append({"Email": email, "Status": status})
        if log_container:
            log_container.text(log_message)

    return logs, sent_count


def wrap_email_body(body_content):
    """Wrap an email body fragment in a minimal HTML document for previews."""
    return f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Email Template</title>
</head>
<body>
{body_content or ""}
</body>
</html>"""


def fetch_sent_email_campaigns(session):
    """Return sent email campaigns (newest first) as a DataFrame; empty on DB error.

    Only campaigns with both a lead and a template appear, because of the
    inner joins.
    """
    try:
        email_campaigns = (session.query(EmailCampaign)
                           .join(Lead)
                           .join(EmailTemplate)
                           .options(joinedload(EmailCampaign.lead), joinedload(EmailCampaign.template))
                           .order_by(EmailCampaign.sent_at.desc())
                           .all())
        return pd.DataFrame({
            'ID': [ec.id for ec in email_campaigns],
            'Sent At': [ec.sent_at.strftime("%Y-%m-%d %H:%M:%S") if ec.sent_at else "" for ec in email_campaigns],
            'Email': [ec.lead.email for ec in email_campaigns],
            'Template': [ec.template.template_name for ec in email_campaigns],
            'Subject': [ec.customized_subject or "No subject" for ec in email_campaigns],
            'Content': [ec.customized_content or "No content" for ec in email_campaigns],
            'Status': [ec.status for ec in email_campaigns],
            'Message ID': [ec.message_id or "No message ID" for ec in email_campaigns],
            'Campaign ID': [ec.campaign_id for ec in email_campaigns],
            'Lead Name': [f"{ec.lead.first_name or ''} {ec.lead.last_name or ''}".strip() or "Unknown" for ec in email_campaigns],
            'Lead Company': [ec.lead.company or "Unknown" for ec in email_campaigns]
        })
    except SQLAlchemyError as e:
        session.rollback()
        logging.error(f"Database error in fetch_sent_email_campaigns: {str(e)}")
        return pd.DataFrame()


def display_logs(log_container, logs):
    """Render the last 20 *logs* into *log_container* as escaped HTML lines."""
    if not logs:
        log_container.info("No logs to display yet.")
        return
    log_entries = "".join(f"<div>{html.escape(str(log))}</div>" for log in logs[-20:])
    log_container.markdown(f"<div>{log_entries}</div>", unsafe_allow_html=True)


def view_sent_email_campaigns():
    """Render the Sent Email Campaigns page with a table and a per-campaign content viewer."""
    st.header("Sent Email Campaigns")
    try:
        with db_session() as session:
            email_campaigns = fetch_sent_email_campaigns(session)
        if email_campaigns.empty:
            st.info("No sent email campaigns found.")
            return
        st.dataframe(email_campaigns)
        st.subheader("Detailed Content")
        selected_campaign = st.selectbox("Select a campaign to view details", email_campaigns['ID'].tolist())
        if selected_campaign:
            campaign_content = email_campaigns[email_campaigns['ID'] == selected_campaign]['Content'].iloc[0]
            st.text_area("Content", campaign_content if campaign_content else "No content available", height=300)
    except Exception as e:
        st.error(f"An error occurred while fetching sent email campaigns: {str(e)}")
        logging.error(f"Error in view_sent_email_campaigns: {str(e)}")


def main():
    """Configure the page, render the sidebar navigation, and dispatch to the selected page."""
    st.set_page_config(
        page_title="Autoclient.ai | Lead Generation AI App",
        layout="wide",
        initial_sidebar_state="expanded",
        page_icon=""  # NOTE(review): looks like an emoji was lost here — confirm the intended icon
    )

    st.sidebar.title("AutoclientAI")
    st.sidebar.markdown("Select a page to navigate through the application.")

    pages = {
        "🔍 Manual Search": manual_search_page,
        "📦 Bulk Send": bulk_send_page,
        "👥 View Leads": view_leads_page,
        "🔑 Search Terms": search_terms_page,
        "✉️ Email Templates": email_templates_page,
        "🚀 Projects & Campaigns": projects_campaigns_page,
        "📚 Knowledge Base": knowledge_base_page,
        "🤖 AutoclientAI": autoclient_ai_page,
        "⚙️ Automation Control": automation_control_panel_page,
        "📨 Email Logs": view_campaign_logs,
        "🔄 Settings": settings_page,
        "📨 Sent Campaigns": view_sent_email_campaigns
    }

    with st.sidebar:
        selected = option_menu(
            menu_title="Navigation",
            options=list(pages.keys()),
            icons=["search", "send", "people", "key", "envelope", "folder", "book", "robot", "gear", "list-check", "gear", "envelope-open"],
            menu_icon="cast",
            default_index=0
        )

    try:
        pages[selected]()
    except Exception as e:
        st.error(f"An error occurred: {str(e)}")
        logging.exception("An error occurred in the main function")
        st.write("Please try refreshing the page or contact support if the issue persists.")

    st.sidebar.markdown("---")
    st.sidebar.info("© 2024 AutoclientAI. All rights reserved.")


if __name__ == "__main__":
    main()