test__dirstate_helpers.py :  » Development » Bazaar » bzr-2.2b3 » bzrlib » tests » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » Bazaar 
Bazaar » bzr 2.2b3 » bzrlib » tests » test__dirstate_helpers.py
# Copyright (C) 2007-2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Tests for the compiled dirstate helpers."""

import bisect
import os
import time

from bzrlib import (
    dirstate,
    errors,
    osutils,
    tests,
    )
from bzrlib.tests import (
    test_dirstate,
    test_osutils,
    )

try:
    from bzrlib import _dirstate_helpers_pyx
    has_dirstate_helpers_pyx = True
except ImportError:
    has_dirstate_helpers_pyx = False


compiled_dirstate_helpers_feature = tests.ModuleAvailableFeature(
                                'bzrlib._dirstate_helpers_pyx')


def load_tests(basic_tests, module, loader):
    # FIXME: we should also parametrize against SHA1Provider !
    suite = loader.suiteClass()
    remaining_tests = basic_tests

    dir_reader_scenarios = test_osutils.dir_reader_scenarios()

    ue_scenarios = [('dirstate_Python',
                     {'update_entry': dirstate.py_update_entry})]
    if compiled_dirstate_helpers_feature.available():
        update_entry = compiled_dirstate_helpers_feature.module.update_entry
        pyrex_scenario = ('dirstate_Pyrex', {'update_entry': update_entry})
        ue_scenarios.append(pyrex_scenario)
    process_entry_tests, remaining_tests = tests.split_suite_by_condition(
        remaining_tests, tests.condition_isinstance(TestUpdateEntry))
    tests.multiply_tests(process_entry_tests,
                         tests.multiply_scenarios(dir_reader_scenarios,
                                                  ue_scenarios),
                         suite)

    pe_scenarios = [('dirstate_Python',
                     {'_process_entry': dirstate.ProcessEntryPython})]
    if compiled_dirstate_helpers_feature.available():
        process_entry = compiled_dirstate_helpers_feature.module.ProcessEntryC
        pyrex_scenario = ('dirstate_Pyrex', {'_process_entry': process_entry})
        pe_scenarios.append(pyrex_scenario)
    process_entry_tests, remaining_tests = tests.split_suite_by_condition(
        remaining_tests, tests.condition_isinstance(TestProcessEntry))
    tests.multiply_tests(process_entry_tests,
                         tests.multiply_scenarios(dir_reader_scenarios,
                                                  pe_scenarios),
                         suite)

    dir_reader_tests, remaining_tests = tests.split_suite_by_condition(
        remaining_tests, tests.condition_isinstance(
            test_dirstate.TestCaseWithDirState))
    tests.multiply_tests(dir_reader_tests, dir_reader_scenarios, suite)
    suite.addTest(remaining_tests)

    return suite


class TestBisectPathMixin(object):
    """Test that _bisect_path_*() returns the expected values.

    _bisect_path_* is intended to work like bisect.bisect_*() except it
    knows it is working on paths that are sorted by ('path', 'to', 'foo')
    chunks rather than by raw 'path/to/foo'.

    Test Cases should inherit from this and override ``get_bisect_path`` return
    their implementation, and ``get_bisect`` to return the matching
    bisect.bisect_* function.
    """

    def get_bisect_path(self):
        """Return an implementation of _bisect_path_*"""
        raise NotImplementedError

    def get_bisect(self):
        """Return a version of bisect.bisect_*.

        Also, for the 'exists' check, return the offset to the real values.
        For example bisect_left returns the index of an entry, while
        bisect_right returns the index *after* an entry

        :return: (bisect_func, offset)
        """
        raise NotImplementedError

    def assertBisect(self, paths, split_paths, path, exists=True):
        """Assert that bisect_split works like bisect_left on the split paths.

        :param paths: A list of path names
        :param split_paths: A list of path names that are already split up by directory
            ('path/to/foo' => ('path', 'to', 'foo'))
        :param path: The path we are indexing.
        :param exists: The path should be present, so make sure the
            final location actually points to the right value.

        All other arguments will be passed along.
        """
        bisect_path = self.get_bisect_path()
        self.assertIsInstance(paths, list)
        bisect_path_idx = bisect_path(paths, path)
        split_path = self.split_for_dirblocks([path])[0]
        bisect_func, offset = self.get_bisect()
        bisect_split_idx = bisect_func(split_paths, split_path)
        self.assertEqual(bisect_split_idx, bisect_path_idx,
                         '%s disagreed. %s != %s'
                         ' for key %r'
                         % (bisect_path.__name__,
                            bisect_split_idx, bisect_path_idx, path)
                         )
        if exists:
            self.assertEqual(path, paths[bisect_path_idx+offset])

    def split_for_dirblocks(self, paths):
        dir_split_paths = []
        for path in paths:
            dirname, basename = os.path.split(path)
            dir_split_paths.append((dirname.split('/'), basename))
        dir_split_paths.sort()
        return dir_split_paths

    def test_simple(self):
        """In the simple case it works just like bisect_left"""
        paths = ['', 'a', 'b', 'c', 'd']
        split_paths = self.split_for_dirblocks(paths)
        for path in paths:
            self.assertBisect(paths, split_paths, path, exists=True)
        self.assertBisect(paths, split_paths, '_', exists=False)
        self.assertBisect(paths, split_paths, 'aa', exists=False)
        self.assertBisect(paths, split_paths, 'bb', exists=False)
        self.assertBisect(paths, split_paths, 'cc', exists=False)
        self.assertBisect(paths, split_paths, 'dd', exists=False)
        self.assertBisect(paths, split_paths, 'a/a', exists=False)
        self.assertBisect(paths, split_paths, 'b/b', exists=False)
        self.assertBisect(paths, split_paths, 'c/c', exists=False)
        self.assertBisect(paths, split_paths, 'd/d', exists=False)

    def test_involved(self):
        """This is where bisect_path_* diverges slightly."""
        # This is the list of paths and their contents
        # a/
        #   a/
        #     a
        #     z
        #   a-a/
        #     a
        #   a-z/
        #     z
        #   a=a/
        #     a
        #   a=z/
        #     z
        #   z/
        #     a
        #     z
        #   z-a
        #   z-z
        #   z=a
        #   z=z
        # a-a/
        #   a
        # a-z/
        #   z
        # a=a/
        #   a
        # a=z/
        #   z
        # This is the exact order that is stored by dirstate
        # All children in a directory are mentioned before an children of
        # children are mentioned.
        # So all the root-directory paths, then all the
        # first sub directory, etc.
        paths = [# content of '/'
                 '', 'a', 'a-a', 'a-z', 'a=a', 'a=z',
                 # content of 'a/'
                 'a/a', 'a/a-a', 'a/a-z',
                 'a/a=a', 'a/a=z',
                 'a/z', 'a/z-a', 'a/z-z',
                 'a/z=a', 'a/z=z',
                 # content of 'a/a/'
                 'a/a/a', 'a/a/z',
                 # content of 'a/a-a'
                 'a/a-a/a',
                 # content of 'a/a-z'
                 'a/a-z/z',
                 # content of 'a/a=a'
                 'a/a=a/a',
                 # content of 'a/a=z'
                 'a/a=z/z',
                 # content of 'a/z/'
                 'a/z/a', 'a/z/z',
                 # content of 'a-a'
                 'a-a/a',
                 # content of 'a-z'
                 'a-z/z',
                 # content of 'a=a'
                 'a=a/a',
                 # content of 'a=z'
                 'a=z/z',
                ]
        split_paths = self.split_for_dirblocks(paths)
        sorted_paths = []
        for dir_parts, basename in split_paths:
            if dir_parts == ['']:
                sorted_paths.append(basename)
            else:
                sorted_paths.append('/'.join(dir_parts + [basename]))

        self.assertEqual(sorted_paths, paths)

        for path in paths:
            self.assertBisect(paths, split_paths, path, exists=True)


class TestBisectPathLeft(tests.TestCase, TestBisectPathMixin):
    """Run all Bisect Path tests against _bisect_path_left."""

    def get_bisect_path(self):
        from bzrlib._dirstate_helpers_py import _bisect_path_left
        return _bisect_path_left

    def get_bisect(self):
        return bisect.bisect_left, 0


class TestCompiledBisectPathLeft(TestBisectPathLeft):
    """Run all Bisect Path tests against _bisect_path_lect"""

    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_bisect_path(self):
        from bzrlib._dirstate_helpers_pyx import _bisect_path_left
        return _bisect_path_left


class TestBisectPathRight(tests.TestCase, TestBisectPathMixin):
    """Run all Bisect Path tests against _bisect_path_right"""

    def get_bisect_path(self):
        from bzrlib._dirstate_helpers_py import _bisect_path_right
        return _bisect_path_right

    def get_bisect(self):
        return bisect.bisect_right, -1


class TestCompiledBisectPathRight(TestBisectPathRight):
    """Run all Bisect Path tests against _bisect_path_right"""

    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_bisect_path(self):
        from bzrlib._dirstate_helpers_pyx import _bisect_path_right
        return _bisect_path_right


class TestBisectDirblock(tests.TestCase):
    """Test that bisect_dirblock() returns the expected values.

    bisect_dirblock is intended to work like bisect.bisect_left() except it
    knows it is working on dirblocks and that dirblocks are sorted by ('path',
    'to', 'foo') chunks rather than by raw 'path/to/foo'.

    This test is parameterized by calling get_bisect_dirblock(). Child test
    cases can override this function to test against a different
    implementation.
    """

    def get_bisect_dirblock(self):
        """Return an implementation of bisect_dirblock"""
        from bzrlib._dirstate_helpers_py import bisect_dirblock
        return bisect_dirblock

    def assertBisect(self, dirblocks, split_dirblocks, path, *args, **kwargs):
        """Assert that bisect_split works like bisect_left on the split paths.

        :param dirblocks: A list of (path, [info]) pairs.
        :param split_dirblocks: A list of ((split, path), [info]) pairs.
        :param path: The path we are indexing.

        All other arguments will be passed along.
        """
        bisect_dirblock = self.get_bisect_dirblock()
        self.assertIsInstance(dirblocks, list)
        bisect_split_idx = bisect_dirblock(dirblocks, path, *args, **kwargs)
        split_dirblock = (path.split('/'), [])
        bisect_left_idx = bisect.bisect_left(split_dirblocks, split_dirblock,
                                             *args)
        self.assertEqual(bisect_left_idx, bisect_split_idx,
                         'bisect_split disagreed. %s != %s'
                         ' for key %r'
                         % (bisect_left_idx, bisect_split_idx, path)
                         )

    def paths_to_dirblocks(self, paths):
        """Convert a list of paths into dirblock form.

        Also, ensure that the paths are in proper sorted order.
        """
        dirblocks = [(path, []) for path in paths]
        split_dirblocks = [(path.split('/'), []) for path in paths]
        self.assertEqual(sorted(split_dirblocks), split_dirblocks)
        return dirblocks, split_dirblocks

    def test_simple(self):
        """In the simple case it works just like bisect_left"""
        paths = ['', 'a', 'b', 'c', 'd']
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
        for path in paths:
            self.assertBisect(dirblocks, split_dirblocks, path)
        self.assertBisect(dirblocks, split_dirblocks, '_')
        self.assertBisect(dirblocks, split_dirblocks, 'aa')
        self.assertBisect(dirblocks, split_dirblocks, 'bb')
        self.assertBisect(dirblocks, split_dirblocks, 'cc')
        self.assertBisect(dirblocks, split_dirblocks, 'dd')
        self.assertBisect(dirblocks, split_dirblocks, 'a/a')
        self.assertBisect(dirblocks, split_dirblocks, 'b/b')
        self.assertBisect(dirblocks, split_dirblocks, 'c/c')
        self.assertBisect(dirblocks, split_dirblocks, 'd/d')

    def test_involved(self):
        """This is where bisect_left diverges slightly."""
        paths = ['', 'a',
                 'a/a', 'a/a/a', 'a/a/z', 'a/a-a', 'a/a-z',
                 'a/z', 'a/z/a', 'a/z/z', 'a/z-a', 'a/z-z',
                 'a-a', 'a-z',
                 'z', 'z/a/a', 'z/a/z', 'z/a-a', 'z/a-z',
                 'z/z', 'z/z/a', 'z/z/z', 'z/z-a', 'z/z-z',
                 'z-a', 'z-z',
                ]
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
        for path in paths:
            self.assertBisect(dirblocks, split_dirblocks, path)

    def test_involved_cached(self):
        """This is where bisect_left diverges slightly."""
        paths = ['', 'a',
                 'a/a', 'a/a/a', 'a/a/z', 'a/a-a', 'a/a-z',
                 'a/z', 'a/z/a', 'a/z/z', 'a/z-a', 'a/z-z',
                 'a-a', 'a-z',
                 'z', 'z/a/a', 'z/a/z', 'z/a-a', 'z/a-z',
                 'z/z', 'z/z/a', 'z/z/z', 'z/z-a', 'z/z-z',
                 'z-a', 'z-z',
                ]
        cache = {}
        dirblocks, split_dirblocks = self.paths_to_dirblocks(paths)
        for path in paths:
            self.assertBisect(dirblocks, split_dirblocks, path, cache=cache)


class TestCompiledBisectDirblock(TestBisectDirblock):
    """Test that bisect_dirblock() returns the expected values.

    bisect_dirblock is intended to work like bisect.bisect_left() except it
    knows it is working on dirblocks and that dirblocks are sorted by ('path',
    'to', 'foo') chunks rather than by raw 'path/to/foo'.

    This runs all the normal tests that TestBisectDirblock did, but uses the
    compiled version.
    """

    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_bisect_dirblock(self):
        from bzrlib._dirstate_helpers_pyx import bisect_dirblock
        return bisect_dirblock


class TestCmpByDirs(tests.TestCase):
    """Test an implementation of cmp_by_dirs()

    cmp_by_dirs() compares 2 paths by their directory sections, rather than as
    plain strings.

    Child test cases can override ``get_cmp_by_dirs`` to test a specific
    implementation.
    """

    def get_cmp_by_dirs(self):
        """Get a specific implementation of cmp_by_dirs."""
        from bzrlib._dirstate_helpers_py import cmp_by_dirs
        return cmp_by_dirs

    def assertCmpByDirs(self, expected, str1, str2):
        """Compare the two strings, in both directions.

        :param expected: The expected comparison value. -1 means str1 comes
            first, 0 means they are equal, 1 means str2 comes first
        :param str1: string to compare
        :param str2: string to compare
        """
        cmp_by_dirs = self.get_cmp_by_dirs()
        if expected == 0:
            self.assertEqual(str1, str2)
            self.assertEqual(0, cmp_by_dirs(str1, str2))
            self.assertEqual(0, cmp_by_dirs(str2, str1))
        elif expected > 0:
            self.assertPositive(cmp_by_dirs(str1, str2))
            self.assertNegative(cmp_by_dirs(str2, str1))
        else:
            self.assertNegative(cmp_by_dirs(str1, str2))
            self.assertPositive(cmp_by_dirs(str2, str1))

    def test_cmp_empty(self):
        """Compare against the empty string."""
        self.assertCmpByDirs(0, '', '')
        self.assertCmpByDirs(1, 'a', '')
        self.assertCmpByDirs(1, 'ab', '')
        self.assertCmpByDirs(1, 'abc', '')
        self.assertCmpByDirs(1, 'abcd', '')
        self.assertCmpByDirs(1, 'abcde', '')
        self.assertCmpByDirs(1, 'abcdef', '')
        self.assertCmpByDirs(1, 'abcdefg', '')
        self.assertCmpByDirs(1, 'abcdefgh', '')
        self.assertCmpByDirs(1, 'abcdefghi', '')
        self.assertCmpByDirs(1, 'test/ing/a/path/', '')

    def test_cmp_same_str(self):
        """Compare the same string"""
        self.assertCmpByDirs(0, 'a', 'a')
        self.assertCmpByDirs(0, 'ab', 'ab')
        self.assertCmpByDirs(0, 'abc', 'abc')
        self.assertCmpByDirs(0, 'abcd', 'abcd')
        self.assertCmpByDirs(0, 'abcde', 'abcde')
        self.assertCmpByDirs(0, 'abcdef', 'abcdef')
        self.assertCmpByDirs(0, 'abcdefg', 'abcdefg')
        self.assertCmpByDirs(0, 'abcdefgh', 'abcdefgh')
        self.assertCmpByDirs(0, 'abcdefghi', 'abcdefghi')
        self.assertCmpByDirs(0, 'testing a long string', 'testing a long string')
        self.assertCmpByDirs(0, 'x'*10000, 'x'*10000)
        self.assertCmpByDirs(0, 'a/b', 'a/b')
        self.assertCmpByDirs(0, 'a/b/c', 'a/b/c')
        self.assertCmpByDirs(0, 'a/b/c/d', 'a/b/c/d')
        self.assertCmpByDirs(0, 'a/b/c/d/e', 'a/b/c/d/e')

    def test_simple_paths(self):
        """Compare strings that act like normal string comparison"""
        self.assertCmpByDirs(-1, 'a', 'b')
        self.assertCmpByDirs(-1, 'aa', 'ab')
        self.assertCmpByDirs(-1, 'ab', 'bb')
        self.assertCmpByDirs(-1, 'aaa', 'aab')
        self.assertCmpByDirs(-1, 'aab', 'abb')
        self.assertCmpByDirs(-1, 'abb', 'bbb')
        self.assertCmpByDirs(-1, 'aaaa', 'aaab')
        self.assertCmpByDirs(-1, 'aaab', 'aabb')
        self.assertCmpByDirs(-1, 'aabb', 'abbb')
        self.assertCmpByDirs(-1, 'abbb', 'bbbb')
        self.assertCmpByDirs(-1, 'aaaaa', 'aaaab')
        self.assertCmpByDirs(-1, 'a/a', 'a/b')
        self.assertCmpByDirs(-1, 'a/b', 'b/b')
        self.assertCmpByDirs(-1, 'a/a/a', 'a/a/b')
        self.assertCmpByDirs(-1, 'a/a/b', 'a/b/b')
        self.assertCmpByDirs(-1, 'a/b/b', 'b/b/b')
        self.assertCmpByDirs(-1, 'a/a/a/a', 'a/a/a/b')
        self.assertCmpByDirs(-1, 'a/a/a/b', 'a/a/b/b')
        self.assertCmpByDirs(-1, 'a/a/b/b', 'a/b/b/b')
        self.assertCmpByDirs(-1, 'a/b/b/b', 'b/b/b/b')
        self.assertCmpByDirs(-1, 'a/a/a/a/a', 'a/a/a/a/b')

    def test_tricky_paths(self):
        self.assertCmpByDirs(1, 'ab/cd/ef', 'ab/cc/ef')
        self.assertCmpByDirs(1, 'ab/cd/ef', 'ab/c/ef')
        self.assertCmpByDirs(-1, 'ab/cd/ef', 'ab/cd-ef')
        self.assertCmpByDirs(-1, 'ab/cd', 'ab/cd-')
        self.assertCmpByDirs(-1, 'ab/cd', 'ab-cd')

    def test_cmp_unicode_not_allowed(self):
        cmp_by_dirs = self.get_cmp_by_dirs()
        self.assertRaises(TypeError, cmp_by_dirs, u'Unicode', 'str')
        self.assertRaises(TypeError, cmp_by_dirs, 'str', u'Unicode')
        self.assertRaises(TypeError, cmp_by_dirs, u'Unicode', u'Unicode')

    def test_cmp_non_ascii(self):
        self.assertCmpByDirs(-1, '\xc2\xb5', '\xc3\xa5') # u'\xb5', u'\xe5'
        self.assertCmpByDirs(-1, 'a', '\xc3\xa5') # u'a', u'\xe5'
        self.assertCmpByDirs(-1, 'b', '\xc2\xb5') # u'b', u'\xb5'
        self.assertCmpByDirs(-1, 'a/b', 'a/\xc3\xa5') # u'a/b', u'a/\xe5'
        self.assertCmpByDirs(-1, 'b/a', 'b/\xc2\xb5') # u'b/a', u'b/\xb5'


class TestCompiledCmpByDirs(TestCmpByDirs):
    """Test the pyrex implementation of cmp_by_dirs"""

    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_cmp_by_dirs(self):
        from bzrlib._dirstate_helpers_pyx import cmp_by_dirs
        return cmp_by_dirs


class TestCmpPathByDirblock(tests.TestCase):
    """Test an implementation of _cmp_path_by_dirblock()

    _cmp_path_by_dirblock() compares two paths using the sort order used by
    DirState. All paths in the same directory are sorted together.

    Child test cases can override ``get_cmp_path_by_dirblock`` to test a specific
    implementation.
    """

    def get_cmp_path_by_dirblock(self):
        """Get a specific implementation of _cmp_path_by_dirblock."""
        from bzrlib._dirstate_helpers_py import _cmp_path_by_dirblock
        return _cmp_path_by_dirblock

    def assertCmpPathByDirblock(self, paths):
        """Compare all paths and make sure they evaluate to the correct order.

        This does N^2 comparisons. It is assumed that ``paths`` is properly
        sorted list.

        :param paths: a sorted list of paths to compare
        """
        # First, make sure the paths being passed in are correct
        def _key(p):
            dirname, basename = os.path.split(p)
            return dirname.split('/'), basename
        self.assertEqual(sorted(paths, key=_key), paths)

        cmp_path_by_dirblock = self.get_cmp_path_by_dirblock()
        for idx1, path1 in enumerate(paths):
            for idx2, path2 in enumerate(paths):
                cmp_val = cmp_path_by_dirblock(path1, path2)
                if idx1 < idx2:
                    self.assertTrue(cmp_val < 0,
                        '%s did not state that %r came before %r, cmp=%s'
                        % (cmp_path_by_dirblock.__name__,
                           path1, path2, cmp_val))
                elif idx1 > idx2:
                    self.assertTrue(cmp_val > 0,
                        '%s did not state that %r came after %r, cmp=%s'
                        % (cmp_path_by_dirblock.__name__,
                           path1, path2, cmp_val))
                else: # idx1 == idx2
                    self.assertTrue(cmp_val == 0,
                        '%s did not state that %r == %r, cmp=%s'
                        % (cmp_path_by_dirblock.__name__,
                           path1, path2, cmp_val))

    def test_cmp_simple_paths(self):
        """Compare against the empty string."""
        self.assertCmpPathByDirblock(['', 'a', 'ab', 'abc', 'a/b/c', 'b/d/e'])
        self.assertCmpPathByDirblock(['kl', 'ab/cd', 'ab/ef', 'gh/ij'])

    def test_tricky_paths(self):
        self.assertCmpPathByDirblock([
            # Contents of ''
            '', 'a', 'a-a', 'a=a', 'b',
            # Contents of 'a'
            'a/a', 'a/a-a', 'a/a=a', 'a/b',
            # Contents of 'a/a'
            'a/a/a', 'a/a/a-a', 'a/a/a=a',
            # Contents of 'a/a/a'
            'a/a/a/a', 'a/a/a/b',
            # Contents of 'a/a/a-a',
            'a/a/a-a/a', 'a/a/a-a/b',
            # Contents of 'a/a/a=a',
            'a/a/a=a/a', 'a/a/a=a/b',
            # Contents of 'a/a-a'
            'a/a-a/a',
            # Contents of 'a/a-a/a'
            'a/a-a/a/a', 'a/a-a/a/b',
            # Contents of 'a/a=a'
            'a/a=a/a',
            # Contents of 'a/b'
            'a/b/a', 'a/b/b',
            # Contents of 'a-a',
            'a-a/a', 'a-a/b',
            # Contents of 'a=a',
            'a=a/a', 'a=a/b',
            # Contents of 'b',
            'b/a', 'b/b',
            ])
        self.assertCmpPathByDirblock([
                 # content of '/'
                 '', 'a', 'a-a', 'a-z', 'a=a', 'a=z',
                 # content of 'a/'
                 'a/a', 'a/a-a', 'a/a-z',
                 'a/a=a', 'a/a=z',
                 'a/z', 'a/z-a', 'a/z-z',
                 'a/z=a', 'a/z=z',
                 # content of 'a/a/'
                 'a/a/a', 'a/a/z',
                 # content of 'a/a-a'
                 'a/a-a/a',
                 # content of 'a/a-z'
                 'a/a-z/z',
                 # content of 'a/a=a'
                 'a/a=a/a',
                 # content of 'a/a=z'
                 'a/a=z/z',
                 # content of 'a/z/'
                 'a/z/a', 'a/z/z',
                 # content of 'a-a'
                 'a-a/a',
                 # content of 'a-z'
                 'a-z/z',
                 # content of 'a=a'
                 'a=a/a',
                 # content of 'a=z'
                 'a=z/z',
                ])

    def test_unicode_not_allowed(self):
        cmp_path_by_dirblock = self.get_cmp_path_by_dirblock()
        self.assertRaises(TypeError, cmp_path_by_dirblock, u'Uni', 'str')
        self.assertRaises(TypeError, cmp_path_by_dirblock, 'str', u'Uni')
        self.assertRaises(TypeError, cmp_path_by_dirblock, u'Uni', u'Uni')
        self.assertRaises(TypeError, cmp_path_by_dirblock, u'x/Uni', 'x/str')
        self.assertRaises(TypeError, cmp_path_by_dirblock, 'x/str', u'x/Uni')
        self.assertRaises(TypeError, cmp_path_by_dirblock, u'x/Uni', u'x/Uni')

    def test_nonascii(self):
        self.assertCmpPathByDirblock([
            # content of '/'
            '', 'a', '\xc2\xb5', '\xc3\xa5',
            # content of 'a'
            'a/a', 'a/\xc2\xb5', 'a/\xc3\xa5',
            # content of 'a/a'
            'a/a/a', 'a/a/\xc2\xb5', 'a/a/\xc3\xa5',
            # content of 'a/\xc2\xb5'
            'a/\xc2\xb5/a', 'a/\xc2\xb5/\xc2\xb5', 'a/\xc2\xb5/\xc3\xa5',
            # content of 'a/\xc3\xa5'
            'a/\xc3\xa5/a', 'a/\xc3\xa5/\xc2\xb5', 'a/\xc3\xa5/\xc3\xa5',
            # content of '\xc2\xb5'
            '\xc2\xb5/a', '\xc2\xb5/\xc2\xb5', '\xc2\xb5/\xc3\xa5',
            # content of '\xc2\xe5'
            '\xc3\xa5/a', '\xc3\xa5/\xc2\xb5', '\xc3\xa5/\xc3\xa5',
            ])


class TestCompiledCmpPathByDirblock(TestCmpPathByDirblock):
    """Test the pyrex implementation of _cmp_path_by_dirblock"""

    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_cmp_by_dirs(self):
        from bzrlib._dirstate_helpers_pyx import _cmp_path_by_dirblock
        return _cmp_path_by_dirblock


class TestMemRChr(tests.TestCase):
    """Test memrchr functionality"""

    _test_needs_features = [compiled_dirstate_helpers_feature]

    def assertMemRChr(self, expected, s, c):
        from bzrlib._dirstate_helpers_pyx import _py_memrchr
        self.assertEqual(expected, _py_memrchr(s, c))

    def test_missing(self):
        self.assertMemRChr(None, '', 'a')
        self.assertMemRChr(None, '', 'c')
        self.assertMemRChr(None, 'abcdefghijklm', 'q')
        self.assertMemRChr(None, 'aaaaaaaaaaaaaaaaaaaaaaa', 'b')

    def test_single_entry(self):
        self.assertMemRChr(0, 'abcdefghijklm', 'a')
        self.assertMemRChr(1, 'abcdefghijklm', 'b')
        self.assertMemRChr(2, 'abcdefghijklm', 'c')
        self.assertMemRChr(10, 'abcdefghijklm', 'k')
        self.assertMemRChr(11, 'abcdefghijklm', 'l')
        self.assertMemRChr(12, 'abcdefghijklm', 'm')

    def test_multiple(self):
        self.assertMemRChr(10, 'abcdefjklmabcdefghijklm', 'a')
        self.assertMemRChr(11, 'abcdefjklmabcdefghijklm', 'b')
        self.assertMemRChr(12, 'abcdefjklmabcdefghijklm', 'c')
        self.assertMemRChr(20, 'abcdefjklmabcdefghijklm', 'k')
        self.assertMemRChr(21, 'abcdefjklmabcdefghijklm', 'l')
        self.assertMemRChr(22, 'abcdefjklmabcdefghijklm', 'm')
        self.assertMemRChr(22, 'aaaaaaaaaaaaaaaaaaaaaaa', 'a')

    def test_with_nulls(self):
        self.assertMemRChr(10, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'a')
        self.assertMemRChr(11, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'b')
        self.assertMemRChr(12, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'c')
        self.assertMemRChr(20, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'k')
        self.assertMemRChr(21, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'l')
        self.assertMemRChr(22, 'abc\0\0\0jklmabc\0\0\0ghijklm', 'm')
        self.assertMemRChr(22, 'aaa\0\0\0aaaaaaa\0\0\0aaaaaaa', 'a')
        self.assertMemRChr(9, '\0\0\0\0\0\0\0\0\0\0', '\0')


class TestReadDirblocks(test_dirstate.TestCaseWithDirState):
    """Test an implementation of _read_dirblocks()

    _read_dirblocks() reads in all of the dirblock information from the disk
    file.

    Child test cases can override ``get_read_dirblocks`` to test a specific
    implementation.
    """

    def get_read_dirblocks(self):
        from bzrlib._dirstate_helpers_py import _read_dirblocks
        return _read_dirblocks

    def test_smoketest(self):
        """Make sure that we can create and read back a simple file."""
        tree, state, expected = self.create_basic_dirstate()
        del tree
        state._read_header_if_needed()
        self.assertEqual(dirstate.DirState.NOT_IN_MEMORY,
                         state._dirblock_state)
        read_dirblocks = self.get_read_dirblocks()
        read_dirblocks(state)
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)

    def test_trailing_garbage(self):
        tree, state, expected = self.create_basic_dirstate()
        # On Linux, we can write extra data as long as we haven't read yet, but
        # on Win32, if you've opened the file with FILE_SHARE_READ, trying to
        # open it in append mode will fail.
        state.unlock()
        f = open('dirstate', 'ab')
        try:
            # Add bogus trailing garbage
            f.write('bogus\n')
        finally:
            f.close()
            state.lock_read()
        e = self.assertRaises(errors.DirstateCorrupt,
                              state._read_dirblocks_if_needed)
        # Make sure we mention the bogus characters in the error
        self.assertContainsRe(str(e), 'bogus')


class TestCompiledReadDirblocks(TestReadDirblocks):
    """Test the pyrex implementation of _read_dirblocks"""

    _test_needs_features = [compiled_dirstate_helpers_feature]

    def get_read_dirblocks(self):
        from bzrlib._dirstate_helpers_pyx import _read_dirblocks
        return _read_dirblocks


class TestUsingCompiledIfAvailable(tests.TestCase):
    """Check that any compiled functions that are available are the default.

    It is possible to have typos, etc in the import line, such that
    _dirstate_helpers_pyx is actually available, but the compiled functions are
    not being used.
    """

    def test_bisect_dirblock(self):
        if compiled_dirstate_helpers_feature.available():
            from bzrlib._dirstate_helpers_pyx import bisect_dirblock
        else:
            from bzrlib._dirstate_helpers_py import bisect_dirblock
        self.assertIs(bisect_dirblock, dirstate.bisect_dirblock)

    def test__bisect_path_left(self):
        if compiled_dirstate_helpers_feature.available():
            from bzrlib._dirstate_helpers_pyx import _bisect_path_left
        else:
            from bzrlib._dirstate_helpers_py import _bisect_path_left
        self.assertIs(_bisect_path_left, dirstate._bisect_path_left)

    def test__bisect_path_right(self):
        if compiled_dirstate_helpers_feature.available():
            from bzrlib._dirstate_helpers_pyx import _bisect_path_right
        else:
            from bzrlib._dirstate_helpers_py import _bisect_path_right
        self.assertIs(_bisect_path_right, dirstate._bisect_path_right)

    def test_cmp_by_dirs(self):
        if compiled_dirstate_helpers_feature.available():
            from bzrlib._dirstate_helpers_pyx import cmp_by_dirs
        else:
            from bzrlib._dirstate_helpers_py import cmp_by_dirs
        self.assertIs(cmp_by_dirs, dirstate.cmp_by_dirs)

    def test__read_dirblocks(self):
        if compiled_dirstate_helpers_feature.available():
            from bzrlib._dirstate_helpers_pyx import _read_dirblocks
        else:
            from bzrlib._dirstate_helpers_py import _read_dirblocks
        self.assertIs(_read_dirblocks, dirstate._read_dirblocks)

    def test_update_entry(self):
        if compiled_dirstate_helpers_feature.available():
            from bzrlib._dirstate_helpers_pyx import update_entry
        else:
            from bzrlib.dirstate import update_entry
        self.assertIs(update_entry, dirstate.update_entry)

    def test_process_entry(self):
        if compiled_dirstate_helpers_feature.available():
            from bzrlib._dirstate_helpers_pyx import ProcessEntryC
            self.assertIs(ProcessEntryC, dirstate._process_entry)
        else:
            from bzrlib.dirstate import ProcessEntryPython
            self.assertIs(ProcessEntryPython, dirstate._process_entry)


class TestUpdateEntry(test_dirstate.TestCaseWithDirState):
    """Test the DirState.update_entry functions"""

    # Set by load_tests
    update_entry = None

    def setUp(self):
        super(TestUpdateEntry, self).setUp()
        self.overrideAttr(dirstate, 'update_entry', self.update_entry)

    def get_state_with_a(self):
        """Create a DirState tracking a single object named 'a'"""
        state = test_dirstate.InstrumentedDirState.initialize('dirstate')
        self.addCleanup(state.unlock)
        state.add('a', 'a-id', 'file', None, '')
        entry = state._get_entry(0, path_utf8='a')
        return state, entry

    def test_observed_sha1_cachable(self):
        state, entry = self.get_state_with_a()
        atime = time.time() - 10
        self.build_tree(['a'])
        statvalue = os.lstat('a')
        statvalue = test_dirstate._FakeStat(statvalue.st_size, atime, atime,
            statvalue.st_dev, statvalue.st_ino, statvalue.st_mode)
        state._observed_sha1(entry, "foo", statvalue)
        self.assertEqual('foo', entry[1][0][1])
        packed_stat = dirstate.pack_stat(statvalue)
        self.assertEqual(packed_stat, entry[1][0][4])

    def test_observed_sha1_not_cachable(self):
        state, entry = self.get_state_with_a()
        oldval = entry[1][0][1]
        oldstat = entry[1][0][4]
        self.build_tree(['a'])
        statvalue = os.lstat('a')
        state._observed_sha1(entry, "foo", statvalue)
        self.assertEqual(oldval, entry[1][0][1])
        self.assertEqual(oldstat, entry[1][0][4])

    def test_update_entry(self):
        state, _ = self.get_state_with_a()
        tree = self.make_branch_and_tree('tree')
        tree.lock_write()
        empty_revid = tree.commit('empty')
        self.build_tree(['tree/a'])
        tree.add(['a'], ['a-id'])
        with_a_id = tree.commit('with_a')
        self.addCleanup(tree.unlock)
        state.set_parent_trees(
            [(empty_revid, tree.branch.repository.revision_tree(empty_revid))],
            [])
        entry = state._get_entry(0, path_utf8='a')
        self.build_tree(['a'])
        # Add one where we don't provide the stat or sha already
        self.assertEqual(('', 'a', 'a-id'), entry[0])
        self.assertEqual(('f', '', 0, False, dirstate.DirState.NULLSTAT),
                         entry[1][0])
        # Flush the buffers to disk
        state.save()
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)

        stat_value = os.lstat('a')
        packed_stat = dirstate.pack_stat(stat_value)
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
                                          stat_value=stat_value)
        self.assertEqual(None, link_or_sha1)

        # The dirblock entry should not have cached the file's sha1 (too new)
        self.assertEqual(('f', '', 14, False, dirstate.DirState.NULLSTAT),
                         entry[1][0])
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
                         state._dirblock_state)
        mode = stat_value.st_mode
        self.assertEqual([('is_exec', mode, False)], state._log)

        state.save()
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)

        # If we do it again right away, we don't know if the file has changed
        # so we will re-read the file. Roll the clock back so the file is
        # guaranteed to look too new.
        state.adjust_time(-10)
        del state._log[:]

        link_or_sha1 = self.update_entry(state, entry, abspath='a',
                                          stat_value=stat_value)
        self.assertEqual([('is_exec', mode, False)], state._log)
        self.assertEqual(None, link_or_sha1)
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
                         state._dirblock_state)
        self.assertEqual(('f', '', 14, False, dirstate.DirState.NULLSTAT),
                         entry[1][0])
        state.save()

        # If it is cachable (the clock has moved forward) but new it still
        # won't calculate the sha or cache it.
        state.adjust_time(+20)
        del state._log[:]
        link_or_sha1 = dirstate.update_entry(state, entry, abspath='a',
                                          stat_value=stat_value)
        self.assertEqual(None, link_or_sha1)
        self.assertEqual([('is_exec', mode, False)], state._log)
        self.assertEqual(('f', '', 14, False, dirstate.DirState.NULLSTAT),
                         entry[1][0])

        # If the file is no longer new, and the clock has been moved forward
        # sufficiently, it will cache the sha.
        del state._log[:]
        state.set_parent_trees(
            [(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
            [])
        entry = state._get_entry(0, path_utf8='a')

        link_or_sha1 = self.update_entry(state, entry, abspath='a',
                                          stat_value=stat_value)
        self.assertEqual('b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
                         link_or_sha1)
        self.assertEqual([('is_exec', mode, False), ('sha1', 'a')],
                          state._log)
        self.assertEqual(('f', link_or_sha1, 14, False, packed_stat),
                         entry[1][0])

        # Subsequent calls will just return the cached value
        del state._log[:]
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
                                          stat_value=stat_value)
        self.assertEqual('b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6',
                         link_or_sha1)
        self.assertEqual([], state._log)
        self.assertEqual(('f', link_or_sha1, 14, False, packed_stat),
                         entry[1][0])

    def test_update_entry_symlink(self):
        """Update entry should read symlinks."""
        self.requireFeature(tests.SymlinkFeature)
        state, entry = self.get_state_with_a()
        state.save()
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)
        os.symlink('target', 'a')

        state.adjust_time(-10) # Make the symlink look new
        stat_value = os.lstat('a')
        packed_stat = dirstate.pack_stat(stat_value)
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
                                          stat_value=stat_value)
        self.assertEqual('target', link_or_sha1)
        self.assertEqual([('read_link', 'a', '')], state._log)
        # Dirblock is not updated (the link is too new)
        self.assertEqual([('l', '', 6, False, dirstate.DirState.NULLSTAT)],
                         entry[1])
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
                         state._dirblock_state)

        # Because the stat_value looks new, we should re-read the target
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
                                          stat_value=stat_value)
        self.assertEqual('target', link_or_sha1)
        self.assertEqual([('read_link', 'a', ''),
                          ('read_link', 'a', ''),
                         ], state._log)
        self.assertEqual([('l', '', 6, False, dirstate.DirState.NULLSTAT)],
                         entry[1])
        state.adjust_time(+20) # Skip into the future, all files look old
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
                                          stat_value=stat_value)
        self.assertEqual('target', link_or_sha1)
        # We need to re-read the link because only now can we cache it
        self.assertEqual([('read_link', 'a', ''),
                          ('read_link', 'a', ''),
                          ('read_link', 'a', ''),
                         ], state._log)
        self.assertEqual([('l', 'target', 6, False, packed_stat)],
                         entry[1])

        # Another call won't re-read the link
        self.assertEqual([('read_link', 'a', ''),
                          ('read_link', 'a', ''),
                          ('read_link', 'a', ''),
                         ], state._log)
        link_or_sha1 = self.update_entry(state, entry, abspath='a',
                                          stat_value=stat_value)
        self.assertEqual('target', link_or_sha1)
        self.assertEqual([('l', 'target', 6, False, packed_stat)],
                         entry[1])

    def do_update_entry(self, state, entry, abspath):
        stat_value = os.lstat(abspath)
        return self.update_entry(state, entry, abspath, stat_value)

    def test_update_entry_dir(self):
        state, entry = self.get_state_with_a()
        self.build_tree(['a/'])
        self.assertIs(None, self.do_update_entry(state, entry, 'a'))

    def test_update_entry_dir_unchanged(self):
        state, entry = self.get_state_with_a()
        self.build_tree(['a/'])
        state.adjust_time(+20)
        self.assertIs(None, self.do_update_entry(state, entry, 'a'))
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
                         state._dirblock_state)
        state.save()
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)
        self.assertIs(None, self.do_update_entry(state, entry, 'a'))
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)

    def test_update_entry_file_unchanged(self):
        state, _ = self.get_state_with_a()
        tree = self.make_branch_and_tree('tree')
        tree.lock_write()
        self.build_tree(['tree/a'])
        tree.add(['a'], ['a-id'])
        with_a_id = tree.commit('witha')
        self.addCleanup(tree.unlock)
        state.set_parent_trees(
            [(with_a_id, tree.branch.repository.revision_tree(with_a_id))],
            [])
        entry = state._get_entry(0, path_utf8='a')
        self.build_tree(['a'])
        sha1sum = 'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
        state.adjust_time(+20)
        self.assertEqual(sha1sum, self.do_update_entry(state, entry, 'a'))
        self.assertEqual(dirstate.DirState.IN_MEMORY_MODIFIED,
                         state._dirblock_state)
        state.save()
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)
        self.assertEqual(sha1sum, self.do_update_entry(state, entry, 'a'))
        self.assertEqual(dirstate.DirState.IN_MEMORY_UNMODIFIED,
                         state._dirblock_state)

    def test_update_entry_tree_reference(self):
        state = test_dirstate.InstrumentedDirState.initialize('dirstate')
        self.addCleanup(state.unlock)
        state.add('r', 'r-id', 'tree-reference', None, '')
        self.build_tree(['r/'])
        entry = state._get_entry(0, path_utf8='r')
        self.do_update_entry(state, entry, 'r')
        entry = state._get_entry(0, path_utf8='r')
        self.assertEqual('t', entry[1][0][0])

    def create_and_test_file(self, state, entry):
        """Create a file at 'a' and verify the state finds it during update.

        The state should already be versioning *something* at 'a'. This makes
        sure that state.update_entry recognizes it as a file.
        """
        self.build_tree(['a'])
        stat_value = os.lstat('a')
        packed_stat = dirstate.pack_stat(stat_value)

        link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
        self.assertEqual(None, link_or_sha1)
        self.assertEqual([('f', '', 14, False, dirstate.DirState.NULLSTAT)],
                         entry[1])
        return packed_stat

    def create_and_test_dir(self, state, entry):
        """Create a directory at 'a' and verify the state finds it.

        The state should already be versioning *something* at 'a'. This makes
        sure that state.update_entry recognizes it as a directory.
        """
        self.build_tree(['a/'])
        stat_value = os.lstat('a')
        packed_stat = dirstate.pack_stat(stat_value)

        link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
        self.assertIs(None, link_or_sha1)
        self.assertEqual([('d', '', 0, False, packed_stat)], entry[1])

        return packed_stat

    # FIXME: Add unicode version
    def create_and_test_symlink(self, state, entry):
        """Create a symlink at 'a' and verify the state finds it.

        The state should already be versioning *something* at 'a'. This makes
        sure that state.update_entry recognizes it as a symlink.

        This should not be called if this platform does not have symlink
        support.
        """
        # caller should care about skipping test on platforms without symlinks
        os.symlink('path/to/foo', 'a')

        stat_value = os.lstat('a')
        packed_stat = dirstate.pack_stat(stat_value)

        link_or_sha1 = self.do_update_entry(state, entry, abspath='a')
        self.assertEqual('path/to/foo', link_or_sha1)
        self.assertEqual([('l', 'path/to/foo', 11, False, packed_stat)],
                         entry[1])
        return packed_stat

    def test_update_file_to_dir(self):
        """If a file changes to a directory we return None for the sha.
        We also update the inventory record.
        """
        state, entry = self.get_state_with_a()
        # The file sha1 won't be cached unless the file is old
        state.adjust_time(+10)
        self.create_and_test_file(state, entry)
        os.remove('a')
        self.create_and_test_dir(state, entry)

    def test_update_file_to_symlink(self):
        """File becomes a symlink"""
        self.requireFeature(tests.SymlinkFeature)
        state, entry = self.get_state_with_a()
        # The file sha1 won't be cached unless the file is old
        state.adjust_time(+10)
        self.create_and_test_file(state, entry)
        os.remove('a')
        self.create_and_test_symlink(state, entry)

    def test_update_dir_to_file(self):
        """Directory becoming a file updates the entry."""
        state, entry = self.get_state_with_a()
        # The file sha1 won't be cached unless the file is old
        state.adjust_time(+10)
        self.create_and_test_dir(state, entry)
        os.rmdir('a')
        self.create_and_test_file(state, entry)

    def test_update_dir_to_symlink(self):
        """Directory becomes a symlink"""
        self.requireFeature(tests.SymlinkFeature)
        state, entry = self.get_state_with_a()
        # The symlink target won't be cached if it isn't old
        state.adjust_time(+10)
        self.create_and_test_dir(state, entry)
        os.rmdir('a')
        self.create_and_test_symlink(state, entry)

    def test_update_symlink_to_file(self):
        """Symlink becomes a file"""
        self.requireFeature(tests.SymlinkFeature)
        state, entry = self.get_state_with_a()
        # The symlink and file info won't be cached unless old
        state.adjust_time(+10)
        self.create_and_test_symlink(state, entry)
        os.remove('a')
        self.create_and_test_file(state, entry)

    def test_update_symlink_to_dir(self):
        """Symlink becomes a directory"""
        self.requireFeature(tests.SymlinkFeature)
        state, entry = self.get_state_with_a()
        # The symlink target won't be cached if it isn't old
        state.adjust_time(+10)
        self.create_and_test_symlink(state, entry)
        os.remove('a')
        self.create_and_test_dir(state, entry)

    def test__is_executable_win32(self):
        state, entry = self.get_state_with_a()
        self.build_tree(['a'])

        # Make sure we are using the win32 implementation of _is_executable
        state._is_executable = state._is_executable_win32

        # The file on disk is not executable, but we are marking it as though
        # it is. With _is_executable_win32 we ignore what is on disk.
        entry[1][0] = ('f', '', 0, True, dirstate.DirState.NULLSTAT)

        stat_value = os.lstat('a')
        packed_stat = dirstate.pack_stat(stat_value)

        state.adjust_time(-10) # Make sure everything is new
        self.update_entry(state, entry, abspath='a', stat_value=stat_value)

        # The row is updated, but the executable bit stays set.
        self.assertEqual([('f', '', 14, True, dirstate.DirState.NULLSTAT)],
                         entry[1])

        # Make the disk object look old enough to cache (but it won't cache the
        # sha as it is a new file).
        state.adjust_time(+20)
        digest = 'b50e5406bb5e153ebbeb20268fcf37c87e1ecfb6'
        self.update_entry(state, entry, abspath='a', stat_value=stat_value)
        self.assertEqual([('f', '', 14, True, dirstate.DirState.NULLSTAT)],
            entry[1])

    def _prepare_tree(self):
        # Create a tree
        text = 'Hello World\n'
        tree = self.make_branch_and_tree('tree')
        self.build_tree_contents([('tree/a file', text)])
        tree.add('a file', 'a-file-id')
        # Note: dirstate does not sha prior to the first commit
        # so commit now in order for the test to work
        tree.commit('first')
        return tree, text

    def test_sha1provider_sha1_used(self):
        tree, text = self._prepare_tree()
        state = dirstate.DirState.from_tree(tree, 'dirstate',
            UppercaseSHA1Provider())
        self.addCleanup(state.unlock)
        expected_sha = osutils.sha_string(text.upper() + "foo")
        entry = state._get_entry(0, path_utf8='a file')
        state._sha_cutoff_time()
        state._cutoff_time += 10
        sha1 = self.update_entry(state, entry, 'tree/a file',
                                 os.lstat('tree/a file'))
        self.assertEqual(expected_sha, sha1)

    def test_sha1provider_stat_and_sha1_used(self):
        tree, text = self._prepare_tree()
        tree.lock_write()
        self.addCleanup(tree.unlock)
        state = tree._current_dirstate()
        state._sha1_provider = UppercaseSHA1Provider()
        # If we used the standard provider, it would look like nothing has
        # changed
        file_ids_changed = [change[0] for change
                            in tree.iter_changes(tree.basis_tree())]
        self.assertEqual(['a-file-id'], file_ids_changed)


class UppercaseSHA1Provider(dirstate.SHA1Provider):
    """A custom SHA1Provider."""

    def sha1(self, abspath):
        return self.stat_and_sha1(abspath)[1]

    def stat_and_sha1(self, abspath):
        file_obj = file(abspath, 'rb')
        try:
            statvalue = os.fstat(file_obj.fileno())
            text = ''.join(file_obj.readlines())
            sha1 = osutils.sha_string(text.upper() + "foo")
        finally:
            file_obj.close()
        return statvalue, sha1


class TestProcessEntry(test_dirstate.TestCaseWithDirState):

    # Set by load_tests
    _process_entry = None

    def setUp(self):
        super(TestProcessEntry, self).setUp()
        self.overrideAttr(dirstate, '_process_entry', self._process_entry)

    def assertChangedFileIds(self, expected, tree):
        tree.lock_read()
        try:
            file_ids = [info[0] for info
                        in tree.iter_changes(tree.basis_tree())]
        finally:
            tree.unlock()
        self.assertEqual(sorted(expected), sorted(file_ids))

    def test_exceptions_raised(self):
        # This is a direct test of bug #495023, it relies on osutils.is_inside
        # getting called in an inner function. Which makes it a bit brittle,
        # but at least it does reproduce the bug.
        tree = self.make_branch_and_tree('tree')
        self.build_tree(['tree/file', 'tree/dir/', 'tree/dir/sub',
                         'tree/dir2/', 'tree/dir2/sub2'])
        tree.add(['file', 'dir', 'dir/sub', 'dir2', 'dir2/sub2'])
        tree.commit('first commit')
        tree.lock_read()
        self.addCleanup(tree.unlock)
        basis_tree = tree.basis_tree()
        def is_inside_raises(*args, **kwargs):
            raise RuntimeError('stop this')
        self.overrideAttr(osutils, 'is_inside', is_inside_raises)
        self.assertListRaises(RuntimeError, tree.iter_changes, basis_tree)

    def test_simple_changes(self):
        tree = self.make_branch_and_tree('tree')
        self.build_tree(['tree/file'])
        tree.add(['file'], ['file-id'])
        self.assertChangedFileIds([tree.get_root_id(), 'file-id'], tree)
        tree.commit('one')
        self.assertChangedFileIds([], tree)

    def test_sha1provider_stat_and_sha1_used(self):
        tree = self.make_branch_and_tree('tree')
        self.build_tree(['tree/file'])
        tree.add(['file'], ['file-id'])
        tree.commit('one')
        tree.lock_write()
        self.addCleanup(tree.unlock)
        state = tree._current_dirstate()
        state._sha1_provider = UppercaseSHA1Provider()
        self.assertChangedFileIds(['file-id'], tree)

www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.