]> git.donarmstrong.com Git - deb_pkgs/autorandr.git/blobdiff - autorandr.py
Batch mode: Prefer processes with XAUTHORITY set as environment templates (See #81)
[deb_pkgs/autorandr.git] / autorandr.py
index 548f3129767ab32b52b561dc4f0f2fd6ab4bfd79..f8b3ae8a5a96e006052243797950c824963c2d4b 100755 (executable)
@@ -50,6 +50,7 @@ except NameError:
 virtual_profiles = [
     # (name, description, callback)
     ("common", "Clone all connected outputs at the largest common resolution", None),
+    ("clone-largest", "Clone all connected outputs with the largest resolution (scaled down if necessary)", None),
     ("horizontal", "Stack all connected outputs horizontally at their largest resolution", None),
     ("vertical", "Stack all connected outputs vertically at their largest resolution", None),
 ]
@@ -72,7 +73,7 @@ Usage: autorandr [options]
 --debug                 enable verbose output
 --batch                 run autorandr for all users with active X11 sessions
 
- To prevent a profile from being loaded, place a script call "block" in its
+ To prevent a profile from being loaded, place a script called "block" in its
  directory. The script is evaluated before the screen setup is inspected, and
  in case of it returning a value of 0 the profile is skipped. This can be used
  to query the status of a docking station you are about to leave.
@@ -99,18 +100,25 @@ class AutorandrException(Exception):
             while trace.tb_next:
                 trace = trace.tb_next
             self.line = trace.tb_lineno
+            self.file_name = trace.tb_frame.f_code.co_filename
         else:
             try:
                 import inspect
-                self.line = inspect.currentframe().f_back.f_lineno
+                frame = inspect.currentframe().f_back
+                self.line = frame.f_lineno
+                self.file_name = frame.f_code.co_filename
             except:
                 self.line = None
+                self.file_name = None
             self.original_exception = None
 
+        if os.path.abspath(self.file_name) == os.path.abspath(sys.argv[0]):
+            self.file_name = None
+
     def __str__(self):
         retval = [ self.message ]
         if self.line:
-            retval.append(" (line %d)" % self.line)
+            retval.append(" (line %d%s)" % (self.line, ("; %s" % self.file_name) if self.file_name else ""))
         if self.original_exception:
             retval.append(":\n  ")
             retval.append(str(self.original_exception).replace("\n", "\n  "))
@@ -338,6 +346,8 @@ class XrandrOutput(object):
         for line in configuration.split("\n"):
             if line:
                 line = line.split(None, 1)
+                if line and line[0].startswith("#"):
+                    continue
                 options[line[0]] = line[1] if len(line) > 1 else None
 
         edid = None
@@ -457,7 +467,7 @@ def load_profiles(profile_path):
         if not os.path.isfile(config_name) or not os.path.isfile(setup_name):
             continue
 
-        edids = dict([ x.strip().split() for x in open(setup_name).readlines() if x.strip() ])
+        edids = dict([ x.split() for x in (y.strip() for y in open(setup_name).readlines()) if x and x[0] != "#" ])
 
         config = {}
         buffer = []
@@ -476,6 +486,17 @@ def load_profiles(profile_path):
 
     return profiles
 
+def get_symlinks(profile_path):
+    "Load all symlinks from a directory"
+
+    symlinks = {}
+    for link in os.listdir(profile_path):
+        file_name = os.path.join(profile_path, link)
+        if os.path.islink(file_name):
+            symlinks[link] = os.readlink(file_name)
+
+    return symlinks
+
 def find_profiles(current_config, profiles):
     "Find profiles matching the currently connected outputs"
     detected_profiles = []
@@ -539,12 +560,18 @@ def call_and_retry(*args, **kwargs):
     waits a second and then retries once. This mitigates #47,
     a timing issue with some drivers.
     """
-    kwargs_redirected = dict(kwargs)
-    if hasattr(subprocess, "DEVNULL"):
-        kwargs_redirected["stdout"] = getattr(subprocess, "DEVNULL")
+    if "dry_run" in kwargs:
+        dry_run = kwargs["dry_run"]
+        del kwargs["dry_run"]
     else:
-        kwargs_redirected["stdout"] = open(os.devnull, "w")
-    kwargs_redirected["stderr"] = kwargs_redirected["stdout"]
+        dry_run = False
+    kwargs_redirected = dict(kwargs)
+    if not dry_run:
+        if hasattr(subprocess, "DEVNULL"):
+            kwargs_redirected["stdout"] = getattr(subprocess, "DEVNULL")
+        else:
+            kwargs_redirected["stdout"] = open(os.devnull, "w")
+        kwargs_redirected["stderr"] = kwargs_redirected["stdout"]
     retval = subprocess.call(*args, **kwargs_redirected)
     if retval != 0:
         time.sleep(1)
@@ -575,6 +602,9 @@ def apply_configuration(new_configuration, current_configuration, dry_run=False)
     #   at least.)
     # - Some implementations can not handle --transform at all, so avoid it unless
     #   necessary. (See https://github.com/phillipberndt/autorandr/issues/37)
+    # - Some implementations can not handle --panning without specifying --fb
+    #   explicitly, so avoid it unless necessary.
+    #   (See https://github.com/phillipberndt/autorandr/issues/72)
 
     auxiliary_changes_pre = []
     disable_outputs = []
@@ -589,28 +619,29 @@ def apply_configuration(new_configuration, current_configuration, dry_run=False)
 
             option_vector = new_configuration[output].option_vector
             if xrandr_version() >= Version("1.3.0"):
-                if "transform" in current_configuration[output].options:
-                    auxiliary_changes_pre.append(["--output", output, "--transform", "none"])
-                else:
-                    try:
-                        transform_index = option_vector.index("--transform")
-                        if option_vector[transform_index+1] == XrandrOutput.XRANDR_DEFAULTS["transform"]:
-                            option_vector = option_vector[:transform_index] + option_vector[transform_index+2:]
-                    except ValueError:
-                        pass
+                for option in ("transform", "panning"):
+                    if option in current_configuration[output].options:
+                        auxiliary_changes_pre.append(["--output", output, "--%s" % option, "none"])
+                    else:
+                        try:
+                            option_index = option_vector.index("--%s" % option)
+                            if option_vector[option_index+1] == XrandrOutput.XRANDR_DEFAULTS[option]:
+                                option_vector = option_vector[:option_index] + option_vector[option_index+2:]
+                        except ValueError:
+                            pass
 
             enable_outputs.append(option_vector)
 
     # Perform pe-change auxiliary changes
     if auxiliary_changes_pre:
         argv = base_argv + list(chain.from_iterable(auxiliary_changes_pre))
-        if call_and_retry(argv) != 0:
+        if call_and_retry(argv, dry_run=dry_run) != 0:
             raise AutorandrException("Command failed: %s" % " ".join(argv))
 
     # Disable unused outputs, but make sure that there always is at least one active screen
     disable_keep = 0 if remain_active_count else 1
     if len(disable_outputs) > disable_keep:
-        if call_and_retry(base_argv + list(chain.from_iterable(disable_outputs[:-1] if disable_keep else disable_outputs))) != 0:
+        if call_and_retry(base_argv + list(chain.from_iterable(disable_outputs[:-1] if disable_keep else disable_outputs)), dry_run=dry_run) != 0:
             # Disabling the outputs failed. Retry with the next command:
             # Sometimes disabling of outputs fails due to an invalid RRSetScreenSize.
             # This does not occur if simultaneously the primary screen is reset.
@@ -629,7 +660,7 @@ def apply_configuration(new_configuration, current_configuration, dry_run=False)
     operations = disable_outputs + enable_outputs
     for index in range(0, len(operations), 2):
         argv = base_argv + list(chain.from_iterable(operations[index:index+2]))
-        if call_and_retry(argv) != 0:
+        if call_and_retry(argv, dry_run=dry_run) != 0:
             raise AutorandrException("Command failed: %s" % " ".join(argv))
 
 def is_equal_configuration(source_configuration, target_configuration):
@@ -685,6 +716,21 @@ def generate_virtual_profile(configuration, modes, profile_name):
                 shift += int(mode[shift_index])
             else:
                 configuration[output].options["off"] = None
+    elif profile_name == "clone-largest":
+        biggest_resolution = sorted([output_modes[0] for output, output_modes in modes.items()], key=lambda x: int(x["width"])*int(x["height"]), reverse=True)[0]
+        for output in configuration:
+            configuration[output].options = {}
+            if output in modes and configuration[output].edid:
+                mode = sorted(modes[output], key=lambda a: int(a["width"])*int(a["height"]) + (10**6 if a["preferred"] else 0))[-1]
+                configuration[output].options["mode"] = mode["name"]
+                configuration[output].options["rate"] = mode["rate"]
+                configuration[output].options["pos"] = "0x0"
+                scale = max(float(biggest_resolution["width"]) / float(mode["width"]) ,float(biggest_resolution["height"]) / float(mode["height"]))
+                mov_x = (float(mode["width"])*scale-float(biggest_resolution["width"]))/-2
+                mov_y = (float(mode["height"])*scale-float(biggest_resolution["height"]))/-2
+                configuration[output].options["transform"] = "{},0,{},0,{},{},0,0,1".format(scale, mov_x, scale, mov_y)
+            else:
+                configuration[output].options["off"] = None
     return configuration
 
 def print_profile_differences(one, another):
@@ -708,7 +754,15 @@ def exit_help():
     "Print help and exit"
     print(help_text)
     for profile in virtual_profiles:
-        print("  %-10s %s" % profile[:2])
+        name, description = profile[:2]
+        description = [ description ]
+        max_width = 78-18
+        while len(description[0]) > max_width + 1:
+            left_over = description[0][max_width:]
+            description[0] = description[0][:max_width] + "-"
+            description.insert(1, "  %-15s %s" % ("", left_over))
+        description = "\n".join(description)
+        print("  %-15s %s" % (name, description))
     sys.exit(0)
 
 def exec_scripts(profile_path, script_name, meta_information=None):
@@ -718,6 +772,8 @@ def exec_scripts(profile_path, script_name, meta_information=None):
     and system-wide configuration folders, named script_name or residing in
     subdirectories named script_name.d.
 
+    If profile_path is None, only global scripts will be invoked.
+
     meta_information is expected to be an dictionary. It will be passed to the block scripts
     in the environment, as variables called AUTORANDR_<CAPITALIZED_KEY_HERE>.
 
@@ -737,13 +793,19 @@ def exec_scripts(profile_path, script_name, meta_information=None):
     if not os.path.isdir(user_profile_path):
         user_profile_path = os.path.join(os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")), "autorandr")
 
-    for folder in chain((profile_path, os.path.dirname(profile_path), user_profile_path),
-                        (os.path.join(x, "autorandr") for x in os.environ.get("XDG_CONFIG_DIRS", "/etc/xdg").split(":"))):
+    candidate_directories = chain((user_profile_path,), (os.path.join(x, "autorandr") for x in os.environ.get("XDG_CONFIG_DIRS", "/etc/xdg").split(":")))
+    if profile_path:
+        candidate_directories = chain((profile_path,), candidate_directories)
+
+    for folder in candidate_directories:
 
         if script_name not in ran_scripts:
             script = os.path.join(folder, script_name)
             if os.access(script, os.X_OK | os.F_OK):
-                all_ok &= subprocess.call(script, env=env) != 0
+                try:
+                    all_ok &= subprocess.call(script, env=env) != 0
+                except:
+                    raise AutorandrException("Failed to execute user command: %s" % (script,))
                 ran_scripts.add(script_name)
 
         script_folder = os.path.join(folder, "%s.d" % script_name)
@@ -753,7 +815,10 @@ def exec_scripts(profile_path, script_name, meta_information=None):
                 if check_name not in ran_scripts:
                     script = os.path.join(script_folder, file_name)
                     if os.access(script, os.X_OK | os.F_OK):
-                        all_ok &= subprocess.call(script, env=env) != 0
+                        try:
+                            all_ok &= subprocess.call(script, env=env) != 0
+                        except:
+                            raise AutorandrException("Failed to execute user command: %s" % (script,))
                         ran_scripts.add(check_name)
 
     return all_ok
@@ -762,11 +827,11 @@ def dispatch_call_to_sessions(argv):
     """Invoke autorandr for each open local X11 session with the given options.
 
     The function iterates over all processes not owned by root and checks
-    whether they have a DISPLAY variable set. It strips the screen from any
-    variable it finds (i.e. :0.0 becomes :0) and checks whether this display
-    has been handled already. If it has not, it forks, changes uid/gid to
-    the user owning the process, reuses the process's environment and runs
-    autorandr with the parameters from argv.
+    whether they have DISPLAY and XAUTHORITY variables set. It strips the
+    screen from any variable it finds (i.e. :0.0 becomes :0) and checks whether
+    this display has been handled already. If it has not, it forks, changes
+    uid/gid to the user owning the process, reuses the process's environment
+    and runs autorandr with the parameters from argv.
 
     This function requires root permissions. It only works for X11 servers that
     have at least one non-root process running. It is susceptible for attacks
@@ -775,9 +840,29 @@ def dispatch_call_to_sessions(argv):
     which won't work. Since no other harm than prevention of automated
     execution of autorandr can be done this way, the assumption is that in this
     situation, the local administrator will handle the situation."""
+
     X11_displays_done = set()
 
     autorandr_binary = os.path.abspath(argv[0])
+    backup_candidates = {}
+
+    def fork_child_autorandr(pwent, process_environ):
+        print("Running autorandr as %s for display %s" % (pwent.pw_name, process_environ["DISPLAY"]))
+        child_pid = os.fork()
+        if child_pid == 0:
+            # This will throw an exception if any of the privilege changes fails,
+            # so it should be safe. Also, note that since the environment
+            # is taken from a process owned by the user, reusing it should
+            # not leak any information.
+            os.setgroups([])
+            os.setresgid(pwent.pw_gid, pwent.pw_gid, pwent.pw_gid)
+            os.setresuid(pwent.pw_uid, pwent.pw_uid, pwent.pw_uid)
+            os.chdir(pwent.pw_dir)
+            os.environ.clear()
+            os.environ.update(process_environ)
+            os.execl(autorandr_binary, autorandr_binary, *argv[1:])
+            os.exit(1)
+        os.waitpid(child_pid, 0)
 
     for directory in os.listdir("/proc"):
         directory = os.path.join("/proc/", directory)
@@ -787,7 +872,14 @@ def dispatch_call_to_sessions(argv):
         if not os.path.isfile(environ_file):
             continue
         uid = os.stat(environ_file).st_uid
-        if uid == 0:
+
+        # The following line assumes that user accounts start at 1000 and that
+        # no one works using the root or another system account. This is rather
+        # restrictive, but de facto default. Alternatives would be to use the
+        # UID_MIN from /etc/login.defs or FIRST_UID from /etc/adduser.conf;
+        # but effectively, both values aren't binding in any way.
+        # If this breaks your use case, please file a bug on Github.
+        if uid < 1000:
             continue
 
         process_environ = {}
@@ -797,32 +889,44 @@ def dispatch_call_to_sessions(argv):
                 if name == "DISPLAY" and "." in value:
                     value = value[:value.find(".")]
                 process_environ[name] = value
-        display = process_environ["DISPLAY"] if "DISPLAY" in process_environ else None
 
-        if display and display not in X11_displays_done:
+        if "DISPLAY" not in process_environ:
+            # Cannot work with this environment, skip.
+            continue
+
+        # To allow scripts to detect batch invocation (especially useful for predetect)
+        process_environ["AUTORANDR_BATCH_PID"] = str(os.getpid())
+        process_environ["UID"] = str(uid)
+
+        display = process_environ["DISPLAY"]
+
+        if "XAUTHORITY" not in process_environ:
+            # It's very likely that we cannot work with this environment either,
+            # but keep it as a backup just in case we don't find anything else.
+            backup_candidates[display] = process_environ
+            continue
+
+        if display not in X11_displays_done:
             try:
                 pwent = pwd.getpwuid(uid)
             except KeyError:
                 # User has no pwd entry
                 continue
 
-            print("Running autorandr as %s for display %s" % (pwent.pw_name, display))
-            child_pid = os.fork()
-            if child_pid == 0:
-                # This will throw an exception if any of the privilege changes fails,
-                # so it should be safe. Also, note that since the environment
-                # is taken from a process owned by the user, reusing it should
-                # not leak any information.
-                os.setgroups([])
-                os.setresgid(pwent.pw_gid, pwent.pw_gid, pwent.pw_gid)
-                os.setresuid(pwent.pw_uid, pwent.pw_uid, pwent.pw_uid)
-                os.chdir(pwent.pw_dir)
-                os.environ.clear()
-                os.environ.update(process_environ)
-                os.execl(autorandr_binary, autorandr_binary, *argv[1:])
-                os.exit(1)
-            os.waitpid(child_pid, 0)
+            fork_child_autorandr(pwent, process_environ)
+            X11_displays_done.add(display)
 
+    # Run autorandr for any users/displays which didn't have a process with
+    # XAUTHORITY set.
+    for display, process_environ in backup_candidates.items():
+        if display not in X11_displays_done:
+            try:
+                pwent = pwd.getpwuid(int(process_environ["UID"]))
+            except KeyError:
+                # User has no pwd entry
+                continue
+
+            fork_child_autorandr(pwent, process_environ)
             X11_displays_done.add(display)
 
 def main(argv):
@@ -834,6 +938,9 @@ def main(argv):
               file=sys.stderr)
         sys.exit(posix.EX_USAGE)
 
+    if "-h" in options or "--help" in options:
+        exit_help()
+
     # Batch mode
     if "--batch" in options:
         if ("DISPLAY" not in os.environ or not os.environ["DISPLAY"]) and os.getuid() == 0:
@@ -841,8 +948,13 @@ def main(argv):
         else:
             print("--batch mode can only be used by root and if $DISPLAY is unset")
         return
+    if "AUTORANDR_BATCH_PID" in os.environ:
+        user = pwd.getpwuid(os.getuid())
+        user = user.pw_name if user else "#%d" % os.getuid()
+        print("autorandr running as user %s (started from batch instance)" % user)
 
     profiles = {}
+    profile_symlinks = {}
     try:
         # Load profiles from each XDG config directory
         # The XDG spec says that earlier entries should take precedence, so reverse the order
@@ -850,6 +962,7 @@ def main(argv):
             system_profile_path = os.path.join(directory, "autorandr")
             if os.path.isdir(system_profile_path):
                 profiles.update(load_profiles(system_profile_path))
+                profile_symlinks.update(get_symlinks(system_profile_path))
         # For the user's profiles, prefer the legacy ~/.autorandr if it already exists
         # profile_path is also used later on to store configurations
         profile_path = os.path.expanduser("~/.autorandr")
@@ -858,11 +971,15 @@ def main(argv):
             profile_path = os.path.join(os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")), "autorandr")
         if os.path.isdir(profile_path):
             profiles.update(load_profiles(profile_path))
+            profile_symlinks.update(get_symlinks(profile_path))
         # Sort by descending mtime
         profiles = OrderedDict(sorted(profiles.items(), key=lambda x: -x[1]["config-mtime"]))
     except Exception as e:
         raise AutorandrException("Failed to load profiles", e)
 
+    profile_symlinks = { k: v for k, v in profile_symlinks.items() if v in (x[0] for x in virtual_profiles) or v in profiles }
+
+    exec_scripts(None, "predetect")
     config, modes = parse_xrandr_output()
 
     if "--fingerprint" in options:
@@ -922,9 +1039,6 @@ def main(argv):
             raise AutorandrException("Failed to remove profile '%s'" % (options["--remove"],), e)
         sys.exit(0)
 
-    if "-h" in options or "--help" in options:
-        exit_help()
-
     detected_profiles = find_profiles(config, profiles)
     load_profile = False
 
@@ -965,6 +1079,11 @@ def main(argv):
         load_profile = options["--default"]
 
     if load_profile:
+        if load_profile in profile_symlinks:
+            if "--debug" in options:
+                print("'%s' symlinked to '%s'" % (load_profile, profile_symlinks[load_profile]))
+            load_profile = profile_symlinks[load_profile]
+
         if load_profile in ( x[0] for x in virtual_profiles ):
             load_config = generate_virtual_profile(config, modes, load_profile)
             scripts_path = os.path.join(profile_path, load_profile)
@@ -1001,6 +1120,8 @@ def main(argv):
                     apply_configuration(load_config, config, True)
                 apply_configuration(load_config, config, False)
                 exec_scripts(scripts_path, "postswitch", script_metadata)
+        except AutorandrException as e:
+            raise AutorandrException("Failed to apply profile '%s'" % load_profile, e, e.report_bug)
         except Exception as e:
             raise AutorandrException("Failed to apply profile '%s'" % load_profile, e, True)
 
@@ -1012,7 +1133,7 @@ def main(argv):
 
     sys.exit(0)
 
-if __name__ == '__main__':
+def exception_handled_main(argv=sys.argv):
     try:
         main(sys.argv)
     except AutorandrException as e:
@@ -1025,3 +1146,6 @@ if __name__ == '__main__':
 
         print("Unhandled exception ({0}). Please report this as a bug at https://github.com/phillipberndt/autorandr/issues.".format(e), file=sys.stderr)
         raise
+
+if __name__ == '__main__':
+    exception_handled_main()