1 # Copyright (C) 2012, Ansgar Burchardt <ansgar@debian.org>
3 # Parts based on code that is
4 # Copyright (C) 2001-2006, James Troup <james@nocrew.org>
5 # Copyright (C) 2009-2010, Joerg Jaspert <joerg@debian.org>
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License along
18 # with this program; if not, write to the Free Software Foundation, Inc.,
19 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""module provides pre-acceptance tests

Please read the documentation for the L{Check} class for the interface.
"""
from daklib.config import Config
from daklib.dbconn import *
import daklib.dbconn as dbconn
from daklib.regexes import *
from daklib.textutils import fix_maintainer, ParseMaintError
import daklib.lintian as lintian
import daklib.utils as utils

import apt_inst
import apt_pkg
from apt_pkg import version_compare
import commands  # TODO: replace by subprocess
import errno
import os
import textwrap
import time
import yaml
class Reject(Exception):
    """Exception raised when a pre-acceptance check fails.

    The exception message is a human-readable explanation of why the
    upload must be rejected.
    """
class RejectStupidMaintainerException(Exception):
    """exception raised by failing the external hashes check

    Constructor arguments, in order: filename, name of the mismatching
    hash field, current (uploaded) value, external (database) value.
    """
    def __str__(self):
        # Only the first four args are formatted; extra args are ignored.
        return "'%s' has mismatching %s from the external files db ('%s' [current] vs '%s' [external])" % self.args[:4]
class Check(object):
    """base class for checks

    checks are called by L{daklib.archive.ArchiveUpload}. Failing tests should
    raise a L{daklib.checks.Reject} exception including a human-readable
    description why the upload should be rejected.
    """
    def check(self, upload):
        """do checks

        @type  upload: L{daklib.archive.ArchiveUpload}
        @param upload: upload to check

        @raise daklib.checks.Reject: upload should be rejected
        """
        # NotImplementedError (not the NotImplemented singleton) is the
        # correct way to mark an abstract method.
        raise NotImplementedError
    def per_suite_check(self, upload, suite):
        """do per-suite checks

        @type  upload: L{daklib.archive.ArchiveUpload}
        @param upload: upload to check

        @type  suite: L{daklib.dbconn.Suite}
        @param suite: suite to check

        @raise daklib.checks.Reject: upload should be rejected
        """
        raise NotImplementedError
    @property
    def forcable(self):
        """allow to force ignore failing test

        C{True} if it is acceptable to force ignoring a failing test,
        C{False} otherwise
        """
        return False
class SignatureCheck(Check):
    """Check signature of changes and dsc file (if included in upload)

    Make sure the signature is valid and done by a known user.
    """
    def check(self, upload):
        changes = upload.changes
        if not changes.valid_signature:
            raise Reject("Signature for .changes not valid.")
        if changes.source is not None:
            # The .dsc must be signed too, and by the same key as .changes.
            if not changes.source.valid_signature:
                raise Reject("Signature for .dsc not valid.")
            if changes.source.primary_fingerprint != changes.primary_fingerprint:
                raise Reject(".changes and .dsc not signed by the same key.")
        if upload.fingerprint is None or upload.fingerprint.uid is None:
            raise Reject(".changes signed by unknown key.")
        return True
class ChangesCheck(Check):
    """Check changes file for syntax errors."""
    def check(self, upload):
        changes = upload.changes
        control = changes.changes
        fn = changes.filename

        for field in ('Distribution', 'Source', 'Binary', 'Architecture', 'Version', 'Maintainer', 'Files', 'Changes', 'Description'):
            if field not in control:
                raise Reject('{0}: misses mandatory field {1}'.format(fn, field))

        # Source and Version must parse, and the filename must agree with them.
        source_match = re_field_source.match(control['Source'])
        if not source_match:
            raise Reject('{0}: Invalid Source field'.format(fn))
        version_match = re_field_version.match(control['Version'])
        if not version_match:
            raise Reject('{0}: Invalid Version field'.format(fn))
        version_without_epoch = version_match.group('without_epoch')

        match = re_file_changes.match(fn)
        if not match:
            raise Reject('{0}: Does not match re_file_changes'.format(fn))
        if match.group('package') != source_match.group('package'):
            raise Reject('{0}: Filename does not match Source field'.format(fn))
        if match.group('version') != version_without_epoch:
            raise Reject('{0}: Filename does not match Version field'.format(fn))

        for bn in changes.binary_names:
            if not re_field_package.match(bn):
                raise Reject('{0}: Invalid binary package name {1}'.format(fn, bn))

        # Architecture list and presence of a source package must agree.
        if 'source' in changes.architectures and changes.source is None:
            raise Reject("Changes has architecture source, but no source found.")
        if changes.source is not None and 'source' not in changes.architectures:
            raise Reject("Upload includes source, but changes does not say so.")

        try:
            fix_maintainer(changes.changes['Maintainer'])
        except ParseMaintError as e:
            raise Reject('{0}: Failed to parse Maintainer field: {1}'.format(changes.filename, e))

        try:
            changed_by = changes.changes.get('Changed-By')
            if changed_by is not None:
                fix_maintainer(changed_by)
        except ParseMaintError as e:
            raise Reject('{0}: Failed to parse Changed-By field: {1}'.format(changes.filename, e))

        if len(changes.files) == 0:
            raise Reject("Changes includes no files.")

        for bugnum in changes.closed_bugs:
            if not re_isanum.match(bugnum):
                raise Reject('{0}: "{1}" in Closes field is not a number'.format(changes.filename, bugnum))

        return True
class HashesCheck(Check):
    """Check hashes in .changes and .dsc are valid."""
    def check(self, upload):
        # 'what' names the file whose entries we are currently checking,
        # for use in the error message below.
        what = None
        try:
            changes = upload.changes
            what = changes.filename
            for f in changes.files.itervalues():
                f.check(upload.directory)
            source = changes.source
            if source is not None:
                # Only dereference source once we know it exists;
                # binary-only uploads have no source attached.
                what = source.filename
                for f in source.files.itervalues():
                    f.check(upload.directory)
        except IOError as e:
            if e.errno == errno.ENOENT:
                raise Reject('{0} refers to non-existing file: {1}\n'
                             'Perhaps you need to include it in your upload?'
                             .format(what, os.path.basename(e.filename)))
            # Any other I/O error is unexpected: let it propagate.
            raise
        return True
class ExternalHashesCheck(Check):
    """Checks hashes in .changes and .dsc against an external database."""
    def check_single(self, session, f):
        # f.filename comes from an untrusted upload: pass it as a bound
        # parameter instead of interpolating it into the SQL string.
        q = session.execute(
            "SELECT size, md5sum, sha1sum, sha256sum FROM external_files WHERE filename LIKE :pattern",
            {'pattern': '%/' + f.filename})
        (ext_size, ext_md5sum, ext_sha1sum, ext_sha256sum) = q.fetchone() or (None, None, None, None)

        if ext_size is None:
            # File is not known to the external database: nothing to compare.
            return

        if ext_size != f.size:
            raise RejectStupidMaintainerException(f.filename, 'size', f.size, ext_size)

        if ext_md5sum != f.md5sum:
            raise RejectStupidMaintainerException(f.filename, 'md5sum', f.md5sum, ext_md5sum)

        if ext_sha1sum != f.sha1sum:
            raise RejectStupidMaintainerException(f.filename, 'sha1sum', f.sha1sum, ext_sha1sum)

        if ext_sha256sum != f.sha256sum:
            raise RejectStupidMaintainerException(f.filename, 'sha256sum', f.sha256sum, ext_sha256sum)

    def check(self, upload):
        cnf = Config()

        # Only active when the external files database is configured.
        if not cnf.use_extfiles:
            return

        session = upload.session
        changes = upload.changes

        for f in changes.files.itervalues():
            self.check_single(session, f)
        source = changes.source
        if source is not None:
            for f in source.files.itervalues():
                self.check_single(session, f)
class BinaryCheck(Check):
    """Check binary packages for syntax errors."""
    def check(self, upload):
        for binary in upload.changes.binaries:
            self.check_binary(upload, binary)

        # All binary packages shipped must be announced in the changes'
        # Binary field.
        binary_names = set([ binary.control['Package'] for binary in upload.changes.binaries ])
        for bn in binary_names:
            if bn not in upload.changes.binary_names:
                raise Reject('Package {0} is not mentioned in Binary field in changes'.format(bn))

        return True

    def check_binary(self, upload, binary):
        fn = binary.hashed_file.filename
        control = binary.control

        for field in ('Package', 'Architecture', 'Version', 'Description'):
            if field not in control:
                # was '{0}: Missing mandatory field {0}.' which printed the
                # filename twice and never named the missing field
                raise Reject('{0}: Missing mandatory field {1}.'.format(fn, field))

        # check fields

        package = control['Package']
        if not re_field_package.match(package):
            raise Reject('{0}: Invalid Package field'.format(fn))

        version = control['Version']
        version_match = re_field_version.match(version)
        if not version_match:
            raise Reject('{0}: Invalid Version field'.format(fn))
        version_without_epoch = version_match.group('without_epoch')

        architecture = control['Architecture']
        if architecture not in upload.changes.architectures:
            raise Reject('{0}: Architecture not in Architecture field in changes file'.format(fn))
        if architecture == 'source':
            raise Reject('{0}: Architecture "source" invalid for binary packages'.format(fn))

        source = control.get('Source')
        if source is not None and not re_field_source.match(source):
            raise Reject('{0}: Invalid Source field'.format(fn))

        # check filename

        match = re_file_binary.match(fn)
        if not match:
            raise Reject('{0}: does not match re_file_binary'.format(fn))
        if package != match.group('package'):
            raise Reject('{0}: filename does not match Package field'.format(fn))
        if version_without_epoch != match.group('version'):
            raise Reject('{0}: filename does not match Version field'.format(fn))
        if architecture != match.group('architecture'):
            raise Reject('{0}: filename does not match Architecture field'.format(fn))

        # check dependency field syntax

        for field in ('Breaks', 'Conflicts', 'Depends', 'Enhances', 'Pre-Depends',
                      'Provides', 'Recommends', 'Replaces', 'Suggests'):
            value = control.get(field)
            if value is not None:
                if value.strip() == '':
                    raise Reject('{0}: empty {1} field'.format(fn, field))
                try:
                    apt_pkg.parse_depends(value)
                except Exception:
                    raise Reject('{0}: APT could not parse {1} field'.format(fn, field))

        for field in ('Built-Using',):
            value = control.get(field)
            if value is not None:
                if value.strip() == '':
                    raise Reject('{0}: empty {1} field'.format(fn, field))
                try:
                    apt_pkg.parse_src_depends(value)
                except Exception:
                    raise Reject('{0}: APT could not parse {1} field'.format(fn, field))
class BinaryTimestampCheck(Check):
    """check timestamps of files in binary packages

    Files in the near future cause ugly warnings and extreme time travel
    can cause errors on extraction.
    """
    def check(self, upload):
        cnf = Config()
        future_cutoff = time.time() + cnf.find_i('Dinstall::FutureTimeTravelGrace', 24*3600)
        past_cutoff = time.mktime(time.strptime(cnf.find('Dinstall::PastCutoffYear', '1984'), '%Y'))

        class TarTime(object):
            """collects members with out-of-range mtimes while walking a tarball"""
            def __init__(self):
                self.future_files = dict()
                self.past_files = dict()
            def callback(self, member, data):
                # future_cutoff/past_cutoff are closed over from check().
                if member.mtime > future_cutoff:
                    self.future_files[member.name] = member.mtime
                elif member.mtime < past_cutoff:
                    self.past_files[member.name] = member.mtime

        def format_reason(filename, direction, files):
            reason = "{0}: has {1} file(s) with a timestamp too far in the {2}:\n".format(filename, len(files), direction)
            for fn, ts in files.iteritems():
                reason += " {0} ({1})".format(fn, time.ctime(ts))
            return reason

        for binary in upload.changes.binaries:
            filename = binary.hashed_file.filename
            path = os.path.join(upload.directory, filename)
            deb = apt_inst.DebFile(path)
            tar = TarTime()
            deb.control.go(tar.callback)
            if tar.future_files:
                raise Reject(format_reason(filename, 'future', tar.future_files))
            if tar.past_files:
                raise Reject(format_reason(filename, 'past', tar.past_files))
class SourceCheck(Check):
    """Check source package for syntax errors."""
    def check_filename(self, control, filename, regex):
        # In case we have an .orig.tar.*, we have to strip the Debian revison
        # from the version number. So handle this special case first.
        is_orig = True
        match = re_file_orig.match(filename)
        if not match:
            is_orig = False
            match = regex.match(filename)

        if not match:
            raise Reject('{0}: does not match regular expression for source filenames'.format(filename))
        if match.group('package') != control['Source']:
            raise Reject('{0}: filename does not match Source field'.format(filename))

        version = control['Version']
        if is_orig:
            # orig tarballs carry only the upstream part of the version
            version = re_field_version_upstream.match(version).group('upstream')
        version_match = re_field_version.match(version)
        version_without_epoch = version_match.group('without_epoch')
        if match.group('version') != version_without_epoch:
            raise Reject('{0}: filename does not match Version field'.format(filename))

    def check(self, upload):
        if upload.changes.source is None:
            # binary-only upload: nothing to do here
            return True

        changes = upload.changes.changes
        source = upload.changes.source
        control = source.dsc
        dsc_fn = source._dsc_file.filename

        # check fields
        if not re_field_package.match(control['Source']):
            raise Reject('{0}: Invalid Source field'.format(dsc_fn))
        if control['Source'] != changes['Source']:
            raise Reject('{0}: Source field does not match Source field in changes'.format(dsc_fn))
        if control['Version'] != changes['Version']:
            raise Reject('{0}: Version field does not match Version field in changes'.format(dsc_fn))

        # check filenames
        self.check_filename(control, dsc_fn, re_file_dsc)
        for f in source.files.itervalues():
            self.check_filename(control, f.filename, re_file_source)

        # check dependency field syntax
        for field in ('Build-Conflicts', 'Build-Conflicts-Indep', 'Build-Depends', 'Build-Depends-Arch', 'Build-Depends-Indep'):
            value = control.get(field)
            if value is not None:
                if value.strip() == '':
                    raise Reject('{0}: empty {1} field'.format(dsc_fn, field))
                try:
                    apt_pkg.parse_src_depends(value)
                except Exception as e:
                    raise Reject('{0}: APT could not parse {1} field: {2}'.format(dsc_fn, field, e))

        rejects = utils.check_dsc_files(dsc_fn, control, source.files.keys())
        if len(rejects) > 0:
            raise Reject("\n".join(rejects))

        return True
class SingleDistributionCheck(Check):
    """Check that the .changes targets only a single distribution."""
    def check(self, upload):
        distributions = upload.changes.distributions
        if len(distributions) != 1:
            raise Reject("Only uploads to a single distribution are allowed.")
class ACLCheck(Check):
    """Check the uploader is allowed to upload the packages in .changes"""

    def _does_hijack(self, session, upload, suite):
        """return (hijacks, binary, original-source) for a would-be hijack"""
        # Try to catch hijacks.
        # This doesn't work correctly. Uploads to experimental can still
        # "hijack" binaries from unstable. Also one can hijack packages
        # via buildds (but people who try this should not be DMs).
        for binary_name in upload.changes.binary_names:
            binaries = session.query(DBBinary).join(DBBinary.source) \
                .filter(DBBinary.suites.contains(suite)) \
                .filter(DBBinary.package == binary_name)
            for binary in binaries:
                if binary.source.source != upload.changes.changes['Source']:
                    return True, binary, binary.source.source
        return False, None, None

    def _check_acl(self, session, upload, acl):
        """check upload against one ACL

        @return: C{(result, reason)} where result is C{True} (accept),
                 C{False} (reject with reason) or C{None} (ACL does not
                 apply to this upload)
        """
        source_name = upload.changes.source_name

        # ACL does not apply to this uploader at all?
        if acl.match_fingerprint and upload.fingerprint not in acl.fingerprints:
            return None, None
        if acl.match_keyring is not None and upload.fingerprint.keyring != acl.match_keyring:
            return None, None

        if not acl.allow_new:
            # NOTE(review): assumes ArchiveUpload exposes a `new` flag — confirm
            if upload.new:
                return False, "NEW uploads are not allowed"
            for f in upload.changes.files.itervalues():
                if f.section == 'byhand' or f.section.startswith("raw-"):
                    return False, "BYHAND uploads are not allowed"
        if not acl.allow_source and upload.changes.source is not None:
            return False, "sourceful uploads are not allowed"
        binaries = upload.changes.binaries
        if len(binaries) != 0:
            if not acl.allow_binary:
                return False, "binary uploads are not allowed"
            if upload.changes.source is None and not acl.allow_binary_only:
                return False, "binary-only uploads are not allowed"
            if not acl.allow_binary_all:
                uploaded_arches = set(upload.changes.architectures)
                uploaded_arches.discard('source')
                allowed_arches = set(a.arch_string for a in acl.architectures)
                forbidden_arches = uploaded_arches - allowed_arches
                if len(forbidden_arches) != 0:
                    return False, "uploads for architecture(s) {0} are not allowed".format(", ".join(forbidden_arches))
        if not acl.allow_hijack:
            for suite in upload.final_suites:
                does_hijack, hijacked_binary, hijacked_from = self._does_hijack(session, upload, suite)
                if does_hijack:
                    return False, "hijacks are not allowed (binary={0}, other-source={1})".format(hijacked_binary, hijacked_from)

        acl_per_source = session.query(ACLPerSource).filter_by(acl=acl, fingerprint=upload.fingerprint, source=source_name).first()
        if acl.allow_per_source:
            # XXX: Drop DMUA part here and switch to new implementation.
            # XXX: Send warning mail once users can set the new DMUA flag
            dmua_status, dmua_reason = self._check_dmua(upload)
            if not dmua_status:
                return False, dmua_reason
            #if acl_per_source is None:
            #    return False, "not allowed to upload source package '{0}'".format(source_name)
        if acl.deny_per_source and acl_per_source is not None:
            return False, acl_per_source.reason or "forbidden to upload source package '{0}'".format(source_name)

        return True, None

    def _check_dmua(self, upload):
        # This code is not very nice, but hopefully works until we can replace
        # DM-Upload-Allowed, cf. https://lists.debian.org/debian-project/2012/06/msg00029.html
        session = upload.session

        # Check DM-Upload-Allowed
        suites = upload.final_suites
        assert len(suites) == 1
        suite = list(suites)[0]

        last_suites = ['unstable', 'experimental']
        if suite.suite_name.endswith('-backports'):
            last_suites = [suite.suite_name]
        last = session.query(DBSource).filter_by(source=upload.changes.changes['Source']) \
            .join(DBSource.suites).filter(Suite.suite_name.in_(last_suites)) \
            .order_by(DBSource.version.desc()).limit(1).first()
        if last is None:
            return False, 'No existing source found in {0}'.format(' or '.join(last_suites))
        if not last.dm_upload_allowed:
            return False, 'DM-Upload-Allowed is not set in {0}={1}'.format(last.source, last.version)

        # check current Changed-by is in last Maintainer or Uploaders
        uploader_names = [ u.name for u in last.uploaders ]
        changed_by_field = upload.changes.changes.get('Changed-By', upload.changes.changes['Maintainer'])
        if changed_by_field not in uploader_names:
            return False, '{0} is not an uploader for {1}={2}'.format(changed_by_field, last.source, last.version)

        # check Changed-by is the DM
        changed_by = fix_maintainer(changed_by_field)
        uid = upload.fingerprint.uid
        if uid is None:
            return False, 'Unknown uid for fingerprint {0}'.format(upload.fingerprint.fingerprint)
        if uid.uid != changed_by[3] and uid.name != changed_by[2]:
            return False, 'DMs are not allowed to sponsor uploads (expected {0} <{1}> as maintainer, but got {2})'.format(uid.name, uid.uid, changed_by_field)

        return True, None

    def check(self, upload):
        session = upload.session
        fingerprint = upload.fingerprint
        keyring = fingerprint.keyring

        if keyring is None:
            raise Reject('No keyring for fingerprint {0}'.format(fingerprint.fingerprint))
        if not keyring.active:
            raise Reject('Keyring {0} is not active'.format(keyring.name))

        acl = fingerprint.acl or keyring.acl
        if acl is None:
            raise Reject('No ACL for fingerprint {0}'.format(fingerprint.fingerprint))
        result, reason = self._check_acl(session, upload, acl)
        if result is False:
            raise Reject(reason)

        # global ACLs apply to everybody, on top of the personal/keyring one
        for acl in session.query(ACL).filter_by(is_global=True):
            result, reason = self._check_acl(session, upload, acl)
            if result is False:
                raise Reject(reason)

        return True

    def per_suite_check(self, upload, suite):
        # NOTE(review): assumes Suite exposes its per-suite ACLs as `acls` — confirm
        acls = suite.acls
        if len(acls) != 0:
            # At least one per-suite ACL must positively accept the upload;
            # any explicit rejection wins immediately.
            accept = False
            for acl in acls:
                result, reason = self._check_acl(upload.session, upload, acl)
                if result is False:
                    raise Reject(reason)
                accept = accept or result
            if not accept:
                raise Reject('Not accepted by any per-suite acl (suite={0})'.format(suite.suite_name))
        return True
class TransitionCheck(Check):
    """check for a transition"""
    def check(self, upload):
        # Only sourceful uploads can be part of a transition.
        if 'source' not in upload.changes.architectures:
            return True

        transitions = self.get_transitions()
        if transitions is None:
            return True

        session = upload.session

        control = upload.changes.changes
        source = re_field_source.match(control['Source']).group('package')

        for trans in transitions:
            t = transitions[trans]
            # Use a separate name here: rebinding 'source' would break the
            # affected-package membership test below.
            transition_source = t["source"]
            expected = t["new"]

            # Will be None if nothing is in testing.
            current = get_source_in_suite(transition_source, "testing", session)
            if current is not None:
                compare = apt_pkg.version_compare(current.version, expected)

            if current is None or compare < 0:
                # This is still valid, the current version in testing is older than
                # the new version we wait for, or there is none in testing yet

                # Check if the source we look at is affected by this.
                if source in t['packages']:
                    # The source is affected, lets reject it.

                    rejectmsg = "{0}: part of the {1} transition.\n\n".format(source, trans)

                    if current is not None:
                        currentlymsg = "at version {0}".format(current.version)
                    else:
                        currentlymsg = "not present in testing"

                    rejectmsg += "Transition description: {0}\n\n".format(t["reason"])

                    rejectmsg += "\n".join(textwrap.wrap("""Your package
is part of a testing transition designed to get {0} migrated (it is
currently {1}, we need version {2}). This transition is managed by the
Release Team, and {3} is the Release-Team member responsible for it.
Please mail debian-release@lists.debian.org or contact {3} directly if you
need further assistance. You might want to upload to experimental until this
transition is done.""".format(source, currentlymsg, expected,t["rm"])))

                    raise Reject(rejectmsg)

        return True

    def get_transitions(self):
        """load the transitions file; C{None} if unconfigured, missing or broken"""
        cnf = Config()
        path = cnf.get('Dinstall::ReleaseTransitions', '')
        if path == '' or not os.path.exists(path):
            return None

        with open(path, 'r') as fd:
            contents = fd.read()
        try:
            # safe_load: the transitions file is plain data, no need to
            # allow arbitrary Python object construction.
            transitions = yaml.safe_load(contents)
            return transitions
        except yaml.YAMLError as msg:
            utils.warn('Not checking transitions, the transitions file is broken: {0}'.format(msg))

        return None
class NoSourceOnlyCheck(Check):
    """Check for source-only upload

    Source-only uploads are only allowed if Dinstall::AllowSourceOnlyUploads is
    set. Otherwise they are rejected.
    """
    def check(self, upload):
        if Config().find_b("Dinstall::AllowSourceOnlyUploads"):
            # explicitly allowed by configuration
            return True
        changes = upload.changes
        if changes.source is not None and len(changes.binaries) == 0:
            raise Reject('Source-only uploads are not allowed.')
        return True
class LintianCheck(Check):
    """Check package using lintian"""
    def check(self, upload):
        changes = upload.changes

        # Only check sourceful uploads.
        if changes.source is None:
            return True
        # Only check uploads to unstable or experimental.
        if 'unstable' not in changes.distributions and 'experimental' not in changes.distributions:
            return True

        cnf = Config()
        if 'Dinstall::LintianTags' not in cnf:
            return True
        tagfile = cnf['Dinstall::LintianTags']

        with open(tagfile, 'r') as sourcefile:
            sourcecontent = sourcefile.read()
        try:
            # tag file is operator-controlled configuration; yaml.load on
            # untrusted input would be unsafe, safe_load only builds plain data
            lintiantags = yaml.safe_load(sourcecontent)['lintian']
        except yaml.YAMLError as msg:
            raise Exception('Could not read lintian tags file {0}, YAML error: {1}'.format(tagfile, msg))

        # Write all interesting tags to a temporary file lintian can read.
        fd, temp_filename = utils.temp_filename(mode=0o644)
        temptagfile = os.fdopen(fd, 'w')
        for tags in lintiantags.itervalues():
            for tag in tags:
                print >>temptagfile, tag
        temptagfile.close()

        changespath = os.path.join(upload.directory, changes.filename)
        try:
            if cnf.unprivgroup:
                cmd = "sudo -H -u {0} -- /usr/bin/lintian --show-overrides --tags-from-file {1} {2}".format(cnf.unprivgroup, temp_filename, changespath)
            else:
                cmd = "/usr/bin/lintian --show-overrides --tags-from-file {0} {1}".format(temp_filename, changespath)
            result, output = commands.getstatusoutput(cmd)
        finally:
            os.unlink(temp_filename)

        if result == 2:
            utils.warn("lintian failed for %s [return code: %s]." % \
                (changespath, result))
            utils.warn(utils.prefix_multi_line_string(output, \
                " [possible output:] "))

        parsed_tags = lintian.parse_lintian_output(output)
        rejects = list(lintian.generate_reject_messages(parsed_tags, lintiantags))
        if len(rejects) != 0:
            raise Reject('\n'.join(rejects))

        return True
class SourceFormatCheck(Check):
    """Check source format is allowed in the target suite"""
    def per_suite_check(self, upload, suite):
        source = upload.changes.source
        session = upload.session
        if source is None:
            # binary-only upload: no source format to check
            return True

        source_format = source.dsc['Format']
        query = session.query(SrcFormat).filter_by(format_name=source_format).filter(SrcFormat.suites.contains(suite))
        if query.first() is None:
            raise Reject('source format {0} is not allowed in suite {1}'.format(source_format, suite.suite_name))
class SuiteArchitectureCheck(Check):
    """Check all architectures in the upload are allowed in the target suite."""
    def per_suite_check(self, upload, suite):
        session = upload.session
        for arch in upload.changes.architectures:
            query = session.query(Architecture).filter_by(arch_string=arch).filter(Architecture.suites.contains(suite))
            if query.first() is None:
                # was '{2}' with only two format args: raised IndexError
                # instead of producing the rejection message
                raise Reject('Architecture {0} is not allowed in suite {1}'.format(arch, suite.suite_name))
        return True
class VersionCheck(Check):
    """Check version constraints"""
    def _highest_source_version(self, session, source_name, suite):
        """highest version of C{source_name} in C{suite}, or C{None}"""
        db_source = session.query(DBSource).filter_by(source=source_name) \
            .filter(DBSource.suites.contains(suite)).order_by(DBSource.version.desc()).first()
        if db_source is None:
            return None
        else:
            return db_source.version

    def _highest_binary_version(self, session, binary_name, suite, architecture):
        """highest version of C{binary_name} in C{suite} for C{architecture}
        (or arch 'all'), or C{None}"""
        db_binary = session.query(DBBinary).filter_by(package=binary_name) \
            .filter(DBBinary.suites.contains(suite)) \
            .join(DBBinary.architecture) \
            .filter(Architecture.arch_string.in_(['all', architecture])) \
            .order_by(DBBinary.version.desc()).first()
        if db_binary is None:
            return None
        else:
            return db_binary.version

    def _version_checks(self, upload, suite, op):
        """reject unless C{op(version_compare(uploaded, existing))} holds
        for source and every binary against C{suite}"""
        session = upload.session

        if upload.changes.source is not None:
            source_name = upload.changes.source.dsc['Source']
            source_version = upload.changes.source.dsc['Version']
            v = self._highest_source_version(session, source_name, suite)
            if v is not None and not op(version_compare(source_version, v)):
                raise Reject('Version check failed (source={0}, version={1}, other-version={2}, suite={3})'.format(source_name, source_version, v, suite.suite_name))

        for binary in upload.changes.binaries:
            binary_name = binary.control['Package']
            binary_version = binary.control['Version']
            architecture = binary.control['Architecture']
            v = self._highest_binary_version(session, binary_name, suite, architecture)
            if v is not None and not op(version_compare(binary_version, v)):
                raise Reject('Version check failed (binary={0}, version={1}, other-version={2}, suite={3})'.format(binary_name, binary_version, v, suite.suite_name))

    def per_suite_check(self, upload, suite):
        session = upload.session

        vc_newer = session.query(dbconn.VersionCheck).filter_by(suite=suite) \
            .filter(dbconn.VersionCheck.check.in_(['MustBeNewerThan', 'Enhances']))
        must_be_newer_than = [ vc.reference for vc in vc_newer ]
        # Must be newer than old versions in `suite`
        must_be_newer_than.append(suite)

        for s in must_be_newer_than:
            self._version_checks(upload, s, lambda result: result > 0)

        vc_older = session.query(dbconn.VersionCheck).filter_by(suite=suite, check='MustBeOlderThan')
        must_be_older_than = [ vc.reference for vc in vc_older ]

        for s in must_be_older_than:
            self._version_checks(upload, s, lambda result: result < 0)

        return True