From: Ansgar Burchardt Date: Fri, 20 May 2011 09:02:13 +0000 (+0000) Subject: daklib/gpg.py: small library for PGP-signed files X-Git-Tag: debian-r/squeeze~155^2~22^2~8 X-Git-Url: https://git.donarmstrong.com/?a=commitdiff_plain;h=9952acaae7cd7758ef320acfe194168a22829ab8;p=dak.git daklib/gpg.py: small library for PGP-signed files Signed-off-by: Ansgar Burchardt --- diff --git a/daklib/gpg.py b/daklib/gpg.py new file mode 100644 index 00000000..5c396ec2 --- /dev/null +++ b/daklib/gpg.py @@ -0,0 +1,187 @@ +"""Utilities for signed files + +@contact: Debian FTP Master +@copyright: 2011 Ansgar Burchardt +@license: GNU General Public License version 2 or later +""" + +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +import errno +import fcntl +import os +import select +import sys + +try: + _MAXFD = os.sysconf("SC_OPEN_MAX") +except: + _MAXFD = 256 + +class GpgException(Exception): + pass + +class _Pipe(object): + """context manager for pipes + + Note: When the pipe is closed by other means than the close_r and close_w + methods, you have to set self.r (self.w) to None. + """ + def __enter__(self): + (self.r, self.w) = os.pipe() + return self + def __exit__(self, type, value, traceback): + self.close_w() + self.close_r() + return False + def close_r(self): + """close reading side of the pipe""" + if self.r: + os.close(self.r) + self.r = None + def close_w(self): + """close writing part of the pipe""" + if self.w: + os.close(self.w) + self.w = None + +class SignedFile(object): + """handle files signed with PGP + + The following attributes are available: + contents - string with the content (after removing PGP armor) + valid - Boolean indicating a valid signature was found + fingerprint - fingerprint of the key used for signing + primary_fingerprint - fingerprint of the primary key associated to the key used for signing + """ + def __init__(self, data, keyrings, require_signature=True, gpg="/usr/bin/gpg"): + """ + @param data: string containing the message + @param keyrings: seqeuence of keyrings + @param require_signature: if True (the default), will raise an exception if no valid signature was found + @param gpg: location of the gpg binary + """ + self.gpg = gpg + self.keyrings = keyrings + + self.valid = False + self.fingerprint = None + self.primary_fingerprint = None + + self._verify(data, require_signature) + + def _verify(self, data, require_signature): + with _Pipe() as stdin: + with _Pipe() as contents: + with _Pipe() as status: + pid = os.fork() + if pid == 0: + self._exec_gpg(stdin.r, contents.w, sys.stderr.fileno(), status.w) + else: + stdin.close_r() + contents.close_w() + status.close_w() + + read = self._do_io([contents.r, status.r], {stdin.w: data}) + stdin.w = None # was closed by _do_io + + (pid_, exit_code, usage_) = os.wait4(pid, 0) + + self.contents = read[contents.r] + self.status = read[status.r] + + if self.status == "": + raise GpgException("No status output from GPG. (GPG exited with status code %s)" % exit_code) + + for line in self.status.splitlines(): + self._parse_status(line) + + if require_signature and not self.valid: + raise GpgException("No valid signature found. (GPG exited with status code %s)" % exit_code) + + def _do_io(self, read, write): + read_lines = dict( (fd, []) for fd in read ) + write_pos = dict( (fd, 0) for fd in write ) + + read_set = list(read) + write_set = write.keys() + while len(read_set) > 0 or len(write_set) > 0: + r, w, x_ = select.select(read_set, write_set, ()) + for fd in r: + data = os.read(fd, 4096) + if data == "": + read_set.remove(fd) + read_lines[fd].append(data) + for fd in w: + data = write[fd][write_pos[fd]:] + if data == "": + os.close(fd) + write_set.remove(fd) + else: + bytes_written = os.write(fd, data) + write_pos[fd] += bytes_written + + return dict( (fd, "".join(read_lines[fd])) for fd in read_lines.keys() ) + + def _parse_status(self, line): + fields = line.split() + if fields[0] != "[GNUPG:]": + raise GpgException("Unexpected output on status-fd: %s" % line) + + # VALIDSIG + # + # + if fields[1] == "VALIDSIG": + self.valid = True + self.fingerprint = fields[2] + self.primary_fingerprint = fields[11] + + if fields[1] == "BADARMOR": + raise GpgException("Bad armor.") + + if fields[1] == "NODATA": + raise GpgException("No data.") + + if fields[1] == "DECRYPTION_FAILED": + raise GpgException("Decryption failed.") + + if fields[1] == "ERROR": + raise GpgException("Other error: %s %s" % (fields[2], fields[3])) + + def _exec_gpg(self, stdin, stdout, stderr, statusfd): + try: + if stdin != 0: + os.dup2(stdin, 0) + if stdout != 1: + os.dup2(stdout, 1) + if stderr != 2: + os.dup2(stderr, 2) + if statusfd != 3: + os.dup2(statusfd, 3) + for fd in range(4): + old = fcntl.fcntl(fd, fcntl.F_GETFD) + fcntl.fcntl(fd, fcntl.F_SETFD, old & ~fcntl.FD_CLOEXEC) + os.closerange(4, _MAXFD) + + args = [self.gpg, "--status-fd=3", "--no-default-keyring"] + for k in self.keyrings: + args.append("--keyring=%s" % k) + args.extend(["--decrypt", "-"]) + + os.execvp(self.gpg, args) + finally: + sys.exit(1) + +# vim: set sw=4 et: