git.donarmstrong.com Git - scanner_workflow.git/blob - scanner_workflow.py
add input/output files when reporting error message
[scanner_workflow.git] / scanner_workflow.py
1 #!/usr/bin/env python3
2
3 import click
4 from watchdog.observers import Observer
5 from watchdog.events import FileSystemEventHandler, FileSystemEvent
6 from pathlib import Path
7 from typing import Union
8 from filelock import Timeout, FileLock
9 import subprocess
10 from logging import error, info, debug, warning
11 import re
12
13
class ScannerWorkflowEvent(FileSystemEventHandler):
    """Filesystem event handler that forwards PDFs to a scanner workflow.

    Any event whose source path is a non-directory ending in ``.pdf`` and
    that still exists on disk is passed to ``scanner_workflow.process_pdf``.
    """

    scanner_workflow = None

    def __init__(self, scanner_workflow=None):
        """Remember the workflow object that will process detected PDFs.

        Args:
            scanner_workflow: object providing ``process_pdf(path)``.

        Raises:
            ValueError: if ``scanner_workflow`` is missing or falsy.
        """
        super().__init__()
        if not scanner_workflow:
            # Raise a real exception type; an undefined name here would
            # surface as a confusing NameError instead of this message.
            raise ValueError("No scanner_workflow passed to ScannerWorkflowEvent")
        self.scanner_workflow = scanner_workflow

    def on_any_event(self, event: FileSystemEvent):
        """Process the event's source path if it is an existing ``.pdf`` file."""
        if event.is_directory:
            return
        if not event.src_path.endswith(".pdf"):
            return
        pdf_file = Path(event.src_path)
        # The event may describe a file that is already gone (deleted, moved
        # away, or already processed by an earlier event for the same path).
        if pdf_file.exists():
            self.scanner_workflow.process_pdf(pdf_file)
33
34
class ScannerWorkflow:
    """OCR PDFs dropped into an input directory using ``ocrmypdf``.

    A PDF is first moved from the input directory into the processing
    directory, then OCRed into the output directory (optionally below a
    ``<year>/<month>_<day>`` sub-directory derived from its file name).
    PDFs that fail to OCR are moved into the failure directory.
    """

    base_dir = None
    failure_dir = None
    output_dir = None
    lock_file = None
    input_dir = None
    process_dir = None
    ocrmypdf_opts = ["-r", "-q", "--deskew", "--clean"]

    # File names of the form <scanner>_<MMDDYYYY>_<time>_<counter>.pdf.
    # Shared by calculate_name() and pdf_file_path() and always used with
    # fullmatch() so that trailing text after ".pdf" is never discarded.
    _scan_name_re = re.compile(
        r"(?P<scanner>[^_]+)_(?P<month>\d{2})(?P<day>\d{2})(?P<year>\d{4})_"
        r"(?P<time>\d+)_(?P<counter>\d+)\.pdf"
    )

    def __init__(
        self,
        base_dir: Union[Path, str] = ".",
        input_dir: Union[Path, str] = "input",
        output_dir: Union[Path, str] = "output",
        failure_dir: Union[Path, str] = "failure",
        process_dir: Union[Path, str] = "process",
        lock_file: Union[Path, str] = ".lock",
    ):
        """Resolve all paths, create the lock object and the directories.

        Relative ``input_dir``, ``output_dir``, ``failure_dir``,
        ``process_dir`` and ``lock_file`` are taken relative to
        ``base_dir``; absolute paths are used unchanged. All five
        directories are created if they do not exist yet.
        """

        def concat_if_not_abs(dir1: Path, dir2: Path):
            """Return ``dir2`` if it is absolute, else ``dir1 / dir2``."""
            if dir2.is_absolute():
                return dir2
            return dir1 / dir2

        super().__init__()
        self.base_dir = Path(base_dir)
        self.input_dir = concat_if_not_abs(self.base_dir, Path(input_dir))
        self.output_dir = concat_if_not_abs(self.base_dir, Path(output_dir))
        self.failure_dir = concat_if_not_abs(self.base_dir, Path(failure_dir))
        self.process_dir = concat_if_not_abs(self.base_dir, Path(process_dir))
        self.lock_file = concat_if_not_abs(self.base_dir, Path(lock_file))
        self.lock = FileLock(self.lock_file)
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self.input_dir.mkdir(parents=True, exist_ok=True)
        self.failure_dir.mkdir(parents=True, exist_ok=True)
        self.process_dir.mkdir(parents=True, exist_ok=True)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def calculate_name(self, name: str):
        """Return ``name`` with its date reordered from MMDDYYYY to YYYYMMDD.

        Names that do not have the expected
        ``<scanner>_<MMDDYYYY>_<time>_<counter>.pdf`` form are returned
        unchanged.
        """
        res = self._scan_name_re.fullmatch(str(name))
        if res:
            name = (
                f"{res.group('scanner')}_"
                f"{res.group('year')}{res.group('month')}{res.group('day')}_"
                f"{res.group('time')}_{res.group('counter')}.pdf"
            )
        return name

    def pdf_file_path(self, name: str):
        """Return the ``<year>/<month>_<day>`` sub-directory for ``name``.

        Returns an empty string when ``name`` does not have the expected
        ``<scanner>_<MMDDYYYY>_<time>_<counter>.pdf`` form.
        """
        res = self._scan_name_re.fullmatch(str(name))
        if res:
            return f"{res.group('year')}/{res.group('month')}_{res.group('day')}"
        return ""

    def process_pdf(self, pdf_file: Union[Path, str]):
        """Process a single PDF.

        The file is moved into the processing directory, OCRed with
        ``ocrmypdf`` into the output directory and then removed from the
        processing directory. If ``ocrmypdf`` exits non-zero, the error is
        logged together with the captured output and the PDF is moved into
        the failure directory. A file that has vanished before it could be
        moved is skipped.
        """
        pdf_file = Path(pdf_file)
        orig_pdf = pdf_file
        output_path = self.pdf_file_path(pdf_file.name)
        # Move to the processing directory. The rename doubles as the claim
        # on the file: if somebody else (e.g. the watcher thread racing the
        # start-up scan) already moved it, there is nothing left to do.
        try:
            pdf_file = pdf_file.rename(
                self.process_dir / self.calculate_name(pdf_file.name)
            )
        except FileNotFoundError:
            if orig_pdf.exists():
                # The source is still there, so the destination is the
                # problem; do not hide that.
                raise
            debug(f"{orig_pdf} disappeared before it could be processed")
            return
        output_dir = self.output_dir / output_path
        output_dir.mkdir(parents=True, exist_ok=True)
        output_file = output_dir / pdf_file.name
        # Capture the output so it can be reported on failure; without
        # capturing, res.stdout and res.stderr are always None.
        res = subprocess.run(
            ["ocrmypdf", *self.ocrmypdf_opts, pdf_file, output_file],
            capture_output=True,
            text=True,
            errors="replace",
        )
        if res.returncode != 0:
            error(
                f"Unable to properly OCR pdf {orig_pdf} into {output_file}: {res.stdout} {res.stderr}"
            )
            # Keep the processing directory clean and park the PDF where
            # failed documents are expected to be found.
            failed_file = pdf_file.rename(self.failure_dir / pdf_file.name)
            warning(f"Moved failed PDF {orig_pdf} to {failed_file}")
            return
        pdf_file.unlink()
        info(f"Processed {orig_pdf} into {output_file}")

    def event_loop(self):
        """Main event loop; called from the command line.

        Starts watching the input directory, processes the PDFs that are
        already present in it, and then waits until the observer thread
        ends. The observer is always stopped and joined on the way out.
        """
        ev = ScannerWorkflowEvent(scanner_workflow=self)
        observer = Observer()
        observer.schedule(ev, self.input_dir, recursive=True)
        observer.start()
        try:
            # Process any PDFs already in input_dir, applying the same
            # filter as the watcher (regular files ending in ".pdf").
            # Snapshot the listing first because processing moves files
            # out of the directory.
            for file in list(self.input_dir.iterdir()):
                if file.is_file() and file.name.endswith(".pdf"):
                    self.process_pdf(file)
            while observer.is_alive():
                observer.join(1)
        finally:
            observer.stop()
            observer.join()
132
133
@click.command()
@click.option(
    "-i",
    "--input-dir",
    default="input",
    help="Directory to look for incoming PDFs",
)
@click.option(
    "-p",
    "--process-dir",
    default="process",
    help="Directory to store PDFs being processed",
)
@click.option(
    "-o",
    "--output-dir",
    default="output",
    help="Directory to output OCRed PDFs",
)
@click.option(
    "-f",
    "--failure-dir",
    default="failure",
    help="Directory to store failed PDFs",
)
@click.option(
    "-b",
    "--base-dir",
    default=".",
    help="Base directory",
)
@click.option(
    "-l",
    "--lock-file",
    default=".lock",
    help="Lock file to ensure only one instance is running",
)
def cli(input_dir, process_dir, output_dir, failure_dir, base_dir, lock_file):
    """OCR scanner output and save in directory"""
    # NOTE: the docstring above is shown by click as the --help text.
    sw = ScannerWorkflow(
        input_dir=input_dir,
        process_dir=process_dir,
        output_dir=output_dir,
        failure_dir=failure_dir,
        base_dir=base_dir,
        lock_file=lock_file,
    )
    try:
        # Hold the lock for the whole run so that only one instance works
        # on the directories at a time; give up after 10 seconds.
        with sw.lock.acquire(timeout=10):
            sw.event_loop()
    except Timeout:
        # Report on stderr and exit with status 1. SystemExit is used
        # directly because the exit() helper is provided by the site
        # module and is not guaranteed to exist.
        click.echo("Another instance holds the lock", err=True)
        raise SystemExit(1) from None
187
188
# Only start the command line interface when run as a script, so that
# importing this module does not launch the workflow.
if __name__ == "__main__":
    cli()