3 """MILC - A CLI Framework
7 MILC is an opinionated framework for writing CLI apps. It optimizes for the
8 most common unix tool pattern- small tools that are run from the command
9 line but generally do not feature any user interaction while they run.
11 For more details see the MILC documentation:
13 <https://github.com/clueboard/milc/tree/master/docs>
15 from __future__ import division, print_function, unicode_literals
21 from decimal import Decimal
22 from tempfile import NamedTemporaryFile
23 from time import sleep
26 from ConfigParser import RawConfigParser
28 from configparser import RawConfigParser
39 # Log Level Representations
41 'CRITICAL': '{bg_red}{fg_white}¬_¬{style_reset_all}',
42 'ERROR': '{fg_red}☒{style_reset_all}',
43 'WARNING': '{fg_yellow}⚠{style_reset_all}',
44 'INFO': '{fg_blue}ℹ{style_reset_all}',
45 'DEBUG': '{fg_cyan}☐{style_reset_all}',
46 'NOTSET': '{style_reset_all}¯\\_(o_o)_/¯'
48 EMOJI_LOGLEVELS['FATAL'] = EMOJI_LOGLEVELS['CRITICAL']
49 EMOJI_LOGLEVELS['WARN'] = EMOJI_LOGLEVELS['WARNING']
52 # Regex was gratefully borrowed from kfir on stackoverflow:
53 # https://stackoverflow.com/a/45448194
54 ansi_regex = r'\x1b(' \
57 r'([\(\)][a-b0-2])|' \
58 r'(\[\d{0,2}[ma-dgkjqi])|' \
59 r'(\[\d+;\d+[hfy]?)|' \
68 ansi_escape = re.compile(ansi_regex, flags=re.IGNORECASE)
70 ('fg', colorama.ansi.AnsiFore()),
71 ('bg', colorama.ansi.AnsiBack()),
72 ('style', colorama.ansi.AnsiStyle()),
76 for prefix, obj in ansi_styles:
77 for color in [x for x in obj.__dict__ if not x.startswith('_')]:
78 ansi_colors[prefix + '_' + color.lower()] = getattr(obj, color)
def format_ansi(text):
    """Return a copy of text with its color tokens expanded to ANSI codes.

    Every ``{name}`` token whose name is a key of ``ansi_colors`` (for
    example ``{fg_red}``) is swapped for the matching escape sequence, and
    the ``style_reset_all`` sequence is appended to the result.
    """
    # Plain substitution rather than str.format(), so braces that happen to
    # appear in the log content never need escaping.
    for name, sequence in ansi_colors.items():
        text = text.replace('{%s}' % name, sequence)
    return text + ansi_colors['style_reset_all']
class ANSIFormatter(logging.Formatter):
    """Log formatter that expands color tokens into ANSI escape sequences.
    """
    def format(self, record):
        """Format record the standard way, then run the text through format_ansi()."""
        return format_ansi(super(ANSIFormatter, self).format(record))
class ANSIEmojiLoglevelFormatter(ANSIFormatter):
    """A log formatter that makes the loglevel an emoji.
    """
    def format(self, record):
        """Format record with its level name shown as a colored emoji.

        The emoji comes from EMOJI_LOGLEVELS. A level name that has no entry
        there (for example a custom level) is left as it is instead of
        raising KeyError.
        """
        plain_levelname = record.levelname
        emoji = EMOJI_LOGLEVELS.get(plain_levelname)
        if emoji is not None:
            record.levelname = emoji.format(**ansi_colors)

        try:
            return super(ANSIEmojiLoglevelFormatter, self).format(record)
        finally:
            # The same LogRecord object is handed to every handler, so put
            # the real level name back; otherwise later formatters would see
            # the emoji and a second pass through this one would not find it
            # in EMOJI_LOGLEVELS.
            record.levelname = plain_levelname
class ANSIStrippingFormatter(ANSIFormatter):
    """Log formatter whose output contains no ANSI escape sequences.
    """
    def format(self, record):
        """Format record through ANSIFormatter, then delete every match of ansi_escape."""
        colored = super(ANSIStrippingFormatter, self).format(record)
        plain = ansi_escape.sub('', colored)
        return plain
class Configuration(object):
    """Represents the running configuration.

    Values live in a private dict and are mirrored onto the instance as
    attributes, so ``config['general']`` and ``config.general`` refer to the
    same object. Looking up a key that does not exist yet never raises
    KeyError; the entry is created on the fly instead.
    """
    def __init__(self, *args, **kwargs):
        """Create an empty configuration. Any arguments are accepted and ignored."""
        self._config = {}
        # Factory called by __getitem__ to build the entry for an unknown key.
        self.default_container = ConfigurationOption

    def __contains__(self, key):
        return key in self._config

    def __iter__(self):
        return iter(self._config)

    def __len__(self):
        return len(self._config)

    def __repr__(self):
        return repr(self._config)

    def keys(self):
        return self._config.keys()

    def items(self):
        return self._config.items()

    def values(self):
        return self._config.values()

    def _is_reserved(self, key):
        """Return True if key must not be mirrored as an instance attribute.

        Private names, ``default_container`` and anything defined on the
        class (``keys``, ``items``, ...) are reserved: writing them into
        ``__dict__`` would replace the backing dict or hide a method.
        """
        try:
            return key.startswith('_') or key == 'default_container' or hasattr(type(self), key)
        except (AttributeError, TypeError):
            # Not usable as an attribute name, so there is nothing to mirror.
            return True

    def _store(self, key, value):
        """Save value under key and mirror it as an attribute when that is safe."""
        if not self._is_reserved(key):
            self.__dict__[key] = value
        self._config[key] = value

    def __getitem__(self, key):
        """Returns a config section, creating it if it doesn't exist yet.
        """
        if key not in self._config:
            self._store(key, self.default_container())

        return self._config[key]

    def __setitem__(self, key, value):
        self._store(key, value)

    def __delitem__(self, key):
        """Remove key and its mirrored attribute. Raises KeyError if key is unknown."""
        if key in self.__dict__ and not self._is_reserved(key):
            del self.__dict__[key]
        del self._config[key]
class ConfigurationOption(Configuration):
    """One section of the configuration; options that are not set read as None.
    """
    def __init__(self, *args, **kwargs):
        super(ConfigurationOption, self).__init__(*args, **kwargs)
        self.default_container = dict

    def __getitem__(self, key):
        """Return the value stored for key, recording None for it if it is not set yet.
        """
        try:
            return self._config[key]
        except KeyError:
            # First access: remember the option as None, both as an
            # attribute and in the backing dict.
            self.__dict__[key] = self._config[key] = None
            return self._config[key]
181 def handle_store_boolean(self, *args, **kwargs):
182 """Does the add_argument for action='store_boolean'.
184 kwargs['add_dest'] = False
186 disabled_kwargs = kwargs.copy()
187 disabled_kwargs['action'] = 'store_false'
188 disabled_kwargs['help'] = 'Disable ' + kwargs['help']
189 kwargs['action'] = 'store_true'
190 kwargs['help'] = 'Enable ' + kwargs['help']
194 disabled_args = ('--no-' + flag[2:],)
197 self.add_argument(*args, **kwargs)
198 self.add_argument(*disabled_args, **disabled_kwargs)
200 return (args, kwargs, disabled_args, disabled_kwargs)
class SubparserWrapper(object):
    """Wrap subparsers so we can populate the normal and the shadow parser.
    """
    def __init__(self, cli, submodule, subparser):
        """Remember the owning cli object, the subcommand name and its parser.

        Every attribute of subparser that this wrapper does not define itself
        is copied onto the wrapper, so the wrapper can be used in place of
        the parser.
        """
        self.cli = cli
        self.submodule = submodule
        self.subparser = subparser

        for attr in dir(subparser):
            if not hasattr(self, attr):
                setattr(self, attr, getattr(subparser, attr))

    def completer(self, completer):
        """Add an argcomplete completer to this subcommand.
        """
        self.subparser.completer = completer

    def add_argument(self, *args, **kwargs):
        """Add an argument to this subcommand's parser and to its shadow parser.

        Unless ``add_dest=False`` is passed, the destination is prefixed with
        the subcommand name. ``action='store_boolean'`` is delegated to
        handle_store_boolean(), whose result is returned; otherwise nothing
        is returned.
        """
        if kwargs.get('add_dest', True):
            kwargs['dest'] = self.submodule + '_' + self.cli.get_argument_name(*args, **kwargs)
        # add_dest is our own keyword; argparse must never see it.
        kwargs.pop('add_dest', None)

        if kwargs.get('action') == 'store_boolean':
            return handle_store_boolean(self, *args, **kwargs)

        self.cli.acquire_lock()
        try:
            self.subparser.add_argument(*args, **kwargs)

            # The shadow parser carries no defaults and only ever stores
            # True, so a truthy value there means "given on the command line".
            kwargs.pop('default', None)
            if kwargs.get('action') == 'store_false':
                kwargs['action'] = 'store_true'
            self.cli.subcommands_default[self.submodule].add_argument(*args, **kwargs)
        finally:
            # Release even when argparse rejects the argument.
            self.cli.release_lock()
242 """MILC - An Opinionated Batteries Included Framework
246 """Initialize the MILC object.
248 # Setup a lock for thread safety
249 self._lock = threading.RLock() if thread else None
251 # Define some basic info
253 self._description = None
254 self._entrypoint = None
255 self._inside_context_manager = False
256 self.ansi = ansi_colors
257 self.config = Configuration()
258 self.config_file = None
259 self.prog_name = sys.argv[0][:-3] if sys.argv[0].endswith('.py') else sys.argv[0]
260 self.version = os.environ.get('QMK_VERSION', 'unknown')
263 # Initialize all the things
264 self.initialize_argparse()
265 self.initialize_logging()
268 def description(self):
269 return self._description
272 def description(self, value):
273 self._description = self._arg_parser.description = self._arg_defaults.description = value
275 def echo(self, text, *args, **kwargs):
276 """Print colorized text to stdout, as long as stdout is a tty.
278 ANSI color strings (such as {fg-blue}) will be converted into ANSI
279 escape sequences, and the ANSI reset sequence will be added to all
282 If *args or **kwargs are passed they will be used to %-format the strings.
285 raise RuntimeError('You can only specify *args or **kwargs, not both!')
287 if sys.stdout.isatty():
288 args = args or kwargs
289 text = format_ansi(text)
293 def initialize_argparse(self):
294 """Prepare to process arguments from sys.argv.
297 'fromfile_prefix_chars': '@',
298 'conflict_handler': 'resolve',
302 self.subcommands = {}
303 self.subcommands_default = {}
304 self._subparsers = None
305 self._subparsers_default = None
306 self.argwarn = argcomplete.warn
308 self._arg_defaults = argparse.ArgumentParser(**kwargs)
309 self._arg_parser = argparse.ArgumentParser(**kwargs)
310 self.set_defaults = self._arg_parser.set_defaults
311 self.print_usage = self._arg_parser.print_usage
312 self.print_help = self._arg_parser.print_help
def completer(self, completer):
    """Attach an argcomplete completer to the main argument parser.
    """
    setattr(self._arg_parser, 'completer', completer)
def add_argument(self, *args, **kwargs):
    """Wrapper to add arguments to both the main and the shadow argparser.

    Optional arguments (those starting with ``-``) get a ``general_`` prefix
    on their destination unless ``add_dest=False`` is passed.
    ``action='store_boolean'`` is delegated to handle_store_boolean(), whose
    result is returned; otherwise nothing is returned.
    """
    if kwargs.get('add_dest', True) and args[0][0] == '-':
        kwargs['dest'] = 'general_' + self.get_argument_name(*args, **kwargs)
    # add_dest is our own keyword; argparse must never see it.
    kwargs.pop('add_dest', None)

    if kwargs.get('action') == 'store_boolean':
        return handle_store_boolean(self, *args, **kwargs)

    self.acquire_lock()
    try:
        self._arg_parser.add_argument(*args, **kwargs)

        # Populate the shadow parser. It carries no defaults and only ever
        # stores True, so a truthy value there means the argument was given
        # on the command line.
        kwargs.pop('default', None)
        if kwargs.get('action') == 'store_false':
            kwargs['action'] = 'store_true'
        self._arg_defaults.add_argument(*args, **kwargs)
    finally:
        # Release even when argparse rejects the argument.
        self.release_lock()
342 def initialize_logging(self):
343 """Prepare the defaults for the logging infrastructure.
347 self.log_file_mode = 'a'
348 self.log_file_handler = None
349 self.log_print = True
350 self.log_print_to = sys.stderr
351 self.log_print_level = logging.INFO
352 self.log_file_level = logging.DEBUG
353 self.log_level = logging.INFO
354 self.log = logging.getLogger(self.__class__.__name__)
355 self.log.setLevel(logging.DEBUG)
356 logging.root.setLevel(logging.DEBUG)
359 self.add_argument('-V', '--version', version=self.version, action='version', help='Display the version and exit')
360 self.add_argument('-v', '--verbose', action='store_true', help='Make the logging more verbose')
361 self.add_argument('--datetime-fmt', default='%Y-%m-%d %H:%M:%S', help='Format string for datetimes')
362 self.add_argument('--log-fmt', default='%(levelname)s %(message)s', help='Format string for printed log output')
363 self.add_argument('--log-file-fmt', default='[%(levelname)s] [%(asctime)s] [file:%(pathname)s] [line:%(lineno)d] %(message)s', help='Format string for log file.')
364 self.add_argument('--log-file', help='File to write log messages to')
365 self.add_argument('--color', action='store_boolean', default=True, help='color in output')
366 self.add_argument('-c', '--config-file', help='The config file to read and/or write')
367 self.add_argument('--save-config', action='store_true', help='Save the running configuration to the config file')
369 def add_subparsers(self, title='Sub-commands', **kwargs):
370 if self._inside_context_manager:
371 raise RuntimeError('You must run this before the with statement!')
374 self._subparsers_default = self._arg_defaults.add_subparsers(title=title, dest='subparsers', **kwargs)
375 self._subparsers = self._arg_parser.add_subparsers(title=title, dest='subparsers', **kwargs)
378 def acquire_lock(self):
379 """Acquire the MILC lock for exclusive access to properties.
384 def release_lock(self):
385 """Release the MILC lock.
390 def find_config_file(self):
391 """Locate the config file.
394 return self.config_file
396 if self.args and self.args.general_config_file:
397 return self.args.general_config_file
399 return os.path.abspath(os.path.expanduser('~/.%s.ini' % self.prog_name))
401 def get_argument_name(self, *args, **kwargs):
402 """Takes argparse arguments and returns the dest name.
405 return self._arg_parser._get_optional_kwargs(*args, **kwargs)['dest']
407 return self._arg_parser._get_positional_kwargs(*args, **kwargs)['dest']
409 def argument(self, *args, **kwargs):
410 """Decorator to call self.add_argument or self.<subcommand>.add_argument.
412 if self._inside_context_manager:
413 raise RuntimeError('You must run this before the with statement!')
415 def argument_function(handler):
416 if handler is self._entrypoint:
417 self.add_argument(*args, **kwargs)
419 elif handler.__name__ in self.subcommands:
420 self.subcommands[handler.__name__].add_argument(*args, **kwargs)
423 raise RuntimeError('Decorated function is not entrypoint or subcommand!')
427 return argument_function
def arg_passed(self, arg):
    """Returns True if arg was passed on the command line.

    arg is the destination name as stored on the parsed arguments (for
    example ``general_verbose``). The answer comes from ``self.args_passed``,
    the result of parsing with the shadow parser, where an argument that was
    not given is either None or False. Unknown names return False.
    """
    # args_passed is an argparse.Namespace, which supports attribute access
    # but not subscripting.
    value = getattr(self.args_passed, arg, None)
    return value is not None and value is not False
434 def parse_args(self):
435 """Parse the CLI args.
438 self.log.debug('Warning: Arguments have already been parsed, ignoring duplicate attempt!')
441 argcomplete.autocomplete(self._arg_parser)
444 self.args = self._arg_parser.parse_args()
445 self.args_passed = self._arg_defaults.parse_args()
447 if 'entrypoint' in self.args:
448 self._entrypoint = self.args.entrypoint
450 if self.args.general_config_file:
451 self.config_file = self.args.general_config_file
455 def read_config(self):
456 """Parse the configuration file and determine the runtime configuration.
459 self.config_file = self.find_config_file()
461 if self.config_file and os.path.exists(self.config_file):
462 config = RawConfigParser(self.config)
463 config.read(self.config_file)
465 # Iterate over the config file options and write them into self.config
466 for section in config.sections():
467 for option in config.options(section):
468 value = config.get(section, option)
470 # Coerce values into useful datatypes
471 if value.lower() in ['1', 'yes', 'true', 'on']:
473 elif value.lower() in ['0', 'no', 'false', 'none', 'off']:
475 elif value.replace('.', '').isdigit():
477 value = Decimal(value)
481 self.config[section][option] = value
483 # Fold the CLI args into self.config
484 for argument in vars(self.args):
485 if argument in ('subparsers', 'entrypoint'):
488 if '_' not in argument:
491 section, option = argument.split('_', 1)
492 if hasattr(self.args_passed, argument):
493 self.config[section][option] = getattr(self.args, argument)
495 if option not in self.config[section]:
496 self.config[section][option] = getattr(self.args, argument)
500 def save_config(self):
501 """Save the current configuration to the config file.
503 self.log.debug("Saving config file to '%s'", self.config_file)
505 if not self.config_file:
506 self.log.warning('%s.config_file file not set, not saving config!', self.__class__.__name__)
511 config = RawConfigParser()
512 for section_name, section in self.config._config.items():
513 config.add_section(section_name)
514 for option_name, value in section.items():
515 if section_name == 'general':
516 if option_name in ['save_config']:
518 config.set(section_name, option_name, str(value))
520 with NamedTemporaryFile(mode='w', dir=os.path.dirname(self.config_file), delete=False) as tmpfile:
521 config.write(tmpfile)
523 # Move the new config file into place atomically
524 if os.path.getsize(tmpfile.name) > 0:
525 os.rename(tmpfile.name, self.config_file)
527 self.log.warning('Config file saving failed, not replacing %s with %s.', self.config_file, tmpfile.name)
532 """Execute the entrypoint function.
534 if not self._inside_context_manager:
535 # If they didn't use the context manager use it ourselves
540 if not self._entrypoint:
541 raise RuntimeError('No entrypoint provided!')
543 return self._entrypoint(self)
545 def entrypoint(self, description):
546 """Set the entrypoint for when no subcommand is provided.
548 if self._inside_context_manager:
549 raise RuntimeError('You must run this before cli()!')
552 self.description = description
555 def entrypoint_func(handler):
557 self._entrypoint = handler
562 return entrypoint_func
564 def add_subcommand(self, handler, description, name=None, **kwargs):
565 """Register a subcommand.
567 If name is not provided we use `handler.__name__`.
569 if self._inside_context_manager:
570 raise RuntimeError('You must run this before the with statement!')
572 if self._subparsers is None:
573 self.add_subparsers()
576 name = handler.__name__
579 kwargs['help'] = description
580 self.subcommands_default[name] = self._subparsers_default.add_parser(name, **kwargs)
581 self.subcommands[name] = SubparserWrapper(self, name, self._subparsers.add_parser(name, **kwargs))
582 self.subcommands[name].set_defaults(entrypoint=handler)
584 if name not in self.__dict__:
585 self.__dict__[name] = self.subcommands[name]
587 self.log.debug("Could not add subcommand '%s' to attributes, key already exists!", name)
def subcommand(self, description, **kwargs):
    """Decorator form of add_subcommand().

    The decorated function is handed to ``self.add_subcommand`` together
    with description and any extra keyword arguments, and whatever that call
    returns takes the place of the decorated function.
    """
    def register(handler):
        result = self.add_subcommand(handler, description, **kwargs)
        return result

    return register
602 def setup_logging(self):
603 """Called by __enter__() to setup the logging configuration.
605 if len(logging.root.handlers) != 0:
606 # This is not a design decision. This is what I'm doing for now until I can examine and think about this situation in more detail.
607 raise RuntimeError('MILC should be the only system installing root log handlers!')
611 if self.config['general']['verbose']:
612 self.log_print_level = logging.DEBUG
614 self.log_file = self.config['general']['log_file'] or self.log_file
615 self.log_file_format = self.config['general']['log_file_fmt']
616 self.log_file_format = ANSIStrippingFormatter(self.config['general']['log_file_fmt'], self.config['general']['datetime_fmt'])
617 self.log_format = self.config['general']['log_fmt']
619 if self.config.general.color:
620 self.log_format = ANSIEmojiLoglevelFormatter(self.args.general_log_fmt, self.config.general.datetime_fmt)
622 self.log_format = ANSIStrippingFormatter(self.args.general_log_fmt, self.config.general.datetime_fmt)
625 self.log_file_handler = logging.FileHandler(self.log_file, self.log_file_mode)
626 self.log_file_handler.setLevel(self.log_file_level)
627 self.log_file_handler.setFormatter(self.log_file_format)
628 logging.root.addHandler(self.log_file_handler)
631 self.log_print_handler = logging.StreamHandler(self.log_print_to)
632 self.log_print_handler.setLevel(self.log_print_level)
633 self.log_print_handler.setFormatter(self.log_format)
634 logging.root.addHandler(self.log_print_handler)
639 if self._inside_context_manager:
640 self.log.debug('Warning: context manager was entered again. This usually means that self.__call__() was called before the with statement. You probably do not want to do that.')
644 self._inside_context_manager = True
652 if self.config.general.save_config:
657 def __exit__(self, exc_type, exc_val, exc_tb):
659 self._inside_context_manager = False
662 if exc_type is not None and not isinstance(SystemExit(), exc_type):
664 logging.exception(exc_val)
670 if __name__ == '__main__':
672 @cli.argument('-c', '--comma', help='comma in output', default=True, action='store_boolean')
673 @cli.entrypoint('My useful CLI tool with subcommands.')
675 comma = ',' if cli.config.general.comma else ''
676 cli.log.info('{bg_green}{fg_red}Hello%s World!', comma)
678 @cli.argument('-n', '--name', help='Name to greet', default='World')
679 @cli.subcommand('Description of hello subcommand here.')
681 comma = ',' if cli.config.general.comma else ''
682 cli.log.info('{fg_blue}Hello%s %s!', comma, cli.config.hello.name)
685 comma = ',' if cli.config.general.comma else ''
686 cli.log.info('{bg_red}Goodbye%s %s!', comma, cli.config.goodbye.name)
688 @cli.argument('-n', '--name', help='Name to greet', default='World')
689 @cli.subcommand('Think a bit before greeting the user.')
691 comma = ',' if cli.config.general.comma else ''
692 spinner = cli.spinner(text='Just a moment...', spinner='earth')
697 with cli.spinner(text='Almost there!', spinner='moon'):
700 cli.log.info('{fg_cyan}Hello%s %s!', comma, cli.config.thinking.name)
702 @cli.subcommand('Show off our ANSI colors.')
704 cli.echo('{bg_red} ')
705 cli.echo('{bg_lightred_ex} ')
706 cli.echo('{bg_lightyellow_ex} ')
707 cli.echo('{bg_green} ')
708 cli.echo('{bg_blue} ')
709 cli.echo('{bg_magenta} ')
711 # You can register subcommands using decorators as seen above, or using functions like like this:
712 cli.add_subcommand(goodbye, 'This will show up in --help output.')
713 cli.goodbye.add_argument('-n', '--name', help='Name to bid farewell to', default='World')
715 cli() # Automatically picks between main(), hello() and goodbye()
716 print(sorted(ansi_colors.keys()))