stanza = self.read_control()
return apt_pkg.TagSection(stanza)
+ @property
+ def proxy(self):
+ session = object_session(self)
+ query = session.query(BinaryMetadata).filter_by(binary=self)
+ return MetadataProxy(session, query)
+
__all__.append('DBBinary')
@session_wrapper
self.contents_sha1 = signed_file.contents_sha1()
return self
+ def query(self, session):
+ return session.query(SignatureHistory).filter_by(fingerprint=self.fingerprint, signature_timestamp=self.signature_timestamp, contents_sha1=self.contents_sha1).first()
+
__all__.append('SignatureHistory')
################################################################################
fileset.add(name)
return fileset
+ @property
+ def proxy(self):
+ session = object_session(self)
+ query = session.query(SourceMetadata).filter_by(source=self)
+ return MetadataProxy(session, query)
+
__all__.append('DBSource')
@session_wrapper
################################################################################
+class MetadataProxy(object):
+ def __init__(self, session, query):
+ self.session = session
+ self.query = query
+
+ def _get(self, key):
+ metadata_key = self.session.query(MetadataKey).filter_by(key=key).first()
+ if metadata_key is None:
+ return None
+ metadata = self.query.filter_by(key=metadata_key).first()
+ return metadata
+
+ def __contains__(self, key):
+ if self._get(key) is not None:
+ return True
+ return False
+
+ def __getitem__(self, key):
+ metadata = self._get(key)
+ if metadata is None:
+ raise KeyError
+ return metadata.value
+
+ def get(self, key, default=None):
+ try:
+ return self[key]
+ except KeyError:
+ return default
+
+################################################################################
+
class VersionCheck(ORMObject):
def __init__(self, *args, **kwargs):
pass