yapsnmp.py :  » Network » yapsnmp » yapsnmp-0.7.8 » yapsnmp-0.7.8 » src » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Network » yapsnmp 
yapsnmp » yapsnmp 0.7.8 » yapsnmp 0.7.8 » src » yapsnmp.py
#
# yapsnmp - Yet Another Python SNMP module
#
# This is a highish level Python SNMP module, based on my SWIGed net-snmp
# (formerly known as ucd-snmp) interface. It currently supports v1 and v2c,
# with full MIB parsing. v3 coming soon (shouldn't be technically a problem,
# simply a matter of finding the time to implement it).
#
#  -- Yves Perrenoud <yves@xpand.org>

__version__ = "0.7.8"

import netsnmp, socket, string

# Initialize the net-snmp library under the application name "snmpapp"
# (mostly involves parsing in MIBs). This call runs as a side effect of
# importing this module.

netsnmp.init_snmp("snmpapp")

# Supported versions of SNMP: maps the number accepted by
# Session(version=...) to the matching net-snmp protocol constant.
# Only 1 (v1) and 2 (v2c) are present, so any other value makes the
# lookup in Session.__init__ fail with a KeyError.

snmpversions = {1: netsnmp.SNMP_VERSION_1,
                2: netsnmp.SNMP_VERSION_2c,
                }

# Short aliases for the net-snmp library boolean flag identifiers that
# influence the library operation. They are meant to be handed to
# set_flag(), clear_flag(), define_flag() and get_flag() below.

MIB_ERRORS = netsnmp.NETSNMP_DS_LIB_MIB_ERRORS
SAVE_MIB_DESCRS = netsnmp.NETSNMP_DS_LIB_SAVE_MIB_DESCRS
MIB_COMMENT_TERM = netsnmp.NETSNMP_DS_LIB_MIB_COMMENT_TERM
MIB_PARSE_LABEL = netsnmp.NETSNMP_DS_LIB_MIB_PARSE_LABEL
DUMP_PACKET = netsnmp.NETSNMP_DS_LIB_DUMP_PACKET
LOG_TIMESTAMP = netsnmp.NETSNMP_DS_LIB_LOG_TIMESTAMP
DONT_READ_CONFIGS = netsnmp.NETSNMP_DS_LIB_DONT_READ_CONFIGS
MIB_REPLACE = netsnmp.NETSNMP_DS_LIB_MIB_REPLACE
PRINT_NUMERIC_ENUM = netsnmp.NETSNMP_DS_LIB_PRINT_NUMERIC_ENUM
PRINT_NUMERIC_OIDS = netsnmp.NETSNMP_DS_LIB_PRINT_NUMERIC_OIDS
DONT_BREAKDOWN_OIDS = netsnmp.NETSNMP_DS_LIB_DONT_BREAKDOWN_OIDS
ALARM_DONT_USE_SIG = netsnmp.NETSNMP_DS_LIB_ALARM_DONT_USE_SIG
PRINT_FULL_OID = netsnmp.NETSNMP_DS_LIB_PRINT_FULL_OID
QUICK_PRINT = netsnmp.NETSNMP_DS_LIB_QUICK_PRINT
RANDOM_ACCESS = netsnmp.NETSNMP_DS_LIB_RANDOM_ACCESS
REGEX_ACCESS = netsnmp.NETSNMP_DS_LIB_REGEX_ACCESS
DONT_CHECK_RANGE = netsnmp.NETSNMP_DS_LIB_DONT_CHECK_RANGE
NO_TOKEN_WARNINGS = netsnmp.NETSNMP_DS_LIB_NO_TOKEN_WARNINGS
NUMERIC_TIMETICKS = netsnmp.NETSNMP_DS_LIB_NUMERIC_TIMETICKS
ESCAPE_QUOTES = netsnmp.NETSNMP_DS_LIB_ESCAPE_QUOTES
REVERSE_ENCODE = netsnmp.NETSNMP_DS_LIB_REVERSE_ENCODE
PRINT_BARE_VALUE = netsnmp.NETSNMP_DS_LIB_PRINT_BARE_VALUE
EXTENDED_INDEX = netsnmp.NETSNMP_DS_LIB_EXTENDED_INDEX
PRINT_HEX_TEXT = netsnmp.NETSNMP_DS_LIB_PRINT_HEX_TEXT

# define module specific exceptions

class MIBParseError(Exception):
    """Raised by get_node() when an OID cannot be resolved to a MIB node."""


class SendError(Exception):
    """Raised by Session.trap() when the PDU could not be sent."""


class GetError(SendError):
    """Raised when a request gets no response or an SNMP error status."""


class VersionMismatch(Exception):
    """Raised when an operation is not available for the session version."""


class OIDTypeError(Exception):
    """Raised by Session.table() when the walked OID is not a table."""


class PDUVarAddError(Exception):
    """Raised when a variable binding could not be added to a PDU."""

# a sequencing testing function

def issequence(obj):
    """Tell whether obj is a tuple or a list (or a subclass of either).

    Strings do not count as sequences here, which is what lets callers
    tell a single OID string apart from a tuple/list of OIDs.

    Returns 1 when obj is a tuple or list, 0 otherwise.
    """
    # isinstance() also accepts subclasses, which the former exact
    # type() comparison rejected.
    return int(isinstance(obj, (tuple, list)))

# function returning a clean string without surrounding double quotes

def cleanval(str):
    """Return str without a surrounding pair of double quotes.

    The value is returned unchanged when it is empty (or None), or when
    it does not both start and end with a double quote. A string made of
    a single '"' character is not a quoted value and is returned as is.

    The parameter keeps its historical name (which shadows the builtin
    inside this function) so keyword callers keep working.
    """
    # Require two characters so one quote is not taken to be both the
    # opening and the closing delimiter.
    if str and len(str) >= 2 and str[0] == '"' and str[-1] == '"':
        return str[1:-1]
    return str

# function transforming an ASCII OID to the appropriate oid object

def get_node(reqoid):
    """Resolve a textual OID through netsnmp.py_get_node().

    A leading numeric root '.1' is rewritten to 'iso' before the lookup.

    reqoid -- OID as a string.

    Returns the object produced by netsnmp.py_get_node(); callers in
    this module read its 'oid' and 'len' attributes.

    Raises MIBParseError, carrying the (possibly rewritten) OID, when
    the lookup yields a NULL node.
    """
    # Only rewrite a complete first sub-identifier: '.1' or '.1.<rest>'.
    # A bare two character prefix test would also turn '.10.3' into
    # 'iso0.3'.
    if reqoid == '.1' or reqoid[:3] == '.1.':
        reqoid = 'iso' + reqoid[2:]

    oid = netsnmp.py_get_node(reqoid)
    if oid.this == 'NULL':
        raise MIBParseError(reqoid)

    return oid

# function translating a numeric or partial OID, to a fully qualified version

def translate(reqoid):
    """Return reqoid as printed by the library.

    The OID is first resolved with get_node() (which raises
    MIBParseError on failure) and then rendered by
    netsnmp.py_sprint_objid(), so the result follows whatever OID output
    format is currently selected in the library.
    """
    node = get_node(reqoid)
    return netsnmp.py_sprint_objid(node)

# function allowing you to set a flag to true

def set_flag(flag):
    """Switch the given library boolean flag on (value 1)."""
    library = netsnmp.NETSNMP_DS_LIBRARY_ID
    netsnmp.netsnmp_ds_set_boolean(library, flag, 1)

# function allowing you to set a flag to false

def clear_flag(flag):
    """Switch the given library boolean flag off (value 0)."""
    library = netsnmp.NETSNMP_DS_LIBRARY_ID
    netsnmp.netsnmp_ds_set_boolean(library, flag, 0)

# function allowing you to define the state of a flag

def define_flag(flag, state):
    """Store state as the value of the given library boolean flag."""
    library = netsnmp.NETSNMP_DS_LIBRARY_ID
    netsnmp.netsnmp_ds_set_boolean(library, flag, state)

# function allowing you to query the state of a flag

def get_flag(flag):
    """Return the current value of the given library boolean flag."""
    library = netsnmp.NETSNMP_DS_LIBRARY_ID
    state = netsnmp.netsnmp_ds_get_boolean(library, flag)
    return state

# function performing a translate with guaranteed translation direction

def translate_to(reqoid, type):
    """Translate reqoid with a guaranteed OID output format.

    reqoid -- OID string, resolved the same way as translate().
    type   -- 0 for the UCD output format, 1 for the numeric output
              format (index into the two formats below). The parameter
              keeps its historical name, which shadows the builtin
              inside this function.

    The library-wide OID output format is switched only for the
    duration of the call and is put back afterwards, including when the
    translation raises (e.g. MIBParseError for an unknown OID).

    Returns the string produced by translate().
    Raises IndexError when type is not a valid index.
    """
    statetup = (netsnmp.NETSNMP_OID_OUTPUT_UCD,
                netsnmp.NETSNMP_OID_OUTPUT_NUMERIC)
    wanted = statetup[type]

    library = netsnmp.NETSNMP_DS_LIBRARY_ID
    setting = netsnmp.NETSNMP_DS_LIB_OID_OUTPUT_FORMAT
    numstat = netsnmp.netsnmp_ds_get_int(library, setting)

    # Already in the requested format: nothing to switch or restore.
    if numstat == wanted:
        return translate(reqoid)

    netsnmp.netsnmp_ds_set_int(library, setting, wanted)
    try:
        return translate(reqoid)
    finally:
        # The format is global library state, so restore it even when
        # translate() fails; otherwise one bad OID would change how
        # every later OID is printed.
        netsnmp.netsnmp_ds_set_int(library, setting, numstat)


# Module defaults, applied at import time: turn on the NUMERIC_TIMETICKS
# and QUICK_PRINT library flags (numeric timeticks and quick print).

set_flag(NUMERIC_TIMETICKS)
set_flag(QUICK_PRINT)

# Reverting back to a more usable default output format: select the UCD
# OID output format library-wide. translate_to() overrides this setting
# temporarily when a specific format is requested.

netsnmp.netsnmp_ds_set_int(netsnmp.NETSNMP_DS_LIBRARY_ID,
                           netsnmp.NETSNMP_DS_LIB_OID_OUTPUT_FORMAT,
                           netsnmp.NETSNMP_OID_OUTPUT_UCD)


# The main Class allowing the SNMP interaction

class Session:
    """A synchronous SNMP session (v1 or v2c) with one peer.

    Requests are built as net-snmp PDUs, sent through the session opened
    in __init__ and answered synchronously. Values come back as the
    strings printed by the library, with surrounding double quotes
    removed by cleanval().

    OID arguments of get/getnext/getbulk are either given as separate
    strings or as one tuple/list of strings; the second form always
    yields a tuple of results, even for a single OID.
    """

    def __init__(self, peername, version=1, community='public', port=161):
        """Open a session with peername.

        peername  -- non-empty host name or address; it must resolve
                     through socket.gethostbyname().
        version   -- key of snmpversions (1 or 2).
        community -- community string.
        port      -- remote port stored in the session.

        Raises ValueError for an empty peername, whatever
        socket.gethostbyname() raises for an unresolvable one, KeyError
        for an unsupported version and TypeError when community is not a
        string.
        """
        if not peername:
            raise ValueError("expecting a non null string as host identifier")

        # The lookup result is not used; the call only validates the
        # name and raises when it cannot be resolved.
        socket.gethostbyname(peername)

        protocol = snmpversions[version]

        if not isinstance(community, str):
            raise TypeError("expecting a community of type string")

        session = netsnmp.snmp_session()
        netsnmp.snmp_sess_init(session)
        session.peername = peername
        session.version = protocol
        session.remote_port = port
        session.community = community
        session.community_len = len(community)

        self.version = version
        # NOTE(review): the result is not checked for a NULL handle;
        # confirm how the binding reports a failed open before adding one.
        self.session = netsnmp.snmp_sess_open(session)

    def __del__(self):
        """Close the session, if one was ever opened."""
        try:
            netsnmp.snmp_sess_close(self.session)
        except AttributeError:
            # self.session does not exist when __init__ raised before
            # the session was opened.
            pass

    def __sprint__(self, variable, printoid):
        """Format one response variable.

        With printoid false, returns the cleaned value string. With
        printoid true, returns an (oid, value) tuple obtained by
        splitting the printed variable on its first run of whitespace;
        missing parts are filled with '' so the tuple always has two
        items.
        """
        if not printoid:
            return cleanval(netsnmp.py_sprint_value(variable))

        parts = netsnmp.py_sprint_variable(variable).split(None, 1)
        if len(parts) == 2:
            parts[1] = cleanval(parts[1])
        else:
            parts.extend([''] * (2 - len(parts)))

        return tuple(parts)

    def __synch_response__(self, pdu, printoid=0):
        """Send pdu, wait for the response and return the formatted result.

        Returns a single formatted variable when the request was made in
        non-sequence form and the response holds exactly one variable;
        otherwise a tuple with one formatted entry per response
        variable. See __sprint__ for the entry format.

        Raises GetError when no response is obtained or when the
        response carries an SNMP error status.
        """
        response = netsnmp.py_snmp_sess_synch_response(self.session, pdu)
        if response.this == 'NULL':
            raise GetError("couldn't obtain response")

        try:
            if response.errstat != netsnmp.SNMP_ERR_NOERROR:
                raise GetError(netsnmp.snmp_errstring(response.errstat))

            first = response.variables
            if (not self.__sequence__) and first.next_variable.this == 'NULL':
                return self.__sprint__(first, printoid)

            # Collect in a list; growing a tuple by concatenation copies
            # it on every step.
            values = []
            curvar = first
            while curvar.this != 'NULL':
                values.append(self.__sprint__(curvar, printoid))
                curvar = curvar.next_variable
            return tuple(values)
        finally:
            # Release the response on every path, including a failure
            # while formatting.
            netsnmp.snmp_free_pdu(response)

    def __build_pdu__(self, msgtype, fill, *fillargs):
        """Create a PDU of msgtype and populate it with fill(pdu, *fillargs).

        The PDU is freed again when fill raises, so a bad OID does not
        leak it. Returns the populated PDU.
        """
        pdu = netsnmp.snmp_pdu_create(msgtype)
        try:
            fill(pdu, *fillargs)
        except:
            # Cleanup only; the original exception is re-raised as is.
            netsnmp.snmp_free_pdu(pdu)
            raise
        return pdu

    def __fill_get_pdu__(self, pdu, oidlist):
        """Add one null variable per requested OID to pdu.

        oidlist is the tuple of positional OID arguments. When its first
        item is itself a tuple/list, that item is used as the OID list
        (any further items are ignored) and the request is flagged as a
        sequence request for __synch_response__.

        Raises MIBParseError for an unknown OID and PDUVarAddError when
        adding a variable is reported as failed.
        """
        if issequence(oidlist[0]):
            oidlist = oidlist[0]
            self.__sequence__ = 1
        else:
            self.__sequence__ = 0

        for argoid in oidlist:
            oid = get_node(argoid)
            # NOTE(review): other library results in this class are
            # tested with ".this == 'NULL'"; confirm that a plain truth
            # test detects a failure here.
            status = netsnmp.snmp_add_null_var(pdu, oid.oid, oid.len)
            if not status:
                raise PDUVarAddError("couldn't add %s to the pdu" % (argoid,))

    def __fill_set_pdu__(self, pdu, pduoid, reqoid, args):
        """Add (oid, type, value) variables to pdu.

        pduoid -- sequence of (oid, type, value) entries added first.
        reqoid -- either one (oid, type, value) tuple/list, in which
                  case args holds further such entries, or a tuple/list
                  of them (args is then ignored and the request is
                  flagged as a sequence request).

        Raises TypeError when reqoid is not a tuple/list, MIBParseError
        for an unknown OID and PDUVarAddError when the library reports a
        non-zero status while adding a variable.
        """
        if not issequence(reqoid):
            raise TypeError("expecting a sequence")

        if issequence(reqoid[0]):
            oidlist = tuple(pduoid) + tuple(reqoid)
            self.__sequence__ = 1
        else:
            oidlist = tuple(pduoid) + (reqoid,) + tuple(args)
            self.__sequence__ = 0

        for argoid in oidlist:
            oid = get_node(argoid[0])
            status = netsnmp.snmp_add_var(pdu, oid.oid, oid.len, argoid[1],
                                          argoid[2])
            if status:
                raise PDUVarAddError("couldn't add %s to the pdu"
                                     % (argoid[0],))

    def get(self, reqoid, *args):
        """Send a GET for the given OID(s) and return the value(s).

        Returns one value string for a single OID given as a string,
        otherwise a tuple of value strings. Raises GetError on failure.
        """
        pdu = self.__build_pdu__(netsnmp.SNMP_MSG_GET,
                                 self.__fill_get_pdu__, (reqoid,) + args)
        return self.__synch_response__(pdu)

    def getnext(self, reqoid, *args):
        """Send a GETNEXT for the given OID(s).

        Returns one (oid, value) tuple for a single OID given as a
        string, otherwise a tuple of such tuples. Raises GetError on
        failure.
        """
        pdu = self.__build_pdu__(netsnmp.SNMP_MSG_GETNEXT,
                                 self.__fill_get_pdu__, (reqoid,) + args)
        return self.__synch_response__(pdu, printoid=1)

    def getbulk(self, reqoid, *args, **kw):
        """Send a GETBULK for the given OID(s).

        Keyword arguments:
        non_repeaters   -- defaults to 0.
        max_repetitions -- defaults to 100.

        Returns a tuple of (oid, value) tuples, or a single such tuple
        when one OID was given as a string and the response holds
        exactly one variable. Raises VersionMismatch on a v1 session and
        GetError on failure.
        """
        if self.version == 1:
            raise VersionMismatch("GETBULK requires v2c and higher")

        pdu = netsnmp.snmp_pdu_create(netsnmp.SNMP_MSG_GETBULK)
        try:
            # The two GETBULK parameters are stored in the errstat and
            # errindex fields of the request PDU.
            pdu.errstat = kw.get('non_repeaters', 0)
            pdu.errindex = kw.get('max_repetitions', 100)
            self.__fill_get_pdu__(pdu, (reqoid,) + args)
        except:
            # Cleanup only; the original exception is re-raised as is.
            netsnmp.snmp_free_pdu(pdu)
            raise

        return self.__synch_response__(pdu, printoid=1)

    def set(self, reqoid, *args):
        """Send a SET.

        Each variable is an (oid, type, value) tuple/list; pass them as
        separate arguments or as one tuple/list of them. Returns the
        response formatted as (oid, value) tuple(s). Raises GetError on
        failure.
        """
        pdu = self.__build_pdu__(netsnmp.SNMP_MSG_SET,
                                 self.__fill_set_pdu__, (), reqoid, args)
        return self.__synch_response__(pdu, printoid=1)

    def trap(self, uptime, trapoid, reqoid, *args):
        """Send a v2 trap.

        uptime  -- value sent for sysUpTime.0.
        trapoid -- OID of the trap; sent in numeric form as snmpTrapOID.0.
        reqoid, args -- additional (oid, type, value) variables, as for
                        set().

        Raises VersionMismatch on a v1 session and SendError when the
        PDU could not be sent.
        """
        if self.version == 1:
            raise VersionMismatch("currently only supports v2 traps")

        # Translate before creating the PDU so that an unknown trap OID
        # cannot leave an allocated PDU behind.
        # NOTE(review): 't' and 'o' are presumably the net-snmp type
        # characters for timeticks and object identifier - confirm.
        pduoid = (('sysUpTime.0', 't', uptime),
                  ('snmpTrapOID.0', 'o', translate_to(trapoid, 1)))
        pdu = self.__build_pdu__(netsnmp.SNMP_MSG_TRAP2,
                                 self.__fill_set_pdu__, pduoid, reqoid, args)

        status = netsnmp.snmp_sess_send(self.session, pdu)
        if not status:
            # The helper lives in the netsnmp module; the bare name used
            # previously raised NameError instead of SendError.
            errmsg = netsnmp.py_snmp_sess_error(self.session)
            netsnmp.snmp_free_pdu(pdu)
            raise SendError(errmsg)

    def walk(self, reqoid):
        """Walk the subtree below reqoid.

        Uses GETNEXT on a v1 session and GETBULK otherwise, and stops at
        the first returned OID that no longer starts with the UCD-format
        translation of reqoid, or that does not advance.

        Returns a tuple of (oid, value) tuples in which the translated
        prefix of each OID is replaced by reqoid as given by the caller.
        GetError raised by the underlying requests is propagated.
        """
        lreqoid = translate_to(reqoid, 0)
        prefixlen = len(lreqoid)
        walklist = []
        lastoid = lreqoid
        walking = 1

        while lastoid and walking:
            # The sequence form makes both calls return a tuple of
            # (oid, value) pairs even for a one-variable response, which
            # would otherwise come back as a bare pair and be misread.
            if self.version == 1:
                varbinds = self.getnext((lastoid,))
            else:
                varbinds = self.getbulk((lastoid,))

            # An empty response can never make progress.
            if not varbinds:
                break

            for oid, value in varbinds:
                # Stop when the agent hands back the OID it was asked
                # about (no progress) or leaves the requested subtree.
                # NOTE(review): this is a plain string prefix test, so
                # "x.1" also matches "x.10"; confirm the desired
                # boundary handling for every OID output format.
                if oid == lastoid or oid[:prefixlen] != lreqoid:
                    walking = 0
                    break
                walklist.append((reqoid + oid[prefixlen:], value))
                lastoid = oid

        return tuple(walklist)

    def table(self, reqoid):
        """Walk a table and return its rows.

        Each walked OID is expected to read
        <reqoid>.<something>Entry.<column>.<index...>.

        Returns a tuple of dictionaries, one per row, mapping column
        name to value. Rows are ordered by their index compared as a
        string (so "10" sorts before "2").

        Raises OIDTypeError when the walk is empty or an OID does not
        have the expected table layout.
        """
        walktuple = self.walk(reqoid)

        if not walktuple:
            raise OIDTypeError("expecting a table")

        # Every walked OID starts with reqoid, so the position of its
        # last component is known without searching. This also works for
        # a dotted or numeric reqoid.
        posoid = len(reqoid.split('.')) - 1

        tabledict = {}

        for oid, value in walktuple:
            oidlist = oid.split('.')

            if (len(oidlist) < posoid + 3
                    or oidlist[posoid + 1][-5:] != "Entry"):
                raise OIDTypeError("expecting a table")

            varname = oidlist[posoid + 2]
            index = '.'.join(oidlist[posoid + 3:])
            tabledict.setdefault(index, {})[varname] = value

        indexes = list(tabledict.keys())
        indexes.sort()

        return tuple([tabledict[index] for index in indexes])
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.