git.donarmstrong.com Git - scanner_workflow.git/blob - scanner_workflow.py
rename files to follow iso8601
[scanner_workflow.git] / scanner_workflow.py
1 #!/usr/bin/env python3
2
3 import click
4 from watchdog.observers import Observer
5 from watchdog.events import FileSystemEventHandler, FileSystemEvent
6 from pathlib import Path
7 from typing import Union
8 from filelock import Timeout, FileLock
9 import subprocess
10 from logging import error, info, debug, warning
11 import re
12
13
class ScannerWorkflowEvent(FileSystemEventHandler):
    """Subclass of FileSystemEventHandler to handle OCRing PDFs.

    Every "closed" event for a path ending in ``.pdf`` is forwarded to the
    ``process_pdf`` method of the workflow object given at construction.
    """

    # Workflow object whose ``process_pdf`` is called for each PDF.
    scanner_workflow = None

    def __init__(self, scanner_workflow=None):
        """Store the workflow that will process incoming PDFs.

        Args:
            scanner_workflow: Object exposing ``process_pdf(path)``.

        Raises:
            ValueError: If no workflow is passed.
        """
        super().__init__()
        if scanner_workflow is None:
            # The original raised the undefined name ``Error``, which surfaced
            # as a confusing NameError instead of a real diagnostic.
            raise ValueError("No scanner_workflow passed to ScannerWorkflowEvent")
        self.scanner_workflow = scanner_workflow

    def on_closed(self, event: FileSystemEvent) -> None:
        """Hand a just-closed PDF file over to the workflow.

        Directory events and paths that do not end in ``.pdf`` are ignored.
        """
        if event.is_directory:
            return
        if not event.src_path.endswith(".pdf"):
            return
        self.scanner_workflow.process_pdf(event.src_path)
31
32
class ScannerWorkflow:
    """OCR PDFs dropped into an input directory.

    A PDF is first moved from ``input_dir`` into ``process_dir`` (being
    renamed to a year-first date on the way when its name matches the scanner
    pattern), then run through ``ocrmypdf`` with the result written to
    ``output_dir``.  On success the working copy is deleted; on failure it is
    moved to ``failure_dir``.
    """

    base_dir = None
    failure_dir = None
    output_dir = None
    lock_file = None
    input_dir = None
    process_dir = None
    # Options placed before the input/output paths on the ocrmypdf command line.
    ocrmypdf_opts = ["-r", "-q", "--deskew", "--clean"]

    # <scanner>_<MMDDYYYY>_<time>_<counter>.pdf
    _SCAN_NAME_RE = re.compile(
        r"(?P<scanner>[^_]+)_(?P<month>\d{2})(?P<day>\d{2})(?P<year>\d{4})_"
        r"(?P<time>\d+)_(?P<counter>\d+)\.pdf"
    )

    def __init__(
        self,
        base_dir: Union[Path, str] = ".",
        input_dir: Union[Path, str] = "input",
        output_dir: Union[Path, str] = "output",
        failure_dir: Union[Path, str] = "failure",
        process_dir: Union[Path, str] = "process",
        lock_file: Union[Path, str] = ".lock",
    ):
        """Resolve all paths, create the lock object and the directories.

        Relative ``*_dir`` / ``lock_file`` arguments are taken relative to
        ``base_dir``; absolute ones are used unchanged.  All directories are
        created if they do not exist yet.
        """

        def concat_if_not_abs(dir1: Path, dir2: Path) -> Path:
            """Return ``dir2`` if absolute, else ``dir1 / dir2``."""
            if dir2.is_absolute():
                return dir2
            return dir1 / dir2

        super().__init__()
        self.base_dir = Path(base_dir)
        self.input_dir = concat_if_not_abs(self.base_dir, Path(input_dir))
        self.output_dir = concat_if_not_abs(self.base_dir, Path(output_dir))
        self.failure_dir = concat_if_not_abs(self.base_dir, Path(failure_dir))
        self.process_dir = concat_if_not_abs(self.base_dir, Path(process_dir))
        self.lock_file = concat_if_not_abs(self.base_dir, Path(lock_file))
        self.lock = FileLock(self.lock_file)
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self.input_dir.mkdir(parents=True, exist_ok=True)
        self.failure_dir.mkdir(parents=True, exist_ok=True)
        self.process_dir.mkdir(parents=True, exist_ok=True)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def calculate_name(self, name: str) -> str:
        """Return ``name`` with its date reordered from MMDDYYYY to YYYYMMDD.

        Only names of the exact form
        ``<scanner>_<MMDDYYYY>_<time>_<counter>.pdf`` are rewritten; any other
        name is returned unchanged.
        """
        # fullmatch, not match: a name with trailing text after ".pdf" (e.g.
        # "x.pdf.part") must not be rewritten and silently lose its suffix.
        res = self._SCAN_NAME_RE.fullmatch(str(name))
        if res:
            name = (
                f"{res.group('scanner')}_"
                f"{res.group('year')}{res.group('month')}{res.group('day')}_"
                f"{res.group('time')}_{res.group('counter')}.pdf"
            )
        return name

    def process_pdf(self, pdf_file: Union[Path, str]) -> None:
        """Process a single PDF.

        The file is moved into ``process_dir`` and OCRed into ``output_dir``.
        A file that no longer exists is skipped with a warning; a file that
        ocrmypdf fails on is moved into ``failure_dir``.
        """
        pdf_file = Path(pdf_file)
        # move to the processing directory
        try:
            pdf_file = pdf_file.rename(
                self.process_dir / self.calculate_name(pdf_file.name)
            )
        except FileNotFoundError:
            # The startup scan and the observer thread can both see the same
            # file; whoever loses the race finds it already moved.
            warning(f"Skipping {pdf_file}: file no longer exists")
            return
        # Capture the output so the failure message below has real content
        # (without capturing, res.stdout / res.stderr are always None).
        res = subprocess.run(
            ["ocrmypdf", *self.ocrmypdf_opts, pdf_file, self.output_dir / pdf_file.name],
            capture_output=True,
            text=True,
            errors="replace",
        )
        if res.returncode != 0:
            error(f"Unable to properly OCR pdf: {res.stdout} {res.stderr}")
            # Keep the failed document out of the working directory.
            pdf_file.replace(self.failure_dir / pdf_file.name)
            return
        pdf_file.unlink()

    def event_loop(self) -> None:
        """Main event loop; called from the command line.

        Starts a recursive observer on ``input_dir``, processes the PDFs
        already sitting directly in ``input_dir``, then blocks until the
        observer dies.  The observer is always stopped on the way out.
        """
        ev = ScannerWorkflowEvent(scanner_workflow=self)
        observer = Observer()
        observer.schedule(ev, self.input_dir, recursive=True)
        observer.start()
        try:
            # process any PDFs in input_dir; snapshot the listing first since
            # process_pdf moves files out of the directory being listed, and
            # skip anything that is not a regular *.pdf file.
            backlog = sorted(
                entry
                for entry in self.input_dir.iterdir()
                if entry.is_file() and entry.name.endswith(".pdf")
            )
            for file in backlog:
                self.process_pdf(file)
            while observer.is_alive():
                observer.join(1)
        finally:
            observer.stop()
            observer.join()
115
116
@click.command()
@click.option(
    "-i",
    "--input-dir",
    default="input",
    help="Directory to look for incoming PDFs",
)
@click.option(
    "-p",
    "--process-dir",
    default="process",
    help="Directory to store PDFs being processed",
)
@click.option(
    "-o",
    "--output-dir",
    default="output",
    help="Directory to output OCRed PDFs",
)
@click.option(
    "-f",
    "--failure-dir",
    default="failure",
    help="Directory to store failed PDFs",
)
@click.option(
    "-b",
    "--base-dir",
    default=".",
    help="Base directory",
)
@click.option(
    "-l",
    "--lock-file",
    default=".lock",
    help="Lock file to ensure only one instance is running",
)
def cli(input_dir, process_dir, output_dir, failure_dir, base_dir, lock_file):
    """OCR scanner output and save in directory"""
    # NOTE: the docstring above is kept short on purpose; click uses it as the
    # command's --help text.
    #
    # Build the workflow from the command-line options, then run its event
    # loop while holding the lock file.  If the lock cannot be acquired within
    # 10 seconds, report it on stderr and exit with status 1.
    sw = ScannerWorkflow(
        input_dir=input_dir,
        process_dir=process_dir,
        output_dir=output_dir,
        failure_dir=failure_dir,
        base_dir=base_dir,
        lock_file=lock_file,
    )
    try:
        with sw.lock.acquire(timeout=10):
            sw.event_loop()
    except Timeout:
        click.echo("Another instance holds the lock", err=True)
        # SystemExit instead of exit(): the latter is injected by the ``site``
        # module and is not guaranteed to exist (e.g. ``python -S``).
        raise SystemExit(1)
170
171
# Only start the command-line interface when run as a script, so importing
# this module (for reuse or testing) does not launch the watcher.
if __name__ == "__main__":
    cli()