"""De-identify clinical PDF reports.

Pipeline: extract first-page text with pdfminer, anonymize it with a
Llama-3-8B-Instruct chat model (transformers text-generation pipeline),
and expose the workflow through a Gradio interface that returns a zip
of anonymized ``.txt`` files.
"""

import io
import os
import re
import tempfile
import zipfile

import gradio as gr
import torch
import transformers
from llama_cpp import Llama  # retained: earlier GGUF code path used this backend
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextContainer
from tqdm import tqdm


def process_document(pdf_path, page_ids=None):
    """Extract normalized text from *pdf_path*, one entry per page.

    Args:
        pdf_path: path to the PDF file.
        page_ids: optional iterable of 0-based page indices to restrict
            extraction (passed through to pdfminer's ``page_numbers``).

    Returns:
        dict mapping pdfminer page ids (1-based) to that page's text.
    """
    page2content = {}
    for extracted_page in tqdm(extract_pages(pdf_path, page_numbers=page_ids)):
        page2content[extracted_page.pageid] = process_page(extracted_page)
    return page2content


def process_page(extracted_page):
    """Concatenate a page's text containers in top-to-bottom reading order."""
    # Sort layout elements by their top edge (y1 descending) to preserve
    # visual reading order. ``_objs`` is pdfminer-private but is the same
    # sequence the page iterates over.
    elements = sorted(extracted_page._objs, key=lambda el: el.y1, reverse=True)
    parts = [
        extract_text_and_normalize(element)
        for element in elements
        if isinstance(element, LTTextContainer)
    ]
    # Collapse any run of newlines into a single space.
    return re.sub(r'\n+', ' ', ''.join(parts))


def extract_text_and_normalize(element):
    """Normalize one text container: collapse whitespace, re-join wrapped lines.

    A line whose last character is a word character, comma, or hyphen is
    treated as wrapped mid-sentence and joined to the next line with a
    space; otherwise a hard newline is kept. Blank lines become newlines.
    """
    norm_text = ''
    for line_text in element.get_text().split('\n'):
        line_text = line_text.strip()
        if not line_text:
            # Empty after stripping: preserve as a paragraph break.
            line_text = '\n'
        else:
            line_text = re.sub(r'\s+', ' ', line_text)
            # Raw strings fix the invalid-escape-sequence warnings the
            # original non-raw patterns produced.
            if not re.search(r'[\w\d,\-]', line_text[-1]):
                line_text += '\n'
            else:
                line_text += ' '
        norm_text += line_text
    return norm_text


def txt_to_html(text):
    """Render plain text with one row per input line.

    NOTE(review): the markup literals in this function appear to have been
    stripped when the file was mangled (the strings now contain only bare
    newlines). This keeps the surviving literal behavior — confirm the
    intended HTML tags against the original file. Currently unused by the
    Gradio pipeline.
    """
    html_content = "\n"
    for line in text.split('\n'):
        html_content += "{}\n".format(line.strip())
    html_content += ""
    return html_content


def deidentify_doc(pdftext="", prompt="", maxtokens=600, temperature=0.0001,
                   top_probability=0.95):
    """Anonymize *pdftext* with the global chat model.

    Args:
        pdftext: clinical note text to redact.
        prompt: optional custom instruction; when empty (the app's default),
            the built-in redaction rule set is used. The original code
            unconditionally overwrote this parameter, making it dead.
        maxtokens: generation budget (``max_new_tokens``).
        temperature / top_probability: sampling controls.

    Returns:
        The model's generated continuation (instructions + note stripped).
    """
    if not prompt:
        prompt = "Task: Please anonymize the following clinical note. Specific Rules: Replace all the following information with the term \"[redacted]\": 1. Redact any strings that is person name 2. Redact any medical staff names 3. Redact any strings that is location or address, such as \"3970 Longview Drive\" 4. Redact any strings that is age of person 5. Redact any dates and IDs 6. Redact clinic and hospital names 7. Redact professions such as \"manager\" 8. Redact any contact information"
    # Instructions belong in the "system" role; the original sent them as
    # "assistant", which Llama-3's chat template treats as model output.
    messages = [
        {"role": "system", "content": prompt},
        {"role": "user", "content": pdftext},
    ]
    prompt = model.tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    # Llama-3 emits <|eot_id|> as its end-of-turn marker in addition to EOS.
    terminators = [
        model.tokenizer.eos_token_id,
        model.tokenizer.convert_tokens_to_ids("<|eot_id|>"),
    ]
    outputs = model(
        prompt,
        max_new_tokens=maxtokens,
        eos_token_id=terminators,
        do_sample=True,
        temperature=temperature,
        top_p=top_probability,
    )
    # The pipeline echoes the prompt; slice it off to keep only new text.
    return outputs[0]["generated_text"][len(prompt):]


def mkdir(dir):
    """Create *dir* (and any parents) if it does not already exist."""
    # exist_ok avoids the check-then-create race of the original.
    os.makedirs(dir, exist_ok=True)


def pdf_to_text(files, prompt="", maxtokens=600, temperature=1.2,
                top_probability=0.95):
    """De-identify the first page of each uploaded PDF.

    Args:
        files: iterable of file paths (Gradio multi-file upload).
        prompt, maxtokens, temperature, top_probability: forwarded to
            :func:`deidentify_doc`.

    Returns:
        Path to a temporary zip archive containing one ``<name>.txt``
        per successfully processed PDF.
    """
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        for file in files:
            file_name = os.path.basename(file)
            # splitext is robust to dotted names and uppercase extensions,
            # unlike the original split('.')[1] == 'pdf' check.
            base_name, ext = os.path.splitext(file_name)
            if ext.lower() != '.pdf':
                continue
            page2content = process_document(file, page_ids=[0])
            # pdfminer page ids are 1-based; .get avoids a KeyError on
            # PDFs that yield no page 1 content.
            pdftext = page2content.get(1, '')
            if pdftext:
                anonymized_text = deidentify_doc(
                    pdftext, prompt, maxtokens, temperature, top_probability
                )
                zf.writestr(base_name + '.txt', anonymized_text)
    zip_buffer.seek(0)
    with tempfile.NamedTemporaryFile(delete=False, suffix='.zip') as temp_file:
        temp_file.write(zip_buffer.getvalue())
        return temp_file.name


# Earlier revisions loaded a local GGUF via llama-cpp / GPT4All; the active
# path is a transformers pipeline. ``transformers`` and ``torch`` were used
# here without ever being imported — fixed in the import block above.
model_id = "Meta-Llama-3-8B-Instruct"
model = transformers.pipeline(
    "text-generation",
    model=model_id,
    model_kwargs={"torch_dtype": torch.bfloat16},
    device="cuda",
)

# NOTE(review): these components are constructed but never wired into the
# Interface below (it uses the 'files' input shortcut with defaults for the
# remaining parameters) — confirm intent before removing.
temp_slider = gr.Slider(minimum=0, maximum=2, value=0.2, label="Temperature Value")
prob_slider = gr.Slider(minimum=0, maximum=1, value=0.95, label="Max Probability Value")
max_tokens = gr.Number(value=600, label="Max Tokens")
input_folder = gr.File(file_count='multiple')
output_text = gr.Textbox()
output_path_component = gr.File(label="Select Output Path")

iface = gr.Interface(
    fn=pdf_to_text,
    inputs=['files'],
    outputs='file',
    title='COBIx Endoscopy Report De-Identification',
    description="This application assists to remove personal information from the uploaded clinical report",
    theme=gr.themes.Soft(),
)

if __name__ == "__main__":
    iface.launch()