]> git.donarmstrong.com Git - scanner_workflow.git/blob - scanner_workflow.py
just use the year for the path
[scanner_workflow.git] / scanner_workflow.py
1 #!/usr/bin/env python3
2
3 from __future__ import annotations
4
5 import click
6 from watchdog.observers import Observer
7 from watchdog.events import FileSystemEventHandler, FileSystemEvent
8 from pathlib import Path
9 from typing import Union
10 from filelock import Timeout, FileLock
11 import subprocess
12 from logging import error, info, debug, warning
13 import re
14 from time import sleep
15
16
class ScannerWorkflowEvent(FileSystemEventHandler):
    """Watchdog event handler that hands PDF files to a ScannerWorkflow."""

    scanner_workflow: ScannerWorkflow

    def __init__(self, scanner_workflow: ScannerWorkflow):
        """Store the workflow used to process PDFs.

        Raises:
            Exception: if ``scanner_workflow`` is falsy.
        """
        super().__init__()
        self.scanner_workflow = scanner_workflow
        if self.scanner_workflow:
            return
        raise Exception("No scanner_workflow passed to ScannerWorkflowEvent")

    def on_any_event(self, event: FileSystemEvent):
        """Process the event's source path when it is an existing ``.pdf`` file.

        Directory events and paths not ending in ``.pdf`` are ignored, as
        are paths that no longer exist by the time the event is handled.
        """
        if event.is_directory or not event.src_path.endswith(".pdf"):
            return
        candidate = Path(event.src_path)
        if not candidate.exists():
            return
        self.scanner_workflow.process_pdf(candidate)
36
37
class ScannerWorkflow:
    """Watch an input directory for scanned PDFs and OCR them.

    A PDF is validated with ``qpdf --check``, renamed into ``process_dir``,
    run through ``ocrmypdf`` and written below ``output_dir``.  PDFs that
    fail OCR are moved to ``failure_dir``.
    """

    base_dir = None
    failure_dir = None
    output_dir = None
    lock_file = None
    input_dir = None
    process_dir = None
    # Extra command-line options passed to every ocrmypdf invocation.
    ocrmypdf_opts = ["-r", "-q", "--deskew", "--clean"]
    # How many times qpdf is asked to validate a PDF before giving up.
    pdf_check_attempts = 9
    # Seconds to sleep between file-size polls while a PDF is still growing.
    pdf_check_interval = 10

    # Scanner file names: <scanner>_<MMDDYYYY>_<time>_<counter>.pdf
    _scan_name_re = re.compile(
        r"(?P<scanner>[^_]+)_(?P<month>\d{2})(?P<day>\d{2})(?P<year>\d{4})_"
        r"(?P<time>\d+)_(?P<counter>\d+)\.pdf"
    )

    def __init__(
        self,
        base_dir: Union[Path, str] = ".",
        input_dir: Union[Path, str] = "input",
        output_dir: Union[Path, str] = "output",
        failure_dir: Union[Path, str] = "failure",
        process_dir: Union[Path, str] = "process",
        lock_file: Union[Path, str] = ".lock",
    ):
        """Resolve all paths, create the directories and set up the lock.

        Relative paths are interpreted relative to ``base_dir``; absolute
        paths are used unchanged.
        """

        def concat_if_not_abs(dir1: Path, dir2: Path) -> Path:
            return dir2 if dir2.is_absolute() else dir1 / dir2

        super().__init__()
        self.base_dir = Path(base_dir)
        self.input_dir = concat_if_not_abs(self.base_dir, Path(input_dir))
        self.output_dir = concat_if_not_abs(self.base_dir, Path(output_dir))
        self.failure_dir = concat_if_not_abs(self.base_dir, Path(failure_dir))
        self.process_dir = concat_if_not_abs(self.base_dir, Path(process_dir))
        self.lock_file = concat_if_not_abs(self.base_dir, Path(lock_file))
        for directory in (
            self.base_dir,
            self.input_dir,
            self.failure_dir,
            self.process_dir,
            self.output_dir,
        ):
            directory.mkdir(parents=True, exist_ok=True)
        self.lock = FileLock(self.lock_file)

    def calculate_name(self, name: str) -> str:
        """Return ``name`` with its date reordered from MMDDYYYY to YYYYMMDD.

        Names that do not match the scanner naming pattern are returned
        unchanged.
        """
        res = self._scan_name_re.match(str(name))
        if not res:
            return name
        return (
            f"{res.group('scanner')}_"
            f"{res.group('year')}{res.group('month')}{res.group('day')}_"
            f"{res.group('time')}_{res.group('counter')}.pdf"
        )

    def pdf_file_path(self, name: str) -> str:
        """Return the output sub-directory (the year) for ``name``.

        Returns an empty string when ``name`` does not match the scanner
        naming pattern.
        """
        res = self._scan_name_re.match(str(name))
        return res.group("year") if res else ""

    def _wait_for_valid_pdf(self, pdf_file: Path) -> bool:
        """Return True once ``qpdf --check`` accepts ``pdf_file``.

        After each failed check, sleep for as long as the file keeps
        growing, then check again.  Gives up after ``pdf_check_attempts``
        checks or as soon as the file disappears.
        """
        for _ in range(self.pdf_check_attempts):
            check = subprocess.run(["qpdf", "--check", pdf_file])
            if check.returncode == 0:
                return True
            try:
                file_size = pdf_file.stat().st_size
                while True:
                    sleep(self.pdf_check_interval)
                    new_size = pdf_file.stat().st_size
                    if new_size <= file_size:
                        break
                    file_size = new_size
            except FileNotFoundError:
                # The file was removed or moved while we were waiting.
                warning(f"{pdf_file} disappeared while waiting for it")
                return False
        return False

    def _move_to_failure(self, pdf_file: Path):
        """Move a PDF that could not be OCRed into ``failure_dir``."""
        try:
            pdf_file.rename(self.failure_dir / pdf_file.name)
        except OSError as exc:
            error(f"Unable to move {pdf_file} to {self.failure_dir}: {exc}")

    def process_pdf(self, pdf_file: Union[Path, str]):
        """Process a single PDF.

        Paths that are not existing files are skipped with a warning.  A
        PDF that never passes validation is left where it is; a PDF that
        fails OCR is moved to ``failure_dir``.
        """
        pdf_file = Path(pdf_file)
        orig_pdf = pdf_file
        if not pdf_file.is_file():
            warning(f"{orig_pdf} is not an existing file, skipping")
            return
        # check that the pdf is good, otherwise wait to see if it
        # might become good
        if not self._wait_for_valid_pdf(pdf_file):
            error(f"PDF was not good, skipping {orig_pdf} for now")
            return

        # move to the processing directory
        output_path = self.pdf_file_path(pdf_file.name)
        pdf_file = pdf_file.rename(
            self.process_dir / self.calculate_name(pdf_file.name)
        )
        output_subdir = self.output_dir / output_path
        output_subdir.mkdir(parents=True, exist_ok=True)
        output_file = output_subdir / pdf_file.name
        # Capture the output so it can be included in the failure message.
        res = subprocess.run(
            ["ocrmypdf", *self.ocrmypdf_opts, pdf_file, output_file],
            capture_output=True,
            text=True,
            errors="replace",
        )
        if res.returncode != 0:
            error(
                f"Unable to properly OCR pdf {orig_pdf} into {output_file}: {res.stdout} {res.stderr}"
            )
            self._move_to_failure(pdf_file)
            return
        pdf_file.unlink()
        info(f"Processed {orig_pdf} into {output_file}")

    def event_loop(self):
        """Main event loop; called from the command line."""
        ev = ScannerWorkflowEvent(scanner_workflow=self)
        observer = Observer()
        observer.schedule(ev, self.input_dir, recursive=True)
        observer.start()
        try:
            # process any PDFs already in input_dir; the list is built up
            # front because processing moves files out of the directory
            for file in sorted(self.input_dir.rglob("*.pdf")):
                if file.is_file():
                    self.process_pdf(file)
            while observer.is_alive():
                observer.join(1)
        finally:
            observer.stop()
            observer.join()
157
158
@click.command()
@click.option(
    "-i",
    "--input-dir",
    default="input",
    help="Directory to look for incoming PDFs",
)
@click.option(
    "-p",
    "--process-dir",
    default="process",
    help="Directory to store PDFs being processed",
)
@click.option(
    "-o",
    "--output-dir",
    default="output",
    help="Directory to output OCRed PDFs",
)
@click.option(
    "-f",
    "--failure-dir",
    default="failure",
    help="Directory to store failed PDFs",
)
@click.option(
    "-b",
    "--base-dir",
    default=".",
    help="Base directory",
)
@click.option(
    "-l",
    "--lock-file",
    default=".lock",
    help="Lock file to ensure only one instance is running",
)
def cli(input_dir, process_dir, output_dir, failure_dir, base_dir, lock_file):
    """OCR scanner output and save in directory"""
    # The docstring above is shown by click as the command's help text.
    sw = ScannerWorkflow(
        input_dir=input_dir,
        process_dir=process_dir,
        output_dir=output_dir,
        failure_dir=failure_dir,
        base_dir=base_dir,
        lock_file=lock_file,
    )
    try:
        # Hold the lock for the whole run so only one instance is active.
        with sw.lock.acquire(timeout=10):
            sw.event_loop()
    except Timeout:
        click.echo("Another instance holds the lock", err=True)
        # SystemExit instead of the site-provided exit() helper, which is
        # intended for interactive use and may be unavailable.
        raise SystemExit(1)
212
213
# Only start the workflow when run as a script, not when imported.
if __name__ == "__main__":
    cli()