import hashlib
import os
import posix
+import pwd
import re
import subprocess
import sys
+import shutil
+import time
from collections import OrderedDict
from distutils.version import LooseVersion as Version
from functools import reduce
from itertools import chain
+try:
+ input = raw_input
+except NameError:
+ pass
virtual_profiles = [
# (name, description, callback)
-h, --help get this small help
-c, --change reload current setup
-s, --save <profile> save your current setup to profile <profile>
+-r, --remove <profile> remove profile <profile>
-l, --load <profile> load profile <profile>
-d, --default <profile> make profile <profile> the default profile
--skip-options <option> comma separated list of xrandr arguments (e.g. "gamma")
--config dump your current xrandr setup
--dry-run don't change anything, only print the xrandr commands
--debug enable verbose output
+--batch run autorandr for all users with active X11 sessions
- To prevent a profile from being loaded, place a script call "block" in its
+ To prevent a profile from being loaded, place a script called "block" in its
directory. The script is evaluated before the screen setup is inspected, and
in case of it returning a value of 0 the profile is skipped. This can be used
to query the status of a docking station you are about to leave.
retval.append(":\n ")
retval.append(str(self.original_exception).replace("\n", "\n "))
if self.report_bug:
- retval.append("\nThis appears to be a bug. Please help improving autorandr by reporting it upstream."
+ retval.append("\nThis appears to be a bug. Please help improving autorandr by reporting it upstream:"
+ "\nhttps://github.com/phillipberndt/autorandr/issues"
"\nPlease attach the output of `xrandr --verbose` to your bug report if appropriate.")
return "".join(retval)
return profiles
+def get_symlinks(profile_path):
+ "Load all symlinks from a directory"
+
+ symlinks = {}
+ for link in os.listdir(profile_path):
+ file_name = os.path.join(profile_path, link)
+ if os.path.islink(file_name):
+ symlinks[link] = os.readlink(file_name)
+
+ return symlinks
+
def find_profiles(current_config, profiles):
"Find profiles matching the currently connected outputs"
detected_profiles = []
except:
return False
+def call_and_retry(*args, **kwargs):
+ """Wrapper around subprocess.call that retries failed calls.
+
+ This function calls subprocess.call and on non-zero exit states,
+ waits a second and then retries once. This mitigates #47,
+ a timing issue with some drivers.
+ """
+ kwargs_redirected = dict(kwargs)
+ if hasattr(subprocess, "DEVNULL"):
+ kwargs_redirected["stdout"] = getattr(subprocess, "DEVNULL")
+ else:
+ kwargs_redirected["stdout"] = open(os.devnull, "w")
+ kwargs_redirected["stderr"] = kwargs_redirected["stdout"]
+ retval = subprocess.call(*args, **kwargs_redirected)
+ if retval != 0:
+ time.sleep(1)
+ retval = subprocess.call(*args, **kwargs)
+ return retval
+
def apply_configuration(new_configuration, current_configuration, dry_run=False):
"Apply a configuration"
outputs = sorted(new_configuration.keys(), key=lambda x: new_configuration[x].sort_key)
# Perform pe-change auxiliary changes
if auxiliary_changes_pre:
argv = base_argv + list(chain.from_iterable(auxiliary_changes_pre))
- if subprocess.call(argv) != 0:
+ if call_and_retry(argv) != 0:
raise AutorandrException("Command failed: %s" % " ".join(argv))
# Disable unused outputs, but make sure that there always is at least one active screen
disable_keep = 0 if remain_active_count else 1
if len(disable_outputs) > disable_keep:
- if subprocess.call(base_argv + list(chain.from_iterable(disable_outputs[:-1] if disable_keep else disable_outputs))) != 0:
+ if call_and_retry(base_argv + list(chain.from_iterable(disable_outputs[:-1] if disable_keep else disable_outputs))) != 0:
# Disabling the outputs failed. Retry with the next command:
# Sometimes disabling of outputs fails due to an invalid RRSetScreenSize.
# This does not occur if simultaneously the primary screen is reset.
operations = disable_outputs + enable_outputs
for index in range(0, len(operations), 2):
argv = base_argv + list(chain.from_iterable(operations[index:index+2]))
- if subprocess.call(argv) != 0:
+ if call_and_retry(argv) != 0:
raise AutorandrException("Command failed: %s" % " ".join(argv))
def is_equal_configuration(source_configuration, target_configuration):
user_profile_path = os.path.join(os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")), "autorandr")
for folder in chain((profile_path, os.path.dirname(profile_path), user_profile_path),
- (os.path.join(x, "autorandr") for x in os.environ.get("XDG_CONFIG_DIRS", "").split(":"))):
+ (os.path.join(x, "autorandr") for x in os.environ.get("XDG_CONFIG_DIRS", "/etc/xdg").split(":"))):
if script_name not in ran_scripts:
script = os.path.join(folder, script_name)
return all_ok
+def dispatch_call_to_sessions(argv):
+ """Invoke autorandr for each open local X11 session with the given options.
+
+ The function iterates over all processes not owned by root and checks
+ whether they have a DISPLAY variable set. It strips the screen from any
+ variable it finds (i.e. :0.0 becomes :0) and checks whether this display
+ has been handled already. If it has not, it forks, changes uid/gid to
+ the user owning the process, reuses the process's environment and runs
+ autorandr with the parameters from argv.
+
+ This function requires root permissions. It only works for X11 servers that
+ have at least one non-root process running. It is susceptible for attacks
+ where one user runs a process with another user's DISPLAY variable - in
+ this case, it might happen that autorandr is invoked for the other user,
+ which won't work. Since no other harm than prevention of automated
+ execution of autorandr can be done this way, the assumption is that in this
+ situation, the local administrator will handle the situation."""
+ X11_displays_done = set()
+
+ autorandr_binary = os.path.abspath(argv[0])
+
+ for directory in os.listdir("/proc"):
+ directory = os.path.join("/proc/", directory)
+ if not os.path.isdir(directory):
+ continue
+ environ_file = os.path.join(directory, "environ")
+ if not os.path.isfile(environ_file):
+ continue
+ uid = os.stat(environ_file).st_uid
+
+ # The following line assumes that user accounts start at 1000 and that
+ # no one works using the root or another system account. This is rather
+ # restrictive, but de facto default. Alternatives would be to use the
+ # UID_MIN from /etc/login.defs or FIRST_UID from /etc/adduser.conf;
+ # but effectively, both values aren't binding in any way.
+ # If this breaks your use case, please file a bug on Github.
+ if uid < 1000:
+ continue
+
+ process_environ = {}
+ for environ_entry in open(environ_file).read().split("\0"):
+ if "=" in environ_entry:
+ name, value = environ_entry.split("=", 1)
+ if name == "DISPLAY" and "." in value:
+ value = value[:value.find(".")]
+ process_environ[name] = value
+ display = process_environ["DISPLAY"] if "DISPLAY" in process_environ else None
+
+ if display and display not in X11_displays_done:
+ try:
+ pwent = pwd.getpwuid(uid)
+ except KeyError:
+ # User has no pwd entry
+ continue
+
+ print("Running autorandr as %s for display %s" % (pwent.pw_name, display))
+ child_pid = os.fork()
+ if child_pid == 0:
+ # This will throw an exception if any of the privilege changes fails,
+ # so it should be safe. Also, note that since the environment
+ # is taken from a process owned by the user, reusing it should
+ # not leak any information.
+ os.setgroups([])
+ os.setresgid(pwent.pw_gid, pwent.pw_gid, pwent.pw_gid)
+ os.setresuid(pwent.pw_uid, pwent.pw_uid, pwent.pw_uid)
+ os.chdir(pwent.pw_dir)
+ os.environ.clear()
+ os.environ.update(process_environ)
+ os.execl(autorandr_binary, autorandr_binary, *argv[1:])
+ os.exit(1)
+ os.waitpid(child_pid, 0)
+
+ X11_displays_done.add(display)
+
def main(argv):
try:
- options = dict(getopt.getopt(argv[1:], "s:l:d:cfh", [ "dry-run", "change", "default=", "save=", "load=", "force", "fingerprint", "config", "debug", "skip-options=", "help" ])[0])
+ options = dict(getopt.getopt(argv[1:], "s:r:l:d:cfh", [ "batch", "dry-run", "change", "default=", "save=", "remove=", "load=", "force", "fingerprint", "config", "debug", "skip-options=", "help" ])[0])
except getopt.GetoptError as e:
print("Failed to parse options: {0}.\n"
"Use --help to get usage information.".format(str(e)),
file=sys.stderr)
sys.exit(posix.EX_USAGE)
+ # Batch mode
+ if "--batch" in options:
+ if ("DISPLAY" not in os.environ or not os.environ["DISPLAY"]) and os.getuid() == 0:
+ dispatch_call_to_sessions([ x for x in argv if x != "--batch" ])
+ else:
+ print("--batch mode can only be used by root and if $DISPLAY is unset")
+ return
+
profiles = {}
+ profile_symlinks = {}
try:
# Load profiles from each XDG config directory
# The XDG spec says that earlier entries should take precedence, so reverse the order
- for directory in reversed(os.environ.get("XDG_CONFIG_DIRS", "").split(":")):
+ for directory in reversed(os.environ.get("XDG_CONFIG_DIRS", "/etc/xdg").split(":")):
system_profile_path = os.path.join(directory, "autorandr")
if os.path.isdir(system_profile_path):
profiles.update(load_profiles(system_profile_path))
+ profile_symlinks.update(get_symlinks(system_profile_path))
# For the user's profiles, prefer the legacy ~/.autorandr if it already exists
# profile_path is also used later on to store configurations
profile_path = os.path.expanduser("~/.autorandr")
profile_path = os.path.join(os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")), "autorandr")
if os.path.isdir(profile_path):
profiles.update(load_profiles(profile_path))
+ profile_symlinks.update(get_symlinks(profile_path))
# Sort by descending mtime
profiles = OrderedDict(sorted(profiles.items(), key=lambda x: -x[1]["config-mtime"]))
except Exception as e:
raise AutorandrException("Failed to load profiles", e)
+ profile_symlinks = { k: v for k, v in profile_symlinks.items() if v in (x[0] for x in virtual_profiles) or v in profiles }
+
config, modes = parse_xrandr_output()
if "--fingerprint" in options:
print("Saved current configuration as profile '%s'" % options["--save"])
sys.exit(0)
+ if "-r" in options:
+ options["--remove"] = options["-r"]
+ if "--remove" in options:
+ if options["--remove"] in ( x[0] for x in virtual_profiles ):
+ raise AutorandrException("Cannot remove profile '%s':\nThis configuration name is a reserved virtual configuration." % options["--remove"])
+ if options["--remove"] not in profiles.keys():
+ raise AutorandrException("Cannot remove profile '%s':\nThis profile does not exist." % options["--remove"])
+ try:
+ remove = True
+ profile_folder = os.path.join(profile_path, options["--remove"])
+ profile_dirlist = os.listdir(profile_folder)
+ profile_dirlist.remove("config")
+ profile_dirlist.remove("setup")
+ if profile_dirlist:
+ print("Profile folder '%s' contains the following additional files:\n---\n%s\n---" % (options["--remove"], "\n".join(profile_dirlist)))
+ response = input("Do you really want to remove profile '%s'? If so, type 'yes': " % options["--remove"]).strip()
+ if response != "yes":
+ remove = False
+ if remove is True:
+ shutil.rmtree(profile_folder)
+ print("Removed profile '%s'" % options["--remove"])
+ else:
+ print("Profile '%s' was not removed" % options["--remove"])
+ except Exception as e:
+ raise AutorandrException("Failed to remove profile '%s'" % (options["--remove"],), e)
+ sys.exit(0)
+
if "-h" in options or "--help" in options:
exit_help()
load_profile = options["--default"]
if load_profile:
+ if load_profile in profile_symlinks:
+ if "--debug" in options:
+ print("'%s' symlinked to '%s'" % (load_profile, profile_symlinks[load_profile]))
+ load_profile = profile_symlinks[load_profile]
+
if load_profile in ( x[0] for x in virtual_profiles ):
load_config = generate_virtual_profile(config, modes, load_profile)
scripts_path = os.path.join(profile_path, load_profile)
"PROFILE_FOLDER": scripts_path,
}
exec_scripts(scripts_path, "preswitch", script_metadata)
+ if "--debug" in options:
+ print("Going to run:")
+ apply_configuration(load_config, config, True)
apply_configuration(load_config, config, False)
exec_scripts(scripts_path, "postswitch", script_metadata)
except Exception as e:
sys.exit(0)
-if __name__ == '__main__':
+def exception_handled_main(argv=sys.argv):
try:
main(sys.argv)
except AutorandrException as e:
print("Exception: {0}".format(e.__class__.__name__))
sys.exit(2)
- print("Unhandled exception ({0}). Please report this as a bug.".format(e), file=sys.stderr)
+ print("Unhandled exception ({0}). Please report this as a bug at https://github.com/phillipberndt/autorandr/issues.".format(e), file=sys.stderr)
raise
+
+if __name__ == '__main__':
+ exception_handled_main()