Spaces:
Sleeping
Sleeping
import os, json, re, logging, asyncio, time, requests, pandas as pd, streamlit as st, openai, boto3, uuid, aiohttp, urllib3, random, html, smtplib | |
from datetime import datetime, timedelta | |
from dotenv import load_dotenv | |
from bs4 import BeautifulSoup | |
from googlesearch import search as google_search | |
from fake_useragent import UserAgent | |
from sqlalchemy import func, create_engine, Column, BigInteger, Text, DateTime, ForeignKey, Boolean, JSON, select, text, distinct, and_ | |
from sqlalchemy.orm import declarative_base, sessionmaker, relationship, Session, joinedload | |
from sqlalchemy.exc import SQLAlchemyError | |
from botocore.exceptions import ClientError | |
from tenacity import retry, stop_after_attempt, wait_random_exponential, wait_fixed | |
from email_validator import validate_email, EmailNotValidError | |
from streamlit_option_menu import option_menu | |
from openai import OpenAI | |
from typing import List, Optional | |
from urllib.parse import urlparse, urlencode | |
from streamlit_tags import st_tags | |
import plotly.express as px | |
from requests.adapters import HTTPAdapter | |
from requests.packages.urllib3.util.retry import Retry | |
from email.mime.text import MIMEText | |
from email.mime.multipart import MIMEMultipart | |
from contextlib import contextmanager | |
#database info | |
DB_HOST = os.getenv("SUPABASE_DB_HOST") | |
DB_NAME = os.getenv("SUPABASE_DB_NAME") | |
DB_USER = os.getenv("SUPABASE_DB_USER") | |
DB_PASSWORD = os.getenv("SUPABASE_DB_PASSWORD") | |
DB_PORT = os.getenv("SUPABASE_DB_PORT") | |
DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" | |
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) | |
load_dotenv() | |
DB_HOST, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT = map(os.getenv, ["SUPABASE_DB_HOST", "SUPABASE_DB_NAME", "SUPABASE_DB_USER", "SUPABASE_DB_PASSWORD", "SUPABASE_DB_PORT"]) | |
DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}" | |
engine = create_engine(DATABASE_URL, pool_size=20, max_overflow=0) | |
SessionLocal, Base = sessionmaker(bind=engine), declarative_base() | |
if not all([DB_HOST, DB_NAME, DB_USER, DB_PASSWORD, DB_PORT]): | |
raise ValueError("One or more required database environment variables are not set") | |
class Project(Base): | |
__tablename__ = 'projects' | |
id = Column(BigInteger, primary_key=True) | |
project_name = Column(Text, default="Default Project") | |
created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
campaigns = relationship("Campaign", back_populates="project") | |
knowledge_base = relationship("KnowledgeBase", back_populates="project", uselist=False) | |
class Campaign(Base): | |
__tablename__ = 'campaigns' | |
id = Column(BigInteger, primary_key=True) | |
campaign_name = Column(Text, default="Default Campaign") | |
campaign_type = Column(Text, default="Email") | |
project_id = Column(BigInteger, ForeignKey('projects.id'), default=1) | |
created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
auto_send = Column(Boolean, default=False) | |
loop_automation = Column(Boolean, default=False) | |
ai_customization = Column(Boolean, default=False) | |
max_emails_per_group = Column(BigInteger, default=500) | |
loop_interval = Column(BigInteger, default=60) | |
project = relationship("Project", back_populates="campaigns") | |
email_campaigns = relationship("EmailCampaign", back_populates="campaign") | |
search_terms = relationship("SearchTerm", back_populates="campaign") | |
campaign_leads = relationship("CampaignLead", back_populates="campaign") | |
class CampaignLead(Base): | |
__tablename__ = 'campaign_leads' | |
id = Column(BigInteger, primary_key=True) | |
campaign_id = Column(BigInteger, ForeignKey('campaigns.id')) | |
lead_id = Column(BigInteger, ForeignKey('leads.id')) | |
status = Column(Text) | |
created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
lead = relationship("Lead", back_populates="campaign_leads") | |
campaign = relationship("Campaign", back_populates="campaign_leads") | |
class KnowledgeBase(Base): | |
__tablename__ = 'knowledge_base' | |
id = Column(BigInteger, primary_key=True) | |
project_id = Column(BigInteger, ForeignKey('projects.id'), nullable=False) | |
kb_name, kb_bio, kb_values, contact_name, contact_role, contact_email = [Column(Text) for _ in range(6)] | |
company_description, company_mission, company_target_market, company_other = [Column(Text) for _ in range(4)] | |
product_name, product_description, product_target_customer, product_other = [Column(Text) for _ in range(4)] | |
other_context, example_email = Column(Text), Column(Text) | |
created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
updated_at = Column(DateTime(timezone=True), onupdate=func.now()) | |
project = relationship("Project", back_populates="knowledge_base") | |
def to_dict(self): | |
return {attr: getattr(self, attr) for attr in ['kb_name', 'kb_bio', 'kb_values', 'contact_name', 'contact_role', 'contact_email', 'company_description', 'company_mission', 'company_target_market', 'company_other', 'product_name', 'product_description', 'product_target_customer', 'product_other', 'other_context', 'example_email']} | |
# Update the Lead model to remove the domain attribute | |
class Lead(Base): | |
__tablename__ = 'leads' | |
id = Column(BigInteger, primary_key=True) | |
email = Column(Text, unique=True) | |
phone, first_name, last_name, company, job_title = [Column(Text) for _ in range(5)] | |
created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
# Remove the domain column | |
campaign_leads = relationship("CampaignLead", back_populates="lead") | |
lead_sources = relationship("LeadSource", back_populates="lead") | |
email_campaigns = relationship("EmailCampaign", back_populates="lead") | |
class EmailTemplate(Base): | |
__tablename__ = 'email_templates' | |
id = Column(BigInteger, primary_key=True) | |
campaign_id = Column(BigInteger, ForeignKey('campaigns.id')) | |
template_name, subject, body_content = Column(Text), Column(Text), Column(Text) | |
created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
is_ai_customizable = Column(Boolean, default=False) | |
language = Column(Text, default='ES') # Add language column | |
campaign = relationship("Campaign") | |
email_campaigns = relationship("EmailCampaign", back_populates="template") | |
class EmailCampaign(Base): | |
__tablename__ = 'email_campaigns' | |
id = Column(BigInteger, primary_key=True) | |
campaign_id = Column(BigInteger, ForeignKey('campaigns.id')) | |
lead_id = Column(BigInteger, ForeignKey('leads.id')) | |
template_id = Column(BigInteger, ForeignKey('email_templates.id')) | |
customized_subject = Column(Text) | |
customized_content = Column(Text) # Make sure this column exists | |
original_subject = Column(Text) | |
original_content = Column(Text) | |
status = Column(Text) | |
engagement_data = Column(JSON) | |
message_id = Column(Text) | |
tracking_id = Column(Text, unique=True) | |
sent_at = Column(DateTime(timezone=True)) | |
ai_customized = Column(Boolean, default=False) | |
opened_at = Column(DateTime(timezone=True)) | |
clicked_at = Column(DateTime(timezone=True)) | |
open_count = Column(BigInteger, default=0) | |
click_count = Column(BigInteger, default=0) | |
campaign = relationship("Campaign", back_populates="email_campaigns") | |
lead = relationship("Lead", back_populates="email_campaigns") | |
template = relationship("EmailTemplate", back_populates="email_campaigns") | |
class OptimizedSearchTerm(Base): | |
__tablename__ = 'optimized_search_terms' | |
id = Column(BigInteger, primary_key=True) | |
original_term_id = Column(BigInteger, ForeignKey('search_terms.id')) | |
term = Column(Text) | |
created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
original_term = relationship("SearchTerm", back_populates="optimized_terms") | |
class SearchTermEffectiveness(Base): | |
__tablename__ = 'search_term_effectiveness' | |
id = Column(BigInteger, primary_key=True) | |
search_term_id = Column(BigInteger, ForeignKey('search_terms.id')) | |
total_results, valid_leads, irrelevant_leads, blogs_found, directories_found = [Column(BigInteger) for _ in range(5)] | |
created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
search_term = relationship("SearchTerm", back_populates="effectiveness") | |
class SearchTermGroup(Base): | |
__tablename__ = 'search_term_groups' | |
id = Column(BigInteger, primary_key=True) | |
name, email_template, description = Column(Text), Column(Text), Column(Text) | |
created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
search_terms = relationship("SearchTerm", back_populates="group") | |
class SearchTerm(Base): | |
__tablename__ = 'search_terms' | |
id = Column(BigInteger, primary_key=True) | |
group_id = Column(BigInteger, ForeignKey('search_term_groups.id')) | |
campaign_id = Column(BigInteger, ForeignKey('campaigns.id')) | |
term, category = Column(Text), Column(Text) | |
created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
language = Column(Text, default='ES') # Add language column | |
group = relationship("SearchTermGroup", back_populates="search_terms") | |
campaign = relationship("Campaign", back_populates="search_terms") | |
optimized_terms = relationship("OptimizedSearchTerm", back_populates="original_term") | |
lead_sources = relationship("LeadSource", back_populates="search_term") | |
effectiveness = relationship("SearchTermEffectiveness", back_populates="search_term", uselist=False) | |
class LeadSource(Base): | |
__tablename__ = 'lead_sources' | |
id = Column(BigInteger, primary_key=True) | |
lead_id = Column(BigInteger, ForeignKey('leads.id')) | |
search_term_id = Column(BigInteger, ForeignKey('search_terms.id')) | |
url, domain, page_title, meta_description, scrape_duration = [Column(Text) for _ in range(5)] | |
meta_tags, phone_numbers, content, tags = [Column(Text) for _ in range(4)] | |
http_status = Column(BigInteger) | |
created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
lead = relationship("Lead", back_populates="lead_sources") | |
search_term = relationship("SearchTerm", back_populates="lead_sources") | |
class AIRequestLog(Base): | |
__tablename__ = 'ai_request_logs' | |
id = Column(BigInteger, primary_key=True) | |
function_name, prompt, response, model_used = [Column(Text) for _ in range(4)] | |
created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
lead_id = Column(BigInteger, ForeignKey('leads.id')) | |
email_campaign_id = Column(BigInteger, ForeignKey('email_campaigns.id')) | |
lead = relationship("Lead") | |
email_campaign = relationship("EmailCampaign") | |
class AutomationLog(Base): | |
__tablename__ = 'automation_logs' | |
id = Column(BigInteger, primary_key=True) | |
campaign_id = Column(BigInteger, ForeignKey('campaigns.id')) | |
search_term_id = Column(BigInteger, ForeignKey('search_terms.id')) | |
leads_gathered, emails_sent = Column(BigInteger), Column(BigInteger) | |
start_time = Column(DateTime(timezone=True), server_default=func.now()) | |
end_time = Column(DateTime(timezone=True)) | |
status, logs = Column(Text), Column(JSON) | |
campaign = relationship("Campaign") | |
search_term = relationship("SearchTerm") | |
# Replace the existing EmailSettings class with this unified Settings class | |
class Settings(Base): | |
__tablename__ = 'settings' | |
id = Column(BigInteger, primary_key=True) | |
name = Column(Text, nullable=False) | |
setting_type = Column(Text, nullable=False) # 'general', 'email', etc. | |
value = Column(JSON, nullable=False) | |
created_at = Column(DateTime(timezone=True), server_default=func.now()) | |
updated_at = Column(DateTime(timezone=True), onupdate=func.now()) | |
class EmailSettings(Base): | |
__tablename__ = 'email_settings' | |
id = Column(BigInteger, primary_key=True) | |
name = Column(Text, nullable=False) | |
email = Column(Text, nullable=False) | |
provider = Column(Text, nullable=False) | |
smtp_server = Column(Text) | |
smtp_port = Column(BigInteger) | |
smtp_username = Column(Text) | |
smtp_password = Column(Text) | |
aws_access_key_id = Column(Text) | |
aws_secret_access_key = Column(Text) | |
aws_region = Column(Text) | |
DATABASE_URL = os.environ.get("DATABASE_URL") | |
if not DATABASE_URL: | |
raise ValueError("DATABASE_URL not set") | |
engine = create_engine(DATABASE_URL) | |
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) | |
Base.metadata.create_all(bind=engine) | |
def db_session(): | |
session = SessionLocal() | |
try: | |
yield session | |
session.commit() | |
except Exception: | |
session.rollback() | |
raise | |
finally: | |
session.close() | |
def settings_page(): | |
st.title("Settings") | |
with db_session() as session: | |
general_settings = session.query(Settings).filter_by(setting_type='general').first() or Settings(name='General Settings', setting_type='general', value={}) | |
st.header("General Settings") | |
with st.form("general_settings_form"): | |
openai_api_key = st.text_input("OpenAI API Key", value=general_settings.value.get('openai_api_key', ''), type="password") | |
openai_api_base = st.text_input("OpenAI API Base URL", value=general_settings.value.get('openai_api_base', 'https://api.openai.com/v1')) | |
openai_model = st.text_input("OpenAI Model", value=general_settings.value.get('openai_model', 'gpt-4o-mini')) | |
if st.form_submit_button("Save General Settings"): | |
general_settings.value = {'openai_api_key': openai_api_key, 'openai_api_base': openai_api_base, 'openai_model': openai_model} | |
session.add(general_settings) | |
session.commit() | |
st.success("General settings saved successfully!") | |
st.header("Email Settings") | |
email_settings = session.query(EmailSettings).all() | |
for setting in email_settings: | |
with st.expander(f"{setting.name} ({setting.email})"): | |
st.write(f"Provider: {setting.provider}") | |
st.write(f"{'SMTP Server: ' + setting.smtp_server if setting.provider == 'smtp' else 'AWS Region: ' + setting.aws_region}") | |
if st.button(f"Delete {setting.name}", key=f"delete_{setting.id}"): | |
session.delete(setting) | |
session.commit() | |
st.success(f"Deleted {setting.name}") | |
st.rerun() | |
edit_id = st.selectbox("Edit existing setting", ["New Setting"] + [f"{s.id}: {s.name}" for s in email_settings]) | |
edit_setting = session.query(EmailSettings).get(int(edit_id.split(":")[0])) if edit_id != "New Setting" else None | |
with st.form("email_setting_form"): | |
name = st.text_input("Name", value=edit_setting.name if edit_setting else "", placeholder="e.g., Company Gmail") | |
email = st.text_input("Email", value=edit_setting.email if edit_setting else "", placeholder="[email protected]") | |
provider = st.selectbox("Provider", ["smtp", "ses"], index=0 if edit_setting and edit_setting.provider == "smtp" else 1) | |
if provider == "smtp": | |
smtp_server = st.text_input("SMTP Server", value=edit_setting.smtp_server if edit_setting else "", placeholder="smtp.gmail.com") | |
smtp_port = st.number_input("SMTP Port", min_value=1, max_value=65535, value=edit_setting.smtp_port if edit_setting else 587) | |
smtp_username = st.text_input("SMTP Username", value=edit_setting.smtp_username if edit_setting else "", placeholder="[email protected]") | |
smtp_password = st.text_input("SMTP Password", type="password", value=edit_setting.smtp_password if edit_setting else "", placeholder="Your SMTP password") | |
else: | |
aws_access_key_id = st.text_input("AWS Access Key ID", value=edit_setting.aws_access_key_id if edit_setting else "", placeholder="AKIAIOSFODNN7EXAMPLE") | |
aws_secret_access_key = st.text_input("AWS Secret Access Key", type="password", value=edit_setting.aws_secret_access_key if edit_setting else "", placeholder="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY") | |
aws_region = st.text_input("AWS Region", value=edit_setting.aws_region if edit_setting else "", placeholder="us-west-2") | |
if st.form_submit_button("Save Email Setting"): | |
setting_data = {k: v for k, v in locals().items() if k in ['name', 'email', 'provider', 'smtp_server', 'smtp_port', 'smtp_username', 'smtp_password', 'aws_access_key_id', 'aws_secret_access_key', 'aws_region'] and v is not None} | |
try: | |
if edit_setting: | |
for k, v in setting_data.items(): | |
setattr(edit_setting, k, v) | |
else: | |
new_setting = EmailSettings(**setting_data) | |
session.add(new_setting) | |
session.commit() | |
st.success("Email setting saved successfully!") | |
st.rerun() | |
except Exception as e: | |
st.error(f"Error saving email setting: {str(e)}") | |
session.rollback() | |
def send_email_ses(session, from_email, to_email, subject, body, charset='UTF-8', reply_to=None, ses_client=None): | |
email_settings = session.query(EmailSettings).filter_by(email=from_email).first() | |
if not email_settings: | |
logging.error(f"No email settings found for {from_email}") | |
return None, None | |
tracking_id = str(uuid.uuid4()) | |
tracking_pixel_url = f"https://autoclient-email-analytics.trigox.workers.dev/track?{urlencode({'id': tracking_id, 'type': 'open'})}" | |
wrapped_body = wrap_email_body(body) | |
tracked_body = wrapped_body.replace('</body>', f'<img src="{tracking_pixel_url}" width="1" height="1" style="display:none;"/></body>') | |
soup = BeautifulSoup(tracked_body, 'html.parser') | |
for a in soup.find_all('a', href=True): | |
original_url = a['href'] | |
tracked_url = f"https://autoclient-email-analytics.trigox.workers.dev/track?{urlencode({'id': tracking_id, 'type': 'click', 'url': original_url})}" | |
a['href'] = tracked_url | |
tracked_body = str(soup) | |
try: | |
test_response = requests.get(f"https://autoclient-email-analytics.trigox.workers.dev/test", timeout=5) | |
if test_response.status_code != 200: | |
logging.warning("Analytics worker is down. Using original URLs.") | |
tracked_body = wrapped_body | |
except requests.RequestException: | |
logging.warning("Failed to reach analytics worker. Using original URLs.") | |
tracked_body = wrapped_body | |
try: | |
if email_settings.provider == 'ses': | |
if ses_client is None: | |
aws_session = boto3.Session( | |
aws_access_key_id=email_settings.aws_access_key_id, | |
aws_secret_access_key=email_settings.aws_secret_access_key, | |
region_name=email_settings.aws_region | |
) | |
ses_client = aws_session.client('ses') | |
response = ses_client.send_email( | |
Source=from_email, | |
Destination={'ToAddresses': [to_email]}, | |
Message={ | |
'Subject': {'Data': subject, 'Charset': charset}, | |
'Body': {'Html': {'Data': tracked_body, 'Charset': charset}} | |
}, | |
ReplyToAddresses=[reply_to] if reply_to else [] | |
) | |
return response, tracking_id | |
elif email_settings.provider == 'smtp': | |
msg = MIMEMultipart() | |
msg['From'] = from_email | |
msg['To'] = to_email | |
msg['Subject'] = subject | |
if reply_to: | |
msg['Reply-To'] = reply_to | |
msg.attach(MIMEText(tracked_body, 'html')) | |
with smtplib.SMTP(email_settings.smtp_server, email_settings.smtp_port) as server: | |
server.starttls() | |
server.login(email_settings.smtp_username, email_settings.smtp_password) | |
server.send_message(msg) | |
return {'MessageId': f'smtp-{uuid.uuid4()}'}, tracking_id | |
else: | |
logging.error(f"Unknown email provider: {email_settings.provider}") | |
return None, None | |
except Exception as e: | |
logging.error(f"Error sending email: {str(e)}") | |
return None, None | |
def save_email_campaign(session, lead_email, template_id, status, sent_at, subject, message_id, email_body): | |
try: | |
lead = session.query(Lead).filter_by(email=lead_email).first() | |
if not lead: | |
logging.error(f"Lead with email {lead_email} not found.") | |
return | |
new_campaign = EmailCampaign( | |
lead_id=lead.id, | |
template_id=template_id, | |
status=status, | |
sent_at=sent_at, | |
customized_subject=subject or "No subject", | |
message_id=message_id or f"unknown-{uuid.uuid4()}", | |
customized_content=email_body or "No content", | |
campaign_id=get_active_campaign_id(), | |
tracking_id=str(uuid.uuid4()) | |
) | |
session.add(new_campaign) | |
session.commit() | |
except Exception as e: | |
logging.error(f"Error saving email campaign: {str(e)}") | |
session.rollback() | |
return None | |
return new_campaign | |
def update_log(log_container, message, level='info'): | |
icon = {'info': 'π΅', 'success': 'π’', 'warning': 'π ', 'error': 'π΄', 'email_sent': 'π£'}.get(level, 'βͺ') | |
log_entry = f"{icon} {message}" | |
# Simple console logging without HTML | |
print(f"{icon} {message.split('<')[0]}") # Only print the first part of the message before any HTML tags | |
if 'log_entries' not in st.session_state: | |
st.session_state.log_entries = [] | |
# HTML-formatted log entry for Streamlit display | |
html_log_entry = f"{icon} {message}" | |
st.session_state.log_entries.append(html_log_entry) | |
# Update the Streamlit display with all logs | |
log_html = f"<div style='height: 300px; overflow-y: auto; font-family: monospace; font-size: 0.8em; line-height: 1.2;'>{'<br>'.join(st.session_state.log_entries)}</div>" | |
log_container.markdown(log_html, unsafe_allow_html=True) | |
def optimize_search_term(search_term, language): | |
if language == 'english': | |
return f'"{search_term}" email OR contact OR "get in touch" site:.com' | |
elif language == 'spanish': | |
return f'"{search_term}" correo OR contacto OR "ponte en contacto" site:.es' | |
return search_term | |
def shuffle_keywords(term): | |
words = term.split() | |
random.shuffle(words) | |
return ' '.join(words) | |
def get_domain_from_url(url): | |
return urlparse(url).netloc | |
def is_valid_email(email): | |
pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$' | |
return re.match(pattern, email) is not None | |
def extract_emails_from_html(html_content): | |
pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b' | |
return re.findall(pattern, html_content) | |
def extract_info_from_page(soup): | |
name = soup.find('meta', {'name': 'author'}) | |
name = name['content'] if name else '' | |
company = soup.find('meta', {'property': 'og:site_name'}) | |
company = company['content'] if company else '' | |
job_title = soup.find('meta', {'name': 'job_title'}) | |
job_title = job_title['content'] if job_title else '' | |
return name, company, job_title | |
def manual_search(session, terms, num_results, ignore_previously_fetched=True, optimize_english=False, optimize_spanish=False, shuffle_keywords_option=False, language='ES', enable_email_sending=True, log_container=None, from_email=None, reply_to=None, email_template=None): | |
ua, results, total_leads, domains_processed = UserAgent(), [], 0, set() | |
processed_emails_per_domain = {} # Track processed emails per domain | |
for original_term in terms: | |
try: | |
search_term_id = add_or_get_search_term(session, original_term, get_active_campaign_id()) | |
search_term = shuffle_keywords(original_term) if shuffle_keywords_option else original_term | |
search_term = optimize_search_term(search_term, 'english' if optimize_english else 'spanish') if optimize_english or optimize_spanish else search_term | |
update_log(log_container, f"Searching for '{original_term}' (Used '{search_term}')") | |
for url in google_search(search_term, num_results, lang=language): | |
domain = get_domain_from_url(url) | |
if ignore_previously_fetched and domain in domains_processed: | |
update_log(log_container, f"Skipping Previously Fetched: {domain}", 'warning') | |
continue | |
update_log(log_container, f"Fetching: {url}") | |
try: | |
if not url.startswith(('http://', 'https://')): | |
url = 'http://' + url | |
response = requests.get(url, timeout=10, verify=False, headers={'User-Agent': ua.random}) | |
response.raise_for_status() | |
html_content, soup = response.text, BeautifulSoup(response.text, 'html.parser') | |
# Extract all emails from the page | |
emails = extract_emails_from_html(html_content) | |
valid_emails = [email for email in emails if is_valid_email(email)] | |
update_log(log_container, f"Found {len(valid_emails)} valid email(s) on {url}", 'success') | |
if not valid_emails: | |
continue | |
# Extract page info once for all leads from this URL | |
name, company, job_title = extract_info_from_page(soup) | |
page_title = get_page_title(html_content) | |
page_description = get_page_description(html_content) | |
# Initialize set for this domain if not exists | |
if domain not in processed_emails_per_domain: | |
processed_emails_per_domain[domain] = set() | |
# Process each valid email from this URL | |
for email in valid_emails: | |
# Skip if we've already processed this email for this domain | |
if email in processed_emails_per_domain[domain]: | |
continue | |
processed_emails_per_domain[domain].add(email) | |
lead = save_lead(session, email=email, first_name=name, company=company, job_title=job_title, url=url, search_term_id=search_term_id, created_at=datetime.utcnow()) | |
if lead: | |
total_leads += 1 | |
results.append({ | |
'Email': email, | |
'URL': url, | |
'Lead Source': original_term, | |
'Title': page_title, | |
'Description': page_description, | |
'Tags': [], | |
'Name': name, | |
'Company': company, | |
'Job Title': job_title, | |
'Search Term ID': search_term_id | |
}) | |
update_log(log_container, f"Saved lead: {email}", 'success') | |
if enable_email_sending: | |
if not from_email or not email_template: | |
update_log(log_container, "Email sending is enabled but from_email or email_template is not provided", 'error') | |
continue | |
template = session.query(EmailTemplate).filter_by(id=int(email_template.split(":")[0])).first() | |
if not template: | |
update_log(log_container, "Email template not found", 'error') | |
continue | |
wrapped_content = wrap_email_body(template.body_content) | |
response, tracking_id = send_email_ses(session, from_email, email, template.subject, wrapped_content, reply_to=reply_to) | |
if response: | |
update_log(log_container, f"Sent email to: {email}", 'email_sent') | |
save_email_campaign(session, email, template.id, 'Sent', datetime.utcnow(), template.subject, response['MessageId'], wrapped_content) | |
else: | |
update_log(log_container, f"Failed to send email to: {email}", 'error') | |
save_email_campaign(session, email, template.id, 'Failed', datetime.utcnow(), template.subject, None, wrapped_content) | |
# Add domain to processed list after processing all its emails | |
domains_processed.add(domain) | |
except requests.RequestException as e: | |
update_log(log_container, f"Error processing URL {url}: {str(e)}", 'error') | |
except Exception as e: | |
update_log(log_container, f"Error processing term '{original_term}': {str(e)}", 'error') | |
update_log(log_container, f"Total leads found: {total_leads}", 'info') | |
return {"total_leads": total_leads, "results": results} | |
def generate_or_adjust_email_template(prompt, kb_info=None, current_template=None): | |
messages = [ | |
{"role": "system", "content": "You are an AI assistant specializing in creating and refining email templates for marketing campaigns. Always respond with a JSON object containing 'subject' and 'body' keys. The 'body' should contain HTML formatted content suitable for insertion into an email body."}, | |
{"role": "user", "content": f"""{'Adjust the following email template based on the given instructions:' if current_template else 'Create an email template based on the following prompt:'} {prompt} | |
{'Current Template:' if current_template else 'Guidelines:'} | |
{current_template if current_template else '1. Focus on benefits to the reader, address potential customer doubts, include clear CTAs, use a natural tone, and be concise.'} | |
Respond with a JSON object containing 'subject' and 'body' keys. The 'body' should contain HTML formatted content suitable for insertion into an email body. | |
Follow these guidelines: | |
1. Use proper HTML tags for structuring the email content (e.g., <p>, <h1>, <h2>, etc.). | |
2. Include inline CSS for styling where appropriate. | |
3. Ensure the content is properly structured and easy to read. | |
4. Include a call-to-action button or link with appropriate styling. | |
5. Make the design responsive for various screen sizes. | |
6. Do not include <html>, <head>, or <body> tags. | |
Example structure: | |
{{ | |
"subject": "Your compelling subject line here", | |
"body": "<h1>Welcome!</h1><p>Your email content here...</p><a href='#' style='display: inline-block; padding: 10px 20px; background-color: #007bff; color: white; text-decoration: none; border-radius: 5px;'>Call to Action</a>" | |
}}"""} | |
] | |
if kb_info: | |
messages.append({"role": "user", "content": f"Consider this knowledge base information: {json.dumps(kb_info)}"}) | |
response = openai_chat_completion(messages, function_name="generate_or_adjust_email_template") | |
if isinstance(response, str): | |
try: | |
return json.loads(response) | |
except json.JSONDecodeError: | |
return { | |
"subject": "AI Generated Subject", | |
"body": f"<p>{response}</p>" | |
} | |
elif isinstance(response, dict): | |
return response | |
else: | |
return {"subject": "", "body": "<p>Failed to generate email content.</p>"} | |
def fetch_leads_with_sources(session): | |
try: | |
query = session.query(Lead, func.string_agg(LeadSource.url, ', ').label('sources'), func.max(EmailCampaign.sent_at).label('last_contact'), func.string_agg(EmailCampaign.status, ', ').label('email_statuses')).outerjoin(LeadSource).outerjoin(EmailCampaign).group_by(Lead.id) | |
return pd.DataFrame([{**{k: getattr(lead, k) for k in ['id', 'email', 'first_name', 'last_name', 'company', 'job_title', 'created_at']}, 'Source': sources, 'Last Contact': last_contact, 'Last Email Status': email_statuses.split(', ')[-1] if email_statuses else 'Not Contacted', 'Delete': False} for lead, sources, last_contact, email_statuses in query.all()]) | |
except SQLAlchemyError as e: | |
logging.error(f"Database error in fetch_leads_with_sources: {str(e)}") | |
return pd.DataFrame() | |
def fetch_search_terms_with_lead_count(session): | |
query = session.query(SearchTerm.term, func.count(distinct(Lead.id)).label('lead_count'), func.count(distinct(EmailCampaign.id)).label('email_count')).join(LeadSource, SearchTerm.id == LeadSource.search_term_id).join(Lead, LeadSource.lead_id == Lead.id).outerjoin(EmailCampaign, Lead.id == EmailCampaign.lead_id).group_by(SearchTerm.term) | |
return pd.DataFrame(query.all(), columns=['Term', 'Lead Count', 'Email Count']) | |
def add_search_term(session, term, campaign_id): | |
try: | |
new_term = SearchTerm(term=term, campaign_id=campaign_id, created_at=datetime.utcnow()) | |
session.add(new_term) | |
session.commit() | |
return new_term.id | |
except SQLAlchemyError as e: | |
session.rollback() | |
logging.error(f"Error adding search term: {str(e)}") | |
raise | |
def update_search_term_group(session, group_id, updated_terms): | |
try: | |
# Get IDs of selected terms | |
selected_term_ids = [int(term.split(':')[0]) for term in updated_terms] | |
# Update all terms that should be in this group | |
session.query(SearchTerm)\ | |
.filter(SearchTerm.id.in_(selected_term_ids))\ | |
.update({SearchTerm.group_id: group_id}, synchronize_session=False) | |
# Remove group_id from terms that were unselected | |
session.query(SearchTerm)\ | |
.filter(SearchTerm.group_id == group_id)\ | |
.filter(~SearchTerm.id.in_(selected_term_ids))\ | |
.update({SearchTerm.group_id: None}, synchronize_session=False) | |
session.commit() | |
except Exception as e: | |
session.rollback() | |
logging.error(f"Error in update_search_term_group: {str(e)}") | |
raise | |
def add_new_search_term(session, new_term, campaign_id, group_for_new_term): | |
try: | |
group_id = None | |
if group_for_new_term != "None": | |
group_id = int(group_for_new_term.split(':')[0]) | |
new_search_term = SearchTerm( | |
term=new_term, | |
campaign_id=campaign_id, | |
group_id=group_id, | |
created_at=datetime.utcnow() | |
) | |
session.add(new_search_term) | |
session.commit() | |
except Exception as e: | |
session.rollback() | |
logging.error(f"Error adding search term: {str(e)}") | |
raise | |
def ai_group_search_terms(session, ungrouped_terms): | |
existing_groups = session.query(SearchTermGroup).all() | |
prompt = f"Categorize these search terms into existing groups or suggest new ones:\n{', '.join([term.term for term in ungrouped_terms])}\n\nExisting groups: {', '.join([group.name for group in existing_groups])}\n\nRespond with a JSON object: {{group_name: [term1, term2, ...]}}" | |
messages = [{"role": "system", "content": "You're an AI that categorizes search terms for lead generation. Be concise and efficient."}, {"role": "user", "content": prompt}] | |
response = openai_chat_completion(messages, function_name="ai_group_search_terms") | |
return response if isinstance(response, dict) else {} | |
def update_search_term_groups(session, grouped_terms): | |
for group_name, terms in grouped_terms.items(): | |
group = session.query(SearchTermGroup).filter_by(name=group_name).first() or SearchTermGroup(name=group_name) | |
if not group.id: session.add(group); session.flush() | |
for term in terms: | |
search_term = session.query(SearchTerm).filter_by(term=term).first() | |
if search_term: search_term.group_id = group.id | |
session.commit() | |
def create_search_term_group(session, group_name): | |
try: | |
session.add(SearchTermGroup(name=group_name)) | |
session.commit() | |
except Exception as e: | |
session.rollback() | |
logging.error(f"Error creating search term group: {str(e)}") | |
def delete_search_term_group(session, group_id): | |
try: | |
group = session.query(SearchTermGroup).get(group_id) | |
if group: | |
session.query(SearchTerm).filter(SearchTerm.group_id == group_id).update({SearchTerm.group_id: None}) | |
session.delete(group) | |
session.commit() | |
except Exception as e: | |
session.rollback() | |
logging.error(f"Error deleting search term group: {str(e)}") | |
def ai_automation_loop(session, log_container, leads_container): | |
automation_logs, total_search_terms, total_emails_sent = [], 0, 0 | |
while st.session_state.get('automation_status', False): | |
try: | |
log_container.info("Starting automation cycle") | |
kb_info = get_knowledge_base_info(session, get_active_project_id()) | |
if not kb_info: | |
log_container.warning("Knowledge Base not found. Skipping cycle.") | |
time.sleep(3600) | |
continue | |
base_terms = [term.term for term in session.query(SearchTerm).filter_by(project_id=get_active_project_id()).all()] | |
optimized_terms = generate_optimized_search_terms(session, base_terms, kb_info) | |
st.subheader("Optimized Search Terms") | |
st.write(", ".join(optimized_terms)) | |
total_search_terms = len(optimized_terms) | |
progress_bar = st.progress(0) | |
for idx, term in enumerate(optimized_terms): | |
results = manual_search(session, [term], 10, ignore_previously_fetched=True) | |
new_leads = [] | |
for res in results['results']: | |
lead = save_lead(session, res['Email'], url=res['URL']) | |
if lead: | |
new_leads.append((lead.id, lead.email)) | |
if new_leads: | |
template = session.query(EmailTemplate).filter_by(project_id=get_active_project_id()).first() | |
if template: | |
from_email = kb_info.get('contact_email') or '[email protected]' | |
reply_to = kb_info.get('contact_email') or '[email protected]' | |
logs, sent_count = bulk_send_emails(session, template.id, from_email, reply_to, [{'Email': email} for _, email in new_leads]) | |
automation_logs.extend(logs) | |
total_emails_sent += sent_count | |
leads_container.text_area("New Leads Found", "\n".join([email for _, email in new_leads]), height=200) | |
progress_bar.progress((idx + 1) / len(optimized_terms)) | |
st.success(f"Automation cycle completed. Total search terms: {total_search_terms}, Total emails sent: {total_emails_sent}") | |
time.sleep(3600) | |
except Exception as e: | |
log_container.error(f"Critical error in automation cycle: {str(e)}") | |
time.sleep(300) | |
log_container.info("Automation stopped") | |
st.session_state.automation_logs = automation_logs | |
st.session_state.total_leads_found = total_search_terms | |
st.session_state.total_emails_sent = total_emails_sent | |
def openai_chat_completion(messages, temperature=0.7, function_name=None, lead_id=None, email_campaign_id=None): | |
with db_session() as session: | |
general_settings = session.query(Settings).filter_by(setting_type='general').first() | |
if not general_settings or 'openai_api_key' not in general_settings.value: | |
st.error("OpenAI API key not set. Please configure it in the settings.") | |
return None | |
client = OpenAI(api_key=general_settings.value['openai_api_key']) | |
model = general_settings.value.get('openai_model', "gpt-4o-mini") | |
try: | |
response = client.chat.completions.create( | |
model=model, | |
messages=messages, | |
temperature=temperature | |
) | |
result = response.choices[0].message.content | |
with db_session() as session: | |
log_ai_request(session, function_name, messages, result, lead_id, email_campaign_id, model) | |
try: | |
return json.loads(result) | |
except json.JSONDecodeError: | |
return result | |
except Exception as e: | |
st.error(f"Error in OpenAI API call: {str(e)}") | |
with db_session() as session: | |
log_ai_request(session, function_name, messages, str(e), lead_id, email_campaign_id, model) | |
return None | |
def log_ai_request(session, function_name, prompt, response, lead_id=None, email_campaign_id=None, model_used=None): | |
session.add(AIRequestLog( | |
function_name=function_name, | |
prompt=json.dumps(prompt), | |
response=json.dumps(response) if response else None, | |
lead_id=lead_id, | |
email_campaign_id=email_campaign_id, | |
model_used=model_used | |
)) | |
session.commit() | |
def save_lead(session, email, first_name=None, last_name=None, company=None, job_title=None, phone=None, url=None, search_term_id=None, created_at=None): | |
try: | |
existing_lead = session.query(Lead).filter_by(email=email).first() | |
if existing_lead: | |
for attr in ['first_name', 'last_name', 'company', 'job_title', 'phone', 'created_at']: | |
if locals()[attr]: setattr(existing_lead, attr, locals()[attr]) | |
lead = existing_lead | |
else: | |
lead = Lead(email=email, first_name=first_name, last_name=last_name, company=company, job_title=job_title, phone=phone, created_at=created_at or datetime.utcnow()) | |
session.add(lead) | |
session.flush() | |
lead_source = LeadSource(lead_id=lead.id, url=url, search_term_id=search_term_id) | |
session.add(lead_source) | |
campaign_lead = CampaignLead(campaign_id=get_active_campaign_id(), lead_id=lead.id, status="Not Contacted", created_at=datetime.utcnow()) | |
session.add(campaign_lead) | |
session.commit() | |
return lead | |
except Exception as e: | |
logging.error(f"Error saving lead: {str(e)}") | |
session.rollback() | |
return None | |
def save_lead_source(session, lead_id, search_term_id, url, http_status, scrape_duration, page_title=None, meta_description=None, content=None, tags=None, phone_numbers=None): | |
session.add(LeadSource(lead_id=lead_id, search_term_id=search_term_id, url=url, http_status=http_status, scrape_duration=scrape_duration, page_title=page_title or get_page_title(url), meta_description=meta_description or get_page_description(url), content=content or extract_visible_text(BeautifulSoup(requests.get(url).text, 'html.parser')), tags=tags, phone_numbers=phone_numbers)) | |
session.commit() | |
def get_page_title(url): | |
try: | |
response = requests.get(url, timeout=10) | |
soup = BeautifulSoup(response.text, 'html.parser') | |
title = soup.title.string if soup.title else "No title found" | |
return title.strip() | |
except Exception as e: | |
logging.error(f"Error getting page title for {url}: {str(e)}") | |
return "Error fetching title" | |
def extract_visible_text(soup): | |
for script in soup(["script", "style"]): | |
script.extract() | |
text = soup.get_text() | |
lines = (line.strip() for line in text.splitlines()) | |
chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) | |
return ' '.join(chunk for chunk in chunks if chunk) | |
def log_search_term_effectiveness(session, term, total_results, valid_leads, blogs_found, directories_found): | |
session.add(SearchTermEffectiveness(term=term, total_results=total_results, valid_leads=valid_leads, irrelevant_leads=total_results - valid_leads, blogs_found=blogs_found, directories_found=directories_found)) | |
session.commit() | |
get_active_project_id = lambda: st.session_state.get('active_project_id', 1) | |
get_active_campaign_id = lambda: st.session_state.get('active_campaign_id', 1) | |
set_active_project_id = lambda project_id: st.session_state.__setitem__('active_project_id', project_id) | |
set_active_campaign_id = lambda campaign_id: st.session_state.__setitem__('active_campaign_id', campaign_id) | |
def add_or_get_search_term(session, term, campaign_id, created_at=None): | |
search_term = session.query(SearchTerm).filter_by(term=term, campaign_id=campaign_id).first() | |
if not search_term: | |
search_term = SearchTerm(term=term, campaign_id=campaign_id, created_at=created_at or datetime.utcnow()) | |
session.add(search_term) | |
session.commit() | |
session.refresh(search_term) | |
return search_term.id | |
def fetch_campaigns(session): | |
return [f"{camp.id}: {camp.campaign_name}" for camp in session.query(Campaign).all()] | |
def fetch_projects(session): | |
return [f"{project.id}: {project.project_name}" for project in session.query(Project).all()] | |
def fetch_email_templates(session): | |
return [f"{t.id}: {t.template_name}" for t in session.query(EmailTemplate).all()] | |
def create_or_update_email_template(session, template_name, subject, body_content, template_id=None, is_ai_customizable=False, created_at=None, language='ES'): | |
template = session.query(EmailTemplate).filter_by(id=template_id).first() if template_id else EmailTemplate(template_name=template_name, subject=subject, body_content=body_content, is_ai_customizable=is_ai_customizable, campaign_id=get_active_campaign_id(), created_at=created_at or datetime.utcnow()) | |
if template_id: template.template_name, template.subject, template.body_content, template.is_ai_customizable = template_name, subject, body_content, is_ai_customizable | |
template.language = language | |
session.add(template) | |
session.commit() | |
return template.id | |
safe_datetime_compare = lambda date1, date2: False if date1 is None or date2 is None else date1 > date2 | |
def fetch_leads(session, template_id, send_option, specific_email, selected_terms, exclude_previously_contacted): | |
try: | |
query = session.query(Lead) | |
if send_option == "Specific Email": | |
query = query.filter(Lead.email == specific_email) | |
elif send_option in ["Leads from Chosen Search Terms", "Leads from Search Term Groups"] and selected_terms: | |
query = query.join(LeadSource).join(SearchTerm).filter(SearchTerm.term.in_(selected_terms)) | |
if exclude_previously_contacted: | |
subquery = session.query(EmailCampaign.lead_id).filter(EmailCampaign.sent_at.isnot(None)).subquery() | |
query = query.outerjoin(subquery, Lead.id == subquery.c.lead_id).filter(subquery.c.lead_id.is_(None)) | |
return [{"Email": lead.email, "ID": lead.id} for lead in query.all()] | |
except Exception as e: | |
logging.error(f"Error fetching leads: {str(e)}") | |
return [] | |
def update_display(container, items, title, item_key): | |
container.markdown( | |
f""" | |
<style> | |
.container {{ | |
max-height: 400px; | |
overflow-y: auto; | |
border: 1px solid rgba(49, 51, 63, 0.2); | |
border-radius: 0.25rem; | |
padding: 1rem; | |
background-color: rgba(49, 51, 63, 0.1); | |
}} | |
.entry {{ | |
margin-bottom: 0.5rem; | |
padding: 0.5rem; | |
background-color: rgba(255, 255, 255, 0.1); | |
border-radius: 0.25rem; | |
}} | |
</style> | |
<div class="container"> | |
<h4>{title} ({len(items)})</h4> | |
{"".join(f'<div class="entry">{item[item_key]}</div>' for item in items[-20:])} | |
</div> | |
""", | |
unsafe_allow_html=True | |
) | |
def get_domain_from_url(url): return urlparse(url).netloc | |
def manual_search_page(): | |
st.title("Manual Search") | |
with db_session() as session: | |
# Fetch recent searches within the session | |
recent_searches = session.query(SearchTerm).order_by(SearchTerm.created_at.desc()).limit(5).all() | |
# Materialize the terms within the session | |
recent_search_terms = [term.term for term in recent_searches] | |
email_templates = fetch_email_templates(session) | |
email_settings = fetch_email_settings(session) | |
col1, col2 = st.columns([2, 1]) | |
with col1: | |
search_terms = st_tags( | |
label='Enter search terms:', | |
text='Press enter to add more', | |
value=recent_search_terms, | |
suggestions=['software engineer', 'data scientist', 'product manager'], | |
maxtags=10, | |
key='search_terms_input' | |
) | |
num_results = st.slider("Results per term", 1, 50000, 10) | |
with col2: | |
enable_email_sending = st.checkbox("Enable email sending", value=True) | |
ignore_previously_fetched = st.checkbox("Ignore fetched domains", value=True) | |
shuffle_keywords_option = st.checkbox("Shuffle Keywords", value=True) | |
optimize_english = st.checkbox("Optimize (English)", value=False) | |
optimize_spanish = st.checkbox("Optimize (Spanish)", value=False) | |
language = st.selectbox("Select Language", options=["ES", "EN"], index=0) | |
if enable_email_sending: | |
if not email_templates: | |
st.error("No email templates available. Please create a template first.") | |
return | |
if not email_settings: | |
st.error("No email settings available. Please add email settings first.") | |
return | |
col3, col4 = st.columns(2) | |
with col3: | |
email_template = st.selectbox("Email template", options=email_templates, format_func=lambda x: x.split(":")[1].strip()) | |
with col4: | |
email_setting_option = st.selectbox("From Email", options=email_settings, format_func=lambda x: f"{x['name']} ({x['email']})") | |
if email_setting_option: | |
from_email = email_setting_option['email'] | |
reply_to = st.text_input("Reply To", email_setting_option['email']) | |
else: | |
st.error("No email setting selected. Please select an email setting.") | |
return | |
if st.button("Search"): | |
if not search_terms: | |
return st.warning("Enter at least one search term.") | |
progress_bar = st.progress(0) | |
status_text = st.empty() | |
email_status = st.empty() | |
results = [] | |
leads_container = st.empty() | |
leads_found, emails_sent = [], [] | |
# Create a single log container for all search terms | |
log_container = st.empty() | |
for i, term in enumerate(search_terms): | |
status_text.text(f"Searching: '{term}' ({i+1}/{len(search_terms)})") | |
with db_session() as session: | |
term_results = manual_search(session, [term], num_results, ignore_previously_fetched, optimize_english, optimize_spanish, shuffle_keywords_option, language, enable_email_sending, log_container, from_email, reply_to, email_template) | |
results.extend(term_results['results']) | |
leads_found.extend([f"{res['Email']} - {res['Company']}" for res in term_results['results']]) | |
if enable_email_sending: | |
template = session.query(EmailTemplate).filter_by(id=int(email_template.split(":")[0])).first() | |
for result in term_results['results']: | |
if not result or 'Email' not in result or not is_valid_email(result['Email']): | |
status_text.text(f"Skipping invalid result or email: {result.get('Email') if result else 'None'}") | |
continue | |
wrapped_content = wrap_email_body(template.body_content) | |
response, tracking_id = send_email_ses(session, from_email, result['Email'], template.subject, wrapped_content, reply_to=reply_to) | |
if response: | |
save_email_campaign(session, result['Email'], template.id, 'sent', datetime.utcnow(), template.subject, response.get('MessageId', 'Unknown'), template.body_content) | |
emails_sent.append(f"β {result['Email']}") | |
status_text.text(f"Email sent to: {result['Email']}") | |
else: | |
save_email_campaign(session, result['Email'], template.id, 'failed', datetime.utcnow(), template.subject, None, template.body_content) | |
emails_sent.append(f"β {result['Email']}") | |
status_text.text(f"Failed to send email to: {result['Email']}") | |
leads_container.dataframe(pd.DataFrame({"Leads Found": leads_found, "Emails Sent": emails_sent + [""] * (len(leads_found) - len(emails_sent))})) | |
progress_bar.progress((i + 1) / len(search_terms)) | |
# Display final results | |
st.subheader("Search Results") | |
st.dataframe(pd.DataFrame(results)) | |
if enable_email_sending: | |
st.subheader("Email Sending Results") | |
success_rate = sum(1 for email in emails_sent if email.startswith("β ")) / len(emails_sent) if emails_sent else 0 | |
st.metric("Email Sending Success Rate", f"{success_rate:.2%}") | |
st.download_button( | |
label="Download CSV", | |
data=pd.DataFrame(results).to_csv(index=False).encode('utf-8'), | |
file_name="search_results.csv", | |
mime="text/csv", | |
) | |
# Update other functions that might be accessing detached objects | |
def fetch_search_terms_with_lead_count(session): | |
query = (session.query(SearchTerm.term, | |
func.count(distinct(Lead.id)).label('lead_count'), | |
func.count(distinct(EmailCampaign.id)).label('email_count')) | |
.join(LeadSource, SearchTerm.id == LeadSource.search_term_id) | |
.join(Lead, LeadSource.lead_id == Lead.id) | |
.outerjoin(EmailCampaign, Lead.id == EmailCampaign.lead_id) | |
.group_by(SearchTerm.term)) | |
df = pd.DataFrame(query.all(), columns=['Term', 'Lead Count', 'Email Count']) | |
return df | |
def ai_automation_loop(session, log_container, leads_container): | |
automation_logs, total_search_terms, total_emails_sent = [], 0, 0 | |
while st.session_state.get('automation_status', False): | |
try: | |
log_container.info("Starting automation cycle") | |
kb_info = get_knowledge_base_info(session, get_active_project_id()) | |
if not kb_info: | |
log_container.warning("Knowledge Base not found. Skipping cycle.") | |
time.sleep(3600) | |
continue | |
base_terms = [term.term for term in session.query(SearchTerm).filter_by(project_id=get_active_project_id()).all()] | |
optimized_terms = generate_optimized_search_terms(session, base_terms, kb_info) | |
st.subheader("Optimized Search Terms") | |
st.write(", ".join(optimized_terms)) | |
total_search_terms = len(optimized_terms) | |
progress_bar = st.progress(0) | |
for idx, term in enumerate(optimized_terms): | |
results = manual_search(session, [term], 10, ignore_previously_fetched=True) | |
new_leads = [] | |
for res in results['results']: | |
lead = save_lead(session, res['Email'], url=res['URL']) | |
if lead: | |
new_leads.append((lead.id, lead.email)) | |
if new_leads: | |
template = session.query(EmailTemplate).filter_by(project_id=get_active_project_id()).first() | |
if template: | |
from_email = kb_info.get('contact_email') or '[email protected]' | |
reply_to = kb_info.get('contact_email') or '[email protected]' | |
logs, sent_count = bulk_send_emails(session, template.id, from_email, reply_to, [{'Email': email} for _, email in new_leads]) | |
automation_logs.extend(logs) | |
total_emails_sent += sent_count | |
leads_container.text_area("New Leads Found", "\n".join([email for _, email in new_leads]), height=200) | |
progress_bar.progress((idx + 1) / len(optimized_terms)) | |
st.success(f"Automation cycle completed. Total search terms: {total_search_terms}, Total emails sent: {total_emails_sent}") | |
time.sleep(3600) | |
except Exception as e: | |
log_container.error(f"Critical error in automation cycle: {str(e)}") | |
time.sleep(300) | |
log_container.info("Automation stopped") | |
st.session_state.automation_logs = automation_logs | |
st.session_state.total_leads_found = total_search_terms | |
st.session_state.total_emails_sent = total_emails_sent | |
# Make sure all database operations are performed within a session context | |
def get_knowledge_base_info(session, project_id): | |
kb_info = session.query(KnowledgeBase).filter_by(project_id=project_id).first() | |
return kb_info.to_dict() if kb_info else None | |
def shuffle_keywords(term): | |
words = term.split() | |
random.shuffle(words) | |
return ' '.join(words) | |
def get_page_description(html_content): | |
soup = BeautifulSoup(html_content, 'html.parser') | |
meta_desc = soup.find('meta', attrs={'name': 'description'}) | |
return meta_desc['content'] if meta_desc else "No description found" | |
def is_valid_email(email): | |
if email is None: return False | |
invalid_patterns = [ | |
r".*\.(png|jpg|jpeg|gif|css|js)$", | |
r"^(nr|bootstrap|jquery|core|icon-|noreply)@.*", | |
r"^(email|info|contact|support|hello|hola|hi|salutations|greetings|inquiries|questions)@.*", | |
r"^email@email\.com$", | |
r".*@example\.com$", | |
r".*@.*\.(png|jpg|jpeg|gif|css|js|jpga|PM|HL)$" | |
] | |
typo_domains = ["gmil.com", "gmal.com", "gmaill.com", "gnail.com"] | |
if any(re.match(pattern, email, re.IGNORECASE) for pattern in invalid_patterns): return False | |
if any(email.lower().endswith(f"@{domain}") for domain in typo_domains): return False | |
try: validate_email(email); return True | |
except EmailNotValidError: return False | |
def remove_invalid_leads(session): | |
invalid_leads = session.query(Lead).filter( | |
~Lead.email.op('~')(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$') | | |
Lead.email.op('~')(r'.*\.(png|jpg|jpeg|gif|css|js)$') | | |
Lead.email.op('~')(r'^(nr|bootstrap|jquery|core|icon-|noreply)@.*') | | |
Lead.email == '[email protected]' | | |
Lead.email.like('%@example.com') | | |
Lead.email.op('~')(r'.*@.*\.(png|jpg|jpeg|gif|css|js|jpga|PM|HL)$') | | |
Lead.email.like('%@gmil.com') | | |
Lead.email.like('%@gmal.com') | | |
Lead.email.like('%@gmaill.com') | | |
Lead.email.like('%@gnail.com') | |
).all() | |
for lead in invalid_leads: | |
session.query(LeadSource).filter(LeadSource.lead_id == lead.id).delete() | |
session.delete(lead) | |
session.commit() | |
return len(invalid_leads) | |
def perform_quick_scan(session): | |
with st.spinner("Performing quick scan..."): | |
terms = session.query(SearchTerm).order_by(func.random()).limit(3).all() | |
email_setting = fetch_email_settings(session)[0] if fetch_email_settings(session) else None | |
from_email = email_setting['email'] if email_setting else None | |
reply_to = from_email | |
email_template = session.query(EmailTemplate).first() | |
res = manual_search(session, [term.term for term in terms], 10, True, False, False, True, "EN", True, st.empty(), from_email, reply_to, f"{email_template.id}: {email_template.template_name}" if email_template else None) | |
st.success(f"Quick scan completed! Found {len(res['results'])} new leads.") | |
return {"new_leads": len(res['results']), "terms_used": [term.term for term in terms]} | |
def bulk_send_emails(session, template_id, from_email, reply_to, leads, progress_bar=None, status_text=None, results=None, log_container=None): | |
template = session.query(EmailTemplate).filter_by(id=template_id).first() | |
if not template: | |
logging.error(f"Email template with ID {template_id} not found.") | |
return [], 0 | |
email_subject = template.subject | |
email_content = template.body_content | |
logs, sent_count = [], 0 | |
total_leads = len(leads) | |
for index, lead in enumerate(leads): | |
try: | |
validate_email(lead['Email']) | |
response, tracking_id = send_email_ses(session, from_email, lead['Email'], email_subject, email_content, reply_to=reply_to) | |
if response: | |
status = 'sent' | |
message_id = response.get('MessageId', f"sent-{uuid.uuid4()}") | |
sent_count += 1 | |
log_message = f"β Email sent to: {lead['Email']}" | |
else: | |
status = 'failed' | |
message_id = f"failed-{uuid.uuid4()}" | |
log_message = f"β Failed to send email to: {lead['Email']}" | |
save_email_campaign(session, lead['Email'], template_id, status, datetime.utcnow(), email_subject, message_id, email_content) | |
logs.append(log_message) | |
if progress_bar: | |
progress_bar.progress((index + 1) / total_leads) | |
if status_text: | |
status_text.text(f"Processed {index + 1}/{total_leads} leads") | |
if results is not None: | |
results.append({"Email": lead['Email'], "Status": status}) | |
if log_container: | |
log_container.text(log_message) | |
except EmailNotValidError: | |
log_message = f"β Invalid email address: {lead['Email']}" | |
logs.append(log_message) | |
except Exception as e: | |
error_message = f"Error sending email to {lead['Email']}: {str(e)}" | |
logging.error(error_message) | |
save_email_campaign(session, lead['Email'], template_id, 'failed', datetime.utcnow(), email_subject, f"error-{uuid.uuid4()}", email_content) | |
logs.append(f"β Error sending email to: {lead['Email']} (Error: {str(e)})") | |
return logs, sent_count | |
def view_campaign_logs(): | |
st.header("Email Logs") | |
with db_session() as session: | |
logs = fetch_all_email_logs(session) | |
if logs.empty: | |
st.info("No email logs found.") | |
else: | |
st.write(f"Total emails sent: {len(logs)}") | |
st.write(f"Success rate: {(logs['Status'] == 'sent').mean():.2%}") | |
col1, col2 = st.columns(2) | |
with col1: | |
start_date = st.date_input("Start Date", value=logs['Sent At'].min().date()) | |
with col2: | |
end_date = st.date_input("End Date", value=logs['Sent At'].max().date()) | |
filtered_logs = logs[(logs['Sent At'].dt.date >= start_date) & (logs['Sent At'].dt.date <= end_date)] | |
search_term = st.text_input("Search by email or subject") | |
if search_term: | |
filtered_logs = filtered_logs[filtered_logs['Email'].str.contains(search_term, case=False) | | |
filtered_logs['Subject'].str.contains(search_term, case=False)] | |
col1, col2, col3 = st.columns(3) | |
with col1: | |
st.metric("Emails Sent", len(filtered_logs)) | |
with col2: | |
st.metric("Unique Recipients", filtered_logs['Email'].nunique()) | |
with col3: | |
st.metric("Success Rate", f"{(filtered_logs['Status'] == 'sent').mean():.2%}") | |
daily_counts = filtered_logs.resample('D', on='Sent At')['Email'].count() | |
st.bar_chart(daily_counts) | |
st.subheader("Detailed Email Logs") | |
for _, log in filtered_logs.iterrows(): | |
with st.expander(f"{log['Sent At'].strftime('%Y-%m-%d %H:%M:%S')} - {log['Email']} - {log['Status']}"): | |
st.write(f"**Subject:** {log['Subject']}") | |
st.write(f"**Content Preview:** {log['Content'][:100]}...") | |
if st.button("View Full Email", key=f"view_email_{log['ID']}"): | |
st.components.v1.html(wrap_email_body(log['Content']), height=400, scrolling=True) | |
if log['Status'] != 'sent': | |
st.error(f"Status: {log['Status']}") | |
logs_per_page = 20 | |
total_pages = (len(filtered_logs) - 1) // logs_per_page + 1 | |
page = st.number_input("Page", min_value=1, max_value=total_pages, value=1) | |
start_idx = (page - 1) * logs_per_page | |
end_idx = start_idx + logs_per_page | |
st.table(filtered_logs.iloc[start_idx:end_idx][['Sent At', 'Email', 'Subject', 'Status']]) | |
if st.button("Export Logs to CSV"): | |
csv = filtered_logs.to_csv(index=False) | |
st.download_button( | |
label="Download CSV", | |
data=csv, | |
file_name="email_logs.csv", | |
mime="text/csv" | |
) | |
def fetch_all_email_logs(session): | |
try: | |
email_campaigns = session.query(EmailCampaign).join(Lead).join(EmailTemplate).options(joinedload(EmailCampaign.lead), joinedload(EmailCampaign.template)).order_by(EmailCampaign.sent_at.desc()).all() | |
return pd.DataFrame({ | |
'ID': [ec.id for ec in email_campaigns], | |
'Sent At': [ec.sent_at for ec in email_campaigns], | |
'Email': [ec.lead.email for ec in email_campaigns], | |
'Template': [ec.template.template_name for ec in email_campaigns], | |
'Subject': [ec.customized_subject or "No subject" for ec in email_campaigns], | |
'Content': [ec.customized_content or "No content" for ec in email_campaigns], | |
'Status': [ec.status for ec in email_campaigns], | |
'Message ID': [ec.message_id or "No message ID" for ec in email_campaigns], | |
'Campaign ID': [ec.campaign_id for ec in email_campaigns], | |
'Lead Name': [f"{ec.lead.first_name or ''} {ec.lead.last_name or ''}".strip() or "Unknown" for ec in email_campaigns], | |
'Lead Company': [ec.lead.company or "Unknown" for ec in email_campaigns] | |
}) | |
except SQLAlchemyError as e: | |
logging.error(f"Database error in fetch_all_email_logs: {str(e)}") | |
return pd.DataFrame() | |
def update_lead(session, lead_id, updated_data): | |
try: | |
lead = session.query(Lead).filter(Lead.id == lead_id).first() | |
if lead: | |
for key, value in updated_data.items(): | |
setattr(lead, key, value) | |
return True | |
except SQLAlchemyError as e: | |
logging.error(f"Error updating lead {lead_id}: {str(e)}") | |
session.rollback() | |
return False | |
def delete_lead(session, lead_id): | |
try: | |
lead = session.query(Lead).filter(Lead.id == lead_id).first() | |
if lead: | |
session.delete(lead) | |
return True | |
except SQLAlchemyError as e: | |
logging.error(f"Error deleting lead {lead_id}: {str(e)}") | |
session.rollback() | |
return False | |
def is_valid_email(email): | |
try: | |
validate_email(email) | |
return True | |
except EmailNotValidError: | |
return False | |
def view_leads_page(): | |
st.title("Lead Management Dashboard") | |
with db_session() as session: | |
if 'leads' not in st.session_state or st.button("Refresh Leads"): | |
st.session_state.leads = fetch_leads_with_sources(session) | |
if not st.session_state.leads.empty: | |
total_leads = len(st.session_state.leads) | |
contacted_leads = len(st.session_state.leads[st.session_state.leads['Last Contact'].notna()]) | |
conversion_rate = (st.session_state.leads['Last Email Status'] == 'sent').mean() | |
st.columns(3)[0].metric("Total Leads", f"{total_leads:,}") | |
st.columns(3)[1].metric("Contacted Leads", f"{contacted_leads:,}") | |
st.columns(3)[2].metric("Conversion Rate", f"{conversion_rate:.2%}") | |
st.subheader("Leads Table") | |
search_term = st.text_input("Search leads by email, name, company, or source") | |
filtered_leads = st.session_state.leads[st.session_state.leads.apply(lambda row: search_term.lower() in str(row).lower(), axis=1)] | |
leads_per_page, page_number = 20, st.number_input("Page", min_value=1, value=1) | |
start_idx, end_idx = (page_number - 1) * leads_per_page, page_number * leads_per_page | |
edited_df = st.data_editor( | |
filtered_leads.iloc[start_idx:end_idx], | |
column_config={ | |
"ID": st.column_config.NumberColumn("ID", disabled=True), | |
"Email": st.column_config.TextColumn("Email"), | |
"First Name": st.column_config.TextColumn("First Name"), | |
"Last Name": st.column_config.TextColumn("Last Name"), | |
"Company": st.column_config.TextColumn("Company"), | |
"Job Title": st.column_config.TextColumn("Job Title"), | |
"Source": st.column_config.TextColumn("Source", disabled=True), | |
"Last Contact": st.column_config.DatetimeColumn("Last Contact", disabled=True), | |
"Last Email Status": st.column_config.TextColumn("Last Email Status", disabled=True), | |
"Delete": st.column_config.CheckboxColumn("Delete") | |
}, | |
disabled=["ID", "Source", "Last Contact", "Last Email Status"], | |
hide_index=True, | |
num_rows="dynamic" | |
) | |
if st.button("Save Changes", type="primary"): | |
for index, row in edited_df.iterrows(): | |
if row['Delete']: | |
if delete_lead_and_sources(session, row['ID']): | |
st.success(f"Deleted lead: {row['Email']}") | |
else: | |
updated_data = {k: row[k] for k in ['Email', 'First Name', 'Last Name', 'Company', 'Job Title']} | |
if update_lead(session, row['ID'], updated_data): | |
st.success(f"Updated lead: {row['Email']}") | |
st.rerun() | |
st.download_button( | |
"Export Filtered Leads to CSV", | |
filtered_leads.to_csv(index=False).encode('utf-8'), | |
"exported_leads.csv", | |
"text/csv" | |
) | |
st.subheader("Lead Growth") | |
if 'Created At' in st.session_state.leads.columns: | |
lead_growth = st.session_state.leads.groupby(pd.to_datetime(st.session_state.leads['Created At']).dt.to_period('M')).size().cumsum() | |
st.line_chart(lead_growth) | |
else: | |
st.warning("Created At data is not available for lead growth chart.") | |
st.subheader("Email Campaign Performance") | |
email_status_counts = st.session_state.leads['Last Email Status'].value_counts() | |
st.plotly_chart(px.pie( | |
values=email_status_counts.values, | |
names=email_status_counts.index, | |
title="Distribution of Email Statuses" | |
), use_container_width=True) | |
else: | |
st.info("No leads available. Start by adding some leads to your campaigns.") | |
def fetch_leads_with_sources(session): | |
try: | |
query = session.query(Lead, func.string_agg(LeadSource.url, ', ').label('sources'), func.max(EmailCampaign.sent_at).label('last_contact'), func.string_agg(EmailCampaign.status, ', ').label('email_statuses')).outerjoin(LeadSource).outerjoin(EmailCampaign).group_by(Lead.id) | |
return pd.DataFrame([{**{k: getattr(lead, k) for k in ['id', 'email', 'first_name', 'last_name', 'company', 'job_title', 'created_at']}, 'Source': sources, 'Last Contact': last_contact, 'Last Email Status': email_statuses.split(', ')[-1] if email_statuses else 'Not Contacted', 'Delete': False} for lead, sources, last_contact, email_statuses in query.all()]) | |
except SQLAlchemyError as e: | |
logging.error(f"Database error in fetch_leads_with_sources: {str(e)}") | |
return pd.DataFrame() | |
def delete_lead_and_sources(session, lead_id): | |
try: | |
session.query(LeadSource).filter(LeadSource.lead_id == lead_id).delete() | |
lead = session.query(Lead).filter(Lead.id == lead_id).first() | |
if lead: | |
session.delete(lead) | |
return True | |
except SQLAlchemyError as e: | |
logging.error(f"Error deleting lead {lead_id} and its sources: {str(e)}") | |
session.rollback() | |
return False | |
def fetch_search_terms_with_lead_count(session): | |
query = (session.query(SearchTerm.term, | |
func.count(distinct(Lead.id)).label('lead_count'), | |
func.count(distinct(EmailCampaign.id)).label('email_count')) | |
.join(LeadSource, SearchTerm.id == LeadSource.search_term_id) | |
.join(Lead, LeadSource.lead_id == Lead.id) | |
.outerjoin(EmailCampaign, Lead.id == EmailCampaign.lead_id) | |
.group_by(SearchTerm.term)) | |
df = pd.DataFrame(query.all(), columns=['Term', 'Lead Count', 'Email Count']) | |
return df | |
def add_search_term(session, term, campaign_id): | |
try: | |
new_term = SearchTerm(term=term, campaign_id=campaign_id, created_at=datetime.utcnow()) | |
session.add(new_term) | |
session.commit() | |
return new_term.id | |
except SQLAlchemyError as e: | |
session.rollback() | |
logging.error(f"Error adding search term: {str(e)}") | |
raise | |
def get_active_campaign_id(): | |
return st.session_state.get('active_campaign_id', 1) | |
def search_terms_page(): | |
st.title("Search Terms Management") | |
with db_session() as session: | |
# Create new group section | |
with st.expander("Create New Group", expanded=False): | |
new_group_name = st.text_input("New Group Name") | |
if st.button("Create Group"): | |
if new_group_name.strip(): | |
create_search_term_group(session, new_group_name) | |
st.success(f"Group '{new_group_name}' created successfully!") | |
st.rerun() | |
else: | |
st.warning("Please enter a group name") | |
# Manage existing groups | |
groups = session.query(SearchTermGroup).all() | |
if groups: | |
st.subheader("Existing Groups") | |
for group in groups: | |
with st.expander(f"Group: {group.name}", expanded=False): | |
# Get all search terms | |
all_terms = session.query(SearchTerm).filter_by(campaign_id=get_active_campaign_id()).all() | |
# Get terms currently in this group | |
group_terms = [term for term in all_terms if term.group_id == group.id] | |
# Create options for multiselect | |
term_options = [f"{term.id}:{term.term}" for term in all_terms] | |
default_values = [f"{term.id}:{term.term}" for term in group_terms] | |
# Display multiselect for terms | |
selected_terms = st.multiselect( | |
"Select terms for this group", | |
options=term_options, | |
default=default_values, | |
format_func=lambda x: x.split(':')[1] | |
) | |
if st.button("Update Group", key=f"update_{group.id}"): | |
update_search_term_group(session, group.id, selected_terms) | |
st.success("Group updated successfully!") | |
st.rerun() | |
if st.button("Delete Group", key=f"delete_{group.id}"): | |
delete_search_term_group(session, group.id) | |
st.success("Group deleted successfully!") | |
st.rerun() | |
# Add new search terms section | |
st.subheader("Add New Search Term") | |
with st.form("add_search_term_form"): | |
new_term = st.text_input("New Search Term") | |
group_options = ["None"] + [f"{g.id}:{g.name}" for g in groups] | |
group_for_new_term = st.selectbox("Assign to Group", options=group_options) | |
if st.form_submit_button("Add Term"): | |
if new_term.strip(): | |
add_new_search_term(session, new_term, get_active_campaign_id(), group_for_new_term) | |
st.success(f"Term '{new_term}' added successfully!") | |
st.rerun() | |
else: | |
st.warning("Please enter a search term") | |
def update_search_term_group(session, group_id, updated_terms): | |
try: | |
# Get IDs of selected terms | |
selected_term_ids = [int(term.split(':')[0]) for term in updated_terms] | |
# Update all terms that should be in this group | |
session.query(SearchTerm)\ | |
.filter(SearchTerm.id.in_(selected_term_ids))\ | |
.update({SearchTerm.group_id: group_id}, synchronize_session=False) | |
# Remove group_id from terms that were unselected | |
session.query(SearchTerm)\ | |
.filter(SearchTerm.group_id == group_id)\ | |
.filter(~SearchTerm.id.in_(selected_term_ids))\ | |
.update({SearchTerm.group_id: None}, synchronize_session=False) | |
session.commit() | |
except Exception as e: | |
session.rollback() | |
logging.error(f"Error in update_search_term_group: {str(e)}") | |
raise | |
def add_new_search_term(session, new_term, campaign_id, group_for_new_term): | |
try: | |
group_id = None | |
if group_for_new_term != "None": | |
group_id = int(group_for_new_term.split(':')[0]) | |
new_search_term = SearchTerm( | |
term=new_term, | |
campaign_id=campaign_id, | |
group_id=group_id, | |
created_at=datetime.utcnow() | |
) | |
session.add(new_search_term) | |
session.commit() | |
except Exception as e: | |
session.rollback() | |
logging.error(f"Error adding search term: {str(e)}") | |
raise | |
def ai_group_search_terms(session, ungrouped_terms): | |
existing_groups = session.query(SearchTermGroup).all() | |
prompt = f""" | |
Categorize these search terms into existing groups or suggest new ones: | |
{', '.join([term.term for term in ungrouped_terms])} | |
Existing groups: {', '.join([group.name for group in existing_groups])} | |
Respond with a JSON object: {{group_name: [term1, term2, ...]}} | |
""" | |
messages = [ | |
{"role": "system", "content": "You're an AI that categorizes search terms for lead generation. Be concise and efficient."}, | |
{"role": "user", "content": prompt} | |
] | |
response = openai_chat_completion(messages, function_name="ai_group_search_terms") | |
return response if isinstance(response, dict) else {} | |
def update_search_term_groups(session, grouped_terms): | |
for group_name, terms in grouped_terms.items(): | |
group = session.query(SearchTermGroup).filter_by(name=group_name).first() | |
if not group: | |
group = SearchTermGroup(name=group_name) | |
session.add(group) | |
session.flush() | |
for term in terms: | |
search_term = session.query(SearchTerm).filter_by(term=term).first() | |
if search_term: | |
search_term.group_id = group.id | |
session.commit() | |
def create_search_term_group(session, group_name): | |
try: | |
new_group = SearchTermGroup(name=group_name) | |
session.add(new_group) | |
session.commit() | |
except Exception as e: | |
session.rollback() | |
logging.error(f"Error creating search term group: {str(e)}") | |
def delete_search_term_group(session, group_id): | |
try: | |
group = session.query(SearchTermGroup).get(group_id) | |
if group: | |
# Set group_id to None for all search terms in this group | |
session.query(SearchTerm).filter(SearchTerm.group_id == group_id).update({SearchTerm.group_id: None}) | |
session.delete(group) | |
except Exception as e: | |
session.rollback() | |
logging.error(f"Error deleting search term group: {str(e)}") | |
def email_templates_page(): | |
st.header("Email Templates") | |
with db_session() as session: | |
templates = session.query(EmailTemplate).all() | |
with st.expander("Create New Template", expanded=False): | |
new_template_name = st.text_input("Template Name", key="new_template_name") | |
use_ai = st.checkbox("Use AI to generate template", key="use_ai") | |
if use_ai: | |
ai_prompt = st.text_area("Enter prompt for AI template generation", key="ai_prompt") | |
use_kb = st.checkbox("Use Knowledge Base information", key="use_kb") | |
if st.button("Generate Template", key="generate_ai_template"): | |
with st.spinner("Generating template..."): | |
kb_info = get_knowledge_base_info(session, get_active_project_id()) if use_kb else None | |
generated_template = generate_or_adjust_email_template(ai_prompt, kb_info) | |
new_template_subject = generated_template.get("subject", "AI Generated Subject") | |
new_template_body = generated_template.get("body", "") | |
if new_template_name: | |
new_template = EmailTemplate( | |
template_name=new_template_name, | |
subject=new_template_subject, | |
body_content=new_template_body, | |
campaign_id=get_active_campaign_id() | |
) | |
session.add(new_template) | |
session.commit() | |
st.success("AI-generated template created and saved!") | |
templates = session.query(EmailTemplate).all() | |
else: | |
st.warning("Please provide a name for the template before generating.") | |
st.subheader("Generated Template") | |
st.text(f"Subject: {new_template_subject}") | |
st.components.v1.html(wrap_email_body(new_template_body), height=400, scrolling=True) | |
else: | |
new_template_subject = st.text_input("Subject", key="new_template_subject") | |
new_template_body = st.text_area("Body Content", height=200, key="new_template_body") | |
if st.button("Create Template", key="create_template_button"): | |
if all([new_template_name, new_template_subject, new_template_body]): | |
new_template = EmailTemplate( | |
template_name=new_template_name, | |
subject=new_template_subject, | |
body_content=new_template_body, | |
campaign_id=get_active_campaign_id() | |
) | |
session.add(new_template) | |
session.commit() | |
st.success("New template created successfully!") | |
templates = session.query(EmailTemplate).all() | |
else: | |
st.warning("Please fill in all fields to create a new template.") | |
if templates: | |
st.subheader("Existing Templates") | |
for template in templates: | |
with st.expander(f"Template: {template.template_name}", expanded=False): | |
col1, col2 = st.columns(2) | |
edited_subject = col1.text_input("Subject", value=template.subject, key=f"subject_{template.id}") | |
is_ai_customizable = col2.checkbox("AI Customizable", value=template.is_ai_customizable, key=f"ai_{template.id}") | |
edited_body = st.text_area("Body Content", value=template.body_content, height=200, key=f"body_{template.id}") | |
ai_adjustment_prompt = st.text_area("AI Adjustment Prompt", key=f"ai_prompt_{template.id}", placeholder="E.g., Make it less marketing-like and mention our main features") | |
col3, col4 = st.columns(2) | |
if col3.button("Apply AI Adjustment", key=f"apply_ai_{template.id}"): | |
with st.spinner("Applying AI adjustment..."): | |
kb_info = get_knowledge_base_info(session, get_active_project_id()) | |
adjusted_template = generate_or_adjust_email_template(ai_adjustment_prompt, kb_info, current_template=edited_body) | |
edited_subject = adjusted_template.get("subject", edited_subject) | |
edited_body = adjusted_template.get("body", edited_body) | |
st.success("AI adjustment applied. Please review and save changes.") | |
if col4.button("Save Changes", key=f"save_{template.id}"): | |
template.subject = edited_subject | |
template.body_content = edited_body | |
template.is_ai_customizable = is_ai_customizable | |
session.commit() | |
st.success("Template updated successfully!") | |
st.markdown("### Preview") | |
st.text(f"Subject: {edited_subject}") | |
st.components.v1.html(wrap_email_body(edited_body), height=400, scrolling=True) | |
if st.button("Delete Template", key=f"delete_{template.id}"): | |
session.delete(template) | |
session.commit() | |
st.success("Template deleted successfully!") | |
st.rerun() | |
else: | |
st.info("No email templates found. Create a new template to get started.") | |
def get_email_preview(session, template_id, from_email, reply_to): | |
template = session.query(EmailTemplate).filter_by(id=template_id).first() | |
if template: | |
wrapped_content = wrap_email_body(template.body_content) | |
return wrapped_content | |
return "<p>Template not found</p>" | |
def fetch_all_search_terms(session): | |
return session.query(SearchTerm).all() | |
def get_knowledge_base_info(session, project_id): | |
kb_info = session.query(KnowledgeBase).filter_by(project_id=project_id).first() | |
return kb_info.to_dict() if kb_info else None | |
def get_email_template_by_name(session, template_name): | |
return session.query(EmailTemplate).filter_by(template_name=template_name).first() | |
def bulk_send_page(): | |
st.title("Bulk Email Sending") | |
with db_session() as session: | |
templates = fetch_email_templates(session) | |
email_settings = fetch_email_settings(session) | |
if not templates or not email_settings: | |
st.error("No email templates or settings available. Please set them up first.") | |
return | |
template_option = st.selectbox("Email Template", options=templates, format_func=lambda x: x.split(":")[1].strip()) | |
template_id = int(template_option.split(":")[0]) | |
template = session.query(EmailTemplate).filter_by(id=template_id).first() | |
col1, col2 = st.columns(2) | |
with col1: | |
subject = st.text_input("Subject", value=template.subject if template else "") | |
email_setting_option = st.selectbox("From Email", options=email_settings, format_func=lambda x: f"{x['name']} ({x['email']})") | |
if email_setting_option: | |
from_email = email_setting_option['email'] | |
reply_to = st.text_input("Reply To", email_setting_option['email']) | |
else: | |
st.error("Selected email setting not found. Please choose a valid email setting.") | |
return | |
with col2: | |
send_option = st.radio("Send to:", ["All Leads", "Specific Email", "Leads from Chosen Search Terms", "Leads from Search Term Groups"]) | |
specific_email = None | |
selected_terms = None | |
if send_option == "Specific Email": | |
specific_email = st.text_input("Enter email") | |
elif send_option == "Leads from Chosen Search Terms": | |
search_terms_with_counts = fetch_search_terms_with_lead_count(session) | |
selected_terms = st.multiselect("Select Search Terms", options=search_terms_with_counts['Term'].tolist()) | |
selected_terms = [term.split(" (")[0] for term in selected_terms] | |
elif send_option == "Leads from Search Term Groups": | |
groups = fetch_search_term_groups(session) | |
selected_groups = st.multiselect("Select Search Term Groups", options=groups) | |
if selected_groups: | |
group_ids = [int(group.split(':')[0]) for group in selected_groups] | |
selected_terms = fetch_search_terms_for_groups(session, group_ids) | |
exclude_previously_contacted = st.checkbox("Exclude Previously Contacted Domains", value=True) | |
st.markdown("### Email Preview") | |
st.text(f"From: {from_email}\nReply-To: {reply_to}\nSubject: {subject}") | |
st.components.v1.html(get_email_preview(session, template_id, from_email, reply_to), height=600, scrolling=True) | |
leads = fetch_leads(session, template_id, send_option, specific_email, selected_terms, exclude_previously_contacted) | |
total_leads = len(leads) | |
eligible_leads = [lead for lead in leads if lead.get('language', template.language) == template.language] | |
contactable_leads = [lead for lead in eligible_leads if not (exclude_previously_contacted and lead.get('domain_contacted', False))] | |
st.info(f"Total leads: {total_leads}\n" | |
f"Leads matching template language ({template.language}): {len(eligible_leads)}\n" | |
f"Leads to be contacted: {len(contactable_leads)}") | |
if st.button("Send Emails", type="primary"): | |
if not contactable_leads: | |
st.warning("No leads found matching the selected criteria.") | |
return | |
progress_bar = st.progress(0) | |
status_text = st.empty() | |
results = [] | |
log_container = st.empty() | |
logs, sent_count = bulk_send_emails(session, template_id, from_email, reply_to, contactable_leads, progress_bar, status_text, results, log_container) | |
st.success(f"Emails sent successfully to {sent_count} leads.") | |
st.subheader("Sending Results") | |
results_df = pd.DataFrame(results) | |
st.dataframe(results_df) | |
success_rate = (results_df['Status'] == 'sent').mean() | |
st.metric("Email Sending Success Rate", f"{success_rate:.2%}") | |
def fetch_leads(session, template_id, send_option, specific_email, selected_terms, exclude_previously_contacted): | |
try: | |
query = session.query(Lead) | |
if send_option == "Specific Email": | |
query = query.filter(Lead.email == specific_email) | |
elif send_option in ["Leads from Chosen Search Terms", "Leads from Search Term Groups"] and selected_terms: | |
query = query.join(LeadSource).join(SearchTerm).filter(SearchTerm.term.in_(selected_terms)) | |
if exclude_previously_contacted: | |
subquery = session.query(EmailCampaign.lead_id).filter(EmailCampaign.sent_at.isnot(None)).subquery() | |
query = query.outerjoin(subquery, Lead.id == subquery.c.lead_id).filter(subquery.c.lead_id.is_(None)) | |
return [{"Email": lead.email, "ID": lead.id} for lead in query.all()] | |
except Exception as e: | |
logging.error(f"Error fetching leads: {str(e)}") | |
return [] | |
def fetch_email_settings(session): | |
try: | |
settings = session.query(EmailSettings).all() | |
return [{"id": setting.id, "name": setting.name, "email": setting.email} for setting in settings] | |
except Exception as e: | |
logging.error(f"Error fetching email settings: {e}") | |
return [] | |
def fetch_search_terms_with_lead_count(session): | |
query = (session.query(SearchTerm.term, | |
func.count(distinct(Lead.id)).label('lead_count'), | |
func.count(distinct(EmailCampaign.id)).label('email_count')) | |
.join(LeadSource, SearchTerm.id == LeadSource.search_term_id) | |
.join(Lead, LeadSource.lead_id == Lead.id) | |
.outerjoin(EmailCampaign, Lead.id == EmailCampaign.lead_id) | |
.group_by(SearchTerm.term)) | |
df = pd.DataFrame(query.all(), columns=['Term', 'Lead Count', 'Email Count']) | |
return df | |
def fetch_search_term_groups(session): | |
return [f"{group.id}: {group.name}" for group in session.query(SearchTermGroup).all()] | |
def fetch_search_terms_for_groups(session, group_ids): | |
terms = session.query(SearchTerm).filter(SearchTerm.group_id.in_(group_ids)).all() | |
return [term.term for term in terms] | |
def ai_automation_loop(session, log_container, leads_container): | |
automation_logs, total_search_terms, total_emails_sent = [], 0, 0 | |
while st.session_state.get('automation_status', False): | |
try: | |
log_container.info("Starting automation cycle") | |
kb_info = get_knowledge_base_info(session, get_active_project_id()) | |
if not kb_info: | |
log_container.warning("Knowledge Base not found. Skipping cycle.") | |
time.sleep(3600) | |
continue | |
base_terms = [term.term for term in session.query(SearchTerm).filter_by(project_id=get_active_project_id()).all()] | |
optimized_terms = generate_optimized_search_terms(session, base_terms, kb_info) | |
st.subheader("Optimized Search Terms") | |
st.write(", ".join(optimized_terms)) | |
total_search_terms = len(optimized_terms) | |
progress_bar = st.progress(0) | |
for idx, term in enumerate(optimized_terms): | |
results = manual_search(session, [term], 10, ignore_previously_fetched=True) | |
new_leads = [(save_lead(session, res['Email'], url=res['URL']).id, res['Email']) for res in results['results'] if save_lead(session, res['Email'], url=res['URL'])] | |
if new_leads: | |
template = session.query(EmailTemplate).filter_by(project_id=get_active_project_id()).first() | |
if template: | |
from_email = kb_info.get('contact_email', '[email protected]') | |
reply_to = kb_info.get('contact_email', '[email protected]') | |
logs, sent_count = bulk_send_emails(session, template.id, from_email, reply_to, [{'Email': email} for _, email in new_leads]) | |
automation_logs.extend(logs) | |
total_emails_sent += sent_count | |
leads_container.text_area("New Leads Found", "\n".join([email for _, email in new_leads]), height=200) | |
progress_bar.progress((idx + 1) / len(optimized_terms)) | |
st.success(f"Automation cycle completed. Total search terms: {total_search_terms}, Total emails sent: {total_emails_sent}") | |
time.sleep(3600) | |
except Exception as e: | |
log_container.error(f"Critical error in automation cycle: {str(e)}") | |
time.sleep(300) | |
log_container.info("Automation stopped") | |
st.session_state.update({"automation_logs": automation_logs, "total_leads_found": total_search_terms, "total_emails_sent": total_emails_sent}) | |
def display_search_results(results, key_suffix): | |
if not results: return st.warning("No results to display.") | |
with st.expander("Search Results", expanded=True): | |
st.markdown(f"### Total Leads Found: **{len(results)}**") | |
for i, res in enumerate(results): | |
with st.expander(f"Lead: {res['Email']}", key=f"lead_expander_{key_suffix}_{i}"): | |
st.markdown(f"**URL:** [{res['URL']}]({res['URL']}) \n**Title:** {res['Title']} \n**Description:** {res['Description']} \n**Tags:** {', '.join(res['Tags'])} \n**Lead Source:** {res['Lead Source']} \n**Lead Email:** {res['Email']}") | |
def perform_quick_scan(session): | |
with st.spinner("Performing quick scan..."): | |
terms = session.query(SearchTerm).order_by(func.random()).limit(3).all() | |
email_setting = fetch_email_settings(session)[0] if fetch_email_settings(session) else None | |
from_email = email_setting['email'] if email_setting else None | |
reply_to = from_email | |
email_template = session.query(EmailTemplate).first() | |
res = manual_search(session, [term.term for term in terms], 10, True, False, False, True, "EN", True, st.empty(), from_email, reply_to, f"{email_template.id}: {email_template.template_name}" if email_template else None) | |
st.success(f"Quick scan completed! Found {len(res['results'])} new leads.") | |
return {"new_leads": len(res['results']), "terms_used": [term.term for term in terms]} | |
def generate_optimized_search_terms(session, base_terms, kb_info): | |
prompt = f"Optimize and expand these search terms for lead generation:\n{', '.join(base_terms)}\n\nConsider:\n1. Relevance to business and target market\n2. Potential for high-quality leads\n3. Variations and related terms\n4. Industry-specific jargon\n\nRespond with a JSON array of optimized terms." | |
response = openai_chat_completion([{"role": "system", "content": "You're an AI specializing in optimizing search terms for lead generation. Be concise and effective."}, {"role": "user", "content": prompt}], function_name="generate_optimized_search_terms") | |
return response.get('optimized_terms', base_terms) if isinstance(response, dict) else base_terms | |
def fetch_search_terms_with_lead_count(session): | |
query = (session.query(SearchTerm.term, | |
func.count(distinct(Lead.id)).label('lead_count'), | |
func.count(distinct(EmailCampaign.id)).label('email_count')) | |
.join(LeadSource, SearchTerm.id == LeadSource.search_term_id) | |
.join(Lead, LeadSource.lead_id == Lead.id) | |
.outerjoin(EmailCampaign, Lead.id == EmailCampaign.lead_id) | |
.group_by(SearchTerm.term)) | |
df = pd.DataFrame(query.all(), columns=['Term', 'Lead Count', 'Email Count']) | |
return df | |
def fetch_leads_for_search_terms(session, search_term_ids) -> List[Lead]: | |
return session.query(Lead).distinct().join(LeadSource).filter(LeadSource.search_term_id.in_(search_term_ids)).all() | |
def projects_campaigns_page(): | |
with db_session() as session: | |
st.header("Projects and Campaigns") | |
st.subheader("Add New Project") | |
with st.form("add_project_form"): | |
project_name = st.text_input("Project Name") | |
if st.form_submit_button("Add Project"): | |
if project_name.strip(): | |
try: | |
session.add(Project(project_name=project_name, created_at=datetime.utcnow())) | |
session.commit() | |
st.success(f"Project '{project_name}' added successfully.") | |
except SQLAlchemyError as e: | |
st.error(f"Error adding project: {str(e)}") | |
else: | |
st.warning("Please enter a project name.") | |
st.subheader("Existing Projects and Campaigns") | |
projects = session.query(Project).all() | |
for project in projects: | |
with st.expander(f"Project: {project.project_name}"): | |
st.info("Campaigns share resources and settings within a project.") | |
with st.form(f"add_campaign_form_{project.id}"): | |
campaign_name = st.text_input("Campaign Name", key=f"campaign_name_{project.id}") | |
if st.form_submit_button("Add Campaign"): | |
if campaign_name.strip(): | |
try: | |
session.add(Campaign(campaign_name=campaign_name, project_id=project.id, created_at=datetime.utcnow())) | |
session.commit() | |
st.success(f"Campaign '{campaign_name}' added to '{project.project_name}'.") | |
except SQLAlchemyError as e: | |
st.error(f"Error adding campaign: {str(e)}") | |
else: | |
st.warning("Please enter a campaign name.") | |
campaigns = session.query(Campaign).filter_by(project_id=project.id).all() | |
st.write("Campaigns:" if campaigns else f"No campaigns for {project.project_name} yet.") | |
for campaign in campaigns: | |
st.write(f"- {campaign.campaign_name}") | |
st.subheader("Set Active Project and Campaign") | |
project_options = [p.project_name for p in projects] | |
if project_options: | |
active_project = st.selectbox("Select Active Project", options=project_options, index=0) | |
active_project_id = session.query(Project.id).filter_by(project_name=active_project).scalar() | |
set_active_project_id(active_project_id) | |
active_project_campaigns = session.query(Campaign).filter_by(project_id=active_project_id).all() | |
if active_project_campaigns: | |
campaign_options = [c.campaign_name for c in active_project_campaigns] | |
active_campaign = st.selectbox("Select Active Campaign", options=campaign_options, index=0) | |
active_campaign_id = session.query(Campaign.id).filter_by(campaign_name=active_campaign, project_id=active_project_id).scalar() | |
set_active_campaign_id(active_campaign_id) | |
st.success(f"Active Project: {active_project}, Active Campaign: {active_campaign}") | |
else: | |
st.warning(f"No campaigns available for {active_project}. Please add a campaign.") | |
else: | |
st.warning("No projects found. Please add a project first.") | |
def knowledge_base_page(): | |
st.title("Knowledge Base") | |
with db_session() as session: | |
project_options = fetch_projects(session) | |
if not project_options: return st.warning("No projects found. Please create a project first.") | |
selected_project = st.selectbox("Select Project", options=project_options) | |
project_id = int(selected_project.split(":")[0]) | |
set_active_project_id(project_id) | |
kb_entry = session.query(KnowledgeBase).filter_by(project_id=project_id).first() | |
with st.form("knowledge_base_form"): | |
fields = ['kb_name', 'kb_bio', 'kb_values', 'contact_name', 'contact_role', 'contact_email', 'company_description', 'company_mission', 'company_target_market', 'company_other', 'product_name', 'product_description', 'product_target_customer', 'product_other', 'other_context', 'example_email'] | |
form_data = {field: st.text_input(field.replace('_', ' ').title(), value=getattr(kb_entry, field, '')) if field in ['kb_name', 'contact_name', 'contact_role', 'contact_email', 'product_name'] else st.text_area(field.replace('_', ' ').title(), value=getattr(kb_entry, field, '')) for field in fields} | |
if st.form_submit_button("Save Knowledge Base"): | |
try: | |
form_data.update({'project_id': project_id, 'created_at': datetime.utcnow()}) | |
if kb_entry: | |
for k, v in form_data.items(): setattr(kb_entry, k, v) | |
else: session.add(KnowledgeBase(**form_data)) | |
session.commit() | |
st.success("Knowledge Base saved successfully!", icon="β ") | |
except Exception as e: st.error(f"An error occurred while saving the Knowledge Base: {str(e)}") | |
def autoclient_ai_page(): | |
st.header("AutoclientAI - Automated Lead Generation") | |
with st.expander("Knowledge Base Information", expanded=False): | |
with db_session() as session: | |
kb_info = get_knowledge_base_info(session, get_active_project_id()) | |
if not kb_info: | |
return st.error("Knowledge Base not found for the active project. Please set it up first.") | |
st.json(kb_info) | |
user_input = st.text_area("Enter additional context or specific goals for lead generation:", help="This information will be used to generate more targeted search terms.") | |
if st.button("Generate Optimized Search Terms", key="generate_optimized_terms"): | |
with st.spinner("Generating optimized search terms..."): | |
with db_session() as session: | |
base_terms = [term.term for term in session.query(SearchTerm).filter_by(project_id=get_active_project_id()).all()] | |
optimized_terms = generate_optimized_search_terms(session, base_terms, kb_info) | |
if optimized_terms: | |
st.session_state.optimized_terms = optimized_terms | |
st.success("Search terms optimized successfully!") | |
st.subheader("Optimized Search Terms") | |
st.write(", ".join(optimized_terms)) | |
else: | |
st.error("Failed to generate optimized search terms. Please try again.") | |
if st.button("Start Automation", key="start_automation"): | |
st.session_state.update({"automation_status": True, "automation_logs": [], "total_leads_found": 0, "total_emails_sent": 0}) | |
st.success("Automation started!") | |
if st.session_state.get('automation_status', False): | |
st.subheader("Automation in Progress") | |
progress_bar, log_container, leads_container, analytics_container = st.progress(0), st.empty(), st.empty(), st.empty() | |
try: | |
with db_session() as session: | |
ai_automation_loop(session, log_container, leads_container) | |
except Exception as e: | |
st.error(f"An error occurred in the automation process: {str(e)}") | |
st.session_state.automation_status = False | |
if not st.session_state.get('automation_status', False) and st.session_state.get('automation_logs'): | |
st.subheader("Automation Results") | |
st.metric("Total Leads Found", st.session_state.total_leads_found) | |
st.metric("Total Emails Sent", st.session_state.total_emails_sent) | |
st.subheader("Automation Logs") | |
st.text_area("Logs", "\n".join(st.session_state.automation_logs), height=300) | |
if 'email_logs' in st.session_state: | |
st.subheader("Email Sending Logs") | |
df_logs = pd.DataFrame(st.session_state.email_logs) | |
st.dataframe(df_logs) | |
success_rate = (df_logs['Status'] == 'sent').mean() * 100 | |
st.metric("Email Sending Success Rate", f"{success_rate:.2f}%") | |
st.subheader("Debug Information") | |
st.json(st.session_state) | |
st.write("Current function:", autoclient_ai_page.__name__) | |
st.write("Session state keys:", list(st.session_state.keys())) | |
def update_search_terms(session, classified_terms): | |
for group, terms in classified_terms.items(): | |
for term in terms: | |
existing_term = session.query(SearchTerm).filter_by(term=term, project_id=get_active_project_id()).first() | |
if existing_term: | |
existing_term.group = group | |
else: | |
session.add(SearchTerm(term=term, group=group, project_id=get_active_project_id())) | |
session.commit() | |
def update_results_display(results_container, results): | |
results_container.markdown( | |
f""" | |
<style> | |
.results-container {{ | |
max-height: 400px; | |
overflow-y: auto; | |
border: 1px solid rgba(49, 51, 63, 0.2); | |
border-radius: 0.25rem; | |
padding: 1rem; | |
background-color: rgba(49, 51, 63, 0.1); | |
}} | |
.result-entry {{ | |
margin-bottom: 0.5rem; | |
padding: 0.5rem; | |
background-color: rgba(255, 255, 255, 0.1); | |
border-radius: 0.25rem; | |
}} | |
</style> | |
<div class="results-container"> | |
<h4>Found Leads ({len(results)})</h4> | |
{"".join(f'<div class="result-entry"><strong>{res["Email"]}</strong><br>{res["URL"]}</div>' for res in results[-10:])} | |
</div> | |
""", | |
unsafe_allow_html=True | |
) | |
def automation_control_panel_page(): | |
# Initialize persistent state variables | |
if 'automation_running' not in st.session_state: | |
st.session_state.automation_running = False | |
if 'automation_stats' not in st.session_state: | |
st.session_state.automation_stats = { | |
'total_leads': 0, | |
'emails_sent': 0, | |
'last_run': None, | |
'current_term': None | |
} | |
if 'automation_logs' not in st.session_state: | |
st.session_state.automation_logs = [] | |
st.title("Automation Control Panel") | |
col1, col2 = st.columns([2, 1]) | |
with col1: | |
status = "Active" if st.session_state.get('automation_status', False) else "Inactive" | |
st.metric("Automation Status", status) | |
with col2: | |
button_text = "Stop Automation" if st.session_state.get('automation_status', False) else "Start Automation" | |
if st.button(button_text, use_container_width=True): | |
st.session_state.automation_status = not st.session_state.get('automation_status', False) | |
if st.session_state.automation_status: | |
st.session_state.automation_logs = [] | |
st.rerun() | |
if st.button("Perform Quick Scan", use_container_width=True): | |
with st.spinner("Performing quick scan..."): | |
try: | |
with db_session() as session: | |
new_leads = session.query(Lead).filter(Lead.is_processed == False).count() | |
session.query(Lead).filter(Lead.is_processed == False).update({Lead.is_processed: True}) | |
session.commit() | |
st.success(f"Quick scan completed! Found {new_leads} new leads.") | |
except Exception as e: | |
st.error(f"An error occurred during quick scan: {str(e)}") | |
st.subheader("Real-Time Analytics") | |
try: | |
with db_session() as session: | |
total_leads = session.query(Lead).count() | |
emails_sent = session.query(EmailCampaign).count() | |
col1, col2 = st.columns(2) | |
col1.metric("Total Leads", total_leads) | |
col2.metric("Emails Sent", emails_sent) | |
except Exception as e: | |
st.error(f"An error occurred while displaying analytics: {str(e)}") | |
st.subheader("Automation Logs") | |
log_container = st.empty() | |
update_display(log_container, st.session_state.get('automation_logs', []), "Latest Logs", "log") | |
st.subheader("Recently Found Leads") | |
leads_container = st.empty() | |
if st.session_state.get('automation_status', False): | |
st.info("Automation is currently running in the background.") | |
try: | |
with db_session() as session: | |
while st.session_state.get('automation_status', False): | |
kb_info = get_knowledge_base_info(session, get_active_project_id()) | |
if not kb_info: | |
st.session_state.automation_logs.append("Knowledge Base not found. Skipping cycle.") | |
time.sleep(3600) | |
continue | |
base_terms = [term.term for term in session.query(SearchTerm).filter_by(project_id=get_active_project_id()).all()] | |
optimized_terms = generate_optimized_search_terms(session, base_terms, kb_info) | |
new_leads_all = [] | |
for term in optimized_terms: | |
results = manual_search(session, [term], 10) | |
new_leads = [(res['Email'], res['URL']) for res in results['results'] if save_lead(session, res['Email'], url=res['URL'])] | |
new_leads_all.extend(new_leads) | |
if new_leads: | |
template = session.query(EmailTemplate).filter_by(project_id=get_active_project_id()).first() | |
if template: | |
from_email = kb_info.get('contact_email') or '[email protected]' | |
reply_to = kb_info.get('contact_email') or '[email protected]' | |
logs, sent_count = bulk_send_emails(session, template.id, from_email, reply_to, [{'Email': email} for email, _ in new_leads]) | |
st.session_state.automation_logs.extend(logs) | |
if new_leads_all: | |
leads_df = pd.DataFrame(new_leads_all, columns=['Email', 'URL']) | |
leads_container.dataframe(leads_df, hide_index=True) | |
else: | |
leads_container.info("No new leads found in this cycle.") | |
update_display(log_container, st.session_state.get('automation_logs', []), "Latest Logs", "log") | |
time.sleep(3600) | |
except Exception as e: | |
st.error(f"An error occurred in the automation process: {str(e)}") | |
def get_knowledge_base_info(session, project_id): | |
kb = session.query(KnowledgeBase).filter_by(project_id=project_id).first() | |
return kb.to_dict() if kb else None | |
def generate_optimized_search_terms(session, base_terms, kb_info): | |
ai_prompt = f"Generate 5 optimized search terms based on: {', '.join(base_terms)}. Context: {kb_info}" | |
return get_ai_response(ai_prompt).split('\n') | |
def update_display(container, items, title, item_type): | |
container.markdown(f"<h4>{title}</h4>", unsafe_allow_html=True) | |
for item in items[-10:]: | |
container.text(item) | |
def get_search_terms(session): | |
return [term.term for term in session.query(SearchTerm).filter_by(project_id=get_active_project_id()).all()] | |
def get_ai_response(prompt): | |
return openai.Completion.create(engine="text-davinci-002", prompt=prompt, max_tokens=100).choices[0].text.strip() | |
def fetch_email_settings(session): | |
try: | |
settings = session.query(EmailSettings).all() | |
return [{"id": setting.id, "name": setting.name, "email": setting.email} for setting in settings] | |
except Exception as e: | |
logging.error(f"Error fetching email settings: {e}") | |
return [] | |
def bulk_send_emails(session, template_id, from_email, reply_to, leads, progress_bar=None, status_text=None, results=None, log_container=None): | |
template = session.query(EmailTemplate).filter_by(id=template_id).first() | |
if not template: | |
logging.error(f"Email template with ID {template_id} not found.") | |
return [], 0 | |
email_subject = template.subject | |
email_content = template.body_content | |
logs, sent_count = [], 0 | |
total_leads = len(leads) | |
for index, lead in enumerate(leads): | |
try: | |
validate_email(lead['Email']) | |
response, tracking_id = send_email_ses(session, from_email, lead['Email'], email_subject, email_content, reply_to=reply_to) | |
if response: | |
status = 'sent' | |
message_id = response.get('MessageId', f"sent-{uuid.uuid4()}") | |
sent_count += 1 | |
log_message = f"β Email sent to: {lead['Email']}" | |
else: | |
status = 'failed' | |
message_id = f"failed-{uuid.uuid4()}" | |
log_message = f"β Failed to send email to: {lead['Email']}" | |
save_email_campaign(session, lead['Email'], template_id, status, datetime.utcnow(), email_subject, message_id, email_content) | |
logs.append(log_message) | |
if progress_bar: | |
progress_bar.progress((index + 1) / total_leads) | |
if status_text: | |
status_text.text(f"Processed {index + 1}/{total_leads} leads") | |
if results is not None: | |
results.append({"Email": lead['Email'], "Status": status}) | |
if log_container: | |
log_container.text(log_message) | |
except EmailNotValidError: | |
log_message = f"β Invalid email address: {lead['Email']}" | |
logs.append(log_message) | |
except Exception as e: | |
error_message = f"Error sending email to {lead['Email']}: {str(e)}" | |
logging.error(error_message) | |
save_email_campaign(session, lead['Email'], template_id, 'failed', datetime.utcnow(), email_subject, f"error-{uuid.uuid4()}", email_content) | |
logs.append(f"β Error sending email to: {lead['Email']} (Error: {str(e)})") | |
return logs, sent_count | |
def wrap_email_body(body_content): | |
return f""" | |
<!DOCTYPE html> | |
<html lang="en"> | |
<head> | |
<meta charset="UTF-8"> | |
<meta name="viewport" content="width=device-width, initial-scale=1.0"> | |
<title>Email Template</title> | |
<style> | |
body {{ | |
font-family: Arial, sans-serif; | |
line-height: 1.6; | |
color: #333; | |
max-width: 600px; | |
margin: 0 auto; | |
padding: 20px; | |
}} | |
</style> | |
</head> | |
<body> | |
{body_content} | |
</body> | |
</html> | |
""" | |
def fetch_sent_email_campaigns(session): | |
try: | |
email_campaigns = session.query(EmailCampaign).join(Lead).join(EmailTemplate).options(joinedload(EmailCampaign.lead), joinedload(EmailCampaign.template)).order_by(EmailCampaign.sent_at.desc()).all() | |
return pd.DataFrame({ | |
'ID': [ec.id for ec in email_campaigns], | |
'Sent At': [ec.sent_at.strftime("%Y-%m-%d %H:%M:%S") if ec.sent_at else "" for ec in email_campaigns], | |
'Email': [ec.lead.email for ec in email_campaigns], | |
'Template': [ec.template.template_name for ec in email_campaigns], | |
'Subject': [ec.customized_subject or "No subject" for ec in email_campaigns], | |
'Content': [ec.customized_content or "No content" for ec in email_campaigns], | |
'Status': [ec.status for ec in email_campaigns], | |
'Message ID': [ec.message_id or "No message ID" for ec in email_campaigns], | |
'Campaign ID': [ec.campaign_id for ec in email_campaigns], | |
'Lead Name': [f"{ec.lead.first_name or ''} {ec.lead.last_name or ''}".strip() or "Unknown" for ec in email_campaigns], | |
'Lead Company': [ec.lead.company or "Unknown" for ec in email_campaigns] | |
}) | |
except SQLAlchemyError as e: | |
logging.error(f"Database error in fetch_sent_email_campaigns: {str(e)}") | |
return pd.DataFrame() | |
def display_logs(log_container, logs): | |
if not logs: | |
log_container.info("No logs to display yet.") | |
return | |
log_container.markdown( | |
""" | |
<style> | |
.log-container { | |
max-height: 300px; | |
overflow-y: auto; | |
border: 1px solid rgba(49, 51, 63, 0.2); | |
border-radius: 0.25rem; | |
padding: 1rem; | |
} | |
.log-entry { | |
margin-bottom: 0.5rem; | |
padding: 0.5rem; | |
border-radius: 0.25rem; | |
background-color: rgba(28, 131, 225, 0.1); | |
} | |
</style> | |
""", | |
unsafe_allow_html=True | |
) | |
log_entries = "".join(f'<div class="log-entry">{log}</div>' for log in logs[-20:]) | |
log_container.markdown(f'<div class="log-container">{log_entries}</div>', unsafe_allow_html=True) | |
def view_sent_email_campaigns(): | |
st.header("Sent Email Campaigns") | |
try: | |
with db_session() as session: | |
email_campaigns = fetch_sent_email_campaigns(session) | |
if not email_campaigns.empty: | |
st.dataframe(email_campaigns) | |
st.subheader("Detailed Content") | |
selected_campaign = st.selectbox("Select a campaign to view details", email_campaigns['ID'].tolist()) | |
if selected_campaign: | |
campaign_content = email_campaigns[email_campaigns['ID'] == selected_campaign]['Content'].iloc[0] | |
st.text_area("Content", campaign_content if campaign_content else "No content available", height=300) | |
else: | |
st.info("No sent email campaigns found.") | |
except Exception as e: | |
st.error(f"An error occurred while fetching sent email campaigns: {str(e)}") | |
logging.error(f"Error in view_sent_email_campaigns: {str(e)}") | |
def main(): | |
st.set_page_config( | |
page_title="Autoclient.ai | Lead Generation AI App", | |
layout="wide", | |
initial_sidebar_state="expanded", | |
page_icon="" | |
) | |
st.sidebar.title("AutoclientAI") | |
st.sidebar.markdown("Select a page to navigate through the application.") | |
pages = { | |
"π Manual Search": manual_search_page, | |
"π¦ Bulk Send": bulk_send_page, | |
"π₯ View Leads": view_leads_page, | |
"π Search Terms": search_terms_page, | |
"βοΈ Email Templates": email_templates_page, | |
"π Projects & Campaigns": projects_campaigns_page, | |
"π Knowledge Base": knowledge_base_page, | |
"π€ AutoclientAI": autoclient_ai_page, | |
"βοΈ Automation Control": automation_control_panel_page, | |
"π¨ Email Logs": view_campaign_logs, | |
"π Settings": settings_page, | |
"π¨ Sent Campaigns": view_sent_email_campaigns | |
} | |
with st.sidebar: | |
selected = option_menu( | |
menu_title="Navigation", | |
options=list(pages.keys()), | |
icons=["search", "send", "people", "key", "envelope", "folder", "book", "robot", "gear", "list-check", "gear", "envelope-open"], | |
menu_icon="cast", | |
default_index=0 | |
) | |
try: | |
pages[selected]() | |
except Exception as e: | |
st.error(f"An error occurred: {str(e)}") | |
logging.exception("An error occurred in the main function") | |
st.write("Please try refreshing the page or contact support if the issue persists.") | |
st.sidebar.markdown("---") | |
st.sidebar.info("Β© 2024 AutoclientAI. All rights reserved.") | |
if __name__ == "__main__": | |
main() |