test_PositionIncrement.py :  » Search » PyLucene » pylucene-3.0.1-1 » test » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Search » PyLucene 
PyLucene » pylucene 3.0.1 1 » test » test_PositionIncrement.py
# ====================================================================
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
# ====================================================================

from unittest import TestCase,main
from lucene import *


class PositionIncrementTestCase(TestCase):
    """
    Unit tests ported from Java Lucene

    Checks how the position increments produced by a token stream show up
    in term positions, phrase queries, span queries and payloads.
    """

    def testSetPosition(self):
        """
        Index one document through a token stream with hand-picked position
        increments, then verify term positions and which phrase queries
        (built directly and through QueryParser) match the document.
        """

        class _analyzer(PythonAnalyzer):
            def tokenStream(_self, fieldName, reader):
                class _tokenStream(PythonTokenStream):
                    def __init__(self_):
                        super(_tokenStream, self_).__init__()
                        # Fixed tokens and the increment set on each one;
                        # the reader's actual content is never consulted.
                        self_.TOKENS = ["1", "2", "3", "4", "5"]
                        self_.INCREMENTS = [1, 2, 1, 0, 1]
                        self_.i = 0
                        self_.posIncrAtt = self_.addAttribute(PositionIncrementAttribute.class_)
                        self_.termAtt = self_.addAttribute(TermAttribute.class_)
                        self_.offsetAtt = self_.addAttribute(OffsetAttribute.class_)
                    def incrementToken(self_):
                        if self_.i == len(self_.TOKENS):
                            return False
                        self_.termAtt.setTermBuffer(self_.TOKENS[self_.i])
                        self_.offsetAtt.setOffset(self_.i, self_.i)
                        self_.posIncrAtt.setPositionIncrement(self_.INCREMENTS[self_.i])
                        self_.i += 1
                        return True
                    # The remaining stream hooks are deliberate no-ops.
                    def end(self_):
                        pass
                    def reset(self_):
                        pass
                    def close(self_):
                        pass
                return _tokenStream()

        analyzer = _analyzer()

        store = RAMDirectory()
        writer = IndexWriter(store, analyzer, True,
                             IndexWriter.MaxFieldLength.LIMITED)
        d = Document()
        # The field text is irrelevant: _analyzer always emits "1".."5".
        d.add(Field("field", "bogus",
                    Field.Store.YES, Field.Index.ANALYZED))
        writer.addDocument(d)
        writer.optimize()
        writer.close()

        searcher = IndexSearcher(store, True)

        pos = searcher.getIndexReader().termPositions(Term("field", "1"))
        pos.next()
        # first token should be at position 0
        self.assertEqual(0, pos.nextPosition())

        pos = searcher.getIndexReader().termPositions(Term("field", "2"))
        pos.next()
        # second token should be at position 2
        self.assertEqual(2, pos.nextPosition())

        q = PhraseQuery()
        q.add(Term("field", "1"))
        q.add(Term("field", "2"))
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(0, len(hits))

        # same as previous, just specify positions explicitly.
        q = PhraseQuery()
        q.add(Term("field", "1"), 0)
        q.add(Term("field", "2"), 1)
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(0, len(hits))

        # specifying correct positions should find the phrase.
        q = PhraseQuery()
        q.add(Term("field", "1"), 0)
        q.add(Term("field", "2"), 2)
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(1, len(hits))

        q = PhraseQuery()
        q.add(Term("field", "2"))
        q.add(Term("field", "3"))
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(1, len(hits))

        q = PhraseQuery()
        q.add(Term("field", "3"))
        q.add(Term("field", "4"))
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(0, len(hits))

        # phrase query would find it when correct positions are specified.
        q = PhraseQuery()
        q.add(Term("field", "3"), 0)
        q.add(Term("field", "4"), 0)
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(1, len(hits))

        # phrase query should fail for a non-existing searched term
        # even if another searched term exists at the same searched
        # position.
        q = PhraseQuery()
        q.add(Term("field", "3"), 0)
        q.add(Term("field", "9"), 0)
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(0, len(hits))

        # multi-phrase query should succeed for a non-existing searched
        # term because another searched term exists at the same searched
        # position.

        mq = MultiPhraseQuery()
        mq.add([Term("field", "3"), Term("field", "9")], 0)
        hits = searcher.search(mq, None, 1000).scoreDocs
        self.assertEqual(1, len(hits))

        q = PhraseQuery()
        q.add(Term("field", "2"))
        q.add(Term("field", "4"))
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(1, len(hits))

        q = PhraseQuery()
        q.add(Term("field", "3"))
        q.add(Term("field", "5"))
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(1, len(hits))

        q = PhraseQuery()
        q.add(Term("field", "4"))
        q.add(Term("field", "5"))
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(1, len(hits))

        q = PhraseQuery()
        q.add(Term("field", "2"))
        q.add(Term("field", "5"))
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(0, len(hits))

        # should not find "1 2" because there is a gap of 1 in the index
        qp = QueryParser(Version.LUCENE_CURRENT, "field",
                         StopWhitespaceAnalyzer(False))
        q = PhraseQuery.cast_(qp.parse("\"1 2\""))
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(0, len(hits))

        # omitted stop word cannot help because stop filter swallows the
        # increments.
        q = PhraseQuery.cast_(qp.parse("\"1 stop 2\""))
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(0, len(hits))

        # query parser alone won't help, because stop filter swallows the
        # increments.
        qp.setEnablePositionIncrements(True)
        q = PhraseQuery.cast_(qp.parse("\"1 stop 2\""))
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(0, len(hits))

        # stop filter alone won't help, because query parser swallows the
        # increments.
        qp.setEnablePositionIncrements(False)
        q = PhraseQuery.cast_(qp.parse("\"1 stop 2\""))
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(0, len(hits))

        # when both qp and stopFilter propagate increments, we should find
        # the doc.
        qp = QueryParser(Version.LUCENE_CURRENT, "field",
                         StopWhitespaceAnalyzer(True))
        qp.setEnablePositionIncrements(True)
        q = PhraseQuery.cast_(qp.parse("\"1 stop 2\""))
        hits = searcher.search(q, None, 1000).scoreDocs
        self.assertEqual(1, len(hits))

        # Release index resources, mirroring the cleanup at the end of
        # testPayloadsPos0.
        searcher.close()
        store.close()

    def testPayloadsPos0(self):
        """
        Index a document through TestPayloadAnalyzer and verify the term
        positions of "a", the spans found by a SpanNearQuery over "a" and
        "k", and the payloads returned for those spans.
        """

        directory = RAMDirectory()
        writer = IndexWriter(directory, TestPayloadAnalyzer(), True,
                             IndexWriter.MaxFieldLength.LIMITED)

        doc = Document()
        doc.add(Field("content",
                      StringReader("a a b c d e a f g h i j a b k k")))
        writer.addDocument(doc)

        # Reader obtained from the still-open writer.
        r = writer.getReader()

        tp = r.termPositions(Term("content", "a"))
        self.assertTrue(tp.next())
        # "a" occurs 4 times
        self.assertEqual(4, tp.freq())

        self.assertEqual(0, tp.nextPosition())
        self.assertEqual(1, tp.nextPosition())
        self.assertEqual(3, tp.nextPosition())
        self.assertEqual(6, tp.nextPosition())

        # only one doc has "a"
        self.assertTrue(not tp.next())

        searcher = IndexSearcher(r)

        stq1 = SpanTermQuery(Term("content", "a"))
        stq2 = SpanTermQuery(Term("content", "k"))
        sqs = [stq1, stq2]
        snq = SpanNearQuery(sqs, 30, False)

        # First pass: count every payload attached to the matching spans.
        count = 0
        sawZero = False

        pspans = snq.getSpans(searcher.getIndexReader())
        while pspans.next():
            payloads = pspans.getPayload()
            sawZero |= pspans.start() == 0

            it = payloads.iterator()
            while it.hasNext():
                count += 1
                it.next()

        self.assertEqual(5, count)
        self.assertTrue(sawZero)

        # Second pass: count the spans themselves.
        spans = snq.getSpans(searcher.getIndexReader())
        count = 0
        sawZero = False
        while spans.next():
            count += 1
            sawZero |= spans.start() == 0

        self.assertEqual(4, count)
        self.assertTrue(sawZero)

        # Third pass: fetch the payloads through PayloadSpanUtil and look
        # for the one written at position 0.
        sawZero = False
        psu = PayloadSpanUtil(searcher.getIndexReader())
        pls = psu.getPayloadsForQuery(snq)
        count = pls.size()
        it = pls.iterator()
        while it.hasNext():
            payloadBytes = JArray('byte').cast_(it.next())
            s = payloadBytes.string_
            sawZero |= s == "pos: 0"

        self.assertEqual(5, count)
        self.assertTrue(sawZero)
        writer.close()
        searcher.getIndexReader().close()
        directory.close()


class StopWhitespaceAnalyzer(PythonAnalyzer):
    """
    Analyzer that tokenizes with a WhitespaceAnalyzer and passes the result
    through a StopFilter whose only stop word is "stop".
    """

    def __init__(self, enablePositionIncrements):
        """
        enablePositionIncrements is stored and handed to every StopFilter
        this analyzer creates, as its first constructor argument.
        """
        super(StopWhitespaceAnalyzer, self).__init__()

        self.a = WhitespaceAnalyzer()
        self.enablePositionIncrements = enablePositionIncrements

    def tokenStream(self, fieldName, reader):
        """
        Return the wrapped WhitespaceAnalyzer's token stream for fieldName
        and reader, wrapped in a StopFilter.
        """
        stopWords = HashSet()
        stopWords.add("stop")
        whitespaceTokens = self.a.tokenStream(fieldName, reader)

        return StopFilter(self.enablePositionIncrements, whitespaceTokens,
                          stopWords)


class TestPayloadAnalyzer(PythonAnalyzer):
    """
    Analyzer whose token stream is a LowerCaseTokenizer over the reader,
    wrapped in a PayloadFilter.
    """

    def tokenStream(self, fieldName, reader):
        """Return PayloadFilter(LowerCaseTokenizer(reader), fieldName)."""
        return PayloadFilter(LowerCaseTokenizer(reader), fieldName)


class PayloadFilter(PythonTokenFilter):
    """
    Token filter that stamps each token with a "pos: N" payload and
    overrides its position increment, alternating 0 and 1 from token to
    token (starting with 0).
    """

    def __init__(self, input, fieldName):
        """
        input is the upstream token stream; fieldName is only stored.
        """
        super(PayloadFilter, self).__init__(input)
        self.input = input
        self.fieldName = fieldName

        # pos: running sum of the increments assigned so far.
        # i: number of tokens already processed.
        self.pos = 0
        self.i = 0

        # Attributes are registered on the upstream stream.
        self.posIncrAttr = input.addAttribute(PositionIncrementAttribute.class_)
        self.payloadAttr = input.addAttribute(PayloadAttribute.class_)
        self.termAttr = input.addAttribute(TermAttribute.class_)

    def incrementToken(self):
        """
        Advance the upstream stream by one token. Returns False when it is
        exhausted; otherwise sets the payload and position increment on the
        current token and returns True.
        """
        if not self.input.incrementToken():
            return False

        # The payload records self.pos as it was before this token's
        # increment is added.
        payloadText = "pos: %d" % self.pos
        self.payloadAttr.setPayload(Payload(JArray('byte')(payloadText)))

        # 1 for odd-numbered tokens, 0 for even-numbered ones.
        step = self.i % 2

        self.posIncrAttr.setPositionIncrement(step)
        self.pos += step
        self.i += 1
        return True


if __name__ == "__main__":
    import sys
    import lucene

    # Start the Java VM through PyLucene before any test runs.
    lucene.initVM()
    if '-loop' in sys.argv:
        # '-loop' reruns the whole suite forever; remove the flag so that
        # unittest's main() does not try to interpret it.
        sys.argv.remove('-loop')
        while True:
            try:
                main()
            except KeyboardInterrupt:
                # Let Ctrl-C stop the otherwise endless loop.
                raise
            except:
                # main() exits via SystemExit after each run; swallow that
                # (and any other failure) so the loop keeps going.
                pass
    else:
        main()
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.