Spaces:
Sleeping
Sleeping
File size: 9,088 Bytes
0d18784 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
from crewai_tools import BaseTool
import pandas as pd
import numpy as np
import yfinance as yf
import yahooquery
import tensorflow_hub as hub
import requests
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
import os
class CompetitorAnalysisTool(BaseTool):
name: str = "Competitor Analysis Tool"
description: str = "Identify competitors in a specific industry and perform clustering based on business model descriptions."
def _run(self, company_name: str) -> str:
def get_ticker(company_name):
yfinance = "https://query2.finance.yahoo.com/v1/finance/search"
user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36'
params = {"q": company_name, "quotes_count": 1, "country": "United States"}
res = requests.get(url=yfinance, params=params, headers={'User-Agent': user_agent})
data = res.json()
company_code = data['quotes'][0]['symbol']
return company_code
"""
Initialises the class based on the requested company given as a command-line variable.
self.ticker is a string representing the ticker symbol for the company.
self.info saves all the basic information about the company from the yfinance api.
"""
try:
self.ticker = get_ticker(company_name)
self.info = yf.Ticker(self.ticker).info
except Exception as e:
print(e)
raise
def similar_group(self) -> str :
"""
AVAILABLE_SCREENS is a constant that stores the categories that the available stocks are sorted into.
In this method, we use the industry of our chosen company and work out which of the categories of stocks best fits the industry.
To compute the 'best fit', we encode the screen names and industry name using the Google Universal Sentence Encoder as it captures the semantics of the sentence as well.
The method then returns the stocks obtained from the resultant category.
"""
screener = yahooquery.Screener()
use_model = hub.KerasLayer("https://tfhub.dev/google/universal-sentence-encoder/4")
screeners_embedded = use_model(AVAILABLE_SCREENS)
information = pd.DataFrame({'screeners':AVAILABLE_SCREENS, 'embeddings':np.array(x.numpy() for x in screeners_embedded)})
industry_vector = use_model([self.info['industry']])[0].numpy()
compute_similarity = lambda x: cosine_similarity([x], [industry_vector])[0][0]
information['similarity'] = information['embeddings'].map(compute_similarity)
maximum_similarity = max(information['similarity'])
predicted_screen = information[information['similarity'] == maximum_similarity]
return list(predicted_screen['screeners'])[0]
def identify_market_competition(self) -> pd.DataFrame :
"""
This method processes the result of the similar_group method.
The list of stocks obtained from the similar_group method are all potential competitors to our chosen company.
So, it collects all companies in a pandas dataframe and attaches the description of the business model of each to the dataframe.
This dataframe is then the output of the method.
"""
screener = yahooquery.Screener()
most_likely_category = self.similar_group()
potential_competition = screener.get_screeners(most_likely_category)[most_likely_category]['quotes']
competitor_list = list(x['symbol'] for x in potential_competition)
if self.ticker not in competitor_list:
competitor_list.append(self.ticker)
competitor_list = yahooquery.Ticker(competitor_list)
data = pd.DataFrame(competitor_list.asset_profile).T
relevent_data = pd.DataFrame(data, columns=['longBusinessSummary'])
return relevent_data
def prepare_clustering_data(self, paragraphs:pd.DataFrame) -> pd.DataFrame :
"""
This method takes as input, the dataframe outputted by the self.identify_market_competition method.
The next step is to process the description of the business model but any ML algorithm cannot process string-like data.
So, this method uses the Google Universal Sentence Encoder to encode each description as a vector and attaches this to the dataframe.
The method then outputs this modified dataframe.
"""
paragraph_embedder = hub.KerasLayer("https://tfhub.dev/google/universal-sentence-encoder/4")
business_description_embedded = paragraph_embedder(paragraphs['longBusinessSummary'])
paragraphs['summaryEncoded'] = list(x.numpy() for x in business_description_embedded)
return paragraphs
def kmeans_clustering(self, descriptions:pd.DataFrame, clusters:int) -> np.ndarray :
"""
Takes the set of vectors representing the business model and runs a kmeans clustering algorithm to group together similar business descriptions.
This method only runs the algorithm for a given number of clusters and this number is optimized in self.get_optimized_labels to maximise information gain.
"""
kmeans_object = KMeans(n_clusters=clusters, random_state=1)
kmeans_object.fit(np.array(list(descriptions['summaryEncoded'])))
return kmeans_object.labels_
def get_optimized_labels(self, descriptions:pd.DataFrame, optimisation_range:tuple) -> np.ndarray :
"""
This method runs self.kmeans_clustering on a given range of cluster numbers and works out the optimum number to maximise the quality of clusters.
This is done via maximising the silhouette score.
The cluster associated with each description is then outputted.
"""
k_values = range(optimisation_range[0], optimisation_range[1])
max_silhouette_score = [0,0]
for k in k_values:
current_labels = self.kmeans_clustering(descriptions, k)
silhouette_average = silhouette_score(np.array(list(descriptions['summaryEncoded'])), current_labels)
if (silhouette_average > max_silhouette_score[0]):
max_silhouette_score = [silhouette_average, current_labels]
return max_silhouette_score[1]
def obtain_competitors(self, descriptions:pd.DataFrame) -> list :
"""
Runs the kmeans clustering algorithm on potential competitors and ones which are chosen into the same cluster are assumed to have the most similar business model.
So, this methods groups together companies in the same cluster as the chosen one and this list is outputted.
"""
if (len(descriptions) <= 5):
return list(descriptions.index)
max_clusters = int(len(descriptions) / 2) + 1
optimum_labels = self.get_optimized_labels(descriptions, (2,max_clusters))
descriptions['clusterLabel'] = optimum_labels
select_cluster = int(descriptions.loc[self.ticker]['clusterLabel'])
selected_competitors = list(descriptions[descriptions['clusterLabel'] == select_cluster].index)
selected_competitors.remove(self.ticker)
return selected_competitors
def competitor_analysis_report(self):
"""
Uses the result of self.obtain_competitors and writes a mini summary of results into a text file.
"""
potential_competitors = self.identify_market_competition()
revised_competitors = self.prepare_clustering_data(potential_competitors)
final_companies = self.obtain_competitors(revised_competitors)
ticker_list = yahooquery.Ticker(final_companies)
information_dict = ticker_list.quote_type
requested_company = yahooquery.Ticker([self.ticker]).quote_type[self.ticker]['shortName']
file_name = f"competitionAnalysisReport_{self.ticker}.txt"
if len(final_companies) > 0:
write_string = f"Requested Company : {requested_company} ({self.ticker})\n\nPossible market competitors in current economic landscape based on business model :\n"
count = 1
for x in information_dict:
write_string += f"\t{count}. {information_dict[x]['shortName']} ({x})\n"
count += 1
else:
write_string = f"Requested Company : {requested_company} ({self.ticker})\n\nAlgorithm could not find another major company with noticably similar business models.\nLooks like the company chosen has exploited a gap in the market!"
with open(file_name, "w") as f:
f.write(write_string)
return write_string |