# Twisted, the Framework of Your Internet
# Copyright (C) 2001 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
from twisted.trial import unittest
import string, random, copy
from twisted.web import server,resource,widgets,guard
from twisted.internet import app,defer
from twisted.cred import service,identity,perspective,authorizer
from twisted.protocols import http,loopback
from twisted.python import log,reflect
class DummyRequest:
    """Minimal stand-in for a web request, used by the tests in this file.

    It records what is written to it and how often it is finished, and it
    asserts that the session and the response metadata are only touched
    before any data has been written.
    """

    uri = 'http://dummy/'

    def __init__(self, postpath, session=None):
        """Create a request for C{postpath}.

        If C{session} is false, a C{server.Session(0, self)} is created and
        kept as the session that L{getSession} will hand out.
        """
        self.sitepath = []
        self.written = []
        self.finished = 0
        self.postpath = postpath
        self.prepath = []
        self.session = None
        if not session:
            session = server.Session(0, self)
        self.protoSession = session
        self.args = {}

    def getSession(self):
        """Return the session, adopting the prepared one on first use.

        Asserts that nothing has been written yet when the session still has
        to be adopted.
        """
        if not self.session:
            assert not self.written, "Session cannot be requested after data has been written."
            self.session = self.protoSession
        return self.session

    def write(self, data):
        """Append C{data} to the list of written chunks."""
        self.written.append(data)

    def finish(self):
        """Count one more call to finish."""
        self.finished += 1

    def addArg(self, name, value):
        """Store C{value}, wrapped in a one-element list, under C{name}."""
        self.args[name] = [value]

    def setResponseCode(self, code):
        """Check that no data was written yet; the code itself is not kept."""
        assert not self.written, "Response code cannot be set after data has been written: %s." % "@@@@".join(self.written)

    def setLastModified(self, when):
        """Check that no data was written yet; C{when} itself is not kept."""
        assert not self.written, "Last-Modified cannot be set after data has been written: %s." % "@@@@".join(self.written)

    def setETag(self, tag):
        """Check that no data was written yet; the tag itself is not kept."""
        assert not self.written, "ETag cannot be set after data has been written: %s." % "@@@@".join(self.written)
class SimpleResource(resource.Resource):
    """Resource that sets fixed cache validators on every request."""

    def render(self, request):
        """Set Last-Modified to 10 and the ETag to 'MatchingTag'.

        Returns an empty body if either call returns C{http.CACHED},
        otherwise the string "correct".
        """
        # Both validators are always set, last-modified first.
        validatorResults = (request.setLastModified(10),
                            request.setETag('MatchingTag'))
        if http.CACHED in validatorResults:
            return ''
        return "correct"
class SiteTest(unittest.TestCase):
    """Resource lookup through a server.Site."""

    def testSimplestSite(self):
        """A request with postpath [''] resolves to the child stored under ''."""
        root = SimpleResource()
        leaf = SimpleResource()
        root.putChild("", leaf)
        site = server.Site(root)
        found = site.getResourceFor(DummyRequest(['']))
        assert found is leaf, "Got the wrong resource."
class SimpleWidget(widgets.Widget):
    """Widget whose display is the single string 'correct'."""

    def display(self, request):
        """Return a one-element list holding 'correct'; C{request} is unused."""
        output = ['correct']
        return output
class TextDeferred(widgets.Widget):
    """Widget that delivers a given text through an already-fired Deferred."""

    def __init__(self, text):
        """Remember C{text} for later display."""
        self.text = text

    def display(self, request):
        """Return a list holding one Deferred already fired with [self.text]."""
        fired = defer.Deferred()
        fired.callback([self.text])
        return [fired]
class DeferredWidget(widgets.Widget):
    """Widget that delivers 'correct' through an already-fired Deferred."""

    def display(self, request):
        """Return a list holding one Deferred already fired with ["correct"]."""
        fired = defer.Deferred()
        fired.callback(["correct"])
        return [fired]
class MultiDeferredWidget(widgets.Widget):
    """Widget that delivers 'correct' through three nested Deferreds."""

    def display(self, request):
        """Return [outer], where each Deferred's result wraps the next one.

        outer fires with [middle], middle with [inner] and inner with
        ["correct"]; all three have fired by the time this returns.
        """
        outer = defer.Deferred()
        middle = defer.Deferred()
        inner = defer.Deferred()
        # Fired outermost first, as before.
        outer.callback([middle])
        middle.callback([inner])
        inner.callback(["correct"])
        return [outer]
class WidgetTest(unittest.TestCase):
    """Rendering of plain and Deferred-returning widgets via WidgetResource."""

    def _renderedRequest(self, widget):
        """Render C{widget} through a WidgetResource and return the request."""
        request = DummyRequest([''])
        widgets.WidgetResource(widget).render(request)
        return request

    def testSimpleRenderSession(self):
        """A widget returning plain strings is written and finished."""
        request = self._renderedRequest(SimpleWidget())
        assert request.finished
        assert request.written == ['correct']

    def testDeferredRenderSession(self):
        """A widget returning a fired Deferred is written and finished."""
        request = self._renderedRequest(DeferredWidget())
        assert request.finished
        assert request.written == ['correct']

    def testMultiDeferredRenderSession(self):
        """A widget returning nested fired Deferreds is written and finished."""
        request = self._renderedRequest(MultiDeferredWidget())
        assert request.finished
        assert request.written == ['correct']
class GuardTest(unittest.TestCase):
    """guard.ResourceGuard placed in front of a SimpleResource."""

    def setUp(self):
        """Create an application with one identity, service and perspective.

        The identity is "bob" with password "joe"; it holds a key for the
        perspective "jethro" on the service "simple".
        """
        self.auth = authorizer.DefaultAuthorizer()
        self.app = app.Application("guard", authorizer=self.auth)
        bob = identity.Identity("bob", authorizer=self.auth)
        bob.setPassword("joe")
        self.auth.addIdentity(bob)
        self.svc = service.Service("simple", authorizer=self.auth,
                                   serviceParent=self.app)
        self.psp = perspective.Perspective('jethro', bob.name)
        self.svc.addPerspective(self.psp)
        bob.addKeyForPerspective(self.psp)

    def testSuccess(self):
        """The guarded resource is only rendered once the login form is sent."""
        guarded = guard.ResourceGuard(SimpleResource(), self.svc)

        # A request without any form arguments must not reach the resource.
        anonymous = DummyRequest([])
        guarded.render(anonymous)
        assert anonymous.written != ['correct']
        assert anonymous.finished

        # A request carrying the filled-in login form must reach it.
        login = DummyRequest([])
        login.site = self
        formFields = (
            ('username', 'bob'),
            ('password', 'joe'),
            ('perspective', 'jethro'),
            ('__formtype__', reflect.qual(guard.AuthForm)),
        )
        for fieldName, fieldValue in formFields:
            login.addArg(fieldName, fieldValue)
        guarded.render(login)
        assert login.finished, "didn't finish"
        assert login.written == ['correct'], "incorrect result: %s" % login.written
# Conditional requests (If-None-Match, If-Modified-Since).
#
# When a conditional request is made:
#   - if the condition succeeds, the normal response is sent;
#   - if the condition fails, only a response code is sent, with no body.
def httpBody(whole):
    """Return everything after the first blank line of the response C{whole}.

    Raises IndexError if C{whole} contains no CRLF CRLF separator.
    """
    headAndBody = whole.split('\r\n\r\n', 1)
    return headAndBody[1]
def httpHeader(whole, key):
    """Return the value of the header named C{key} in the response C{whole}.

    Only the part of C{whole} before the first blank line is searched.  The
    header name is compared case-insensitively and must match C{key}
    exactly; the first matching header wins.  The value is returned with
    surrounding whitespace stripped, or None if no such header exists.
    """
    wanted = key.lower()
    headerSection = whole.split('\r\n\r\n', 1)[0]
    for line in headerSection.split('\r\n'):
        # The status line (and any other line without a colon) is not a
        # header and must not be indexed as one.
        if ':' not in line:
            continue
        name, value = line.split(':', 1)
        # Compare the whole field name, not a prefix, so that looking up
        # "Content-Type" cannot return the value of "Content-Type-Options".
        if name.strip().lower() == wanted:
            return value.strip()
    return None
def httpCode(whole):
    """Return the status code from the first line of C{whole} as an int.

    The code is taken to be the second whitespace-separated field of that
    line.
    """
    statusLine = whole.split('\r\n', 1)[0]
    fields = statusLine.split()
    return int(fields[1])
class ConditionalTest(unittest.TestCase):
    """web.server's handling of conditional requests for cache validation."""

    # XXX: test web.distrib.

    def setUp(self):
        """Serve a SimpleResource and start a "GET /" request on a channel.

        The request line and an Accept header are fed to the channel here.
        Each test then adds its own validator header followed by an empty
        line, and inspects what was written to the string transport.
        """
        self.resrc = SimpleResource()
        self.resrc.putChild('', self.resrc)
        # The site is built exactly once; an identical second construction
        # that merely replaced this one has been dropped.
        self.site = server.Site(self.resrc)
        self.site.logFile = log.logfile
        # HELLLLLLLLLLP! This harness is Very Ugly.
        self.channel = self.site.buildProtocol(None)
        self.transport = http.StringTransport()
        self.transport.close = lambda *a, **kw: None
        # NOTE(review): this stores a callable, so a plain truth test of
        # `transport.disconnecting` would be true -- confirm nothing in the
        # code under test reads it as a flag.
        self.transport.disconnecting = lambda *a, **kw: 0
        self.transport.getPeer = lambda *a, **kw: "peer"
        self.transport.getHost = lambda *a, **kw: "host"
        self.channel.makeConnection(self.transport)
        for line in ["GET / HTTP/1.1",
                     "Accept: text/html"]:
            self.channel.lineReceived(line)

    def test_modified(self):
        """If-Modified-Since cache validator (positive)"""
        # Time 1 is earlier than the 10 that SimpleResource sets.
        self.channel.lineReceived("If-Modified-Since: %s"
                                  % http.datetimeToString(1))
        self.channel.lineReceived('')
        result = self.transport.getvalue()
        self.failUnlessEqual(httpCode(result), http.OK)
        self.failUnlessEqual(httpBody(result), "correct")

    def test_unmodified(self):
        """If-Modified-Since cache validator (negative)"""
        # Time 100 is later than the 10 that SimpleResource sets.
        self.channel.lineReceived("If-Modified-Since: %s"
                                  % http.datetimeToString(100))
        self.channel.lineReceived('')
        result = self.transport.getvalue()
        self.failUnlessEqual(httpCode(result), http.NOT_MODIFIED)
        self.failUnlessEqual(httpBody(result), "")

    def test_etagMatchedNot(self):
        """If-None-Match ETag cache validator (positive)"""
        # SimpleResource sets the ETag 'MatchingTag', which differs.
        self.channel.lineReceived("If-None-Match: unmatchedTag")
        self.channel.lineReceived('')
        result = self.transport.getvalue()
        self.failUnlessEqual(httpCode(result), http.OK)
        self.failUnlessEqual(httpBody(result), "correct")

    def test_etagMatched(self):
        """If-None-Match ETag cache validator (negative)"""
        # Same tag as the one SimpleResource sets.
        self.channel.lineReceived("If-None-Match: MatchingTag")
        self.channel.lineReceived('')
        result = self.transport.getvalue()
        self.failUnlessEqual(httpHeader(result, "ETag"), "MatchingTag")
        self.failUnlessEqual(httpCode(result), http.NOT_MODIFIED)
        self.failUnlessEqual(httpBody(result), "")
|