test_groupcompress.py :  » Development » Bazaar » bzr-2.2b3 » bzrlib » tests » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » Bazaar 
Bazaar » bzr 2.2b3 » bzrlib » tests » test_groupcompress.py
# Copyright (C) 2008, 2009, 2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Tests for group compression."""

import zlib

from bzrlib import (
    btree_index,
    groupcompress,
    errors,
    index as _mod_index,
    osutils,
    tests,
    trace,
    versionedfile,
    )
from bzrlib.osutils import sha_string
from bzrlib.tests.test__groupcompress import compiled_groupcompress_feature


def load_tests(standard_tests, module, loader):
    """Parameterize tests for all versions of groupcompress."""
    to_adapt, result = tests.split_suite_by_condition(
        standard_tests, tests.condition_isinstance(TestAllGroupCompressors))
    scenarios = [
        ('python', {'compressor': groupcompress.PythonGroupCompressor}),
        ]
    if compiled_groupcompress_feature.available():
        scenarios.append(('C',
            {'compressor': groupcompress.PyrexGroupCompressor}))
    return tests.multiply_tests(to_adapt, scenarios, result)


class TestGroupCompressor(tests.TestCase):

    def _chunks_to_repr_lines(self, chunks):
        return '\n'.join(map(repr, ''.join(chunks).split('\n')))

    def assertEqualDiffEncoded(self, expected, actual):
        """Compare the actual content to the expected content.

        :param expected: A group of chunks that we expect to see
        :param actual: The measured 'chunks'

        We will transform the chunks back into lines, and then run 'repr()'
        over them to handle non-ascii characters.
        """
        self.assertEqualDiff(self._chunks_to_repr_lines(expected),
                             self._chunks_to_repr_lines(actual))


class TestAllGroupCompressors(TestGroupCompressor):
    """Tests for GroupCompressor"""

    compressor = None # Set by multiply_tests

    def test_empty_delta(self):
        compressor = self.compressor()
        self.assertEqual([], compressor.chunks)

    def test_one_nosha_delta(self):
        # diff against NUKK
        compressor = self.compressor()
        sha1, start_point, end_point, _ = compressor.compress(('label',),
            'strange\ncommon\n', None)
        self.assertEqual(sha_string('strange\ncommon\n'), sha1)
        expected_lines = 'f' '\x0f' 'strange\ncommon\n'
        self.assertEqual(expected_lines, ''.join(compressor.chunks))
        self.assertEqual(0, start_point)
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def test_empty_content(self):
        compressor = self.compressor()
        # Adding empty bytes should return the 'null' record
        sha1, start_point, end_point, kind = compressor.compress(('empty',),
                                                                 '', None)
        self.assertEqual(0, start_point)
        self.assertEqual(0, end_point)
        self.assertEqual('fulltext', kind)
        self.assertEqual(groupcompress._null_sha1, sha1)
        self.assertEqual(0, compressor.endpoint)
        self.assertEqual([], compressor.chunks)
        # Even after adding some content
        compressor.compress(('content',), 'some\nbytes\n', None)
        self.assertTrue(compressor.endpoint > 0)
        sha1, start_point, end_point, kind = compressor.compress(('empty2',),
                                                                 '', None)
        self.assertEqual(0, start_point)
        self.assertEqual(0, end_point)
        self.assertEqual('fulltext', kind)
        self.assertEqual(groupcompress._null_sha1, sha1)

    def test_extract_from_compressor(self):
        # Knit fetching will try to reconstruct texts locally which results in
        # reading something that is in the compressor stream already.
        compressor = self.compressor()
        sha1_1, _, _, _ = compressor.compress(('label',),
            'strange\ncommon long line\nthat needs a 16 byte match\n', None)
        expected_lines = list(compressor.chunks)
        sha1_2, _, end_point, _ = compressor.compress(('newlabel',),
            'common long line\nthat needs a 16 byte match\ndifferent\n', None)
        # get the first out
        self.assertEqual(('strange\ncommon long line\n'
                          'that needs a 16 byte match\n', sha1_1),
                         compressor.extract(('label',)))
        # and the second
        self.assertEqual(('common long line\nthat needs a 16 byte match\n'
                          'different\n', sha1_2),
                         compressor.extract(('newlabel',)))

    def test_pop_last(self):
        compressor = self.compressor()
        _, _, _, _ = compressor.compress(('key1',),
            'some text\nfor the first entry\n', None)
        expected_lines = list(compressor.chunks)
        _, _, _, _ = compressor.compress(('key2',),
            'some text\nfor the second entry\n', None)
        compressor.pop_last()
        self.assertEqual(expected_lines, compressor.chunks)


class TestPyrexGroupCompressor(TestGroupCompressor):

    _test_needs_features = [compiled_groupcompress_feature]
    compressor = groupcompress.PyrexGroupCompressor

    def test_stats(self):
        compressor = self.compressor()
        compressor.compress(('label',),
                            'strange\n'
                            'common very very long line\n'
                            'plus more text\n', None)
        compressor.compress(('newlabel',),
                            'common very very long line\n'
                            'plus more text\n'
                            'different\n'
                            'moredifferent\n', None)
        compressor.compress(('label3',),
                            'new\n'
                            'common very very long line\n'
                            'plus more text\n'
                            'different\n'
                            'moredifferent\n', None)
        self.assertAlmostEqual(1.9, compressor.ratio(), 1)

    def test_two_nosha_delta(self):
        compressor = self.compressor()
        sha1_1, _, _, _ = compressor.compress(('label',),
            'strange\ncommon long line\nthat needs a 16 byte match\n', None)
        expected_lines = list(compressor.chunks)
        sha1_2, start_point, end_point, _ = compressor.compress(('newlabel',),
            'common long line\nthat needs a 16 byte match\ndifferent\n', None)
        self.assertEqual(sha_string('common long line\n'
                                    'that needs a 16 byte match\n'
                                    'different\n'), sha1_2)
        expected_lines.extend([
            # 'delta', delta length
            'd\x0f',
            # source and target length
            '\x36',
            # copy the line common
            '\x91\x0a\x2c', #copy, offset 0x0a, len 0x2c
            # add the line different, and the trailing newline
            '\x0adifferent\n', # insert 10 bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def test_three_nosha_delta(self):
        # The first interesting test: make a change that should use lines from
        # both parents.
        compressor = self.compressor()
        sha1_1, _, _, _ = compressor.compress(('label',),
            'strange\ncommon very very long line\nwith some extra text\n', None)
        sha1_2, _, _, _ = compressor.compress(('newlabel',),
            'different\nmoredifferent\nand then some more\n', None)
        expected_lines = list(compressor.chunks)
        sha1_3, start_point, end_point, _ = compressor.compress(('label3',),
            'new\ncommon very very long line\nwith some extra text\n'
            'different\nmoredifferent\nand then some more\n',
            None)
        self.assertEqual(
            sha_string('new\ncommon very very long line\nwith some extra text\n'
                       'different\nmoredifferent\nand then some more\n'),
            sha1_3)
        expected_lines.extend([
            # 'delta', delta length
            'd\x0b',
            # source and target length
            '\x5f'
            # insert new
            '\x03new',
            # Copy of first parent 'common' range
            '\x91\x09\x31' # copy, offset 0x09, 0x31 bytes
            # Copy of second parent 'different' range
            '\x91\x3c\x2b' # copy, offset 0x3c, 0x2b bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)


class TestPythonGroupCompressor(TestGroupCompressor):

    compressor = groupcompress.PythonGroupCompressor

    def test_stats(self):
        compressor = self.compressor()
        compressor.compress(('label',),
                            'strange\n'
                            'common very very long line\n'
                            'plus more text\n', None)
        compressor.compress(('newlabel',),
                            'common very very long line\n'
                            'plus more text\n'
                            'different\n'
                            'moredifferent\n', None)
        compressor.compress(('label3',),
                            'new\n'
                            'common very very long line\n'
                            'plus more text\n'
                            'different\n'
                            'moredifferent\n', None)
        self.assertAlmostEqual(1.9, compressor.ratio(), 1)

    def test_two_nosha_delta(self):
        compressor = self.compressor()
        sha1_1, _, _, _ = compressor.compress(('label',),
            'strange\ncommon long line\nthat needs a 16 byte match\n', None)
        expected_lines = list(compressor.chunks)
        sha1_2, start_point, end_point, _ = compressor.compress(('newlabel',),
            'common long line\nthat needs a 16 byte match\ndifferent\n', None)
        self.assertEqual(sha_string('common long line\n'
                                    'that needs a 16 byte match\n'
                                    'different\n'), sha1_2)
        expected_lines.extend([
            # 'delta', delta length
            'd\x0f',
            # target length
            '\x36',
            # copy the line common
            '\x91\x0a\x2c', #copy, offset 0x0a, len 0x2c
            # add the line different, and the trailing newline
            '\x0adifferent\n', # insert 10 bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)

    def test_three_nosha_delta(self):
        # The first interesting test: make a change that should use lines from
        # both parents.
        compressor = self.compressor()
        sha1_1, _, _, _ = compressor.compress(('label',),
            'strange\ncommon very very long line\nwith some extra text\n', None)
        sha1_2, _, _, _ = compressor.compress(('newlabel',),
            'different\nmoredifferent\nand then some more\n', None)
        expected_lines = list(compressor.chunks)
        sha1_3, start_point, end_point, _ = compressor.compress(('label3',),
            'new\ncommon very very long line\nwith some extra text\n'
            'different\nmoredifferent\nand then some more\n',
            None)
        self.assertEqual(
            sha_string('new\ncommon very very long line\nwith some extra text\n'
                       'different\nmoredifferent\nand then some more\n'),
            sha1_3)
        expected_lines.extend([
            # 'delta', delta length
            'd\x0c',
            # target length
            '\x5f'
            # insert new
            '\x04new\n',
            # Copy of first parent 'common' range
            '\x91\x0a\x30' # copy, offset 0x0a, 0x30 bytes
            # Copy of second parent 'different' range
            '\x91\x3c\x2b' # copy, offset 0x3c, 0x2b bytes
            ])
        self.assertEqualDiffEncoded(expected_lines, compressor.chunks)
        self.assertEqual(sum(map(len, expected_lines)), end_point)


class TestGroupCompressBlock(tests.TestCase):

    def make_block(self, key_to_text):
        """Create a GroupCompressBlock, filling it with the given texts."""
        compressor = groupcompress.GroupCompressor()
        start = 0
        for key in sorted(key_to_text):
            compressor.compress(key, key_to_text[key], None)
        locs = dict((key, (start, end)) for key, (start, _, end, _)
                    in compressor.labels_deltas.iteritems())
        block = compressor.flush()
        raw_bytes = block.to_bytes()
        # Go through from_bytes(to_bytes()) so that we start with a compressed
        # content object
        return locs, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)

    def test_from_empty_bytes(self):
        self.assertRaises(ValueError,
                          groupcompress.GroupCompressBlock.from_bytes, '')

    def test_from_minimal_bytes(self):
        block = groupcompress.GroupCompressBlock.from_bytes(
            'gcb1z\n0\n0\n')
        self.assertIsInstance(block, groupcompress.GroupCompressBlock)
        self.assertIs(None, block._content)
        self.assertEqual('', block._z_content)
        block._ensure_content()
        self.assertEqual('', block._content)
        self.assertEqual('', block._z_content)
        block._ensure_content() # Ensure content is safe to call 2x

    def test_from_invalid(self):
        self.assertRaises(ValueError,
                          groupcompress.GroupCompressBlock.from_bytes,
                          'this is not a valid header')

    def test_from_bytes(self):
        content = ('a tiny bit of content\n')
        z_content = zlib.compress(content)
        z_bytes = (
            'gcb1z\n' # group compress block v1 plain
            '%d\n' # Length of compressed content
            '%d\n' # Length of uncompressed content
            '%s'   # Compressed content
            ) % (len(z_content), len(content), z_content)
        block = groupcompress.GroupCompressBlock.from_bytes(
            z_bytes)
        self.assertEqual(z_content, block._z_content)
        self.assertIs(None, block._content)
        self.assertEqual(len(z_content), block._z_content_length)
        self.assertEqual(len(content), block._content_length)
        block._ensure_content()
        self.assertEqual(z_content, block._z_content)
        self.assertEqual(content, block._content)

    def test_to_bytes(self):
        content = ('this is some content\n'
                   'this content will be compressed\n')
        gcb = groupcompress.GroupCompressBlock()
        gcb.set_content(content)
        bytes = gcb.to_bytes()
        self.assertEqual(gcb._z_content_length, len(gcb._z_content))
        self.assertEqual(gcb._content_length, len(content))
        expected_header =('gcb1z\n' # group compress block v1 zlib
                          '%d\n' # Length of compressed content
                          '%d\n' # Length of uncompressed content
                         ) % (gcb._z_content_length, gcb._content_length)
        self.assertStartsWith(bytes, expected_header)
        remaining_bytes = bytes[len(expected_header):]
        raw_bytes = zlib.decompress(remaining_bytes)
        self.assertEqual(content, raw_bytes)

        # we should get the same results if using the chunked version
        gcb = groupcompress.GroupCompressBlock()
        gcb.set_chunked_content(['this is some content\n'
                                 'this content will be compressed\n'],
                                 len(content))
        old_bytes = bytes
        bytes = gcb.to_bytes()
        self.assertEqual(old_bytes, bytes)

    def test_partial_decomp(self):
        content_chunks = []
        # We need a sufficient amount of data so that zlib.decompress has
        # partial decompression to work with. Most auto-generated data
        # compresses a bit too well, we want a combination, so we combine a sha
        # hash with compressible data.
        for i in xrange(2048):
            next_content = '%d\nThis is a bit of duplicate text\n' % (i,)
            content_chunks.append(next_content)
            next_sha1 = osutils.sha_string(next_content)
            content_chunks.append(next_sha1 + '\n')
        content = ''.join(content_chunks)
        self.assertEqual(158634, len(content))
        z_content = zlib.compress(content)
        self.assertEqual(57182, len(z_content))
        block = groupcompress.GroupCompressBlock()
        block._z_content = z_content
        block._z_content_length = len(z_content)
        block._compressor_name = 'zlib'
        block._content_length = 158634
        self.assertIs(None, block._content)
        block._ensure_content(100)
        self.assertIsNot(None, block._content)
        # We have decompressed at least 100 bytes
        self.assertTrue(len(block._content) >= 100)
        # We have not decompressed the whole content
        self.assertTrue(len(block._content) < 158634)
        self.assertEqualDiff(content[:len(block._content)], block._content)
        # ensuring content that we already have shouldn't cause any more data
        # to be extracted
        cur_len = len(block._content)
        block._ensure_content(cur_len - 10)
        self.assertEqual(cur_len, len(block._content))
        # Now we want a bit more content
        cur_len += 10
        block._ensure_content(cur_len)
        self.assertTrue(len(block._content) >= cur_len)
        self.assertTrue(len(block._content) < 158634)
        self.assertEqualDiff(content[:len(block._content)], block._content)
        # And now lets finish
        block._ensure_content(158634)
        self.assertEqualDiff(content, block._content)
        # And the decompressor is finalized
        self.assertIs(None, block._z_content_decompressor)

    def test__ensure_all_content(self):
        content_chunks = []
        # We need a sufficient amount of data so that zlib.decompress has
        # partial decompression to work with. Most auto-generated data
        # compresses a bit too well, we want a combination, so we combine a sha
        # hash with compressible data.
        for i in xrange(2048):
            next_content = '%d\nThis is a bit of duplicate text\n' % (i,)
            content_chunks.append(next_content)
            next_sha1 = osutils.sha_string(next_content)
            content_chunks.append(next_sha1 + '\n')
        content = ''.join(content_chunks)
        self.assertEqual(158634, len(content))
        z_content = zlib.compress(content)
        self.assertEqual(57182, len(z_content))
        block = groupcompress.GroupCompressBlock()
        block._z_content = z_content
        block._z_content_length = len(z_content)
        block._compressor_name = 'zlib'
        block._content_length = 158634
        self.assertIs(None, block._content)
        # The first _ensure_content got all of the required data
        block._ensure_content(158634)
        self.assertEqualDiff(content, block._content)
        # And we should have released the _z_content_decompressor since it was
        # fully consumed
        self.assertIs(None, block._z_content_decompressor)

    def test__dump(self):
        dup_content = 'some duplicate content\nwhich is sufficiently long\n'
        key_to_text = {('1',): dup_content + '1 unique\n',
                       ('2',): dup_content + '2 extra special\n'}
        locs, block = self.make_block(key_to_text)
        self.assertEqual([('f', len(key_to_text[('1',)])),
                          ('d', 21, len(key_to_text[('2',)]),
                           [('c', 2, len(dup_content)),
                            ('i', len('2 extra special\n'), '')
                           ]),
                         ], block._dump())


class TestCaseWithGroupCompressVersionedFiles(
        tests.TestCaseWithMemoryTransport):

    def make_test_vf(self, create_graph, keylength=1, do_cleanup=True,
                     dir='.', inconsistency_fatal=True):
        t = self.get_transport(dir)
        t.ensure_base()
        vf = groupcompress.make_pack_factory(graph=create_graph,
            delta=False, keylength=keylength,
            inconsistency_fatal=inconsistency_fatal)(t)
        if do_cleanup:
            self.addCleanup(groupcompress.cleanup_pack_group, vf)
        return vf


class TestGroupCompressVersionedFiles(TestCaseWithGroupCompressVersionedFiles):

    def make_g_index(self, name, ref_lists=0, nodes=[]):
        builder = btree_index.BTreeBuilder(ref_lists)
        for node, references, value in nodes:
            builder.add_node(node, references, value)
        stream = builder.finish()
        trans = self.get_transport()
        size = trans.put_file(name, stream)
        return btree_index.BTreeGraphIndex(trans, name, size)

    def make_g_index_missing_parent(self):
        graph_index = self.make_g_index('missing_parent', 1,
            [(('parent', ), '2 78 2 10', ([],)),
             (('tip', ), '2 78 2 10',
              ([('parent', ), ('missing-parent', )],)),
              ])
        return graph_index

    def test_get_record_stream_as_requested(self):
        # Consider promoting 'as-requested' to general availability, and
        # make this a VF interface test
        vf = self.make_test_vf(False, dir='source')
        vf.add_lines(('a',), (), ['lines\n'])
        vf.add_lines(('b',), (), ['lines\n'])
        vf.add_lines(('c',), (), ['lines\n'])
        vf.add_lines(('d',), (), ['lines\n'])
        vf.writer.end()
        keys = [record.key for record in vf.get_record_stream(
                    [('a',), ('b',), ('c',), ('d',)],
                    'as-requested', False)]
        self.assertEqual([('a',), ('b',), ('c',), ('d',)], keys)
        keys = [record.key for record in vf.get_record_stream(
                    [('b',), ('a',), ('d',), ('c',)],
                    'as-requested', False)]
        self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)

        # It should work even after being repacked into another VF
        vf2 = self.make_test_vf(False, dir='target')
        vf2.insert_record_stream(vf.get_record_stream(
                    [('b',), ('a',), ('d',), ('c',)], 'as-requested', False))
        vf2.writer.end()

        keys = [record.key for record in vf2.get_record_stream(
                    [('a',), ('b',), ('c',), ('d',)],
                    'as-requested', False)]
        self.assertEqual([('a',), ('b',), ('c',), ('d',)], keys)
        keys = [record.key for record in vf2.get_record_stream(
                    [('b',), ('a',), ('d',), ('c',)],
                    'as-requested', False)]
        self.assertEqual([('b',), ('a',), ('d',), ('c',)], keys)

    def test_insert_record_stream_reuses_blocks(self):
        vf = self.make_test_vf(True, dir='source')
        def grouped_stream(revision_ids, first_parents=()):
            parents = first_parents
            for revision_id in revision_ids:
                key = (revision_id,)
                record = versionedfile.FulltextContentFactory(
                    key, parents, None,
                    'some content that is\n'
                    'identical except for\n'
                    'revision_id:%s\n' % (revision_id,))
                yield record
                parents = (key,)
        # One group, a-d
        vf.insert_record_stream(grouped_stream(['a', 'b', 'c', 'd']))
        # Second group, e-h
        vf.insert_record_stream(grouped_stream(['e', 'f', 'g', 'h'],
                                               first_parents=(('d',),)))
        block_bytes = {}
        stream = vf.get_record_stream([(r,) for r in 'abcdefgh'],
                                      'unordered', False)
        num_records = 0
        for record in stream:
            if record.key in [('a',), ('e',)]:
                self.assertEqual('groupcompress-block', record.storage_kind)
            else:
                self.assertEqual('groupcompress-block-ref',
                                 record.storage_kind)
            block_bytes[record.key] = record._manager._block._z_content
            num_records += 1
        self.assertEqual(8, num_records)
        for r in 'abcd':
            key = (r,)
            self.assertIs(block_bytes[key], block_bytes[('a',)])
            self.assertNotEqual(block_bytes[key], block_bytes[('e',)])
        for r in 'efgh':
            key = (r,)
            self.assertIs(block_bytes[key], block_bytes[('e',)])
            self.assertNotEqual(block_bytes[key], block_bytes[('a',)])
        # Now copy the blocks into another vf, and ensure that the blocks are
        # preserved without creating new entries
        vf2 = self.make_test_vf(True, dir='target')
        # ordering in 'groupcompress' order, should actually swap the groups in
        # the target vf, but the groups themselves should not be disturbed.
        def small_size_stream():
            for record in vf.get_record_stream([(r,) for r in 'abcdefgh'],
                                               'groupcompress', False):
                record._manager._full_enough_block_size = \
                    record._manager._block._content_length
                yield record
                        
        vf2.insert_record_stream(small_size_stream())
        stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
                                       'groupcompress', False)
        vf2.writer.end()
        num_records = 0
        for record in stream:
            num_records += 1
            self.assertEqual(block_bytes[record.key],
                             record._manager._block._z_content)
        self.assertEqual(8, num_records)

    def test_insert_record_stream_packs_on_the_fly(self):
        vf = self.make_test_vf(True, dir='source')
        def grouped_stream(revision_ids, first_parents=()):
            parents = first_parents
            for revision_id in revision_ids:
                key = (revision_id,)
                record = versionedfile.FulltextContentFactory(
                    key, parents, None,
                    'some content that is\n'
                    'identical except for\n'
                    'revision_id:%s\n' % (revision_id,))
                yield record
                parents = (key,)
        # One group, a-d
        vf.insert_record_stream(grouped_stream(['a', 'b', 'c', 'd']))
        # Second group, e-h
        vf.insert_record_stream(grouped_stream(['e', 'f', 'g', 'h'],
                                               first_parents=(('d',),)))
        # Now copy the blocks into another vf, and see that the
        # insert_record_stream rebuilt a new block on-the-fly because of
        # under-utilization
        vf2 = self.make_test_vf(True, dir='target')
        vf2.insert_record_stream(vf.get_record_stream(
            [(r,) for r in 'abcdefgh'], 'groupcompress', False))
        stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
                                       'groupcompress', False)
        vf2.writer.end()
        num_records = 0
        # All of the records should be recombined into a single block
        block = None
        for record in stream:
            num_records += 1
            if block is None:
                block = record._manager._block
            else:
                self.assertIs(block, record._manager._block)
        self.assertEqual(8, num_records)

    def test__insert_record_stream_no_reuse_block(self):
        vf = self.make_test_vf(True, dir='source')
        def grouped_stream(revision_ids, first_parents=()):
            parents = first_parents
            for revision_id in revision_ids:
                key = (revision_id,)
                record = versionedfile.FulltextContentFactory(
                    key, parents, None,
                    'some content that is\n'
                    'identical except for\n'
                    'revision_id:%s\n' % (revision_id,))
                yield record
                parents = (key,)
        # One group, a-d
        vf.insert_record_stream(grouped_stream(['a', 'b', 'c', 'd']))
        # Second group, e-h
        vf.insert_record_stream(grouped_stream(['e', 'f', 'g', 'h'],
                                               first_parents=(('d',),)))
        vf.writer.end()
        self.assertEqual(8, len(list(vf.get_record_stream(
                                        [(r,) for r in 'abcdefgh'],
                                        'unordered', False))))
        # Now copy the blocks into another vf, and ensure that the blocks are
        # preserved without creating new entries
        vf2 = self.make_test_vf(True, dir='target')
        # ordering in 'groupcompress' order, should actually swap the groups in
        # the target vf, but the groups themselves should not be disturbed.
        list(vf2._insert_record_stream(vf.get_record_stream(
            [(r,) for r in 'abcdefgh'], 'groupcompress', False),
            reuse_blocks=False))
        vf2.writer.end()
        # After inserting with reuse_blocks=False, we should have everything in
        # a single new block.
        stream = vf2.get_record_stream([(r,) for r in 'abcdefgh'],
                                       'groupcompress', False)
        block = None
        for record in stream:
            if block is None:
                block = record._manager._block
            else:
                self.assertIs(block, record._manager._block)

    def test_add_missing_noncompression_parent_unvalidated_index(self):
        unvalidated = self.make_g_index_missing_parent()
        combined = _mod_index.CombinedGraphIndex([unvalidated])
        index = groupcompress._GCGraphIndex(combined,
            is_locked=lambda: True, parents=True,
            track_external_parent_refs=True)
        index.scan_unvalidated_index(unvalidated)
        self.assertEqual(
            frozenset([('missing-parent',)]), index.get_missing_parents())

    def test_track_external_parent_refs(self):
        g_index = self.make_g_index('empty', 1, [])
        mod_index = btree_index.BTreeBuilder(1, 1)
        combined = _mod_index.CombinedGraphIndex([g_index, mod_index])
        index = groupcompress._GCGraphIndex(combined,
            is_locked=lambda: True, parents=True,
            add_callback=mod_index.add_nodes,
            track_external_parent_refs=True)
        index.add_records([
            (('new-key',), '2 10 2 10', [(('parent-1',), ('parent-2',))])])
        self.assertEqual(
            frozenset([('parent-1',), ('parent-2',)]),
            index.get_missing_parents())

    def make_source_with_b(self, a_parent, path):
        source = self.make_test_vf(True, dir=path)
        source.add_lines(('a',), (), ['lines\n'])
        if a_parent:
            b_parents = (('a',),)
        else:
            b_parents = ()
        source.add_lines(('b',), b_parents, ['lines\n'])
        return source

    def do_inconsistent_inserts(self, inconsistency_fatal):
        target = self.make_test_vf(True, dir='target',
                                   inconsistency_fatal=inconsistency_fatal)
        for x in range(2):
            source = self.make_source_with_b(x==1, 'source%s' % x)
            target.insert_record_stream(source.get_record_stream(
                [('b',)], 'unordered', False))

    def test_inconsistent_redundant_inserts_warn(self):
        """Should not insert a record that is already present."""
        warnings = []
        def warning(template, args):
            warnings.append(template % args)
        _trace_warning = trace.warning
        trace.warning = warning
        try:
            self.do_inconsistent_inserts(inconsistency_fatal=False)
        finally:
            trace.warning = _trace_warning
        self.assertEqual(["inconsistent details in skipped record: ('b',)"
                          " ('42 32 0 8', ((),)) ('74 32 0 8', ((('a',),),))"],
                         warnings)

    def test_inconsistent_redundant_inserts_raises(self):
        e = self.assertRaises(errors.KnitCorrupt, self.do_inconsistent_inserts,
                              inconsistency_fatal=True)
        self.assertContainsRe(str(e), "Knit.* corrupt: inconsistent details"
                              " in add_records:"
                              " \('b',\) \('42 32 0 8', \(\(\),\)\) \('74 32"
                              " 0 8', \(\(\('a',\),\),\)\)")

    def test_clear_cache(self):
        vf = self.make_source_with_b(True, 'source')
        vf.writer.end()
        for record in vf.get_record_stream([('a',), ('b',)], 'unordered',
                                           True):
            pass
        self.assertTrue(len(vf._group_cache) > 0)
        vf.clear_cache()
        self.assertEqual(0, len(vf._group_cache))



class StubGCVF(object):
    def __init__(self, canned_get_blocks=None):
        self._group_cache = {}
        self._canned_get_blocks = canned_get_blocks or []
    def _get_blocks(self, read_memos):
        return iter(self._canned_get_blocks)
    

class Test_BatchingBlockFetcher(TestCaseWithGroupCompressVersionedFiles):
    """Simple whitebox unit tests for _BatchingBlockFetcher."""
    
    def test_add_key_new_read_memo(self):
        """Adding a key with an uncached read_memo new to this batch adds that
        read_memo to the list of memos to fetch.
        """
        # locations are: index_memo, ignored, parents, ignored
        # where index_memo is: (idx, offset, len, factory_start, factory_end)
        # and (idx, offset, size) is known as the 'read_memo', identifying the
        # raw bytes needed.
        read_memo = ('fake index', 100, 50)
        locations = {
            ('key',): (read_memo + (None, None), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(StubGCVF(), locations)
        total_size = batcher.add_key(('key',))
        self.assertEqual(50, total_size)
        self.assertEqual([('key',)], batcher.keys)
        self.assertEqual([read_memo], batcher.memos_to_get)

    def test_add_key_duplicate_read_memo(self):
        """read_memos that occur multiple times in a batch will only be fetched
        once.
        """
        read_memo = ('fake index', 100, 50)
        # Two keys, both sharing the same read memo (but different overall
        # index_memos).
        locations = {
            ('key1',): (read_memo + (0, 1), None, None, None),
            ('key2',): (read_memo + (1, 2), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(StubGCVF(), locations)
        total_size = batcher.add_key(('key1',))
        total_size = batcher.add_key(('key2',))
        self.assertEqual(50, total_size)
        self.assertEqual([('key1',), ('key2',)], batcher.keys)
        self.assertEqual([read_memo], batcher.memos_to_get)

    def test_add_key_cached_read_memo(self):
        """Adding a key with a cached read_memo will not cause that read_memo
        to be added to the list to fetch.
        """
        read_memo = ('fake index', 100, 50)
        gcvf = StubGCVF()
        gcvf._group_cache[read_memo] = 'fake block'
        locations = {
            ('key',): (read_memo + (None, None), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(gcvf, locations)
        total_size = batcher.add_key(('key',))
        self.assertEqual(0, total_size)
        self.assertEqual([('key',)], batcher.keys)
        self.assertEqual([], batcher.memos_to_get)

    def test_yield_factories_empty(self):
        """An empty batch yields no factories."""
        batcher = groupcompress._BatchingBlockFetcher(StubGCVF(), {})
        self.assertEqual([], list(batcher.yield_factories()))

    def test_yield_factories_calls_get_blocks(self):
        """Uncached memos are retrieved via get_blocks."""
        read_memo1 = ('fake index', 100, 50)
        read_memo2 = ('fake index', 150, 40)
        gcvf = StubGCVF(
            canned_get_blocks=[
                (read_memo1, groupcompress.GroupCompressBlock()),
                (read_memo2, groupcompress.GroupCompressBlock())])
        locations = {
            ('key1',): (read_memo1 + (None, None), None, None, None),
            ('key2',): (read_memo2 + (None, None), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(gcvf, locations)
        batcher.add_key(('key1',))
        batcher.add_key(('key2',))
        factories = list(batcher.yield_factories(full_flush=True))
        self.assertLength(2, factories)
        keys = [f.key for f in factories]
        kinds = [f.storage_kind for f in factories]
        self.assertEqual([('key1',), ('key2',)], keys)
        self.assertEqual(['groupcompress-block', 'groupcompress-block'], kinds)

    def test_yield_factories_flushing(self):
        """yield_factories holds back on yielding results from the final block
        unless passed full_flush=True.
        """
        fake_block = groupcompress.GroupCompressBlock()
        read_memo = ('fake index', 100, 50)
        gcvf = StubGCVF()
        gcvf._group_cache[read_memo] = fake_block
        locations = {
            ('key',): (read_memo + (None, None), None, None, None)}
        batcher = groupcompress._BatchingBlockFetcher(gcvf, locations)
        batcher.add_key(('key',))
        self.assertEqual([], list(batcher.yield_factories()))
        factories = list(batcher.yield_factories(full_flush=True))
        self.assertLength(1, factories)
        self.assertEqual(('key',), factories[0].key)
        self.assertEqual('groupcompress-block', factories[0].storage_kind)


class TestLazyGroupCompress(tests.TestCaseWithTransport):

    _texts = {
        ('key1',): "this is a text\n"
                   "with a reasonable amount of compressible bytes\n"
                   "which can be shared between various other texts\n",
        ('key2',): "another text\n"
                   "with a reasonable amount of compressible bytes\n"
                   "which can be shared between various other texts\n",
        ('key3',): "yet another text which won't be extracted\n"
                   "with a reasonable amount of compressible bytes\n"
                   "which can be shared between various other texts\n",
        ('key4',): "this will be extracted\n"
                   "but references most of its bytes from\n"
                   "yet another text which won't be extracted\n"
                   "with a reasonable amount of compressible bytes\n"
                   "which can be shared between various other texts\n",
    }
    def make_block(self, key_to_text):
        """Create a GroupCompressBlock, filling it with the given texts."""
        compressor = groupcompress.GroupCompressor()
        start = 0
        for key in sorted(key_to_text):
            compressor.compress(key, key_to_text[key], None)
        locs = dict((key, (start, end)) for key, (start, _, end, _)
                    in compressor.labels_deltas.iteritems())
        block = compressor.flush()
        raw_bytes = block.to_bytes()
        return locs, groupcompress.GroupCompressBlock.from_bytes(raw_bytes)

    def add_key_to_manager(self, key, locations, block, manager):
        start, end = locations[key]
        manager.add_factory(key, (), start, end)

    def make_block_and_full_manager(self, texts):
        locations, block = self.make_block(texts)
        manager = groupcompress._LazyGroupContentManager(block)
        for key in sorted(texts):
            self.add_key_to_manager(key, locations, block, manager)
        return block, manager

    def test_get_fulltexts(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager(('key1',), locations, block, manager)
        self.add_key_to_manager(('key2',), locations, block, manager)
        result_order = []
        for record in manager.get_record_stream():
            result_order.append(record.key)
            text = self._texts[record.key]
            self.assertEqual(text, record.get_bytes_as('fulltext'))
        self.assertEqual([('key1',), ('key2',)], result_order)

        # If we build the manager in the opposite order, we should get them
        # back in the opposite order
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager(('key2',), locations, block, manager)
        self.add_key_to_manager(('key1',), locations, block, manager)
        result_order = []
        for record in manager.get_record_stream():
            result_order.append(record.key)
            text = self._texts[record.key]
            self.assertEqual(text, record.get_bytes_as('fulltext'))
        self.assertEqual([('key2',), ('key1',)], result_order)

    def test__wire_bytes_no_keys(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        wire_bytes = manager._wire_bytes()
        block_length = len(block.to_bytes())
        # We should have triggered a strip, since we aren't using any content
        stripped_block = manager._block.to_bytes()
        self.assertTrue(block_length > len(stripped_block))
        empty_z_header = zlib.compress('')
        self.assertEqual('groupcompress-block\n'
                         '8\n' # len(compress(''))
                         '0\n' # len('')
                         '%d\n'# compressed block len
                         '%s'  # zheader
                         '%s'  # block
                         % (len(stripped_block), empty_z_header,
                            stripped_block),
                         wire_bytes)

    def test__wire_bytes(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager(('key1',), locations, block, manager)
        self.add_key_to_manager(('key4',), locations, block, manager)
        block_bytes = block.to_bytes()
        wire_bytes = manager._wire_bytes()
        (storage_kind, z_header_len, header_len,
         block_len, rest) = wire_bytes.split('\n', 4)
        z_header_len = int(z_header_len)
        header_len = int(header_len)
        block_len = int(block_len)
        self.assertEqual('groupcompress-block', storage_kind)
        self.assertEqual(34, z_header_len)
        self.assertEqual(26, header_len)
        self.assertEqual(len(block_bytes), block_len)
        z_header = rest[:z_header_len]
        header = zlib.decompress(z_header)
        self.assertEqual(header_len, len(header))
        entry1 = locations[('key1',)]
        entry4 = locations[('key4',)]
        self.assertEqualDiff('key1\n'
                             '\n'  # no parents
                             '%d\n' # start offset
                             '%d\n' # end offset
                             'key4\n'
                             '\n'
                             '%d\n'
                             '%d\n'
                             % (entry1[0], entry1[1],
                                entry4[0], entry4[1]),
                            header)
        z_block = rest[z_header_len:]
        self.assertEqual(block_bytes, z_block)

    def test_from_bytes(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        self.add_key_to_manager(('key1',), locations, block, manager)
        self.add_key_to_manager(('key4',), locations, block, manager)
        wire_bytes = manager._wire_bytes()
        self.assertStartsWith(wire_bytes, 'groupcompress-block\n')
        manager = groupcompress._LazyGroupContentManager.from_bytes(wire_bytes)
        self.assertIsInstance(manager, groupcompress._LazyGroupContentManager)
        self.assertEqual(2, len(manager._factories))
        self.assertEqual(block._z_content, manager._block._z_content)
        result_order = []
        for record in manager.get_record_stream():
            result_order.append(record.key)
            text = self._texts[record.key]
            self.assertEqual(text, record.get_bytes_as('fulltext'))
        self.assertEqual([('key1',), ('key4',)], result_order)

    def test__check_rebuild_no_changes(self):
        block, manager = self.make_block_and_full_manager(self._texts)
        manager._check_rebuild_block()
        self.assertIs(block, manager._block)

    def test__check_rebuild_only_one(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        # Request just the first key, which should trigger a 'strip' action
        self.add_key_to_manager(('key1',), locations, block, manager)
        manager._check_rebuild_block()
        self.assertIsNot(block, manager._block)
        self.assertTrue(block._content_length > manager._block._content_length)
        # We should be able to still get the content out of this block, though
        # it should only have 1 entry
        for record in manager.get_record_stream():
            self.assertEqual(('key1',), record.key)
            self.assertEqual(self._texts[record.key],
                             record.get_bytes_as('fulltext'))

    def test__check_rebuild_middle(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        # Request a small key in the middle should trigger a 'rebuild'
        self.add_key_to_manager(('key4',), locations, block, manager)
        manager._check_rebuild_block()
        self.assertIsNot(block, manager._block)
        self.assertTrue(block._content_length > manager._block._content_length)
        for record in manager.get_record_stream():
            self.assertEqual(('key4',), record.key)
            self.assertEqual(self._texts[record.key],
                             record.get_bytes_as('fulltext'))

    def test_check_is_well_utilized_all_keys(self):
        block, manager = self.make_block_and_full_manager(self._texts)
        self.assertFalse(manager.check_is_well_utilized())
        # Though we can fake it by changing the recommended minimum size
        manager._full_enough_block_size = block._content_length
        self.assertTrue(manager.check_is_well_utilized())
        # Setting it just above causes it to fail
        manager._full_enough_block_size = block._content_length + 1
        self.assertFalse(manager.check_is_well_utilized())
        # Setting the mixed-block size doesn't do anything, because the content
        # is considered to not be 'mixed'
        manager._full_enough_mixed_block_size = block._content_length
        self.assertFalse(manager.check_is_well_utilized())

    def test_check_is_well_utilized_mixed_keys(self):
        texts = {}
        f1k1 = ('f1', 'k1')
        f1k2 = ('f1', 'k2')
        f2k1 = ('f2', 'k1')
        f2k2 = ('f2', 'k2')
        texts[f1k1] = self._texts[('key1',)]
        texts[f1k2] = self._texts[('key2',)]
        texts[f2k1] = self._texts[('key3',)]
        texts[f2k2] = self._texts[('key4',)]
        block, manager = self.make_block_and_full_manager(texts)
        self.assertFalse(manager.check_is_well_utilized())
        manager._full_enough_block_size = block._content_length
        self.assertTrue(manager.check_is_well_utilized())
        manager._full_enough_block_size = block._content_length + 1
        self.assertFalse(manager.check_is_well_utilized())
        manager._full_enough_mixed_block_size = block._content_length
        self.assertTrue(manager.check_is_well_utilized())

    def test_check_is_well_utilized_partial_use(self):
        locations, block = self.make_block(self._texts)
        manager = groupcompress._LazyGroupContentManager(block)
        manager._full_enough_block_size = block._content_length
        self.add_key_to_manager(('key1',), locations, block, manager)
        self.add_key_to_manager(('key2',), locations, block, manager)
        # Just using the content from key1 and 2 is not enough to be considered
        # 'complete'
        self.assertFalse(manager.check_is_well_utilized())
        # However if we add key3, then we have enough, as we only require 75%
        # consumption
        self.add_key_to_manager(('key4',), locations, block, manager)
        self.assertTrue(manager.check_is_well_utilized())
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.