]> git.donarmstrong.com Git - scanner_workflow.git/blob - scanner_workflow.py
add logging of completed files
[scanner_workflow.git] / scanner_workflow.py
1 #!/usr/bin/env python3
2
3 import click
4 from watchdog.observers import Observer
5 from watchdog.events import FileSystemEventHandler, FileSystemEvent
6 from pathlib import Path
7 from typing import Union
8 from filelock import Timeout, FileLock
9 import subprocess
10 from logging import error, info, debug, warning
11 import re
12
13
class ScannerWorkflowEvent(FileSystemEventHandler):
    """Subclass of FileSystemEventHandler to handle OCRing PDFs.

    Forwards every closed ``.pdf`` file reported by the observer to the
    ``process_pdf`` method of the workflow object given at construction.
    """

    # Workflow object whose ``process_pdf`` is called for each closed PDF.
    scanner_workflow = None

    def __init__(self, scanner_workflow=None):
        """Store the workflow that will process incoming PDFs.

        Args:
            scanner_workflow: object providing ``process_pdf(path)``.

        Raises:
            ValueError: if no (or a falsy) ``scanner_workflow`` is given.
        """
        super().__init__()
        if not scanner_workflow:
            # The original raised the undefined name ``Error``, which
            # surfaced as a confusing NameError instead of this message.
            raise ValueError("No scanner_workflow passed to ScannerWorkflowEvent")
        self.scanner_workflow = scanner_workflow

    def on_closed(self, event: FileSystemEvent):
        """Handle a "file closed" event by processing the file if it is a PDF.

        Directory events and paths not ending in ``.pdf`` are ignored.
        """
        if event.is_directory:
            return
        if not event.src_path.endswith(".pdf"):
            return
        self.scanner_workflow.process_pdf(event.src_path)
31
32
class ScannerWorkflow:
    """Watch an input directory and OCR every PDF that arrives in it.

    A PDF is first moved into ``process_dir``, run through ``ocrmypdf`` and
    written to ``output_dir``.  On success the working copy is deleted; on
    failure it is moved to ``failure_dir`` so it can be inspected.
    """

    # Directory that relative directory arguments are resolved against.
    base_dir = None
    # Directory receiving PDFs that could not be OCRed.
    failure_dir = None
    # Directory receiving the OCRed PDFs.
    output_dir = None
    # Path of the lock file backing ``self.lock``.
    lock_file = None
    # Directory watched for incoming PDFs.
    input_dir = None
    # Directory holding PDFs while they are being OCRed.
    process_dir = None
    # Options placed on the ocrmypdf command line before the file arguments.
    ocrmypdf_opts = ["-r", "-q", "--deskew", "--clean"]

    def __init__(
        self,
        base_dir: Union[Path, str] = ".",
        input_dir: Union[Path, str] = "input",
        output_dir: Union[Path, str] = "output",
        failure_dir: Union[Path, str] = "failure",
        process_dir: Union[Path, str] = "process",
        lock_file: Union[Path, str] = ".lock",
    ):
        """Resolve all paths, create the lock object and the directories.

        Every path except ``base_dir`` is taken relative to ``base_dir``
        unless it is absolute.  All five directories are created (including
        parents) if they do not exist yet.
        """

        def concat_if_not_abs(dir1: Path, dir2: Path):
            # Absolute paths are used as given; relative ones live under dir1.
            if dir2.is_absolute():
                return dir2
            return dir1 / dir2

        super().__init__()
        self.base_dir = Path(base_dir)
        self.input_dir = concat_if_not_abs(self.base_dir, Path(input_dir))
        self.output_dir = concat_if_not_abs(self.base_dir, Path(output_dir))
        self.failure_dir = concat_if_not_abs(self.base_dir, Path(failure_dir))
        self.process_dir = concat_if_not_abs(self.base_dir, Path(process_dir))
        self.lock_file = concat_if_not_abs(self.base_dir, Path(lock_file))
        self.lock = FileLock(self.lock_file)
        for directory in (
            self.base_dir,
            self.input_dir,
            self.failure_dir,
            self.process_dir,
            self.output_dir,
        ):
            directory.mkdir(parents=True, exist_ok=True)

    def calculate_name(self, name: str):
        """Return ``name`` with its date rewritten from MMDDYYYY to YYYYMMDD.

        Only names of the exact form
        ``<scanner>_<MMDDYYYY>_<time>_<counter>.pdf`` are rewritten; any
        other name is returned unchanged.
        """
        # fullmatch (not match) so that trailing text after ".pdf" is never
        # silently dropped from the resulting name.
        res = re.fullmatch(
            r"(?P<scanner>[^_]+)_(?P<month>\d{2})(?P<day>\d{2})(?P<year>\d{4})_"
            r"(?P<time>\d+)_(?P<counter>\d+)\.pdf",
            str(name),
        )
        if res:
            name = (
                f"{res.group('scanner')}_"
                f"{res.group('year')}{res.group('month')}{res.group('day')}_"
                f"{res.group('time')}_{res.group('counter')}.pdf"
            )
        return name

    def process_pdf(self, pdf_file: Union[Path, str]):
        """Process a single PDF.

        The file is moved into ``process_dir`` (renamed via
        ``calculate_name``), OCRed with ``ocrmypdf`` into ``output_dir`` and
        then removed.  If ``ocrmypdf`` exits non-zero the failure is logged
        and the working copy is moved to ``failure_dir``.
        """
        orig_pdf = Path(pdf_file)
        # move to the processing directory
        work_pdf = orig_pdf.rename(
            self.process_dir / self.calculate_name(orig_pdf.name)
        )
        output_file = self.output_dir / work_pdf.name
        # Capture the output so it can be reported below; without this
        # res.stdout and res.stderr are always None.
        res = subprocess.run(
            ["ocrmypdf", *self.ocrmypdf_opts, work_pdf, output_file],
            capture_output=True,
            text=True,
        )
        if res.returncode != 0:
            error(f"Unable to properly OCR pdf: {res.stdout} {res.stderr}")
            # Keep the failed input out of the processing directory.
            work_pdf.rename(self.failure_dir / work_pdf.name)
            return
        work_pdf.unlink()
        info(f"Processed {orig_pdf} into {output_file}")

    def event_loop(self):
        """Main event loop; called from the command line.

        Starts watching ``input_dir``, processes the PDFs already present in
        it, then blocks until the observer thread ends.  The observer is
        always stopped and joined on the way out.
        """
        ev = ScannerWorkflowEvent(scanner_workflow=self)
        observer = Observer()
        observer.schedule(ev, self.input_dir, recursive=True)
        # Started before the catch-up scan so files arriving meanwhile are
        # still reported.
        observer.start()
        try:
            # process any PDFs in input_dir; same ".pdf" filter as the
            # event handler, and never directories
            for file in self.input_dir.iterdir():
                if file.is_file() and file.name.endswith(".pdf"):
                    self.process_pdf(file)
            while observer.is_alive():
                observer.join(1)
        finally:
            observer.stop()
            observer.join()
117
@click.command()
@click.option(
    "-i",
    "--input-dir",
    default="input",
    help="Directory to look for incoming PDFs",
)
@click.option(
    "-p",
    "--process-dir",
    default="process",
    help="Directory to store PDFs being processed",
)
@click.option(
    "-o",
    "--output-dir",
    default="output",
    help="Directory to output OCRed PDFs",
)
@click.option(
    "-f",
    "--failure-dir",
    default="failure",
    help="Directory to store failed PDFs",
)
@click.option(
    "-b",
    "--base-dir",
    default=".",
    help="Base directory",
)
@click.option(
    "-l",
    "--lock-file",
    default=".lock",
    help="Lock file to ensure only one instance is running",
)
def cli(input_dir, process_dir, output_dir, failure_dir, base_dir, lock_file):
    """OCR scanner output and save in directory"""
    # NOTE: the docstring above doubles as the click --help text, so it is
    # kept short; details live in these comments.
    #
    # Builds a ScannerWorkflow from the options, takes the instance lock
    # (waiting up to 10 seconds) and runs the event loop while holding it.
    # Exits with status 1 if the lock cannot be acquired in time.
    import logging

    # Without a configured level the root logger drops info() records, so
    # the "Processed ..." messages would never be shown.
    logging.basicConfig(level=logging.INFO)

    sw = ScannerWorkflow(
        input_dir=input_dir,
        process_dir=process_dir,
        output_dir=output_dir,
        failure_dir=failure_dir,
        base_dir=base_dir,
        lock_file=lock_file,
    )
    try:
        with sw.lock.acquire(timeout=10):
            sw.event_loop()
    except Timeout:
        click.echo("Another instance holds the lock", err=True)
        # exit() is only injected by the site module; raise directly instead.
        raise SystemExit(1)
171
172
# Run the command-line interface only when executed as a script, so that
# importing this module does not start the watcher.
if __name__ == "__main__":
    cli()