]> git.donarmstrong.com Git - dsa-puppet.git/blob - modules/roles/files/static-mirroring/static-master-run
f8f2be36ab71d4b93ebf6a94c2f03f7ae9aaed0c
[dsa-puppet.git] / modules / roles / files / static-mirroring / static-master-run
1 #!/usr/bin/python
2
3 import fcntl
4 import os
5 import shutil
6 import subprocess
7 import string
8 import sys
9 import tempfile
10 import time
11
12 base="/srv/static.debian.org"
13 serialname = '.serial'
14
15 clients = []
16 with open('/etc/static-clients.conf') as f:
17   for line in f:
18     line = line.strip()
19     if line == "": continue
20     if line.startswith('#'): continue
21     clients.append(line)
22
23 def log(m):
24   t = time.strftime("[%Y-%m-%d %H:%M:%S]", time.gmtime())
25   print t, m
26
27 def stage1(pipes, status):
28   for c in clients:
29     p = pipes[c]
30     while 1:
31       line = p.stdout.readline()
32       if line == '':
33         status[c] = 'failed'
34         p.stdout.close()
35         p.stdin.close()
36         p.wait()
37         log("%s: failed with returncode %d"%(c,p.returncode))
38         break
39
40       line = line.strip()
41       log("%s >> %s"%(c, line))
42       if not line.startswith('[MSM]'): continue
43       kw = string.split(line, ' ', 2)[1]
44
45       if kw == 'ALREADY-CURRENT':
46         pipes[c].stdout.close()
47         pipes[c].stdin.close()
48         p.wait()
49         if p.returncode == 0:
50           log("%s: already current"%(c,))
51           status[c] = 'ok'
52         else:
53           log("%s: said ALREADY-CURRENT but returncode %d"%(c,p.returncode))
54           status[c] = 'failed'
55         break
56       elif kw == 'STAGE1-DONE':
57         log("%s: waiting"%(c,))
58         status[c] = 'waiting'
59         break
60       elif kw in ['STAGE1-START']:
61         pass
62       else:
63         log("%s: ignoring unknown line"%(c,))
64
65 def count_statuses(status):
66   cnt = {}
67   for k in status:
68     v = status[k]
69     if v not in cnt: cnt[v] = 1
70     else: cnt[v] += 1
71   return cnt
72
73 def stage2(pipes, status, command):
74   for c in clients:
75     if status[c] != 'waiting': continue
76     log("%s << %s"%(c, command))
77     pipes[c].stdin.write("%s\n"%(command,))
78
79   for c in clients:
80     if status[c] != 'waiting': continue
81     p = pipes[c]
82
83     (o, dummy) = p.communicate('')
84     for l in string.split(o, "\n"):
85       log("%s >> %s"%(c, l))
86     log("%s: returned %d"%(c, p.returncode))
87
88 def callout(component, serial):
89   log("Calling clients...")
90   pipes = {}
91   status = {}
92   for c in clients:
93     args = ['ssh', '-o', 'BatchMode=yes', c, 'mirror', component, "%d"%(serial,)]
94     p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
95     pipes[c] = p
96     status[c] = 'in-progress'
97
98   log("Stage 1...")
99   stage1(pipes, status)
100   log("Stage 1 done.")
101   cnt = count_statuses(status)
102
103   if 'failed' in cnt > 0:
104     log("Some clients failed, aborting...")
105     stage2(pipes, status, 'abort')
106     return False
107   elif 'waiting' in cnt > 0:
108     log("Committing...")
109     stage2(pipes, status, 'go')
110     return True
111   else:
112     log("All clients up to date.")
113     return True
114
115
116 cleanup_dirs = []
117 def run_mirror(component):
118   # setup
119   basemaster = os.path.join(base, 'master')
120   componentdir = os.path.join(basemaster, component)
121   cur = componentdir + '-current-push'
122   live = componentdir + '-current-live'
123   tmpdir_new = tempfile.mkdtemp(prefix=component+'-live.new-', dir=basemaster); cleanup_dirs.append(tmpdir_new);
124   tmpdir_old = tempfile.mkdtemp(prefix=component+'-live.old-', dir=basemaster); cleanup_dirs.append(tmpdir_old);
125   os.chmod(tmpdir_new, 0755)
126
127   locks = []
128   for p in (componentdir, live, tmpdir_new):
129     if not os.path.exists(p): os.mkdir(p, 0755)
130     fd = os.open(p, os.O_RDONLY)
131     log("Acquiring lock for %s(%d)."%(p,fd))
132     fcntl.flock(fd, fcntl.LOCK_EX)
133     locks.append(fd)
134   log("All locks acquired.")
135
136   serialfile = os.path.join(componentdir, serialname)
137   try:
138     with open(serialfile) as f: serial = int(f.read())
139   except:
140     serial = int(time.time())
141     with open(serialfile, "w") as f: f.write("%d\n"%(serial,))
142   log("Serial is %s."%(serial,))
143
144   log("Populating %s."%(tmpdir_new,))
145   subprocess.check_call(['cp', '-al', os.path.join(componentdir, '.'), tmpdir_new])
146
147   if os.path.exists(cur):
148     log("Removing existing %s."%(cur,))
149     shutil.rmtree(cur)
150
151   log("Renaming %s to %s."%(tmpdir_new, cur))
152   os.rename(tmpdir_new, cur)
153
154   proceed = callout(component, serial)
155
156   if proceed:
157     log("Moving %s aside."%(live,))
158     os.rename(live, os.path.join(tmpdir_old, 'old'))
159     log("Renaming %s to %s."%(cur, live))
160     os.rename(cur, live)
161     log("Cleaning up.")
162     shutil.rmtree(tmpdir_old)
163     log("Done.")
164     ret = True
165   else:
166     log("Aborted.")
167     ret = False
168
169   for fd in locks:
170     os.close(fd)
171
172   return ret
173
174
175 if len(sys.argv) != 2:
176   print >> sys.stderr, "Usage: %s <component>"%(sys.argv[0],)
177   sys.exit(1)
178 component = sys.argv[1]
179
180 ok = False
181 try:
182   ok = run_mirror(component)
183 finally:
184   for p in cleanup_dirs:
185     if os.path.exists(p): shutil.rmtree(p)
186
187 if not ok:
188   sys.exit(1)
189 # vim:set et:
190 # vim:set ts=2:
191 # vim:set shiftwidth=2: