]> git.donarmstrong.com Git - scanner_workflow.git/blob - scanner_workflow.py
check to make sure the pdf is good before processing
[scanner_workflow.git] / scanner_workflow.py
1 #!/usr/bin/env python3
2
3 import click
4 from watchdog.observers import Observer
5 from watchdog.events import FileSystemEventHandler, FileSystemEvent
6 from pathlib import Path
7 from typing import Union
8 from filelock import Timeout, FileLock
9 import subprocess
10 from logging import error, info, debug, warning
11 import re
12 from time import sleep
13
14
class ScannerWorkflowEvent(FileSystemEventHandler):
    """Subclass of FileSystemEventHandler to handle OCRing PDFs.

    Every filesystem event whose source path ends in ``.pdf`` and still
    exists on disk is forwarded to ``scanner_workflow.process_pdf``.
    """

    # The workflow object whose ``process_pdf`` method receives the PDFs.
    scanner_workflow = None

    def __init__(self, scanner_workflow=None):
        """Store the workflow that will process incoming PDFs.

        Raises:
            ValueError: if no (or a falsy) ``scanner_workflow`` is given.
        """
        super().__init__()
        if not scanner_workflow:
            # The original raised the undefined name ``Error``, which surfaced
            # as a confusing NameError instead of the intended message.
            raise ValueError("No scanner_workflow passed to ScannerWorkflowEvent")
        self.scanner_workflow = scanner_workflow

    def on_any_event(self, event: FileSystemEvent):
        """Hand the event's source file to the workflow if it is an existing PDF."""
        if event.is_directory:
            return
        if not event.src_path.endswith(".pdf"):
            return
        pdf_file = Path(event.src_path)
        # Events such as deletions or moves refer to paths that no longer
        # exist; only files that are still present are processed.
        if pdf_file.exists():
            self.scanner_workflow.process_pdf(pdf_file)
34
35
class ScannerWorkflow:
    """Watch an input directory for scanned PDFs and OCR them with ocrmypdf.

    A PDF is first validated with ``qpdf --check``, then moved into the
    processing directory, OCRed into a dated sub-directory of the output
    directory, and finally removed from the processing directory.  PDFs that
    fail OCR are moved to the failure directory.
    """

    base_dir = None
    failure_dir = None
    output_dir = None
    lock_file = None
    input_dir = None
    process_dir = None
    # Options placed between the ``ocrmypdf`` command and its file arguments.
    ocrmypdf_opts = ["-r", "-q", "--deskew", "--clean"]
    # Number of ``qpdf --check`` runs before a PDF is given up on, and the
    # pause (in seconds) between two consecutive runs.
    pdf_check_attempts = 9
    pdf_check_delay = 10

    # Scanner file names look like <scanner>_<MMDDYYYY>_<time>_<counter>.pdf
    _scan_name_re = re.compile(
        r"(?P<scanner>[^_]+)_(?P<month>\d{2})(?P<day>\d{2})(?P<year>\d{4})_"
        r"(?P<time>\d+)_(?P<counter>\d+)\.pdf"
    )

    def __init__(
        self,
        base_dir: Union[Path, str] = ".",
        input_dir: Union[Path, str] = "input",
        output_dir: Union[Path, str] = "output",
        failure_dir: Union[Path, str] = "failure",
        process_dir: Union[Path, str] = "process",
        lock_file: Union[Path, str] = ".lock",
    ):
        """Resolve all working paths, create the directories and the lock.

        Relative ``input_dir``, ``output_dir``, ``failure_dir``,
        ``process_dir`` and ``lock_file`` are taken relative to ``base_dir``;
        absolute ones are used as given.
        """

        def concat_if_not_abs(dir1: Path, dir2: Path):
            if dir2.is_absolute():
                return dir2
            else:
                return dir1 / dir2

        super().__init__()
        self.base_dir = Path(base_dir)
        self.input_dir = concat_if_not_abs(self.base_dir, Path(input_dir))
        self.output_dir = concat_if_not_abs(self.base_dir, Path(output_dir))
        self.failure_dir = concat_if_not_abs(self.base_dir, Path(failure_dir))
        self.process_dir = concat_if_not_abs(self.base_dir, Path(process_dir))
        self.lock_file = concat_if_not_abs(self.base_dir, Path(lock_file))
        self.lock = FileLock(self.lock_file)
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self.input_dir.mkdir(parents=True, exist_ok=True)
        self.failure_dir.mkdir(parents=True, exist_ok=True)
        self.process_dir.mkdir(parents=True, exist_ok=True)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def calculate_name(self, name: str):
        """Return ``name`` with its MMDDYYYY date rewritten as YYYYMMDD.

        Names that do not follow the scanner naming scheme are returned
        unchanged.
        """
        res = self._scan_name_re.match(str(name))
        if res:
            name = (
                f"{res.group('scanner')}_"
                f"{res.group('year')}{res.group('month')}{res.group('day')}_"
                f"{res.group('time')}_{res.group('counter')}.pdf"
            )
        return name

    def pdf_file_path(self, name: str):
        """Return the relative output sub-directory ``YYYY/MM_DD`` for ``name``.

        Returns an empty string when ``name`` does not follow the scanner
        naming scheme.
        """
        res = self._scan_name_re.match(str(name))
        if res:
            return f"{res.group('year')}/{res.group('month')}_{res.group('day')}"
        return ""

    def _wait_for_good_pdf(self, pdf_file: Path) -> bool:
        """Return True once ``qpdf --check`` exits with 0 for ``pdf_file``.

        The check is repeated up to ``pdf_check_attempts`` times with
        ``pdf_check_delay`` seconds between runs; there is no pause after the
        final run.  Returns False as soon as the file no longer exists.
        """
        for attempt in range(self.pdf_check_attempts):
            if attempt:
                sleep(self.pdf_check_delay)
            if not pdf_file.exists():
                # Another event may already have moved the file away; further
                # checks of this path cannot succeed.
                debug(f"{pdf_file} no longer exists, not checking it")
                return False
            check = subprocess.run(["qpdf", "--check", pdf_file])
            if check.returncode == 0:
                return True
        return False

    def process_pdf(self, pdf_file: Union[Path, str]):
        """Process a single PDF."""
        pdf_file = Path(pdf_file)
        orig_pdf = pdf_file
        # check that the pdf is good, otherwise wait to see if it
        # might become good
        if not self._wait_for_good_pdf(pdf_file):
            error(f"PDF was not good, skipping {orig_pdf} for now")
            return

        # move to the processing directory
        output_path = self.pdf_file_path(pdf_file.name)
        try:
            pdf_file = pdf_file.rename(
                self.process_dir / self.calculate_name(pdf_file.name)
            )
        except FileNotFoundError:
            # Several events can refer to the same file; whoever renames it
            # first wins and the others have nothing left to do.
            warning(f"{orig_pdf} disappeared before it could be processed")
            return
        (self.output_dir / output_path).mkdir(parents=True, exist_ok=True)
        output_file = self.output_dir / output_path / pdf_file.name
        # Capture the output so that it can actually be included in the error
        # message below (without capturing, stdout/stderr are always None).
        res = subprocess.run(
            ["ocrmypdf", *self.ocrmypdf_opts, pdf_file, output_file],
            capture_output=True,
            text=True,
            errors="replace",
        )
        if res.returncode != 0:
            error(
                f"Unable to properly OCR pdf {orig_pdf} into {output_file}: {res.stdout} {res.stderr}"
            )
            # Keep the processing directory clean and make the failed PDF
            # easy to find.
            failed_file = pdf_file.rename(self.failure_dir / pdf_file.name)
            warning(f"Moved failed PDF to {failed_file}")
            return
        pdf_file.unlink()
        info(f"Processed {orig_pdf} into {output_file}")

    def event_loop(self):
        """Main event loop; called from the command line."""
        ev = ScannerWorkflowEvent(scanner_workflow=self)
        observer = Observer()
        observer.schedule(ev, self.input_dir, recursive=True)
        observer.start()
        try:
            # process any PDFs already in input_dir; the listing is
            # materialised first because processing moves files out of the
            # directory being walked.  The scan is recursive, like the watch.
            for file in sorted(self.input_dir.rglob("*.pdf")):
                if file.is_file():
                    self.process_pdf(file)
            while observer.is_alive():
                observer.join(1)
        finally:
            # Also reached when the initial scan raises, so the observer
            # thread is always shut down.
            observer.stop()
            observer.join()
144         finally:
145             observer.stop()
146             observer.join()
147
148
@click.command()
@click.option(
    "-i",
    "--input-dir",
    default="input",
    help="Directory to look for incoming PDFs",
)
@click.option(
    "-p",
    "--process-dir",
    default="process",
    help="Directory to store PDFs being processed",
)
@click.option(
    "-o",
    "--output-dir",
    default="output",
    help="Directory to output OCRed PDFs",
)
@click.option(
    "-f",
    "--failure-dir",
    default="failure",
    help="Directory to store failed PDFs",
)
@click.option(
    "-b",
    "--base-dir",
    default=".",
    help="Base directory",
)
@click.option(
    "-l",
    "--lock-file",
    default=".lock",
    help="Lock file to ensure only one instance is running",
)
def cli(input_dir, process_dir, output_dir, failure_dir, base_dir, lock_file):
    """OCR scanner output and save in directory"""
    # NOTE: the docstring above doubles as the --help text, so it is kept
    # short; implementation notes live in comments.
    sw = ScannerWorkflow(
        input_dir=input_dir,
        process_dir=process_dir,
        output_dir=output_dir,
        failure_dir=failure_dir,
        base_dir=base_dir,
        lock_file=lock_file,
    )
    try:
        # Wait up to 10 seconds for the lock before giving up.
        with sw.lock.acquire(timeout=10):
            sw.event_loop()
    except Timeout:
        # Report on stderr and exit with status 1.  SystemExit is raised
        # directly because the ``exit`` builtin is only injected by the
        # ``site`` module and is not guaranteed to exist.
        click.echo("Another instance holds the lock", err=True)
        raise SystemExit(1)
202
203
# Only start the command-line interface when executed as a script, so that
# importing this module does not launch the event loop.
if __name__ == "__main__":
    cli()