import hashlib
import os
import posix
+import pwd
import re
import subprocess
import sys
import shutil
+import time
from collections import OrderedDict
from distutils.version import LooseVersion as Version
from functools import reduce
from itertools import chain
+try:
+ input = raw_input
+except NameError:
+ pass
virtual_profiles = [
# (name, description, callback)
--config dump your current xrandr setup
--dry-run don't change anything, only print the xrandr commands
--debug enable verbose output
+--batch run autorandr for all users with active X11 sessions
- To prevent a profile from being loaded, place a script call "block" in its
+ To prevent a profile from being loaded, place a script called "block" in its
directory. The script is evaluated before the screen setup is inspected, and
in case of it returning a value of 0 the profile is skipped. This can be used
to query the status of a docking station you are about to leave.
while trace.tb_next:
trace = trace.tb_next
self.line = trace.tb_lineno
+ self.file_name = trace.tb_frame.f_code.co_filename
else:
try:
import inspect
- self.line = inspect.currentframe().f_back.f_lineno
+ frame = inspect.currentframe().f_back
+ self.line = frame.f_lineno
+ self.file_name = frame.f_code.co_filename
except:
self.line = None
+ self.file_name = None
self.original_exception = None
+ if os.path.abspath(self.file_name) == os.path.abspath(sys.argv[0]):
+ self.file_name = None
+
def __str__(self):
retval = [ self.message ]
if self.line:
- retval.append(" (line %d)" % self.line)
+ retval.append(" (line %d%s)" % (self.line, ("; %s" % self.file_name) if self.file_name else ""))
if self.original_exception:
retval.append(":\n ")
retval.append(str(self.original_exception).replace("\n", "\n "))
if self.report_bug:
- retval.append("\nThis appears to be a bug. Please help improving autorandr by reporting it upstream."
+ retval.append("\nThis appears to be a bug. Please help improving autorandr by reporting it upstream:"
+ "\nhttps://github.com/phillipberndt/autorandr/issues"
"\nPlease attach the output of `xrandr --verbose` to your bug report if appropriate.")
return "".join(retval)
for line in configuration.split("\n"):
if line:
line = line.split(None, 1)
+ if line and line[0].startswith("#"):
+ continue
options[line[0]] = line[1] if len(line) > 1 else None
edid = None
if not os.path.isfile(config_name) or not os.path.isfile(setup_name):
continue
- edids = dict([ x.strip().split() for x in open(setup_name).readlines() if x.strip() ])
+ edids = dict([ x.split() for x in (y.strip() for y in open(setup_name).readlines()) if x and x[0] != "#" ])
config = {}
buffer = []
return profiles
+def get_symlinks(profile_path):
+ "Load all symlinks from a directory"
+
+ symlinks = {}
+ for link in os.listdir(profile_path):
+ file_name = os.path.join(profile_path, link)
+ if os.path.islink(file_name):
+ symlinks[link] = os.readlink(file_name)
+
+ return symlinks
+
def find_profiles(current_config, profiles):
"Find profiles matching the currently connected outputs"
detected_profiles = []
except:
return False
+def call_and_retry(*args, **kwargs):
+ """Wrapper around subprocess.call that retries failed calls.
+
+ This function calls subprocess.call and on non-zero exit states,
+ waits a second and then retries once. This mitigates #47,
+ a timing issue with some drivers.
+ """
+ if "dry_run" in kwargs:
+ dry_run = kwargs["dry_run"]
+ del kwargs["dry_run"]
+ else:
+ dry_run = False
+ kwargs_redirected = dict(kwargs)
+ if not dry_run:
+ if hasattr(subprocess, "DEVNULL"):
+ kwargs_redirected["stdout"] = getattr(subprocess, "DEVNULL")
+ else:
+ kwargs_redirected["stdout"] = open(os.devnull, "w")
+ kwargs_redirected["stderr"] = kwargs_redirected["stdout"]
+ retval = subprocess.call(*args, **kwargs_redirected)
+ if retval != 0:
+ time.sleep(1)
+ retval = subprocess.call(*args, **kwargs)
+ return retval
+
def apply_configuration(new_configuration, current_configuration, dry_run=False):
"Apply a configuration"
outputs = sorted(new_configuration.keys(), key=lambda x: new_configuration[x].sort_key)
# Perform pe-change auxiliary changes
if auxiliary_changes_pre:
argv = base_argv + list(chain.from_iterable(auxiliary_changes_pre))
- if subprocess.call(argv) != 0:
+ if call_and_retry(argv, dry_run=dry_run) != 0:
raise AutorandrException("Command failed: %s" % " ".join(argv))
# Disable unused outputs, but make sure that there always is at least one active screen
disable_keep = 0 if remain_active_count else 1
if len(disable_outputs) > disable_keep:
- if subprocess.call(base_argv + list(chain.from_iterable(disable_outputs[:-1] if disable_keep else disable_outputs))) != 0:
+ if call_and_retry(base_argv + list(chain.from_iterable(disable_outputs[:-1] if disable_keep else disable_outputs)), dry_run=dry_run) != 0:
# Disabling the outputs failed. Retry with the next command:
# Sometimes disabling of outputs fails due to an invalid RRSetScreenSize.
# This does not occur if simultaneously the primary screen is reset.
operations = disable_outputs + enable_outputs
for index in range(0, len(operations), 2):
argv = base_argv + list(chain.from_iterable(operations[index:index+2]))
- if subprocess.call(argv) != 0:
+ if call_and_retry(argv, dry_run=dry_run) != 0:
raise AutorandrException("Command failed: %s" % " ".join(argv))
def is_equal_configuration(source_configuration, target_configuration):
and system-wide configuration folders, named script_name or residing in
subdirectories named script_name.d.
+ If profile_path is None, only global scripts will be invoked.
+
meta_information is expected to be an dictionary. It will be passed to the block scripts
in the environment, as variables called AUTORANDR_<CAPITALIZED_KEY_HERE>.
if not os.path.isdir(user_profile_path):
user_profile_path = os.path.join(os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")), "autorandr")
- for folder in chain((profile_path, os.path.dirname(profile_path), user_profile_path),
- (os.path.join(x, "autorandr") for x in os.environ.get("XDG_CONFIG_DIRS", "").split(":"))):
+ candidate_directories = chain((user_profile_path,), (os.path.join(x, "autorandr") for x in os.environ.get("XDG_CONFIG_DIRS", "/etc/xdg").split(":")))
+ if profile_path:
+ candidate_directories = chain((profile_path,), candidate_directories)
+
+ for folder in candidate_directories:
if script_name not in ran_scripts:
script = os.path.join(folder, script_name)
if os.access(script, os.X_OK | os.F_OK):
- all_ok &= subprocess.call(script, env=env) != 0
+ try:
+ all_ok &= subprocess.call(script, env=env) != 0
+ except:
+ raise AutorandrException("Failed to execute user command: %s" % (script,))
ran_scripts.add(script_name)
script_folder = os.path.join(folder, "%s.d" % script_name)
if check_name not in ran_scripts:
script = os.path.join(script_folder, file_name)
if os.access(script, os.X_OK | os.F_OK):
- all_ok &= subprocess.call(script, env=env) != 0
+ try:
+ all_ok &= subprocess.call(script, env=env) != 0
+ except:
+ raise AutorandrException("Failed to execute user command: %s" % (script,))
ran_scripts.add(check_name)
return all_ok
+def dispatch_call_to_sessions(argv):
+ """Invoke autorandr for each open local X11 session with the given options.
+
+ The function iterates over all processes not owned by root and checks
+ whether they have a DISPLAY variable set. It strips the screen from any
+ variable it finds (i.e. :0.0 becomes :0) and checks whether this display
+ has been handled already. If it has not, it forks, changes uid/gid to
+ the user owning the process, reuses the process's environment and runs
+ autorandr with the parameters from argv.
+
+ This function requires root permissions. It only works for X11 servers that
+ have at least one non-root process running. It is susceptible for attacks
+ where one user runs a process with another user's DISPLAY variable - in
+ this case, it might happen that autorandr is invoked for the other user,
+ which won't work. Since no other harm than prevention of automated
+ execution of autorandr can be done this way, the assumption is that in this
+ situation, the local administrator will handle the situation."""
+ X11_displays_done = set()
+
+ autorandr_binary = os.path.abspath(argv[0])
+
+ for directory in os.listdir("/proc"):
+ directory = os.path.join("/proc/", directory)
+ if not os.path.isdir(directory):
+ continue
+ environ_file = os.path.join(directory, "environ")
+ if not os.path.isfile(environ_file):
+ continue
+ uid = os.stat(environ_file).st_uid
+
+ # The following line assumes that user accounts start at 1000 and that
+ # no one works using the root or another system account. This is rather
+ # restrictive, but de facto default. Alternatives would be to use the
+ # UID_MIN from /etc/login.defs or FIRST_UID from /etc/adduser.conf;
+ # but effectively, both values aren't binding in any way.
+ # If this breaks your use case, please file a bug on Github.
+ if uid < 1000:
+ continue
+
+ process_environ = {}
+ for environ_entry in open(environ_file).read().split("\0"):
+ if "=" in environ_entry:
+ name, value = environ_entry.split("=", 1)
+ if name == "DISPLAY" and "." in value:
+ value = value[:value.find(".")]
+ process_environ[name] = value
+ display = process_environ["DISPLAY"] if "DISPLAY" in process_environ else None
+
+ # To allow scripts to detect batch invocation (especially useful for predetect)
+ process_environ["AUTORANDR_BATCH_PID"] = str(os.getpid())
+
+ if display and display not in X11_displays_done:
+ try:
+ pwent = pwd.getpwuid(uid)
+ except KeyError:
+ # User has no pwd entry
+ continue
+
+ print("Running autorandr as %s for display %s" % (pwent.pw_name, display))
+ child_pid = os.fork()
+ if child_pid == 0:
+ # This will throw an exception if any of the privilege changes fails,
+ # so it should be safe. Also, note that since the environment
+ # is taken from a process owned by the user, reusing it should
+ # not leak any information.
+ os.setgroups([])
+ os.setresgid(pwent.pw_gid, pwent.pw_gid, pwent.pw_gid)
+ os.setresuid(pwent.pw_uid, pwent.pw_uid, pwent.pw_uid)
+ os.chdir(pwent.pw_dir)
+ os.environ.clear()
+ os.environ.update(process_environ)
+ os.execl(autorandr_binary, autorandr_binary, *argv[1:])
+ os.exit(1)
+ os.waitpid(child_pid, 0)
+
+ X11_displays_done.add(display)
+
def main(argv):
try:
- options = dict(getopt.getopt(argv[1:], "s:r:l:d:cfh", [ "dry-run", "change", "default=", "save=", "remove=", "load=", "force", "fingerprint", "config", "debug", "skip-options=", "help" ])[0])
+ options = dict(getopt.getopt(argv[1:], "s:r:l:d:cfh", [ "batch", "dry-run", "change", "default=", "save=", "remove=", "load=", "force", "fingerprint", "config", "debug", "skip-options=", "help" ])[0])
except getopt.GetoptError as e:
print("Failed to parse options: {0}.\n"
"Use --help to get usage information.".format(str(e)),
file=sys.stderr)
sys.exit(posix.EX_USAGE)
+ if "-h" in options or "--help" in options:
+ exit_help()
+
+ # Batch mode
+ if "--batch" in options:
+ if ("DISPLAY" not in os.environ or not os.environ["DISPLAY"]) and os.getuid() == 0:
+ dispatch_call_to_sessions([ x for x in argv if x != "--batch" ])
+ else:
+ print("--batch mode can only be used by root and if $DISPLAY is unset")
+ return
+
profiles = {}
+ profile_symlinks = {}
try:
# Load profiles from each XDG config directory
# The XDG spec says that earlier entries should take precedence, so reverse the order
- for directory in reversed(os.environ.get("XDG_CONFIG_DIRS", "").split(":")):
+ for directory in reversed(os.environ.get("XDG_CONFIG_DIRS", "/etc/xdg").split(":")):
system_profile_path = os.path.join(directory, "autorandr")
if os.path.isdir(system_profile_path):
profiles.update(load_profiles(system_profile_path))
+ profile_symlinks.update(get_symlinks(system_profile_path))
# For the user's profiles, prefer the legacy ~/.autorandr if it already exists
# profile_path is also used later on to store configurations
profile_path = os.path.expanduser("~/.autorandr")
profile_path = os.path.join(os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")), "autorandr")
if os.path.isdir(profile_path):
profiles.update(load_profiles(profile_path))
+ profile_symlinks.update(get_symlinks(profile_path))
# Sort by descending mtime
profiles = OrderedDict(sorted(profiles.items(), key=lambda x: -x[1]["config-mtime"]))
except Exception as e:
raise AutorandrException("Failed to load profiles", e)
+ profile_symlinks = { k: v for k, v in profile_symlinks.items() if v in (x[0] for x in virtual_profiles) or v in profiles }
+
+ exec_scripts(None, "predetect")
config, modes = parse_xrandr_output()
if "--fingerprint" in options:
profile_dirlist.remove("config")
profile_dirlist.remove("setup")
if profile_dirlist:
- print("Profile folder '%s' contains the following:\n---\n%s\n---" % (options["--remove"], "\n".join(profile_dirlist)))
+ print("Profile folder '%s' contains the following additional files:\n---\n%s\n---" % (options["--remove"], "\n".join(profile_dirlist)))
response = input("Do you really want to remove profile '%s'? If so, type 'yes': " % options["--remove"]).strip()
if response != "yes":
remove = False
raise AutorandrException("Failed to remove profile '%s'" % (options["--remove"],), e)
sys.exit(0)
- if "-h" in options or "--help" in options:
- exit_help()
-
detected_profiles = find_profiles(config, profiles)
load_profile = False
load_profile = options["--default"]
if load_profile:
+ if load_profile in profile_symlinks:
+ if "--debug" in options:
+ print("'%s' symlinked to '%s'" % (load_profile, profile_symlinks[load_profile]))
+ load_profile = profile_symlinks[load_profile]
+
if load_profile in ( x[0] for x in virtual_profiles ):
load_config = generate_virtual_profile(config, modes, load_profile)
scripts_path = os.path.join(profile_path, load_profile)
"PROFILE_FOLDER": scripts_path,
}
exec_scripts(scripts_path, "preswitch", script_metadata)
+ if "--debug" in options:
+ print("Going to run:")
+ apply_configuration(load_config, config, True)
apply_configuration(load_config, config, False)
exec_scripts(scripts_path, "postswitch", script_metadata)
+ except AutorandrException as e:
+ raise AutorandrException("Failed to apply profile '%s'" % load_profile, e, e.report_bug)
except Exception as e:
raise AutorandrException("Failed to apply profile '%s'" % load_profile, e, True)
sys.exit(0)
-if __name__ == '__main__':
+def exception_handled_main(argv=sys.argv):
try:
main(sys.argv)
except AutorandrException as e:
print("Exception: {0}".format(e.__class__.__name__))
sys.exit(2)
- print("Unhandled exception ({0}). Please report this as a bug.".format(e), file=sys.stderr)
+ print("Unhandled exception ({0}). Please report this as a bug at https://github.com/phillipberndt/autorandr/issues.".format(e), file=sys.stderr)
raise
+
+if __name__ == '__main__':
+ exception_handled_main()