test_content_filters.py :  » Development » Bazaar » bzr-2.2b3 » bzrlib » tests » per_workingtree » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » Bazaar 
Bazaar » bzr 2.2b3 » bzrlib » tests » per_workingtree » test_content_filters.py
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Tests for content filtering conformance"""

import os

from bzrlib.bzrdir import BzrDir
from bzrlib.filters import ContentFilter
from bzrlib.switch import switch
from bzrlib.workingtree import WorkingTree
from bzrlib.tests.per_workingtree import TestCaseWithWorkingTree


def _converter_helper(chunks, fn):
    result = []
    for chunk in chunks:
        result.append(getattr(chunk, fn)())
    return iter(result)


def _swapcase(chunks, context=None):
    """A converter that swaps the case of text."""
    return _converter_helper(chunks, 'swapcase')


def _uppercase(chunks, context=None):
    """A converter that converts text to uppercase."""
    return _converter_helper(chunks, 'upper')


def _lowercase(chunks, context=None):
    """A converter that converts text to lowercase."""
    return _converter_helper(chunks, 'lower')


_trailer_string = '\nend string\n'


def _append_text(chunks, context=None):
    """A content filter that appends a string to the end of the file.

    This tests filters that change the length."""
    return chunks + [_trailer_string]


def _remove_appended_text(chunks, context=None):
    """Remove the appended text."""

    text = ''.join(chunks)
    if text.endswith(_trailer_string):
        text = text[:-len(_trailer_string)]
    return [text]


class TestWorkingTreeWithContentFilters(TestCaseWithWorkingTree):

    def create_cf_tree(self, txt_reader, txt_writer, dir='.'):
        tree = self.make_branch_and_tree(dir)
        def _content_filter_stack(path=None, file_id=None):
            if path.endswith('.txt'):
                return [ContentFilter(txt_reader, txt_writer)]
            else:
                return []
        tree._content_filter_stack = _content_filter_stack
        self.build_tree_contents([
            (dir + '/file1.txt', 'Foo Txt'),
            (dir + '/file2.bin', 'Foo Bin')])
        tree.add(['file1.txt', 'file2.bin'])
        tree.commit('commit raw content')
        txt_fileid = tree.path2id('file1.txt')
        bin_fileid = tree.path2id('file2.bin')
        return tree, txt_fileid, bin_fileid

    def create_cf_tree_with_two_revisions(self, txt_reader, txt_writer,
        dir='.'):
        tree = self.make_branch_and_tree(dir)
        def _content_filter_stack(path=None, file_id=None):
            if path.endswith('.txt'):
                return [ContentFilter(txt_reader, txt_writer)]
            else:
                return []
        tree._content_filter_stack = _content_filter_stack
        self.build_tree_contents([
            (dir + '/file1.txt', 'Foo Txt'),
            (dir + '/file2.bin', 'Foo Bin'),
            (dir + '/file3.txt', 'Bar Txt'),
            ])
        tree.add(['file1.txt', 'file2.bin', 'file3.txt'])
        tree.commit('commit raw content')
        fileid_1 = tree.path2id('file1.txt')
        fileid_2 = tree.path2id('file2.bin')
        fileid_3 = tree.path2id('file3.txt')
        # Commit another revision with various changes. We make sure
        # the change includes a modification, an addition and a deletion.
        # Renames are more complex and need a separate set of tests later.
        self.build_tree_contents([
            (dir + '/file1.txt', 'Foo ROCKS!'),
            (dir + '/file4.txt', 'Hello World'),
            ])
        tree.add(['file4.txt'])
        tree.remove(['file3.txt'], keep_files=False)
        tree.commit("change, add and rename stuff")
        fileid_4 = tree.path2id('file4.txt')
        return tree, fileid_1, fileid_2, fileid_3, fileid_4

    def patch_in_content_filter(self):
        # Patch in a custom, symmetric content filter stack. It's pretty gross
        # that we need to monkey-patch a class method to do this, bit it's
        # the easiest way currently given we don't have easy access to the
        # WorkingTree after it is created but before the filter stack is used
        # to populate content.
        def new_stack(tree, path=None, file_id=None):
            if path.endswith('.txt'):
                return [ContentFilter(_swapcase, _swapcase)]
            else:
                return []
        self.overrideAttr(WorkingTree, '_content_filter_stack', new_stack)

    def assert_basis_content(self, expected_content, branch, file_id):
        # Note: We need to use try/finally here instead of addCleanup()
        # as the latter leaves the read lock in place too long
        basis = branch.basis_tree()
        basis.lock_read()
        try:
            self.assertEqual(expected_content, basis.get_file_text(file_id))
        finally:
            basis.unlock()

    def test_symmetric_content_filtering(self):
        # test handling when read then write gives back the initial content
        tree, txt_fileid, bin_fileid = self.create_cf_tree(
            txt_reader=_swapcase, txt_writer=_swapcase)
        # Check that the basis tree has the expected content
        basis = tree.basis_tree()
        basis.lock_read()
        self.addCleanup(basis.unlock)
        if tree.supports_content_filtering():
            expected = "fOO tXT"
        else:
            expected = "Foo Txt"
        self.assertEqual(expected, basis.get_file_text(txt_fileid))
        self.assertEqual('Foo Bin', basis.get_file_text(bin_fileid))
        # Check that the working tree has the original content
        tree.lock_read()
        self.addCleanup(tree.unlock)
        self.assertEqual('Foo Txt', tree.get_file(txt_fileid,
            filtered=False).read())
        self.assertEqual('Foo Bin', tree.get_file(bin_fileid,
            filtered=False).read())

    def test_readonly_content_filtering(self):
        # test handling with a read filter but no write filter
        tree, txt_fileid, bin_fileid = self.create_cf_tree(
            txt_reader=_uppercase, txt_writer=None)
        # Check that the basis tree has the expected content
        basis = tree.basis_tree()
        basis.lock_read()
        self.addCleanup(basis.unlock)
        if tree.supports_content_filtering():
            expected = "FOO TXT"
        else:
            expected = "Foo Txt"
        self.assertEqual(expected, basis.get_file_text(txt_fileid))
        self.assertEqual('Foo Bin', basis.get_file_text(bin_fileid))
        # We expect the workingtree content to be unchanged (for now at least)
        tree.lock_read()
        self.addCleanup(tree.unlock)
        self.assertEqual('Foo Txt', tree.get_file(txt_fileid,
            filtered=False).read())
        self.assertEqual('Foo Bin', tree.get_file(bin_fileid,
            filtered=False).read())

    def test_branch_source_filtered_target_not(self):
        # Create a source branch with content filtering
        source, txt_fileid, bin_fileid = self.create_cf_tree(
            txt_reader=_uppercase, txt_writer=_lowercase, dir='source')
        if not source.supports_content_filtering():
            return
        self.assertFileEqual("Foo Txt", 'source/file1.txt')
        self.assert_basis_content("FOO TXT", source, txt_fileid)

        # Now branch it
        self.run_bzr('branch source target')
        target = WorkingTree.open('target')
        # Even though the content in source and target are different
        # due to different filters, iter_changes should be clean
        self.assertFileEqual("FOO TXT", 'target/file1.txt')
        changes = target.changes_from(source.basis_tree())
        self.assertFalse(changes.has_changed())

    def test_branch_source_not_filtered_target_is(self):
        # Create a source branch with content filtering
        source, txt_fileid, bin_fileid = self.create_cf_tree(
            txt_reader=None, txt_writer=None, dir='source')
        if not source.supports_content_filtering():
            return
        self.assertFileEqual("Foo Txt", 'source/file1.txt')
        self.assert_basis_content("Foo Txt", source, txt_fileid)

        # Now patch in content filtering and branch the source
        self.patch_in_content_filter()
        self.run_bzr('branch source target')
        target = WorkingTree.open('target')
        # Even though the content in source and target are different
        # due to different filters, iter_changes should be clean
        self.assertFileEqual("fOO tXT", 'target/file1.txt')
        changes = target.changes_from(source.basis_tree())
        self.assertFalse(changes.has_changed())

    def test_path_content_summary(self):
        """path_content_summary should always talk about the canonical form."""
        # see https://bugs.launchpad.net/bzr/+bug/415508
        #
        # set up a tree where the canonical form has a string added to the
        # end
        source, txt_fileid, bin_fileid = self.create_cf_tree(
            txt_reader=_append_text,
            txt_writer=_remove_appended_text,
            dir='source')
        if not source.supports_content_filtering():
            return
        source.lock_read()
        self.addCleanup(source.unlock)

        expected_canonical_form = 'Foo Txt\nend string\n'
        self.assertEquals(source.get_file(txt_fileid, filtered=True).read(),
            expected_canonical_form)
        self.assertEquals(source.get_file(txt_fileid, filtered=False).read(),
            'Foo Txt')

        # results are: kind, size, executable, sha1_or_link_target
        result = source.path_content_summary('file1.txt')

        self.assertEquals(result,
            ('file', None, False, None))

        # we could give back the length of the canonical form, but in general
        # that will be expensive to compute, so it's acceptable to just return
        # None.

    def test_content_filtering_applied_on_pull(self):
        # Create a source branch with two revisions
        source, fileid_1, fileid_2, fileid_3, fileid_4 = \
            self.create_cf_tree_with_two_revisions(txt_reader=None,
            txt_writer=None, dir='source')
        if not source.supports_content_filtering():
            return
        self.assertFileEqual("Foo ROCKS!", 'source/file1.txt')
        self.assert_basis_content("Foo ROCKS!", source, fileid_1)

        # Now patch in content filtering and branch from revision 1
        self.patch_in_content_filter()
        self.run_bzr('branch -r1 source target')
        target = WorkingTree.open('target')
        self.assert_basis_content("Foo Txt", target, fileid_1)
        self.assertFileEqual("fOO tXT", 'target/file1.txt')
        self.assert_basis_content("Foo Bin", target, fileid_2)
        self.assertFileEqual("Foo Bin", 'target/file2.bin')
        self.assert_basis_content("Bar Txt", target, fileid_3)
        self.assertFileEqual("bAR tXT", 'target/file3.txt')

        # Pull the latter change and check the target tree is updated
        self.run_bzr('pull -d target')
        self.assert_basis_content("Foo ROCKS!", target, fileid_1)
        self.assertFileEqual("fOO rocks!", 'target/file1.txt')
        self.assert_basis_content("Foo Bin", target, fileid_2)
        self.assert_basis_content("Hello World", target, fileid_4)
        self.assertFileEqual("hELLO wORLD", 'target/file4.txt')

    def test_content_filtering_applied_on_merge(self):
        # Create a source branch with two revisions
        source, fileid_1, fileid_2, fileid_3, fileid_4 = \
            self.create_cf_tree_with_two_revisions(txt_reader=None,
            txt_writer=None, dir='source')
        if not source.supports_content_filtering():
            return
        self.assert_basis_content("Foo ROCKS!", source, fileid_1)
        self.assertFileEqual("Foo ROCKS!", 'source/file1.txt')
        self.assert_basis_content("Foo Bin", source, fileid_2)
        self.assert_basis_content("Hello World", source, fileid_4)
        self.assertFileEqual("Hello World", 'source/file4.txt')

        # Now patch in content filtering and branch from revision 1
        self.patch_in_content_filter()
        self.run_bzr('branch -r1 source target')
        target = WorkingTree.open('target')
        self.assert_basis_content("Foo Txt", target, fileid_1)
        self.assertFileEqual("fOO tXT", 'target/file1.txt')
        self.assertFileEqual("Foo Bin", 'target/file2.bin')
        self.assertFileEqual("bAR tXT", 'target/file3.txt')

        # Merge the latter change and check the target tree is updated
        self.run_bzr('merge -d target source')
        self.assertFileEqual("fOO rocks!", 'target/file1.txt')
        self.assertFileEqual("hELLO wORLD", 'target/file4.txt')

        # Commit the merge and check the right content is stored
        target.commit("merge file1.txt changes from source")
        self.assert_basis_content("Foo ROCKS!", target, fileid_1)
        self.assert_basis_content("Hello World", target, fileid_4)

    def test_content_filtering_applied_on_switch(self):
        # Create a source branch with two revisions
        source, fileid_1, fileid_2, fileid_3, fileid_4 = \
            self.create_cf_tree_with_two_revisions(txt_reader=None,
            txt_writer=None, dir='branch-a')
        if not source.supports_content_filtering():
            return

        # Now patch in content filtering and branch from revision 1
        self.patch_in_content_filter()
        self.run_bzr('branch -r1 branch-a branch-b')

        # Now create a lightweight checkout referring to branch-b
        self.run_bzr('checkout --lightweight branch-b checkout')
        self.assertFileEqual("fOO tXT", 'checkout/file1.txt')

        # Switch it to branch-b and check the tree is updated
        checkout_control_dir = BzrDir.open_containing('checkout')[0]
        switch(checkout_control_dir, source.branch)
        self.assertFileEqual("fOO rocks!", 'checkout/file1.txt')
        self.assertFileEqual("hELLO wORLD", 'checkout/file4.txt')

    def test_content_filtering_applied_on_revert_delete(self):
        # Create a source branch with content filtering
        source, txt_fileid, bin_fileid = self.create_cf_tree(
            txt_reader=_uppercase, txt_writer=_lowercase, dir='source')
        if not source.supports_content_filtering():
            return
        self.assertFileEqual("Foo Txt", 'source/file1.txt')
        self.assert_basis_content("FOO TXT", source, txt_fileid)

        # Now delete the file, revert it and check the content
        os.unlink('source/file1.txt')
        self.assertFalse(os.path.exists('source/file1.txt'))
        source.revert(['file1.txt'])
        self.assertTrue(os.path.exists('source/file1.txt'))
        # Note: we don't get back exactly what was in the tree
        # previously because lower(upper(text)) is a lossy transformation
        self.assertFileEqual("foo txt", 'source/file1.txt')

    def test_content_filtering_applied_on_revert_rename(self):
        # Create a source branch with content filtering
        source, txt_fileid, bin_fileid = self.create_cf_tree(
            txt_reader=_uppercase, txt_writer=_lowercase, dir='source')
        if not source.supports_content_filtering():
            return
        self.assertFileEqual("Foo Txt", 'source/file1.txt')
        self.assert_basis_content("FOO TXT", source, txt_fileid)

        # Now modify & rename a file, revert it and check the content
        self.build_tree_contents([
            ('source/file1.txt', 'Foo Txt with new content')])
        source.rename_one('file1.txt', 'file1.bin')
        self.assertTrue(os.path.exists('source/file1.bin'))
        self.assertFalse(os.path.exists('source/file1.txt'))
        self.assertFileEqual("Foo Txt with new content", 'source/file1.bin')
        source.revert(['file1.bin'])
        self.assertFalse(os.path.exists('source/file1.bin'))
        self.assertTrue(os.path.exists('source/file1.txt'))
        # Note: we don't get back exactly what was in the tree
        # previously because lower(upper(text)) is a lossy transformation
        self.assertFileEqual("foo txt", 'source/file1.txt')
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.