1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 """
18 Servicer object used in service scripts
19 """
20
21 import os
22 import glob
23 import time
24
25 from flumotion.configure import configure
26 from flumotion.common import errors, log
27 from flumotion.common.python import makedirs
28 from flumotion.common.process import checkPidRunning, deletePidFile, getPid, \
29 killPid, termPid, waitPidFile
30
31 __version__ = "$Rev: 8426 $"
32
33
"""
I manage running managers and workers on behalf of a service script:
starting, stopping, restarting and reporting on them as requested.
"""

# Category name attached to this object's log messages (presumably used by
# the log.Loggable machinery behind self.debug/self.info -- TODO confirm).
logCategory = 'servicer'
40
def __init__(self, configDir=None, logDir=None, runDir=None):
    """
    Set up the directories this servicer works with.

    @type configDir: string
    @param configDir: overridden path to the configuration directory;
                      when not given, configure.configdir is used.
    @type logDir: string
    @param logDir: overridden path to the log directory.
    @type runDir: string
    @param runDir: overridden path to the run directory.
    """
    # Honour the documented configDir override; it used to be accepted
    # and then silently ignored.
    baseConfigDir = configDir or configure.configdir
    self.managersDir = os.path.join(baseConfigDir, 'managers')
    self.workersDir = os.path.join(baseConfigDir, 'workers')
    # These entries are turned into --logdir/--rundir options for the
    # spawned daemons by _getDirOptions (unset entries are skipped there).
    self._overrideDir = {
        'logdir': logDir,
        'rundir': runDir,
    }
56
def _parseManagersWorkers(self, command, args):
    """
    Work out which managers and workers a service command applies to.

    With no arguments every configured manager and worker is selected.
    Otherwise args[0] must be 'manager' or 'worker' and args[1] the name of
    a configured process of that kind.

    @param command: name of the service command, used in error messages.
    @param args: the command's positional arguments.
    @returns: a (managers, workers) tuple of sorted lists of names.
    @raises errors.FatalError: when the arguments do not name a known
                               manager or worker.
    """
    if not args:
        # sorted() works whether keys() returns a list or a view, and does
        # not depend on getWorkers() having sorted its result already.
        return (sorted(self.getManagers().keys()),
                sorted(self.getWorkers()))

    which = args[0]
    if which not in ('manager', 'worker'):
        raise errors.FatalError('Please specify either manager or worker')

    if len(args) < 2:
        raise errors.FatalError('Please specify which %s to %s' % (
            which, command))

    name = args[1]
    if which == 'manager':
        if name not in self.getManagers():
            raise errors.FatalError('No manager "%s"' % name)
        return ([name], [])

    if name not in self.getWorkers():
        raise errors.FatalError('No worker with name %s' % name)
    return ([], [name])
91
def _getDirOptions(self):
    """
    Build the directory-override options for spawned daemons.

    @returns: a space-separated string with one --key=value option for each
              override directory that was actually set, ready to be
              spliced into a command line (empty if none were set).
    """
    options = ['--%s=%s' % (key, value)
               for (key, value) in self._overrideDir.items()
               if value]
    return " ".join(options)
102
def getManagers(self):
    """
    Find the configured managers and their flows.

    Every entry under the managers directory is treated as a manager; its
    flows are the *.xml files in its 'flows' subdirectory.

    @returns: a dictionary of manager names -> sorted lists of flow names
    """
    managers = {}

    self.log('getManagers()')
    if not os.path.exists(self.managersDir):
        return managers

    for managerDir in glob.glob(os.path.join(self.managersDir, '*')):
        # glob() on a missing 'flows' directory simply yields nothing.
        flowFiles = glob.glob(os.path.join(managerDir, 'flows', '*.xml'))
        # splitext strips only the trailing extension, so "a.xmlb.xml" gives
        # "a.xmlb" instead of being cut at the first ".xml"; sorting keeps
        # the flow order (and thus the manager command line) deterministic.
        flows = sorted([os.path.splitext(os.path.basename(flowFile))[0]
                        for flowFile in flowFiles])
        managerName = os.path.basename(managerDir)
        self.log('Adding flows %r to manager %s' % (flows, managerName))
        managers[managerName] = flows
    self.log('returning managers: %r' % managers)
    return managers
128
def getWorkers(self):
    """
    Find the configured workers.

    Each *.xml file in the workers directory defines one worker, named after
    the file without its extension.

    @returns: a sorted list of worker names
    """
    if not os.path.exists(self.workersDir):
        return []

    pattern = os.path.join(self.workersDir, '*.xml')
    # splitext strips only the final ".xml", unlike split(".xml")[0] which
    # truncates names that contain ".xml" elsewhere.
    workers = [os.path.splitext(os.path.basename(workerFile))[0]
               for workerFile in glob.glob(pattern)]
    workers.sort()
    return workers
144
def start(self, args):
    """
    Start the managers and/or workers selected by args.

    With no arguments every configured manager and worker is started;
    "manager NAME" or "worker NAME" starts just that one process.
    Managers are started before workers.

    @returns: an exit value equal to the number of processes that failed
              to start
    """
    managers, workers = self._parseManagersWorkers('start', args)
    self.debug("Start managers %r and workers %r" % (managers, workers))
    flowsByManager = self.getManagers()

    failures = 0
    for managerName in managers:
        started = self.startManager(managerName, flowsByManager[managerName])
        if not started:
            failures += 1
    for workerName in workers:
        started = self.startWorker(workerName)
        if not started:
            failures += 1
    return failures
169
def stop(self, args):
    """
    Stop the managers and/or workers selected by args.

    With no arguments every configured manager and worker is stopped;
    "manager NAME" or "worker NAME" stops just that one process.
    All selected workers are stopped before any manager.

    @returns: an exit value equal to the number of processes that failed
              to stop
    """
    managers, workers = self._parseManagersWorkers('stop', args)
    self.debug("Stop managers %r and workers %r" % (managers, workers))

    failures = 0
    for stopper, names in ((self.stopWorker, workers),
                           (self.stopManager, managers)):
        for name in names:
            if not stopper(name):
                failures += 1
    return failures
194
def status(self, args):
    """
    Print the state of each manager and worker selected by args.

    Each process is reported as not running (no pid found), running with
    its pid, or dead with a stale pid.
    """
    managers, workers = self._parseManagersWorkers('status', args)
    self.debug("Status managers %r and workers %r" % (managers, workers))
    for kind, names in (('manager', managers), ('worker', workers)):
        for name in names:
            pid = getPid(kind, name)
            if not pid:
                message = "%s %s not running" % (kind, name)
            elif checkPidRunning(pid):
                message = "%s %s is running with pid %d" % (kind, name, pid)
            else:
                message = "%s %s dead (stale pid %d)" % (kind, name, pid)
            print(message)
211
def clean(self, args):
    """
    Clean up pid files of dead processes selected by args.

    When no pid can be read for a process its pid file is deleted as
    bogus; when the pid is no longer running the pid file is deleted as
    stale. A failure to delete either kind is reported and ignored, so the
    remaining processes are still cleaned.
    """
    managers, workers = self._parseManagersWorkers('clean', args)
    self.debug("Clean managers %r and workers %r" % (managers, workers))
    for kind, names in [('manager', managers), ('worker', workers)]:
        for name in names:
            pid = getPid(kind, name)
            if not pid:
                try:
                    deletePidFile(kind, name)
                    print("deleted bogus pid file for %s %s" % (kind, name))
                except OSError:
                    print("failed to delete pid file for %s %s "
                          "- ignoring" % (kind, name))
                continue
            if checkPidRunning(pid):
                continue
            self.debug("Cleaning up stale pid %d for %s %s" % (
                pid, kind, name))
            print("deleting stale pid file for %s %s" % (kind, name))
            try:
                deletePidFile(kind, name)
            except OSError:
                # Same best-effort policy as for bogus pid files, instead
                # of aborting the whole clean on the first failure.
                print("failed to delete pid file for %s %s "
                      "- ignoring" % (kind, name))
235
def condrestart(self, args):
    """
    Restart only those selected processes that are currently running.

    With no arguments this applies to every configured manager and worker;
    "manager NAME" or "worker NAME" limits it to that one process.
    Processes without a pid are skipped, and a stale pid is only reported.
    A process whose stop fails is not started again.

    @returns: an exit value equal to the number of processes that failed
              to stop or to start again
    """
    managers, workers = self._parseManagersWorkers('condrestart', args)
    self.debug("condrestart managers %r and workers %r" % (
        managers, workers))
    flowsByManager = self.getManagers()

    def restartManager(name):
        if not self.stopManager(name):
            return False
        return self.startManager(name, flowsByManager[name])

    def restartWorker(name):
        if not self.stopWorker(name):
            return False
        return self.startWorker(name)

    failures = 0
    for kind, names, restart in (('manager', managers, restartManager),
                                 ('worker', workers, restartWorker)):
        for name in names:
            pid = getPid(kind, name)
            if not pid:
                continue
            if not checkPidRunning(pid):
                print("%s %s dead (stale pid %d)" % (kind, name, pid))
                continue
            if not restart(name):
                failures += 1
    return failures
275
def create(self, args):
    """
    Create a default manager or worker config.

    args is [kind, name] or [kind, name, port]. kind is 'manager' or
    'worker' and name is the name of the new config. port (default 7531)
    is the manager's listening port, or for a worker the manager port it
    connects to.

    @raises errors.FatalError: on missing or invalid arguments.
    """
    usage = "Please specify 'manager' or 'worker' to create."
    if not args:
        raise errors.FatalError(usage)
    kind = args[0]
    # Validate the kind before asking for a name, so that a bare
    # "create foo" reports the real problem.
    if kind not in ('manager', 'worker'):
        raise errors.FatalError(usage)
    if len(args) < 2:
        raise errors.FatalError(
            "Please specify name of %s to create." % kind)
    name = args[1]

    port = 7531
    # Honour the port even if trailing arguments follow it (it used to be
    # silently dropped unless exactly three arguments were given).
    if len(args) >= 3:
        try:
            port = int(args[2])
        except ValueError:
            raise errors.FatalError("Invalid port %r" % args[2])

    if kind == 'manager':
        self.createManager(name, port)
    else:
        self.createWorker(name, managerPort=port, randomFeederports=True)
304
def createManager(self, name, port=7531):
    """
    Create a sample manager.

    Writes planet.xml into a new <managersDir>/<name> directory for a
    manager on localhost:port using SSL. If the default certificate does
    not exist yet, the make-dummy-cert helper is run to generate it first.

    @param name: name of the manager (and of its config directory).
    @param port: port the manager listens on.
    @returns: whether or not the config was created.
    @raises errors.FatalError: when the manager directory already exists.
    """
    import subprocess

    self.info("Creating manager %s" % name)
    managerDir = os.path.join(self.managersDir, name)
    if os.path.exists(managerDir):
        raise errors.FatalError(
            "Manager directory %s already exists" % managerDir)
    makedirs(managerDir)

    planetFile = os.path.join(managerDir, 'planet.xml')

    pemFile = os.path.join(configure.configdir, 'default.pem')
    if not os.path.exists(pemFile):
        # Run the helper without a shell so that paths containing spaces
        # or shell metacharacters are passed through intact.
        try:
            retval = subprocess.call([
                'sh', os.path.join(configure.datadir, 'make-dummy-cert'),
                pemFile])
        except OSError:
            retval = -1
        if retval != 0:
            # Fall back to a path relative to the config directory, which
            # the comment written into planet.xml says is allowed.
            pemFile = 'default.pem'

    template = """<planet>
<manager>
<debug>4</debug>
<host>localhost</host>
<port>%(port)d</port>
<transport>ssl</transport>
<!-- certificate path can be relative to $sysconfdir/flumotion,
or absolute -->
<certificate>%(pemFile)s</certificate>
<component name="manager-bouncer" type="htpasswdcrypt-bouncer">
<property name="data"><![CDATA[
user:PSfNpHTkpTx1M
]]></property>
</component>
</manager>
</planet>
"""
    # try/finally rather than "with" to stay compatible with old Pythons;
    # the handle is closed even if the write fails.
    handle = open(planetFile, 'w')
    try:
        handle.write(template % {'port': port, 'pemFile': pemFile})
    finally:
        handle.close()

    return True
356
def createWorker(self, name, managerPort=7531, randomFeederports=False):
    """
    Create a sample worker.

    Writes <workersDir>/<name>.xml for a worker connecting to a manager on
    localhost:managerPort with plaintext credentials.

    @param name: name of the worker (and of its config file).
    @param managerPort: port of the manager the worker connects to.
    @param randomFeederports: if true, emit <feederports random="True" />;
                              otherwise feederports only appears as a
                              commented-out example.
    @returns: whether or not the config was created.
    @raises errors.FatalError: when the worker file already exists.
    """
    makedirs(self.workersDir)
    self.info("Creating worker %s" % name)
    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    if os.path.exists(workerFile):
        raise errors.FatalError(
            "Worker file %s already exists." % workerFile)

    if randomFeederports:
        feederports = ' <feederports random="True" />'
    else:
        feederports = " <!-- <feederports>8600-8639</feederports> -->"

    template = """<worker>

<debug>4</debug>

<manager>
<host>localhost</host>
<port>%(managerPort)s</port>
</manager>

<authentication type="plaintext">
<username>user</username>
<password>test</password>
</authentication>

%(feederports)s

</worker>
"""
    # try/finally rather than "with" to stay compatible with old Pythons;
    # the handle is closed even if the write fails.
    handle = open(workerFile, 'w')
    try:
        handle.write(template % {'managerPort': managerPort,
                                 'feederports': feederports})
    finally:
        handle.close()

    return True
396
def startManager(self, name, flowNames):
    """
    Start the manager configured in <managersDir>/<name>, loading the given
    flows from its 'flows' subdirectory.

    A stale pid file left by a dead manager is removed before starting.

    @param name: name of the manager.
    @param flowNames: names of the flows to load (files <flow>.xml).
    @returns: whether or not the manager daemon started
    @raises errors.FatalError: when the planet or a flow file is missing,
                               or the manager is already running.
    """
    self.info("Starting manager %s" % name)
    self.debug("Starting manager with flows %r" % flowNames)
    managerDir = os.path.join(self.managersDir, name)

    planetFile = os.path.join(managerDir, 'planet.xml')
    if not os.path.exists(planetFile):
        raise errors.FatalError("Planet file %s does not exist" % planetFile)
    self.info("Loading planet %s" % planetFile)

    flowsDir = os.path.join(managerDir, 'flows')
    flowFiles = []
    for flowName in flowNames:
        flowFile = os.path.join(flowsDir, "%s.xml" % flowName)
        if not os.path.exists(flowFile):
            raise errors.FatalError("Flow file %s does not exist" % flowFile)
        flowFiles.append(flowFile)
        self.info("Loading flow %s" % flowFile)

    pid = getPid('manager', name)
    if pid:
        if checkPidRunning(pid):
            raise errors.FatalError(
                "Manager %s is already running (with pid %d)" % (name, pid))
        self.warning("Removing stale pid file %d for manager %s",
                     pid, name)
        deletePidFile('manager', name)

    command = ("flumotion-manager %s -D --daemonize-to %s "
               "--service-name %s %s %s" % (
                   self._getDirOptions(), configure.daemondir, name,
                   planetFile, " ".join(flowFiles)))
    self.debug("starting process %s" % command)
    retval = self.startProcess(command)
    if retval != 0:
        self.warning("manager %s could not start (return value %d)" % (
            name, retval))
        return False

    self.debug("Waiting for pid for manager %s" % name)
    pid = waitPidFile('manager', name)
    if not pid:
        self.warning("manager %s could not start" % name)
        return False
    self.info("Started manager %s with pid %d" % (name, pid))
    return True
456
def startWorker(self, name):
    """
    Start the worker configured in <workersDir>/<name>.xml.

    A stale pid file left by a dead worker is removed before starting.

    @param name: name of the worker.
    @returns: whether or not the worker daemon started
    @raises errors.FatalError: when the worker file is missing or the
                               worker is already running.
    """
    self.info("Starting worker %s" % name)
    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    if not os.path.exists(workerFile):
        raise errors.FatalError("Worker file %s does not exist" % workerFile)

    pid = getPid('worker', name)
    if pid:
        if checkPidRunning(pid):
            raise errors.FatalError(
                "Worker %s is already running (with pid %d)" % (name, pid))
        self.warning("Removing stale pid file %d for worker %s",
                     pid, name)
        deletePidFile('worker', name)

    self.info("Loading worker %s" % workerFile)

    command = ("flumotion-worker %s -D --daemonize-to %s "
               "--service-name %s %s" % (
                   self._getDirOptions(), configure.daemondir, name,
                   workerFile))
    self.debug("Running %s" % command)
    retval = self.startProcess(command)
    if retval != 0:
        self.warning("worker %s could not start (return value %d)" % (
            name, retval))
        return False

    self.debug("Waiting for pid for worker %s" % name)
    pid = waitPidFile('worker', name)
    if not pid:
        self.warning("worker %s could not start" % name)
        return False
    self.info("Started worker %s with pid %d" % (name, pid))
    return True
505
def startProcess(self, command):
    """
    Run command through the shell and block until it finishes.

    @param command: shell command line to execute.
    @returns: the command's exit status, or -1 if it did not exit normally
              (for instance because it was killed by a signal).
    """
    status = os.system(command)
    if not os.WIFEXITED(status):
        return -1
    return os.WEXITSTATUS(status)
518
546
575
def stopProcess(self, pid):
    """
    Stop the process with the given pid and wait until it has disappeared.

    First TERM is sent. If the process is still running after
    configure.processTermWait seconds, KILL is sent. If it is still running
    configure.processKillWait seconds after that, give up.

    @param pid: pid of the process to stop.
    @returns: True once the process is gone; False if there was no such
              process or it survived KILL.
    """
    # Wall-clock (monotonic where available) time: time.clock() measured
    # processor time on Unix and no longer exists on Python 3.8+.
    clock = getattr(time, 'monotonic', time.time)
    termDeadline = clock() + configure.processTermWait
    killDeadline = termDeadline + configure.processKillWait

    self.debug('stopping process with pid %d' % pid)
    if not termPid(pid):
        self.warning('No process with pid %d' % pid)
        return False

    killed = False
    while checkPidRunning(pid):
        now = clock()
        if not killed and now > termDeadline:
            self.warning("Process with pid %d has not responded to TERM "
                         "for %d seconds, killing" % (
                             pid, configure.processTermWait))
            killPid(pid)
            killed = True

        if now > killDeadline:
            self.warning("Process with pid %d has not responded to KILL "
                         "for %d seconds, stopping" % (
                             pid, configure.processKillWait))
            return False

        # Poll instead of busy-looping so waiting does not hog a CPU.
        time.sleep(0.1)

    return True
609
def list(self):
    """
    Print every configured manager (followed by its flows) and then every
    worker.

    Managers are printed in sorted order, matching the sorted worker list,
    so the output is stable between runs.
    """
    managers = self.getManagers()
    for name in sorted(managers.keys()):
        print("manager %s" % name)
        for flow in managers[name] or []:
            print(" flow %s" % flow)

    for worker in self.getWorkers():
        print("worker %s" % worker)
625