git.donarmstrong.com Git - scanner_workflow.git/blob - scanner_workflow.py
sleep in a loop if the filesize is increasing
[scanner_workflow.git] / scanner_workflow.py
1 #!/usr/bin/env python3
2
3 import click
4 from watchdog.observers import Observer
5 from watchdog.events import FileSystemEventHandler, FileSystemEvent
6 from pathlib import Path
7 from typing import Union
8 from filelock import Timeout, FileLock
9 import subprocess
10 from logging import error, info, debug, warning
11 import re
12 from time import sleep
13
14
class ScannerWorkflowEvent(FileSystemEventHandler):
    """Filesystem event handler that forwards PDFs to a workflow for OCR.

    Every non-directory event whose source path ends in ``.pdf`` and still
    exists on disk is passed to ``scanner_workflow.process_pdf``.
    """

    # Object whose ``process_pdf(path)`` method is called for each PDF event.
    scanner_workflow = None

    def __init__(self, scanner_workflow=None):
        """Store the workflow that will process incoming PDFs.

        Args:
            scanner_workflow: object providing ``process_pdf``; required.

        Raises:
            ValueError: if no (or a falsy) ``scanner_workflow`` is given.
        """
        super().__init__()
        if not scanner_workflow:
            # The original raised the undefined name ``Error`` here, which
            # surfaced as a confusing NameError instead of a real diagnostic.
            raise ValueError("No scanner_workflow passed to ScannerWorkflowEvent")
        self.scanner_workflow = scanner_workflow

    def on_any_event(self, event: FileSystemEvent):
        """Process the PDF named by ``event.src_path``, if there is one.

        Directory events and non-``.pdf`` paths are ignored. The existence
        check skips events (for example deletions, or events queued for a
        file that has already been moved away) whose path is gone.
        """
        if event.is_directory:
            return
        if not event.src_path.endswith(".pdf"):
            return
        pdf_file = Path(event.src_path)
        if pdf_file.exists():
            self.scanner_workflow.process_pdf(pdf_file)
34
35
class ScannerWorkflow:
    """Watch an input directory for scanned PDFs and OCR them into an output tree.

    A PDF is validated with ``qpdf --check``, moved into ``process_dir``
    under a normalized name, OCRed with ``ocrmypdf`` into a dated
    sub-directory of ``output_dir``, and then removed from ``process_dir``.
    A PDF that fails OCR is moved to ``failure_dir``.
    """

    base_dir = None
    failure_dir = None
    output_dir = None
    lock_file = None
    input_dir = None
    process_dir = None
    # Options placed before the input/output paths on the ocrmypdf command line.
    ocrmypdf_opts = ["-r", "-q", "--deskew", "--clean"]
    # How many times ``qpdf --check`` is tried before a PDF is skipped.
    qpdf_check_attempts = 9
    # Seconds between file-size samples while waiting for a PDF to stop growing.
    size_poll_seconds = 10

    # File names of the form <scanner>_<MMDDYYYY>_<time>_<counter>.pdf
    _scan_name_re = re.compile(
        r"(?P<scanner>[^_]+)_(?P<month>\d{2})(?P<day>\d{2})(?P<year>\d{4})_"
        r"(?P<time>\d+)_(?P<counter>\d+)\.pdf"
    )

    def __init__(
        self,
        base_dir: Union[Path, str] = ".",
        input_dir: Union[Path, str] = "input",
        output_dir: Union[Path, str] = "output",
        failure_dir: Union[Path, str] = "failure",
        process_dir: Union[Path, str] = "process",
        lock_file: Union[Path, str] = ".lock",
    ):
        """Resolve the working directories, create them, and build the lock.

        Relative paths are taken relative to ``base_dir``; absolute paths are
        used unchanged. All directories are created if missing. The
        ``FileLock`` on ``lock_file`` is constructed here but not acquired.
        """

        def concat_if_not_abs(dir1: Path, dir2: Path):
            if dir2.is_absolute():
                return dir2
            return dir1 / dir2

        super().__init__()
        self.base_dir = Path(base_dir)
        self.input_dir = concat_if_not_abs(self.base_dir, Path(input_dir))
        self.output_dir = concat_if_not_abs(self.base_dir, Path(output_dir))
        self.failure_dir = concat_if_not_abs(self.base_dir, Path(failure_dir))
        self.process_dir = concat_if_not_abs(self.base_dir, Path(process_dir))
        self.lock_file = concat_if_not_abs(self.base_dir, Path(lock_file))
        self.lock = FileLock(self.lock_file)
        for directory in (
            self.base_dir,
            self.input_dir,
            self.failure_dir,
            self.process_dir,
            self.output_dir,
        ):
            directory.mkdir(parents=True, exist_ok=True)

    def calculate_name(self, name: str):
        """Return ``name`` with its date rewritten from MMDDYYYY to YYYYMMDD.

        Names that do not start with the scanner pattern are returned
        unchanged.
        """
        res = self._scan_name_re.match(str(name))
        if res:
            name = (
                f"{res.group('scanner')}_"
                f"{res.group('year')}{res.group('month')}{res.group('day')}_"
                f"{res.group('time')}_{res.group('counter')}.pdf"
            )
        return name

    def pdf_file_path(self, name: str):
        """Return the ``YYYY/MM_DD`` output sub-directory for ``name``.

        Returns an empty string when ``name`` does not start with the
        scanner pattern, so such files land directly in ``output_dir``.
        """
        res = self._scan_name_re.match(str(name))
        if res:
            return f"{res.group('year')}/{res.group('month')}_{res.group('day')}"
        return ""

    def _wait_until_size_settles(self, pdf_file: Path) -> bool:
        """Sleep in ``size_poll_seconds`` steps while ``pdf_file`` keeps growing.

        Returns True once a sample shows no growth, or False if the file
        disappeared while waiting.
        """
        try:
            file_size = pdf_file.stat().st_size
            while True:
                sleep(self.size_poll_seconds)
                new_size = pdf_file.stat().st_size
                if new_size <= file_size:
                    return True
                file_size = new_size
        except FileNotFoundError:
            return False

    def _pending_pdfs(self):
        """Return the ``.pdf`` files directly inside ``input_dir``, sorted.

        Directories and other file types are left out so the start-up scan
        applies the same ``.pdf`` filter as the event handler.
        """
        return sorted(
            entry
            for entry in self.input_dir.iterdir()
            if entry.is_file() and entry.name.endswith(".pdf")
        )

    def process_pdf(self, pdf_file: Union[Path, str]):
        """Process a single PDF.

        The file is checked with ``qpdf --check`` up to
        ``qpdf_check_attempts`` times, waiting between attempts for the file
        to stop growing. A PDF that never passes is logged and left in place.
        A good PDF is moved to ``process_dir``, OCRed into ``output_dir``,
        and deleted from ``process_dir`` on success or moved to
        ``failure_dir`` on failure.
        """
        pdf_file = Path(pdf_file)
        orig_pdf = pdf_file
        # check that the pdf is good, otherwise wait to see if it
        # might become good
        for _ in range(self.qpdf_check_attempts):
            check = subprocess.run(["qpdf", "--check", pdf_file])
            if check.returncode == 0:
                break
            if not self._wait_until_size_settles(pdf_file):
                warning(f"{orig_pdf} disappeared before it could be processed")
                return
        else:
            # the loop ran out of attempts without a successful check
            error(f"PDF was not good, skipping {orig_pdf} for now")
            return

        # move to the processing directory
        output_path = self.pdf_file_path(pdf_file.name)
        pdf_file = pdf_file.rename(
            self.process_dir / self.calculate_name(pdf_file.name)
        )
        target_dir = self.output_dir / output_path
        target_dir.mkdir(parents=True, exist_ok=True)
        output_file = target_dir / pdf_file.name
        # Capture output so the failure message below has something to
        # report; without capturing, stdout/stderr on the result are None.
        res = subprocess.run(
            ["ocrmypdf", *self.ocrmypdf_opts, pdf_file, output_file],
            capture_output=True,
            text=True,
            errors="replace",
        )
        if res.returncode != 0:
            error(
                f"Unable to properly OCR pdf {orig_pdf} into {output_file}: {res.stdout} {res.stderr}"
            )
            # Park the failed PDF in failure_dir rather than leaving it
            # behind in process_dir.
            pdf_file.rename(self.failure_dir / pdf_file.name)
            return
        pdf_file.unlink()
        info(f"Processed {orig_pdf} into {output_file}")

    def event_loop(self):
        """Main event loop; called from the command line.

        Starts a recursive observer on ``input_dir``, processes the PDFs
        already present there, then blocks until the observer stops. The
        observer is always stopped and joined on the way out, including when
        the start-up scan raises.
        """
        ev = ScannerWorkflowEvent(scanner_workflow=self)
        observer = Observer()
        observer.schedule(ev, self.input_dir, recursive=True)
        observer.start()
        try:
            # process any PDFs in input_dir
            for file in self._pending_pdfs():
                self.process_pdf(file)
            while observer.is_alive():
                observer.join(1)
        finally:
            observer.stop()
            observer.join()
155
156
@click.command()
@click.option(
    "-i",
    "--input-dir",
    default="input",
    help="Directory to look for incoming PDFs",
)
@click.option(
    "-p",
    "--process-dir",
    default="process",
    help="Directory to store PDFs being processed",
)
@click.option(
    "-o",
    "--output-dir",
    default="output",
    help="Directory to output OCRed PDFs",
)
@click.option(
    "-f",
    "--failure-dir",
    default="failure",
    help="Directory to store failed PDFs",
)
@click.option(
    "-b",
    "--base-dir",
    default=".",
    help="Base directory",
)
@click.option(
    "-l",
    "--lock-file",
    default=".lock",
    help="Lock file to ensure only one instance is running",
)
def cli(input_dir, process_dir, output_dir, failure_dir, base_dir, lock_file):
    """OCR scanner output and save in directory"""
    # The docstring above is also the command's --help text, so it is kept
    # short; the details live in these comments instead.
    #
    # Builds a ScannerWorkflow from the options, takes the lock file (waiting
    # up to 10 seconds), and runs the event loop while holding it. If the
    # lock cannot be taken in time the command reports that and exits with
    # status 1.
    sw = ScannerWorkflow(
        input_dir=input_dir,
        process_dir=process_dir,
        output_dir=output_dir,
        failure_dir=failure_dir,
        base_dir=base_dir,
        lock_file=lock_file,
    )
    try:
        with sw.lock.acquire(timeout=10):
            sw.event_loop()
    except Timeout:
        # Diagnostics go to stderr. SystemExit is raised directly because the
        # bare ``exit`` helper is injected by the ``site`` module for
        # interactive use and is not guaranteed to exist in every run mode.
        click.echo("Another instance holds the lock", err=True)
        raise SystemExit(1) from None
210
211
# Run the command only when executed as a script, so that importing this
# module (for reuse or testing) does not start the watcher.
if __name__ == "__main__":
    cli()