| import gradio as gr |
| import os |
| import shutil |
| import requests |
| import zipfile |
| from PyPDF2 import PdfFileReader, PdfFileWriter |
| import PyPDF2 |
| from io import BytesIO |
| from reportlab.lib.pagesizes import letter |
| from reportlab.platypus import SimpleDocTemplate,Preformatted |
| from reportlab.platypus import Image as RLImage |
| from reportlab.platypus import Paragraph, Spacer |
| from reportlab.lib.styles import getSampleStyleSheet |
| from reportlab.lib.utils import ImageReader |
| from PIL import Image |
| import os |
| from langchain.indexes.vectorstore import VectorstoreIndexCreator |
| from langchain.chains import VectorDBQA,VectorDBQAWithSourcesChain |
| from langchain import OpenAI |
| from langchain.document_loaders import UnstructuredPDFLoader |
| from langchain.vectorstores.faiss import FAISS |
| from langchain.embeddings.openai import OpenAIEmbeddings |
| from flask import send_file |
| |
|
|
|
|
| class REPOGPT: |
| def __init__(self) -> None: |
| |
| self.repo_link = None |
| self.api_key = None |
|
|
| def init_agent(self, api_key, repo_link = None, load_vectorstore = None): |
| try: |
| os.remove('merged.pdf') |
| except: |
| pass |
| self.repo_link = repo_link |
| self.api_key = api_key |
| self.load_vectorstore = load_vectorstore |
| |
| assert self.api_key != None, "You need to provide an API key" |
| self.REPOGPT_Initialized() |
| return gr.update(visible = True),'Initialize Finished' |
|
|
|
|
|
|
| def REPOGPT_Initialized(self,image_included = False): |
|
|
| |
| os.environ["OPENAI_API_KEY"] = self.api_key |
| if self.load_vectorstore == None: |
| loader = UnstructuredPDFLoader( self.create_repo_pdf(self.repo_link,image_included = image_included)) |
| |
| self.index = VectorstoreIndexCreator(vectorstore_cls = FAISS).from_loaders([loader]) |
| self.vectorstore = self.index.vectorstore |
| print(' vectorstore created') |
| else: |
| embeddings = OpenAIEmbeddings() |
| self.vectorstore = FAISS.load_local(self.load_vectorstore,embeddings =embeddings) |
| print(' vectorstore loaded') |
|
|
| self.qa = VectorDBQA.from_chain_type(llm =OpenAI(temperature=0, model_name="gpt-3.5-turbo"), chain_type = "stuff",vectorstore = self.vectorstore ) |
|
|
| |
|
|
|
|
|
|
| def download_repo_zip(self, link, output_folder = "main.zip"): |
| username = link.split('/')[3] |
| repo = link.split('/')[4] |
| zip_url = f"https://github.com/{username}/{repo}/archive/refs/heads/master.zip" |
| self.zip_url = zip_url |
| response = requests.get(zip_url) |
| response.raise_for_status() |
| |
| with open('main.zip', 'wb') as f: |
| f.write(response.content) |
| |
| |
| |
|
|
| def extract_zip(self, zip_file, destination_folder): |
| with zipfile.ZipFile(zip_file) as zf: |
| zf.extractall(destination_folder) |
| |
| folder_name = zf.namelist()[0] |
| return folder_name |
|
|
| def convert_to_pdf(self, input_path, output_path): |
| if input_path.endswith(".pdf"): |
| |
| buffer = BytesIO() |
| doc = SimpleDocTemplate(buffer, pagesize=letter) |
| styles = getSampleStyleSheet() |
| elements = [] |
| heading = Paragraph(f"File path: {input_path}", styles["Heading2"]) |
| elements.append(heading) |
| elements.append(Spacer(1, 12)) |
| doc.build(elements) |
|
|
| |
| buffer.seek(0) |
| new_pdf = PdfFileReader(buffer) |
|
|
| |
| with open(input_path, "rb") as f: |
| input_pdf = PdfFileReader(f) |
|
|
| |
| pdf_writer = PdfFileWriter() |
| for page_num in range(new_pdf.getNumPages()): |
| pdf_writer.addPage(new_pdf.getPage(page_num)) |
|
|
| for page_num in range(input_pdf.getNumPages()): |
| pdf_writer.addPage(input_pdf.getPage(page_num)) |
|
|
| |
| with open(output_path, "wb") as f: |
| pdf_writer.write(f) |
|
|
| elif input_path.lower().endswith((".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff")): |
| img = Image.open(input_path) |
| img_reader = ImageReader(img) |
| img_width, img_height = img.size |
| aspect_ratio = img_height / img_width |
|
|
|
|
| max_pdf_width = letter[0] - 2 * 72 |
| max_pdf_height = letter[1] - 2 * 72 |
|
|
| if img_width > max_pdf_width: |
| img_width = max_pdf_width |
| img_height = img_width * aspect_ratio |
| if img_height > max_pdf_height: |
| img_height = max_pdf_height |
| img_width = img_height / aspect_ratio |
| img_width = int(img_width) |
| img_height = int(img_height) |
| |
| img = img.resize((int(img_width), int(img_height))) |
|
|
| img = img.resize((int(img_width), int(img_height))) |
| |
| img.save(output_path, "PNG") |
| |
| doc = SimpleDocTemplate(output_path, pagesize=letter) |
| styles = getSampleStyleSheet() |
|
|
| elements = [] |
| heading = Paragraph(f" {input_path}", styles["Heading2"]) |
| elements.append(heading) |
| elements.append(Spacer(1, 12)) |
|
|
| img_rl = RLImage(input_path, width=img_width, height=img_height, kind='proportional') |
| elements.append(img_rl) |
|
|
| doc.build(elements) |
|
|
| else: |
| with open(input_path, "r") as f: |
| content = f.read() |
| |
| doc = SimpleDocTemplate(output_path, pagesize=letter) |
| styles = getSampleStyleSheet() |
| elements = [] |
|
|
| |
| heading = Paragraph(f"{input_path}", styles["Heading2"]) |
| elements.append(heading) |
| elements.append(Spacer(1, 12)) |
|
|
| |
| text = Preformatted(content, style=styles["Code"]) |
| elements.append(text) |
|
|
| doc.build(elements) |
|
|
| def merge_pdfs(self, pdf_files, output_path): |
| pdf_writer = PyPDF2.PdfWriter() |
| for pdf_file in pdf_files: |
| with open(pdf_file, "rb") as f: |
| try: |
| pdf_reader = PyPDF2.PdfReader(f) |
| if pdf_reader.is_encrypted: |
| print(f"{pdf_file} is encrypted. Skipping.") |
| continue |
| except: |
| print(f"{pdf_file} is not a valid PDF. Skipping.") |
| continue |
| |
| |
| for page_num in range(len(pdf_reader.pages)): |
| pdf_writer.add_page(pdf_reader.pages[page_num]) |
|
|
| with open(output_path, "wb") as f: |
| pdf_writer.write(f) |
|
|
| def get_pdf(self): |
| return self.merged_pdf_path |
|
|
| def save_indexDB(self,save_path = 'indexDB.json'): |
| self.vectorstore.save_local(save_path) |
| print("indexDB saved at: ", save_path) |
|
|
| |
|
|
| def create_repo_pdf(self, repo_link, image_included = False, merged_pdf = "temp_merged.pdf"): |
| self.merged_pdf_path = merged_pdf |
| self.download_repo_zip(repo_link) |
| folder_name = self.extract_zip('./main.zip', './') |
| ingnore_list = ['__pycache__',] |
| if not image_included: |
| ingnore_list.append('.jpg') |
| ingnore_list.append('.png') |
| ingnore_list.append('.jpeg') |
| ingnore_list.append('.gif') |
| ingnore_list.append('.bmp') |
| ingnore_list.append('.tiff') |
|
|
| print('folder_name: ', folder_name) |
| pdf_files = [] |
| for root, dirs, files in os.walk(folder_name): |
| for file in files: |
| |
| input_file = os.path.join(root, file) |
| |
| if any(x in input_file for x in ingnore_list): |
| continue |
| |
| os.makedirs("temp", exist_ok=True) |
| output_file = os.path.join("temp", os.path.splitext(file)[0] + ".pdf") |
|
|
| try: |
| self.convert_to_pdf(input_file, output_file) |
| except: |
| print("Error converting file: ", input_file) |
| continue |
| pdf_files.append(output_file) |
|
|
| |
|
|
| self.merge_pdfs(pdf_files, self.merged_pdf_path) |
| |
| os.remove("main.zip") |
| shutil.rmtree(folder_name) |
| shutil.rmtree("temp") |
|
|
| return self.merged_pdf_path |
|
|
|
|
| def Answer_quetsion(self, question): |
| return self.qa.run(question) |
| |
| def Answer_quetsion_with_source(self, question): |
| return self.qa({"question": question}, return_only_outputs = True) |
|
|
|
|
|
|
| def call_output(string = 'REPOGPT Initializing'): |
| return string |
|
|
| def download_file(filename = 'merged.pdf'): |
| |
| return send_file(filename, as_attachment=True) |
|
|
|
|
| repogpt = REPOGPT() |
|
|
|
|
| with gr.Blocks() as demo: |
| with gr.Row(): |
| gr.Markdown("<h3><center>REPOGPT</center></h3>") |
| gr.Markdown( |
| """This is a demo to the work [REPOGPT](https://github.com/wuchangsheng951/RepoGPT).<br> |
| This space connects ChatGPT and RepoGPT is a Python library that allows you to search and answer questions about a GitHub repository's content.<br> |
| """ |
| ) |
| with gr.Row(): |
| apikey = gr.Textbox( |
| placeholder="Paste your OpenAI API key here to start Visual ChatGPT(sk-...) and press Enter ↵️", |
| show_label=True, |
| label = 'OpenAI API key', |
| lines=1, |
| type="password", |
| ) |
| with gr.Row(): |
| repo_link = gr.Textbox( |
| placeholder="Paste your repo_link and press Enter ↵️", |
| label = 'repo_link like: https://github.com/wuchangsheng951/RepoGPT', |
|
|
| show_label=True, |
| lines=1, |
| ) |
|
|
| with gr.Column(scale=0.7): |
| Initialize = gr.Button("Initialize RepoGPT") |
|
|
| output = gr.Textbox(label="Output Box") |
|
|
| with gr.Row(visible=False) as input_raws: |
| with gr.Column(scale=0.7): |
| txt = gr.Textbox(show_label=False, placeholder="Enter your question").style(container=False) |
|
|
| with gr.Column(scale=0.4): |
| AQ = gr.Button("Ask a Question").style(container=False) |
|
|
| |
| |
|
|
|
|
| gr.Examples( |
| examples=["Whats the name of this repo?", |
| "Whats this repo for?", |
| "How can I use this. Example code ? Step by step", |
| "how can I use this Experiment trackers ? Step by step", |
| "how can I Performing gradient accumulation with Accelerate? Step by step?", |
| "Make it like water-color painting", |
| "What is the background color", |
| "Describe this image", |
| "please detect the depth of this image", |
| "Can you use this depth image to generate a cute dog", |
| ], |
| inputs=txt |
| ) |
|
|
| apikey.submit(repogpt.init_agent, [apikey,repo_link], [input_raws, output]) |
| Initialize.click(repogpt.init_agent, [apikey,repo_link], [input_raws, output]) |
| apikey.submit(call_output, [],[output]) |
| txt.submit(repogpt.Answer_quetsion, [txt], [output]) |
| AQ.click(repogpt.Answer_quetsion, [txt], [output]) |
| |
|
|
|
|
| demo.launch() |