test_foreign.py :  » Development » Bazaar » bzr-2.2b3 » bzrlib » tests » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » Bazaar 
Bazaar » bzr 2.2b3 » bzrlib » tests » test_foreign.py
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA


"""Tests for foreign VCS utility code."""


from bzrlib import (
    branch,
    bzrdir,
    errors,
    foreign,
    lockable_files,
    lockdir,
    revision,
    tests,
    trace,
    )

# This is the dummy foreign revision control system, used
# mainly here in the testsuite to test the foreign VCS infrastructure.
# It is basically standard Bazaar with some minor modifications to
# make it "foreign".
#
# It differs from "regular" Bazaar in the following ways:
# - The control directory is named ".dummy", not ".bzr".
# - The revision ids are tuples, not strings.
# - It doesn't natively support more than one parent.


class DummyForeignVcsMapping(foreign.VcsMapping):
    """A simple mapping for the dummy Foreign VCS, for use with testing.

    Bazaar revision ids produced by this mapping look like
    "dummy-v1:<a>-<b>-<c>", where (a, b, c) is the foreign revision id.
    Two mappings compare equal when they are of exactly the same type.
    """

    def __eq__(self, other):
        """Return True if other is of exactly the same type as self."""
        return type(self) is type(other)

    def __ne__(self, other):
        """Return the negation of __eq__.

        Python 2 does not derive != from ==, so without this method two
        equal mappings would still be reported as different by !=.
        """
        return not self.__eq__(other)

    def __hash__(self):
        """Hash on the type only, so that equal mappings hash equally."""
        return hash(type(self))

    def revision_id_bzr_to_foreign(self, bzr_revid):
        """Convert a Bazaar revision id into a foreign revision id.

        The first len("dummy-v1:") characters are dropped without being
        checked, and the remainder is split on "-".

        :return: a (foreign_revid, mapping) pair, where foreign_revid is a
            tuple of strings and mapping is this object.
        """
        return tuple(bzr_revid[len("dummy-v1:"):].split("-")), self

    def revision_id_foreign_to_bzr(self, foreign_revid):
        """Convert a three-element foreign revision id to a Bazaar one."""
        return "dummy-v1:%s-%s-%s" % foreign_revid


class DummyForeignVcsMappingRegistry(foreign.VcsMappingRegistry):
    """Registry of the mappings available for the dummy foreign VCS."""

    def revision_id_bzr_to_foreign(self, revid):
        """Convert revid using the mapping named right after "dummy-".

        :raises errors.InvalidRevisionId: if revid does not start with
            "dummy-".
        """
        prefix = "dummy-"
        if revid.startswith(prefix):
            # The mapping version is the two characters following the
            # prefix, e.g. "v1" in "dummy-v1:...".
            version_start = len(prefix)
            version_key = revid[version_start:version_start + 2]
            return self.get(version_key).revision_id_bzr_to_foreign(revid)
        raise errors.InvalidRevisionId(revid, None)


class DummyForeignVcs(foreign.ForeignVcs):
    """A dummy Foreign VCS, for use with testing.

    Its revision ids are tuples made of three strings.
    """

    def __init__(self):
        """Set up the mapping registry with the "v1" mapping registered."""
        registry = DummyForeignVcsMappingRegistry()
        self.mapping_registry = registry
        v1_mapping = DummyForeignVcsMapping(self)
        registry.register("v1", v1_mapping, "Version 1")
        self.abbreviation = "dummy"

    def show_foreign_revid(self, foreign_revid):
        """Return a one-entry dict describing foreign_revid for display."""
        shown = "%s/%s\\%s" % foreign_revid
        return {"dummy ding": shown}

    def serialize_foreign_revid(self, foreign_revid):
        """Return the three parts of foreign_revid joined with "|"."""
        serialized = "%s|%s|%s" % foreign_revid
        return serialized


class DummyForeignVcsBranch(branch.BzrBranch6, foreign.ForeignBranch):
    """A branch of the dummy VCS: a BzrBranch6 that is also foreign."""

    def __init__(self, _format, _control_files, a_bzrdir, *args, **kwargs):
        """Initialise both the foreign and the BzrBranch6 side.

        A few attributes are assigned before either base class
        initialiser is called. Extra positional and keyword arguments are
        handed to branch.BzrBranch6.__init__ unchanged.
        """
        self._format = _format
        self._base = a_bzrdir.transport.base
        self._ignore_fallbacks = False
        self.bzrdir = a_bzrdir
        dummy_mapping = DummyForeignVcsMapping(DummyForeignVcs())
        foreign.ForeignBranch.__init__(self, dummy_mapping)
        branch.BzrBranch6.__init__(
            self, _format, _control_files, a_bzrdir, *args, **kwargs)


class InterToDummyVcsBranch(branch.GenericInterBranch,
                            foreign.InterToForeignBranch):
    """InterBranch used whenever the target is a DummyForeignVcsBranch.

    Regular pushes are refused; only lossy pushes are implemented.
    """

    @staticmethod
    def is_compatible(source, target):
        """Return True if target is a DummyForeignVcsBranch.

        The source branch is not inspected.
        """
        return isinstance(target, DummyForeignVcsBranch)

    def push(self, overwrite=False, stop_revision=None):
        """Always raise NoRoundtrippingSupport for this source and target."""
        raise errors.NoRoundtrippingSupport(self.source, self.target)

    def lossy_push(self, stop_revision=None):
        """Re-commit the source revisions that the target does not have.

        Every revision in the source history beyond the target history is
        committed again in the target under a new revision id obtained
        from the target's mapping. stop_revision is accepted but not used.

        :return: a branch.BranchPushResult with source_branch,
            target_branch, old_revno, old_revid, new_revno, new_revid and
            revidmap (source revision id -> new target revision id) set.
        :raises errors.DivergedBranches: if the target history is not a
            prefix of the source history.
        """
        result = branch.BranchPushResult()
        result.source_branch = self.source
        result.target_branch = self.target
        result.old_revno, result.old_revid = self.target.last_revision_info()
        self.source.lock_read()
        try:
            # This just handles simple cases, but that's good enough for tests
            my_history = self.target.revision_history()
            their_history = self.source.revision_history()
            # Slicing clamps to the sequence length, so the common prefix
            # can be compared without an explicit min().
            if their_history[:len(my_history)] != my_history:
                raise errors.DivergedBranches(self.target, self.source)
            todo = their_history[len(my_history):]
            revidmap = {}
            for revid in todo:
                rev = self.source.repository.get_revision(revid)
                tree = self.source.repository.revision_tree(revid)
                # Give this tree a get_file_with_stat that returns the file
                # and no stat value (presumably what the commit builder
                # calls while recording entries -- TODO confirm).
                def get_file_with_stat(file_id, path=None):
                    return (tree.get_file(file_id), None)
                tree.get_file_with_stat = get_file_with_stat
                new_revid = self.target.mapping.revision_id_foreign_to_bzr(
                    (str(rev.timestamp), str(rev.timezone),
                        str(self.target.revno())))
                parent_revno, parent_revid = self.target.last_revision_info()
                if parent_revid == revision.NULL_REVISION:
                    parent_revids = []
                else:
                    parent_revids = [parent_revid]
                builder = self.target.get_commit_builder(parent_revids,
                        self.target.get_config(), rev.timestamp,
                        rev.timezone, rev.committer, rev.properties,
                        new_revid)
                try:
                    # The parent inventory is the same for every entry of
                    # this revision, so look it up once here rather than
                    # once per entry.
                    parent_inv = self.target.repository.revision_tree(
                        parent_revid).inventory
                    for path, ie in tree.inventory.iter_entries():
                        new_ie = ie.copy()
                        new_ie.revision = None
                        builder.record_entry_contents(new_ie,
                            [parent_inv],
                            path, tree,
                            (ie.kind, ie.text_size, ie.executable,
                             ie.text_sha1))
                    builder.finish_inventory()
                except:
                    # Not a silent swallow: abort the half-built commit on
                    # any failure, then let the error propagate.
                    builder.abort()
                    raise
                revidmap[revid] = builder.commit(rev.message)
                self.target.set_last_revision_info(parent_revno+1,
                    revidmap[revid])
                trace.mutter('lossily pushed revision %s -> %s',
                    revid, revidmap[revid])
        finally:
            self.source.unlock()
        result.new_revno, result.new_revid = self.target.last_revision_info()
        result.revidmap = revidmap
        return result


class DummyForeignVcsBranchFormat(branch.BzrBranchFormat6):
    """Branch format used by the dummy foreign VCS."""

    def get_format_string(self):
        """Return the string identifying this branch format."""
        return "Branch for Testing"

    def __init__(self):
        """Initialise the format and pair it with DummyForeignVcsDirFormat."""
        super(DummyForeignVcsBranchFormat, self).__init__()
        self._matchingbzrdir = DummyForeignVcsDirFormat()

    def open(self, a_bzrdir, name=None, _found=False):
        """Open the branch stored in a_bzrdir as a DummyForeignVcsBranch.

        :param a_bzrdir: the control directory holding the branch.
        :param name: branch name, passed on to get_branch_transport.
        :param _found: must be true; probing for the format is not
            implemented here.
        :raises NotImplementedError: if _found is false.
        :raises errors.NotBranchError: if errors.NoSuchFile is raised
            while opening the branch.
        """
        if not _found:
            raise NotImplementedError
        # Bind the name up front: if get_branch_transport itself raises
        # NoSuchFile, the handler below must not hit an unbound local.
        transport = None
        try:
            transport = a_bzrdir.get_branch_transport(None, name=name)
            control_files = lockable_files.LockableFiles(transport, 'lock',
                                                         lockdir.LockDir)
            return DummyForeignVcsBranch(_format=self,
                              _control_files=control_files,
                              a_bzrdir=a_bzrdir,
                              _repository=a_bzrdir.find_repository())
        except errors.NoSuchFile:
            if transport is None:
                # No branch transport was obtained; report the control
                # directory location instead.
                missing_path = a_bzrdir.transport.base
            else:
                missing_path = transport.base
            raise errors.NotBranchError(path=missing_path)


class DummyForeignVcsDirFormat(bzrdir.BzrDirMetaFormat1):
    """Control directory format of the dummy foreign VCS.

    The control directory is named ".dummy".
    """

    @classmethod
    def get_format_string(cls):
        """Return the string identifying this format."""
        return "A Dummy VCS Dir"

    @classmethod
    def get_format_description(cls):
        """Return the description of this format."""
        return "A Dummy VCS Dir"

    @classmethod
    def is_supported(cls):
        """Report this format as supported."""
        return True

    def get_branch_format(self):
        """Return a new DummyForeignVcsBranchFormat."""
        return DummyForeignVcsBranchFormat()

    @classmethod
    def probe_transport(cls, transport):
        """Return an instance of this format if '.dummy' exists.

        :raises errors.NotBranchError: if transport has no '.dummy' entry.
        """
        if transport.has('.dummy'):
            return cls()
        raise errors.NotBranchError(path=transport.base)

    def initialize_on_transport(self, transport):
        """Create a new '.dummy' control directory at the transport root.

        :return: the result of opening the newly created directory.
        """
        # There is no .bzr directory to take the mode from, so inherit
        # it from the root directory.
        root_control = lockable_files.LockableFiles(
            transport, '', lockable_files.TransportLock)
        # FIXME: RBC 20060121 don't peek under the covers
        root_control._transport.mkdir('.dummy', mode=root_control._dir_mode)
        del root_control
        # NB: no need to escape relative paths that are url safe.
        dummy_transport = transport.clone('.dummy')
        lock_files = lockable_files.LockableFiles(
            dummy_transport, self._lock_file_name, self._lock_class)
        lock_files.create_lock()
        return self.open(transport, _found=True)

    def _open(self, transport):
        """Return a DummyForeignVcsDir for transport using this format."""
        return DummyForeignVcsDir(transport, self)


class DummyForeignVcsDir(bzrdir.BzrDirMeta1):
    """Control directory of the dummy foreign VCS, stored in '.dummy'."""

    def __init__(self, _transport, _format):
        """Record the format and transports, and set up the control files.

        :param _transport: transport of the directory containing '.dummy'.
        :param _format: the format object this directory was opened with.
        """
        self._format = _format
        self.root_transport = _transport
        self.transport = _transport.clone('.dummy')
        self._mode_check_done = False
        self._control_files = lockable_files.LockableFiles(
            self.transport, "lock", lockable_files.TransportLock)

    def open_branch(self, name=None, unsupported=False, ignore_fallbacks=True):
        """Open the single branch of this control directory.

        unsupported and ignore_fallbacks are accepted but not used.

        :raises errors.NoColocatedBranchSupport: if a name is given.
        """
        if name is None:
            branch_format = self._format.get_branch_format()
            return branch_format.open(self, _found=True)
        raise errors.NoColocatedBranchSupport(self)

    def cloning_metadir(self, stacked=False):
        """Return the "default" registered bzrdir format for cloning.

        The stacked argument is not used.
        """
        default_format = bzrdir.format_registry.make_bzrdir("default")
        return default_format

    def sprout(self, url, revision_id=None, force_new_repo=False,
               recurse='down', possible_transports=None,
               accelerator_tree=None, hardlink=False, stacked=False,
               source_branch=None):
        """Sprout via the base class, leaving out accelerator_tree."""
        # dirstate doesn't cope well with accelerator_trees that have a
        # different control dir, so that argument is not forwarded.
        forwarded = dict(
            url=url,
            revision_id=revision_id,
            force_new_repo=force_new_repo,
            recurse=recurse,
            possible_transports=possible_transports,
            hardlink=hardlink,
            stacked=stacked,
            source_branch=source_branch,
            )
        return super(DummyForeignVcsDir, self).sprout(**forwarded)


def register_dummy_foreign_for_test(testcase):
    """Register the dummy foreign VCS for the duration of testcase.

    The dummy control format and the InterToDummyVcsBranch optimiser are
    registered, and for each a cleanup that unregisters it again is added
    to testcase.
    """
    registrations = (
        (bzrdir.BzrDirFormat.register_control_format,
         bzrdir.BzrDirFormat.unregister_control_format,
         DummyForeignVcsDirFormat),
        # We need to register the optimiser to make the dummy appear
        # really different from a regular bzr repository.
        (branch.InterBranch.register_optimiser,
         branch.InterBranch.unregister_optimiser,
         InterToDummyVcsBranch),
        )
    for register, unregister, registered in registrations:
        register(registered)
        testcase.addCleanup(unregister, registered)


class ForeignVcsRegistryTests(tests.TestCase):
    """Tests for the ForeignVcsRegistry class."""

    def test_parse_revision_id_no_dash(self):
        """A revision id without a dash is rejected by an empty registry."""
        registry = foreign.ForeignVcsRegistry()
        self.assertRaises(
            errors.InvalidRevisionId, registry.parse_revision_id, "invalid")

    def test_parse_revision_id_unknown_mapping(self):
        """A revision id with an unregistered prefix is rejected."""
        registry = foreign.ForeignVcsRegistry()
        self.assertRaises(
            errors.InvalidRevisionId,
            registry.parse_revision_id, "unknown-foreignrevid")

    def test_parse_revision_id(self):
        """A dummy revision id parses to its parts and the dummy mapping."""
        registry = foreign.ForeignVcsRegistry()
        dummy_vcs = DummyForeignVcs()
        registry.register("dummy", dummy_vcs, "Dummy VCS")
        expected = (("some", "foreign", "revid"),
                    DummyForeignVcsMapping(dummy_vcs))
        parsed = registry.parse_revision_id("dummy-v1:some-foreign-revid")
        self.assertEquals(expected, parsed)


class ForeignRevisionTests(tests.TestCase):
    """Tests for the ForeignRevision class."""

    def test_create(self):
        """A new ForeignRevision exposes its foreign revid and mapping."""
        foreign_revid = ("a", "foreign", "revid")
        dummy_mapping = DummyForeignVcsMapping(DummyForeignVcs())
        rev = foreign.ForeignRevision(
            foreign_revid, dummy_mapping, "roundtripped-revid")
        self.assertEquals("", rev.inventory_sha1)
        self.assertEquals(("a", "foreign", "revid"), rev.foreign_revid)
        self.assertEquals(dummy_mapping, rev.mapping)


class WorkingTreeFileUpdateTests(tests.TestCaseWithTransport):
    """Tests for update_workingtree_fileids()."""

    def test_update_workingtree(self):
        """The working tree takes over the file ids of the target basis."""
        # Original tree: 'bla' is versioned with file id 'bla-a'.
        source_wt = self.make_branch_and_tree('br1')
        self.build_tree_contents([('br1/bla', 'original contents\n')])
        source_wt.add('bla', 'bla-a')
        source_wt.commit('bla-a')
        root_id = source_wt.get_root_id()

        # Sprouted tree: 'bla' is re-added with file id 'bla-b'.
        sprouted_wt = source_wt.bzrdir.sprout('br2').open_workingtree()
        sprouted_wt.unversion(['bla-a'])
        sprouted_wt.add('bla', 'bla-b')
        sprouted_wt.commit('bla-b')

        basis = sprouted_wt.basis_tree()
        basis.lock_read()
        self.addCleanup(basis.unlock)
        foreign.update_workingtree_fileids(source_wt, basis)

        expected_ids = set([root_id, "bla-b"])
        source_wt.lock_read()
        try:
            self.assertEquals(expected_ids, set(source_wt.inventory))
        finally:
            source_wt.unlock()


class DummyForeignVcsTests(tests.TestCaseWithTransport):
    """Very basic tests for DummyForeignVcs."""

    def setUp(self):
        super(DummyForeignVcsTests, self).setUp()
        register_dummy_foreign_for_test(self)

    def _make_dummy_tree(self, relpath):
        """Create a branch and tree at relpath in the dummy format."""
        return self.make_branch_and_tree(
            relpath, format=DummyForeignVcsDirFormat())

    def test_create(self):
        """Test we can create dummies."""
        self._make_dummy_tree("d")
        control_dir = bzrdir.BzrDir.open("d")
        self.assertEquals(
            "A Dummy VCS Dir", control_dir._format.get_format_string())
        control_dir.open_repository()
        control_dir.open_branch()
        control_dir.open_workingtree()

    def test_sprout(self):
        """Test we can clone dummies and that the format is not preserved."""
        self._make_dummy_tree("d")
        sprouted = bzrdir.BzrDir.open("d").sprout("e")
        self.assertNotEquals(
            "A Dummy VCS Dir", sprouted._format.get_format_string())

    def test_push_not_supported(self):
        """A regular push into a dummy branch is refused."""
        source_tree = self.make_branch_and_tree("source")
        target_tree = self._make_dummy_tree("target")
        self.assertRaises(
            errors.NoRoundtrippingSupport,
            source_tree.branch.push, target_tree.branch)

    def test_lossy_push_empty(self):
        """Lossily pushing an empty branch changes nothing."""
        source_tree = self.make_branch_and_tree("source")
        target_tree = self._make_dummy_tree("target")
        outcome = source_tree.branch.lossy_push(target_tree.branch)
        self.assertEquals(revision.NULL_REVISION, outcome.old_revid)
        self.assertEquals(revision.NULL_REVISION, outcome.new_revid)
        self.assertEquals({}, outcome.revidmap)

    def test_lossy_push_simple(self):
        """Lossily pushing one revision maps it to the new target tip."""
        source_tree = self.make_branch_and_tree("source")
        self.build_tree(['source/a', 'source/b'])
        source_tree.add(['a', 'b'])
        source_revid = source_tree.commit("msg")
        target_branch = self._make_dummy_tree("target").branch
        target_branch.lock_write()
        try:
            outcome = source_tree.branch.lossy_push(target_branch)
        finally:
            target_branch.unlock()
        self.assertEquals(revision.NULL_REVISION, outcome.old_revid)
        expected_map = {source_revid: target_branch.last_revision()}
        self.assertEquals(expected_map, outcome.revidmap)
        self.assertEquals(outcome.revidmap[source_revid], outcome.new_revid)
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.