__freedb__.py :  » Media-Sound-Audio » Python-Audio-Tools » audiotools-2.14 » audiotools » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Media Sound Audio » Python Audio Tools 
Python Audio Tools » audiotools 2.14 » audiotools » __freedb__.py
#!/usr/bin/python

#Audio Tools, a module and set of tools for manipulating audio data
#Copyright (C) 2007-2010  Brian Langenberger

#This program is free software; you can redistribute it and/or modify
#it under the terms of the GNU General Public License as published by
#the Free Software Foundation; either version 2 of the License, or
#(at your option) any later version.

#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#GNU General Public License for more details.

#You should have received a copy of the GNU General Public License
#along with this program; if not, write to the Free Software
#Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA


from audiotools import VERSION,Con,cStringIO,sys,re,MetaData,AlbumMetaData,__most_numerous__,DummyAudioFile,MetaDataFileException

import gettext

gettext.install("audiotools",unicode=True)

#######################
#XMCD
#######################

class XMCDException(MetaDataFileException):
    def __unicode__(self):
        return _(u"Invalid XMCD file")

class XMCD:
    LINE_LIMIT = 78

    #values is a dict of key->value pairs
    #such as "TTITLE0":u"Track Name"
    #offsets is a list of track offset integers (in CD frames)
    #length is a total album length integer (in seconds)
    def __init__(self,values,offsets,length):
        self.__values__ = values
        self.offsets = offsets
        self.length = length

    def __repr__(self):
        return "XMCD(%s,%s,%s)" % (repr(self.__values__),
                                   repr(self.offsets),
                                   repr(self.length))

    def __getitem__(self,key):
        return self.__values__[key]

    def get(self,key,default):
        return self.__values__.get(key,default)

    def __setitem__(self,key,value):
        self.__values__[key] = value

    def __len__(self):
        return len(self.__values__)

    def keys(self):
        return self.__values__.keys()

    def values(self):
        return self.__values__.values()

    def items(self):
        return self.__values__.items()

    @classmethod
    def key_digits(cls,key):
        import re

        d = re.search(r'\d+',key)
        if (d is not None):
            return int(d.group(0))
        else:
            return -1

    def build(self):
        import string

        key_order = ['DISCID','DTITLE','DYEAR','TTITLE','EXTDD','EXTT',
                     'PLAYORDER']

        def by_pair(p1,p2):
            if (p1[0].rstrip(string.digits) in key_order):
                p1 = (key_order.index(p1[0].rstrip(string.digits)),
                      self.key_digits(p1[0]),
                      p1[0])
            else:
                p1 = (len(key_order),
                      self.key_digits(p1[0]),
                      p1[0])

            if (p2[0].rstrip(string.digits) in key_order):
                p2 = (key_order.index(p2[0].rstrip(string.digits)),
                      self.key_digits(p2[0]),
                      p2[0])
            else:
                p2 = (len(key_order),
                      self.key_digits(p2[0]),
                      p2[0])

            return cmp(p1,p2)

        def encode(u):
            return u.encode('utf-8')


        def split_fields(pairs):
            #returns index i which is less than l bytes from unicode string u
            def max_chars(u,l):
                for i in xrange(len(u.encode('utf-8')) + 1):
                    if (len(u[0:i].encode('utf-8')) > l):
                        return i - 1
                else:
                    return i

            for (key,value) in pairs:
                #line = u"%s=%s" % (key.decode('ascii'),value)
                keylen = len(key) + len("=")
                while ((keylen + len(value.encode('utf-8'))) > XMCD.LINE_LIMIT):
                    #latin-1 lines might be shorter, but shouldn't be longer
                    #UTF-8 assumes the worst case
                    cut = max_chars(value,XMCD.LINE_LIMIT - len(key) - len('='))
                    partial = value[0:cut]
                    value = value[cut:]
                    yield u"%s=%s" % (key.decode('ascii'),partial)

                yield u"%s=%s" % (key.decode('ascii'),value)

        return encode(u"# xmcd\n#\n# Track frame offsets:\n%(offsets)s\n#\n# Disc length: %(length)s seconds\n#\n%(fields)s\n" % \
            {"offsets":u"\n".join(["#\t%s" % (offset)
                                   for offset in self.offsets]),
             "length":self.length,
             "fields":"\n".join(split_fields(sorted(self.items(),by_pair)))})

    @classmethod
    def read(cls, filename):
        import StringIO,re

        try:
            f = open(filename,'r')
        except IOError:
            raise XMCDException(filename)

        try:
            data = f.read()
            try:
                data = data.decode('utf-8')
            except UnicodeDecodeError:
                try:
                    data = data.decode('ISO-8859-1')
                except UnicodeDecodeError:
                    raise XMCDException(filename)
        finally:
            f.close()

        return cls.read_data(data)

    #takes a unicode string of XMCD data
    #returns an XMCD object
    @classmethod
    def read_data(cls,data):
        if (not data.startswith(u"# xmcd")):
            raise XMCDException("")

        disc_length = re.search(r'# Disc length: (\d+)',data)
        if (disc_length is not None):
            disc_length = int(disc_length.group(1))

        track_lengths = re.compile(r'# Track frame offsets:\s+[#\s\d]+',
                                   re.DOTALL).search(data)
        if (track_lengths is not None):
            track_lengths = map(int,re.findall(r'\d+',track_lengths.group(0)))

        fields = {}

        for line in re.findall(r'.+=.*[\r\n]',data):
            (field,value) = line.split(u'=',1)
            field = field.encode('ascii')
            value = value.rstrip('\r\n')
            if (field in fields.keys()):
                fields[field] += value
            else:
                fields[field] = value

        return XMCD(values=fields,offsets=track_lengths,length=disc_length)

    #audiofiles should be a list of AudioFile-compatible objects
    #from the same album, possibly with valid MetaData
    @classmethod
    def from_files(cls, audiofiles):
        def track_string(track,album_artist,metadata):
            if (track.track_number() in metadata.keys()):
                metadata = metadata[track.track_number()]
                if (metadata.artist_name == album_artist):
                    return metadata.track_name
                else:
                    return u"%s / %s" % (metadata.artist_name,
                                         metadata.track_name)
            else:
                return u""

        audiofiles = [f for f in audiofiles if f.track_number() != 0]
        audiofiles.sort(lambda t1,t2: cmp(t1.track_number(),
                                          t2.track_number()))

        discid = DiscID([track.cd_frames() for track in audiofiles])

        metadata = dict([(t.track_number(),t.get_metadata())
                          for t in audiofiles
                         if (t.get_metadata() is not None)])

        artist_names = [m.artist_name for m in metadata.values()]
        if (len(artist_names) == 0):
            album_artist = u""
        elif ((len(artist_names) > 1) and
              (len(set(artist_names)) == len(artist_names))):
            #if all track artists are different, don't pick one
            album_artist = u"Various"
        else:
            album_artist = __most_numerous__(artist_names)

        return XMCD(dict([("DISCID",str(discid).decode('ascii')),
                          ("DTITLE",u"%s / %s" % \
                               (album_artist,
                                __most_numerous__([m.album_name for m in
                                                   metadata.values()]))),
                          ("DYEAR",__most_numerous__([m.year for m in
                                                      metadata.values()])),
                          ("EXTDD",u""),
                          ("PLAYORDER",u"")] + \
                         [("TTITLE%d" % (track.track_number() - 1),
                           track_string(track,album_artist,metadata))
                          for track in audiofiles] + \
                         [("EXTT%d" % (track.track_number() - 1),
                           u"")
                           for track in audiofiles]),
                    discid.offsets(),
                    (discid.length() / 75) + 2)

    @classmethod
    def from_cuesheet(cls, cuesheet, total_frames, sample_rate, metadata=None):
        if (metadata is None):
            metadata = MetaData()

        return cls.from_files([DummyAudioFile(
                    length = (pcm_frames * 75) / sample_rate,
                    metadata = metadata,
                    track_number = i + 1) for (i,pcm_frames) in enumerate(
                    cuesheet.pcm_lengths(total_frames))])

    def metadata(self):
        dtitle = self.get('DTITLE',u'')
        if (u' / ' in dtitle):
            (album_artist,album_name) = dtitle.split(u' / ',1)
        else:
            (album_artist,album_name) = (dtitle,dtitle)

        dyear = self.get('DYEAR',u'')

        tracks = []

        for key in self.keys():
            if (key.startswith('TTITLE')):
                tracknum = self.key_digits(key)
                if (tracknum == -1):
                    continue

                ttitle = self[key]

                if (u' / ' in ttitle):
                    (track_artist,track_name) = ttitle.split(u' / ',1)
                else:
                    track_name = ttitle
                    track_artist = album_artist

                tracks.append(MetaData(track_name=track_name,
                                       track_number=tracknum + 1,
                                       album_name=album_name,
                                       artist_name=track_artist,
                                       year=dyear))

        track_total = max([t.track_number for t in tracks])
        for t in tracks:
            t.track_total = track_total
        tracks.sort(lambda t1,t2: cmp(t1.track_number,t2.track_number))
        return AlbumMetaData(tracks)


#######################
#FREEDB
#######################

class DiscID:
    DISCID = Con.Struct('discid',
                        Con.UBInt8('digit_sum'),
                        Con.UBInt16('length'),
                        Con.UBInt8('track_count'))

    #tracks is a list of track lengths in CD frames
    #offsets, if present, is a list of track offsets in CD frames
    #length, if present, is the length of the entire disc in CD frames
    #lead_in is the location of the first track on the CD, in frames
    def __init__(self, tracks=[], offsets=None, length=None, lead_in=150):
        self.tracks = tracks
        self.__offsets__ = offsets
        self.__length__ = length
        self.__lead_in__ = lead_in

    @classmethod
    def from_cdda(cls, cdda):
        tracks = list(cdda)

        return cls(tracks=[t.length() for t in tracks],
                   offsets=[t.offset() for t in tracks],
                   length=cdda.length(),
                   lead_in=tracks[0].offset())

    def add(self, track):
        self.tracks.append(track)

    def offsets(self):
        if (self.__offsets__ is None):
            offsets = [self.__lead_in__]

            for track in self.tracks[0:-1]:
                offsets.append(track + offsets[-1])

            return offsets
        else:
            return self.__offsets__

    def length(self):
        if (self.__length__ is None):
            return sum(self.tracks)
        else:
            return self.__length__

    def idsuffix(self):
        return str(len(self.tracks)) + " " + \
               " ".join([str(offset) for offset in self.offsets()]) + \
               " " + str((self.length() + self.__lead_in__) / 75)

    def __str__(self):
        def __count_digits__(i):
            if (i == 0):
                return 0
            else:
                return (i % 10) + __count_digits__(i / 10)

        disc_id = Con.Container()

        disc_id.track_count = len(self.tracks)
        disc_id.length = self.length() / 75
        disc_id.digit_sum = sum([__count_digits__(o / 75)
                                 for o in self.offsets()]) % 0xFF

        return DiscID.DISCID.build(disc_id).encode('hex')

    def freedb_id(self):
        return str(self) + " " + self.idsuffix()

    def toxmcd(self, output):
        output.write(XMCD.from_files(
            [DummyAudioFile(length,None,i + 1)
             for (i,length) in enumerate(self.tracks)]).build())

class FreeDBException(Exception): pass

class FreeDB:
    LINE = re.compile(r'\d\d\d\s.+')

    def __init__(self, server, port, messenger):
        self.server = server
        self.port = port
        self.socket = None
        self.r = None
        self.w = None
        self.messenger = messenger

    def connect(self):
        import socket

        try:
            self.messenger.info(_(u"Connecting to \"%s\"") % (self.server))

            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.connect((self.server,self.port))

            self.r = self.socket.makefile("rb")
            self.w = self.socket.makefile("wb")

            (code,msg) = self.read()  #the welcome message
            if (code == 201):
                self.messenger.info(_(u"Connected ... attempting to login"))
            else:
                self.r.close()
                self.w.close()
                self.socket.close()
                raise FreeDBException(_(u"Invalid hello message"))

            self.write("cddb hello user %s %s %s" % \
                       (socket.getfqdn(),"audiotools",VERSION))

            (code,msg) = self.read()  #the handshake successful message
            if (code != 200):
                self.r.close()
                self.w.close()
                self.socket.close()
                raise FreeDBException(_(u"Handshake unsuccessful"))

            self.write("proto 6")

            (code,msg) = self.read()  #the protocol successful message
            if ((code != 200) and (code != 201)):
                self.r.close()
                self.w.close()
                self.socket.close()
                raise FreeDBException(_(u"Protocol change unsuccessful"))

        except socket.error,err:
            raise FreeDBException(err[1])

    def close(self):
        self.messenger.info(_(u"Closing connection"))

        self.write("quit")
        (code,msg) = self.read()  #the quit successful message

        self.r.close()
        self.w.close()
        self.socket.close()

    def write(self, line):
        if (self.socket is not None):
            self.w.write(line)
            self.w.write("\r\n")
            self.w.flush()

    def read(self):
        line = self.r.readline()
        if (FreeDB.LINE.match(line)):
            return (int(line[0:3]),line[4:].rstrip("\r\n"))
        else:
            return (None,line.rstrip("\r\n"))

    def query(self, disc_id):
        matches = []

        self.messenger.info(
            _(u"Sending Disc ID \"%(disc_id)s\" to server \"%(server)s\"") % \
                {"disc_id":str(disc_id).decode('ascii'),
                 "server":self.server.decode('ascii','replace')})

        self.write("cddb query " + disc_id.freedb_id())
        (code,msg) = self.read()
        if (code == 200):
            matches.append(msg)
        elif ((code == 211) or (code == 210)):
            while (msg != "."):
                (code,msg) = self.read()
                if (msg != "."):
                    matches.append(msg)

        if (len(matches) == 1):
            self.messenger.info(_(u"1 match found"))
        else:
            self.messenger.info(_(u"%s matches found") % (len(matches)))

        return map(lambda m: m.split(" ",2), matches)

    #category and id are raw strings, as returned by query()
    #output is a file handle the output will be written to
    def read_data(self, category, id, output):
        self.write("cddb read " + category + " " + id)
        (code,msg) = self.read()
        if (code == 210):
            line = self.r.readline()
            while (line.strip() != "."):
                output.write(line)
                line = self.r.readline()
        else:
            print >>sys.stderr,(code,msg)



class FreeDBWeb(FreeDB):
    def __init__(self, server, port, messenger):
        self.server = server
        self.port = port
        self.connection = None
        self.messenger = messenger

    def connect(self):
        import httplib

        self.connection = httplib.HTTPConnection(self.server,self.port)

    def close(self):
        if (self.connection is not None):
            self.connection.close()

    def write(self, line):
        import urllib,socket

        u = urllib.urlencode({"hello":"user %s %s %s" % \
                                      (socket.getfqdn(),
                                       "audiotools",
                                       VERSION),
                              "proto":str(6),
                              "cmd":line})

        try:
            self.connection.request(
                "POST",
                "/~cddb/cddb.cgi",
                u,
                {"Content-type":"application/x-www-form-urlencoded",
                 "Accept": "text/plain"})
        except socket.error,msg:
            raise FreeDBException(str(msg))

    def read(self):
        response = self.connection.getresponse()
        return response.read()

    def __parse_line__(self, line):
        if (FreeDB.LINE.match(line)):
            return (int(line[0:3]),line[4:].rstrip("\r\n"))
        else:
            return (None,line.rstrip("\r\n"))

    def query(self, disc_id):
        matches = []

        self.messenger.info(
            _(u"Sending Disc ID \"%(disc_id)s\" to server \"%(server)s\"") % \
                {"disc_id":str(disc_id).decode('ascii'),
                 "server":self.server.decode('ascii','replace')})

        self.write("cddb query " + disc_id.freedb_id())
        data =  cStringIO.StringIO(self.read())
        (code,msg) = self.__parse_line__(data.readline())
        if (code == 200):
            matches.append(msg)
        elif ((code == 211) or (code == 210)):
            while (msg != "."):
                (code,msg) = self.__parse_line__(data.readline())
                if (msg != "."):
                    matches.append(msg)

        if (len(matches) == 1):
            self.messenger.info(_(u"1 match found"))
        else:
            self.messenger.info(_(u"%s matches found") % (len(matches)))

        return map(lambda m: m.split(" ",2), matches)

    #category and id are raw strings, as returned by query()
    #output is a file handle the output will be written to
    def read_data(self, category, id, output):
        self.write("cddb read " + category + " " + id)
        data = cStringIO.StringIO(self.read())
        (code,msg) = self.__parse_line__(data.readline())
        if (code == 210):
            line = data.readline()
            while (line.strip() != "."):
                output.write(line)
                line = data.readline()
        else:
            print >>sys.stderr,(code,msg)


#matches is a list of (category,disc_id,title) tuples returned from
#FreeDB.query().  If the length of that list is 1, return the first
#item.  If the length is greater than one, present the user a list of
#choices and force him/her to pick the closest match for the CD.
#That data can then be sent to FreeDB.read_data()
def __select_match__(matches, messenger):
    if (len(matches) == 1):
        return matches[0]
    elif (len(matches) < 1):
        return None
    else:
        messenger.info(_(u"Please Select the Closest Match:"))
        selected = 0
        while ((selected < 1) or (selected > len(matches))):
            for i in range(len(matches)):
                messenger.info(_(u"%(choice)s) [%(genre)s] %(name)s") % \
                                   {"choice":i + 1,
                                    "genre":matches[i][0],
                                    "name":matches[i][2].decode('utf-8',
                                                                'replace')})
            try:
                messenger.partial_info(_(u"Your Selection [1-%s]:") % \
                                           (len(matches)))
                selected = int(sys.stdin.readline().strip())
            except ValueError:
                selected = 0

        return matches[selected - 1]

def __select_default_match__(matches, selection):
    if (len(matches) < 1):
        return None
    else:
        try:
            return matches[selection]
        except IndexError:
            return matches[0]

#takes a DiscID value and a file handle for output
#and runs the entire FreeDB querying sequence
#the file handle is closed at the conclusion of this function
#if at least one match is found
#returns the number of matches
def get_xmcd(disc_id, output, freedb_server, freedb_server_port,
             messenger,default_selection=None):
    try:
        freedb = FreeDBWeb(freedb_server,freedb_server_port,messenger)
        freedb.connect()
    except FreeDBException,msg:
        #if an exception occurs during the opening,
        #freedb will auto-close its sockets
        raise IOError(str(msg))

    try:
        matches = freedb.query(disc_id)
        #HANDLE MULTIPLE MATCHES, or NO MATCHES
        if (len(matches) > 0):
            if (default_selection is None):
                (category,idstring,title) = __select_match__(
                    matches,messenger)
            else:
                (category,idstring,title) = __select_default_match__(
                    matches,default_selection)

            freedb.read_data(category,idstring,output)
            output.close()

        freedb.close()
    except FreeDBException,msg:
        #otherwise, close the sockets manually
        freedb.close()
        raise IOError(str(msg))

    if (len(matches) > 0):
        messenger.info(_(u"%s written") % (messenger.filename(output.name)))
    return len(matches)
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.