# Twisted, the Framework of Your Internet
# Copyright (C) 2001-2002 Matthew W. Lefkowitz
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of version 2.1 of the GNU Lesser General Public
# License as published by the Free Software Foundation.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""Test code for policies."""
from twisted.trial import unittest
import time
from twisted.internet import protocol,reactor
from twisted.protocols import policies
class SimpleProtocol(protocol.Protocol):
connected = disconnected = 0
buffer = ""
def connectionMade(self):
self.connected = 1
def connectionLost(self, reason):
self.disconnected = 1
def dataReceived(self, data):
self.buffer += data
class SillyFactory(protocol.ClientFactory):
def __init__(self, p):
self.p = p
def buildProtocol(self, addr):
return self.p
class EchoProtocol(protocol.Protocol):
def pauseProducing(self):
self.paused = time.time()
def resumeProducing(self):
self.resume = time.time()
def stopProducing(self):
pass
def dataReceived(self, data):
self.transport.write(data)
class Server(protocol.ServerFactory):
protocol = EchoProtocol
class ThrottlingTestCase(unittest.TestCase):
def testLimit(self):
server = Server()
c1, c2, c3, c4 = [SimpleProtocol() for i in range(4)]
tServer = policies.ThrottlingFactory(server, 2)
p = reactor.listenTCP(0, tServer)
n = p.getHost()[2]
reactor.iterate(); reactor.iterate()
for c in c1, c2, c3:
reactor.connectTCP("127.0.0.1", n, SillyFactory(c))
reactor.iterate(); reactor.iterate()
self.assertEquals([c.connected for c in c1, c2, c3], [1, 1, 1])
self.assertEquals([c.disconnected for c in c1, c2, c3], [0, 0, 1])
self.assertEquals(len(tServer.protocols.keys()), 2)
# disconnect one protocol and now another should be able to connect
c1.transport.loseConnection()
reactor.iterate(); reactor.iterate()
reactor.iterate(); reactor.iterate()
reactor.connectTCP("127.0.0.1", n, SillyFactory(c4))
reactor.iterate(); reactor.iterate()
self.assertEquals(c4.connected, 1)
self.assertEquals(c4.disconnected, 0)
for c in c2, c4: c.transport.loseConnection()
p.stopListening()
reactor.iterate(); reactor.iterate()
reactor.iterate(); reactor.iterate()
def testWriteLimit(self):
server = Server()
c1, c2 = SimpleProtocol(), SimpleProtocol()
# The throttling factory starts checking bandwidth immediately
now = time.time()
tServer = policies.ThrottlingFactory(server, writeLimit=10)
port = reactor.listenTCP(0, tServer)
n = port.getHost()[2]
reactor.iterate(); reactor.iterate()
for c in c1, c2:
reactor.connectTCP("127.0.0.1", n, SillyFactory(c))
reactor.iterate(); reactor.iterate()
for p in tServer.protocols.keys():
p = p.wrappedProtocol
self.assert_(isinstance(p, EchoProtocol))
p.transport.registerProducer(p, 1)
c1.transport.write("0123456789")
c2.transport.write("abcdefghij")
reactor.iterate(); reactor.iterate()
reactor.iterate(); reactor.iterate()
self.assertEquals(c1.buffer, "0123456789")
self.assertEquals(c2.buffer, "abcdefghij")
self.assertEquals(tServer.writtenThisSecond, 20)
# at this point server should've written 20 bytes, 10 bytes
# above the limit so writing should be paused around 1 second
# from 'now', and resumed a second after that
for p in tServer.protocols.keys():
self.assert_(not hasattr(p.wrappedProtocol, "paused"))
self.assert_(not hasattr(p.wrappedProtocol, "resume"))
while not hasattr(p.wrappedProtocol, "paused"):
reactor.iterate()
self.assertEquals(tServer.writtenThisSecond, 0)
for p in tServer.protocols.keys():
self.assert_(hasattr(p.wrappedProtocol, "paused"))
self.assert_(not hasattr(p.wrappedProtocol, "resume"))
self.assert_(abs(p.wrappedProtocol.paused - now - 1.0) < 0.1)
while not hasattr(p.wrappedProtocol, "resume"):
reactor.iterate()
for p in tServer.protocols.keys():
self.assert_(hasattr(p.wrappedProtocol, "resume"))
self.assert_(abs(p.wrappedProtocol.resume -
p.wrappedProtocol.paused - 1.0) < 0.1)
c1.transport.loseConnection()
c2.transport.loseConnection()
port.stopListening()
for p in tServer.protocols.keys():
p.loseConnection()
reactor.iterate(); reactor.iterate()
def testReadLimit(self):
server = Server()
c1, c2 = SimpleProtocol(), SimpleProtocol()
now = time.time()
tServer = policies.ThrottlingFactory(server, readLimit=10)
port = reactor.listenTCP(0, tServer)
n = port.getHost()[2]
reactor.iterate(); reactor.iterate()
for c in c1, c2:
reactor.connectTCP("127.0.0.1", n, SillyFactory(c))
reactor.iterate(); reactor.iterate()
c1.transport.write("0123456789")
c2.transport.write("abcdefghij")
reactor.iterate(); reactor.iterate()
reactor.iterate(); reactor.iterate()
self.assertEquals(c1.buffer, "0123456789")
self.assertEquals(c2.buffer, "abcdefghij")
self.assertEquals(tServer.readThisSecond, 20)
# we wrote 20 bytes, so after one second it should stop reading
# and then a second later start reading again
while time.time() - now < 1.05:
reactor.iterate()
self.assertEquals(tServer.readThisSecond, 0)
# write some more - data should *not* get written for another second
c1.transport.write("0123456789")
c2.transport.write("abcdefghij")
reactor.iterate(); reactor.iterate()
reactor.iterate(); reactor.iterate()
self.assertEquals(c1.buffer, "0123456789")
self.assertEquals(c2.buffer, "abcdefghij")
self.assertEquals(tServer.readThisSecond, 0)
while time.time() - now < 2.05:
reactor.iterate()
self.assertEquals(c1.buffer, "01234567890123456789")
self.assertEquals(c2.buffer, "abcdefghijabcdefghij")
c1.transport.loseConnection()
c2.transport.loseConnection()
port.stopListening()
for p in tServer.protocols.keys():
p.loseConnection()
reactor.iterate(); reactor.iterate()
|