#!/usr/bin/env python """Tell me who you are! """ import urllib import apt from debian_bundle import deb822 from debian_bundle import debtags from ConfigParser import SafeConfigParser import subprocess import os import sys import shutil import pysvn from optparse import OptionParser, Option, OptionGroup, OptionConflictError def transCodename(codename, cfg): """Translate a known codename into a release description. Unknown codenames will simply be returned as is. """ # strip repository codename codename = codename[codename.find('_') + 1:] # if we know something, tell if codename in cfg.options('release codenames'): return cfg.get('release codenames', codename) else: return codename def createDir(path): if os.path.exists(path): return ps = path.split(os.path.sep) for i in range(1,len(ps) + 1): p = os.path.sep.join(ps[:i]) if not os.path.exists(p): os.mkdir(p) class AptListsCache(object): def __init__(self, cachedir='cache', ro_cachedirs=None, init_db=None): self.cachedir = cachedir if not ro_cachedirs is None: self.ro_cachedirs = ro_cachedirs else: self.ro_cachedirs = [] # always use system cache #self.ro_cachedirs.append('/var/lib/apt/lists/') # create cachedir createDir(self.cachedir) def get(self, url, update=False): """Looks in the cache if the file is there and takes the cached one. Otherwise it is downloaded first. Knows how to deal with http:// and svn:// URLs. :Return: file handler """ # look whether it is compressed cext = url.split('.')[-1] if cext in ['gz', 'bz2']: target_url = url[:-1 * len(cext) -1] else: # assume not compressed target_url = url cext = None # turn url into a filename -- mimik what APT does for # /var/lib/apt/lists/ tfilename = '_'.join(target_url.split('/')[2:]) # if we need to download anyway do not search if update: cfilename = os.path.join(self.cachedir, tfilename) else: # look for the uncompressed file anywhere in the cache cfilename = None for cp in [self.cachedir] + self.ro_cachedirs: if os.path.exists(os.path.join(cp, tfilename)): cfilename = os.path.join(cp, tfilename) # nothing found? if cfilename is None: # add cache item cfilename = os.path.join(self.cachedir, tfilename) update = True # if updated needed -- download if update: print 'Caching file from %s' % url if url.startswith('svn://'): # export from SVN pysvn.Client().export(url, cfilename) if url.startswith('http://'): # download tempfile, ignored = urllib.urlretrieve(url) # decompress decompressor = None if cext == 'gz': decompressor = 'gzip' elif cext == 'bz2': decompressor = 'bzip2' elif cext == None: decompressor = None else: raise ValueError, \ "Don't know how to decompress %s files" \ % cext if not decompressor is None: if subprocess.call([decompressor, '-d', '-q', '-f', tempfile]) == 1: raise RuntimeError, \ "Something went wrong while decompressing '%s'" \ % tempfile # move decompressed file into cache shutil.move(os.path.splitext(tempfile)[0], cfilename) # XXX do we need that if explicit filename is provided? urllib.urlcleanup() # open cached file fh = open(cfilename, 'r') return fh class DebianPkgArchive(SafeConfigParser): """ """ def __init__(self, cache=None, init_db=None): """ :Parameter: """ SafeConfigParser.__init__(self) # read an existing database if provided if not init_db is None: self.read(init_db) # use provided file cache or use fresh one if not cache is None: self.cache = cache else: self.cache = AptListsCache() # init debtags DB self.dtags = debtags.DB() self.dtags.read(open('/var/lib/debtags/package-tags')) # init package filter self.pkgfilter = None self._updateReleases() def _updateReleases(self): self.releases = {} # for all packages for p in self.sections(): # no releases, nothing to do if not self.has_option(p, 'releases'): continue # for all releases of this package for r in \ [rel.strip() for rel in self.get(p, 'releases').split(',')]: # push release code if not self.releases.has_key(r): self.releases[r] = [] # store component component = self.get(p, '%s component' % r) if not component in self.releases[r]: self.releases[r].append(component) def __repr__(self): """Generate INI file content for current content. """ # make adaptor to use str as file-like (needed for ConfigParser.write() class file2str(object): def __init__(self): self.__s = '' def write(self, val): self.__s += val def str(self): return self.__s r = file2str() self.write(r) return r.str() def save(self, filename): """Write current content to a file. """ f = open(filename, 'w') self.write(f) f.close() def ensureUnique(self, section, option, value): if not self.has_option(section, option): self.set(section, option, value) else: if not self.get(section, option) == value: raise ValueError, "%s: %s is not unique (%s != %s)" \ % (section, option, self.get(section, option), value) def appendUniqueCSV(self, section, option, value): """ """ if not self.has_option(section, option): self.set(section, option, value) else: l = self.get(section, option).split(', ') if not value in l: self.set(section, option, ', '.join(l + [value])) def getReleaseInfo(self, rurl, force_update=False): # root URL of the repository baseurl = '/'.join(rurl.split('/')[:-1]) # get the release file from the cache release_file = self.cache.get(rurl, update=force_update) # create parser instance rp = deb822.Release(release_file) # architectures on this dist archs = rp['Architectures'].split() components = rp['Components'].split() # compile a new codename that also considers the repository label # to distinguish between official and unofficial repos. label = rp['Label'] codename = rp['Codename'] labelcode = '_'.join([rp['Label'], rp['Codename']]) # cleanup release_file.close() return {'baseurl': baseurl, 'archs': archs, 'components': components, 'codename': codename, 'label': label, 'labelcode': labelcode} def checkOfficialRelease(self, rurl, force_update=False): ri = self.getReleaseInfo(rurl, force_update=force_update) # try with a i386 packages file, since that should be the most common # one # loop over all components for c in ri['components']: pkgsurl = self.buildPkgsURL(ri['baseurl'], c, 'i386') packages_file = self.cache.get(pkgsurl, update=force_update) # now check every package, whether we also have it in the DB already for st in deb822.Packages.iter_paragraphs(packages_file): pkg = st['Package'] if self.has_section(pkg): # store the label code self.appendUniqueCSV(pkg, "releases", ri['labelcode']) # and the associated component self.ensureUnique(pkg, "%s component" % ri['labelcode'], c) # and version self.set(pkg, "%s version" % ri['labelcode'], st['Version']) # cleanup packages_file.close() def buildPkgsURL(self, baseurl, component, arch): return '/'.join([baseurl, component, 'binary-' + arch, 'Packages.bz2']) def importRelease(self, rurl, force_update=False): ri = self.getReleaseInfo(rurl, force_update=force_update) # compile the list of Packages files to parse and parse them for c in ri['components']: for a in ri['archs']: # compile packages URL pkgsurl = self.buildPkgsURL(ri['baseurl'], c, a) # retrieve from cache packages_file = self.cache.get(pkgsurl, update=force_update) # parse for stanza in deb822.Packages.iter_paragraphs(packages_file): self._storePkg(stanza, ri['labelcode'], c, ri['baseurl']) # cleanup packages_file.close() self._updateReleases() def _storePkg(self, st, codename, component, baseurl): """ :Parameter: st: Package section """ pkg = st['Package'] if not self.has_section(pkg): self.add_section(pkg) # do nothing if package is not in filter if there is any if not self.pkgfilter is None and not pkg in self.pkgfilter: self.ensureUnique(pkg, 'visibility', 'shadowed') else: self.ensureUnique(pkg, 'visibility', 'featured') # which releases self.appendUniqueCSV(pkg, "releases", codename) # arch listing self.appendUniqueCSV(pkg, "%s archs" % codename, st['Architecture']) # versions self.ensureUnique(pkg, "%s version %s" % (codename, st['Architecture']), st['Version']) # link to .deb self.ensureUnique(pkg, "%s file %s" % (codename, st['Architecture']), '/'.join(baseurl.split('/')[:-2] + [st['Filename']])) # component self.ensureUnique(pkg, '%s component' % codename, component) # store the pool url self.ensureUnique(pkg, "%s poolurl" % codename, '/'.join(baseurl.split('/')[:-2] \ + [os.path.dirname(st['Filename'])])) # now the stuff where a single variant is sufficient and where we go for # the latest available one if self.has_option(pkg, "newest version") \ and apt.VersionCompare(st['Version'], self.get(pkg, "newest version")) < 0: return # everything from here will overwrite existing ones # we seems to have an updated package self.set(pkg, "newest version", st['Version']) # description self.set(pkg, "description", st['Description'].replace('%', '%%')) # maintainer self.set(pkg, "maintainer", st['Maintainer']) # optional stuff if st.has_key('Homepage'): self.set(pkg, 'homepage', st['Homepage']) # query debtags debtags = self.dtags.tagsOfPackage(pkg) if debtags: self.set(pkg, 'debtags', ', '.join(debtags)) def writeSourcesLists(self, outdir, cfg): createDir(outdir) createDir(os.path.join(outdir, 'static')) fl = open(os.path.join(outdir, 'sources_lists'), 'w') for trans, r in sorted([(transCodename(k, cfg), k) for k in self.releases.keys() if k.startswith('apsy')]): # need to turn 'apsy_lenny' back into 'lenny' debneuro_r = r.split('_')[1] f = open(os.path.join(outdir, 'static', 'debneuro.%s.sources.list' % debneuro_r), 'w') f.write("deb http://apsy.gse.uni-magdeburg.de/debian %s %s\n" \ % (debneuro_r, ' '.join(self.releases[r]))) f.write("deb-src http://apsy.gse.uni-magdeburg.de/debian %s %s\n" \ % (debneuro_r, ' '.join(self.releases[r]))) # XXX use :download: role from sphinx 0.6 on #fl.write('* `%s `_\n' \ fl.write('* `%s <_static/debneuro.%s.sources.list>`_\n' \ % (trans, debneuro_r)) f.close() fl.close() def importProspectivePkgsFromTaskFile(self, url): fh = self.cache.get(url) for st in deb822.Packages.iter_paragraphs(fh): # do not stop unless we have a description if not st.has_key('Pkg-Description'): continue if st.has_key('Depends'): pkg = st['Depends'] elif st.has_key('Suggests'): pkg = st['Suggests'] else: print 'Warning: Cannot determine name of prospective package ' \ '... ignoring.' continue # store pkg info if not self.has_section(pkg): self.add_section(pkg) # prospective ones are always featured self.ensureUnique(pkg, 'visibility', 'featured') # pkg description self.set(pkg, "description", st['Pkg-Description'].replace('%', '%%')) # optional stuff if st.has_key('Homepage'): self.set(pkg, 'homepage', st['Homepage']) if st.has_key('Pkg-URL'): self.set(pkg, 'external pkg url', st['Pkg-URL']) if st.has_key('WNPP'): self.set(pkg, 'wnpp debian', st['WNPP']) if st.has_key('License'): self.set(pkg, 'license', st['License']) # treat responsible as maintainer if st.has_key('Responsible'): self.set(pkg, "maintainer", st['Responsible']) def setPkgFilterFromTaskFile(self, urls): pkgs = [] for task in urls: fh = self.cache.get(task) # loop over all stanzas for stanza in deb822.Packages.iter_paragraphs(fh): if stanza.has_key('Depends'): pkg = stanza['Depends'] elif stanza.has_key('Suggests'): pkg = stanza['Suggests'] else: continue # account for multiple packages per line if pkg.count(','): pkgs += [p.strip() for p in pkg.split(',')] else: pkgs.append(pkg.strip()) # activate filter self.pkgfilter = pkgs def genPkgPage(db, pkg, cfg): """ :Parameters: db: database pkg: str Package name """ descr = db.get(pkg, 'description').split('\n') s = '' # only put index markup for featured packages if db.get(pkg, 'visibility') == 'featured': s = '.. index:: %s, ' % pkg s += '\n' # add a subset of available debtags (if present) if db.has_option(pkg, 'debtags'): # filter tags tags = [t for t in db.get(pkg, 'debtags').split(', ') if t.split('::')[0] in ['field', 'works-with']] if len(tags): s += '.. index:: %s\n\n' % ', '.join(tags) # main ref target for this package s += '.. _deb_' + pkg + ':\n' # separate header from the rest s += '\n\n\n' header = '%s -- %s' % (pkg, descr[0]) s += '*' * (len(header) + 2) s += '\n ' + header + '\n' s += '*' * (len(header) + 2) + '\n\n' # put description # XXX honour formating syntax s += '\n'.join([l.lstrip(' .') for l in descr[1:]]) s += '\n' if db.has_option(pkg, 'homepage'): s += '\n**Homepage**: %s\n' % db.get(pkg, 'homepage') s += '\nBinary packages'\ '\n===============\n' s += genMaintainerSection(db, pkg) if db.has_option(pkg, 'wnpp debian'): s += """\ A Debian packaging effort has been officially announced. Please see the corresponding `intent-to-package bug report`_ for more information about its current status. .. _intent-to-package bug report: http://bugs.debian.org/%s """ % db.get(pkg, 'wnpp debian') # write repository content summary for NeuroDebian s += getReposContentSummary(db, cfg, 'apsy', pkg) # see if there is something about a package in Debian proper s += getDebianRefs(db, cfg, pkg) return s def genMaintainerSection(db, pkg): s = '' if not db.has_option(pkg, 'maintainer'): s += """\ Currently, nobody seems to be responsible for creating or maintaining Debian packages of this software. """ return s # there is someone responsible maintainer = db.get(pkg, 'maintainer') # do we have actual packages, or is it just a note if not db.has_option(pkg, 'releases'): s += """\ There are currently no binary packages available. However, the last known packaging effort was started by %s which meanwhile might have led to an initial unofficial Debian packaging. """ % maintainer return s s += '\n**Maintainer**: %s\n\n' % maintainer if not maintainer.startswith('Michael Hanke'): s += """\ .. note:: Do not contact the original package maintainer regarding bugs in this unofficial binary package. Instead, contact the repository maintainer at michael.hanke@gmail.com\ . """ return s def getDebianRefs(db, cfg, pkg): # no release, nothing to do if not db.has_option(pkg, 'releases'): return '' # which Debian release is this package part of? debrels = [r.split('_')[1] for r in db.get(pkg, 'releases').split(', ') if r.startswith('Debian')] # do nothing if there is no package in Debian proper if not len(debrels): return '' s = """\ Official Debian archive ----------------------- This package is available from the offical Debian archive for: * %s Please see the following ressources for more information: * `Debian package summary page`_ * `Bugreports in the Debian bug tracking system`_ * `Debian package popularity statistics`_ .. _Debian package summary page: http://packages.debian.org/%s .. _Bugreports in the Debian bug tracking system: http://bugs.debian.org/%s .. _Debian package popularity statistics: http://qa.debian.org/popcon.php?package=%s """ % ('\n* '.join(['`%s `_ *[%s]*: %s' \ % (transCodename(rel, cfg), rel, db.get(pkg, 'debian_%s component' % rel), db.get(pkg, 'debian_%s version' % rel)) for rel in debrels]), pkg, pkg, pkg) return s def getReposContentSummary(db, cfg, reposlabel, pkg): # do nothing if the are no packages if not db.has_option(pkg, 'releases'): return '' reposname = cfg.get('repository labels', reposlabel) s = '\n%s\n%s\n' % (reposname, '-' * len(reposname)) s += """\ The repository contains binary packages for the following distribution releases and system architectures. The corresponding source packages are available too. .. note:: Do not download this package manually if you plan to use it regularly. Instead configure your package manager to use this repository by following the instructions on the :ref:`front page `. """ # for all releases this package is part of for rel in db.get(pkg, 'releases').split(', '): # ignore items associated with other repositories if not rel.split('_')[0] == reposlabel: continue # write release description and component s += '\n%s *[%s]*:\n ' \ % (transCodename(rel, cfg), db.get(pkg, '%s component' % rel)) s += '`source <%s>`_' % db.get(pkg, '%s poolurl' % rel) # archs this package is available for archs = db.get(pkg, '%s archs' % rel).split(', ') # extract all present versions for any arch versions = [db.get(pkg, '%s version %s' % (rel, arch)) for arch in archs] # if there is only a single version for all of them, simplify the list single_ver = versions.count(versions[0]) == len(versions) if single_ver: s += ', ' # only one version string for all s += ', '.join(['`%s <%s>`_' \ % (arch, db.get(pkg, '%s file %s' % (rel, arch))) for arch in archs]) s += ' (%s)' % versions[0] else: s += ', ' # a separate version string for each arch s += ', '.join(['`%s <%s>`_ (%s)' \ % (arch, db.get(pkg, '%s file %s' % (rel, arch)), db.get(pkg, '%s version %s' % (rel, arch))) for arch in archs]) s += '\n' return s def maintainer2email(maint): return maint.split('<')[1].rstrip('>') def writePkgsBy(db, key, value2id, outdir, heading): createDir(outdir) nwkey = key.replace(' ', '') createDir(os.path.join(outdir, 'by%s' % nwkey)) collector = {} # get packages by maintainer for p in db.sections(): # only featured packages if db.get(p, 'visibility') == 'shadowed': continue if db.has_option(p, key): by = db.get(p, key) if not collector.has_key(by): collector[by] = (value2id(by), [p]) else: collector[by][1].append(p) toc = open(os.path.join(outdir, 'by%s.rst' % nwkey), 'w') toc.write('.. index:: Packages by %s\n.. _by%s:\n\n' % (key, key)) toc_heading = 'Packages by %s' % key toc.write('%s\n%s\n\n' % (toc_heading, '=' * len(toc_heading))) toc.write('.. toctree::\n :maxdepth: 1\n\n') # summary page per maintainer for by in sorted(collector.keys()): toc.write(' by%s/%s\n' % (nwkey, collector[by][0])) fh = open(os.path.join(outdir, 'by%s' % nwkey, collector[by][0] + '.rst'), 'w') fh.write('.. index:: %s\n.. _%s:\n\n' % (by, by)) hdr = heading.replace('', by) fh.write(hdr + '\n') fh.write('=' * len(hdr) + '\n\n') # write sorted list of packages for p in sorted(collector[by][1]): fh.write('* :ref:`deb_%s`\n' % p) fh.close() toc.close() def writeRst(db, outdir, cfg, addenum_dir=None): createDir(outdir) createDir(os.path.join(outdir, 'pkgs')) # open pkgs toctree toc = open(os.path.join(outdir, 'pkgs.rst'), 'w') # write header toc.write('.. _full_pkg_list:\n\n') toc.write('Archive content\n===============\n\n' '.. toctree::\n :maxdepth: 1\n\n') for p in sorted(db.sections()): print "Generating page for '%s'" % p pf = open(os.path.join(outdir, 'pkgs', '%s.rst' % p), 'w') pf.write(genPkgPage(db, p, cfg)) # check for doc addons if addenum_dir is not None: addenum = os.path.join(os.path.abspath(addenum_dir), '%s.rst' % p) if os.path.exists(addenum): pf.write('\n\n.. include:: %s\n' % addenum) pf.close() toc.write(' pkgs/%s\n' % p) toc.close() def prepOptParser(op): # use module docstring for help output op.usage = "%s [OPTIONS]\n\n" % sys.argv[0] + __doc__ op.add_option("--db", action="store", type="string", dest="db", default=None, help="Database file to read. Default: None") op.add_option("--cfg", action="store", type="string", dest="cfg", default=None, help="Repository config file.") op.add_option("-o", "--outdir", action="store", type="string", dest="outdir", default=None, help="Target directory for ReST output. Default: None") op.add_option("-r", "--release-url", action="append", dest="release_urls", help="None") op.add_option("--pkgaddenum", action="store", dest="addenum_dir", type="string", default=None, help="None") def main(): op = OptionParser(version="%prog 0.0.1") prepOptParser(op) (opts, args) = op.parse_args() if len(args) != 1: print('There needs to be exactly one command') sys.exit(1) cmd = args[0] if opts.cfg is None: print("'--cfg' option is mandatory.") sys.exit(1) cfg = SafeConfigParser() cfg.read(opts.cfg) # load existing db, unless renew is requested if cmd == 'refreshdb': dpa = DebianPkgArchive() else: dpa = DebianPkgArchive(init_db=opts.db) if cmd == 'generate': if opts.outdir is None: print('Not output directory specified!') sys.exit(1) dpa.writeSourcesLists(opts.outdir, cfg) writeRst(dpa, opts.outdir, cfg, opts.addenum_dir) writePkgsBy(dpa, 'maintainer', maintainer2email, opts.outdir, 'Packages maintained by ') # stop here sys.exit(0) if cfg.has_option('packages', 'select taskfiles'): dpa.setPkgFilterFromTaskFile(cfg.get('packages', 'select taskfiles').split()) if cfg.has_option('packages', 'select names'): dpa.pkgfilter += cfg.get('packages', 'select names').split() if cfg.has_option('packages', 'prospective'): for p in cfg.get('packages', 'prospective').split(): dpa.importProspectivePkgsFromTaskFile(p) if cfg.has_option('repositories', 'releases'): for rurl in cfg.get('repositories', 'releases').split(): dpa.importRelease(rurl, force_update=False) if cfg.has_option('repositories', 'releases'): for rurl in cfg.get('repositories', 'releases').split(): dpa.importRelease(rurl, force_update=False) if cfg.has_option('officials', 'releases'): for rurl in cfg.get('officials', 'releases').split(): dpa.checkOfficialRelease(rurl, force_update=False) if not opts.db is None: dpa.save(opts.db) if __name__ == "__main__": main()