#-----------------------------------------------------------------||||||||||||--
# Name: scOSCMessage.py
# Purpose: for Building SuperCollider specific OSC messages...
# it's a bit weird in its implementation of OSC protocol...
# don't ask how I figured this out, I've been reading hex all
# night.
# - Deals also with converting time formats between OSC, which is
# once again idiosyncratic.
# - Deals also with sending bundles of OSC messages.
# - Deals with some binary stuff (struct) that isn't implemented
# in the wiretap OSC implementation. I'm trying to keep files
# from wiretap untouched in case they upgrade their modules.
#
# Author: Jonathan Saggau (using a class from wiretap (LGPL))
#
# Copyright: (c) 2004 Jonathan Saggau
# some parts Copyright (C) 2002 Daniel Holth, Clinton McChesney
# License: GPL
#-----------------------------------------------------------------||||||||||||--
from athenaCL.libATH.osc import OSC
import time, struct, math
#-----------------------------------------------------------------||||||||||||--
### F(n) defs for converting time to OSC network bytes
from math import modf
# Alias for the wall-clock source used by the helpers below.
currentTime = time.time
# Offset in seconds from 1900 to 1970:
# (70*365*24*60*60) + (17*24*60*60), i.e. 70 years plus 17 leap days.
seconds1900To1970 = 2208988800L
# Scale factors between clock units and OSC time units (1/2**32 second).
# From the SuperCollider source.
# Only kSecondsToOSC is referenced by the functions in this section.
kSecondsToOSC = 4294967296L # pow(2,32)/1
kMicrosToOSC = 4294.967296 # pow(2,32)/1e6
kNanosToOSC = 4.294967296 # pow(2,32)/1e9
kOSCtoSecs = 2.328306436538696e-10 # 1/pow(2,32)
kOSCtoNanos = 0.2328306436538696 # 1e9/pow(2,32)
#
def fractionalPart(floater):
    """Return the fractional part of a number (fractionalPart(1.5) == .5).

    The result is a float and carries the sign of the input, as
    math.modf does.
    """
    # Coerce to float first so that ints (or anything else float()
    # accepts) can be passed in as well.
    fraction, _whole = modf(float(floater))
    return fraction
def fractionalPartToInt(floater):
    """Return the digits of the fractional part as an integer.

    The fractional part is scaled by a power of ten chosen from the
    length of its str() form, then truncated with int().  The digit
    count assumes the text looks like "0.ddd"; a leading minus sign or
    an exponent in the text changes the count.
    """
    fpart = fractionalPart(floater)
    # Subtract 2 to skip the leading "0." of the textual form.
    digitCount = len(str(fpart)) - 2
    scale = 10 ** digitCount
    return int(fpart * scale)
def integerPart(floater):
    """Return the input converted with int() (floats truncate toward zero)."""
    whole = int(floater)
    return whole
def byteLength(CommandBinary):
    """Return the length in bytes of the command.

    CommandBinary -- an already packed command (a byte string).

    This was previously an unimplemented stub that always returned
    None; it now reports the length the same way lengthOfMsg() does.
    """
    return len(CommandBinary)
def convertStringToOSCBinary(stringin):
    """Pack a string as an OSC string (adapted from OSC.py's OSCArgument).

    The result is the string followed by null padding up to the next
    multiple of four bytes; at least one null is always added.
    """
    # Room for the text plus one null, rounded up to a 4-byte boundary.
    paddedLength = (len(stringin) // 4 + 1) * 4
    packFormat = ">%ds" % paddedLength
    return struct.pack(packFormat, stringin)
def convertIntToNetworkLongInt(longInt):
    """Pack an integer as 8 bytes: unsigned 64-bit, big-endian."""
    networkUInt64 = '>Q'
    return struct.pack(networkUInt64, longInt)
def convertIntToNetworkInt(shortInt):
    """Pack an integer as 4 bytes: signed 32-bit, big-endian."""
    networkInt32 = '>i'
    return struct.pack(networkInt32, shortInt)
def convertNetworkToLongInt(byteString):
    """Unpack 8 big-endian bytes into an unsigned 64-bit integer.

    This is the inverse of convertIntToNetworkLongInt().
    """
    # struct.unpack always hands back a tuple; there is exactly one field.
    (value,) = struct.unpack('>Q', byteString)
    return value
def oscSecondFraction(fPartOfTime):
    """Scale a fraction of a second by 2**32 and truncate it to an integer."""
    oscUnitsPerSecond = 1 << 32
    return int(fPartOfTime * oscUnitsPerSecond)
def timeSince1970ToOscTime(timeSince1970):
    """Convert seconds since 1970 into an OSC time value.

    The input is shifted by seconds1900To1970 and then multiplied by
    kSecondsToOSC (2**32); the product is returned as a long.
    """
    secondsSince1900 = seconds1900To1970 + timeSince1970
    oscTime = secondsSince1900 * kSecondsToOSC
    return long(oscTime)
def currentOscTime():
    """Return the present clock reading as an OSC time value."""
    now = currentTime()
    return timeSince1970ToOscTime(now)
def intervalToOscTime(interval, latency = 0):
    """Return the OSC time value for a moment `interval` seconds from now.

    interval -- offset in seconds added to the current clock reading
    latency  -- seconds subtracted from the result; defaults to 0 and is
                there in case network latency is compensated for later
    """
    when = currentTime() + interval - latency
    return timeSince1970ToOscTime(when)
def lengthOfMsg(msg):
    """Return the byte length of an already packed message."""
    size = len(msg)
    return size
### Test Code for converting time to OSC network bytes
def testOscTime():
    """Print the results of the time-conversion helpers for inspection.

    Walks half a second through oscSecondFraction(), packs and unpacks
    it as a network long, then prints the OSC time for 1.1 seconds
    after 1970 and for half a second from now.  Nothing is asserted;
    the output is meant to be read by eye.
    """
    x = .5
    print(x)
    y = oscSecondFraction(x)
    print(y)
    z = convertIntToNetworkLongInt(y)
    print(z)
    a = convertNetworkToLongInt(z)
    print(a)
    b = timeSince1970ToOscTime(1.1)
    print(b)
    c = intervalToOscTime(.5)
    # This value used to be computed and then silently dropped.
    print(c)
#-----------------------------------------------------------------||||||||||||--
# below from wiretap (OSC.py) modified to return more information
def decodeSCOSC(data):
    """Convert a typetagged OSC message to a Python list.

    data -- the binary OSC packet

    Returns:
      * for a plain message: [address, arg1, arg2, ...]; the typetag
        string is decoded but left out of the result.
      * for a bundle: ['#bundle', decodedMessage1, ...]; the time tag is
        read but left out of the result.  Bundle elements are decoded
        with OSC.decodeOSC, not with this function.
      * for a message that consists of an address only: [address].

    Decoding is best effort: a typetag that cannot be decoded is
    reported on stdout and skipped, and decoding continues with the
    next tag.
    """
    table = {"i":OSC.readInt, "f":OSC.readFloat, "s":OSC.readString,
             "b":OSC.readBlob, "d":OSC.readDouble}
    decoded = []
    address, rest = OSC.readString(data)
    if address == "#bundle":
        # Named timetag so the imported `time` module is not shadowed.
        timetag, rest = OSC.readLong(rest)
        decoded.append(address)
        decoded.append(timetag)
        while len(rest) > 0:
            # Each bundle element is prefixed with its byte length.
            length, rest = OSC.readInt(rest)
            decoded.append(OSC.decodeOSC(rest[:length]))
            rest = rest[length:]
    elif len(rest) > 0:
        typetags, rest = OSC.readString(rest)
        decoded.append(address)
        decoded.append(typetags)
        # startswith() also copes with an empty typetag string, where
        # indexing typetags[0] would raise IndexError.
        if typetags.startswith(","):
            for tag in typetags[1:]:
                try:
                    value, rest = table[tag](rest)
                    decoded.append(value)
                except Exception:
                    print("%s probably not in tags list" % tag)
                    print("check scOSCMessage.py - def decodeSCOSC():")
        else:
            print("Oops, typetag lacks the magic ,")
    if not decoded:
        # Address with nothing after it: there is no slot to drop, and
        # decoded[0] below would raise IndexError.
        return [address]
    # Drop slot 1 (the typetags, or the time tag for a bundle).
    returnList = [decoded[0]]
    returnList.extend(decoded[2:])
    return returnList
#-----------------------------------------------------------------||||||||||||--
class oscBundleBuilder:
"""builds OSC bundles from message lists
bundleFormat(binary) =
'#bundle' time(Long) lengthofCommand1(int) command1(oscbinarymessage)\
lengthofcommmand2 command2... length of commandn commandn
"""
def __init__(self, whenInSecondsSince1970 = None, msgListList = None):
"COnstructor"
self.msgBuilder = scOSCMessage() # instantiate a new message builder
self.binaryData = convertStringToOSCBinary('#bundle')
# tagg the message as a bundle (binary)
if whenInSecondsSince1970 != None:
self.timeLongInt = timeSince1970ToOscTime(whenInSecondsSince1970)
else:
self.timeLongInt = currentOscTime()
# set the time tag
self.msgListList = msgListList
def new(self, whenInSecondsSince1970, msgListList):
"resets the object w/ new data"
self.msgBuilder = scOSCMessage()
self.binaryData = convertStringToOSCBinary('#bundle')
if whenInSecondsSince1970 <= currentTime():
self.timeLongInt = 1L # this tells SC to do it NOW, ie asap
else:
self.timeLongInt = timeSince1970ToOscTime(whenInSecondsSince1970)
self.msgListList = msgListList
def processMsg(self):
self.binaryData += convertIntToNetworkLongInt(self.timeLongInt)
for eachMsg in self.msgListList:
self.msgBuilder.clearData()
for dataGuy in eachMsg:
self.msgBuilder.append(dataGuy)
protoMsg = self.msgBuilder.getBinary()
lenprotoMsg = lengthOfMsg(protoMsg)
self.binaryData += (convertIntToNetworkInt(lenprotoMsg) + protoMsg)
def getMessage(self):
return self.binaryData
#-----------------------------------------------------------------||||||||||||--
class scOSCMessage:
    """Builds OSC messages for SuperCollider.

    SC doesn't like the regular ones.

    Algorithm:
      * every appended item is converted to its OSC binary form and
        kept in a list;
      * the first item gets no typetag; every later item adds its
        typetag (int, string, etc.) to the tag string;
      * getBinary() emits: first item + packed tag string + the
        remaining items, in order.
    """
    def __init__(self):
        self.clearData()

    def setMessage(self, message):
        """Replace the raw message text (see rawAppend)."""
        self.message = message

    def setTypetags(self, typetags):
        """Replace the typetag string wholesale."""
        self.typetags = typetags

    def clear(self):
        """Alias for clearData()."""
        self.clearData()

    def clearData(self):
        """Reset the builder to an empty message."""
        self.typetags = ","   # the tag string always starts with a comma
        self.message = ""     # only touched by setMessage / rawAppend
        self.messages = []    # binary form of each appended item

    def append(self, argument, typehint = None):
        """Appends data to the message,
        updating the typetags based on
        the argument's type.
        If the argument is a blob (counted string)
        pass in 'b' as typehint."""
        if typehint == 'b':
            binary = OSC.OSCBlob(argument)
        else:
            binary = OSC.OSCArgument(argument)
        # binary is used as a (typetag, packed data) pair.  The first
        # item appended contributes no typetag.
        if len(self.messages) != 0:
            self.typetags = self.typetags + binary[0]
        self.messages.append(binary[1])

    def rawAppend(self, data):
        """Appends raw data to the message. Use append().

        The raw text is stored in self.message, which getBinary() does
        not read.
        """
        self.message = self.message + data

    def getBinary(self):
        """Returns the binary message (so far) with typetags.

        With nothing appended yet an empty string is returned (this
        used to be None, which broke __repr__ and len() in callers).
        """
        if not self.messages:
            return ""
        typetags = OSC.OSCArgument(self.typetags)[1]
        # The tag string goes right after the first item, even when
        # that first item is the only one.
        return self.messages[0] + typetags + "".join(self.messages[1:])

    def __repr__(self):
        return self.getBinary()
#-----------------------------------------------------------------||||||||||||--
# Run the time-conversion printout when executed as a script.
if __name__ == "__main__":
    testOscTime()
|