# -*- test-case-name: twisted.test.test_sister -*-
# Sibling Server
from twisted.spread.pb import Service,Perspective,Error
from twisted.spread.flavors import Referenceable
from twisted.spread.refpath import PathReferenceDirectory
from twisted.internet import defer
from twisted.python import log
from random import choice
class MotherService(Service, Perspective):
    """A `mother' object, managing many sister-servers.

    I maintain a list of all "sister" servers who are connected, so that all
    servers can connect to each other.  I also negotiate which distributed
    objects are owned by which sister servers, so that if any sister-server
    needs to locate an object it can be made available.

    A resource is identified throughout by the pair
    (resourceType, resourceName).
    """

    def __init__(self, sharedSecret, serviceName, application=None):
        """Set up the service and register myself as its "mother" perspective.

        sharedSecret is handed to makeIdentity(); serviceName and application
        are passed straight through to Service.__init__.
        """
        Service.__init__(self, serviceName, application)
        Perspective.__init__(self, "mother")
        self.addPerspective(self)
        # Three states: unlocked, pending lock, locked
        self.pendingResources = {}  # path: deferred, host, port
        # Requests received while no sister was connected:
        # [(resourceType, resourceName, generateTicket, args, deferred), ...]
        self.toLoadOnConnect = []
        # (resourceType, resourceName): (host, port, sisterPerspective)
        self.lockedResources = {}
        self.sisters = []  # [(host, port, reference)]
        self.makeIdentity(sharedSecret)

    def _cbLoadedResource(self, ticket, resourceType, resourceName, host, port, sisterPerspective):
        """Record that a sister now owns a resource.

        Used as the callback on the sister's "loadResource" call; `ticket' is
        that call's result.  Returns (ticket, host, port, sisterPerspective).
        """
        log.msg('mother: loaded resource')
        self.lockedResources[(resourceType, resourceName)] = (host, port, sisterPerspective)
        return (ticket, host, port, sisterPerspective)

    def loadRemoteResource(self, resourceType, resourceName, generateTicket, *args):
        """Request a sister-server to load a resource.

        NOTE: caching of ticket resources could be an issue... do we cache tickets??

        Return a Deferred which will fire with
        (ticket, host, port, sisterPerspective), describing where and how the
        resource can be located.  If the resource is already loaded the
        Deferred has already fired and its ticket is None.  If no sister is
        connected yet, the request is queued and replayed when one publishes
        its address.
        """
        key = (resourceType, resourceName)
        located = self.lockedResources.get(key)
        if located is not None:
            (host, port, sisterPerspective) = located
            return defer.succeed((None, host, port, sisterPerspective))

        log.msg('mother: loading resource (%s)' % self.sisters)
        if not self.sisters:
            # Nobody to ask yet; perspective_publishIP replays this later.
            defr = defer.Deferred()
            self.toLoadOnConnect.append((resourceType, resourceName, generateTicket, args, defr))
            return defr

        #TODO: better selection mechanism for sister server
        (host, port, sisterPerspective) = choice(self.sisters)
        d = sisterPerspective.callRemote("loadResource", resourceType, resourceName, generateTicket, *args)
        d.addCallback(self._cbLoadedResource, resourceType, resourceName, host, port, sisterPerspective)
        return d

    def loadRemoteResourceFor(self, sisterPerspective, resourceType, resourceName, generateTicket, *args):
        """Use to load a remote resource on a specified sister service.

        Dont load it if already loaded on a sister.

        Returns a Deferred firing with (ticket, host, port, sisterPerspective).
        Raises Error if sisterPerspective is not a known sister, or if the
        resource is already loaded on some sister.
        """
        # lookup sister info in sisters; on a match the loop leaves `host'
        # and `port' bound to that sister's published address.
        for host, port, sister in self.sisters:
            if sister == sisterPerspective:
                break
        else:
            raise Error("Attempt to load resource for no-existent sister")

        if (resourceType, resourceName) in self.lockedResources:
            raise Error("resource %s:%s already loaded on a sister" % (resourceName, resourceType))

        d = sisterPerspective.callRemote("loadResource", resourceType, resourceName, generateTicket, *args)
        d.addCallback(self._cbLoadedResource, resourceType, resourceName, host, port, sisterPerspective)
        return d

    def perspective_unloadResource(self, resourceType, resourceName):
        """This is called by sister services to unload a resource.

        Raises Error if the resource is not currently recorded as loaded.
        """
        log.msg("mother: unloading %s/%s" % (resourceType, resourceName))
        key = (resourceType, resourceName)
        if key not in self.lockedResources:
            raise Error("Unable to unload not-loaded resource.")
        del self.lockedResources[key]

    def perspective_publishIP(self, host, port, clientRef):
        """called by sister to set the host and port to publish for clients.

        Registers the sister, then replays every load request that was queued
        while no sister was available.
        """
        log.msg("sister attached: %s:%s" % (host, port))
        self.sisters.append((host, port, clientRef))
        # Take ownership of the queue before replaying it, so the list being
        # iterated is never the one loadRemoteResource may append to.
        pending = self.toLoadOnConnect
        self.toLoadOnConnect = []
        for resourceType, resourceName, generateTicket, args, deferred in pending:
            self.loadRemoteResource(resourceType, resourceName, generateTicket, *args).chainDeferred(deferred)

    def perspective_callDistributed(self, srcResourceType, srcResourceName, destResourceType, destResourceName, methodName, *args, **kw):
        """Call a remote method on a resources that is managed by the system.

        The call is forwarded to the "callDistributed" remote method of the
        sister that owns the destination resource; `args' and `kw' are sent
        as two ordinary arguments (a tuple and a dict).  Returns the result of
        that callRemote.  Raises Error if the destination is not loaded.
        """
        data = self.lockedResources.get((destResourceType, destResourceName))
        if data is None:
            raise Error("Unable to find not-loaded resource.")
        (host, port, perspective) = data
        log.msg("Calling distributed method <%s> for %s:%s" % (methodName, destResourceType, destResourceName))
        return perspective.callRemote('callDistributed', srcResourceType, srcResourceName, destResourceType, destResourceName, methodName, args, kw)

    def detached(self, client, identity):
        """Forget everything owned by a sister that has disconnected.

        Drops the resources locked by `client' and removes it from the list
        of sisters so it is no longer chosen for new loads, then defers to
        Perspective.detached.
        """
        # Iterate over a snapshot: entries are deleted during the loop.
        for path, (host, port, clientRef) in list(self.lockedResources.items()):
            if client == clientRef:
                del self.lockedResources[path]
        # Relies, like the loop above, on `client' comparing equal to the
        # reference the sister passed to perspective_publishIP.
        self.sisters = [entry for entry in self.sisters if not client == entry[2]]
        log.msg("sister detached: %s" % client)
        return Perspective.detached(self, client, identity)
|