company_insight / app.py
VinitSan's picture
Update app.py
5742a7c verified
raw
history blame
6.02 kB
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException, WebDriverException, NoSuchElementException
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import DBSCAN
from sklearn.metrics.pairwise import cosine_similarity
from transformers import pipeline
from langdetect import detect
import numpy as np
import gradio as gr
import re
import torch
import requests
import os
# Summarization pipeline, built once at import time from the
# facebook/bart-large-cnn checkpoint and reused by summarization().
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
# TF-IDF vectorizer shared by clustering(); English stop words are dropped.
vectorizer = TfidfVectorizer(stop_words='english')
# Credential sent as `access_key` by api_search(); None when the API_KEY
# environment variable is not set.
API_KEY=os.getenv("API_KEY")
def text_fetch(url, driver):
    """Load *url* with the Selenium driver and return its paragraph text.

    Navigation/consent containers (cookie banners, header, footer, nav,
    aside) are removed from the parsed page first. Each remaining ``<p>``
    is then dropped when it looks like boilerplate: it contains a newline,
    the phrase "verifying you are human", or one of the blocked words
    (cookie, cookies, news, privacy) as a whole word, case-insensitively.

    Args:
        url: Address of the page to load.
        driver: Selenium WebDriver used to fetch and render the page.

    Returns:
        The kept paragraphs concatenated, each prefixed with a single
        space, or ``None`` when the page could not be loaded or parsed.
    """
    # Page regions that carry site chrome rather than article content.
    boilerplate_selectors = [
        'div.cookie-banner',
        'div.cookie-popup',
        'div.cookie-notice',
        'footer',
        'aside',
        'div#cookie-consent',
        'div#cookie-banner',
        'nav',
        'header',
    ]
    # Single words are compared against whole tokens; multi-word phrases and
    # the raw newline can only be detected as substrings.
    blocked_words = {'cookies', 'cookie', 'news', 'privacy'}
    blocked_fragments = ("\n", "verifying you are human")
    try:
        print(url)
        driver.get(url)
        soup = BeautifulSoup(driver.page_source, 'lxml')
        for selector in boilerplate_selectors:
            for element in soup.select(selector):
                element.decompose()
        kept = []
        for paragraph in soup.find_all('p'):
            text = paragraph.get_text()
            # The comparison must use the lowered copy; str.lower() does not
            # modify the string in place.
            lowered = text.lower()
            if any(fragment in lowered for fragment in blocked_fragments):
                continue
            if blocked_words.intersection(re.findall(r'[a-z]+', lowered)):
                continue
            kept.append(text)
        print("ok")
        return ''.join(' ' + text for text in kept)
    except Exception as exc:
        # Best-effort scraping: timeouts, driver failures and parser errors
        # all mean "skip this page", but say why instead of failing silently.
        print("skipped:", type(exc).__name__)
        return None
def api_search(query, timeout=15):
    """Search serpstack news for *query* and return the result URLs.

    Args:
        query: Free-text search string.
        timeout: Seconds to wait for the HTTP response before
            ``requests`` raises a timeout error (previously the call
            could block indefinitely).

    Returns:
        A list of result URLs in the order the API returned them; empty
        when the response carries no ``news_results``.

    Raises:
        requests.exceptions.RequestException: On network failure/timeout.
        ValueError: If the response body is not valid JSON.
    """
    params = {
        'access_key': API_KEY,
        'query': query,
        'type': 'news',
        'auto_location': 0,
        'gl': 'in',
        'hl': 'en',
    }
    api_result = requests.get(
        'https://api.serpstack.com/search', params=params, timeout=timeout
    )
    api_response = api_result.json()
    # NOTE(review): error payloads (bad key, quota) appear to omit
    # 'news_results' and carry an 'error' entry instead - confirm against
    # the API docs. Either way, a missing key must not crash the request.
    if 'error' in api_response:
        print("search API error:", api_response['error'])
    news_items = api_response.get('news_results') or []
    anchors = [news['url'] for news in news_items if news.get('url')]
    print(len(anchors))
    return anchors
def clean_text(text):
    """Return *text* reduced to letters, whitespace and ``.,?!`` if English.

    The language is detected with ``langdetect.detect``. Anything not
    detected as English, and any input on which detection or cleaning
    raises, yields an empty string.
    """
    try:
        language = detect(text)
        if language != 'en':
            return ''
        # Drop every character outside the allowed set.
        return re.sub(r'[^a-zA-Z\s.,?!]', '', text)
    except Exception:
        return ''
def text_generation(query):
    """Fetch and clean the article text behind every search hit for *query*.

    Runs the news search, loads each result in a headless Chrome session,
    extracts the paragraph text and keeps only pages whose cleaned text is
    non-empty.

    Args:
        query: Search string passed to ``api_search``.

    Returns:
        A dict mapping cleaned page text to its URL (one trailing ``/``
        removed). Pages with identical text collapse to a single entry.
    """
    links = api_search(query)
    pair = {}
    if not links:
        # Nothing to scrape, so do not pay for starting a browser.
        print(len(pair), "pairs")
        return pair

    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    driver = webdriver.Chrome(options=options)
    try:
        driver.set_page_load_timeout(10)
        print("driver started")
        for position, url in enumerate(links, start=1):
            print(position, end=' ')
            text = text_fetch(url, driver)
            if not text:
                continue
            text = clean_text(text)
            if not text:
                # clean_text returns '' for non-English or undetectable
                # pages; storing them would add an empty document under
                # the '' key.
                continue
            pair[text] = url[:-1] if url.endswith('/') else url
    finally:
        # Always release the browser process, even if a page blows up.
        driver.quit()
        print("driver quit")
    print(len(pair), "pairs")
    return pair
def clustering(pairs, e=0.8):
    """Group similar texts with DBSCAN over TF-IDF cosine distance.

    Starting at ``eps=e``, the texts are clustered; if fewer than three
    clusters come out, the function retries with ``e - 0.1`` and keeps
    whichever attempt produced more clusters (ties go to the smaller eps).
    The search stops once eps is no longer positive.

    Args:
        pairs: Mapping of document text to its source URL.
        e: DBSCAN ``eps`` (maximum cosine distance) for this attempt.

    Returns:
        A dict ``{label: [texts, urls]}`` with integer labels counting up
        from 0 and ``texts`` sorted shortest-to-longest, so ``texts[-1]``
        is the longest document of the cluster. ``urls`` stays in input
        order. An empty list is returned when clustering is impossible
        (no texts, eps exhausted, or vectorizing/clustering failed).
    """
    texts = list(pairs)
    # DBSCAN rejects a non-positive eps; stop the descent explicitly rather
    # than relying on that exception being swallowed.
    if not texts or e <= 0:
        return []
    try:
        X = vectorizer.fit_transform(texts)
        cosine_dist_matrix = 1 - cosine_similarity(X)
        # Floating-point error can push distances slightly outside [0, 1].
        cosine_dist_matrix = np.clip(cosine_dist_matrix, 0.0, 1.0)
        db = DBSCAN(metric="precomputed", eps=e, min_samples=2)
        labels = db.fit_predict(cosine_dist_matrix)
    except Exception:
        # e.g. every document consists solely of stop words.
        return []

    cluster = {}
    for text, label in zip(texts, labels):
        if label < 0:
            # Negative labels are skipped (DBSCAN noise points).
            continue
        members = cluster.setdefault(int(label), [[], []])
        members[0].append(text)
        members[1].append(pairs[text])

    # Sort before deciding what to return so that every returned mapping,
    # including this level's, has its longest text last.
    for members in cluster.values():
        members[0].sort(key=len)

    if len(cluster) >= 3:
        return cluster

    finer = clustering(pairs, e - 0.1)
    if len(cluster) > len(finer):
        return cluster
    return finer
def summarization(text, min_words=250, max_words=700):
    """Summarize *text* with the module-level summarization pipeline.

    Args:
        text: Document to summarize.
        min_words: Texts with this many whitespace-separated words or
            fewer are considered too short and are not summarized.
        max_words: Only the first ``max_words`` words are sent to the
            model.

    Returns:
        The generated summary string, or ``None`` for a too-short text.
    """
    words = text.split()
    if len(words) <= min_words:
        return None
    excerpt = ' '.join(words[:max_words])
    # truncation=True guards against excerpts whose token count still
    # exceeds the model's maximum input length.
    summary = summarizer(
        excerpt,
        max_length=250,
        min_length=100,
        do_sample=False,
        truncation=True,
    )
    return summary[0]['summary_text']
def output(cluster):
    """Summarize the longest text of each cluster.

    Labels are visited in order 0, 1, 2, ... until one is missing.
    Clusters whose text is too short to summarize are skipped.

    Args:
        cluster: Result of ``clustering``: a dict ``{label: [texts, urls]}``
            whose ``texts`` end with the longest entry, or an empty list.

    Returns:
        A dict mapping each summary to the list of URLs of its cluster.
    """
    out = {}
    label = 0
    while label in cluster:
        texts, urls = cluster[label]
        summary = summarization(texts[-1])
        if summary:
            out[summary] = urls
        # Advance unconditionally: skipping the increment for an
        # unsummarizable cluster would loop forever on the same label.
        label += 1
    return out
def query(company, domain=''):
    """Run the full pipeline for a company and optional domain.

    The two inputs are joined with a single space to form the search
    string, then passed through scraping, clustering and summarization.
    Progress messages are printed along the way.

    Returns:
        The summary-to-URLs mapping produced by ``output``.
    """
    search_terms = ' '.join((company, domain))
    scraped = text_generation(search_terms)

    print("clustering start")
    groups = clustering(scraped)
    print(len(groups), "clusters")

    print("summarization")
    summaries = output(groups)
    print("Done")
    return summaries
def gradio_fun(name, domain=''):
    """Build the plain-text report shown in the Gradio output box.

    The report starts with a header naming the organization and domain,
    followed by one numbered entry per summary with its supporting URLs.
    """
    result = query(name, domain)
    pieces = [f"\"{name.capitalize()}\" Report on \"{domain.capitalize()}\" domain.\n\n"]
    for number, (text, links) in enumerate(result.items(), start=1):
        pieces.append(str(number) + ". " + text + "\n\nSupporting URLs\n")
        pieces.extend(">>" + link + "\n" for link in links)
        pieces.append('\n\n')
    return ''.join(pieces)
# Page title and intro text. Previously these were assigned inside the
# Blocks context (the title as an accidental one-element tuple because of a
# trailing comma) and never shown anywhere.
title = "Company Insight"
description = "Fill below information to see Insight."

with gr.Blocks(fill_height=True, title=title) as demo:
    # Render the heading and instructions above the form.
    gr.Markdown(f"# {title}\n{description}")
    with gr.Row():
        with gr.Column():
            name = gr.Textbox(placeholder="Organization Name", label='Organization')
            domain = gr.Textbox(placeholder="Domain", label='Domain')
            submit = gr.Button(value="Submit")
        with gr.Column():
            result = gr.Textbox(label="Output", lines=28)
    # Event wiring must happen inside the Blocks context.
    submit.click(gradio_fun, inputs=[name, domain], outputs=result)

demo.launch()