]> git.donarmstrong.com Git - dak.git/commitdiff
Merge branch 'master' into dbtests
authorTorsten Werner <twerner@debian.org>
Sat, 22 Jan 2011 13:03:33 +0000 (14:03 +0100)
committerTorsten Werner <twerner@debian.org>
Sat, 22 Jan 2011 13:03:33 +0000 (14:03 +0100)
daklib/dbconn.py
tests/dbtest_packages.py

index 6d8d3bd69b230a1a61ceec456cdfbb18a6311d31..9c1ed2b39b677d0f1d85f21d19c89a1d57b95c96 100755 (executable)
@@ -2168,7 +2168,7 @@ def get_suites_source_in(source, session=None):
     @return: list of Suite objects for the given source
     """
 
-    return session.query(Suite).join(SrcAssociation).join(DBSource).filter_by(source=source).all()
+    return session.query(Suite).filter(Suite.sources.any(source=source)).all()
 
 __all__.append('get_suites_source_in')
 
@@ -2207,10 +2207,12 @@ def get_sources_from_name(source, version=None, dm_upload_allowed=None, session=
 
 __all__.append('get_sources_from_name')
 
+# FIXME: This function fails badly if it finds more than 1 source package and
+# its implementation is trivial enough to be inlined.
 @session_wrapper
 def get_source_in_suite(source, suite, session=None):
     """
-    Returns list of DBSource objects for a combination of C{source} and C{suite}.
+    Returns a DBSource object for a combination of C{source} and C{suite}.
 
       - B{source} - source package name, eg. I{mailfilter}, I{bbdb}, I{glibc}
       - B{suite} - a suite name, eg. I{unstable}
@@ -2226,12 +2228,9 @@ def get_source_in_suite(source, suite, session=None):
 
     """
 
-    q = session.query(SrcAssociation)
-    q = q.join('source').filter_by(source=source)
-    q = q.join('suite').filter_by(suite_name=suite)
-
+    q = get_suite(suite, session).get_sources(source)
     try:
-        return q.one().source
+        return q.one()
     except NoResultFound:
         return None
 
@@ -2267,15 +2266,10 @@ def add_dsc_to_db(u, filename, session=None):
 
     source.poolfile_id = entry["files id"]
     session.add(source)
-    session.flush()
-
-    for suite_name in u.pkg.changes["distribution"].keys():
-        sa = SrcAssociation()
-        sa.source_id = source.source_id
-        sa.suite_id = get_suite(suite_name).suite_id
-        session.add(sa)
 
-    session.flush()
+    suite_names = u.pkg.changes["distribution"].keys()
+    source.suites = session.query(Suite). \
+        filter(Suite.suite_name.in_(suite_names)).all()
 
     # Add the source files to the DB (files and dsc_files)
     dscfile = DSCFile()
@@ -2325,8 +2319,6 @@ def add_dsc_to_db(u, filename, session=None):
         df.poolfile_id = files_id
         session.add(df)
 
-    session.flush()
-
     # Add the src_uploaders to the DB
     uploader_ids = [source.maintainer_id]
     if u.pkg.dsc.has_key("uploaders"):
@@ -2480,6 +2472,8 @@ SUITE_FIELDS = [ ('SuiteName', 'suite_name'),
                  ('CopyChanges', 'copychanges'),
                  ('OverrideSuite', 'overridesuite')]
 
+# Why the heck don't we have any UNIQUE constraints in table suite?
+# TODO: Add UNIQUE constraints for appropriate columns.
 class Suite(object):
     def __init__(self, suite_name = None, version = None):
         self.suite_name = suite_name
@@ -2533,6 +2527,24 @@ class Suite(object):
             q = q.filter(Architecture.arch_string != 'all')
         return q.order_by(Architecture.arch_string).all()
 
+    def get_sources(self, source):
+        """
+        Returns a query object representing DBSource that is part of C{suite}.
+
+          - B{source} - source package name, eg. I{mailfilter}, I{bbdb}, I{glibc}
+
+        @type source: string
+        @param source: source package name
+
+        @rtype: sqlalchemy.orm.query.Query
+        @return: a query of DBSource
+
+        """
+
+        session = object_session(self)
+        return session.query(DBSource).filter_by(source = source). \
+            filter(DBSource.suites.contains(self))
+
 __all__.append('Suite')
 
 @session_wrapper
index 8764bc78a4139152d790baf867cf1835711f9107..732cccccba0163a316088d6933877172c81b40dc 100755 (executable)
@@ -4,62 +4,71 @@ from db_test import DBDakTestCase
 
 from daklib.dbconn import Architecture, Suite, get_suite_architectures, \
     get_architecture_suites, Maintainer, DBSource, Location, PoolFile, \
-    check_poolfile, get_poolfile_like_name
+    check_poolfile, get_poolfile_like_name, get_source_in_suite, \
+    get_suites_source_in, add_dsc_to_db
 
+from sqlalchemy.orm.exc import MultipleResultsFound
 import unittest
 
+class Pkg():
+    'fake package class used for testing'
+
+    def __init__(self):
+        self.dsc = {}
+        self.files = {}
+        self.changes = {}
+
+class Upload():
+    'fake Upload class used for testing'
+
+    def __init__(self, pkg):
+        self.pkg = pkg
+
 class PackageTestCase(DBDakTestCase):
     """
     PackageTestCase checks the handling of source and binary packages in dak's
     database.
     """
 
+    def setup_suites(self):
+        "setup a hash of Suite objects in self.suite"
+
+        if 'suite' in self.__dict__:
+            return
+        self.suite = {}
+        for suite_name in ('lenny', 'squeeze', 'sid'):
+            self.suite[suite_name] = Suite(suite_name = suite_name, version = '-')
+        self.session.add_all(self.suite.values())
+
     def setup_architectures(self):
-        "setup a hash of Architecture objects in self.arch"
+        "setup Architecture objects in self.arch and connect to suites"
 
+        if 'arch' in self.__dict__:
+            return
+        self.setup_suites()
         self.arch = {}
         for arch_string in ('source', 'all', 'i386', 'amd64', 'kfreebsd-i386'):
             self.arch[arch_string] = Architecture(arch_string)
+            if arch_string != 'kfreebsd-i386':
+                self.arch[arch_string].suites = self.suite.values()
+            else:
+                self.arch[arch_string].suites = [self.suite['squeeze'], self.suite['sid']]
         # hard code ids for source and all
         self.arch['source'].arch_id = 1
         self.arch['all'].arch_id = 2
         self.session.add_all(self.arch.values())
 
-    def setup_suites(self):
-        "setup a hash of Suite objects in self.suite"
-
-        self.suite = {}
-        for suite_name in ('lenny', 'squeeze', 'sid'):
-            self.suite[suite_name] = Suite(suite_name = suite_name, version = '-')
-        self.session.add_all(self.suite.values())
-
     def setUp(self):
         super(PackageTestCase, self).setUp()
         self.setup_architectures()
         self.setup_suites()
 
-    def connect_suite_architectures(self):
-        """
-        Gonnect all suites and all architectures except for kfreebsd-i386 which
-        should not be in lenny.
-        """
-
-        for arch_string, architecture in self.arch.items():
-            if arch_string != 'kfreebsd-i386':
-                architecture.suites = self.suite.values()
-            else:
-                architecture.suites = [self.suite['squeeze'], self.suite['sid']]
-
     def test_suite_architecture(self):
         # check the id for architectures source and all
         self.assertEqual(1, self.arch['source'].arch_id)
         self.assertEqual(2, self.arch['all'].arch_id)
         # check the many to many relation between Suite and Architecture
-        self.arch['source'].suites.append(self.suite['lenny'])
         self.assertEqual('source', self.suite['lenny'].architectures[0])
-        self.arch['source'].suites = []
-        self.assertEqual([], self.suite['lenny'].architectures)
-        self.connect_suite_architectures()
         self.assertEqual(4, len(self.suite['lenny'].architectures))
         self.assertEqual(3, len(self.arch['i386'].suites))
         # check the function get_suite_architectures()
@@ -88,6 +97,8 @@ class PackageTestCase(DBDakTestCase):
     def setup_locations(self):
         'create some Location objects, TODO: add component'
 
+        if 'loc' in self.__dict__:
+            return
         self.loc = {}
         self.loc['main'] = Location(path = \
             '/srv/ftp-master.debian.org/ftp/pool/')
@@ -96,12 +107,19 @@ class PackageTestCase(DBDakTestCase):
     def setup_poolfiles(self):
         'create some PoolFile objects'
 
+        if 'file' in self.__dict__:
+            return
         self.setup_locations()
         self.file = {}
         self.file['hello'] = PoolFile(filename = 'main/h/hello/hello_2.2-2.dsc', \
             location = self.loc['main'], filesize = 0, md5sum = '')
+        self.file['hello_old'] = PoolFile(filename = 'main/h/hello/hello_2.2-1.dsc', \
+            location = self.loc['main'], filesize = 0, md5sum = '')
         self.file['sl'] = PoolFile(filename = 'main/s/sl/sl_3.03-16.dsc', \
             location = self.loc['main'], filesize = 0, md5sum = '')
+        self.file['python'] = PoolFile( \
+            filename = 'main/p/python2.6/python2.6_2.6.6-8.dsc', \
+            location = self.loc['main'], filesize = 0, md5sum = '')
         self.session.add_all(self.file.values())
 
     def test_poolfiles(self):
@@ -120,9 +138,10 @@ class PackageTestCase(DBDakTestCase):
         self.setup_poolfiles()
         location = self.session.query(Location)[0]
         self.assertEqual('/srv/ftp-master.debian.org/ftp/pool/', location.path)
-        self.assertEqual(2, location.files.count())
+        self.assertEqual(4, location.files.count())
         poolfile = location.files. \
-                filter(PoolFile.filename.like('%/hello/hello%')).one()
+                filter(PoolFile.filename.like('%/hello/hello%')). \
+                order_by(PoolFile.filename)[1]
         self.assertEqual('main/h/hello/hello_2.2-2.dsc', poolfile.filename)
         self.assertEqual(location, poolfile.location)
         # test get()
@@ -134,13 +153,13 @@ class PackageTestCase(DBDakTestCase):
         # TODO: deletion should cascade automatically
         self.session.delete(self.file['sl'])
         self.session.refresh(location)
-        self.assertEqual(1, location.files.count())
+        self.assertEqual(3, location.files.count())
         # please note that we intentionally do not specify 'location' here
         self.file['sl'] = PoolFile(filename = 'main/s/sl/sl_3.03-16.dsc', \
             filesize = 0, md5sum = '')
         location.files.append(self.file['sl'])
         self.session.refresh(location)
-        self.assertEqual(2, location.files.count())
+        self.assertEqual(4, location.files.count())
         # test fullpath
         self.assertEqual('/srv/ftp-master.debian.org/ftp/pool/main/s/sl/sl_3.03-16.dsc', \
             self.file['sl'].fullpath)
@@ -164,6 +183,8 @@ class PackageTestCase(DBDakTestCase):
     def setup_maintainers(self):
         'create some Maintainer objects'
 
+        if 'maintainer' in self.__dict__:
+            return
         self.maintainer = {}
         self.maintainer['maintainer'] = Maintainer(name = 'Mr. Maintainer')
         self.maintainer['uploader'] = Maintainer(name = 'Mrs. Uploader')
@@ -173,12 +194,29 @@ class PackageTestCase(DBDakTestCase):
     def setup_sources(self):
         'create a DBSource object; but it cannot be stored in the DB yet'
 
+        if 'source' in self.__dict__:
+            return
         self.setup_maintainers()
         self.setup_poolfiles()
-        self.source = DBSource(source = 'hello', version = '2.2-2', \
+        self.setup_suites()
+        self.source = {}
+        self.source['hello'] = DBSource(source = 'hello', version = '2.2-2', \
             maintainer = self.maintainer['maintainer'], \
             changedby = self.maintainer['uploader'], \
             poolfile = self.file['hello'], install_date = self.now())
+        self.source['hello'].suites.append(self.suite['sid'])
+        self.source['hello_old'] = DBSource(source = 'hello', version = '2.2-1', \
+            maintainer = self.maintainer['maintainer'], \
+            changedby = self.maintainer['uploader'], \
+            poolfile = self.file['hello_old'], install_date = self.now())
+        self.source['hello_old'].suites.append(self.suite['sid'])
+        self.source['sl'] = DBSource(source = 'sl', version = '3.03-16', \
+            maintainer = self.maintainer['maintainer'], \
+            changedby = self.maintainer['uploader'], \
+            poolfile = self.file['sl'], install_date = self.now())
+        self.source['sl'].suites.append(self.suite['squeeze'])
+        self.source['sl'].suites.append(self.suite['sid'])
+        self.session.add_all(self.source.values())
 
     def test_maintainers(self):
         '''
@@ -198,20 +236,89 @@ class PackageTestCase(DBDakTestCase):
         lazyguy = self.maintainer['lazyguy']
         self.assertEqual(lazyguy,
             self.session.query(Maintainer).get(lazyguy.maintainer_id))
-        self.assertEqual(maintainer.maintains_sources, [self.source])
+        self.assertEqual(3, len(maintainer.maintains_sources))
+        self.assertTrue(self.source['hello'] in maintainer.maintains_sources)
         self.assertEqual(maintainer.changed_sources, [])
         self.assertEqual(uploader.maintains_sources, [])
-        self.assertEqual(uploader.changed_sources, [self.source])
+        self.assertEqual(3, len(uploader.changed_sources))
+        self.assertTrue(self.source['sl'] in uploader.changed_sources)
         self.assertEqual(lazyguy.maintains_sources, [])
         self.assertEqual(lazyguy.changed_sources, [])
 
+    def get_source_in_suite_fail(self):
+        '''
+        This function throws the MultipleResultsFound exception because
+        get_source_in_suite is broken.
+
+        TODO: fix get_source_in_suite
+        '''
+
+        return get_source_in_suite('hello', 'sid', self.session)
+
     def test_sources(self):
-        'test relation between DBSource and PoolFile'
+        'test relation between DBSource and PoolFile or Suite'
 
         self.setup_sources()
-        self.assertEqual(self.file['hello'], self.source.poolfile)
-        self.assertEqual(self.source, self.file['hello'].source)
-        self.assertEqual(None, self.file['sl'].source)
+        # test PoolFile
+        self.assertEqual(self.file['hello'], self.source['hello'].poolfile)
+        self.assertEqual(self.source['hello'], self.file['hello'].source)
+        self.assertEqual(None, self.file['python'].source)
+        # test Suite
+        squeeze = self.session.query(Suite). \
+            filter(Suite.sources.contains(self.source['sl'])). \
+            order_by(Suite.suite_name)[1]
+        self.assertEqual(self.suite['squeeze'], squeeze)
+        self.assertEqual(1, len(squeeze.sources))
+        self.assertEqual(self.source['sl'], squeeze.sources[0])
+        sl = self.session.query(DBSource). \
+            filter(DBSource.suites.contains(self.suite['squeeze'])).one()
+        self.assertEqual(self.source['sl'], sl)
+        self.assertEqual(2, len(sl.suites))
+        self.assertTrue(self.suite['sid'] in sl.suites)
+        # test get_source_in_suite()
+        self.assertRaises(MultipleResultsFound, self.get_source_in_suite_fail)
+        self.assertEqual(None, \
+            get_source_in_suite('hello', 'squeeze', self.session))
+        self.assertEqual(self.source['sl'], \
+            get_source_in_suite('sl', 'sid', self.session))
+        # test get_suites_source_in()
+        self.assertEqual([self.suite['sid']], \
+            get_suites_source_in('hello', self.session))
+        self.assertEqual(2, len(get_suites_source_in('sl', self.session)))
+        self.assertTrue(self.suite['squeeze'] in \
+            get_suites_source_in('sl', self.session))
+
+    def test_upload(self):
+        'tests function add_dsc_to_db()'
+
+        self.setup_maintainers()
+        self.setup_locations()
+        self.setup_poolfiles()
+        pkg = Pkg()
+        pkg.dsc['source'] = 'hello'
+        pkg.dsc['version'] = '2.2-2'
+        pkg.dsc['maintainer'] = self.maintainer['maintainer'].name
+        pkg.changes['changed-by'] = self.maintainer['uploader'].name
+        pkg.changes['fingerprint'] = 'deadbeef'
+        pkg.changes['distribution'] = { 'sid': '' }
+        self.session.flush()
+        self.session.refresh(self.file['hello'])
+        pkg.files['hello_2.2-2.dsc'] = { \
+            'component': 'main',
+            'location id': self.loc['main'].component_id,
+            'files id': self.file['hello'].file_id }
+        pkg.dsc_files = {}
+        upload = Upload(pkg)
+        (source, dsc_component, dsc_location_id, pfs) = \
+            add_dsc_to_db(upload, 'hello_2.2-2.dsc', self.session)
+        self.session.refresh(source)
+        self.assertEqual('hello', source.source)
+        self.assertEqual('2.2-2', source.version)
+        self.assertEqual('sid', source.suites[0].suite_name)
+        self.assertEqual('main', dsc_component)
+        # no dsc files defined above
+        self.assertEqual(None, dsc_location_id)
+        self.assertEqual([], pfs)
 
 
 if __name__ == '__main__':