"""Turn PDF and web URLs into LangChain ``Document`` snippets.

PDFs are loaded as HTML, split into runs of equally sized text, filtered,
and regrouped into header/content sections. Web pages become one snippet.
"""

import math
import re
from statistics import median
from typing import Any, Dict, Iterable, List, Optional, Tuple
from urllib.parse import urlparse

from bs4 import BeautifulSoup
from langchain.docstore.document import Document
from langchain.document_loaders import PDFMinerPDFasHTMLLoader, WebBaseLoader

# Compiled once at import time; both are used inside per-element loops.
_WHITESPACE_RE = re.compile(r"\s+")
_FONT_SIZE_RE = re.compile(r"font-size:(\d+)px")


def deep_strip(text: Optional[str]) -> str:
    """Collapse every whitespace run in *text* to one space and trim the ends.

    A falsy *text* (``None`` or ``""``) yields the empty string.
    """
    return _WHITESPACE_RE.sub(" ", text or "").strip()


def _is_pdf_url(url: str) -> bool:
    """Return True when *url* points at a ``.pdf`` resource."""
    if url.endswith(".pdf"):
        return True
    # Also accept upper-case extensions and URLs that carry a query string
    # or fragment after the file name.
    return urlparse(url).path.lower().endswith(".pdf")


def process_documents(urls: Iterable[str]) -> List[Document]:
    """Load every URL and return all resulting snippets in input order.

    URLs recognised as PDFs go through :func:`process_pdf`; everything else
    goes through :func:`process_web`. Each snippet's ``chunk_id`` metadata is
    overwritten with its index in the combined list.
    """
    snippets: List[Document] = []
    for url in urls:
        loader = process_pdf if _is_pdf_url(url) else process_web
        snippets.extend(loader(url))

    # The per-source chunk ids restart at 0, so renumber across all sources.
    for index, snippet in enumerate(snippets):
        snippet.metadata["chunk_id"] = index
    return snippets


def process_pdf(url: str) -> List[Document]:
    """Split the PDF at *url* into one ``Document`` per header section.

    Returns an empty list when no text snippet survives filtering.
    """
    data = PDFMinerPDFasHTMLLoader(url).load()[0]
    divs = BeautifulSoup(data.page_content, "html.parser").find_all("div")

    snippets = get_pdf_snippets(divs)
    filtered_snippets = filter_pdf_snippets(snippets, new_line_threshold_ratio=0.4)
    if not filtered_snippets:
        # median() raises StatisticsError on an empty sequence.
        return []

    median_font_size = math.ceil(
        median([font_size for _, font_size in filtered_snippets])
    )
    semantic_snippets = get_pdf_semantic_snippets(filtered_snippets, median_font_size)

    return [
        Document(
            page_content=deep_strip(metadata["header_text"])
            + " "
            + deep_strip(content),
            metadata={
                # Only the first ten words of the header go into the metadata.
                "header": " ".join(metadata["header_text"].split()[:10]),
                "source_url": url,
                "source_type": "pdf",
                "chunk_id": i,
            },
        )
        for i, (content, metadata) in enumerate(semantic_snippets)
    ]


def get_pdf_snippets(content: Iterable[Any]) -> List[Tuple[str, int]]:
    """Merge consecutive elements that share a font size into text snippets.

    *content* is an iterable of parsed elements exposing ``find("span")`` and
    ``.text``. The font size is read from the ``font-size:<n>px`` declaration
    in the ``style`` attribute of each element's first ``<span>``; elements
    without a span, a style, or such a declaration are skipped.

    Returns ``(text, font_size)`` tuples in document order, or an empty list
    when no element carried a font size.
    """
    current_font_size: Optional[int] = None
    current_text = ""
    snippets: List[Tuple[str, int]] = []

    for element in content:
        span = element.find("span")
        if not span:
            continue
        style = span.get("style")
        if not style:
            continue
        match = _FONT_SIZE_RE.search(style)
        if match is None:
            continue
        font_size = int(match.group(1))

        # Compare against None so that a 0px size is not mistaken for "unset".
        if current_font_size is None:
            current_font_size = font_size

        if font_size == current_font_size:
            current_text += element.text
        else:
            snippets.append((current_text, current_font_size))
            current_font_size = font_size
            current_text = element.text

    # Flush the last run, unless nothing was collected at all.
    if current_font_size is not None:
        snippets.append((current_text, current_font_size))
    return snippets


def filter_pdf_snippets(content_list, new_line_threshold_ratio):
    """Keep snippets whose newline density is at most the given ratio.

    The density is ``content.count("\\n") / len(content)``. Empty snippets
    are dropped, since they have no text and the ratio would divide by zero.

    Returns the surviving ``(content, font_size)`` tuples in input order.
    """
    filtered_list = []
    for content, font_size in content_list:
        if not content:
            continue
        ratio = content.count("\n") / len(content)
        if ratio <= new_line_threshold_ratio:
            filtered_list.append((content, font_size))
    return filtered_list


def _build_semantic_snippet(
    header: str,
    header_font_size: int,
    content: str,
    content_font_sizes: List[int],
) -> Tuple[str, Dict[str, Any]]:
    """Package one header section as a ``(content, metadata)`` tuple."""
    metadata = {
        "header_font_size": header_font_size,
        "content_font_size": (
            median(content_font_sizes) if content_font_sizes else None
        ),
        "header_text": header,
    }
    return content, metadata


def get_pdf_semantic_snippets(filtered_snippets, median_font_size):
    """Group snippets into sections, each led by a header snippet.

    A snippet whose font size is strictly greater than *median_font_size*
    starts a new section; all other snippets are appended, space-separated,
    to the current section's content.

    Returns a list of ``(content, metadata)`` tuples, where ``metadata`` has
    the keys ``header_font_size``, ``content_font_size`` (median of the
    section's content sizes, or ``None`` without content) and
    ``header_text``. A section without content has ``""`` as its content.

    Text seen before the first header is kept and ends up in the first
    header's section. If no snippet qualifies as a header, the result is an
    empty list.
    """
    semantic_snippets = []
    current_header = None
    header_font_size = None
    # Always a str, so callers never see a list for a content-less section.
    current_content = ""
    content_font_sizes = []

    for content, font_size in filtered_snippets:
        if font_size > median_font_size:
            # A new header closes the section that was open, if any.
            if current_header is not None:
                semantic_snippets.append(
                    _build_semantic_snippet(
                        current_header,
                        header_font_size,
                        current_content,
                        content_font_sizes,
                    )
                )
                current_content = ""
                content_font_sizes = []
            current_header = content
            header_font_size = font_size
        else:
            content_font_sizes.append(font_size)
            if current_content:
                current_content += " " + content
            else:
                current_content = content

    # Flush the section that was still open when the input ended.
    if current_header is not None:
        semantic_snippets.append(
            _build_semantic_snippet(
                current_header,
                header_font_size,
                current_content,
                content_font_sizes,
            )
        )
    return semantic_snippets


def process_web(url: str) -> List[Document]:
    """Load the web page at *url* as a single whitespace-normalised snippet.

    The snippet's ``header`` metadata is the loaded document's ``title``
    metadata entry, or ``""`` when that entry is missing.
    """
    data = WebBaseLoader(url).load()[0]
    return [
        Document(
            page_content=deep_strip(data.page_content),
            metadata={
                # .get() avoids a KeyError for documents loaded without a title.
                "header": data.metadata.get("title", ""),
                "source_url": url,
                "source_type": "web",
                "chunk_id": 0,
            },
        )
    ]