#!/usr/bin/python
"""Waits on port 4950, ready to decode any OSC packets that come its
way."""
import sys, time, socket
from athenaCL.libATH.osc import OSC
from athenaCL.tools.util.superCollider.scListenerSingleton import Future
def decodeOSC(data):
"""Converts a typetagged OSC message to a Python list."""
table = {"i":OSC.readInt, "f":OSC.readFloat, "s":OSC.readString, "b":OSC.readBlob}
decoded = []
address, rest = OSC.readString(data)
typetags = ""
if address == "#bundle":
time, rest = OSC.readLong(rest)
decoded.append(address)
decoded.append(time)
while len(rest)>0:
length, rest = OSC.readInt(rest)
decoded.append(decodeOSC(rest[:length]))
rest = rest[length:]
elif len(rest)>0:
typetags, rest = OSC.readString(rest)
decoded.append(address)
decoded.append(typetags)
if(typetags[0] == ","):
for tag in typetags[1:]:
value, rest = table[tag](rest)
decoded.append(value)
else:
print "Oops, typetag lacks the magic ,"
# return only the data
return decoded
class getOSC:
def __init__(self, port):
self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.s.bind(('', port))
self.stringIn = 1
Future(self.stringKill)
time.sleep(.5)
print ""
while self.stringIn == 1:
self.s.settimeout(.5)
try:
data, address = self.s.recvfrom(2**12)
print address
OSC.hexDump(data)
print decodeOSC(data)
self.printLines()
except socket.timeout:
print "No Data coming from port %s; (any key quits)" %port
def printLines(self):
print "_____________________"
def stringKill(self):
self.stringIn = raw_input("ENTER KEY QUITS")
print self.stringIn
def run():
port = 57110
helpArguments = ('--help', '-h', '-help')
help = """%s
Listens for OSC packets, decoding and printing
them as they arrive.
Usage:
%s [port]
(port defaults to %d)\n""" % (sys.argv[0], sys.argv[0], port)
if len(sys.argv) >= 2:
if sys.argv[1] in helpArguments:
print help
sys.exit(0)
try:
port = int(sys.argv[1])
except:
print help
sys.exit(0)
x = getOSC(port)
if __name__ == '__main__':
run()
|