threading.Thread.__init__(self)
self.queue = queue
self.session = DBConn().session()
+ self.die = False
+
+ def plsDie(self):
+ self.die = True
def run(self):
cnf = Config()
if not filenames:
# Empty directory (or only subdirectories), next
continue
+ if self.die:
+ return
+
for changesfile in filenames:
if not changesfile.endswith(".changes"):
# Only interested in changes files.
if not get_knownchange(changesfile, self.session):
to_import = ChangesToImport(dirpath, changesfile, count)
- print("enqueue: %s" % to_import)
self.queue.enqueue(to_import)
self.queue.enqueue(EndOfChanges())
threading.Thread.__init__(self)
self.queue = queue
self.session = DBConn().session()
+ self.die = False
+
+ def plsDie(self):
+ self.die = True
def run(self):
while True:
try:
+ if self.die:
+ return
to_import = self.queue.dequeue()
if not to_import:
return
changes = Changes()
changes.changes_file = to_import.changesfile
changesfile = os.path.join(to_import.dirpath, to_import.changesfile)
- print( "STU: %s / %s" % (to_import.dirpath, to_import.changesfile))
changes.changes = parse_changes(changesfile, signing_rules=-1)
changes.changes["fingerprint"] = check_signature(changesfile)
changes.add_known_changes(to_import.dirpath, self.session)
queue = OneAtATime()
- ChangesGenerator(queue).start()
+ threads = [ ChangesGenerator(queue) ]
for i in range(num_threads):
- ImportThread(queue).start()
+ threads.append( ImportThread(queue) )
+
+ try:
+ for thread in threads:
+ thread.start()
+
+ for thread in thrads:
+ thread.join()
+
+ except KeyboardInterrupt:
+ utils.warn("Caught C-c; terminating.")
+ for thread in threads:
+ thread.plsDie()
+
+ for thread in threads:
+ thread.join()
if __name__ == '__main__':