Spaces:
Runtime error
Runtime error
#!/usr/bin/env python3 | |
import argparse | |
import logging | |
from pathlib import Path | |
from private_gpt.di import global_injector | |
from private_gpt.server.ingest.ingest_service import IngestService | |
from private_gpt.server.ingest.ingest_watcher import IngestWatcher | |
logger = logging.getLogger(__name__) | |
class LocalIngestWorker:
    """Feed local files to an ``IngestService``.

    Two entry points are offered: :meth:`ingest_folder` scans a directory
    tree and bulk-ingests everything it finds, while :meth:`ingest_on_watch`
    ingests a single path and is shaped to be used as a change callback.
    """

    def __init__(self, ingest_service: IngestService) -> None:
        """Store the service that performs the actual ingestion.

        Args:
            ingest_service: Object exposing ``bulk_ingest`` and ``ingest_file``.
        """
        self.ingest_service = ingest_service

        # Number of files found by the most recent folder scan.
        self.total_documents = 0
        # Initialised to 0 and never modified by this class.
        self.current_document_count = 0
        # Files found by the most recent folder scan, in traversal order.
        self._files_under_root_folder: list[Path] = []

    def _find_all_files_in_folder(self, root_path: Path, ignored: list[str]) -> None:
        """Recursively collect every non-ignored file below ``root_path``.

        Each file found is appended to ``self._files_under_root_folder`` and
        counted in ``self.total_documents``; nothing is returned.

        Args:
            root_path: Directory to scan.
            ignored: File or directory names (not paths) to skip. Ignoring a
                directory name prunes its whole subtree.
        """
        # ``iterdir`` yields entries in arbitrary order; sorting keeps the
        # ingestion order (and the logged file list) reproducible.
        for entry in sorted(root_path.iterdir()):
            if entry.name in ignored:
                continue
            if entry.is_file():
                self.total_documents += 1
                self._files_under_root_folder.append(entry)
            elif entry.is_dir():
                self._find_all_files_in_folder(entry, ignored)

    def ingest_folder(self, folder_path: Path, ignored: list[str]) -> None:
        """Scan ``folder_path`` recursively and bulk-ingest the files found.

        Args:
            folder_path: Root directory to ingest.
            ignored: File or directory names to leave out of the scan.
        """
        # Start every run from a clean slate. Without this, a second call on
        # the same worker would re-submit the files of the previous call and
        # inflate ``total_documents``.
        self.total_documents = 0
        self._files_under_root_folder = []

        # Count total documents before ingestion
        self._find_all_files_in_folder(folder_path, ignored)
        self._ingest_all(self._files_under_root_folder)

    def _ingest_all(self, files_to_ingest: list[Path]) -> None:
        """Submit all given files to the service in one ``bulk_ingest`` call.

        Each file is passed as a ``(file name, path)`` pair; only the final
        path component is used as the name.
        """
        logger.info("Ingesting files=%s", [f.name for f in files_to_ingest])
        self.ingest_service.bulk_ingest([(str(p.name), p) for p in files_to_ingest])

    def ingest_on_watch(self, changed_path: Path) -> None:
        """Ingest a single path that was reported as changed."""
        logger.info("Detected change in at path=%s, ingesting", changed_path)
        self._do_ingest_one(changed_path)

    def _do_ingest_one(self, changed_path: Path) -> None:
        """Ingest ``changed_path`` if it still exists; never raise.

        Any exception raised while ingesting is logged with its traceback and
        swallowed, so one bad file does not propagate to the caller.
        """
        try:
            # A path that no longer exists is skipped silently.
            if changed_path.exists():
                logger.info("Started ingesting file=%s", changed_path)
                self.ingest_service.ingest_file(changed_path.name, changed_path)
                logger.info("Completed ingesting file=%s", changed_path)
        except Exception:
            logger.exception(
                "Failed to ingest document: %s, find the exception attached",
                changed_path,
            )
# Command-line interface: one positional folder plus three optional switches.
parser = argparse.ArgumentParser(prog="ingest_folder.py")
parser.add_argument("folder", help="Folder to ingest")
# BooleanOptionalAction also generates the matching ``--no-watch`` flag.
parser.add_argument(
    "--watch",
    action=argparse.BooleanOptionalAction,
    default=False,
    help="Watch for changes",
)
# Zero or more names; an absent flag yields an empty list.
parser.add_argument(
    "--ignored",
    nargs="*",
    default=[],
    help="List of files/directories to ignore",
)
parser.add_argument(
    "--log-file",
    type=str,
    default=None,
    help="Optional path to a log file. If provided, logs will be written to this file.",
)
# Parsed here, at module level, so ``args`` is available to the code below.
args = parser.parse_args()

# When a log file was requested, additionally send this module's log records
# to it (appending), using a timestamped "[time] [level] message" layout.
if args.log_file:
    file_handler = logging.FileHandler(args.log_file, mode="a")
    file_handler.setFormatter(
        logging.Formatter(
            fmt="[%(asctime)s.%(msecs)03d] [%(levelname)s] %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )
    logger.addHandler(file_handler)
if __name__ == "__main__":
    # Script entry point: ingest the requested folder once, then optionally
    # keep watching it for changes.
    root_path = Path(args.folder)
    if not root_path.exists():
        raise ValueError(f"Path {args.folder} does not exist")
    # An existing regular file would pass the check above and only fail later
    # inside ``iterdir()``; reject it up front with a clear message.
    if not root_path.is_dir():
        raise ValueError(f"Path {args.folder} is not a directory")

    # Announce what will be skipped before the (potentially long) ingestion
    # starts, rather than after it has finished.
    if args.ignored:
        logger.info("Skipping following files and directories: %s", args.ignored)

    ingest_service = global_injector.get(IngestService)
    worker = LocalIngestWorker(ingest_service)
    worker.ingest_folder(root_path, args.ignored)

    if args.watch:
        logger.info("Watching %s for changes, press Ctrl+C to stop...", args.folder)
        # NOTE(review): the watcher receives only the folder and the callback,
        # so ``--ignored`` is not applied here by this script - confirm whether
        # IngestWatcher filters anything itself.
        watcher = IngestWatcher(args.folder, worker.ingest_on_watch)
        watcher.start()