import email as modemail
import subprocess
-from dbconn import DBConn, get_architecture, get_component, get_suite
+from dbconn import DBConn, get_architecture, get_component, get_suite, get_override_type, Keyring, session_wrapper
from dak_exceptions import *
+from gpg import SignedFile
from textutils import fix_maintainer
from regexes import re_html_escaping, html_escaping, re_single_line_field, \
re_multi_line_field, re_srchasver, re_taint_free, \
################################################################################
-def parse_deb822(contents, signing_rules=0):
+def parse_deb822(armored_contents, signing_rules=0, keyrings=None, session=None):
+ require_signature = True
+ if keyrings == None:
+ keyrings = []
+ require_signature = False
+
+ signed_file = SignedFile(armored_contents, keyrings=keyrings, require_signature=require_signature)
+ contents = signed_file.contents
+
error = ""
changes = {}
index += 1
indexed_lines[index] = line[:-1]
- inside_signature = 0
-
num_of_lines = len(indexed_lines.keys())
index = 0
first = -1
while index < num_of_lines:
index += 1
line = indexed_lines[index]
- if line == "":
- if signing_rules == 1:
- index += 1
- if index > num_of_lines:
- raise InvalidDscError, index
- line = indexed_lines[index]
- if not line.startswith("-----BEGIN PGP SIGNATURE"):
- raise InvalidDscError, index
- inside_signature = 0
- break
- else:
- continue
- if line.startswith("-----BEGIN PGP SIGNATURE"):
+ if line == "" and signing_rules == 1:
+ if index != num_of_lines:
+ raise InvalidDscError, index
break
- if line.startswith("-----BEGIN PGP SIGNED MESSAGE"):
- inside_signature = 1
- if signing_rules == 1:
- while index < num_of_lines and line != "":
- index += 1
- line = indexed_lines[index]
- continue
- # If we're not inside the signed data, don't process anything
- if signing_rules >= 0 and not inside_signature:
- continue
slf = re_single_line_field.match(line)
if slf:
field = slf.groups()[0].lower()
continue
error += line
- if signing_rules == 1 and inside_signature:
- raise InvalidDscError, index
-
- changes["filecontents"] = "".join(lines)
+ changes["filecontents"] = armored_contents
if changes.has_key("source"):
# Strip the source version in brackets from the source field,
################################################################################
-def parse_changes(filename, signing_rules=0, dsc_file=0):
+def parse_changes(filename, signing_rules=0, dsc_file=0, keyrings=None):
"""
Parses a changes file and returns a dictionary where each field is a
key. The mandatory first argument is the filename of the .changes
unicode(content, 'utf-8')
except UnicodeError:
raise ChangesUnicodeError, "Changes file not proper utf-8"
- changes = parse_deb822(content, signing_rules)
+ changes = parse_deb822(content, signing_rules, keyrings=keyrings)
if not dsc_file:
(r'orig.tar.gz', ('orig_tar_gz', 'orig_tar')),
(r'diff.gz', ('debian_diff',)),
(r'tar.gz', ('native_tar_gz', 'native_tar')),
- (r'debian\.tar\.(gz|bz2)', ('debian_tar',)),
- (r'orig\.tar\.(gz|bz2)', ('orig_tar',)),
- (r'tar\.(gz|bz2)', ('native_tar',)),
- (r'orig-.+\.tar\.(gz|bz2)', ('more_orig_tar',)),
+ (r'debian\.tar\.(gz|bz2|xz)', ('debian_tar',)),
+ (r'orig\.tar\.(gz|bz2|xz)', ('orig_tar',)),
+ (r'tar\.(gz|bz2|xz)', ('native_tar',)),
+ (r'orig-.+\.tar\.(gz|bz2|xz)', ('more_orig_tar',)),
)
for f in dsc_files.keys():
################################################################################
+# see http://bugs.debian.org/619131
+def build_package_set(dsc, session = None):
+ if not dsc.has_key("package-set"):
+ return {}
+
+ packages = {}
+
+ for line in dsc["package-set"].split("\n"):
+ if not line:
+ break
+
+ (name, section, priority) = line.split()
+ (section, component) = extract_component_from_section(section)
+
+ package_type = "deb"
+ if name.find(":") != -1:
+ (package_type, name) = name.split(":", 1)
+ if package_type == "src":
+ package_type = "dsc"
+
+ # Validate type if we have a session
+ if session and get_override_type(package_type, session) is None:
+ # Maybe just warn and ignore? exit(1) might be a bit hard...
+ utils.fubar("invalid type (%s) in Package-Set." % (package_type))
+
+ if section == "":
+ section = "-"
+ if priority == "":
+ priority = "-"
+
+ if package_type == "dsc":
+ priority = "source"
+
+ if not packages.has_key(name) or packages[name]["type"] == "dsc":
+ packages[name] = dict(priority=priority, section=section, type=package_type, component=component, files=[])
+
+ return packages
+
+################################################################################
+
def send_mail (message, filename=""):
"""sendmail wrapper, takes _either_ a message string or a file as arguments"""
return " ".join(["--keyring %s" % x for x in keyrings])
################################################################################
-
-def check_signature (sig_filename, data_filename="", keyrings=None, autofetch=None):
+@session_wrapper
+def check_signature (sig_filename, data_filename="", keyrings=None, autofetch=None, session=None):
"""
Check the signature of a file and return the fingerprint if the
signature is valid or 'None' if it's not. The first argument is the
return (None, rejects)
if not keyrings:
- keyrings = Cnf.ValueList("Dinstall::GPGKeyring")
+ keyrings = [ x.keyring_name for x in session.query(Keyring).filter(Keyring.active == True).all() ]
# Autofetch the signing key if that's enabled
if autofetch == None:
returns a dict associating source package name with a list of open wnpp
bugs (Yes, there might be more than one)
"""
-
+
line = []
try:
f = open(file)
lines = f.readlines()
- except IOerror, e:
+ except IOError, e:
print "Warning: Couldn't open %s; don't know about WNPP bugs, so won't close any." % file
lines = []
wnpp = {}
wnpp[source] = bugs
return wnpp
+################################################################################
+
+def get_packages_from_ftp(root, suite, component, architecture):
+ """
+ Returns an object containing apt_pkg-parseable data collected by
+ aggregating Packages.gz files gathered for each architecture.
+
+ @type root: string
+ @param root: path to ftp archive root directory
+
+ @type suite: string
+ @param suite: suite to extract files from
+
+ @type component: string
+ @param component: component to extract files from
+
+ @type architecture: string
+ @param architecture: architecture to extract files from
+
+ @rtype: TagFile
+ @return: apt_pkg class containing package data
+
+ """
+ filename = "%s/dists/%s/%s/binary-%s/Packages.gz" % (root, suite, component, architecture)
+ (fd, temp_file) = temp_filename()
+ (result, output) = commands.getstatusoutput("gunzip -c %s > %s" % (filename, temp_file))
+ if (result != 0):
+ fubar("Gunzip invocation failed!\n%s\n" % (output), result)
+ filename = "%s/dists/%s/%s/debian-installer/binary-%s/Packages.gz" % (root, suite, component, architecture)
+ if os.path.exists(filename):
+ (result, output) = commands.getstatusoutput("gunzip -c %s >> %s" % (filename, temp_file))
+ if (result != 0):
+ fubar("Gunzip invocation failed!\n%s\n" % (output), result)
+ packages = open_file(temp_file)
+ Packages = apt_pkg.ParseTagFile(packages)
+ os.unlink(temp_file)
+ return Packages