]> git.donarmstrong.com Git - dsa-puppet.git/blob - modules/roles/files/static-mirroring/static-master-run
Prevent python from releasing locks prematurely, II
[dsa-puppet.git] / modules / roles / files / static-mirroring / static-master-run
1 #!/usr/bin/python
2
3 import fcntl
4 import os
5 import shutil
6 import subprocess
7 import string
8 import sys
9 import tempfile
10 import time
11
12 base='/home/staticsync/static-master'
13 subdirs = { 'master': 'master',       # where updates from off-site end up going, the source of everything we do here
14             'cur':    'current-push', # where clients rsync from during a mirror push
15             'live':   'current-live'} # what is currently on the mirrors, and what they rsync from when they come back from being down
16 serialname = '.serial'
17
18 clients = []
19 with open('/etc/static-clients.conf') as f:
20   for line in f:
21     line = line.strip()
22     if line == "": continue
23     if line.startswith('#'): continue
24     clients.append(line)
25
26 def log(m):
27   t = time.strftime("[%Y-%m-%d %H:%M:%S]", time.gmtime())
28   print t, m
29
30 def stage1(pipes, status):
31   for c in clients:
32     p = pipes[c]
33     while 1:
34       line = p.stdout.readline()
35       if line == '':
36         status[c] = 'failed'
37         p.stdout.close()
38         p.stdin.close()
39         p.wait()
40         log("%s: failed with returncode %d"%(c,p.returncode))
41         break
42
43       line = line.strip()
44       log("%s >> %s"%(c, line))
45       if not line.startswith('[MSM]'): continue
46       kw = string.split(line, ' ', 2)[1]
47
48       if kw == 'ALREADY-CURRENT':
49         pipes[c].stdout.close()
50         pipes[c].stdin.close()
51         p.wait()
52         if p.returncode == 0:
53           log("%s: already current"%(c,))
54           status[c] = 'ok'
55         else:
56           log("%s: said ALREADY-CURRENT but returncode %d"%(c,p.returncode))
57           status[c] = 'failed'
58         break
59       elif kw == 'STAGE1-DONE':
60         log("%s: waiting"%(c,))
61         status[c] = 'waiting'
62         break
63       elif kw in ['STAGE1-START']:
64         pass
65       else:
66         log("%s: ignoring unknown line"%(c,))
67
68 def count_statuses(status):
69   cnt = {}
70   for k in status:
71     v = status[k]
72     if v not in cnt: cnt[v] = 1
73     else: cnt[v] += 1
74   return cnt
75
76 def stage2(pipes, status, command):
77   for c in clients:
78     if status[c] != 'waiting': continue
79     log("%s << %s"%(c, command))
80     pipes[c].stdin.write("%s\n"%(command,))
81
82   for c in clients:
83     if status[c] != 'waiting': continue
84     p = pipes[c]
85
86     (o, dummy) = p.communicate('')
87     for l in string.split(o, "\n"):
88       log("%s >> %s"%(c, l))
89     log("%s: returned %d"%(c, p.returncode))
90
91 def callout(serial):
92   log("Calling clients...")
93   pipes = {}
94   status = {}
95   for c in clients:
96     args = ['ssh', '-o', 'BatchMode=yes', c, 'mirror', "%d"%(serial,)]
97     p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
98     pipes[c] = p
99     status[c] = 'in-progress'
100
101   log("Stage 1...")
102   stage1(pipes, status)
103   log("Stage 1 done.")
104   cnt = count_statuses(status)
105
106   if 'failed' in cnt > 0:
107     log("Some clients failed, aborting...")
108     stage2(pipes, status, 'abort')
109     return False
110   elif 'waiting' in cnt > 0:
111     log("Committing...")
112     stage2(pipes, status, 'go')
113     return True
114   else:
115     log("All clients up to date.")
116     return True
117
118
119 cleanup_dirs = []
120 def run_mirror():
121   # setup
122   master = os.path.join(base, subdirs['master'])
123   cur = os.path.join(base, subdirs['cur'])
124   live = os.path.join(base, subdirs['live'])
125   tmpdir_new = tempfile.mkdtemp(prefix='live.new-', dir=base); cleanup_dirs.append(tmpdir_new);
126   tmpdir_old = tempfile.mkdtemp(prefix='live.old-', dir=base); cleanup_dirs.append(tmpdir_old);
127   os.chmod(tmpdir_new, 0755)
128
129   locks = []
130   for p in (master, live, tmpdir_new):
131     if not os.path.exists(p): os.mkdir(p, 0755)
132     fd = os.open(p, os.O_RDONLY)
133     log("Acquiring lock for %s(%d)."%(p,fd))
134     fcntl.flock(fd, fcntl.LOCK_EX)
135     locks.append(fd)
136   log("All locks acquired.")
137
138   serialfile = os.path.join(master, serialname)
139   try:
140     with open(serialfile) as f: serial = int(f.read())
141   except:
142     serial = int(time.time())
143     with open(serialfile, "w") as f: f.write("%d\n"%(serial,))
144   log("Serial is %s."%(serial,))
145
146   log("Populating %s."%(tmpdir_new,))
147   subprocess.check_call(['cp', '-al', os.path.join(master, '.'), tmpdir_new])
148
149   if os.path.exists(cur):
150     log("Removing existing %s."%(cur,))
151     shutil.rmtree(cur)
152
153   log("Renaming %s to %s."%(tmpdir_new, cur))
154   os.rename(tmpdir_new, cur)
155
156   proceed = callout(serial)
157
158   if proceed:
159     log("Moving %s aside."%(live,))
160     os.rename(live, os.path.join(tmpdir_old, 'old'))
161     log("Renaming %s to %s."%(cur, live))
162     os.rename(cur, live)
163     log("Cleaning up.")
164     shutil.rmtree(tmpdir_old)
165     log("Done.")
166     ret = True
167   else:
168     log("Aborted.")
169     ret = False
170
171   for fd in locks:
172     os.close(fd)
173
174   return ret
175
176
177 ok = False
178 try:
179   ok = run_mirror()
180 finally:
181   for p in cleanup_dirs:
182     if os.path.exists(p): shutil.rmtree(p)
183
184 if not ok:
185   sys.exit(1)
186 # vim:set et:
187 # vim:set ts=2:
188 # vim:set shiftwidth=2: