from __future__ import annotations

import re
import subprocess
from logging import error, info, debug, warning
from pathlib import Path
from time import sleep
from typing import Union

from filelock import Timeout, FileLock
from watchdog.events import FileSystemEventHandler, FileSystemEvent
from watchdog.observers import Observer
class ScannerWorkflowEvent(FileSystemEventHandler):
    """Subclass of FileSystemEventHandler to handle OCRing PDFs.

    Every filesystem event whose source path is an existing regular file
    ending in ``.pdf`` is forwarded to ``ScannerWorkflow.process_pdf``.
    """

    # Workflow object that performs the actual processing of each PDF.
    scanner_workflow: ScannerWorkflow

    def __init__(self, scanner_workflow: ScannerWorkflow):
        """Remember the workflow that events are dispatched to.

        Args:
            scanner_workflow: object whose ``process_pdf`` is called for
                every matching event.

        Raises:
            ValueError: if ``scanner_workflow`` is falsy (e.g. ``None``).
        """
        super().__init__()
        # Validate before storing so a rejected argument never ends up on
        # the instance. ValueError is still an Exception, so callers that
        # catch Exception keep working.
        if not scanner_workflow:
            raise ValueError("No scanner_workflow passed to ScannerWorkflowEvent")
        self.scanner_workflow = scanner_workflow

    def on_any_event(self, event: FileSystemEvent):
        """Dispatch a filesystem event for a PDF file to the workflow.

        Directory events, paths that do not end in ``.pdf`` and paths that
        are not (or no longer) regular files are ignored.
        """
        if event.is_directory:
            return
        if not event.src_path.endswith(".pdf"):
            return

        pdf_file = Path(event.src_path)
        # This hook is invoked for every kind of event, including ones that
        # refer to a file which is already gone. process_pdf() itself
        # renames the file out of the watched directory, so a later event
        # for the same path must not start a second run on a missing file.
        if not pdf_file.is_file():
            return

        self.scanner_workflow.process_pdf(pdf_file)
class ScannerWorkflow:
    """Watch a directory for scanned PDFs and OCR them with ocrmypdf.

    A PDF found in ``input_dir`` is validated with ``qpdf --check``, renamed
    into ``process_dir`` and then run through ``ocrmypdf``; the result is
    written below ``output_dir``. A PDF whose OCR run fails is moved to
    ``failure_dir``.
    """

    # Extra command-line options passed to every ocrmypdf invocation.
    ocrmypdf_opts = ["-r", "-q", "--deskew", "--clean"]

    # Number of ``qpdf --check`` attempts before a PDF is given up on.
    pdf_check_attempts = 9
    # Seconds to wait between two file-size samples of a PDF that failed
    # the check and may still be growing.
    pdf_settle_seconds = 10

    # Scanner file names look like <scanner>_<MMDDYYYY>_<time>_<counter>.pdf.
    # Compiled once and shared by calculate_name() and pdf_file_path() so the
    # two methods cannot drift apart.
    _SCAN_NAME_RE = re.compile(
        r"(?P<scanner>[^_]+)_(?P<month>\d{2})(?P<day>\d{2})(?P<year>\d{4})_"
        r"(?P<time>\d+)_(?P<counter>\d+)\.pdf"
    )

    def __init__(
        self,
        base_dir: Union[Path, str] = ".",
        input_dir: Union[Path, str] = "input",
        output_dir: Union[Path, str] = "output",
        failure_dir: Union[Path, str] = "failure",
        process_dir: Union[Path, str] = "process",
        lock_file: Union[Path, str] = ".lock",
    ):
        """Resolve all paths against ``base_dir`` and create the directories.

        Args:
            base_dir: directory that relative paths are resolved against.
            input_dir: directory watched for incoming PDFs.
            output_dir: directory that receives the OCRed PDFs.
            failure_dir: directory for PDFs whose OCR run failed.
            process_dir: directory holding PDFs while they are processed.
            lock_file: path of the lock file used for ``self.lock``.

        Absolute values are used as given; relative ones are placed below
        ``base_dir``.
        """
        self.base_dir = Path(base_dir)
        self.input_dir = self._under_base(self.base_dir, input_dir)
        self.output_dir = self._under_base(self.base_dir, output_dir)
        self.failure_dir = self._under_base(self.base_dir, failure_dir)
        self.process_dir = self._under_base(self.base_dir, process_dir)
        self.lock_file = self._under_base(self.base_dir, lock_file)
        self.lock = FileLock(self.lock_file)

        for directory in (
            self.base_dir,
            self.input_dir,
            self.failure_dir,
            self.process_dir,
            self.output_dir,
        ):
            directory.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _under_base(base: Path, candidate: Union[Path, str]) -> Path:
        """Return ``candidate`` unchanged if absolute, else below ``base``."""
        # pathlib's "/" already discards the left operand when the right one
        # is absolute, so no explicit is_absolute() branch is needed.
        return base / Path(candidate)

    def calculate_name(self, name: str) -> str:
        """Return ``name`` with its date reordered from MMDDYYYY to YYYYMMDD.

        A name that does not follow the scanner naming scheme is returned
        unchanged.
        """
        res = self._SCAN_NAME_RE.match(name)
        if res is None:
            return name
        return (
            f"{res.group('scanner')}_"
            f"{res.group('year')}{res.group('month')}{res.group('day')}_"
            f"{res.group('time')}_{res.group('counter')}.pdf"
        )

    def pdf_file_path(self, name: str) -> str:
        """Return the output sub-directory (the year) for a scanner file name.

        A name that does not follow the scanner naming scheme yields ``""``,
        which places the file directly in ``output_dir``.
        """
        res = self._SCAN_NAME_RE.match(name)
        if res is None:
            return ""
        return f"{res.group('year')}"

    def _wait_until_valid(self, pdf_file: Path) -> bool:
        """Return True once ``qpdf --check`` accepts ``pdf_file``.

        After a failed check the file size is sampled every
        ``pdf_settle_seconds`` until it stops growing, then the check is
        retried, up to ``pdf_check_attempts`` times in total. Returns False
        when all attempts fail or the file disappears while waiting.
        """
        for _attempt in range(self.pdf_check_attempts):
            check = subprocess.run(["qpdf", "--check", pdf_file])
            if check.returncode == 0:
                return True
            try:
                file_size = pdf_file.stat().st_size
                while True:
                    sleep(self.pdf_settle_seconds)
                    new_size = pdf_file.stat().st_size
                    if new_size <= file_size:
                        break
                    file_size = new_size
            except FileNotFoundError:
                # The file was removed or moved away while we were waiting.
                return False
        return False

    def process_pdf(self, pdf_file: Union[Path, str]):
        """Process a single PDF.

        The file is validated, renamed into ``process_dir`` and OCRed into
        ``output_dir/<year>/``. Paths that are not existing regular files
        and PDFs that never pass validation are logged and skipped. If
        ocrmypdf exits with a non-zero status the error is logged together
        with the tool's output and the PDF is moved to ``failure_dir``.
        """
        pdf_file = Path(pdf_file)
        orig_pdf = pdf_file

        if not pdf_file.is_file():
            warning(f"{orig_pdf} is not an existing file, skipping")
            return

        # check that the pdf is good, otherwise wait to see if it is still
        # being written
        if not self._wait_until_valid(pdf_file):
            error(f"PDF was not good, skipping {orig_pdf} for now")
            return

        # move to the processing directory
        output_path = self.pdf_file_path(pdf_file.name)
        pdf_file = pdf_file.rename(
            self.process_dir / self.calculate_name(pdf_file.name)
        )
        (self.output_dir / output_path).mkdir(parents=True, exist_ok=True)
        output_file = self.output_dir / output_path / pdf_file.name

        # Capture the tool's output: without it res.stdout and res.stderr
        # are None and the failure message below carries no information.
        res = subprocess.run(
            ["ocrmypdf", *self.ocrmypdf_opts, pdf_file, output_file],
            capture_output=True,
            text=True,
        )
        if res.returncode != 0:
            error(
                f"Unable to properly OCR pdf {orig_pdf} into {output_file}: {res.stdout} {res.stderr}"
            )
            # Park the failed PDF where failures are expected to be found
            # instead of leaving it behind in the processing directory.
            pdf_file.rename(self.failure_dir / pdf_file.name)
            return

        info(f"Processed {orig_pdf} into {output_file}")

    def event_loop(self):
        """Main event loop; called from the command line.

        Starts watching ``input_dir`` (recursively), processes the PDFs that
        are already there and then blocks for as long as the observer thread
        is alive. The observer is stopped and joined on the way out.
        """
        ev = ScannerWorkflowEvent(scanner_workflow=self)
        observer = Observer()
        observer.schedule(ev, self.input_dir, recursive=True)
        observer.start()
        try:
            # process any PDFs in input_dir; apply the same filter as the
            # event handler so sub-directories and other files are skipped
            for file in self.input_dir.iterdir():
                if file.is_file() and file.name.endswith(".pdf"):
                    self.process_pdf(file)

            while observer.is_alive():
                observer.join(1)
        finally:
            observer.stop()
            observer.join()
164 help="Directory to look for incoming PDFs",
170 help="Directory to store PDFs being processed",
176 help="Directory to output OCRed PDFs",
182 help="Directory to store failed PDFs",
188 help="Base directory",
194 help="Lock file to ensure only one instance is running",
def cli(input_dir, process_dir, output_dir, failure_dir, base_dir, lock_file):
    """OCR scanner output and save in directory"""
    # Collect the command-line values first so the constructor call below
    # stays a one-liner.
    settings = {
        "input_dir": input_dir,
        "process_dir": process_dir,
        "output_dir": output_dir,
        "failure_dir": failure_dir,
        "base_dir": base_dir,
        "lock_file": lock_file,
    }
    workflow = ScannerWorkflow(**settings)

    try:
        # Hold the lock for the whole run; wait at most 10 seconds for it.
        with workflow.lock.acquire(timeout=10):
            workflow.event_loop()
    except Timeout:
        print("Another instance holds the lock")