test_numpy.py :  » Database » PyTables » tables-2.1.2 » tables » tests » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Database » PyTables 
PyTables » tables 2.1.2 » tables » tests » test_numpy.py
import sys
import unittest
import os
import tempfile
import numpy

from numpy import *

import tables
from tables import *
from tables.tests import common
from tables.tests.common import allequal

# Install common.cleanup as the tearDown of unittest.TestCase, so that the
# internal attributes of every test instance are deleted automagically.
# NOTE(review): this patches unittest.TestCase itself, so it affects every
# TestCase subclass that does not define its own tearDown.
unittest.TestCase.tearDown = common.cleanup

# Signed integer and floating point typecodes
typecodes = list('bhilqfd')
# Unsigned integer and complex typecodes.  The unsigned 64-bit integer ('Q')
# is not checked on win platforms because this type is not supported there.
if sys.platform == 'win32':
    typecodes.extend('BHILFD')
else:
    typecodes.extend('BHILQFD')
typecodes.append('b1')   # boolean

# Byte-order prefix that corresponds to the native byte order
if sys.byteorder == 'little':
    byteorder = '<'
else:
    byteorder = '>'

class BasicTestCase(unittest.TestCase):
    """Basic test for all the supported typecodes present in NumPy.
    All of them are included on PyTables.
    """
    endiancheck = 0

    def WriteRead(self, testArray):
        if common.verbose:
            print '\n', '-=' * 30
            print "Running test for array with typecode '%s'" % \
                  testArray.dtype.char,
            print "for class check:", self.title

        # Create an instance of HDF5 Table
        self.file = tempfile.mktemp(".h5")
        self.fileh = openFile(self.file, mode = "w")
        self.root = self.fileh.root
        # Create the array under root and name 'somearray'
        a = testArray
        self.fileh.createArray(self.root, 'somearray', a, "Some array")

        # Close the file
        self.fileh.close()

        # Re-open the file in read-only mode
        self.fileh = openFile(self.file, mode = "r")
        self.root = self.fileh.root

        # Read the saved array
        b = self.root.somearray.read()
        # For cases that read returns a python type instead of a numpy type
        if not hasattr(b, "shape"):
            b = array(b, dtype=a.dtype.str)

        # Compare them. They should be equal.
        #if not allequal(a,b, "numpy") and common.verbose:
        if common.verbose:
            print "Array written:", a
            print "Array written shape:", a.shape
            print "Array written itemsize:", a.itemsize
            print "Array written type:", a.dtype.char
            print "Array read:", b
            print "Array read shape:", b.shape
            print "Array read itemsize:", b.itemsize
            print "Array read type:", b.dtype.char

        type_ = self.root.somearray.atom.type
        # Check strictly the array equality
        assert type(a) == type(b)
        assert a.shape == b.shape
        assert a.shape == self.root.somearray.shape
        assert a.dtype == b.dtype
        if a.dtype.char[0] == "S":
            assert type_ == "string"
        else:
            assert a.dtype.base.name == type_

        assert allequal(a,b, "numpy")
        self.fileh.close()
        # Then, delete the file
        os.remove(self.file)
        return

    def test00_char(self):
        "Data integrity during recovery (character objects)"

        a = array(self.tupleChar,'S'+str(len(self.tupleChar)))
        self.WriteRead(a)
        return

    def test01_char_nc(self):
        "Data integrity during recovery (non-contiguous character objects)"

        a = array(self.tupleChar,'S'+str(len(self.tupleChar)))
        if a.shape == ():
            b = a               # We cannot use the indexing notation
        else:
            b = a[::2]
            # Ensure that this numarray string is non-contiguous
            if a.shape[0] > 2:
                assert b.flags['CONTIGUOUS'] == False
        self.WriteRead(b)
        return

    def test02_types(self):
        "Data integrity during recovery (numerical types)"

        for typecode in typecodes:
            if self.tupleInt.shape:
                a = self.tupleInt.astype(typecode)
            else:
                # shape is the empty tuple ()
                a = array(self.tupleInt, dtype=typecode)
            self.WriteRead(a)

        return

    def test03_types_nc(self):
        "Data integrity during recovery (non-contiguous numerical types)"

        for typecode in typecodes:
            if self.tupleInt.shape:
                a = self.tupleInt.astype(typecode)
            else:
                # shape is the empty tuple ()
                a = array(self.tupleInt, dtype=typecode)
            # This should not be tested for the rank-0 case
            if len(a.shape) == 0:
                return
            b = a[::2]
            # Ensure that this array is non-contiguous (for non-trivial case)
            if a.shape[0] > 2:
                assert b.flags['CONTIGUOUS'] == False
            self.WriteRead(b)

        return


class Basic0DOneTestCase(BasicTestCase):
    """Rank-0 fixtures: a scalar integer and a one-character string."""
    title = "Rank-0 case 1"
    tupleChar = "4"
    tupleInt = array(3)

class Basic0DTwoTestCase(BasicTestCase):
    """Rank-0 fixtures: a scalar integer and a two-character string."""
    title = "Rank-0 case 2"
    tupleChar = "44"
    tupleInt = array(33)

class Basic1DOneTestCase(BasicTestCase):
    """Rank-1 fixtures with a single element each."""
    title = "Rank-1 case 1"
    tupleChar = ("a",)
    tupleInt = array([3])

class Basic1DTwoTestCase(BasicTestCase):
    """Rank-1 fixtures: two integers and a single three-character string."""
    title = "Rank-1 case 2"
    tupleChar = ("aaa",)
    tupleInt = array([0, 4])

class Basic1DThreeTestCase(BasicTestCase):
    """Rank-1 fixtures: three integers and two strings."""
    title = "Rank-1 case 3"
    tupleChar = ("aaaa", "bbb")
    tupleInt = array([3, 4, 5])

class Basic2DTestCase(BasicTestCase):
    """Rank-2 fixtures: a 4x4 array of ones and a 3x2 list of strings."""
    title = "Rank-2 case 1"
    # Disabled alternative: reshape(array(arange((4)**2)), (4,)*2)
    tupleInt = ones((4, 4))
    tupleChar = [["aaa", "ddddd"],
                 ["d", "ss"],
                 ["s", "tt"]]

class Basic10DTestCase(BasicTestCase):
    """Rank-10 fixtures: arrays with two elements along each dimension."""
    title = "Rank-10 case 1"
    # Disabled alternative: reshape(array(arange((2)**10)), (2,)*10)
    tupleInt = ones((2,) * 10)
    # Disabled alternative: reshape(array([1],dtype="S1"),(1,)*10)
    # The string fixture below consumes far more time, so this
    # test should be run in common.heavy mode.
    tupleChar = array(tupleInt, dtype="S1")


# class Basic32DTestCase(BasicTestCase):
#     # 32D case (maximum)
#     tupleInt = reshape(array((22,)), (1,)*32)
#     # Strings seem to be very slow with somewhat large dimensions.
#     # This should not be run unless the numarray people address this problem.
#     # F. Alted 2006-01-04
#     tupleChar = array(tupleInt, dtype="S1")


class GroupsArrayTestCase(unittest.TestCase):
    """This test class checks combinations of arrays with groups.
    It also uses arrays ranks which ranges until 10.
    """

    def test00_iterativeGroups(self):
        """Checking combinations of arrays with groups
        It also uses arrays ranks which ranges until 10.
        """

        if common.verbose:
            print '\n', '-=' * 30
            print "Running %s.test00_iterativeGroups..." % \
                  self.__class__.__name__

        # Open a new empty HDF5 file
        file = tempfile.mktemp(".h5")
        fileh = openFile(file, mode = "w")
        # Get the root group
        group = fileh.root

        i = 1
        for typecode in typecodes:
            # Create an array of typecode, with incrementally bigger ranges
            a = ones((2,) * i, typecode)
            # Save it on the HDF5 file
            dsetname = 'array_' + typecode
            if common.verbose:
                print "Creating dataset:", group._g_join(dsetname)
            hdfarray = fileh.createArray(group, dsetname, a, "Large array")
            # Create a new group
            group = fileh.createGroup(group, 'group' + str(i))
            # increment the range for next iteration
            i += 1

        # Close the file
        fileh.close()

        # Open the previous HDF5 file in read-only mode
        fileh = openFile(file, mode = "r")
        # Get the root group
        group = fileh.root

        # Get the metadata on the previosly saved arrays
        for i in range(1,len(typecodes)):
            # Create an array for later comparison
            a = ones((2,) * i, typecodes[i - 1])
            # Get the dset object hanging from group
            dset = getattr(group, 'array_' + typecodes[i-1])
            # Get the actual array
            b = dset.read()
            if not allequal(a,b, "numpy") and common.verbose:
                print "Array a original. Shape: ==>", a.shape
                print "Array a original. Data: ==>", a
                print "Info from dataset:", dset._v_pathname
                print "  shape ==>", dset.shape,
                print "  dtype ==> %s" % dset.dtype
                print "Array b read from file. Shape: ==>", b.shape,
                print ". Type ==> %s" % b.dtype.char

            assert a.shape == b.shape
            if dtype('l').itemsize == 4:
                if (a.dtype.char == "i" or a.dtype.char == "l"):
                    # Special expection. We have no way to distinguish between
                    # "l" and "i" typecode, and we can consider them the same
                    # to all practical effects
                    assert b.dtype.char == "l" or b.dtype.char == "i"
                elif (a.dtype.char == "I" or a.dtype.char == "L"):
                    # Special expection. We have no way to distinguish between
                    # "L" and "I" typecode, and we can consider them the same
                    # to all practical effects
                    assert b.dtype.char == "L" or b.dtype.char == "I"
                else:
                    assert allequal(a,b, "numpy")
            elif dtype('l').itemsize == 8:
                if (a.dtype.char == "q" or a.dtype.char == "l"):
                    # Special expection. We have no way to distinguish between
                    # "q" and "l" typecode in 64-bit platforms, and we can
                    # consider them the same to all practical effects
                    assert b.dtype.char == "l" or b.dtype.char == "q"
                elif (a.dtype.char == "Q" or a.dtype.char == "L"):
                    # Special expection. We have no way to distinguish between
                    # "Q" and "L" typecode in 64-bit platforms, and we can
                    # consider them the same to all practical effects
                    assert b.dtype.char == "L" or b.dtype.char == "Q"
                else:
                    assert allequal(a,b, "numpy")

            # Iterate over the next group
            group = getattr(group, 'group' + str(i))

        # Close the file
        fileh.close()

        # Then, delete the file
        os.remove(file)

    def test01_largeRankArrays(self):
        """Checking creation of large rank arrays (0 < rank <= 32)
        It also uses arrays ranks which ranges until maxrank.
        """

        # maximum level of recursivity (deepest group level) achieved:
        # maxrank = 32 (for a effective maximum rank of 32)
        # This limit is due to a limit in the HDF5 library.
        minrank = 1
        maxrank = 32

        if common.verbose:
            print '\n', '-=' * 30
            print "Running %s.test01_largeRankArrays..." % \
                  self.__class__.__name__
            print "Maximum rank for tested arrays:", maxrank
        # Open a new empty HDF5 file
        file = tempfile.mktemp(".h5")
        fileh = openFile(file, mode = "w")
        group = fileh.root
        if common.verbose:
            print "Rank array writing progress: ",
        for rank in range(minrank, maxrank + 1):
            # Create an array of integers, with incrementally bigger ranges
            a = ones((1,) * rank, 'i')
            if common.verbose:
                print "%3d," % (rank),
            fileh.createArray(group, "array", a, "Rank: %s" % rank)
            group = fileh.createGroup(group, 'group' + str(rank))
        # Flush the buffers
        fileh.flush()
        # Close the file
        fileh.close()

        # Open the previous HDF5 file in read-only mode
        fileh = openFile(file, mode = "r")
        group = fileh.root
        if common.verbose:
            print
            print "Rank array reading progress: "
        # Get the metadata on the previosly saved arrays
        for rank in range(minrank, maxrank + 1):
            # Create an array for later comparison
            a = ones((1,) * rank, 'i')
            # Get the actual array
            b = group.array.read()
            if common.verbose:
                print "%3d," % (rank),
            if not a.tolist() == b.tolist() and common.verbose:
                print "Info from dataset:", dset._v_pathname
                print "  Shape: ==>", dset.shape,
                print "  typecode ==> %c" % dset.typecode
                print "Array b read from file. Shape: ==>", b.shape,
                print ". Type ==> %c" % b.dtype.char
            assert a.shape == b.shape
            if a.dtype.char == "i":
                # Special expection. We have no way to distinguish between
                # "l" and "i" typecode, and we can consider them the same
                # to all practical effects
                assert b.dtype.char == "l" or b.dtype.char == "i"
            else:
                assert a.dtype.char == b.dtype.char

            assert a == b

            # Iterate over the next group
            group = fileh.getNode(group, 'group' + str(rank))

        if common.verbose:
            print # This flush the stdout buffer
        # Close the file
        fileh.close()
        # Delete the file
        os.remove(file)


# Test Record class
class Record(IsDescription):
    """Table description with string, boolean, integer, float and complex
    columns.  Every column declares a default value."""
    # String columns, explicitly placed in the first two positions
    var1 = StringCol(itemsize=4, dflt="abcd", pos=0)
    var2 = StringCol(itemsize=1, dflt="a", pos=1)
    # Boolean column
    var3 = BoolCol(dflt=1)
    # Signed and unsigned integer columns
    var4 = Int8Col(dflt=1)
    var5 = UInt8Col(dflt=1)
    var6 = Int16Col(dflt=1)
    var7 = UInt16Col(dflt=1)
    var8 = Int32Col(dflt=1)
    var9 = UInt32Col(dflt=1)
    var10 = Int64Col(dflt=1)
    # Floating point columns
    var11 = Float32Col(dflt=1.0)
    var12 = Float64Col(dflt=1.0)
    # Complex columns (8 and 16 bytes per item)
    var13 = ComplexCol(itemsize=8, dflt=1 + 0j)
    var14 = ComplexCol(itemsize=16, dflt=1 + 0j)


class TableReadTestCase(common.PyTablesTestCase):
    nrows = 100

    def setUp(self):

        # Create an instance of an HDF5 Table
        self.file = tempfile.mktemp(".h5")
        fileh = openFile(self.file, "w")
        table = fileh.createTable(fileh.root, 'table', Record)
        for i in range(self.nrows):
            table.row.append()  # Fill 100 rows with default values
        fileh.close()
        self.fileh = openFile(self.file, "a")  # allow flavor changes

    def tearDown(self):
        self.fileh.close()
        os.remove(self.file)
        common.cleanup(self)


    def test01_readTableChar(self):
        """Checking column conversion into NumPy in read(). Char flavor"""

        table = self.fileh.root.table
        table.flavor = "numpy"
        for colname in table.colnames:
            numcol = table.read(field=colname)
            typecol = table.coltypes[colname]
            itemsizecol = table.description._v_dtypes[colname].base.itemsize
            nctypecode = numcol.dtype.char
            if typecol == "string":
                if itemsizecol > 1:
                    orignumcol = array(['abcd']*self.nrows, dtype='S4')
                else:
                    orignumcol = array(['a']*self.nrows, dtype='S1')
                if common.verbose:
                    print "Typecode of NumPy column read:", nctypecode
                    print "Should look like:", 'c'
                    print "Itemsize of column:", itemsizecol
                    print "Shape of NumPy column read:", numcol.shape
                    print "Should look like:", orignumcol.shape
                    print "First 3 elements of read col:", numcol[:3]
                # Check that both NumPy objects are equal
                assert allequal(numcol, orignumcol, "numpy")

    def test01_readTableNum(self):
        """Checking column conversion into NumPy in read(). NumPy flavor"""

        table = self.fileh.root.table
        table.flavor = "numpy"
        for colname in table.colnames:
            numcol = table.read(field=colname)
            typecol = table.coltypes[colname]
            nctypecode = typeNA[numcol.dtype.char[0]]
            if typecol != "string":
                if common.verbose:
                    print "Typecode of NumPy column read:", nctypecode
                    print "Should look like:", typecol
                orignumcol = ones(shape=self.nrows, dtype=numcol.dtype.char)
                # Check that both NumPy objects are equal
                assert allequal(numcol, orignumcol, "numpy")


    def test02_readCoordsChar(self):
        """Column conversion into NumPy in readCoords(). Chars"""

        table = self.fileh.root.table
        table.flavor = "numpy"
        coords = (1,2,3)
        self.nrows = len(coords)
        for colname in table.colnames:
            numcol = table.readCoordinates(coords, field=colname)
            typecol = table.coltypes[colname]
            itemsizecol = table.description._v_dtypes[colname].base.itemsize
            nctypecode = numcol.dtype.char
            if typecol == "string":
                if itemsizecol > 1:
                    orignumcol = array(['abcd']*self.nrows, dtype='S4')
                else:
                    orignumcol = array(['a']*self.nrows, dtype='S1')
                if common.verbose:
                    print "Typecode of NumPy column read:", nctypecode
                    print "Should look like:", 'c'
                    print "Itemsize of column:", itemsizecol
                    print "Shape of NumPy column read:", numcol.shape
                    print "Should look like:", orignumcol.shape
                    print "First 3 elements of read col:", numcol[:3]
                # Check that both NumPy objects are equal
                assert allequal(numcol, orignumcol, "numpy")

    def test02_readCoordsNum(self):
        """Column conversion into NumPy in readCoordinates(). NumPy."""

        table = self.fileh.root.table
        table.flavor = "numpy"
        coords = (1,2,3)
        self.nrows = len(coords)
        for colname in table.colnames:
            numcol = table.readCoordinates(coords, field=colname)
            typecol = table.coltypes[colname]
            type_ = numcol.dtype.type
            if typecol != "string":
                if typecol == "int64":
                    return
                if common.verbose:
                    print "Type of read NumPy column:", type_
                    print "Should look like:", typecol
                orignumcol = ones(shape=self.nrows, dtype=numcol.dtype.char)
                # Check that both NumPy objects are equal
                assert allequal(numcol, orignumcol, "numpy")

    def test03_getIndexNumPy(self):
        """Getting table rows specifyied as NumPy scalar integers."""

        table = self.fileh.root.table
        coords = numpy.array([1,2,3], dtype='int8')
        for colname in table.colnames:
            numcol = [ table[coord][colname] for coord in coords ]
            typecol = table.coltypes[colname]
            if typecol != "string":
                if typecol == "int64":
                    return
                numcol = numpy.array(numcol, typecol)
                if common.verbose:
                    type_ = numcol.dtype.type
                    print "Type of read NumPy column:", type_
                    print "Should look like:", typecol
                orignumcol = ones(shape=len(numcol), dtype=numcol.dtype.char)
                # Check that both NumPy objects are equal
                assert allequal(numcol, orignumcol, "numpy")

    def test04_setIndexNumPy(self):
        """Setting table rows specifyied as NumPy integers."""

        self.fileh.close()
        self.fileh = openFile(self.file, "a")
        table = self.fileh.root.table
        table.flavor = "numpy"
        coords = numpy.array([1,2,3], dtype='int8')
        # Modify row 1
        # From PyTables 2.0 on, assignments to records can be done
        # only as tuples (see http://projects.scipy.org/scipy/numpy/ticket/315)
        #table[coords[0]] = ["aasa","x"]+[232]*12
        table[coords[0]] = tuple(["aasa","x"]+[232]*12)
        #record = list(table[coords[0]])
        record = table.read(coords[0])
        if common.verbose:
            print """Original row:
['aasa', 'x', 232, -24, 232, 232, 1, 232L, 232, (232+0j), 232.0, 232L, (232+0j), 232.0]
"""
            print "Read row:\n", record
        assert record['var1'] == 'aasa'
        assert record['var2'] == 'x'
        assert record['var3'] == True
        assert record['var4'] == -24
        assert record['var7'] == 232


# The declaration of the nested table:
class Info(IsDescription):
    """Nested description made of a 2-byte string and a 16-byte complex."""
    Value = ComplexCol(itemsize=16)
    Name = StringCol(itemsize=2)
    _v_pos = 3

class TestTDescr(IsDescription):

    """A description that has several nested columns."""

    # Flat columns.  Some of them are multidimensional (``shape``) and some
    # ask for an explicit position (``pos``).
    x = Int32Col(dflt=0, shape=2, pos=0) #0
    y = FloatCol(dflt=1, shape=(2,2))
    z = UInt8Col(dflt=1)
    z3 = EnumCol({'r':4, 'g':2, 'b':1}, 'r', 'int32', shape=2)
    color = StringCol(itemsize=4, dflt="ab", pos=2)
    # At this point ``Info`` still names the module-level description: the
    # nested class with the same name is only defined in the next statement.
    info = Info()
    # Nested column declared as an inner class, with ``_v_pos = 1``
    class Info(IsDescription): #1
        _v_pos = 1
        name = StringCol(itemsize=2)
        value = ComplexCol(itemsize=16, pos=0) #0
        y2 = FloatCol(pos=1) #1
        z2 = UInt8Col()
        # Second level of nesting
        class Info2(IsDescription):
            y3 = Time64Col(shape=2)
            name = StringCol(itemsize=2)
            value = ComplexCol(itemsize=16, shape=2)


class TableNativeFlavorTestCase(common.PyTablesTestCase):
    nrows = 100

    def setUp(self):
        """Create a table described by `TestTDescr` with ``nrows`` default rows.

        The file is left open in ``self.fileh`` for the tests.  If anything
        fails while the table is being built, the file is closed and removed
        before the error is propagated.
        """

        # mkstemp() avoids the race condition of the deprecated mktemp();
        # only the file name is needed, as the file is opened in "w" mode.
        fd, self.file = tempfile.mkstemp(".h5")
        os.close(fd)
        fileh = None
        try:
            # Create an instance of an HDF5 Table
            fileh = openFile(self.file, "w")
            table = fileh.createTable(fileh.root, 'table', TestTDescr,
                                      expectedrows=self.nrows)
            table.flavor = "numpy"
            for i in range(self.nrows):
                table.row.append()  # Fill the rows with default values
            table.flush()
        except:
            # Do not leave the file behind when the fixture cannot be built
            if fileh is not None:
                fileh.close()
            os.remove(self.file)
            raise
        self.fileh = fileh

    def tearDown(self):
        """Close and remove the test file, then run the common cleanup."""
        filename, fileh = self.file, self.fileh
        fileh.close()
        os.remove(filename)
        common.cleanup(self)


    def test01a_basicTableRead(self):
        """Checking the return of a NumPy in read()."""

        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
        table = self.fileh.root.table
        data = table[:]
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check the value of some columns
        # A flat column
        col = table.cols.x[:3]
        assert isinstance(col, ndarray)
        npcol = zeros((3,2), dtype="int32")
        assert allequal(col, npcol, "numpy")
        # A nested column
        col = table.cols.Info[:3]
        assert isinstance(col, ndarray)
        dtype = [('value', 'c16'),
                 ('y2', 'f8'),
                 ('Info2',
                  [('name', 'S2'),
                   ('value', 'c16', (2,)),
                   ('y3', 'f8', (2,))]),
                 ('name', 'S2'),
                 ('z2', 'u1')]
        npcol = zeros((3,), dtype=dtype)
        assert col.dtype.descr == npcol.dtype.descr
        if common.verbose:
            print "col-->", col
            print "npcol-->", npcol
        # A copy() is needed in case the buffer can be in different segments
        assert col.copy().data == npcol.data

    def test01b_basicTableRead(self):
        """Checking the return of a NumPy in read() (strided version)."""

        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
        table = self.fileh.root.table
        data = table[::3]
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check the value of some columns
        # A flat column
        col = table.cols.x[:9:3]
        assert isinstance(col, ndarray)
        npcol = zeros((3,2), dtype="int32")
        assert allequal(col, npcol, "numpy")
        # A nested column
        col = table.cols.Info[:9:3]
        assert isinstance(col, ndarray)
        dtype = [('value', '%sc16' % byteorder),
                 ('y2', '%sf8' % byteorder),
                 ('Info2',
                  [('name', '|S2'),
                   ('value', '%sc16' % byteorder, (2,)),
                   ('y3', '%sf8' % byteorder, (2,))]),
                 ('name', '|S2'),
                 ('z2', '|u1')]
        npcol = zeros((3,), dtype=dtype)
        assert col.dtype.descr == npcol.dtype.descr
        if common.verbose:
            print "col-->", col
            print "npcol-->", npcol
        # A copy() is needed in case the buffer can be in different segments
        assert col.copy().data == npcol.data

    def test02_getWhereList(self):
        """Checking the return of NumPy in getWhereList method."""

        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
        table = self.fileh.root.table
        data = table.getWhereList('z == 1')
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check that all columns have been selected
        assert len(data) == 100
        # Finally, check that the contents are ok
        assert allequal(data, arange(100, dtype="i8"), "numpy")

    def test03a_readWhere(self):
        """Checking the return of NumPy in readWhere method (strings)."""

        table = self.fileh.root.table
        if tables.is_pro:
            table.cols.color.createIndex()
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            table = self.fileh.root.table
        data = table.readWhere('color == "ab"')
        if common.verbose:
            print "Type of read:", type(data)
            print "Length of the data read:", len(data)
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check that all columns have been selected
        assert len(data) == self.nrows

    def test03b_readWhere(self):
        """Checking the return of NumPy in readWhere method (numeric)."""

        table = self.fileh.root.table
        if tables.is_pro:
            table.cols.z.createIndex()
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            table = self.fileh.root.table
        data = table.readWhere('z == 0')
        if common.verbose:
            print "Type of read:", type(data)
            print "Length of the data read:", len(data)
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check that all columns have been selected
        assert len(data) == 0

    def test04a_createTable(self):
        """Checking the Table creation from a numpy recarray."""

        dtype = [('value', '%sc16' % byteorder),
                 ('y2', '%sf8' % byteorder),
                 ('Info2',
                  [('name', '|S2'),
                   ('value', '%sc16' % byteorder, (2,)),
                   ('y3', '%sf8' % byteorder, (2,))]),
                 ('name', '|S2'),
                 ('z2', '|u1')]
        npdata = zeros((3,), dtype=dtype)
        table = self.fileh.createTable(self.fileh.root, 'table2', npdata)
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            table = self.fileh.root.table2
        data = table[:]
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
            print "Length of the data read:", len(data)
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check the type
        assert data.dtype.descr == npdata.dtype.descr
        if common.verbose:
            print "npdata-->", npdata
            print "data-->", data
        # A copy() is needed in case the buffer would be in different segments
        assert data.copy().data == npdata.data

    def test04b_appendTable(self):
        """Checking appending a numpy recarray."""

        table = self.fileh.root.table
        npdata = table[3:6]
        table.append(npdata)
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            table = self.fileh.root.table
        data = table[-3:]
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "Last 3 elements of read:", data[-3:]
            print "Length of the data read:", len(data)
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check the type
        assert data.dtype.descr == npdata.dtype.descr
        if common.verbose:
            print "npdata-->", npdata
            print "data-->", data
        # A copy() is needed in case the buffer would be in different segments
        assert data.copy().data == npdata.data

    def test05a_assignColumn(self):
        """Checking assigning to a column."""

        table = self.fileh.root.table
        table.cols.z[:] = zeros((100,), dtype='u1')
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            table = self.fileh.root.table
        data = table.cols.z[:]
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
            print "Length of the data read:", len(data)
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check that all columns have been selected
        assert len(data) == 100
        # Finally, check that the contents are ok
        assert allequal(data, zeros((100,), dtype="u1"), "numpy")

    def test05b_modifyingColumns(self):
        """Checking modifying several columns at once."""

        table = self.fileh.root.table
        xcol = ones((3,2), 'int32')
        ycol = zeros((3,2,2), 'float64')
        zcol = zeros((3,), 'uint8')
        table.modifyColumns(3, 6, 1, [xcol, ycol, zcol], ['x', 'y', 'z'])
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            table = self.fileh.root.table
        data = table.cols.y[3:6]
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
            print "Length of the data read:", len(data)
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check the type
        assert data.dtype.descr == ycol.dtype.descr
        if common.verbose:
            print "ycol-->", ycol
            print "data-->", data
        # A copy() is needed in case the buffer would be in different segments
        assert data.copy().data == ycol.data

    def test05c_modifyingColumns(self):
        """Checking modifying several columns using a single numpy buffer."""

        table = self.fileh.root.table
        dtype=[('x', 'i4', (2,)), ('y', 'f8', (2, 2)), ('z', 'u1')]
        nparray = zeros((3,), dtype=dtype)
        table.modifyColumns(3, 6, 1, nparray, ['x', 'y', 'z'])
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            table = self.fileh.root.table
        ycol = zeros((3, 2, 2), 'float64')
        data = table.cols.y[3:6]
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
            print "Length of the data read:", len(data)
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check the type
        assert data.dtype.descr == ycol.dtype.descr
        if common.verbose:
            print "ycol-->", ycol
            print "data-->", data
        # A copy() is needed in case the buffer would be in different segments
        assert data.copy().data == ycol.data

    def test06a_assignNestedColumn(self):
        """Checking assigning a nested column (using modifyColumn)."""

        table = self.fileh.root.table
        dtype = [('value', '%sc16' % byteorder),
                 ('y2', '%sf8' % byteorder),
                 ('Info2',
                  [('name', '|S2'),
                   ('value', '%sc16' % byteorder, (2,)),
                   ('y3', '%sf8' % byteorder, (2,))]),
                 ('name', '|S2'),
                 ('z2', '|u1')]
        npdata = zeros((3,), dtype=dtype)
        data = table.cols.Info[3:6]
        table.modifyColumn(3, 6, 1, column=npdata, colname='Info')
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            table = self.fileh.root.table
        data = table.cols.Info[3:6]
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
            print "Length of the data read:", len(data)
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check the type
        assert data.dtype.descr == npdata.dtype.descr
        if common.verbose:
            print "npdata-->", npdata
            print "data-->", data
        # A copy() is needed in case the buffer would be in different segments
        assert data.copy().data == npdata.data

    def test06b_assignNestedColumn(self):
        """Checking assigning a nested column (using the .cols accessor)."""

        table = self.fileh.root.table
        dtype = [('value', '%sc16' % byteorder),
                 ('y2', '%sf8' % byteorder),
                 ('Info2',
                  [('name', '|S2'),
                   ('value', '%sc16' % byteorder, (2,)),
                   ('y3', '%sf8' % byteorder, (2,))]),
                 ('name', '|S2'),
                 ('z2', '|u1')]
        npdata = zeros((3,), dtype=dtype)
#         self.assertRaises(NotImplementedError,
#                           table.cols.Info.__setitem__, slice(3,6,1),  npdata)
        table.cols.Info[3:6] = npdata
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            table = self.fileh.root.table
        data = table.cols.Info[3:6]
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
            print "Length of the data read:", len(data)
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check the type
        assert data.dtype.descr == npdata.dtype.descr
        if common.verbose:
            print "npdata-->", npdata
            print "data-->", data
        # A copy() is needed in case the buffer would be in different segments
        assert data.copy().data == npdata.data

    def test07a_modifyingRows(self):
        """Checking modifying several rows at once (using modifyRows)."""

        table = self.fileh.root.table
        # Read a chunk of the table
        chunk = table[0:3]
        # Modify it somewhat
        chunk['y'][:] = -1
        table.modifyRows(3, 6, 1, rows=chunk)
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            table = self.fileh.root.table
        ycol = zeros((3,2,2), 'float64')-1
        data = table.cols.y[3:6]
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
            print "Length of the data read:", len(data)
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check the type
        assert data.dtype.descr == ycol.dtype.descr
        if common.verbose:
            print "ycol-->", ycol
            print "data-->", data
        assert allequal(ycol, data, "numpy")

    def test07b_modifyingRows(self):
        """Checking modifying several rows at once (using cols accessor)."""

        table = self.fileh.root.table
        # Read a chunk of the table
        chunk = table[0:3]
        # Modify it somewhat
        chunk['y'][:] = -1
        table.cols[3:6] = chunk
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            table = self.fileh.root.table
        # Check that some column has been actually modified
        ycol = zeros((3,2,2), 'float64')-1
        data = table.cols.y[3:6]
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
            print "Length of the data read:", len(data)
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check the type
        assert data.dtype.descr == ycol.dtype.descr
        if common.verbose:
            print "ycol-->", ycol
            print "data-->", data
        assert allequal(ycol, data, "numpy")

    def test08a_modifyingRows(self):
        """Checking modifying just one row at once (using modifyRows)."""

        table = self.fileh.root.table
        # Read a chunk of the table
        chunk = table[3]
        # Modify it somewhat
        chunk['y'][:] = -1
        table.modifyRows(6, 7, 1, chunk)
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            table = self.fileh.root.table
        # Check that some column has been actually modified
        ycol = zeros((2,2), 'float64')-1
        data = table.cols.y[6]
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
            print "Length of the data read:", len(data)
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check the type
        assert data.dtype.descr == ycol.dtype.descr
        if common.verbose:
            print "ycol-->", ycol
            print "data-->", data
        assert allequal(ycol, data, "numpy")

    def test08b_modifyingRows(self):
        """Checking modifying just one row at once (using cols accessor)."""

        table = self.fileh.root.table
        # Read a chunk of the table
        chunk = table[3]
        # Modify it somewhat
        chunk['y'][:] = -1
        table.cols[6] = chunk
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            table = self.fileh.root.table
        # Check that some column has been actually modified
        ycol = zeros((2,2), 'float64')-1
        data = table.cols.y[6]
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
            print "Length of the data read:", len(data)
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check the type
        assert data.dtype.descr == ycol.dtype.descr
        if common.verbose:
            print "ycol-->", ycol
            print "data-->", data
        assert allequal(ycol, data, "numpy")

    def test09a_getStrings(self):
        """Checking the return of string columns with spaces."""

        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
        table = self.fileh.root.table
        rdata = table.getWhereList('color == "ab"')
        data = table.readCoordinates(rdata)
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check that all columns have been selected
        assert len(data) == 100
        # Finally, check that the contents are ok
        for idata in data['color']:
            assert idata == array("ab", dtype="|S4")

    def test09b_getStrings(self):
        """Checking the return of string columns with spaces. (modify)"""

        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
        table = self.fileh.root.table
        for i in range(50):
            table.cols.color[i] = "a  "
        table.flush()
        data = table[:]
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check that all columns have been selected
        assert len(data) == 100
        # Finally, check that the contents are ok
        for i in range(100):
            idata = data['color'][i]
            if i >= 50:
                assert idata == array("ab", dtype="|S4")
            else:
                assert idata == array("a  ", dtype="|S4")

    def test09c_getStrings(self):
        """Checking the return of string columns with spaces. (append)"""

        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
        table = self.fileh.root.table
        row = table.row
        for i in range(50):
            row["color"] = "a  "   # note the trailing spaces
            row.append()
        table.flush()
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
        data = self.fileh.root.table[:]
        if common.verbose:
            print "Type of read:", type(data)
            print "Description of the record:", data.dtype.descr
            print "First 3 elements of read:", data[:3]
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check that all columns have been selected
        assert len(data) == 150
        # Finally, check that the contents are ok
        # Finally, check that the contents are ok
        for i in range(150):
            idata = data['color'][i]
            if i < 100:
                assert idata == array("ab", dtype="|S4")
            else:
                assert idata == array("a  ", dtype="|S4")

class TableNativeFlavorOpenTestCase(TableNativeFlavorTestCase):
    # Runs the inherited tests with a false `close` flag, so their
    # `if self.close:` close-and-reopen branches are skipped.
    close = 0

class TableNativeFlavorCloseTestCase(TableNativeFlavorTestCase):
    # Runs the inherited tests with a true `close` flag, so their
    # `if self.close:` branches close and reopen the file before reading.
    close = 1

class AttributesTestCase(common.PyTablesTestCase):

    def setUp(self):

        # Create an instance of an HDF5 Table
        self.file = tempfile.mktemp(".h5")
        self.fileh = openFile(self.file, "w")
        groups = self.fileh.createGroup(self.fileh.root, 'group')

    def tearDown(self):
        self.fileh.close()
        os.remove(self.file)
        common.cleanup(self)

    def test01_writeAttribute(self):
        """Checking the creation of a numpy attribute."""
        group = self.fileh.root.group
        g_attrs = group._v_attrs
        g_attrs.numpy1 = zeros((1,1), dtype='int16')
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            group = self.fileh.root.group
            g_attrs = group._v_attrs
        # Check that we can retrieve a numpy object
        data = g_attrs.numpy1
        npcomp = zeros((1,1), dtype='int16')
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check the type
        assert data.dtype.descr == npcomp.dtype.descr
        if common.verbose:
            print "npcomp-->", npcomp
            print "data-->", data
        assert allequal(npcomp, data, "numpy")

    def test02_updateAttribute(self):
        """Checking the modification of a numpy attribute."""

        group = self.fileh.root.group
        g_attrs = group._v_attrs
        g_attrs.numpy1 = zeros((1,2), dtype='int16')
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            group = self.fileh.root.group
            g_attrs = group._v_attrs
        # Update this attribute
        g_attrs.numpy1 = ones((1,2), dtype='int16')
        # Check that we can retrieve a numpy object
        data = g_attrs.numpy1
        npcomp = ones((1,2), dtype='int16')
        # Check that both NumPy objects are equal
        assert isinstance(data, ndarray)
        # Check the type
        assert data.dtype.descr == npcomp.dtype.descr
        if common.verbose:
            print "npcomp-->", npcomp
            print "data-->", data
        assert allequal(npcomp, data, "numpy")

class AttributesOpenTestCase(AttributesTestCase):
    # Runs the attribute tests with a false `close` flag, so the file stays
    # open between writing and reading.
    close = 0

class AttributesCloseTestCase(AttributesTestCase):
    # Runs the attribute tests with a true `close` flag, so the file is
    # closed and reopened between writing and reading.
    close = 1

class StrlenTestCase(common.PyTablesTestCase):

    def setUp(self):

        # Create an instance of an HDF5 Table
        self.file = tempfile.mktemp(".h5")
        self.fileh = openFile(self.file, "w")
        group = self.fileh.createGroup(self.fileh.root, 'group')
        tablelayout = {'Text': StringCol(itemsize=1000),}
        self.table = self.fileh.createTable(group, 'table', tablelayout)
        self.table.flavor = 'numpy'
        row = self.table.row
        row['Text'] = 'Hello Francesc!'
        row.append()
        row['Text'] = 'Hola Francesc!'
        row.append()
        self.table.flush()

    def tearDown(self):
        self.fileh.close()
        os.remove(self.file)
        common.cleanup(self)

    def test01(self):
        """Checking the lengths of strings (read field)."""
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            self.table = self.fileh.root.group.table
        # Get both strings
        str1 = self.table.col('Text')[0]
        str2 = self.table.col('Text')[1]
        if common.verbose:
            print "string1-->", str1
            print "string2-->", str2
        # Check that both NumPy objects are equal
        assert len(str1) == len('Hello Francesc!')
        assert len(str2) == len('Hola Francesc!')
        assert str1 == 'Hello Francesc!'
        assert str2 == 'Hola Francesc!'

    def test02(self):
        """Checking the lengths of strings (read recarray)."""
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            self.table = self.fileh.root.group.table
        # Get both strings
        str1 = self.table[:]['Text'][0]
        str2 = self.table[:]['Text'][1]
        # Check that both NumPy objects are equal
        assert len(str1) == len('Hello Francesc!')
        assert len(str2) == len('Hola Francesc!')
        assert str1 == 'Hello Francesc!'
        assert str2 == 'Hola Francesc!'


    def test03(self):
        """Checking the lengths of strings (read recarray, row by row)."""
        if self.close:
            self.fileh.close()
            self.fileh = openFile(self.file, "a")
            self.table = self.fileh.root.group.table
        # Get both strings
        str1 = self.table[0]['Text']
        str2 = self.table[1]['Text']
        # Check that both NumPy objects are equal
        assert len(str1) == len('Hello Francesc!')
        assert len(str2) == len('Hola Francesc!')
        assert str1 == 'Hello Francesc!'
        assert str2 == 'Hola Francesc!'


class StrlenOpenTestCase(StrlenTestCase):
    # Runs the string-length tests with a false `close` flag, so the table
    # created in setUp is read without reopening the file.
    close = 0

class StrlenCloseTestCase(StrlenTestCase):
    # Runs the string-length tests with a true `close` flag, so the file is
    # closed and reopened before the table is read.
    close = 1


#--------------------------------------------------------

def suite():
    """Build and return the test suite for this module.

    Every regular test case class is added once per iteration; the
    10-dimensional case is added only when ``common.heavy`` is set.
    """
    collected = unittest.TestSuite()
    repeats = 1

    regular_cases = (
        Basic0DOneTestCase,
        Basic0DTwoTestCase,
        Basic1DOneTestCase,
        Basic1DTwoTestCase,
        Basic1DThreeTestCase,
        Basic2DTestCase,
        GroupsArrayTestCase,
        TableReadTestCase,
        TableNativeFlavorOpenTestCase,
        TableNativeFlavorCloseTestCase,
        AttributesOpenTestCase,
        AttributesCloseTestCase,
        StrlenOpenTestCase,
        StrlenCloseTestCase,
    )
    for _ in range(repeats):
        for case in regular_cases:
            collected.addTest(unittest.makeSuite(case))
        if common.heavy:
            collected.addTest(unittest.makeSuite(Basic10DTestCase))
            # The 32 dimensions case takes forever to run!!
            # collected.addTest(unittest.makeSuite(Basic32DTestCase))
    return collected


if __name__ == '__main__':
    # Executed as a script: let unittest run whatever suite() collects.
    unittest.main(defaultTest='suite')
# www.java2java.com | Contact Us
# Copyright 2009 - 12 Demo Source and Support. All rights reserved.
# All other trademarks are property of their respective owners.