4 from watchdog.observers import Observer
5 from watchdog.events import FileSystemEventHandler, FileSystemEvent
6 from pathlib import Path
7 from typing import Union
8 from filelock import Timeout, FileLock
10 from logging import error, info, debug, warning
14 class ScannerWorkflowEvent(FileSystemEventHandler):
15 """Subclass of FileSystemEventHandler to handle OCRing PDFs"""
17 scanner_workflow = None
19 def __init__(self, scanner_workflow=None):
21 self.scanner_workflow = scanner_workflow
22 if not self.scanner_workflow:
23 raise Error("No scanner_workflow passed to ScannerWorkflowEvent")
25 def on_any_event(self, event: FileSystemEvent):
26 if event.is_directory:
28 if not event.src_path.endswith(".pdf"):
30 pdf_file = Path(event.src_path)
32 self.scanner_workflow.process_pdf(pdf_file)
35 class ScannerWorkflow:
42 ocrmypdf_opts = ["-r", "-q", "--deskew", "--clean"]
46 base_dir: Union[Path, str] = ".",
47 input_dir: Union[Path, str] = "input",
48 output_dir: Union[Path, str] = "output",
49 failure_dir: Union[Path, str] = "failure",
50 process_dir: Union[Path, str] = "process",
51 lock_file: Union[Path, str] = ".lock",
53 def concat_if_not_abs(dir1: Path, dir2: Path):
54 if dir2.is_absolute():
60 self.base_dir = Path(base_dir)
61 self.input_dir = concat_if_not_abs(self.base_dir, Path(input_dir))
62 self.output_dir = concat_if_not_abs(self.base_dir, Path(output_dir))
63 self.failure_dir = concat_if_not_abs(self.base_dir, Path(failure_dir))
64 self.process_dir = concat_if_not_abs(self.base_dir, Path(process_dir))
65 self.lock_file = concat_if_not_abs(self.base_dir, Path(lock_file))
66 self.lock = FileLock(self.lock_file)
67 self.base_dir.mkdir(parents=True, exist_ok=True)
68 self.input_dir.mkdir(parents=True, exist_ok=True)
69 self.failure_dir.mkdir(parents=True, exist_ok=True)
70 self.process_dir.mkdir(parents=True, exist_ok=True)
71 self.output_dir.mkdir(parents=True, exist_ok=True)
73 def calculate_name(self, name: str):
75 r"(?P<scanner>[^_]+)_(?P<month>\d{2})(?P<day>\d{2})(?P<year>\d{4})_"
76 r"(?P<time>\d+)_(?P<counter>\d+)\.pdf",
81 f"{res.group('scanner')}_"
82 f"{res.group('year')}{res.group('month')}{res.group('day')}_"
83 f"{res.group('time')}_{res.group('counter')}.pdf"
87 def process_pdf(self, pdf_file: Union[Path, str]):
88 """Process a single PDF."""
89 pdf_file = Path(pdf_file)
91 # move to the processing directory
92 pdf_file = pdf_file.rename(
93 self.process_dir / self.calculate_name(pdf_file.name)
95 output_file = self.output_dir / pdf_file.name
96 res = subprocess.run(["ocrmypdf", *self.ocrmypdf_opts, pdf_file, output_file])
97 if res.returncode != 0:
98 error(f"Unable to properly OCR pdf: {res.stdout} {res.stderr}")
101 info("Processed {orig_pdf} into {output_file}")
103 def event_loop(self):
104 """Main event loop; called from the command line."""
105 ev = ScannerWorkflowEvent(scanner_workflow=self)
106 observer = Observer()
107 observer.schedule(ev, self.input_dir, recursive=True)
109 # process any PDFs in input_dir
110 for file in self.input_dir.iterdir():
111 self.process_pdf(file)
113 while observer.is_alive():
125 help="Directory to look for incoming PDFs",
131 help="Directory to store PDFs being processed",
137 help="Directory to output OCRed PDFs",
143 help="Directory to store failed PDFs",
149 help="Base directory",
155 help="Lock file to ensure only one instance is running",
157 def cli(input_dir, process_dir, output_dir, failure_dir, base_dir, lock_file):
158 """OCR scanner output and save in directory"""
159 sw = ScannerWorkflow(
161 process_dir=process_dir,
162 output_dir=output_dir,
163 failure_dir=failure_dir,
168 with sw.lock.acquire(timeout=10):
171 print("Another instance holds the lock")