test_transport.py :  » Development » Bazaar » bzr-2.2b3 » bzrlib » tests » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » Bazaar 
Bazaar » bzr 2.2b3 » bzrlib » tests » test_transport.py
# Copyright (C) 2005-2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA


from cStringIO import StringIO
import os
import subprocess
import sys
import threading

from bzrlib import (
    errors,
    osutils,
    tests,
    transport,
    urlutils,
    )
from bzrlib.transport import (
    chroot,
    fakenfs,
    local,
    memory,
    pathfilter,
    readonly,
    )
from bzrlib.tests import (
    features,
    test_server,
    )


# TODO: Should possibly split transport-specific tests into their own files.


class TestTransport(tests.TestCase):
    """Test the non transport-concrete class functionality."""

    # FIXME: These tests should use addCleanup() and/or overrideAttr() instead
    # of try/finally -- vila 20100205

    def test__get_set_protocol_handlers(self):
        """The protocol handlers can be cleared and then put back."""
        handlers = transport._get_protocol_handlers()
        self.assertNotEqual([], handlers.keys())
        try:
            transport._clear_protocol_handlers()
            self.assertEqual([], transport._get_protocol_handlers().keys())
        finally:
            transport._set_protocol_handlers(handlers)

    def test_get_transport_modules(self):
        """_get_transport_modules() reports the modules of the handlers."""
        handlers = transport._get_protocol_handlers()

        class SampleHandler(object):
            """I exist, isn't that enough?"""

        try:
            # don't pollute the current handlers
            transport._clear_protocol_handlers()
            # 'foo' and 'bar' are both registered against the same module;
            # the expected list below names that module only once.
            transport.register_transport_proto('foo')
            transport.register_lazy_transport('foo',
                                              'bzrlib.tests.test_transport',
                                              'TestTransport.SampleHandler')
            transport.register_transport_proto('bar')
            transport.register_lazy_transport('bar',
                                              'bzrlib.tests.test_transport',
                                              'TestTransport.SampleHandler')
            self.assertEqual([SampleHandler.__module__,
                              'bzrlib.transport.chroot',
                              'bzrlib.transport.pathfilter'],
                             transport._get_transport_modules())
        finally:
            transport._set_protocol_handlers(handlers)

    def test_transport_dependency(self):
        """A handler with a missing dependency gives UnsupportedProtocol."""
        saved_handlers = transport._get_protocol_handlers()
        try:
            # don't pollute the current handlers
            transport._clear_protocol_handlers()
            transport.register_transport_proto('foo')
            transport.register_lazy_transport(
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
            # assertRaises hands back the caught exception (and fails the
            # test when nothing is raised), so no try/except/else is needed.
            e = self.assertRaises(errors.UnsupportedProtocol,
                                  transport.get_transport,
                                  'foo://fooserver/foo')
            self.assertEqual('Unsupported protocol'
                             ' for url "foo://fooserver/foo":'
                             ' Unable to import library "some_lib":'
                             ' testing missing dependency', str(e))
        finally:
            # restore original values
            transport._set_protocol_handlers(saved_handlers)

    def test_transport_fallback(self):
        """With a failing handler registered, the backup handler is used."""
        saved_handlers = transport._get_protocol_handlers()
        try:
            transport._clear_protocol_handlers()
            transport.register_transport_proto('foo')
            transport.register_lazy_transport(
                'foo', 'bzrlib.tests.test_transport', 'BackupTransportHandler')
            transport.register_lazy_transport(
                'foo', 'bzrlib.tests.test_transport', 'BadTransportHandler')
            t = transport.get_transport('foo://fooserver/foo')
            self.assertIsInstance(t, BackupTransportHandler)
        finally:
            transport._set_protocol_handlers(saved_handlers)

    def test_ssh_hints(self):
        """Transport ssh:// should raise an error pointing out bzr+ssh://"""
        e = self.assertRaises(errors.UnsupportedProtocol,
                              transport.get_transport, 'ssh://fooserver/foo')
        self.assertEqual('Unsupported protocol'
                         ' for url "ssh://fooserver/foo":'
                         ' bzr supports bzr+ssh to operate over ssh,'
                         ' use "bzr+ssh://fooserver/foo".',
                         str(e))

    def test_LateReadError(self):
        """The LateReadError helper should raise on read()."""
        a_file = transport.LateReadError('a path')
        # Using assertRaises here means a read() that does not raise fails
        # the test instead of silently skipping the path check.
        error = self.assertRaises(errors.ReadError, a_file.read)
        self.assertEqual('a path', error.path)
        self.assertRaises(errors.ReadError, a_file.read, 40)
        a_file.close()

    def test__combine_paths(self):
        """_combine_paths() joins a base path with a relative or absolute one.
        """
        t = transport.Transport('/')
        self.assertEqual('/home/sarah/project/foo',
                         t._combine_paths('/home/sarah', 'project/foo'))
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '../../etc'))
        # Going above the root stays at the root.
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '../../../etc'))
        # An absolute second argument replaces the base entirely.
        self.assertEqual('/etc',
                         t._combine_paths('/home/sarah', '/etc'))

    def test_local_abspath_non_local_transport(self):
        """local_abspath() on a memory transport raises NotLocalUrl."""
        # the base implementation should throw
        t = memory.MemoryTransport()
        e = self.assertRaises(errors.NotLocalUrl, t.local_abspath, 't')
        self.assertEqual('memory:///t is not a local path.', str(e))


class TestCoalesceOffsets(tests.TestCase):
    """Exercise Transport._coalesce_offsets via the check() helper."""

    def check(self, expected, offsets, limit=0, max_size=0, fudge=0):
        """Coalesce offsets and compare the outcome with expected.

        :param expected: argument tuples for transport._CoalescedOffset, one
            per coalesced range that should come back.
        :param offsets: the offsets handed to _coalesce_offsets.
        :param limit: forwarded as the ``limit`` keyword.
        :param max_size: forwarded as the ``max_size`` keyword.
        :param fudge: forwarded as the ``fudge_factor`` keyword.
        """
        wanted = []
        for args in expected:
            wanted.append(transport._CoalescedOffset(*args))
        actual = list(transport.Transport._coalesce_offsets(
            offsets, limit=limit, fudge_factor=fudge, max_size=max_size))
        self.assertEqual(wanted, actual)

    def test_coalesce_empty(self):
        no_offsets = []
        self.check([], no_offsets)

    def test_coalesce_simple(self):
        single = [(0, 10)]
        self.check([(0, 10, [(0, 10)])], single)

    def test_coalesce_unrelated(self):
        apart = [(0, 10), (20, 10)]
        wanted = [
            (0, 10, [(0, 10)]),
            (20, 10, [(0, 10)]),
            ]
        self.check(wanted, apart)

    def test_coalesce_unsorted(self):
        reversed_order = [(20, 10), (0, 10)]
        wanted = [
            (20, 10, [(0, 10)]),
            (0, 10, [(0, 10)]),
            ]
        self.check(wanted, reversed_order)

    def test_coalesce_nearby(self):
        adjacent = [(0, 10), (10, 10)]
        self.check([(0, 20, [(0, 10), (10, 10)])], adjacent)

    def test_coalesce_overlapped(self):
        overlapping = [(0, 10), (5, 10)]
        wanted = [(0, 15, [(0, 10), (5, 10)])]
        self.assertRaises(ValueError, self.check, wanted, overlapping)

    def test_coalesce_limit(self):
        requested = [
            (10, 10), (20, 10), (30, 10), (40, 10), (50, 10),
            (60, 10), (70, 10), (80, 10), (90, 10), (100, 10),
            ]
        wanted = [
            (10, 50, [(0, 10), (10, 10), (20, 10), (30, 10), (40, 10)]),
            (60, 50, [(0, 10), (10, 10), (20, 10), (30, 10), (40, 10)]),
            ]
        self.check(wanted, requested, limit=5)

    def test_coalesce_no_limit(self):
        requested = [
            (10, 10), (20, 10), (30, 10), (40, 10), (50, 10),
            (60, 10), (70, 10), (80, 10), (90, 10), (100, 10),
            ]
        wanted = [
            (10, 100, [(0, 10), (10, 10), (20, 10), (30, 10), (40, 10),
                       (50, 10), (60, 10), (70, 10), (80, 10), (90, 10)]),
            ]
        self.check(wanted, requested)

    def test_coalesce_fudge(self):
        requested = [(10, 10), (30, 10), (100, 10)]
        wanted = [
            (10, 30, [(0, 10), (20, 10)]),
            (100, 10, [(0, 10)]),
            ]
        self.check(wanted, requested, fudge=10)

    def test_coalesce_max_size(self):
        requested = [(10, 10), (20, 10), (30, 50), (100, 80)]
        wanted = [
            (10, 20, [(0, 10), (10, 10)]),
            (30, 50, [(0, 50)]),
            # If one range is above max_size, it gets its own coalesced
            # offset
            (100, 80, [(0, 80)]),
            ]
        self.check(wanted, requested, max_size=50)

    def test_coalesce_no_max_size(self):
        requested = [(10, 10), (20, 10), (30, 50), (80, 100)]
        wanted = [(10, 170, [(0, 10), (10, 10), (20, 50), (70, 100)])]
        self.check(wanted, requested)

    def test_coalesce_default_limit(self):
        # By default we use a 100MB max size.
        ten_mb = 10 * 1024 * 1024
        one_gb = 1 * 1024 * 1024 * 1024

        def chunks(count):
            # A fresh list of `count` back-to-back ten megabyte ranges.
            return [(index * ten_mb, ten_mb) for index in range(count)]

        self.check([(0, 10 * ten_mb, chunks(10)),
                    (10 * ten_mb, ten_mb, [(0, ten_mb)])],
                   chunks(11))
        self.check([(0, 11 * ten_mb, chunks(11))],
                   chunks(11),
                   max_size=one_gb)


class TestMemoryServer(tests.TestCase):
    """Tests for the MemoryServer start/stop life cycle."""

    def test_create_server(self):
        """A started server's URL is usable until the server is stopped."""
        server = memory.MemoryServer()
        server.start_server()
        try:
            url = server.get_url()
            self.assertTrue(url in transport.transport_list_registry)
            # Only checks that a transport can be obtained; the result is
            # not kept around.
            transport.get_transport(url)
        finally:
            # Always stop the server, even when an assertion above fails,
            # so its URL does not stay registered for later tests.
            server.stop_server()
        self.assertFalse(url in transport.transport_list_registry)
        self.assertRaises(errors.UnsupportedProtocol,
                          transport.get_transport, url)


class TestMemoryTransport(tests.TestCase):
    """Tests for the in-memory transport."""

    def test_get_transport(self):
        """A MemoryTransport can be constructed without arguments."""
        memory.MemoryTransport()

    def test_clone(self):
        """Cloning to '/' gives a transport based at memory:///."""
        t = memory.MemoryTransport()
        self.assertIsInstance(t, memory.MemoryTransport)
        self.assertEqual("memory:///", t.clone("/").base)

    def test_abspath(self):
        t = memory.MemoryTransport()
        self.assertEqual("memory:///relpath", t.abspath('relpath'))

    def test_abspath_of_root(self):
        t = memory.MemoryTransport()
        self.assertEqual("memory:///", t.base)
        self.assertEqual("memory:///", t.abspath('/'))

    def test_abspath_of_relpath_starting_at_root(self):
        t = memory.MemoryTransport()
        self.assertEqual("memory:///foo", t.abspath('/foo'))

    def test_append_and_get(self):
        """append_bytes() and append_file() both add to the same file."""
        t = memory.MemoryTransport()
        t.append_bytes('path', 'content')
        self.assertEqual(t.get('path').read(), 'content')
        t.append_file('path', StringIO('content'))
        self.assertEqual(t.get('path').read(), 'contentcontent')

    def test_put_and_get(self):
        """put_file() and put_bytes() both replace the file content."""
        t = memory.MemoryTransport()
        t.put_file('path', StringIO('content'))
        self.assertEqual(t.get('path').read(), 'content')
        t.put_bytes('path', 'content')
        self.assertEqual(t.get('path').read(), 'content')

    def test_append_without_dir_fails(self):
        t = memory.MemoryTransport()
        self.assertRaises(errors.NoSuchFile,
                          t.append_bytes, 'dir/path', 'content')

    def test_put_without_dir_fails(self):
        t = memory.MemoryTransport()
        self.assertRaises(errors.NoSuchFile,
                          t.put_file, 'dir/path', StringIO('content'))

    def test_get_missing(self):
        # Named ``t`` like everywhere else in this class; calling the local
        # ``transport`` would hide the imported bzrlib.transport module.
        t = memory.MemoryTransport()
        self.assertRaises(errors.NoSuchFile, t.get, 'foo')

    def test_has_missing(self):
        t = memory.MemoryTransport()
        self.assertEqual(False, t.has('foo'))

    def test_has_present(self):
        t = memory.MemoryTransport()
        t.append_bytes('foo', 'content')
        self.assertEqual(True, t.has('foo'))

    def test_list_dir(self):
        """list_dir() reports direct children only."""
        t = memory.MemoryTransport()
        t.put_bytes('foo', 'content')
        t.mkdir('dir')
        t.put_bytes('dir/subfoo', 'content')
        # A file whose name merely starts with 'dir' is not inside 'dir'.
        t.put_bytes('dirlike', 'content')

        self.assertEqual(['dir', 'dirlike', 'foo'], sorted(t.list_dir('.')))
        self.assertEqual(['subfoo'], sorted(t.list_dir('dir')))

    def test_mkdir(self):
        t = memory.MemoryTransport()
        t.mkdir('dir')
        t.append_bytes('dir/path', 'content')
        self.assertEqual(t.get('dir/path').read(), 'content')

    def test_mkdir_missing_parent(self):
        t = memory.MemoryTransport()
        self.assertRaises(errors.NoSuchFile, t.mkdir, 'dir/dir')

    def test_mkdir_twice(self):
        t = memory.MemoryTransport()
        t.mkdir('dir')
        self.assertRaises(errors.FileExists, t.mkdir, 'dir')

    def test_parameters(self):
        """A memory transport is listable and writable."""
        t = memory.MemoryTransport()
        self.assertEqual(True, t.listable())
        self.assertEqual(False, t.is_readonly())

    def test_iter_files_recursive(self):
        t = memory.MemoryTransport()
        t.mkdir('dir')
        t.put_bytes('dir/foo', 'content')
        t.put_bytes('dir/bar', 'content')
        t.put_bytes('bar', 'content')
        paths = set(t.iter_files_recursive())
        self.assertEqual(set(['dir/foo', 'dir/bar', 'bar']), paths)

    def test_stat(self):
        """stat() reports the number of bytes stored for each file."""
        t = memory.MemoryTransport()
        t.put_bytes('foo', 'content')
        t.put_bytes('bar', 'phowar')
        self.assertEqual(7, t.stat('foo').st_size)
        self.assertEqual(6, t.stat('bar').st_size)


class ChrootDecoratorTransportTest(tests.TestCase):
    """Chroot decoration specific tests."""

    def _start_chroot(self, backing_url):
        """Start a ChrootServer over backing_url.

        :param backing_url: URL of the transport the chroot is built on.
        :return: a (server, transport) pair, the transport being obtained
            from the server's own URL.
        """
        chroot_server = chroot.ChrootServer(
            transport.get_transport(backing_url))
        self.start_server(chroot_server)
        return chroot_server, transport.get_transport(chroot_server.get_url())

    def test_abspath(self):
        # The abspath is always relative to the chroot_url.
        chroot_server, root = self._start_chroot('memory:///foo/bar/')
        self.assertEqual(chroot_server.get_url(), root.abspath('/'))

        below = root.clone('subdir')
        self.assertEqual(chroot_server.get_url(), below.abspath('/'))

    def test_clone(self):
        chroot_server, root = self._start_chroot('memory:///foo/bar/')
        # relpath from root and root path are the same
        by_relpath = root.clone('foo')
        by_abspath = root.clone('/foo')
        self.assertEqual(chroot_server, by_relpath.server)
        self.assertEqual(chroot_server, by_abspath.server)

    def test_chroot_url_preserves_chroot(self):
        """Calling get_transport on a chroot transport's base should produce a
        transport with exactly the same behaviour as the original chroot
        transport.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_t = transport.get_transport(parent_url)
        """
        _, original = self._start_chroot('memory:///path/subpath')
        reopened = transport.get_transport(original.base)
        self.assertEqual(original.server, reopened.server)
        self.assertEqual(original.base, reopened.base)

    def test_urljoin_preserves_chroot(self):
        """Using urlutils.join(url, '..') on a chroot URL should not produce a
        URL that escapes the intended chroot.

        This is so that it is not possible to escape a chroot by doing::
            url = chroot_transport.base
            parent_url = urlutils.join(url, '..')
            new_t = transport.get_transport(parent_url)
        """
        _, root = self._start_chroot('memory:///path/')
        self.assertRaises(
            errors.InvalidURLJoin, urlutils.join, root.base, '..')


class TestChrootServer(tests.TestCase):
    """Tests for constructing, starting and stopping a ChrootServer."""

    def _new_server(self):
        """Return a (backing transport, server) pair; nothing is started."""
        backing = memory.MemoryTransport()
        return backing, chroot.ChrootServer(backing)

    def test_construct(self):
        backing, chroot_server = self._new_server()
        self.assertEqual(backing, chroot_server.backing_transport)

    def test_setUp(self):
        # While running, the server's scheme is a known protocol handler.
        chroot_server = self._new_server()[1]
        chroot_server.start_server()
        try:
            known = transport._get_protocol_handlers().keys()
            self.assertTrue(chroot_server.scheme in known)
        finally:
            chroot_server.stop_server()

    def test_stop_server(self):
        # Once stopped, the scheme is no longer a known protocol handler.
        chroot_server = self._new_server()[1]
        chroot_server.start_server()
        chroot_server.stop_server()
        known = transport._get_protocol_handlers().keys()
        self.assertFalse(chroot_server.scheme in known)

    def test_get_url(self):
        # The URL scheme embeds the id() of the server object.
        chroot_server = self._new_server()[1]
        chroot_server.start_server()
        try:
            wanted_url = 'chroot-%d:///' % id(chroot_server)
            self.assertEqual(wanted_url, chroot_server.get_url())
        finally:
            chroot_server.stop_server()


class PathFilteringDecoratorTransportTest(tests.TestCase):
    """Pathfilter decoration specific tests."""

    def test_abspath(self):
        # The abspath is always relative to the base of the backing transport.
        server = pathfilter.PathFilteringServer(
            transport.get_transport('memory:///foo/bar/'),
            lambda x: x)
        server.start_server()
        # Registered as a cleanup (as make_pf_transport does) so the server
        # is stopped even when one of the assertions below fails.
        self.addCleanup(server.stop_server)
        t = transport.get_transport(server.get_url())
        self.assertEqual(server.get_url(), t.abspath('/'))

        subdir_t = t.clone('subdir')
        self.assertEqual(server.get_url(), subdir_t.abspath('/'))

    def make_pf_transport(self, filter_func=None):
        """Make a PathFilteringTransport backed by a MemoryTransport.

        The server behind the transport is stopped by a test cleanup.

        :param filter_func: by default this will be a no-op function.  Use this
            parameter to override it.
        :return: a transport obtained from the started server's URL.
        """
        if filter_func is None:
            filter_func = lambda x: x
        server = pathfilter.PathFilteringServer(
            transport.get_transport('memory:///foo/bar/'), filter_func)
        server.start_server()
        self.addCleanup(server.stop_server)
        return transport.get_transport(server.get_url())

    def test__filter(self):
        # _filter (with an identity func as filter_func) always returns
        # paths relative to the base of the backing transport.
        t = self.make_pf_transport()
        self.assertEqual('foo', t._filter('foo'))
        self.assertEqual('foo/bar', t._filter('foo/bar'))
        self.assertEqual('', t._filter('..'))
        self.assertEqual('', t._filter('/'))
        # The base of the pathfiltering transport is taken into account too.
        t = t.clone('subdir1/subdir2')
        self.assertEqual('subdir1/subdir2/foo', t._filter('foo'))
        self.assertEqual('subdir1/subdir2/foo/bar', t._filter('foo/bar'))
        self.assertEqual('subdir1', t._filter('..'))
        self.assertEqual('', t._filter('/'))

    def test_filter_invocation(self):
        """The filter function receives each path that is accessed."""
        filter_log = []

        # Not called ``filter`` so the builtin of that name stays visible.
        def recording_filter(path):
            filter_log.append(path)
            return path

        t = self.make_pf_transport(recording_filter)
        t.has('abc')
        self.assertEqual(['abc'], filter_log)
        del filter_log[:]
        t.clone('abc').has('xyz')
        self.assertEqual(['abc/xyz'], filter_log)
        del filter_log[:]
        t.has('/abc')
        self.assertEqual(['abc'], filter_log)

    def test_clone(self):
        t = self.make_pf_transport()
        # relpath from root and root path are the same
        relpath_cloned = t.clone('foo')
        abspath_cloned = t.clone('/foo')
        self.assertEqual(t.server, relpath_cloned.server)
        self.assertEqual(t.server, abspath_cloned.server)

    def test_url_preserves_pathfiltering(self):
        """Calling get_transport on a pathfiltered transport's base should
        produce a transport with exactly the same behaviour as the original
        pathfiltered transport.

        This is so that it is not possible to escape (accidentally or
        otherwise) the filtering by doing::
            url = filtered_transport.base
            parent_url = urlutils.join(url, '..')
            new_t = transport.get_transport(parent_url)
        """
        t = self.make_pf_transport()
        new_t = transport.get_transport(t.base)
        self.assertEqual(t.server, new_t.server)
        self.assertEqual(t.base, new_t.base)


class ReadonlyDecoratorTransportTest(tests.TestCase):
    """Readonly decoration specific tests."""

    def test_local_parameters(self):
        """A readonly-decorated local transport is listable and readonly."""
        # connect to . in readonly mode
        t = readonly.ReadonlyTransportDecorator('readonly+.')
        self.assertEqual(True, t.listable())
        self.assertEqual(True, t.is_readonly())

    def test_http_parameters(self):
        """A readonly-decorated http transport is readonly, not listable."""
        from bzrlib.tests.http_server import HttpServer
        # connect to '.' via http which is not listable
        server = HttpServer()
        self.start_server(server)
        t = transport.get_transport('readonly+' + server.get_url())
        # assertIsInstance replaces the deprecated failUnless(isinstance())
        # spelling and matches the other decorator tests in this module.
        self.assertIsInstance(t, readonly.ReadonlyTransportDecorator)
        self.assertEqual(False, t.listable())
        self.assertEqual(True, t.is_readonly())


class FakeNFSDecoratorTests(tests.TestCaseInTempDir):
    """NFS decorator specific tests."""

    def get_nfs_transport(self, url):
        """Return url wrapped in the fakenfs decorator.

        :param url: the URL (or path) to decorate.
        """
        decorated_url = 'fakenfs+' + url
        return fakenfs.FakeNFSTransportDecorator(decorated_url)

    def test_local_parameters(self):
        # listable and is_readonly are passed through untouched by the
        # fakenfs decorator
        nfs = self.get_nfs_transport('.')
        self.assertEqual(True, nfs.listable())
        self.assertEqual(False, nfs.is_readonly())

    def test_http_parameters(self):
        # listable and is_readonly are passed through untouched by the
        # fakenfs decorator
        from bzrlib.tests import http_server
        # connect to '.' via http which is not listable
        web_server = http_server.HttpServer()
        self.start_server(web_server)
        nfs = self.get_nfs_transport(web_server.get_url())
        self.assertIsInstance(nfs, fakenfs.FakeNFSTransportDecorator)
        self.assertEqual(False, nfs.listable())
        self.assertEqual(True, nfs.is_readonly())

    def test_fakenfs_server_default(self):
        # a FakeNFSServer() should bring up a local relpath server for itself
        nfs_server = test_server.FakeNFSServer()
        self.start_server(nfs_server)
        # its url carries the decorator prefix ...
        self.assertStartsWith(nfs_server.get_url(), 'fakenfs+')
        # ... and resolves to a transport ...
        nfs = transport.get_transport(nfs_server.get_url())
        # ... of the FakeNFSTransportDecorator type.
        self.assertIsInstance(nfs, fakenfs.FakeNFSTransportDecorator)

    def test_fakenfs_rename_semantics(self):
        # a FakeNFS transport must mangle the way rename errors occur to
        # look like NFS problems.
        nfs = self.get_nfs_transport('.')
        tree_shape = ['from/', 'from/foo', 'to/', 'to/bar']
        self.build_tree(tree_shape, transport=nfs)
        self.assertRaises(errors.ResourceBusy, nfs.rename, 'from', 'to')


class FakeVFATDecoratorTests(tests.TestCaseInTempDir):
    """Tests for simulation of VFAT restrictions"""

    def get_vfat_transport(self, url):
        """Return url wrapped in the fake VFAT decorator."""
        from bzrlib.transport import fakevfat
        return fakevfat.FakeVFATTransportDecorator('vfat+' + url)

    def test_transport_creation(self):
        from bzrlib.transport import fakevfat
        vfat = self.get_vfat_transport('.')
        self.assertIsInstance(vfat, fakevfat.FakeVFATTransportDecorator)

    def test_transport_mkdir(self):
        # A directory made in upper case is found under other casings too.
        vfat = self.get_vfat_transport('.')
        vfat.mkdir('HELLO')
        for spelling in ('hello', 'Hello'):
            self.assertTrue(vfat.has(spelling))

    def test_forbidden_chars(self):
        vfat = self.get_vfat_transport('.')
        self.assertRaises(ValueError, vfat.has, "<NU>")


class BadTransportHandler(transport.Transport):
    """Transport whose constructor always raises DependencyNotPresent."""

    def __init__(self, base_url):
        missing = errors.DependencyNotPresent(
            'some_lib', 'testing missing dependency')
        raise missing


class BackupTransportHandler(transport.Transport):
    """Test transport that works as a backup for the BadTransportHandler"""


class TestTransportImplementation(tests.TestCaseInTempDir):
    """Implementation verification for transports.

    To verify a transport we need a server factory, which is a callable
    that accepts no parameters and returns an implementation of
    bzrlib.transport.Server.

    That Server is then used to construct transport instances and test
    the transport via loopback activity.

    Currently this assumes that the Transport object is connected to the
    current working directory, so that whatever is done through the
    transport should show up in the working directory, and vice-versa.
    This is a bug, because it is possible to have URL schemes which provide
    access to something that may not result in storage on the local disk,
    i.e. due to file system limits, or due to it being a database or some
    other non-filesystem tool.

    This also tests to make sure that the functions work with both
    generators and lists (assuming iter(list) is effectively a generator)
    """

    def setUp(self):
        """Create the server from self.transport_server and start it."""
        super(TestTransportImplementation, self).setUp()
        server = self.transport_server()
        self._server = server
        self.start_server(server)

    def get_transport(self, relpath=None):
        """Return a connected transport to the local directory.

        :param relpath: a path relative to the base url.
        :return: an instance of self.transport_class.
        """
        url = self._adjust_url(self._server.get_url(), relpath)
        # First choice: whatever the regular interface hands out.
        candidate = transport.get_transport(url)
        # vila--20070607 if the following are commented out the test suite
        # still pass. Is this really still needed or was it a forgotten
        # temporary fix ?
        if isinstance(candidate, self.transport_class):
            return candidate
        # Wrong class came back: bypass the regular connection behaviour
        # and construct the expected class directly.
        return self.transport_class(url)


class TestLocalTransports(tests.TestCase):
    """Tests for obtaining LocalTransport objects from paths and URLs."""

    def test_get_transport_from_abspath(self):
        """An absolute path gives a LocalTransport based at its URL."""
        here = osutils.abspath('.')
        t = transport.get_transport(here)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEquals(t.base, urlutils.local_path_to_url(here) + '/')

    def test_get_transport_from_relpath(self):
        """A relative path gives a LocalTransport based at its URL."""
        # No absolute path is computed here: the expected base is derived
        # from the relative path itself.
        t = transport.get_transport('.')
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEquals(t.base, urlutils.local_path_to_url('.') + '/')

    def test_get_transport_from_local_url(self):
        """A local URL gives a LocalTransport with that URL as its base."""
        here = osutils.abspath('.')
        here_url = urlutils.local_path_to_url(here) + '/'
        t = transport.get_transport(here_url)
        self.assertIsInstance(t, local.LocalTransport)
        self.assertEquals(t.base, here_url)

    def test_local_abspath(self):
        """local_abspath('') gives back the path the transport was built on.
        """
        here = osutils.abspath('.')
        t = transport.get_transport(here)
        self.assertEquals(t.local_abspath(''), here)


class TestWin32LocalTransport(tests.TestCase):
    """Tests for the emulated win32 local transport."""

    def test_unc_clone_to_root(self):
        # A Win32 UNC path looks like \\HOST\path; cloning upwards has to
        # stop at the \\HOST part at the latest, never at a bare \\
        unc = local.EmulatedWin32LocalTransport(
            'file://HOST/path/to/some/dir/')
        host_root = 'file://HOST/'
        for _step in range(4):
            unc = unc.clone('..')
        self.assertEquals(unc.base, host_root)
        # one more step up must leave us at the root
        unc = unc.clone('..')
        self.assertEquals(unc.base, host_root)


class TestConnectedTransport(tests.TestCase):
    """Tests for connected to remote server transports"""

    def test_parse_url(self):
        """A plain URL is split into host and path, with no credentials."""
        t = transport.ConnectedTransport(
            'http://simple.example.com/home/source')
        self.assertEquals(t._host, 'simple.example.com')
        self.assertEquals(t._port, None)
        self.assertEquals(t._path, '/home/source/')
        # assertIs replaces the deprecated failUnless(x is None) spelling
        # and matches the None checks further down in this class.
        self.assertIs(None, t._user)
        self.assertIs(None, t._password)

        self.assertEquals(t.base, 'http://simple.example.com/home/source/')

    def test_parse_url_with_at_in_user(self):
        # Bug 228058
        t = transport.ConnectedTransport('ftp://user@host.com@www.host.com/')
        self.assertEquals(t._user, 'user@host.com')

    def test_parse_quoted_url(self):
        """Percent-escapes in user, password and host are decoded."""
        t = transport.ConnectedTransport(
            'http://ro%62ey:h%40t@ex%41mple.com:2222/path')
        self.assertEquals(t._host, 'exAmple.com')
        self.assertEquals(t._port, 2222)
        self.assertEquals(t._user, 'robey')
        self.assertEquals(t._password, 'h@t')
        self.assertEquals(t._path, '/path/')

        # Base should not keep track of the password
        self.assertEquals(t.base, 'http://robey@exAmple.com:2222/path/')

    def test_parse_invalid_url(self):
        self.assertRaises(errors.InvalidURL,
                          transport.ConnectedTransport,
                          'sftp://lily.org:~janneke/public/bzr/gub')

    def test_relpath(self):
        """relpath() only accepts URLs below the transport's own base."""
        t = transport.ConnectedTransport('sftp://user@host.com/abs/path')

        self.assertEquals(t.relpath('sftp://user@host.com/abs/path/sub'), 'sub')
        # A different scheme, user, host or port is not a child.
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'http://user@host.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user2@host.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user@otherhost.com/abs/path/sub')
        self.assertRaises(errors.PathNotChild, t.relpath,
                          'sftp://user@host.com:33/abs/path/sub')
        # Make sure it works when we don't supply a username
        t = transport.ConnectedTransport('sftp://host.com/abs/path')
        self.assertEquals(t.relpath('sftp://host.com/abs/path/sub'), 'sub')

        # Make sure it works when parts of the path will be url encoded
        t = transport.ConnectedTransport('sftp://host.com/dev/%path')
        self.assertEquals(t.relpath('sftp://host.com/dev/%path/sub'), 'sub')

    def test_connection_sharing_propagate_credentials(self):
        """A transport and its clone share connection and credentials."""
        t = transport.ConnectedTransport('ftp://user@host.com/abs/path')
        self.assertEquals('user', t._user)
        self.assertEquals('host.com', t._host)
        self.assertIs(None, t._get_connection())
        self.assertIs(None, t._password)
        c = t.clone('subdir')
        self.assertIs(None, c._get_connection())
        # NOTE(review): this re-checks t._password, already asserted above;
        # presumably c._password was intended -- confirm before changing.
        self.assertIs(None, t._password)

        # Simulate the user entering a password
        password = 'secret'
        connection = object()
        t._set_connection(connection, password)
        self.assertIs(connection, t._get_connection())
        self.assertIs(password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(password, c._get_credentials())

        # credentials can be updated
        new_password = 'even more secret'
        c._update_credentials(new_password)
        self.assertIs(connection, t._get_connection())
        self.assertIs(new_password, t._get_credentials())
        self.assertIs(connection, c._get_connection())
        self.assertIs(new_password, c._get_credentials())


class TestReusedTransports(tests.TestCase):
    """Tests for transport reuse"""

    def test_reuse_same_transport(self):
        """get_transport hands back a matching entry of possible_transports."""
        candidates = []
        created = transport.get_transport('http://foo/',
                                          possible_transports=candidates)
        # A freshly created transport is recorded in the supplied list.
        self.assertEqual([created], candidates)
        reused = transport.get_transport('http://foo/',
                                         possible_transports=[created])
        self.assertIs(created, reused)

        # A final '/' must not matter, whichever of the two URLs carries it.
        for existing_url, requested_url in [
                ('http://foo/path/', 'http://foo/path'),
                ('http://foo/path', 'http://foo/path/')]:
            existing = transport.get_transport(existing_url)
            requested = transport.get_transport(
                requested_url, possible_transports=[existing])
            self.assertIs(existing, requested)

    def test_don_t_reuse_different_transport(self):
        """A transport for another host is not reused."""
        foo_transport = transport.get_transport('http://foo/path')
        bar_transport = transport.get_transport(
            'http://bar/path', possible_transports=[foo_transport])
        self.assertIsNot(foo_transport, bar_transport)


class TestTransportTrace(tests.TestCase):
    """Tests for transports obtained through the 'trace+' URL prefix."""

    def test_decorator(self):
        """A 'trace+' URL yields a TransportTraceDecorator."""
        # This test used to be named test_get, which the later test_get in
        # this class silently replaced, so it never ran.  It also referred to
        # the 'bzrlib' name, which this module does not import; import the
        # trace module locally instead.
        from bzrlib.transport import trace
        t = transport.get_transport('trace+memory://')
        self.assertIsInstance(t, trace.TransportTraceDecorator)

    def test_clone_preserves_activity(self):
        """A clone is a distinct object sharing the same _activity list."""
        t = transport.get_transport('trace+memory://')
        t2 = t.clone('.')
        self.assertIsNot(t, t2)
        self.assertIs(t._activity, t2._activity)

    # the following specific tests are for the operations that have made use of
    # logging in tests; we could test every single operation but doing that
    # still won't cause a test failure when the top level Transport API
    # changes; so there is little return doing that.
    def test_get(self):
        """put_bytes and get are each recorded in _activity, in call order."""
        t = transport.get_transport('trace+memory:///')
        t.put_bytes('foo', 'barish')
        t.get('foo')
        expected_result = []
        # put_bytes records the number of bytes (6 for 'barish'), not the
        # content, to avoid memory pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # get records the file name only.
        expected_result.append(('get', 'foo'))
        self.assertEqual(expected_result, t._activity)

    def test_readv(self):
        """readv is recorded with the offsets and options it was given."""
        t = transport.get_transport('trace+memory:///')
        t.put_bytes('foo', 'barish')
        # readv is consumed with list() so the whole request is carried out.
        list(t.readv('foo', [(0, 1), (3, 2)],
                     adjust_for_latency=True, upper_limit=6))
        expected_result = []
        # put_bytes records the number of bytes, not the content, to avoid
        # memory pressure.
        expected_result.append(('put_bytes', 'foo', 6, None))
        # readv records the supplied offset request
        expected_result.append(('readv', 'foo', [(0, 1), (3, 2)], True, 6))
        self.assertEqual(expected_result, t._activity)


class TestSSHConnections(tests.TestCaseWithTransport):
    """Exercise a bzr+ssh:// transport against a stub SSH server."""

    def test_bzr_connect_to_bzr_ssh(self):
        """User acceptance that get_transport of a bzr+ssh:// behaves correctly.

        bzr+ssh:// should cause bzr to run a remote bzr smart server over SSH.
        The stub server below runs whatever command the client requests in a
        subprocess and relays its stdin/stdout/stderr over the SSH channel.
        """
        # A real bzr instance gets invoked by this test, which is very
        # expensive: no other test in the suite should do the same.
        # A reasonable evolution would be to merely check, inside
        # check_channel_exec_request, that the command is appropriate and then
        # satisfy the requests in-process.
        self.requireFeature(features.paramiko)
        # SFTPFullAbsoluteServer has a get_url method, and doesn't
        # override the interface (doesn't change self._vendor).
        # Note that this does encryption, so can be slow.
        from bzrlib.tests import stub_sftp

        # Commands the stub server was asked to execute.
        self.command_executed = []
        # Filled by the stub server: the subprocess first, then the threads
        # relaying its streams.
        spawned = []

        # XXX: This is horrible -- a really dumb SSH server that executes
        # commands, with the hooking up of stdin/out/err to the SSH channel
        # managed by hand.  Surely this has already been implemented
        # elsewhere?
        class StubSSHServer(stub_sftp.StubServer):

            test = self

            def check_channel_exec_request(self, channel, command):
                self.test.command_executed.append(command)
                child = subprocess.Popen(
                    command, shell=True, stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

                # XXX: horribly inefficient, not to mention ugly.
                # One thread per stream copies single bytes between the
                # subprocess and the channel until the source is exhausted.
                def pump(source, sink, finish):
                    chunk = source(1)
                    while chunk != '':
                        sink(chunk)
                        chunk = source(1)
                    finish()

                relays = (
                    (channel.recv, child.stdin.write, child.stdin.close),
                    (child.stdout.read, channel.sendall, channel.close),
                    (child.stderr.read, channel.sendall_stderr,
                     channel.close),
                    )
                spawned.append(child)
                for endpoints in relays:
                    worker = threading.Thread(target=pump, args=endpoints)
                    worker.start()
                    spawned.append(worker)

                return True

        server = stub_sftp.SFTPFullAbsoluteServer(StubSSHServer)
        # We *don't* want to override the default SSH vendor: the detected one
        # is the one to use.
        self.start_server(server)
        port = server._listener.port

        on_windows = sys.platform == 'win32'

        # BZR_REMOTE_PATH tells bzr which command to run on the remote end.
        bzr_remote_path = self.get_bzr_path()
        if on_windows:
            bzr_remote_path = sys.executable + ' ' + bzr_remote_path
        os.environ['BZR_REMOTE_PATH'] = bzr_remote_path

        # Access the branch via a bzr+ssh URL.
        path_to_branch = osutils.abspath('.')
        if on_windows:
            # On Windows, all drives are exported as '/C:/', etc., so a '/'
            # has to be prefixed to get the right path.
            path_to_branch = '/' + path_to_branch
        url = 'bzr+ssh://fred:secret@localhost:%d%s' % (port, path_to_branch)
        remote = transport.get_transport(url)
        self.permit_url(remote.base)
        remote.mkdir('foo')

        expected_command = (
            '%s serve --inet --directory=/ --allow-writes' % bzr_remote_path)
        self.assertEqual([expected_command], self.command_executed)
        # Disconnect so that the remote process can stop and we can clean up,
        # then hold the test until everything has shut down.
        remote._client._medium.disconnect()
        if spawned:
            # The first entry is the subprocess ...
            spawned[0].wait()
            # ... and the rest are threads.
            for worker in spawned[1:]:
                worker.join()
# www.java2java.com | Contact Us
# Copyright 2009 - 12 Demo Source and Support. All rights reserved.
# All other trademarks are property of their respective owners.