]> git.donarmstrong.com Git - dsa-puppet.git/blob - modules/roles/files/static-mirroring/static-master-run
static mirroring system
[dsa-puppet.git] / modules / roles / files / static-mirroring / static-master-run
1 #!/usr/bin/python
2
3 import fcntl
4 import os
5 import shutil
6 import subprocess
7 import string
8 import tempfile
9 import time
10
11 base='/home/staticsync/static-master'
12 subdirs = { 'master': 'master',       # where updates from off-site end up going, the source of everything we do here
13             'cur':    'current-push', # where clients rsync from during a mirror push
14             'live':   'current-live'} # what is currently on the mirrors, and what they rsync from when they come back from being down
15 serialname = '.serial'
16
17 clients = []
18 with open('/home/staticsync/etc/static-clients') as f:
19   for line in f:
20     clients.append(line.strip())
21
22 def log(m):
23   t = time.strftime("[%Y-%m-%d %H:%M:%S]", time.gmtime())
24   print t, m
25
26 def stage1(pipes, status):
27   for c in clients:
28     p = pipes[c]
29     while 1:
30       line = p.stdout.readline()
31       if line == '':
32         status[c] = 'failed'
33         p.stdout.close()
34         p.stdin.close()
35         p.wait()
36         log("%s: failed with returncode %d"%(c,p.returncode))
37         break
38
39       line = line.strip()
40       log("%s >> %s"%(c, line))
41       if not line.startswith('[MSM]'): continue
42       kw = string.split(line, ' ', 2)[1]
43
44       if kw == 'ALREADY-CURRENT':
45         pipes[c].stdout.close()
46         pipes[c].stdin.close()
47         p.wait()
48         if p.returncode == 0:
49           log("%s: already current"%(c,))
50           status[c] = 'ok'
51         else:
52           log("%s: said ALREADY-CURRENT but returncode %d"%(c,p.returncode))
53           status[c] = 'failed'
54         break
55       elif kw == 'STAGE1-DONE':
56         log("%s: waiting"%(c,))
57         status[c] = 'waiting'
58         break
59       elif kw in ['STAGE1-START']:
60         pass
61       else:
62         log("%s: ignoring unknown line"%(c,))
63
64 def count_statuses(status):
65   cnt = {}
66   for k in status:
67     v = status[k]
68     if v not in cnt: cnt[v] = 1
69     else: cnt[v] += 1
70   return cnt
71
72 def stage2(pipes, status, command):
73   for c in clients:
74     if status[c] != 'waiting': continue
75     log("%s << %s"%(c, command))
76     pipes[c].stdin.write("%s\n"%(command,))
77
78   for c in clients:
79     if status[c] != 'waiting': continue
80     p = pipes[c]
81
82     (o, dummy) = p.communicate('')
83     for l in string.split(o, "\n"):
84       log("%s >> %s"%(c, l))
85     log("%s: returned %d"%(c, p.returncode))
86
87 def callout(serial):
88   log("Calling clients...")
89   pipes = {}
90   status = {}
91   for c in clients:
92     args = ['ssh', '-o', 'BatchMode=yes', c, 'mirror', "%d"%(serial,)]
93     p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
94     pipes[c] = p
95     status[c] = 'in-progress'
96
97   log("Stage 1...")
98   stage1(pipes, status)
99   log("Stage 1 done.")
100   cnt = count_statuses(status)
101
102   if 'failed' in cnt > 0:
103     log("Some clients failed, aborting...")
104     stage2(pipes, status, 'abort')
105     return False
106   elif 'waiting' in cnt > 0:
107     log("Committing...")
108     stage2(pipes, status, 'go')
109     return True
110   else:
111     log("All clients up to date.")
112     return True
113
114
115 cleanup_dirs = []
116 def run_mirror():
117   # setup
118   master = os.path.join(base, subdirs['master'])
119   cur = os.path.join(base, subdirs['cur'])
120   live = os.path.join(base, subdirs['live'])
121   tmpdir_new = tempfile.mkdtemp(prefix='live.new-', dir=base); cleanup_dirs.append(tmpdir_new);
122   tmpdir_old = tempfile.mkdtemp(prefix='live.new-', dir=base); cleanup_dirs.append(tmpdir_old);
123   os.chmod(tmpdir_new, 0755)
124
125   locks = []
126   for p in (master, live, tmpdir_new):
127     if not os.path.exists(p): os.mkdir(p, 0755)
128     fd = os.open(p, os.O_RDONLY)
129     log("Acquiring lock for %s(%d)."%(p,fd))
130     fcntl.flock(fd, fcntl.LOCK_EX)
131     locks.append(fd)
132   log("All locks acquired.")
133
134   serialfile = os.path.join(master, serialname)
135   try:
136     with open(serialfile) as f: serial = int(f.read())
137   except:
138     serial = int(time.time())
139     with open(serialfile, "w") as f: f.write("%d\n"%(serial,))
140   log("Serial is %s."%(serial,))
141
142   log("Populating %s."%(tmpdir_new,))
143   subprocess.check_call(['cp', '-al', os.path.join(master, '.'), tmpdir_new])
144
145   if os.path.exists(cur):
146     log("Removing existing %s."%(cur,))
147     shutil.rmtree(cur)
148
149   log("Renaming %s to %s."%(tmpdir_new, cur))
150   os.rename(tmpdir_new, cur)
151
152   proceed = callout(serial)
153
154   if proceed:
155     log("Moving %s aside."%(live,))
156     os.rename(live, os.path.join(tmpdir_old, 'old'))
157     log("Renaming %s to %s."%(cur, live))
158     os.rename(cur, live)
159     log("Cleaning up.")
160     shutil.rmtree(tmpdir_old)
161     log("Done.")
162   else:
163     log("Aborted.")
164
165
166 try:
167   run_mirror()
168 finally:
169   for p in cleanup_dirs:
170     if os.path.exists(p): shutil.rmtree(p)
171 # vim:set et:
172 # vim:set ts=2:
173 # vim:set shiftwidth=2: