]> git.donarmstrong.com Git - qmk_firmware.git/blob - lib/python/milc.py
QMK CLI and JSON keymap support (#6176)
[qmk_firmware.git] / lib / python / milc.py
1 #!/usr/bin/env python3
2 # coding=utf-8
3 """MILC - A CLI Framework
4
5 PYTHON_ARGCOMPLETE_OK
6
7 MILC is an opinionated framework for writing CLI apps. It optimizes for the
8 most common unix tool pattern- small tools that are run from the command
9 line but generally do not feature any user interaction while they run.
10
11 For more details see the MILC documentation:
12
13     <https://github.com/clueboard/milc/tree/master/docs>
14 """
15 from __future__ import division, print_function, unicode_literals
16 import argparse
17 import logging
18 import os
19 import re
20 import sys
21 from decimal import Decimal
22 from tempfile import NamedTemporaryFile
23 from time import sleep
24
25 try:
26     from ConfigParser import RawConfigParser
27 except ImportError:
28     from configparser import RawConfigParser
29
30 try:
31     import thread
32     import threading
33 except ImportError:
34     thread = None
35
36 import argcomplete
37 import colorama
38
39 # Log Level Representations
40 EMOJI_LOGLEVELS = {
41     'CRITICAL': '{bg_red}{fg_white}¬_¬{style_reset_all}',
42     'ERROR': '{fg_red}☒{style_reset_all}',
43     'WARNING': '{fg_yellow}⚠{style_reset_all}',
44     'INFO': '{fg_blue}ℹ{style_reset_all}',
45     'DEBUG': '{fg_cyan}☐{style_reset_all}',
46     'NOTSET': '{style_reset_all}¯\\_(o_o)_/¯'
47 }
48 EMOJI_LOGLEVELS['FATAL'] = EMOJI_LOGLEVELS['CRITICAL']
49 EMOJI_LOGLEVELS['WARN'] = EMOJI_LOGLEVELS['WARNING']
50
51 # ANSI Color setup
52 # Regex was gratefully borrowed from kfir on stackoverflow:
53 # https://stackoverflow.com/a/45448194
54 ansi_regex = r'\x1b(' \
55              r'(\[\??\d+[hl])|' \
56              r'([=<>a-kzNM78])|' \
57              r'([\(\)][a-b0-2])|' \
58              r'(\[\d{0,2}[ma-dgkjqi])|' \
59              r'(\[\d+;\d+[hfy]?)|' \
60              r'(\[;?[hf])|' \
61              r'(#[3-68])|' \
62              r'([01356]n)|' \
63              r'(O[mlnp-z]?)|' \
64              r'(/Z)|' \
65              r'(\d+)|' \
66              r'(\[\?\d;\d0c)|' \
67              r'(\d;\dR))'
68 ansi_escape = re.compile(ansi_regex, flags=re.IGNORECASE)
69 ansi_styles = (
70     ('fg', colorama.ansi.AnsiFore()),
71     ('bg', colorama.ansi.AnsiBack()),
72     ('style', colorama.ansi.AnsiStyle()),
73 )
74 ansi_colors = {}
75
76 for prefix, obj in ansi_styles:
77     for color in [x for x in obj.__dict__ if not x.startswith('_')]:
78         ansi_colors[prefix + '_' + color.lower()] = getattr(obj, color)
79
80
81 def format_ansi(text):
82     """Return a copy of text with certain strings replaced with ansi.
83     """
84     # Avoid .format() so we don't have to worry about the log content
85     for color in ansi_colors:
86         text = text.replace('{%s}' % color, ansi_colors[color])
87     return text + ansi_colors['style_reset_all']
88
89
90 class ANSIFormatter(logging.Formatter):
91     """A log formatter that inserts ANSI color.
92     """
93
94     def format(self, record):
95         msg = super(ANSIFormatter, self).format(record)
96         return format_ansi(msg)
97
98
99 class ANSIEmojiLoglevelFormatter(ANSIFormatter):
100     """A log formatter that makes the loglevel an emoji.
101     """
102
103     def format(self, record):
104         record.levelname = EMOJI_LOGLEVELS[record.levelname].format(**ansi_colors)
105         return super(ANSIEmojiLoglevelFormatter, self).format(record)
106
107
108 class ANSIStrippingFormatter(ANSIFormatter):
109     """A log formatter that strips ANSI.
110     """
111
112     def format(self, record):
113         msg = super(ANSIStrippingFormatter, self).format(record)
114         return ansi_escape.sub('', msg)
115
116
117 class Configuration(object):
118     """Represents the running configuration.
119
120     This class never raises IndexError, instead it will return None if a
121     section or option does not yet exist.
122     """
123
124     def __contains__(self, key):
125         return self._config.__contains__(key)
126
127     def __iter__(self):
128         return self._config.__iter__()
129
130     def __len__(self):
131         return self._config.__len__()
132
133     def __repr__(self):
134         return self._config.__repr__()
135
136     def keys(self):
137         return self._config.keys()
138
139     def items(self):
140         return self._config.items()
141
142     def values(self):
143         return self._config.values()
144
145     def __init__(self, *args, **kwargs):
146         self._config = {}
147         self.default_container = ConfigurationOption
148
149     def __getitem__(self, key):
150         """Returns a config section, creating it if it doesn't exist yet.
151         """
152         if key not in self._config:
153             self.__dict__[key] = self._config[key] = ConfigurationOption()
154
155         return self._config[key]
156
157     def __setitem__(self, key, value):
158         self.__dict__[key] = value
159         self._config[key] = value
160
161     def __delitem__(self, key):
162         if key in self.__dict__ and key[0] != '_':
163             del self.__dict__[key]
164         del self._config[key]
165
166
167 class ConfigurationOption(Configuration):
168     def __init__(self, *args, **kwargs):
169         super(ConfigurationOption, self).__init__(*args, **kwargs)
170         self.default_container = dict
171
172     def __getitem__(self, key):
173         """Returns a config section, creating it if it doesn't exist yet.
174         """
175         if key not in self._config:
176             self.__dict__[key] = self._config[key] = None
177
178         return self._config[key]
179
180
181 def handle_store_boolean(self, *args, **kwargs):
182     """Does the add_argument for action='store_boolean'.
183     """
184     kwargs['add_dest'] = False
185     disabled_args = None
186     disabled_kwargs = kwargs.copy()
187     disabled_kwargs['action'] = 'store_false'
188     disabled_kwargs['help'] = 'Disable ' + kwargs['help']
189     kwargs['action'] = 'store_true'
190     kwargs['help'] = 'Enable ' + kwargs['help']
191
192     for flag in args:
193         if flag[:2] == '--':
194             disabled_args = ('--no-' + flag[2:],)
195             break
196
197     self.add_argument(*args, **kwargs)
198     self.add_argument(*disabled_args, **disabled_kwargs)
199
200     return (args, kwargs, disabled_args, disabled_kwargs)
201
202
203 class SubparserWrapper(object):
204     """Wrap subparsers so we can populate the normal and the shadow parser.
205     """
206
207     def __init__(self, cli, submodule, subparser):
208         self.cli = cli
209         self.submodule = submodule
210         self.subparser = subparser
211
212         for attr in dir(subparser):
213             if not hasattr(self, attr):
214                 setattr(self, attr, getattr(subparser, attr))
215
216     def completer(self, completer):
217         """Add an arpcomplete completer to this subcommand.
218         """
219         self.subparser.completer = completer
220
221     def add_argument(self, *args, **kwargs):
222         if kwargs.get('add_dest', True):
223             kwargs['dest'] = self.submodule + '_' + self.cli.get_argument_name(*args, **kwargs)
224         if 'add_dest' in kwargs:
225             del kwargs['add_dest']
226
227         if 'action' in kwargs and kwargs['action'] == 'store_boolean':
228             return handle_store_boolean(self, *args, **kwargs)
229
230         self.cli.acquire_lock()
231         self.subparser.add_argument(*args, **kwargs)
232
233         if 'default' in kwargs:
234             del kwargs['default']
235         if 'action' in kwargs and kwargs['action'] == 'store_false':
236             kwargs['action'] == 'store_true'
237         self.cli.subcommands_default[self.submodule].add_argument(*args, **kwargs)
238         self.cli.release_lock()
239
240
241 class MILC(object):
242     """MILC - An Opinionated Batteries Included Framework
243     """
244
245     def __init__(self):
246         """Initialize the MILC object.
247         """
248         # Setup a lock for thread safety
249         self._lock = threading.RLock() if thread else None
250
251         # Define some basic info
252         self.acquire_lock()
253         self._description = None
254         self._entrypoint = None
255         self._inside_context_manager = False
256         self.ansi = ansi_colors
257         self.config = Configuration()
258         self.config_file = None
259         self.prog_name = sys.argv[0][:-3] if sys.argv[0].endswith('.py') else sys.argv[0]
260         self.version = os.environ.get('QMK_VERSION', 'unknown')
261         self.release_lock()
262
263         # Initialize all the things
264         self.initialize_argparse()
265         self.initialize_logging()
266
267     @property
268     def description(self):
269         return self._description
270
271     @description.setter
272     def description(self, value):
273         self._description = self._arg_parser.description = self._arg_defaults.description = value
274
275     def echo(self, text, *args, **kwargs):
276         """Print colorized text to stdout, as long as stdout is a tty.
277
278         ANSI color strings (such as {fg-blue}) will be converted into ANSI
279         escape sequences, and the ANSI reset sequence will be added to all
280         strings.
281
282         If *args or **kwargs are passed they will be used to %-format the strings.
283         """
284         if args and kwargs:
285             raise RuntimeError('You can only specify *args or **kwargs, not both!')
286
287         if sys.stdout.isatty():
288             args = args or kwargs
289             text = format_ansi(text)
290
291             print(text % args)
292
293     def initialize_argparse(self):
294         """Prepare to process arguments from sys.argv.
295         """
296         kwargs = {
297             'fromfile_prefix_chars': '@',
298             'conflict_handler': 'resolve',
299         }
300
301         self.acquire_lock()
302         self.subcommands = {}
303         self.subcommands_default = {}
304         self._subparsers = None
305         self._subparsers_default = None
306         self.argwarn = argcomplete.warn
307         self.args = None
308         self._arg_defaults = argparse.ArgumentParser(**kwargs)
309         self._arg_parser = argparse.ArgumentParser(**kwargs)
310         self.set_defaults = self._arg_parser.set_defaults
311         self.print_usage = self._arg_parser.print_usage
312         self.print_help = self._arg_parser.print_help
313         self.release_lock()
314
315     def completer(self, completer):
316         """Add an arpcomplete completer to this subcommand.
317         """
318         self._arg_parser.completer = completer
319
320     def add_argument(self, *args, **kwargs):
321         """Wrapper to add arguments to both the main and the shadow argparser.
322         """
323         if kwargs.get('add_dest', True) and args[0][0] == '-':
324             kwargs['dest'] = 'general_' + self.get_argument_name(*args, **kwargs)
325         if 'add_dest' in kwargs:
326             del kwargs['add_dest']
327
328         if 'action' in kwargs and kwargs['action'] == 'store_boolean':
329             return handle_store_boolean(self, *args, **kwargs)
330
331         self.acquire_lock()
332         self._arg_parser.add_argument(*args, **kwargs)
333
334         # Populate the shadow parser
335         if 'default' in kwargs:
336             del kwargs['default']
337         if 'action' in kwargs and kwargs['action'] == 'store_false':
338             kwargs['action'] == 'store_true'
339         self._arg_defaults.add_argument(*args, **kwargs)
340         self.release_lock()
341
342     def initialize_logging(self):
343         """Prepare the defaults for the logging infrastructure.
344         """
345         self.acquire_lock()
346         self.log_file = None
347         self.log_file_mode = 'a'
348         self.log_file_handler = None
349         self.log_print = True
350         self.log_print_to = sys.stderr
351         self.log_print_level = logging.INFO
352         self.log_file_level = logging.DEBUG
353         self.log_level = logging.INFO
354         self.log = logging.getLogger(self.__class__.__name__)
355         self.log.setLevel(logging.DEBUG)
356         logging.root.setLevel(logging.DEBUG)
357         self.release_lock()
358
359         self.add_argument('-V', '--version', version=self.version, action='version', help='Display the version and exit')
360         self.add_argument('-v', '--verbose', action='store_true', help='Make the logging more verbose')
361         self.add_argument('--datetime-fmt', default='%Y-%m-%d %H:%M:%S', help='Format string for datetimes')
362         self.add_argument('--log-fmt', default='%(levelname)s %(message)s', help='Format string for printed log output')
363         self.add_argument('--log-file-fmt', default='[%(levelname)s] [%(asctime)s] [file:%(pathname)s] [line:%(lineno)d] %(message)s', help='Format string for log file.')
364         self.add_argument('--log-file', help='File to write log messages to')
365         self.add_argument('--color', action='store_boolean', default=True, help='color in output')
366         self.add_argument('-c', '--config-file', help='The config file to read and/or write')
367         self.add_argument('--save-config', action='store_true', help='Save the running configuration to the config file')
368
369     def add_subparsers(self, title='Sub-commands', **kwargs):
370         if self._inside_context_manager:
371             raise RuntimeError('You must run this before the with statement!')
372
373         self.acquire_lock()
374         self._subparsers_default = self._arg_defaults.add_subparsers(title=title, dest='subparsers', **kwargs)
375         self._subparsers = self._arg_parser.add_subparsers(title=title, dest='subparsers', **kwargs)
376         self.release_lock()
377
378     def acquire_lock(self):
379         """Acquire the MILC lock for exclusive access to properties.
380         """
381         if self._lock:
382             self._lock.acquire()
383
384     def release_lock(self):
385         """Release the MILC lock.
386         """
387         if self._lock:
388             self._lock.release()
389
390     def find_config_file(self):
391         """Locate the config file.
392         """
393         if self.config_file:
394             return self.config_file
395
396         if self.args and self.args.general_config_file:
397             return self.args.general_config_file
398
399         return os.path.abspath(os.path.expanduser('~/.%s.ini' % self.prog_name))
400
401     def get_argument_name(self, *args, **kwargs):
402         """Takes argparse arguments and returns the dest name.
403         """
404         try:
405             return self._arg_parser._get_optional_kwargs(*args, **kwargs)['dest']
406         except ValueError:
407             return self._arg_parser._get_positional_kwargs(*args, **kwargs)['dest']
408
409     def argument(self, *args, **kwargs):
410         """Decorator to call self.add_argument or self.<subcommand>.add_argument.
411         """
412         if self._inside_context_manager:
413             raise RuntimeError('You must run this before the with statement!')
414
415         def argument_function(handler):
416             if handler is self._entrypoint:
417                 self.add_argument(*args, **kwargs)
418
419             elif handler.__name__ in self.subcommands:
420                 self.subcommands[handler.__name__].add_argument(*args, **kwargs)
421
422             else:
423                 raise RuntimeError('Decorated function is not entrypoint or subcommand!')
424
425             return handler
426
427         return argument_function
428
429     def arg_passed(self, arg):
430         """Returns True if arg was passed on the command line.
431         """
432         return self.args_passed[arg] in (None, False)
433
434     def parse_args(self):
435         """Parse the CLI args.
436         """
437         if self.args:
438             self.log.debug('Warning: Arguments have already been parsed, ignoring duplicate attempt!')
439             return
440
441         argcomplete.autocomplete(self._arg_parser)
442
443         self.acquire_lock()
444         self.args = self._arg_parser.parse_args()
445         self.args_passed = self._arg_defaults.parse_args()
446
447         if 'entrypoint' in self.args:
448             self._entrypoint = self.args.entrypoint
449
450         if self.args.general_config_file:
451             self.config_file = self.args.general_config_file
452
453         self.release_lock()
454
455     def read_config(self):
456         """Parse the configuration file and determine the runtime configuration.
457         """
458         self.acquire_lock()
459         self.config_file = self.find_config_file()
460
461         if self.config_file and os.path.exists(self.config_file):
462             config = RawConfigParser(self.config)
463             config.read(self.config_file)
464
465             # Iterate over the config file options and write them into self.config
466             for section in config.sections():
467                 for option in config.options(section):
468                     value = config.get(section, option)
469
470                     # Coerce values into useful datatypes
471                     if value.lower() in ['1', 'yes', 'true', 'on']:
472                         value = True
473                     elif value.lower() in ['0', 'no', 'false', 'none', 'off']:
474                         value = False
475                     elif value.replace('.', '').isdigit():
476                         if '.' in value:
477                             value = Decimal(value)
478                         else:
479                             value = int(value)
480
481                     self.config[section][option] = value
482
483         # Fold the CLI args into self.config
484         for argument in vars(self.args):
485             if argument in ('subparsers', 'entrypoint'):
486                 continue
487
488             if '_' not in argument:
489                 continue
490
491             section, option = argument.split('_', 1)
492             if hasattr(self.args_passed, argument):
493                 self.config[section][option] = getattr(self.args, argument)
494             else:
495                 if option not in self.config[section]:
496                     self.config[section][option] = getattr(self.args, argument)
497
498         self.release_lock()
499
500     def save_config(self):
501         """Save the current configuration to the config file.
502         """
503         self.log.debug("Saving config file to '%s'", self.config_file)
504
505         if not self.config_file:
506             self.log.warning('%s.config_file file not set, not saving config!', self.__class__.__name__)
507             return
508
509         self.acquire_lock()
510
511         config = RawConfigParser()
512         for section_name, section in self.config._config.items():
513             config.add_section(section_name)
514             for option_name, value in section.items():
515                 if section_name == 'general':
516                     if option_name in ['save_config']:
517                         continue
518                 config.set(section_name, option_name, str(value))
519
520         with NamedTemporaryFile(mode='w', dir=os.path.dirname(self.config_file), delete=False) as tmpfile:
521             config.write(tmpfile)
522
523         # Move the new config file into place atomically
524         if os.path.getsize(tmpfile.name) > 0:
525             os.rename(tmpfile.name, self.config_file)
526         else:
527             self.log.warning('Config file saving failed, not replacing %s with %s.', self.config_file, tmpfile.name)
528
529         self.release_lock()
530
531     def __call__(self):
532         """Execute the entrypoint function.
533         """
534         if not self._inside_context_manager:
535             # If they didn't use the context manager use it ourselves
536             with self:
537                 self.__call__()
538                 return
539
540         if not self._entrypoint:
541             raise RuntimeError('No entrypoint provided!')
542
543         return self._entrypoint(self)
544
545     def entrypoint(self, description):
546         """Set the entrypoint for when no subcommand is provided.
547         """
548         if self._inside_context_manager:
549             raise RuntimeError('You must run this before cli()!')
550
551         self.acquire_lock()
552         self.description = description
553         self.release_lock()
554
555         def entrypoint_func(handler):
556             self.acquire_lock()
557             self._entrypoint = handler
558             self.release_lock()
559
560             return handler
561
562         return entrypoint_func
563
564     def add_subcommand(self, handler, description, name=None, **kwargs):
565         """Register a subcommand.
566
567         If name is not provided we use `handler.__name__`.
568         """
569         if self._inside_context_manager:
570             raise RuntimeError('You must run this before the with statement!')
571
572         if self._subparsers is None:
573             self.add_subparsers()
574
575         if not name:
576             name = handler.__name__
577
578         self.acquire_lock()
579         kwargs['help'] = description
580         self.subcommands_default[name] = self._subparsers_default.add_parser(name, **kwargs)
581         self.subcommands[name] = SubparserWrapper(self, name, self._subparsers.add_parser(name, **kwargs))
582         self.subcommands[name].set_defaults(entrypoint=handler)
583
584         if name not in self.__dict__:
585             self.__dict__[name] = self.subcommands[name]
586         else:
587             self.log.debug("Could not add subcommand '%s' to attributes, key already exists!", name)
588
589         self.release_lock()
590
591         return handler
592
593     def subcommand(self, description, **kwargs):
594         """Decorator to register a subcommand.
595         """
596
597         def subcommand_function(handler):
598             return self.add_subcommand(handler, description, **kwargs)
599
600         return subcommand_function
601
602     def setup_logging(self):
603         """Called by __enter__() to setup the logging configuration.
604         """
605         if len(logging.root.handlers) != 0:
606             # This is not a design decision. This is what I'm doing for now until I can examine and think about this situation in more detail.
607             raise RuntimeError('MILC should be the only system installing root log handlers!')
608
609         self.acquire_lock()
610
611         if self.config['general']['verbose']:
612             self.log_print_level = logging.DEBUG
613
614         self.log_file = self.config['general']['log_file'] or self.log_file
615         self.log_file_format = self.config['general']['log_file_fmt']
616         self.log_file_format = ANSIStrippingFormatter(self.config['general']['log_file_fmt'], self.config['general']['datetime_fmt'])
617         self.log_format = self.config['general']['log_fmt']
618
619         if self.config.general.color:
620             self.log_format = ANSIEmojiLoglevelFormatter(self.args.general_log_fmt, self.config.general.datetime_fmt)
621         else:
622             self.log_format = ANSIStrippingFormatter(self.args.general_log_fmt, self.config.general.datetime_fmt)
623
624         if self.log_file:
625             self.log_file_handler = logging.FileHandler(self.log_file, self.log_file_mode)
626             self.log_file_handler.setLevel(self.log_file_level)
627             self.log_file_handler.setFormatter(self.log_file_format)
628             logging.root.addHandler(self.log_file_handler)
629
630         if self.log_print:
631             self.log_print_handler = logging.StreamHandler(self.log_print_to)
632             self.log_print_handler.setLevel(self.log_print_level)
633             self.log_print_handler.setFormatter(self.log_format)
634             logging.root.addHandler(self.log_print_handler)
635
636         self.release_lock()
637
638     def __enter__(self):
639         if self._inside_context_manager:
640             self.log.debug('Warning: context manager was entered again. This usually means that self.__call__() was called before the with statement. You probably do not want to do that.')
641             return
642
643         self.acquire_lock()
644         self._inside_context_manager = True
645         self.release_lock()
646
647         colorama.init()
648         self.parse_args()
649         self.read_config()
650         self.setup_logging()
651
652         if self.config.general.save_config:
653             self.save_config()
654
655         return self
656
657     def __exit__(self, exc_type, exc_val, exc_tb):
658         self.acquire_lock()
659         self._inside_context_manager = False
660         self.release_lock()
661
662         if exc_type is not None and not isinstance(SystemExit(), exc_type):
663             print(exc_type)
664             logging.exception(exc_val)
665             exit(255)
666
667
668 cli = MILC()
669
670 if __name__ == '__main__':
671
672     @cli.argument('-c', '--comma', help='comma in output', default=True, action='store_boolean')
673     @cli.entrypoint('My useful CLI tool with subcommands.')
674     def main(cli):
675         comma = ',' if cli.config.general.comma else ''
676         cli.log.info('{bg_green}{fg_red}Hello%s World!', comma)
677
678     @cli.argument('-n', '--name', help='Name to greet', default='World')
679     @cli.subcommand('Description of hello subcommand here.')
680     def hello(cli):
681         comma = ',' if cli.config.general.comma else ''
682         cli.log.info('{fg_blue}Hello%s %s!', comma, cli.config.hello.name)
683
684     def goodbye(cli):
685         comma = ',' if cli.config.general.comma else ''
686         cli.log.info('{bg_red}Goodbye%s %s!', comma, cli.config.goodbye.name)
687
688     @cli.argument('-n', '--name', help='Name to greet', default='World')
689     @cli.subcommand('Think a bit before greeting the user.')
690     def thinking(cli):
691         comma = ',' if cli.config.general.comma else ''
692         spinner = cli.spinner(text='Just a moment...', spinner='earth')
693         spinner.start()
694         sleep(2)
695         spinner.stop()
696
697         with cli.spinner(text='Almost there!', spinner='moon'):
698             sleep(2)
699
700         cli.log.info('{fg_cyan}Hello%s %s!', comma, cli.config.thinking.name)
701
702     @cli.subcommand('Show off our ANSI colors.')
703     def pride(cli):
704         cli.echo('{bg_red}                    ')
705         cli.echo('{bg_lightred_ex}                    ')
706         cli.echo('{bg_lightyellow_ex}                    ')
707         cli.echo('{bg_green}                    ')
708         cli.echo('{bg_blue}                    ')
709         cli.echo('{bg_magenta}                    ')
710
711     # You can register subcommands using decorators as seen above, or using functions like like this:
712     cli.add_subcommand(goodbye, 'This will show up in --help output.')
713     cli.goodbye.add_argument('-n', '--name', help='Name to bid farewell to', default='World')
714
715     cli()  # Automatically picks between main(), hello() and goodbye()
716     print(sorted(ansi_colors.keys()))