]> git.donarmstrong.com Git - dsa-puppet.git/blob - modules/roles/files/static-mirroring/static-master-run
Ship list of mirrors with puppet too
[dsa-puppet.git] / modules / roles / files / static-mirroring / static-master-run
1 #!/usr/bin/python
2
3 import fcntl
4 import os
5 import shutil
6 import subprocess
7 import string
8 import tempfile
9 import time
10
11 base='/home/staticsync/static-master'
12 subdirs = { 'master': 'master',       # where updates from off-site end up going, the source of everything we do here
13             'cur':    'current-push', # where clients rsync from during a mirror push
14             'live':   'current-live'} # what is currently on the mirrors, and what they rsync from when they come back from being down
15 serialname = '.serial'
16
17 clients = []
18 with open('/etc/static-clients.conf') as f:
19   for line in f:
20     line = line.strip()
21     if line == "": continue
22     if line.startswith('#'): continue
23     clients.append(line)
24
25 def log(m):
26   t = time.strftime("[%Y-%m-%d %H:%M:%S]", time.gmtime())
27   print t, m
28
29 def stage1(pipes, status):
30   for c in clients:
31     p = pipes[c]
32     while 1:
33       line = p.stdout.readline()
34       if line == '':
35         status[c] = 'failed'
36         p.stdout.close()
37         p.stdin.close()
38         p.wait()
39         log("%s: failed with returncode %d"%(c,p.returncode))
40         break
41
42       line = line.strip()
43       log("%s >> %s"%(c, line))
44       if not line.startswith('[MSM]'): continue
45       kw = string.split(line, ' ', 2)[1]
46
47       if kw == 'ALREADY-CURRENT':
48         pipes[c].stdout.close()
49         pipes[c].stdin.close()
50         p.wait()
51         if p.returncode == 0:
52           log("%s: already current"%(c,))
53           status[c] = 'ok'
54         else:
55           log("%s: said ALREADY-CURRENT but returncode %d"%(c,p.returncode))
56           status[c] = 'failed'
57         break
58       elif kw == 'STAGE1-DONE':
59         log("%s: waiting"%(c,))
60         status[c] = 'waiting'
61         break
62       elif kw in ['STAGE1-START']:
63         pass
64       else:
65         log("%s: ignoring unknown line"%(c,))
66
67 def count_statuses(status):
68   cnt = {}
69   for k in status:
70     v = status[k]
71     if v not in cnt: cnt[v] = 1
72     else: cnt[v] += 1
73   return cnt
74
75 def stage2(pipes, status, command):
76   for c in clients:
77     if status[c] != 'waiting': continue
78     log("%s << %s"%(c, command))
79     pipes[c].stdin.write("%s\n"%(command,))
80
81   for c in clients:
82     if status[c] != 'waiting': continue
83     p = pipes[c]
84
85     (o, dummy) = p.communicate('')
86     for l in string.split(o, "\n"):
87       log("%s >> %s"%(c, l))
88     log("%s: returned %d"%(c, p.returncode))
89
90 def callout(serial):
91   log("Calling clients...")
92   pipes = {}
93   status = {}
94   for c in clients:
95     args = ['ssh', '-o', 'BatchMode=yes', c, 'mirror', "%d"%(serial,)]
96     p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
97     pipes[c] = p
98     status[c] = 'in-progress'
99
100   log("Stage 1...")
101   stage1(pipes, status)
102   log("Stage 1 done.")
103   cnt = count_statuses(status)
104
105   if 'failed' in cnt > 0:
106     log("Some clients failed, aborting...")
107     stage2(pipes, status, 'abort')
108     return False
109   elif 'waiting' in cnt > 0:
110     log("Committing...")
111     stage2(pipes, status, 'go')
112     return True
113   else:
114     log("All clients up to date.")
115     return True
116
117
118 cleanup_dirs = []
119 def run_mirror():
120   # setup
121   master = os.path.join(base, subdirs['master'])
122   cur = os.path.join(base, subdirs['cur'])
123   live = os.path.join(base, subdirs['live'])
124   tmpdir_new = tempfile.mkdtemp(prefix='live.new-', dir=base); cleanup_dirs.append(tmpdir_new);
125   tmpdir_old = tempfile.mkdtemp(prefix='live.new-', dir=base); cleanup_dirs.append(tmpdir_old);
126   os.chmod(tmpdir_new, 0755)
127
128   locks = []
129   for p in (master, live, tmpdir_new):
130     if not os.path.exists(p): os.mkdir(p, 0755)
131     fd = os.open(p, os.O_RDONLY)
132     log("Acquiring lock for %s(%d)."%(p,fd))
133     fcntl.flock(fd, fcntl.LOCK_EX)
134     locks.append(fd)
135   log("All locks acquired.")
136
137   serialfile = os.path.join(master, serialname)
138   try:
139     with open(serialfile) as f: serial = int(f.read())
140   except:
141     serial = int(time.time())
142     with open(serialfile, "w") as f: f.write("%d\n"%(serial,))
143   log("Serial is %s."%(serial,))
144
145   log("Populating %s."%(tmpdir_new,))
146   subprocess.check_call(['cp', '-al', os.path.join(master, '.'), tmpdir_new])
147
148   if os.path.exists(cur):
149     log("Removing existing %s."%(cur,))
150     shutil.rmtree(cur)
151
152   log("Renaming %s to %s."%(tmpdir_new, cur))
153   os.rename(tmpdir_new, cur)
154
155   proceed = callout(serial)
156
157   if proceed:
158     log("Moving %s aside."%(live,))
159     os.rename(live, os.path.join(tmpdir_old, 'old'))
160     log("Renaming %s to %s."%(cur, live))
161     os.rename(cur, live)
162     log("Cleaning up.")
163     shutil.rmtree(tmpdir_old)
164     log("Done.")
165   else:
166     log("Aborted.")
167
168
169 try:
170   run_mirror()
171 finally:
172   for p in cleanup_dirs:
173     if os.path.exists(p): shutil.rmtree(p)
174 # vim:set et:
175 # vim:set ts=2:
176 # vim:set shiftwidth=2: