X-Git-Url: https://git.donarmstrong.com/?a=blobdiff_plain;f=autorandr.py;h=a37a6240542e1ca47e90eab113b61a826603673d;hb=eaa04753094d944c3513de9d4de85fb1eb2e8b7b;hp=a65322d90c4ea4168a4748ba47befb5423e1de85;hpb=a997298c5da19ee9eb808bac9b6c494798f7f924;p=deb_pkgs%2Fautorandr.git diff --git a/autorandr.py b/autorandr.py index a65322d..a37a624 100755 --- a/autorandr.py +++ b/autorandr.py @@ -30,10 +30,12 @@ import getopt import hashlib import os import posix +import pwd import re import subprocess import sys import shutil +import time from collections import OrderedDict from distutils.version import LooseVersion as Version @@ -68,8 +70,9 @@ Usage: autorandr [options] --config dump your current xrandr setup --dry-run don't change anything, only print the xrandr commands --debug enable verbose output +--batch run autorandr for all users with active X11 sessions - To prevent a profile from being loaded, place a script call "block" in its + To prevent a profile from being loaded, place a script called "block" in its directory. The script is evaluated before the screen setup is inspected, and in case of it returning a value of 0 the profile is skipped. This can be used to query the status of a docking station you are about to leave. @@ -96,23 +99,31 @@ class AutorandrException(Exception): while trace.tb_next: trace = trace.tb_next self.line = trace.tb_lineno + self.file_name = trace.tb_frame.f_code.co_filename else: try: import inspect - self.line = inspect.currentframe().f_back.f_lineno + frame = inspect.currentframe().f_back + self.line = frame.f_lineno + self.file_name = frame.f_code.co_filename except: self.line = None + self.file_name = None self.original_exception = None + if os.path.abspath(self.file_name) == os.path.abspath(sys.argv[0]): + self.file_name = None + def __str__(self): retval = [ self.message ] if self.line: - retval.append(" (line %d)" % self.line) + retval.append(" (line %d%s)" % (self.line, ("; %s" % self.file_name) if self.file_name else "")) if self.original_exception: retval.append(":\n ") retval.append(str(self.original_exception).replace("\n", "\n ")) if self.report_bug: - retval.append("\nThis appears to be a bug. Please help improving autorandr by reporting it upstream." + retval.append("\nThis appears to be a bug. Please help improving autorandr by reporting it upstream:" + "\nhttps://github.com/phillipberndt/autorandr/issues" "\nPlease attach the output of `xrandr --verbose` to your bug report if appropriate.") return "".join(retval) @@ -334,6 +345,8 @@ class XrandrOutput(object): for line in configuration.split("\n"): if line: line = line.split(None, 1) + if line and line[0].startswith("#"): + continue options[line[0]] = line[1] if len(line) > 1 else None edid = None @@ -453,7 +466,7 @@ def load_profiles(profile_path): if not os.path.isfile(config_name) or not os.path.isfile(setup_name): continue - edids = dict([ x.strip().split() for x in open(setup_name).readlines() if x.strip() ]) + edids = dict([ x.split() for x in (y.strip() for y in open(setup_name).readlines()) if x and x[0] != "#" ]) config = {} buffer = [] @@ -472,6 +485,17 @@ def load_profiles(profile_path): return profiles +def get_symlinks(profile_path): + "Load all symlinks from a directory" + + symlinks = {} + for link in os.listdir(profile_path): + file_name = os.path.join(profile_path, link) + if os.path.islink(file_name): + symlinks[link] = os.readlink(file_name) + + return symlinks + def find_profiles(current_config, profiles): "Find profiles matching the currently connected outputs" detected_profiles = [] @@ -528,6 +552,31 @@ def update_mtime(filename): except: return False +def call_and_retry(*args, **kwargs): + """Wrapper around subprocess.call that retries failed calls. + + This function calls subprocess.call and on non-zero exit states, + waits a second and then retries once. This mitigates #47, + a timing issue with some drivers. + """ + if "dry_run" in kwargs: + dry_run = kwargs["dry_run"] + del kwargs["dry_run"] + else: + dry_run = False + kwargs_redirected = dict(kwargs) + if not dry_run: + if hasattr(subprocess, "DEVNULL"): + kwargs_redirected["stdout"] = getattr(subprocess, "DEVNULL") + else: + kwargs_redirected["stdout"] = open(os.devnull, "w") + kwargs_redirected["stderr"] = kwargs_redirected["stdout"] + retval = subprocess.call(*args, **kwargs_redirected) + if retval != 0: + time.sleep(1) + retval = subprocess.call(*args, **kwargs) + return retval + def apply_configuration(new_configuration, current_configuration, dry_run=False): "Apply a configuration" outputs = sorted(new_configuration.keys(), key=lambda x: new_configuration[x].sort_key) @@ -552,6 +601,9 @@ def apply_configuration(new_configuration, current_configuration, dry_run=False) # at least.) # - Some implementations can not handle --transform at all, so avoid it unless # necessary. (See https://github.com/phillipberndt/autorandr/issues/37) + # - Some implementations can not handle --panning without specifying --fb + # explicitly, so avoid it unless necessary. + # (See https://github.com/phillipberndt/autorandr/issues/72) auxiliary_changes_pre = [] disable_outputs = [] @@ -566,28 +618,29 @@ def apply_configuration(new_configuration, current_configuration, dry_run=False) option_vector = new_configuration[output].option_vector if xrandr_version() >= Version("1.3.0"): - if "transform" in current_configuration[output].options: - auxiliary_changes_pre.append(["--output", output, "--transform", "none"]) - else: - try: - transform_index = option_vector.index("--transform") - if option_vector[transform_index+1] == XrandrOutput.XRANDR_DEFAULTS["transform"]: - option_vector = option_vector[:transform_index] + option_vector[transform_index+2:] - except ValueError: - pass + for option in ("transform", "panning"): + if option in current_configuration[output].options: + auxiliary_changes_pre.append(["--output", output, "--%s" % option, "none"]) + else: + try: + option_index = option_vector.index("--%s" % option) + if option_vector[option_index+1] == XrandrOutput.XRANDR_DEFAULTS[option]: + option_vector = option_vector[:option_index] + option_vector[option_index+2:] + except ValueError: + pass enable_outputs.append(option_vector) # Perform pe-change auxiliary changes if auxiliary_changes_pre: argv = base_argv + list(chain.from_iterable(auxiliary_changes_pre)) - if subprocess.call(argv) != 0: + if call_and_retry(argv, dry_run=dry_run) != 0: raise AutorandrException("Command failed: %s" % " ".join(argv)) # Disable unused outputs, but make sure that there always is at least one active screen disable_keep = 0 if remain_active_count else 1 if len(disable_outputs) > disable_keep: - if subprocess.call(base_argv + list(chain.from_iterable(disable_outputs[:-1] if disable_keep else disable_outputs))) != 0: + if call_and_retry(base_argv + list(chain.from_iterable(disable_outputs[:-1] if disable_keep else disable_outputs)), dry_run=dry_run) != 0: # Disabling the outputs failed. Retry with the next command: # Sometimes disabling of outputs fails due to an invalid RRSetScreenSize. # This does not occur if simultaneously the primary screen is reset. @@ -606,7 +659,7 @@ def apply_configuration(new_configuration, current_configuration, dry_run=False) operations = disable_outputs + enable_outputs for index in range(0, len(operations), 2): argv = base_argv + list(chain.from_iterable(operations[index:index+2])) - if subprocess.call(argv) != 0: + if call_and_retry(argv, dry_run=dry_run) != 0: raise AutorandrException("Command failed: %s" % " ".join(argv)) def is_equal_configuration(source_configuration, target_configuration): @@ -695,6 +748,8 @@ def exec_scripts(profile_path, script_name, meta_information=None): and system-wide configuration folders, named script_name or residing in subdirectories named script_name.d. + If profile_path is None, only global scripts will be invoked. + meta_information is expected to be an dictionary. It will be passed to the block scripts in the environment, as variables called AUTORANDR_. @@ -714,13 +769,19 @@ def exec_scripts(profile_path, script_name, meta_information=None): if not os.path.isdir(user_profile_path): user_profile_path = os.path.join(os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")), "autorandr") - for folder in chain((profile_path, os.path.dirname(profile_path), user_profile_path), - (os.path.join(x, "autorandr") for x in os.environ.get("XDG_CONFIG_DIRS", "").split(":"))): + candidate_directories = chain((user_profile_path,), (os.path.join(x, "autorandr") for x in os.environ.get("XDG_CONFIG_DIRS", "/etc/xdg").split(":"))) + if profile_path: + candidate_directories = chain((profile_path,), candidate_directories) + + for folder in candidate_directories: if script_name not in ran_scripts: script = os.path.join(folder, script_name) if os.access(script, os.X_OK | os.F_OK): - all_ok &= subprocess.call(script, env=env) != 0 + try: + all_ok &= subprocess.call(script, env=env) != 0 + except: + raise AutorandrException("Failed to execute user command: %s" % (script,)) ran_scripts.add(script_name) script_folder = os.path.join(folder, "%s.d" % script_name) @@ -730,28 +791,121 @@ def exec_scripts(profile_path, script_name, meta_information=None): if check_name not in ran_scripts: script = os.path.join(script_folder, file_name) if os.access(script, os.X_OK | os.F_OK): - all_ok &= subprocess.call(script, env=env) != 0 + try: + all_ok &= subprocess.call(script, env=env) != 0 + except: + raise AutorandrException("Failed to execute user command: %s" % (script,)) ran_scripts.add(check_name) return all_ok +def dispatch_call_to_sessions(argv): + """Invoke autorandr for each open local X11 session with the given options. + + The function iterates over all processes not owned by root and checks + whether they have a DISPLAY variable set. It strips the screen from any + variable it finds (i.e. :0.0 becomes :0) and checks whether this display + has been handled already. If it has not, it forks, changes uid/gid to + the user owning the process, reuses the process's environment and runs + autorandr with the parameters from argv. + + This function requires root permissions. It only works for X11 servers that + have at least one non-root process running. It is susceptible for attacks + where one user runs a process with another user's DISPLAY variable - in + this case, it might happen that autorandr is invoked for the other user, + which won't work. Since no other harm than prevention of automated + execution of autorandr can be done this way, the assumption is that in this + situation, the local administrator will handle the situation.""" + X11_displays_done = set() + + autorandr_binary = os.path.abspath(argv[0]) + + for directory in os.listdir("/proc"): + directory = os.path.join("/proc/", directory) + if not os.path.isdir(directory): + continue + environ_file = os.path.join(directory, "environ") + if not os.path.isfile(environ_file): + continue + uid = os.stat(environ_file).st_uid + + # The following line assumes that user accounts start at 1000 and that + # no one works using the root or another system account. This is rather + # restrictive, but de facto default. Alternatives would be to use the + # UID_MIN from /etc/login.defs or FIRST_UID from /etc/adduser.conf; + # but effectively, both values aren't binding in any way. + # If this breaks your use case, please file a bug on Github. + if uid < 1000: + continue + + process_environ = {} + for environ_entry in open(environ_file).read().split("\0"): + if "=" in environ_entry: + name, value = environ_entry.split("=", 1) + if name == "DISPLAY" and "." in value: + value = value[:value.find(".")] + process_environ[name] = value + display = process_environ["DISPLAY"] if "DISPLAY" in process_environ else None + + # To allow scripts to detect batch invocation (especially useful for predetect) + process_environ["AUTORANDR_BATCH_PID"] = str(os.getpid()) + + if display and display not in X11_displays_done: + try: + pwent = pwd.getpwuid(uid) + except KeyError: + # User has no pwd entry + continue + + print("Running autorandr as %s for display %s" % (pwent.pw_name, display)) + child_pid = os.fork() + if child_pid == 0: + # This will throw an exception if any of the privilege changes fails, + # so it should be safe. Also, note that since the environment + # is taken from a process owned by the user, reusing it should + # not leak any information. + os.setgroups([]) + os.setresgid(pwent.pw_gid, pwent.pw_gid, pwent.pw_gid) + os.setresuid(pwent.pw_uid, pwent.pw_uid, pwent.pw_uid) + os.chdir(pwent.pw_dir) + os.environ.clear() + os.environ.update(process_environ) + os.execl(autorandr_binary, autorandr_binary, *argv[1:]) + os.exit(1) + os.waitpid(child_pid, 0) + + X11_displays_done.add(display) + def main(argv): try: - options = dict(getopt.getopt(argv[1:], "s:r:l:d:cfh", [ "dry-run", "change", "default=", "save=", "remove=", "load=", "force", "fingerprint", "config", "debug", "skip-options=", "help" ])[0]) + options = dict(getopt.getopt(argv[1:], "s:r:l:d:cfh", [ "batch", "dry-run", "change", "default=", "save=", "remove=", "load=", "force", "fingerprint", "config", "debug", "skip-options=", "help" ])[0]) except getopt.GetoptError as e: print("Failed to parse options: {0}.\n" "Use --help to get usage information.".format(str(e)), file=sys.stderr) sys.exit(posix.EX_USAGE) + if "-h" in options or "--help" in options: + exit_help() + + # Batch mode + if "--batch" in options: + if ("DISPLAY" not in os.environ or not os.environ["DISPLAY"]) and os.getuid() == 0: + dispatch_call_to_sessions([ x for x in argv if x != "--batch" ]) + else: + print("--batch mode can only be used by root and if $DISPLAY is unset") + return + profiles = {} + profile_symlinks = {} try: # Load profiles from each XDG config directory # The XDG spec says that earlier entries should take precedence, so reverse the order - for directory in reversed(os.environ.get("XDG_CONFIG_DIRS", "").split(":")): + for directory in reversed(os.environ.get("XDG_CONFIG_DIRS", "/etc/xdg").split(":")): system_profile_path = os.path.join(directory, "autorandr") if os.path.isdir(system_profile_path): profiles.update(load_profiles(system_profile_path)) + profile_symlinks.update(get_symlinks(system_profile_path)) # For the user's profiles, prefer the legacy ~/.autorandr if it already exists # profile_path is also used later on to store configurations profile_path = os.path.expanduser("~/.autorandr") @@ -760,11 +914,15 @@ def main(argv): profile_path = os.path.join(os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")), "autorandr") if os.path.isdir(profile_path): profiles.update(load_profiles(profile_path)) + profile_symlinks.update(get_symlinks(profile_path)) # Sort by descending mtime profiles = OrderedDict(sorted(profiles.items(), key=lambda x: -x[1]["config-mtime"])) except Exception as e: raise AutorandrException("Failed to load profiles", e) + profile_symlinks = { k: v for k, v in profile_symlinks.items() if v in (x[0] for x in virtual_profiles) or v in profiles } + + exec_scripts(None, "predetect") config, modes = parse_xrandr_output() if "--fingerprint" in options: @@ -824,9 +982,6 @@ def main(argv): raise AutorandrException("Failed to remove profile '%s'" % (options["--remove"],), e) sys.exit(0) - if "-h" in options or "--help" in options: - exit_help() - detected_profiles = find_profiles(config, profiles) load_profile = False @@ -867,6 +1022,11 @@ def main(argv): load_profile = options["--default"] if load_profile: + if load_profile in profile_symlinks: + if "--debug" in options: + print("'%s' symlinked to '%s'" % (load_profile, profile_symlinks[load_profile])) + load_profile = profile_symlinks[load_profile] + if load_profile in ( x[0] for x in virtual_profiles ): load_config = generate_virtual_profile(config, modes, load_profile) scripts_path = os.path.join(profile_path, load_profile) @@ -898,8 +1058,13 @@ def main(argv): "PROFILE_FOLDER": scripts_path, } exec_scripts(scripts_path, "preswitch", script_metadata) + if "--debug" in options: + print("Going to run:") + apply_configuration(load_config, config, True) apply_configuration(load_config, config, False) exec_scripts(scripts_path, "postswitch", script_metadata) + except AutorandrException as e: + raise AutorandrException("Failed to apply profile '%s'" % load_profile, e, e.report_bug) except Exception as e: raise AutorandrException("Failed to apply profile '%s'" % load_profile, e, True) @@ -911,7 +1076,7 @@ def main(argv): sys.exit(0) -if __name__ == '__main__': +def exception_handled_main(argv=sys.argv): try: main(sys.argv) except AutorandrException as e: @@ -922,5 +1087,8 @@ if __name__ == '__main__': print("Exception: {0}".format(e.__class__.__name__)) sys.exit(2) - print("Unhandled exception ({0}). Please report this as a bug.".format(e), file=sys.stderr) + print("Unhandled exception ({0}). Please report this as a bug at https://github.com/phillipberndt/autorandr/issues.".format(e), file=sys.stderr) raise + +if __name__ == '__main__': + exception_handled_main()