serialize.py :  » Game-2D-3D » PyScrabble » pyscrabble-1.6.2 » pyscrabble » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Game 2D 3D » PyScrabble 
PyScrabble » pyscrabble 1.6.2 » pyscrabble » serialize.py
"""

Custom serializer class to replace pickle.  This does not use eval() when (de)serializing objects and thus 
is safe from arbitrary code execution (hopefully)

Adapted from:
    
    http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/415791 
"""

from cStringIO import StringIO
from struct import pack,unpack
from types import IntType,TupleType,StringType,FloatType,LongType,ListType,DictType,NoneType,BooleanType,UnicodeType
from pyscrabble.game.player import User,Player,PlayerInfo
from pyscrabble.game.pieces import Letter,Move
from pyscrabble.game.game import ScrabbleGameInfo
from pyscrabble.lookup import ServerMessage
from pyscrabble.util import Time,TimeDeltaWrapper,ServerBulletin,PrivateMessage,RingList
from pyscrabble import audit
from pyscrabble.command import helper
from time import struct_time
from datetime import timedelta
import zlib

class EncodeError(Exception): pass
class DecodeError(Exception): pass
class ProtocolError(Exception): pass

HEADER = "SRW3"

protocol = {
    ServerBulletin     : "a",
    BooleanType        : "b",
    PlayerInfo         : "c",
    timedelta          : "d",
    Time               : "e",
    TimeDeltaWrapper   : "f",
    ScrabbleGameInfo   : "g",
    audit.Action       : "h",
    RingList           : "i",
    helper.Command     : "j",
    Letter             : "l",
    Move               : "m",
    Player             : "p",
    PrivateMessage     : "r",
    ServerMessage      : "s",
    struct_time        : "t",
    User               : "u",
    LongType           : "B",
    DictType           : "D",
    FloatType          : "F",
    IntType            : "I",
    ListType           : "L",
    NoneType           : "N",
    StringType         : "S",
    TupleType          : "T",
    UnicodeType        : "U"
}

def x_for(type, data, error):
    result = None
    if type in data:
        result = data[type]
    else:
        for d in data:
            if issubclass(type, d):
                result = data[d]
                break
    if result is not None:
        return result
    else:
        raise error, "%s not found in %s" % (str(type), str(data))

def protocol_for(objType):
    return x_for(objType, protocol, ProtocolError)

def encoder_for(objType):
    return x_for(objType, encoder, EncodeError)

def decoder_for(objType):
    return x_for(objType, decoder, DecodeError)

encoder = {}
class register_encoder_for_type(object):
    """Registers an encoder function, for a type, in the global encoder dictionary."""
    def __init__(self, t):
        self.type = t
    def __call__(self, func):
        encoder[self.type] = func
        return func

#contains dictionary of decoding functions, where the dictionary key is the type prefix used.
decoder = {}
class register_decoder_for_type(object):
    """Registers a decoder function, for a prefix, in the global decoder dictionary."""
    def __init__(self, t):
        self.prefix = protocol_for(t)
    def __call__(self, func):
        decoder[self.prefix] = func
        return func

## <encoding functions> ##
@register_encoder_for_type(DictType)
def enc_dict_type(obj):
    data = "".join([encoder_for(type(i))(i) for i in obj.items()])
    return "%s%s%s" % ("D", pack("!L", len(data)), data)

@register_encoder_for_type(struct_time)
def enc_struct_time_type(obj):
    data = "".join([encoder_for(type(i))(i) for i in obj])
    return "%s%s%s" % (protocol[struct_time], pack("!L", len(data)), data)
    
@register_encoder_for_type(TupleType)
@register_encoder_for_type(ListType)
def enc_list_type(obj):
    data = "".join([encoder_for(type(i))(i) for i in obj])
    return "%s%s%s" % (protocol_for(type(obj)), pack("!L", len(data)), data)

@register_encoder_for_type(IntType)
def enc_int_type(obj):
    return "%s%s" % (protocol[IntType], pack("!i", obj))

@register_encoder_for_type(FloatType)
def enc_float_type(obj):
    return "%s%s" % (protocol[FloatType], pack("!d", obj))

@register_encoder_for_type(LongType)
def enc_long_type(obj):
    obj = hex(obj)[2:-1]
    return "%s%s%s" % (protocol[LongType], pack("!L", len(obj)), obj)

@register_encoder_for_type(UnicodeType)
def enc_unicode_type(obj):
    obj = obj.encode('utf-8')
    return "%s%s%s" % (protocol[UnicodeType], pack("!L", len(obj)), obj)


@register_encoder_for_type(StringType)
def enc_string_type(obj):
    return "%s%s%s" % (protocol[StringType], pack("!L", len(obj)), obj)

@register_encoder_for_type(NoneType)
def enc_none_type(obj):
    return protocol[NoneType]

@register_encoder_for_type(BooleanType)
def enc_bool_type(obj):
    return protocol[BooleanType] + str(int(obj))

@register_encoder_for_type(User)
@register_encoder_for_type(Player)
@register_encoder_for_type(Move)
@register_encoder_for_type(Letter)
@register_encoder_for_type(ScrabbleGameInfo)
@register_encoder_for_type(ServerMessage)
@register_encoder_for_type(PrivateMessage)
@register_encoder_for_type(ServerBulletin)
@register_encoder_for_type(PlayerInfo)
@register_encoder_for_type(Time)
@register_encoder_for_type(TimeDeltaWrapper)
@register_encoder_for_type(audit.Action)
@register_encoder_for_type(RingList)
@register_encoder_for_type(helper.Command)
def enc_obj_type(obj):
    data = "".join([encoder_for(type(i))(i) for i in obj.__dict__.items()])
    return "%s%s%s" % (protocol_for(type(obj)), pack("!L", len(data)), data)

@register_encoder_for_type(timedelta)
def enc_time_delta_type(obj):
    data = (obj.days, obj.seconds)
    data = "".join([encoder_for(type(i))(i) for i in data])
    return "%s%s%s" % (protocol_for(type(obj)), pack("!L", len(data)), data)
    

def dumps(obj, compress=False):
    """Encode simple Python types into a binary string."""
    option = "N"
    if compress: option = "Z"
    
    data = encoder_for(type(obj))(obj)
        
    if compress: 
        data = zlib.compress(data)
    x = "%s%s%s" % (HEADER, option, data)
    return x
    
        
## </encoding functions> ##

## <decoding functions> ##
def build_sequence(data, cast=list):
    size = unpack('!L', data.read(4))[0]
    items = []
    data_tell = data.tell
    data_read = data.read
    items_append = items.append
    start_position = data.tell()
    while (data_tell() - start_position) < size:
        T = data_read(1)
        value = decoder_for(T)(data)
        items_append(value)
    return cast(items)

@register_decoder_for_type(struct_time)
@register_decoder_for_type(TupleType)
def dec_tuple_type(data):
    return build_sequence(data, cast=tuple)

@register_decoder_for_type(ListType)
def dec_list_type(data):
    return build_sequence(data, cast=list)

@register_decoder_for_type(DictType)
def dec_dict_type(data):
    return build_sequence(data, cast=dict)

@register_decoder_for_type(LongType)
def dec_long_type(data):
    size = unpack('!L', data.read(4))[0]
    value = long(data.read(size),16)
    return value

@register_decoder_for_type(StringType)
def dec_string_type(data):
    size = unpack('!L', data.read(4))[0]
    value = str(data.read(size))
    return value

@register_decoder_for_type(FloatType)
def dec_float_type(data):
    value = unpack('!d', data.read(8))[0]
    return value

@register_decoder_for_type(IntType)
def dec_int_type(data):
    value = unpack('!i', data.read(4))[0]
    return value

@register_decoder_for_type(NoneType)
def dec_none_type(data):
    return None

@register_decoder_for_type(BooleanType)
def dec_bool_type(data):
    value = int(data.read(1))
    return bool(value)

@register_decoder_for_type(UnicodeType)
def dec_unicode_type(data):
    size = unpack('!L', data.read(4))[0]
    value = data.read(size).decode('utf-8')
    return value

@register_decoder_for_type(User)
def dec_user_type(data):
    u = User()
    u.__dict__ = dec_dict_type(data)
    return u

@register_decoder_for_type(struct_time)
def dec_struct_time_type(data):
    return build_sequence(data, cast=tuple)

@register_decoder_for_type(Move)
def dec_move_type(data):
    obj = Move()
    obj.__dict__ = dec_dict_type(data)
    return obj

@register_decoder_for_type(Letter)
def dec_letter_type(data):
    obj = Letter()
    obj.__dict__ = dec_dict_type(data)
    return obj

@register_decoder_for_type(ScrabbleGameInfo)
def dec_sg_type(data):
    obj = ScrabbleGameInfo()
    obj.__dict__ = dec_dict_type(data)
    return obj

@register_decoder_for_type(Player)
def dec_player_type(data):
    obj = Player()
    obj.__dict__ = dec_dict_type(data)
    return obj

@register_decoder_for_type(ServerMessage)
def dec_server_message_type(data):
    obj = ServerMessage()
    obj.__dict__ = dec_dict_type(data)
    return obj

@register_decoder_for_type(PrivateMessage)
def dec_private_message_type(data):
    obj = PrivateMessage()
    obj.__dict__ = dec_dict_type(data)
    return obj

@register_decoder_for_type(ServerBulletin)
def dec_bulletin_type(data):
    obj = ServerBulletin()
    obj.__dict__ = dec_dict_type(data)
    return obj

@register_decoder_for_type(PlayerInfo)
def dec_player_info_type(data):
    obj = PlayerInfo()
    obj.__dict__ = dec_dict_type(data)
    return obj

@register_decoder_for_type(timedelta)
def dec_time_delta_type(data):
    days,seconds = dec_tuple_type(data)
    obj = timedelta(days=days, seconds=seconds)
    return obj

@register_decoder_for_type(Time)
def dec_time_type(data):
    obj = Time()
    obj.__dict__ = dec_dict_type(data)
    return obj

@register_decoder_for_type(TimeDeltaWrapper)
def dec_time_delta_wrapper_type(data):
    obj = TimeDeltaWrapper()
    obj.__dict__ = dec_dict_type(data)
    return obj

@register_decoder_for_type(audit.Action)
def dec_audit_action_type(data):
    d = dec_dict_type(data)
    obj = audit.fromType( d["type"] )
    obj.__dict__ = d
    return obj

@register_decoder_for_type(RingList)
def dec_ringlist_type(data):
    obj = RingList()
    obj.__dict__ = dec_dict_type(data)
    return obj

@register_decoder_for_type(helper.Command)
def dec_command_type(data):
    d = dec_dict_type(data)
    obj = helper.fromType( d["type"] )
    obj.__dict__ = d
    return obj


def loads(data):
    """
    Decode a binary string into the original Python types.
    """
    buffer = StringIO(data)
    header = buffer.read(len(HEADER))
    try:
        assert header == HEADER
    except AssertionError:
        print 'Header is',header
        raise
    option = buffer.read(1)
    if option == "Z":
        buffer = StringIO(zlib.decompress(buffer.read()))
    
    t = buffer.read(1)
    
    value = decoder_for(t)(buffer)
        
    return value
## </decoding functions> ##
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.