1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 manager implementation and related classes
24
25 API Stability: semi-stable
26
27 @var LOCAL_IDENTITY: an identity for the manager itself; can be used
28 to compare against to verify that the manager
29 requested an action
30 @type LOCAL_IDENTITY: L{LocalIdentity}
31 """
32
33 import os
34
35 from twisted.internet import reactor, defer
36 from twisted.python import components, failure
37 from twisted.spread import pb
38 from twisted.cred import portal
39 from zope.interface import implements
40
41 from flumotion.common import errors, interfaces, log, registry
42 from flumotion.common import planet, common, dag, messages, reflectcall, server
43 from flumotion.common.i18n import N_, gettexter
44 from flumotion.common.identity import RemoteIdentity, LocalIdentity
45 from flumotion.common.netutils import addressGetHost
46 from flumotion.common.planet import moods
47 from flumotion.configure import configure
48 from flumotion.manager import admin, component, worker, base, config
49 from flumotion.twisted import checkers
50 from flumotion.twisted import portal as fportal
51 from flumotion.project import project
52
53 __all__ = ['ManagerServerFactory', 'Vishnu']
54 __version__ = "$Rev: 8652 $"
55 T_ = gettexter()
56 LOCAL_IDENTITY = LocalIdentity('manager')
57
58
59
60
61
63 """
64 I implement L{twisted.cred.portal.IRealm}.
65 I make sure that when a L{pb.Avatar} is requested through me, the
66 Avatar being returned knows about the mind (client) requesting
67 the Avatar.
68 """
69
70 implements(portal.IRealm)
71
72 logCategory = 'dispatcher'
73
75 """
76 @param computeIdentity: see L{Vishnu.computeIdentity}
77 @type computeIdentity: callable
78 """
79 self._interfaceHeavens = {}
80 self._computeIdentity = computeIdentity
81 self._bouncer = None
82 self._avatarKeycards = {}
83
85 """
86 @param bouncer: the bouncer to authenticate with
87 @type bouncer: L{flumotion.component.bouncers.bouncer}
88 """
89 self._bouncer = bouncer
90
92 """
93 Register a Heaven as managing components with the given interface.
94
95 @type interface: L{twisted.python.components.Interface}
96 @param interface: a component interface to register the heaven with.
97 """
98 assert isinstance(heaven, base.ManagerHeaven)
99
100 self._interfaceHeavens[interface] = heaven
101
102
103
129
130 return (pb.IPerspective, avatar, cleanup)
131
def got_error(failure):
    """
    Errback for the avatar-creation deferred.

    Schedules the client's transport to be disconnected through the
    reactor (with a delay of 0) and hands the failure back unchanged so
    that it keeps propagating down the errback chain.

    @param failure: the failure that made avatar creation fail
    @returns:       the same failure object that was passed in
    """
    # `mind` comes from the enclosing scope; the disconnect is only
    # scheduled here, it is not performed synchronously.
    transport = mind.broker.transport
    reactor.callLater(0, transport.loseConnection)
    return failure
140
141 if pb.IPerspective not in ifaces:
142 raise errors.NoPerspectiveError(avatarId)
143 if len(ifaces) != 2:
144
145 raise errors.NoPerspectiveError(avatarId)
146 iface = [x for x in ifaces if x != pb.IPerspective][0]
147 if iface not in self._interfaceHeavens:
148 self.warning('unknown interface %r', iface)
149 raise errors.NoPerspectiveError(avatarId)
150
151 heaven = self._interfaceHeavens[iface]
152 klass = heaven.avatarClass
153 host = addressGetHost(mind.broker.transport.getPeer())
154 d = self._computeIdentity(keycard, host)
155 d.addCallback(lambda identity: \
156 klass.makeAvatar(heaven, avatarId, identity, mind))
157 d.addCallbacks(got_avatar, got_error)
158 return d
159
160
162 """
163 I am an object that ties together different objects related to a
164 component. I am used as values in a lookup hash in the vishnu.
165 """
166
168 self.state = None
169 self.id = None
170 self.avatar = None
171 self.jobState = None
172
173
175 """
176 I am the toplevel manager object that knows about all
177 heavens and factories.
178
179 @cvar dispatcher: dispatcher to create avatars
180 @type dispatcher: L{Dispatcher}
181 @cvar workerHeaven: the worker heaven
182 @type workerHeaven: L{worker.WorkerHeaven}
183 @cvar componentHeaven: the component heaven
184 @type componentHeaven: L{component.ComponentHeaven}
185 @cvar adminHeaven: the admin heaven
186 @type adminHeaven: L{admin.AdminHeaven}
187 @cvar configDir: the configuration directory for
188 this Vishnu's manager
189 @type configDir: str
190 """
191
192 implements(server.IServable)
193
194 logCategory = "vishnu"
195
196 - def __init__(self, name, unsafeTracebacks=0, configDir=None):
212 reactor.addSystemEventTrigger('before', 'shutdown', setStopped)
213
214 if configDir is not None:
215 self.configDir = configDir
216 else:
217 self.configDir = os.path.join(configure.configdir,
218 "managers", name)
219
220 self.bouncer = None
221
222 self.bundlerBasket = registry.getRegistry().makeBundlerBasket()
223
224 self._componentMappers = {}
225
226 self.state = planet.ManagerPlanetState()
227 self.state.set('name', name)
228 self.state.set('version', configure.version)
229
230 self.plugs = {}
231
232
233
234 self.portal = fportal.BouncerPortal(self.dispatcher, None)
235
236 self.factory = pb.PBServerFactory(self.portal,
237 unsafeTracebacks=unsafeTracebacks)
238 self.connectionInfo = {}
239 self.setConnectionInfo(None, None, None)
240
242 """Cancel any pending operations in preparation for shutdown.
243
244 This method is mostly useful for unit tests; currently, it is
245 not called during normal operation. Note that the caller is
246 responsible for stopping listening on the port, as the
247 manager does not have a handle on the twisted port object.
248
249 @returns: A deferred that will fire when the manager has shut
250 down.
251 """
252 if self.bouncer:
253 return self.bouncer.stop()
254 else:
255 return defer.succeed(None)
256
260
262 """Returns the manager's configuration as a string suitable for
263 importing via loadConfiguration().
264 """
265 return config.exportPlanetXml(self.state)
266
281
def addMessage(self, level, mid, format, *args, **kwargs):
    """
    Convenience method to construct a message and add it to the
    planet state.

    `format' should be marked as translatable in the source with N_;
    *args are stored as its format arguments. Keyword arguments are
    handed to the message constructor unchanged. See
    L{flumotion.common.messages.Message} for the meanings of the
    remaining arguments.

    For example::

      self.addMessage(messages.WARNING, 'foo-warning',
                      N_('The answer is %d'), 42, debug='not really')

    @param level:  message level, passed to the message constructor
    @param mid:    message ID, passed to the message constructor as mid
    @param format: translatable format string
    """
    # Wrap the format and its arguments as a translatable first, then
    # build the message object and store it on the planet state.
    translatable = T_(format, *args)
    message = messages.Message(level, translatable, mid=mid, **kwargs)
    self.addMessageObject(message)
299
301 """
302 Add a message to the planet state.
303
304 @type message: L{flumotion.common.messages.Message}
305 """
306 self.state.setitem('messages', message.id, message)
307
309 """
310 Clear any messages with the given message ID from the planet
311 state.
312
313 @type mid: message ID, normally a str
314 """
315 if mid in self.state.get('messages'):
316 self.state.delitem('messages', mid)
317
319 """
320 @param identity: L{flumotion.common.identity.Identity}
321 """
322 socket = 'flumotion.component.plugs.adminaction.AdminActionPlug'
323 if socket in self.plugs:
324 for plug in self.plugs[socket]:
325 plug.action(identity, message, args, kw)
326
328 """
329 Compute a suitable identity for a remote host. First looks to
330 see if there is a
331 L{flumotion.component.plugs.identity.IdentityProviderPlug} plug
332 installed on the manager, falling back to user@host.
333
334 The identity is only used in the adminaction interface. An
335 example of its use is when you have an adminaction plug that
336 checks an admin's privileges before actually doing an action;
337 the identity object you use here might store the privileges that
338 the admin has.
339
340 @param keycard: the keycard that the remote host used to log in.
341 @type keycard: L{flumotion.common.keycards.Keycard}
342 @param remoteHost: the ip of the remote host
343 @type remoteHost: str
344
345 @rtype: a deferred that will fire a
346 L{flumotion.common.identity.RemoteIdentity}
347 """
348
349 socket = 'flumotion.component.plugs.identity.IdentityProviderPlug'
350 if socket in self.plugs:
351 for plug in self.plugs[socket]:
352 identity = plug.computeIdentity(keycard, remoteHost)
353 if identity:
354 return identity
355 username = getattr(keycard, 'username', None)
356 return defer.succeed(RemoteIdentity(username, remoteHost))
357
359 """
360 Add a component state for the given component config entry.
361
362 @rtype: L{flumotion.common.planet.ManagerComponentState}
363 """
364
365 self.debug('adding component %s to %s'
366 % (conf.name, parent.get('name')))
367
368 if identity != LOCAL_IDENTITY:
369 self.adminAction(identity, '_addComponent', (conf, parent), {})
370
371 state = planet.ManagerComponentState()
372 state.set('name', conf.name)
373 state.set('type', conf.getType())
374 state.set('workerRequested', conf.worker)
375 state.setMood(moods.sleeping.value)
376 state.set('config', conf.getConfigDict())
377
378 state.set('parent', parent)
379 parent.append('components', state)
380
381 avatarId = conf.getConfigDict()['avatarId']
382
383 self.clearMessage('loadComponent-%s' % avatarId)
384
385 configDict = conf.getConfigDict()
386 projectName = configDict['project']
387 versionTuple = configDict['version']
388
389 projectVersion = None
390 try:
391 projectVersion = project.get(projectName, 'version')
392 except errors.NoProjectError:
393 m = messages.Warning(T_(N_(
394 "This component is configured for Flumotion project '%s', "
395 "but that project is not installed.\n"),
396 projectName))
397 state.append('messages', m)
398
399 if projectVersion:
400 self.debug('project %s, version %r, project version %r' % (
401 projectName, versionTuple, projectVersion))
402 if not common.checkVersionsCompat(
403 versionTuple,
404 common.versionStringToTuple(projectVersion)):
405 m = messages.Warning(T_(N_(
406 "This component is configured for "
407 "Flumotion '%s' version %s, "
408 "but you are running version %s.\n"
409 "Please update the configuration of the component.\n"),
410 projectName, common.versionTupleToString(versionTuple),
411 projectVersion))
412 state.append('messages', m)
413
414
415 m = ComponentMapper()
416 m.state = state
417 m.id = avatarId
418 self._componentMappers[state] = m
419 self._componentMappers[avatarId] = m
420
421 return state
422
424 """
425 Add a new config object into the planet state.
426
427 @returns: a list of all components added
428 @rtype: list of L{flumotion.common.planet.ManagerComponentState}
429 """
430
431 self.debug('syncing up planet state with config')
432 added = []
433
def checkNotRunning(comp, parentState):
    """
    Decide whether the component config entry may be added to the
    given parent state.

    @param comp:        the component config entry to check
    @param parentState: the atmosphere or flow state that would
                        hold the component
    @returns: True if the component can be added (it is not present,
              or it was present but sleeping and has been deleted);
              False if a component with that name is already running.
    @rtype:   bool
    """
    name = comp.getName()

    # Index the parent's components by name; `running` holds only
    # those whose mood is not sleeping.
    existing = {}
    running = {}
    for compState in parentState.get('components'):
        compName = compState.get('name')
        existing[compName] = compState
        if compState.get('mood') != moods.sleeping.value:
            running[compName] = compState

    if name not in existing:
        # not known at all: allow it
        return True

    oldComp = existing[name]
    if name not in running:
        # known but sleeping: replace it by deleting the old one first
        self.deleteComponent(oldComp)
        return True

    # From here on the component is already running, so the result is
    # always False; only the message shown to the admin differs.
    parent = oldComp.get('parent').get('name')
    # Use the `comp` argument rather than a loop variable of the
    # enclosing scope, so the check does not depend on how the caller
    # happens to name its variables.
    newConf = comp.getConfigDict()
    oldConf = oldComp.get('config')
    mid = 'loadComponent-%s' % oldConf['avatarId']

    if newConf == oldConf:
        self.debug('%s already has component %s running with '
                   'same configuration', parent, name)
        self.clearMessage(mid)
        return False

    self.info('%s already has component %s, but configuration '
              'not the same -- notifying admin', parent, name)

    diff = config.dictDiff(oldConf, newConf)
    diffMsg = config.dictDiffMessageString(diff, 'existing', 'new')

    self.addMessage(messages.WARNING, mid,
                    N_('Could not load component %r into %r: '
                       'a component is already running with '
                       'this name, but has a different '
                       'configuration.'), name, parent,
                    debug=diffMsg)
    return False
479
480 state = self.state
481 atmosphere = state.get('atmosphere')
482 for c in conf.atmosphere.components.values():
483 if checkNotRunning(c, atmosphere):
484 added.append(self._addComponent(c, atmosphere, identity))
485
486 flows = dict([(x.get('name'), x) for x in state.get('flows')])
487 for f in conf.flows:
488 if f.name in flows:
489 flow = flows[f.name]
490 else:
491 self.info('creating flow %r', f.name)
492 flow = planet.ManagerFlowState(name=f.name, parent=state)
493 state.append('flows', flow)
494
495 for c in f.components.values():
496 if checkNotRunning(c, flow):
497 added.append(self._addComponent(c, flow, identity))
498
499 return added
500
502
503
504 componentsToStart = {}
505 for c in components:
506 workerId = c.get('workerRequested')
507 if not workerId in componentsToStart:
508 componentsToStart[workerId] = []
509 componentsToStart[workerId].append(c)
510 self.debug('_startComponents: componentsToStart %r' %
511 (componentsToStart, ))
512
513 for workerId, componentStates in componentsToStart.items():
514 self._workerCreateComponents(workerId, componentStates)
515
522
524 """
525 Load the configuration from the given XML, merging it on top of
526 the currently running configuration.
527
528 @param file: file to parse, either as an open file object,
529 or as the name of a file to open
530 @type file: str or file
531 @param identity: The identity making this request. This is used by the
532 adminaction logging mechanism in order to say who is
533 performing the action.
534 @type identity: L{flumotion.common.identity.Identity}
535 """
536 self.debug('loading configuration')
537 mid = 'loadComponent-parse-error'
538 if isinstance(file, str):
539 mid += '-%s' % file
540 try:
541 self.clearMessage(mid)
542 conf = config.PlanetConfigParser(file)
543 conf.parse()
544 return self._loadComponentConfiguration(conf, identity)
545 except errors.ConfigError, e:
546 self.addMessage(messages.WARNING, mid,
547 N_('Invalid component configuration.'),
548 debug=e.args[0])
549 return defer.fail(e)
550 except errors.UnknownComponentError, e:
551 if isinstance(file, str):
552 debug = 'Configuration loaded from file %r' % file
553 else:
554 debug = 'Configuration loaded remotely'
555 self.addMessage(messages.WARNING, mid,
556 N_('Unknown component in configuration: %s.'),
557 e.args[0], debug=debug)
558 return defer.fail(e)
559 except Exception, e:
560 self.addMessage(messages.WARNING, mid,
561 N_('Unknown error while loading configuration.'),
562 debug=log.getExceptionMessage(e))
563 return defer.fail(e)
564
581
587
609
610 def setupErrback(failure):
611 self.warning('Error starting manager bouncer')
612 d.addCallbacks(setupCallback, setupErrback)
613 return d
614
631
632 __pychecker__ = 'maxargs=11'
633
def loadComponent(self, identity, componentType, componentId,
                  componentLabel, properties, workerName,
                  plugs, eaters, isClockMaster, virtualFeeds):
    """
    Load a component into the manager configuration and ask for it
    to be started.

    See L{flumotion.manager.admin.AdminAvatar.perspective_loadComponent}
    for a definition of the argument types.

    The flow named in componentId is created if it does not exist yet.

    @returns: the state of the newly added component
    @rtype:   L{flumotion.common.planet.ManagerComponentState}

    @raise NotImplementedError: if isClockMaster is set, or if the
        component type needs synchronization
    @raise errors.ConfigError: if no worker name is given
    @raise errors.ComponentAlreadyExistsError: if the parent already
        has a component with this name
    """
    self.debug('loading %s component %s on %s',
               componentType, componentId, workerName)
    parentName, compName = common.parseComponentId(componentId)

    if isClockMaster:
        raise NotImplementedError("Clock master components are not "
                                  "yet supported")
    # Check the workerName argument; `worker` is the imported
    # flumotion.manager.worker module and can never be None.
    if workerName is None:
        raise errors.ConfigError("Component %r needs to specify the"
                                 " worker on which it should run"
                                 % componentId)

    state = self.state

    compConf = config.ConfigEntryComponent(compName, parentName,
                                           componentType,
                                           componentLabel,
                                           properties,
                                           plugs, workerName,
                                           eaters, isClockMaster,
                                           None, None, virtualFeeds)

    if compConf.defs.getNeedsSynchronization():
        raise NotImplementedError("Components that need "
                                  "synchronization are not yet "
                                  "supported")

    # Find the parent state: the atmosphere, an existing flow, or a
    # flow created on the spot.
    if parentName == 'atmosphere':
        parentState = state.get('atmosphere')
    else:
        flows = dict([(x.get('name'), x) for x in state.get('flows')])
        if parentName in flows:
            parentState = flows[parentName]
        else:
            self.info('creating flow %r', parentName)
            parentState = planet.ManagerFlowState(name=parentName,
                                                  parent=state)
            state.append('flows', parentState)

    existingNames = [x.get('name') for x in parentState.get('components')]
    if compName in existingNames:
        self.debug('%r already has component %r', parentName, compName)
        raise errors.ComponentAlreadyExistsError(compName)

    compState = self._addComponent(compConf, parentState, identity)

    self._startComponents([compState], identity)

    return compState
693
695 """
696 Create a heaven of the given klass that will send avatars to clients
697 implementing the given medium interface.
698
699 @param interface: the medium interface to create a heaven for
700 @type interface: L{flumotion.common.interfaces.IMedium}
701 @param klass: the type of heaven to create
702 @type klass: an implementor of L{flumotion.common.interfaces.IHeaven}
703 """
704 assert issubclass(interface, interfaces.IMedium)
705 heaven = klass(self)
706 self.dispatcher.registerHeaven(heaven, interface)
707 return heaven
708
710 """
711 @type bouncer: L{flumotion.component.bouncers.bouncer.Bouncer}
712 """
713 if self.bouncer:
714 self.warning("manager already had a bouncer, setting anyway")
715
716 self.bouncer = bouncer
717 self.portal.bouncer = bouncer
718 self.dispatcher.setBouncer(bouncer)
719
722
751
766
def stopLost():
    """
    Handle a stop request for a component in lost mood that has no
    avatar.

    If the requested worker is logged in, it is asked for its list of
    components; when the component is still in that list the stop is
    refused, otherwise the component state is set to sleeping. Without
    a logged-in worker the component is set to sleeping directly.

    @returns: a deferred firing None once the component has been set
              to sleeping, or failing with
              L{errors.ComponentMoodError} if it is still running.
    """

    def gotComponents(comps):
        # comps is the component list reported by the worker
        return avatarId in comps

    def gotJobRunning(running):
        if running:
            self.warning('asked to stop lost component %r, but '
                         'it is still running', avatarId)
            msg = "Cannot stop lost component which is still running."
            raise errors.ComponentMoodError(msg)

        # Pass avatarId so the %r placeholder is actually filled in.
        self.debug('component %r seems to be really lost, '
                   'setting to sleeping', avatarId)
        componentState.setMood(moods.sleeping.value)
        componentState.set('moodPending', None)
        return None

    self.debug('asked to stop a lost component without avatar')
    workerName = componentState.get('workerRequested')
    if workerName and self.workerHeaven.hasAvatar(workerName):
        self.debug('checking if component has job process running')
        d = self.workerHeaven.getAvatar(workerName).getComponents()
        d.addCallback(gotComponents)
        d.addCallback(gotJobRunning)
        return d

    self.debug('component lacks a worker, setting to sleeping')
    # maybeDeferred wraps the synchronous result in a deferred, so
    # both branches return the same kind of object.
    return defer.maybeDeferred(gotJobRunning, False)
799
def stopUnknown():
    """
    Refuse a stop request for an avatar-less component whose mood has
    no dedicated stopper.

    @returns: a deferred that has already failed with
              L{errors.ComponentMoodError}
    """
    moodName = moods.get(mood)
    reason = ('asked to stop a component without avatar in mood %s'
              % moodName)
    self.warning(reason)
    error = errors.ComponentMoodError(reason)
    return defer.fail(error)
805
806 mood = componentState.get('mood')
807 stoppers = {moods.sad.value: stopSad,
808 moods.lost.value: stopLost}
809 return stoppers.get(mood, stopUnknown)()
810
812
813
814 d = componentAvatar.stop()
815
816 return d
817
848
850 """
851 Set the given message on the given component's state.
852 Can be called e.g. by a worker to report on a crashed component.
853 Sets the mood to sad if it is an error message.
854 """
855 if not avatarId in self._componentMappers:
856 self.warning('asked to set a message on non-mapped component %s' %
857 avatarId)
858 return
859
860 m = self._componentMappers[avatarId]
861 m.state.append('messages', message)
862 if message.level == messages.ERROR:
863 self.debug('Error message makes component sad')
864 m.state.setMood(moods.sad.value)
865
866
867
869
870 workerId = workerAvatar.avatarId
871 self.debug('vishnu.workerAttached(): id %s' % workerId)
872
873
874
875
876 components = [c for c in self._getComponentsToCreate()
877 if c.get('workerRequested') in (workerId, None)]
878
879
880
881
882 d = workerAvatar.getComponents()
883
def workerAvatarComponentListReceived(workerComponents):
    """
    Callback for the worker's component list: work out which
    components still need to be created on the worker, and create them.

    @param workerComponents: the entries reported by the worker; each
                             is looked up as a key in the component
                             mappers
    """
    # Components requested on this worker that are currently lost.
    lostComponents = []
    for state in self.getComponentStates():
        if (state.get('workerRequested') == workerId
                and state.get('mood') == moods.lost.value):
            lostComponents.append(state)

    # Anything the worker already reports must not be created again;
    # drop it from both candidate lists (in place).
    for reported in workerComponents:
        if reported not in self._componentMappers:
            continue
        reportedState = self._componentMappers[reported].state
        for candidates in (components, lostComponents):
            if reportedState in candidates:
                candidates.remove(reportedState)

    # What is left in lostComponents is reset to sleeping before it is
    # created again.
    for lostState in lostComponents:
        self.info(
            "Restarting previously lost component %s on worker %s",
            self._componentMappers[lostState].id, workerId)
        lostState.set('moodPending', None)
        lostState.setMood(moods.sleeping.value)

    allComponents = components + lostComponents

    if allComponents:
        self._workerCreateComponents(workerId, allComponents)
    else:
        self.debug(
            "vishnu.workerAttached(): no components for this worker")
917 d.addCallback(workerAvatarComponentListReceived)
918
919 reactor.callLater(0, self.componentHeaven.feedServerAvailable,
920 workerId)
921
923 """
924 Create the list of components on the given worker, sequentially, but
925 in no specific order.
926
927 @param workerId: avatarId of the worker
928 @type workerId: string
929 @param components: components to start
930 @type components: list of
931 L{flumotion.common.planet.ManagerComponentState}
932 """
933 self.debug("_workerCreateComponents: workerId %r, components %r" % (
934 workerId, components))
935
936 if not workerId in self.workerHeaven.avatars:
937 self.debug('worker %s not logged in yet, delaying '
938 'component start' % workerId)
939 return defer.succeed(None)
940
941 workerAvatar = self.workerHeaven.avatars[workerId]
942
943 d = defer.Deferred()
944
945 for c in components:
946 componentType = c.get('type')
947 conf = c.get('config')
948 self.debug('scheduling create of %s on %s'
949 % (conf['avatarId'], workerId))
950 d.addCallback(self._workerCreateComponentDelayed,
951 workerAvatar, c, componentType, conf)
952
953 d.addCallback(lambda result: self.debug(
954 '_workerCreateComponents(): completed setting up create chain'))
955
956
957 self.debug('_workerCreateComponents(): triggering create chain')
958 d.callback(None)
959
960 return d
961
978
979
980
981
983 self.debug('got avatarId %s for state %s' % (result, componentState))
984 m = self._componentMappers[componentState]
985 assert result == m.id, "received id %s is not the expected id %s" % (
986 result, m.id)
987
1011
1013
1014 workerId = workerAvatar.avatarId
1015 self.debug('vishnu.workerDetached(): id %s' % workerId)
1016
1033
1035
1036 m = (self.getComponentMapper(componentAvatar.avatarId)
1037 or ComponentMapper())
1038
1039 m.state = componentAvatar.componentState
1040 m.jobState = componentAvatar.jobState
1041 m.id = componentAvatar.avatarId
1042 m.avatar = componentAvatar
1043
1044 self._componentMappers[m.state] = m
1045 self._componentMappers[m.jobState] = m
1046 self._componentMappers[m.id] = m
1047 self._componentMappers[m.avatar] = m
1048
1050
1051
1052 self.debug('unregisterComponent(%r): cleaning up state' %
1053 componentAvatar)
1054
1055 m = self._componentMappers[componentAvatar]
1056
1057
1058 try:
1059 del self._componentMappers[m.jobState]
1060 except KeyError:
1061 self.warning('Could not remove jobState for %r' % componentAvatar)
1062 m.jobState = None
1063
1064 m.state.set('pid', None)
1065 m.state.set('workerName', None)
1066 m.state.set('moodPending', None)
1067
1068
1069 del self._componentMappers[m.avatar]
1070 m.avatar = None
1071
1073 cList = self.state.getComponents()
1074 self.debug('getComponentStates(): %d components' % len(cList))
1075 for c in cList:
1076 self.log(repr(c))
1077 mood = c.get('mood')
1078 if mood == None:
1079 self.warning('%s has mood None' % c.get('name'))
1080
1081 return cList
1082
1084 """
1085 Empty the planet of the given component.
1086
1087 @returns: a deferred that will fire when all listeners have been
1088 notified of the removal of the component.
1089 """
1090 self.debug('deleting component %r from state', componentState)
1091 c = componentState
1092 if c not in self._componentMappers:
1093 raise errors.UnknownComponentError(c)
1094
1095 flow = componentState.get('parent')
1096 if (c.get('moodPending') != None
1097 or c.get('mood') is not moods.sleeping.value):
1098 raise errors.BusyComponentError(c)
1099
1100 del self._componentMappers[self._componentMappers[c].id]
1101 del self._componentMappers[c]
1102 return flow.remove('components', c)
1103
1105 for flow in self.state.get('flows'):
1106 if flow.get('name') == flowName:
1107 return flow
1108
1110 """
1111 Empty the planet of a flow.
1112
1113 @returns: a deferred that will fire when the flow is removed.
1114 """
1115
1116 flow = self._getFlowByName(flowName)
1117 if flow is None:
1118 raise ValueError("No flow called %s found" % (flowName, ))
1119
1120 components = flow.get('components')
1121 for c in components:
1122
1123 if (c.get('moodPending') != None or
1124 c.get('mood') is not moods.sleeping.value):
1125 raise errors.BusyComponentError(c)
1126 for c in components:
1127 del self._componentMappers[self._componentMappers[c].id]
1128 del self._componentMappers[c]
1129 d = flow.empty()
1130 d.addCallback(lambda _: self.state.remove('flows', flow))
1131 return d
1132
1134 """
1135 Empty the planet of all components, and flows. Also clears all
1136 messages.
1137
1138 @returns: a deferred that will fire when the planet is empty.
1139 """
1140 for mid in self.state.get('messages').keys():
1141 self.clearMessage(mid)
1142
1143
1144 components = self.getComponentStates()
1145
1146
1147 components = [c for c in components
1148 if c.get('moodPending') != None]
1149 if components:
1150 state = components[0]
1151 raise errors.BusyComponentError(
1152 state,
1153 "moodPending is %s" % moods.get(state.get('moodPending')))
1154
1155
1156 components = [c for c in self.getComponentStates()
1157 if c.get('mood') is not moods.sleeping.value]
1158
1159
1160 d = defer.Deferred()
1161
1162 self.debug('need to stop %d components: %r' % (
1163 len(components), components))
1164
1165 for c in components:
1166 avatar = self._componentMappers[c].avatar
1167
1168
1169 if avatar:
1170 d.addCallback(lambda result, a: a.stop(), avatar)
1171 else:
1172 assert (c.get('mood') is moods.sad.value or
1173 c.get('mood') is moods.lost.value)
1174
1175 d.addCallback(self._emptyPlanetCallback)
1176
1177
1178 reactor.callLater(0, d.callback, None)
1179
1180 return d
1181
1183
1184
1185 components = self.getComponentStates()
1186 self.debug('_emptyPlanetCallback: need to delete %d components' %
1187 len(components))
1188
1189 for c in components:
1190 if c.get('mood') is not moods.sleeping.value:
1191 self.warning('Component %s is not sleeping', c.get('name'))
1192
1193 m = self._componentMappers[c]
1194 del self._componentMappers[m.id]
1195 del self._componentMappers[c]
1196
1197
1198 l = self._componentMappers.keys()
1199 if len(l) > 0:
1200 self.warning('mappers still has keys %r' % (repr(l)))
1201
1202 dList = []
1203
1204 dList.append(self.state.get('atmosphere').empty())
1205
1206 for f in self.state.get('flows'):
1207 self.debug('appending deferred for emptying flow %r' % f)
1208 dList.append(f.empty())
1209 self.debug('appending deferred for removing flow %r' % f)
1210 dList.append(self.state.remove('flows', f))
1211 self.debug('appended deferreds')
1212
1213 dl = defer.DeferredList(dList)
1214 return dl
1215
1217 """
1218 @rtype: list of L{flumotion.common.planet.ManagerComponentState}
1219 """
1220
1221 components = self.state.getComponents()
1222
1223
1224
1225
1226
1227 isSleeping = lambda c: c.get('mood') == moods.sleeping.value
1228 components = filter(isSleeping, components)
1229 return components
1230
1232
1233 if not workerName in self.workerHeaven.avatars:
1234 raise errors.ComponentNoWorkerError("Worker %s not logged in?"
1235 % workerName)
1236
1237 return self.workerHeaven.avatars[workerName]
1238
1240 if workerName in self.workerHeaven.avatars:
1241 return self._getWorker(workerName).feedServerPort
1242 return None
1243
1245 """
1246 Requests a number of ports on the worker named workerName. The
1247 ports will be reserved for the use of the caller until
1248 releasePortsOnWorker is called.
1249
1250 @returns: a list of ports as integers
1251 """
1252 return self._getWorker(workerName).reservePorts(numPorts)
1253
1255 """
1256 Tells the manager that the given ports are no longer being used,
1257 and may be returned to the allocation pool.
1258 """
1259 try:
1260 return self._getWorker(workerName).releasePorts(ports)
1261 except errors.ComponentNoWorkerError, e:
1262 self.warning('could not release ports: %r' % e.args)
1263
1265 """
1266 Look up an object mapper given the object.
1267
1268 @rtype: L{ComponentMapper} or None
1269 """
1270 if object in self._componentMappers.keys():
1271 return self._componentMappers[object]
1272
1273 return None
1274
1276 """
1277 Look up an object mapper given the object.
1278
1279 @rtype: L{ComponentMapper} or None
1280 """
1281 if object in self._componentMappers.keys():
1282 return self._componentMappers[object].state
1283
1284 return None
1285
1287 """
1288 Invokes method on all components of a certain type
1289 """
1290
1291 def invokeOnOneComponent(component, methodName, *args, **kwargs):
1292 m = self.getComponentMapper(component)
1293 if not m:
1294 self.warning('Component %s not mapped. Maybe deleted.',
1295 component.get('name'))
1296 raise errors.UnknownComponentError(component)
1297
1298 avatar = m.avatar
1299 if not avatar:
1300 self.warning('No avatar for %s, cannot call remote',
1301 component.get('name'))
1302 raise errors.SleepingComponentError(component)
1303
1304 try:
1305 return avatar.mindCallRemote(methodName, *args, **kwargs)
1306 except Exception, e:
1307 log_message = log.getExceptionMessage(e)
1308 msg = "exception on remote call %s: %s" % (methodName,
1309 log_message)
1310 self.warning(msg)
1311 raise errors.RemoteMethodError(methodName,
1312 log_message)
1313
1314
1315 dl_array = []
1316 for c in self.getComponentStates():
1317 if c.get('type') == componentType and \
1318 (c.get('mood') is moods.happy.value or
1319 c.get('mood') is moods.hungry.value):
1320 self.info("component %r to have %s run", c, methodName)
1321 d = invokeOnOneComponent(c, methodName, *args, **kwargs)
1322 dl_array.append(d)
1323 dl = defer.DeferredList(dl_array)
1324 return dl
1325