# company_insight/app.py
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException, WebDriverException, NoSuchElementException
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import DBSCAN
from sklearn.metrics.pairwise import cosine_similarity
from transformers import pipeline
from langdetect import detect
import numpy as np
import gradio as gr
import re
import torch
import requests
import os
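
# The summarization model and TF-IDF vectorizer are created once at import time;
# the serpstack API key is read from the environment.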
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
vectorizer = TfidfVectorizer(stop_words='english')
API_KEY=os.getenv("API_KEY")
def text_fetch(url, driver):
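    """Load the page with Selenium, strip cookie/nav/footer elements, and return
    the concatenated <p> text; a plain requests fetch is tried as a fallback and
    the longer of the two results is kept."""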
try:
print(url)
driver.get(url)
soup = BeautifulSoup(driver.page_source, 'lxml')
cookie_selectors = [
'div.cookie-banner',
'div.cookie-popup',
'div.cookie-notice',
'footer',
'aside',
'div#cookie-consent',
'div#cookie-banner',
'nav',
'header']
for selector in cookie_selectors:
for element in soup.select(selector):
element.decompose()
para = soup.find_all('p')
cleaned_text=''
        # Skip paragraphs that look like cookie banners or privacy notices.
        despose = {'cookies', 'cookie', 'news', 'privacy', 'verifying you are human'}
        for p in para:
            try:
                text = p.get_text()
                # Match keywords against the lowercased text; the original set(k)
                # built a set of characters and could never match these phrases.
                if any(word in text.lower() for word in despose):
                    continue
                cleaned_text += ' ' + text
            except Exception:
                continue
        response = requests.get(url, timeout=10)
if response.status_code == 200:
if response.encoding != 'utf-8':
response.encoding = 'utf-8'
soup = BeautifulSoup(response.text, 'lxml')
para = soup.find_all('p')
temp=''
            # Same boilerplate filter, case-insensitive, for the requests fallback.
            despose = {'cookies', 'cookie', 'news', 'focus', 'privacy'}
            for p in para:
                try:
                    text = p.get_text()
                    if any(word in text.lower() for word in despose):
                        continue
                    temp += ' ' + text
                except Exception:
                    pass
if len(temp)>len(cleaned_text):
cleaned_text=temp
print("ok")
return cleaned_text
    except (TimeoutException, WebDriverException):
        return None
    except Exception:
        return None
def api_search(query):
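    """Query the serpstack news endpoint and return the list of result URLs."""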
params = {
'access_key': API_KEY,
'query': query,
'type': 'news',
'auto_location': 0,
'gl': 'in',
'hl': 'en'
}
try:
        api_result = requests.get('https://api.serpstack.com/search', params=params, timeout=10)
api_response = api_result.json()
anchors=[]
for news in api_response['news_results']:
anchors.append(news['url'])
print(len(anchors))
return anchors
    except Exception as e:
        print(e)
        # Return an empty list so callers can still iterate over the result.
        return []
def clean_text(text):
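    """Keep only English text and strip characters other than letters,
    whitespace, and basic punctuation."""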
try:
if detect(text) == 'en':
cleaned_text = re.sub(r'[^a-zA-Z\s.,?!]', '', text)
return cleaned_text
else:
return ''
except Exception as e:
return ''
def text_generation(query):
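    """Fetch every news URL for the query with a headless Chrome driver and
    return a {cleaned_text: url} mapping."""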
links = api_search(query)
options = webdriver.ChromeOptions()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
driver = webdriver.Chrome(options=options)
driver.set_page_load_timeout(10)
print("driver started")
pair = {}
i=1
for url in links:
print(i,end=' ')
i+=1
text = text_fetch(url,driver)
if text:
if url[-1]=='/':
url=url[:-1]
            text = clean_text(text)
            # Only keep non-empty English text; empty keys would overwrite each other.
            if text:
                pair[text] = url
driver.quit()
print("driver quit")
print(len(pair),"pairs")
return pair
def clustering(pairs,texts,e=0.8,k=0):
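    """Cluster the article texts with DBSCAN on a cosine-distance matrix of
    TF-IDF vectors, recursively tightening eps and keeping the run that yields
    the most clusters. Returns {label: [list_of_texts, list_of_urls]}."""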
try:
X = vectorizer.fit_transform(texts)
cosine_sim_matrix = cosine_similarity(X)
cosine_dist_matrix = 1 - cosine_sim_matrix
cosine_dist_matrix = np.clip(cosine_dist_matrix, 0.0, 1.0)
db = DBSCAN(metric="precomputed", eps=e, min_samples=2)
labels = db.fit_predict(cosine_dist_matrix)
best_l = -1
best_cluster=None
cluster = {}
for i, label in enumerate(labels):
if label<0:
continue
if label not in cluster:
cluster[label] = [[],[]]
cluster[label][0].append(texts[i])
cluster[label][1].append(pairs[texts[i]])
k = max(len(cluster),k)
        if e > 0.1:
            # Tighten eps recursively and keep whichever run yields more clusters.
            temp = clustering(pairs, texts, e - 0.1, k)
            if len(temp) > k:
                return temp
return cluster
except Exception as e:
print(e)
return {}
def summarization(text):
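    """Summarize the first 700 words with the BART pipeline; texts of 250
    words or fewer are skipped (returns None)."""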
text=text.split()
if len(text)<=250:
return None
    text = ' '.join(text[:700])
summary = summarizer(text, max_length=250, min_length=100, do_sample=False)
return summary[0]['summary_text']
def output(cluster):
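    """Summarize one representative text per cluster and map each summary to
    the cluster's supporting URLs."""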
if not cluster:
return {}
out={}
l=list(cluster.keys())
l.sort()
i=0
for label in l:
print(i)
i+=1
        # Summarize the last text collected for this cluster.
        summary = summarization(cluster[label][0][-1])
        if not summary:
            continue
        out[summary] = cluster[label][1]
return out
def query(company,domain=''):
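    """Run the full pipeline: search, scrape, cluster, and summarize."""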
    search_query = company + ' ' + domain
    pairs = text_generation(search_query)
texts = [text for text in pairs]
print("clustering start")
cluster = clustering(pairs, texts)
print(len(cluster),"clusters")
print("summarization")
result = output(cluster)
print("Done")
return result
def gradio_fun(name,domain=''):
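    """Format the pipeline output as a numbered plain-text report with
    supporting URLs for the Gradio UI."""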
result = query(name,domain)
out_str = f"\"{name.capitalize()}\" Report on \"{domain.capitalize()}\" domain.\n\n"
i=1
for text in result:
out_str+=str(i)+". "+text+"\n\nSupporting URLs\n"
i+=1
for link in result[text]:
out_str+=">>"+link+"\n"
out_str+='\n\n'
return out_str
with gr.Blocks(fill_height=True) as demo:
title="Company Insight",
description="Fill below information to see Insight."
with gr.Row():
with gr.Column():
name = gr.Textbox(placeholder="Organization Name",label='Organization')
domain = gr.Textbox(placeholder="Domain",label='Domain')
submit = gr.Button(value="Submit")
with gr.Column():
result = gr.Textbox(label="Output",lines=28)
submit.click(gradio_fun, inputs=[name,domain], outputs=result)
demo.launch()