1 # Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
3 # Parts based on code that is
4 # Copyright (C) 2001-2006, James Troup <james@nocrew.org>
5 # Copyright (C) 2009-2010, Joerg Jaspert <joerg@debian.org>
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 """module provided pre-acceptance tests
23 Please read the documentation for the L{Check} class for the interface.
26 from daklib.config import Config
27 from daklib.dbconn import *
28 import daklib.dbconn as dbconn
29 from daklib.regexes import *
30 from daklib.textutils import fix_maintainer, ParseMaintError
31 import daklib.lintian as lintian
32 import daklib.utils as utils
33 from daklib.upload import InvalidHashException
37 from apt_pkg import version_compare
43 # TODO: replace by subprocess
class Reject(Exception):
    """Exception raised by a failing check.

    The message carries a human-readable explanation of why the upload
    must be rejected.
    """
class RejectStupidMaintainerException(Exception):
    """exception raised by failing the external hashes check

    Constructed with four positional arguments:
    (filename, hash name, current value, external value).
    """

    def __str__(self):
        # self.args[:4] == (filename, hash-name, our-value, external-value)
        return "'%s' has mismatching %s from the external files db ('%s' [current] vs '%s' [external])" % self.args[:4]
57 """base class for checks
59 checks are called by L{daklib.archive.ArchiveUpload}. Failing tests should
60 raise a L{daklib.checks.Reject} exception including a human-readable
61 description why the upload should be rejected.
63 def check(self, upload):
66 @type upload: L{daklib.archive.ArchiveUpload}
67 @param upload: upload to check
69 @raise daklib.checks.Reject: upload should be rejected
72 def per_suite_check(self, upload, suite):
73 """do per-suite checks
75 @type upload: L{daklib.archive.ArchiveUpload}
76 @param upload: upload to check
78 @type suite: L{daklib.dbconn.Suite}
79 @param suite: suite to check
81 @raise daklib.checks.Reject: upload should be rejected
86 """allow to force ignore failing test
88 C{True} if it is acceptable to force ignoring a failing test,
class SignatureCheck(Check):
    """Check signature of changes and dsc file (if included in upload)

    Make sure the signature is valid and done by a known user.
    """
    def check(self, upload):
        changes = upload.changes

        # The .changes itself must carry a valid signature.
        if not changes.valid_signature:
            raise Reject("Signature for .changes not valid.")

        # If a source package is included, its .dsc must be signed too,
        # by the very same key.
        source = changes.source
        if source is not None:
            if not source.valid_signature:
                raise Reject("Signature for .dsc not valid.")
            if source.primary_fingerprint != changes.primary_fingerprint:
                raise Reject(".changes and .dsc not signed by the same key.")

        # Finally the signing key must map to a known uploader.
        fingerprint = upload.fingerprint
        if fingerprint is None or fingerprint.uid is None:
            raise Reject(".changes signed by unknown key.")
class ChangesCheck(Check):
    """Check changes file for syntax errors."""
    def check(self, upload):
        changes = upload.changes
        control = changes.changes
        fn = changes.filename

        for field in ('Distribution', 'Source', 'Binary', 'Architecture', 'Version', 'Maintainer', 'Files', 'Changes', 'Description'):
            if field not in control:
                raise Reject('{0}: misses mandatory field {1}'.format(fn, field))

        source_match = re_field_source.match(control['Source'])
        if not source_match:
            raise Reject('{0}: Invalid Source field'.format(fn))
        version_match = re_field_version.match(control['Version'])
        if not version_match:
            raise Reject('{0}: Invalid Version field'.format(fn))
        version_without_epoch = version_match.group('without_epoch')

        # The .changes filename must agree with the Source and Version fields.
        match = re_file_changes.match(fn)
        if not match:
            raise Reject('{0}: Does not match re_file_changes'.format(fn))
        if match.group('package') != source_match.group('package'):
            raise Reject('{0}: Filename does not match Source field'.format(fn))
        if match.group('version') != version_without_epoch:
            raise Reject('{0}: Filename does not match Version field'.format(fn))

        for bn in changes.binary_names:
            if not re_field_package.match(bn):
                raise Reject('{0}: Invalid binary package name {1}'.format(fn, bn))

        # Architecture list and actually included files must be consistent.
        if 'source' in changes.architectures and changes.source is None:
            raise Reject("Changes has architecture source, but no source found.")
        if changes.source is not None and 'source' not in changes.architectures:
            raise Reject("Upload includes source, but changes does not say so.")

        try:
            fix_maintainer(changes.changes['Maintainer'])
        except ParseMaintError as e:
            raise Reject('{0}: Failed to parse Maintainer field: {1}'.format(changes.filename, e))

        try:
            changed_by = changes.changes.get('Changed-By')
            if changed_by is not None:
                fix_maintainer(changed_by)
        except ParseMaintError as e:
            raise Reject('{0}: Failed to parse Changed-By field: {1}'.format(changes.filename, e))

        if len(changes.files) == 0:
            raise Reject("Changes includes no files.")

        for bugnum in changes.closed_bugs:
            if not re_isanum.match(bugnum):
                raise Reject('{0}: "{1}" in Closes field is not a number'.format(changes.filename, bugnum))

        return True
class HashesCheck(Check):
    """Check hashes in .changes and .dsc are valid."""
    def check(self, upload):
        what = None
        try:
            changes = upload.changes
            what = changes.filename
            for f in changes.files.itervalues():
                # Verifies size and checksums against the actual file on disk.
                f.check(upload.directory)
            source = changes.source
            if source is not None:
                what = source.filename
                for f in source.files.itervalues():
                    f.check(upload.directory)
        except (IOError, OSError) as e:
            if e.errno == errno.ENOENT:
                raise Reject('{0} refers to non-existing file: {1}\n'
                             'Perhaps you need to include it in your upload?'
                             .format(what, os.path.basename(e.filename)))
            # Any other I/O error is unexpected; let it propagate.
            raise
        except InvalidHashException as e:
            raise Reject('{0}: {1}'.format(what, unicode(e)))
class ExternalHashesCheck(Check):
    """Checks hashes in .changes and .dsc against an external database."""
    def check_single(self, session, f):
        # NOTE(review): filename is interpolated into the SQL string; the
        # value comes from an uploaded .changes, so this should move to a
        # bound parameter (session.execute with a params dict).
        q = session.execute("SELECT size, md5sum, sha1sum, sha256sum FROM external_files WHERE filename LIKE '%%/%s'" % f.filename)
        (ext_size, ext_md5sum, ext_sha1sum, ext_sha256sum) = q.fetchone() or (None, None, None, None)

        # No entry in the external db means nothing to compare against.
        if ext_size is None:
            return

        if ext_size != f.size:
            raise RejectStupidMaintainerException(f.filename, 'size', f.size, ext_size)

        if ext_md5sum != f.md5sum:
            raise RejectStupidMaintainerException(f.filename, 'md5sum', f.md5sum, ext_md5sum)

        if ext_sha1sum != f.sha1sum:
            raise RejectStupidMaintainerException(f.filename, 'sha1sum', f.sha1sum, ext_sha1sum)

        if ext_sha256sum != f.sha256sum:
            raise RejectStupidMaintainerException(f.filename, 'sha256sum', f.sha256sum, ext_sha256sum)

    def check(self, upload):
        cnf = Config()

        # The whole check is optional and off unless configured.
        if not cnf.use_extfiles:
            return

        session = upload.session
        changes = upload.changes

        for f in changes.files.itervalues():
            self.check_single(session, f)
        source = changes.source
        if source is not None:
            for f in source.files.itervalues():
                self.check_single(session, f)
class BinaryCheck(Check):
    """Check binary packages for syntax errors."""
    def check(self, upload):
        for binary in upload.changes.binaries:
            self.check_binary(upload, binary)

        # Every included deb must also be listed in the changes' Binary field.
        binary_names = set([ binary.control['Package'] for binary in upload.changes.binaries ])
        for bn in binary_names:
            if bn not in upload.changes.binary_names:
                raise Reject('Package {0} is not mentioned in Binary field in changes'.format(bn))

        return True

    def check_binary(self, upload, binary):
        fn = binary.hashed_file.filename
        control = binary.control

        for field in ('Package', 'Architecture', 'Version', 'Description'):
            if field not in control:
                # BUG FIX: the message used {0} twice, so the missing field
                # name was never shown; use {1} for the field.
                raise Reject('{0}: Missing mandatory field {1}.'.format(fn, field))

        # check fields
        package = control['Package']
        if not re_field_package.match(package):
            raise Reject('{0}: Invalid Package field'.format(fn))

        version = control['Version']
        version_match = re_field_version.match(version)
        if not version_match:
            raise Reject('{0}: Invalid Version field'.format(fn))
        version_without_epoch = version_match.group('without_epoch')

        architecture = control['Architecture']
        if architecture not in upload.changes.architectures:
            raise Reject('{0}: Architecture not in Architecture field in changes file'.format(fn))
        if architecture == 'source':
            raise Reject('{0}: Architecture "source" invalid for binary packages'.format(fn))

        source = control.get('Source')
        if source is not None and not re_field_source.match(source):
            raise Reject('{0}: Invalid Source field'.format(fn))

        # check filename: must agree with Package, Version and Architecture.
        match = re_file_binary.match(fn)
        if package != match.group('package'):
            raise Reject('{0}: filename does not match Package field'.format(fn))
        if version_without_epoch != match.group('version'):
            raise Reject('{0}: filename does not match Version field'.format(fn))
        if architecture != match.group('architecture'):
            raise Reject('{0}: filename does not match Architecture field'.format(fn))

        # check dependency field syntax
        for field in ('Breaks', 'Conflicts', 'Depends', 'Enhances', 'Pre-Depends',
                      'Provides', 'Recommends', 'Replaces', 'Suggests'):
            value = control.get(field)
            if value is not None:
                if value.strip() == '':
                    raise Reject('{0}: empty {1} field'.format(fn, field))
                try:
                    apt_pkg.parse_depends(value)
                except Exception:
                    raise Reject('{0}: APT could not parse {1} field'.format(fn, field))

        for field in ('Built-Using',):
            value = control.get(field)
            if value is not None:
                if value.strip() == '':
                    raise Reject('{0}: empty {1} field'.format(fn, field))
                try:
                    apt_pkg.parse_src_depends(value)
                except Exception:
                    raise Reject('{0}: APT could not parse {1} field'.format(fn, field))
class BinaryTimestampCheck(Check):
    """check timestamps of files in binary packages

    Files in the near future cause ugly warnings and extreme time travel
    can cause errors on extraction.
    """
    def check(self, upload):
        cnf = Config()
        future_cutoff = time.time() + cnf.find_i('Dinstall::FutureTimeTravelGrace', 24*3600)
        past_cutoff = time.mktime(time.strptime(cnf.find('Dinstall::PastCutoffYear', '1984'), '%Y'))

        class TarTime(object):
            """Collects member names whose mtime falls outside the cutoffs."""
            def __init__(self):
                self.future_files = dict()
                self.past_files = dict()
            def callback(self, member, data):
                # BUG FIX: the dicts were referenced without 'self.',
                # raising NameError instead of recording the member.
                if member.mtime > future_cutoff:
                    self.future_files[member.name] = member.mtime
                elif member.mtime < past_cutoff:
                    self.past_files[member.name] = member.mtime

        def format_reason(filename, direction, files):
            reason = "{0}: has {1} file(s) with a timestamp too far in the {2}:\n".format(filename, len(files), direction)
            for fn, ts in files.iteritems():
                reason += "  {0} ({1})".format(fn, time.ctime(ts))
            return reason

        for binary in upload.changes.binaries:
            filename = binary.hashed_file.filename
            path = os.path.join(upload.directory, filename)
            deb = apt_inst.DebFile(path)
            tar = TarTime()
            deb.control.go(tar.callback)
            if tar.future_files:
                raise Reject(format_reason(filename, 'future', tar.future_files))
            if tar.past_files:
                raise Reject(format_reason(filename, 'past', tar.past_files))
class SourceCheck(Check):
    """Check source package for syntax errors."""
    def check_filename(self, control, filename, regex):
        # In case we have an .orig.tar.*, we have to strip the Debian revison
        # from the version number. So handle this special case first.
        is_orig = True
        match = re_file_orig.match(filename)
        if not match:
            is_orig = False
            match = regex.match(filename)

        if not match:
            raise Reject('{0}: does not match regular expression for source filenames'.format(filename))
        if match.group('package') != control['Source']:
            raise Reject('{0}: filename does not match Source field'.format(filename))

        version = control['Version']
        if is_orig:
            # .orig tarballs carry only the upstream part of the version.
            version = re_field_version_upstream.match(version).group('upstream')
        version_match = re_field_version.match(version)
        version_without_epoch = version_match.group('without_epoch')
        if match.group('version') != version_without_epoch:
            raise Reject('{0}: filename does not match Version field'.format(filename))

    def check(self, upload):
        if upload.changes.source is None:
            # Binary-only upload: nothing to do here.
            return True

        changes = upload.changes.changes
        source = upload.changes.source
        control = source.dsc
        dsc_fn = source._dsc_file.filename

        # check fields
        if not re_field_package.match(control['Source']):
            raise Reject('{0}: Invalid Source field'.format(dsc_fn))
        if control['Source'] != changes['Source']:
            raise Reject('{0}: Source field does not match Source field in changes'.format(dsc_fn))
        if control['Version'] != changes['Version']:
            raise Reject('{0}: Version field does not match Version field in changes'.format(dsc_fn))

        # check filenames
        self.check_filename(control, dsc_fn, re_file_dsc)
        for f in source.files.itervalues():
            self.check_filename(control, f.filename, re_file_source)

        # check dependency field syntax
        for field in ('Build-Conflicts', 'Build-Conflicts-Indep', 'Build-Depends', 'Build-Depends-Arch', 'Build-Depends-Indep'):
            value = control.get(field)
            if value is not None:
                if value.strip() == '':
                    raise Reject('{0}: empty {1} field'.format(dsc_fn, field))
                try:
                    apt_pkg.parse_src_depends(value)
                except Exception as e:
                    raise Reject('{0}: APT could not parse {1} field: {2}'.format(dsc_fn, field, e))

        rejects = utils.check_dsc_files(dsc_fn, control, source.files.keys())
        if len(rejects) > 0:
            raise Reject("\n".join(rejects))

        return True
class SingleDistributionCheck(Check):
    """Ensure the .changes file targets exactly one distribution."""
    def check(self, upload):
        distribution_count = len(upload.changes.distributions)
        if distribution_count != 1:
            raise Reject("Only uploads to a single distribution are allowed.")
class ACLCheck(Check):
    """Check the uploader is allowed to upload the packages in .changes"""

    def _does_hijack(self, session, upload, suite):
        # Try to catch hijacks.
        # This doesn't work correctly. Uploads to experimental can still
        # "hijack" binaries from unstable. Also one can hijack packages
        # via buildds (but people who try this should not be DMs).
        for binary_name in upload.changes.binary_names:
            binaries = session.query(DBBinary).join(DBBinary.source) \
                .filter(DBBinary.suites.contains(suite)) \
                .filter(DBBinary.package == binary_name)
            for binary in binaries:
                if binary.source.source != upload.changes.changes['Source']:
                    return True, binary, binary.source.source
        return False, None, None

    def _check_acl(self, session, upload, acl):
        """Evaluate a single ACL against the upload.

        @return: C{(None, None)} if the ACL does not apply,
                 C{(False, reason)} if it denies the upload,
                 C{(True, None)} if it allows the upload
        """
        source_name = upload.changes.source_name

        # An ACL restricted to another fingerprint/keyring neither allows
        # nor denies this upload.
        if acl.match_fingerprint and upload.fingerprint not in acl.fingerprints:
            return None, None
        if acl.match_keyring is not None and upload.fingerprint.keyring != acl.match_keyring:
            return None, None

        if not acl.allow_new:
            # NOTE(review): reconstructed guard — assumes upload.new flags
            # uploads that must pass through NEW; confirm against archive.py.
            if upload.new:
                return False, "NEW uploads are not allowed"
            for f in upload.changes.files.itervalues():
                if f.section == 'byhand' or f.section.startswith("raw-"):
                    return False, "BYHAND uploads are not allowed"
        if not acl.allow_source and upload.changes.source is not None:
            return False, "sourceful uploads are not allowed"
        binaries = upload.changes.binaries
        if len(binaries) != 0:
            if not acl.allow_binary:
                return False, "binary uploads are not allowed"
            if upload.changes.source is None and not acl.allow_binary_only:
                return False, "binary-only uploads are not allowed"
            if not acl.allow_binary_all:
                uploaded_arches = set(upload.changes.architectures)
                uploaded_arches.discard('source')
                allowed_arches = set(a.arch_string for a in acl.architectures)
                forbidden_arches = uploaded_arches - allowed_arches
                if len(forbidden_arches) != 0:
                    return False, "uploads for architecture(s) {0} are not allowed".format(", ".join(forbidden_arches))
        if not acl.allow_hijack:
            for suite in upload.final_suites:
                does_hijack, hijacked_binary, hijacked_from = self._does_hijack(session, upload, suite)
                if does_hijack:
                    return False, "hijacks are not allowed (binary={0}, other-source={1})".format(hijacked_binary, hijacked_from)

        acl_per_source = session.query(ACLPerSource).filter_by(acl=acl, fingerprint=upload.fingerprint, source=source_name).first()
        if acl.allow_per_source:
            # XXX: Drop DMUA part here and switch to new implementation.
            # XXX: Send warning mail once users can set the new DMUA flag
            dmua_status, dmua_reason = self._check_dmua(upload)
            if not dmua_status:
                return False, dmua_reason
            #if acl_per_source is None:
            #    return False, "not allowed to upload source package '{0}'".format(source_name)
        if acl.deny_per_source and acl_per_source is not None:
            return False, acl_per_source.reason or "forbidden to upload source package '{0}'".format(source_name)

        return True, None

    def _check_dmua(self, upload):
        # This code is not very nice, but hopefully works until we can replace
        # DM-Upload-Allowed, cf. https://lists.debian.org/debian-project/2012/06/msg00029.html
        session = upload.session

        # Check DM-Upload-Allowed
        suites = upload.final_suites
        assert len(suites) == 1
        suite = list(suites)[0]

        last_suites = ['unstable', 'experimental']
        if suite.suite_name.endswith('-backports'):
            last_suites = [suite.suite_name]
        last = session.query(DBSource).filter_by(source=upload.changes.changes['Source']) \
            .join(DBSource.suites).filter(Suite.suite_name.in_(last_suites)) \
            .order_by(DBSource.version.desc()).limit(1).first()
        if last is None:
            return False, 'No existing source found in {0}'.format(' or '.join(last_suites))
        if not last.dm_upload_allowed:
            return False, 'DM-Upload-Allowed is not set in {0}={1}'.format(last.source, last.version)

        # check current Changed-by is in last Maintainer or Uploaders
        uploader_names = [ u.name for u in last.uploaders ]
        changed_by_field = upload.changes.changes.get('Changed-By', upload.changes.changes['Maintainer'])
        if changed_by_field not in uploader_names:
            return False, '{0} is not an uploader for {1}={2}'.format(changed_by_field, last.source, last.version)

        # check Changed-by is the DM
        changed_by = fix_maintainer(changed_by_field)
        uid = upload.fingerprint.uid
        if uid is None:
            return False, 'Unknown uid for fingerprint {0}'.format(upload.fingerprint.fingerprint)
        if uid.uid != changed_by[3] and uid.name != changed_by[2]:
            return False, 'DMs are not allowed to sponsor uploads (expected {0} <{1}> as maintainer, but got {2})'.format(uid.name, uid.uid, changed_by_field)

        return True, None

    def check(self, upload):
        session = upload.session
        fingerprint = upload.fingerprint
        keyring = fingerprint.keyring

        if keyring is None:
            raise Reject('No keyring for fingerprint {0}'.format(fingerprint.fingerprint))
        if not keyring.active:
            raise Reject('Keyring {0} is not active'.format(keyring.name))

        acl = fingerprint.acl or keyring.acl
        if acl is None:
            raise Reject('No ACL for fingerprint {0}'.format(fingerprint.fingerprint))
        result, reason = self._check_acl(session, upload, acl)
        if result == False:
            raise Reject(reason)

        # Global ACLs apply to everybody in addition to the personal one.
        for acl in session.query(ACL).filter_by(is_global=True):
            result, reason = self._check_acl(session, upload, acl)
            if result == False:
                raise Reject(reason)

        return True

    def per_suite_check(self, upload, suite):
        acls = suite.acls
        if len(acls) != 0:
            # At least one per-suite ACL must explicitly accept the upload.
            accept = False
            for acl in acls:
                result, reason = self._check_acl(upload.session, upload, acl)
                if result == False:
                    raise Reject(reason)
                accept = accept or result
            if not accept:
                raise Reject('Not accepted by any per-suite acl (suite={0})'.format(suite.suite_name))
        return True
class TransitionCheck(Check):
    """check for a transition"""
    def check(self, upload):
        # Transitions only concern sourceful uploads.
        if 'source' not in upload.changes.architectures:
            return True

        transitions = self.get_transitions()
        if transitions is None:
            return True

        session = upload.session

        control = upload.changes.changes
        source = re_field_source.match(control['Source']).group('package')

        for trans in transitions:
            t = transitions[trans]
            # NOTE(review): reconstructed — assumes 'new' holds the version
            # the release team waits for; confirm against the transitions
            # file format.
            expected = t["new"]

            # Will be None if nothing is in testing.
            current = get_source_in_suite(source, "testing", session)
            if current is not None:
                compare = apt_pkg.version_compare(current.version, expected)

            if current is None or compare < 0:
                # This is still valid, the current version in testing is older than
                # the new version we wait for, or there is none in testing yet

                # Check if the source we look at is affected by this.
                if source in t['packages']:
                    # The source is affected, lets reject it.
                    rejectmsg = "{0}: part of the {1} transition.\n\n".format(source, trans)

                    if current is not None:
                        currentlymsg = "at version {0}".format(current.version)
                    else:
                        currentlymsg = "not present in testing"

                    rejectmsg += "Transition description: {0}\n\n".format(t["reason"])

                    rejectmsg += "\n".join(textwrap.wrap("""Your package
is part of a testing transition designed to get {0} migrated (it is
currently {1}, we need version {2}). This transition is managed by the
Release Team, and {3} is the Release-Team member responsible for it.
Please mail debian-release@lists.debian.org or contact {3} directly if you
need further assistance. You might want to upload to experimental until this
transition is done.""".format(source, currentlymsg, expected,t["rm"])))

                    raise Reject(rejectmsg)

        return True

    def get_transitions(self):
        """Load the configured transitions file.

        @return: parsed transitions mapping, or C{None} if unconfigured,
                 missing or unparsable
        """
        cnf = Config()
        path = cnf.get('Dinstall::ReleaseTransitions', '')
        if path == '' or not os.path.exists(path):
            return None

        with open(path, 'r') as fd:
            contents = fd.read()
        try:
            # NOTE(review): yaml.load on an admin-controlled file; prefer
            # yaml.safe_load if the file could ever be untrusted.
            transitions = yaml.load(contents)
            return transitions
        except yaml.YAMLError as msg:
            utils.warn('Not checking transitions, the transitions file is broken: {0}'.format(msg))

        return None
class NoSourceOnlyCheck(Check):
    """Check for source-only upload

    Source-only uploads are only allowed if Dinstall::AllowSourceOnlyUploads is
    set. Otherwise they are rejected.
    """
    def check(self, upload):
        # Explicitly allowed by configuration: nothing to check.
        if Config().find_b("Dinstall::AllowSourceOnlyUploads"):
            return True
        changes = upload.changes
        if changes.source is not None and len(changes.binaries) == 0:
            raise Reject('Source-only uploads are not allowed.')
        return True
class LintianCheck(Check):
    """Check package using lintian"""
    def check(self, upload):
        changes = upload.changes

        # Only check sourceful uploads.
        if changes.source is None:
            return True
        # Only check uploads to unstable or experimental.
        if 'unstable' not in changes.distributions and 'experimental' not in changes.distributions:
            return True

        cnf = Config()
        if 'Dinstall::LintianTags' not in cnf:
            # Lintian checking is not configured.
            return True
        tagfile = cnf['Dinstall::LintianTags']

        with open(tagfile, 'r') as sourcefile:
            sourcecontent = sourcefile.read()
        try:
            lintiantags = yaml.load(sourcecontent)['lintian']
        except yaml.YAMLError as msg:
            raise Exception('Could not read lintian tags file {0}, YAML error: {1}'.format(tagfile, msg))

        # Write the tags we care about to a temporary file for lintian.
        fd, temp_filename = utils.temp_filename(mode=0o644)
        temptagfile = os.fdopen(fd, 'w')
        for tags in lintiantags.itervalues():
            for tag in tags:
                temptagfile.write("{0}\n".format(tag))
        temptagfile.close()

        changespath = os.path.join(upload.directory, changes.filename)
        try:
            # TODO: replace by subprocess (see module-level TODO); the command
            # is built as a shell string, but all components come from
            # configuration and dak-generated paths.
            if cnf.unprivgroup:
                cmd = "sudo -H -u {0} -- /usr/bin/lintian --show-overrides --tags-from-file {1} {2}".format(cnf.unprivgroup, temp_filename, changespath)
            else:
                cmd = "/usr/bin/lintian --show-overrides --tags-from-file {0} {1}".format(temp_filename, changespath)
            result, output = commands.getstatusoutput(cmd)
        finally:
            os.unlink(temp_filename)

        if result == 2:
            # lintian itself failed; warn but still try to parse any output.
            utils.warn("lintian failed for %s [return code: %s]." % \
                (changespath, result))
            utils.warn(utils.prefix_multi_line_string(output, \
                " [possible output:] "))

        parsed_tags = lintian.parse_lintian_output(output)
        rejects = list(lintian.generate_reject_messages(parsed_tags, lintiantags))
        if len(rejects) != 0:
            raise Reject('\n'.join(rejects))

        return True
class SourceFormatCheck(Check):
    """Check source format is allowed in the target suite"""
    def per_suite_check(self, upload, suite):
        source = upload.changes.source
        session = upload.session
        if source is None:
            # Binary-only upload: no source format to check.
            return True

        source_format = source.dsc['Format']
        query = session.query(SrcFormat).filter_by(format_name=source_format).filter(SrcFormat.suites.contains(suite))
        if query.first() is None:
            raise Reject('source format {0} is not allowed in suite {1}'.format(source_format, suite.suite_name))

        return True
class SuiteArchitectureCheck(Check):
    """Check every architecture in the upload is enabled in the target suite."""
    def per_suite_check(self, upload, suite):
        session = upload.session
        for arch in upload.changes.architectures:
            query = session.query(Architecture).filter_by(arch_string=arch).filter(Architecture.suites.contains(suite))
            if query.first() is None:
                # BUG FIX: the message used {2} with only two format
                # arguments, raising IndexError instead of the intended
                # Reject; use {1} for the suite name.
                raise Reject('Architecture {0} is not allowed in suite {1}'.format(arch, suite.suite_name))

        return True
class VersionCheck(Check):
    """Check version constraints"""
    def _highest_source_version(self, session, source_name, suite):
        """Return the highest version of C{source_name} in C{suite} or C{None}."""
        db_source = session.query(DBSource).filter_by(source=source_name) \
            .filter(DBSource.suites.contains(suite)).order_by(DBSource.version.desc()).first()
        if db_source is None:
            return None
        return db_source.version

    def _highest_binary_version(self, session, binary_name, suite, architecture):
        """Return the highest version of C{binary_name} in C{suite} or C{None}.

        Considers both the given architecture and 'all'.
        """
        db_binary = session.query(DBBinary).filter_by(package=binary_name) \
            .filter(DBBinary.suites.contains(suite)) \
            .join(DBBinary.architecture) \
            .filter(Architecture.arch_string.in_(['all', architecture])) \
            .order_by(DBBinary.version.desc()).first()
        if db_binary is None:
            return None
        return db_binary.version

    def _version_checks(self, upload, suite, op):
        """Reject unless C{op(version_compare(uploaded, existing))} holds
        for every source and binary package in the upload."""
        session = upload.session

        if upload.changes.source is not None:
            source_name = upload.changes.source.dsc['Source']
            source_version = upload.changes.source.dsc['Version']
            v = self._highest_source_version(session, source_name, suite)
            if v is not None and not op(version_compare(source_version, v)):
                raise Reject('Version check failed (source={0}, version={1}, other-version={2}, suite={3})'.format(source_name, source_version, v, suite.suite_name))

        for binary in upload.changes.binaries:
            binary_name = binary.control['Package']
            binary_version = binary.control['Version']
            architecture = binary.control['Architecture']
            v = self._highest_binary_version(session, binary_name, suite, architecture)
            if v is not None and not op(version_compare(binary_version, v)):
                raise Reject('Version check failed (binary={0}, version={1}, other-version={2}, suite={3})'.format(binary_name, binary_version, v, suite.suite_name))

    def per_suite_check(self, upload, suite):
        session = upload.session

        vc_newer = session.query(dbconn.VersionCheck).filter_by(suite=suite) \
            .filter(dbconn.VersionCheck.check.in_(['MustBeNewerThan', 'Enhances']))
        must_be_newer_than = [ vc.reference for vc in vc_newer ]
        # Must be newer than old versions in `suite`
        must_be_newer_than.append(suite)

        for s in must_be_newer_than:
            self._version_checks(upload, s, lambda result: result > 0)

        vc_older = session.query(dbconn.VersionCheck).filter_by(suite=suite, check='MustBeOlderThan')
        must_be_older_than = [ vc.reference for vc in vc_older ]

        for s in must_be_older_than:
            self._version_checks(upload, s, lambda result: result < 0)

        return True