]> git.donarmstrong.com Git - dsa-puppet.git/blob - modules/roles/files/static-mirroring/static-master-run
Add timeline.debian.net to the static CDN
[dsa-puppet.git] / modules / roles / files / static-mirroring / static-master-run
1 #!/usr/bin/python
2
3 import fcntl
4 import os
5 import shutil
6 import subprocess
7 import string
8 import sys
9 import tempfile
10 import time
11
12 base="/srv/static.debian.org"
13 serialname = '.serial'
14 had_warnings = False
15
16 allclients = set()
17 with open('/etc/static-clients.conf') as f:
18   for line in f:
19     line = line.strip()
20     if line == "": continue
21     if line.startswith('#'): continue
22     allclients.add(line)
23
24 def log(m):
25   t = time.strftime("[%Y-%m-%d %H:%M:%S]", time.gmtime())
26   print t, m
27
28 def stage1(pipes, status, clients):
29   for c in clients:
30     p = pipes[c]
31     while 1:
32       line = p.stdout.readline()
33       if line == '':
34         status[c] = 'failed'
35         p.stdout.close()
36         p.stdin.close()
37         p.wait()
38         log("%s: failed with returncode %d"%(c,p.returncode))
39         break
40
41       line = line.strip()
42       log("%s >> %s"%(c, line))
43       if not line.startswith('[MSM]'): continue
44       kw = string.split(line, ' ', 2)[1]
45
46       if kw == 'ALREADY-CURRENT':
47         pipes[c].stdout.close()
48         pipes[c].stdin.close()
49         p.wait()
50         if p.returncode == 0:
51           log("%s: already current"%(c,))
52           status[c] = 'ok'
53         else:
54           log("%s: said ALREADY-CURRENT but returncode %d"%(c,p.returncode))
55           status[c] = 'failed'
56         break
57       elif kw == 'STAGE1-DONE':
58         log("%s: waiting"%(c,))
59         status[c] = 'waiting'
60         break
61       elif kw in ['STAGE1-START']:
62         pass
63       else:
64         log("%s: ignoring unknown line"%(c,))
65
66 def count_statuses(status):
67   cnt = {}
68   for k in status:
69     v = status[k]
70     if v not in cnt: cnt[v] = 1
71     else: cnt[v] += 1
72   return cnt
73
74 def stage2(pipes, status, command, clients):
75   for c in clients:
76     if status[c] != 'waiting': continue
77     log("%s << %s"%(c, command))
78     pipes[c].stdin.write("%s\n"%(command,))
79
80   for c in clients:
81     if status[c] != 'waiting': continue
82     p = pipes[c]
83
84     (o, dummy) = p.communicate('')
85     for l in string.split(o, "\n"):
86       log("%s >> %s"%(c, l))
87     log("%s: returned %d"%(c, p.returncode))
88
89 def callout(component, serial, clients):
90   log("Calling clients...")
91   pipes = {}
92   status = {}
93   for c in clients:
94     args = ['ssh', '-o', 'BatchMode=yes', c, 'mirror', component, "%d"%(serial,)]
95     p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
96     pipes[c] = p
97     status[c] = 'in-progress'
98
99   log("Stage 1...")
100   stage1(pipes, status, clients)
101   log("Stage 1 done.")
102   cnt = count_statuses(status)
103
104   if 'failed' in cnt and cnt['failed'] >= 2:
105     log("%d clients failed, aborting..."%(cnt['failed'],))
106     stage2(pipes, status, 'abort', clients)
107     return False
108
109   failedmirrorsfile = os.path.join(base, 'master', component + "-failedmirrors")
110   if 'failed' in cnt:
111     log("WARNING: %d clients failed!  Continuing anyway!"%(cnt['failed'],))
112     global had_warnings
113     had_warnings = True
114     f = open(failedmirrorsfile, "w")
115     for c in status:
116       if status[c] == 'failed': f.write(c+"\n")
117     f.close()
118   else:
119     if os.path.exists(failedmirrorsfile): os.unlink(failedmirrorsfile)
120
121   if 'waiting' in cnt:
122     log("Committing...")
123     stage2(pipes, status, 'go', clients)
124     return True
125   else:
126     log("All clients up to date.")
127     return True
128
129 def load_component_info(component):
130   with open('/etc/static-components.conf') as f:
131     for line in f:
132       if line.startswith('#'): continue
133       field = line.strip().split()
134       if len(field) < 4: continue
135       if field[1] != component: continue
136       meta = {}
137       meta['master'] = field[0]
138       meta['sourcehost'] = field[2]
139       meta['sourcedir'] = field[3]
140       meta['extrapushhosts'] = set(field[4].split(',')) if len(field) > 4 else set()
141       meta['extraignoreclients'] = set(field[5].split(',')) if len(field) > 5 else set()
142       return meta
143     else:
144       return None
145
146 cleanup_dirs = []
147 def run_mirror(component):
148   meta = load_component_info(component)
149   if meta is None:
150     log("Component %s not found."%(component,))
151     return False
152   clients = allclients - meta['extraignoreclients']
153
154   # setup
155   basemaster = os.path.join(base, 'master')
156   componentdir = os.path.join(basemaster, component)
157   cur = componentdir + '-current-push'
158   live = componentdir + '-current-live'
159   tmpdir_new = tempfile.mkdtemp(prefix=component+'-live.new-', dir=basemaster); cleanup_dirs.append(tmpdir_new);
160   tmpdir_old = tempfile.mkdtemp(prefix=component+'-live.old-', dir=basemaster); cleanup_dirs.append(tmpdir_old);
161   os.chmod(tmpdir_new, 0755)
162
163   locks = []
164   for p in (componentdir, live, tmpdir_new):
165     if not os.path.exists(p): os.mkdir(p, 0755)
166     fd = os.open(p, os.O_RDONLY)
167     log("Acquiring lock for %s(%d)."%(p,fd))
168     fcntl.flock(fd, fcntl.LOCK_EX)
169     locks.append(fd)
170   log("All locks acquired.")
171
172   serialfile = os.path.join(componentdir, serialname)
173   try:
174     with open(serialfile) as f: serial = int(f.read())
175   except:
176     serial = int(time.time())
177     with open(serialfile, "w") as f: f.write("%d\n"%(serial,))
178   log("Serial is %s."%(serial,))
179
180   log("Populating %s."%(tmpdir_new,))
181   subprocess.check_call(['cp', '-al', os.path.join(componentdir, '.'), tmpdir_new])
182
183   if os.path.exists(cur):
184     log("Removing existing %s."%(cur,))
185     shutil.rmtree(cur)
186
187   log("Renaming %s to %s."%(tmpdir_new, cur))
188   os.rename(tmpdir_new, cur)
189
190   proceed = callout(component, serial, clients)
191
192   if proceed:
193     log("Moving %s aside."%(live,))
194     os.rename(live, os.path.join(tmpdir_old, 'old'))
195     log("Renaming %s to %s."%(cur, live))
196     os.rename(cur, live)
197     log("Cleaning up.")
198     shutil.rmtree(tmpdir_old)
199     if had_warnings: log("Done, with warnings.")
200     else: log("Done.")
201     ret = True
202   else:
203     log("Aborted.")
204     ret = False
205
206   for fd in locks:
207     os.close(fd)
208
209   return ret
210
211
212 if len(sys.argv) != 2:
213   print >> sys.stderr, "Usage: %s <component>"%(sys.argv[0],)
214   sys.exit(1)
215 component = sys.argv[1]
216
217 ok = False
218 try:
219   ok = run_mirror(component)
220 finally:
221   for p in cleanup_dirs:
222     if os.path.exists(p): shutil.rmtree(p)
223
224 if not ok:
225   sys.exit(1)
226 # vim:set et:
227 # vim:set ts=2:
228 # vim:set shiftwidth=2: