#
# yapsnmp - Yet Another Python SNMP module
#
# This is a highish level Python SNMP module, based on my SWIGed net-snmp
# (formerly known as ucd-snmp) interface. It currently supports v1 and v2c,
# with full MIB parsing. v3 coming soon (shouldn't be technically a problem,
# simply a matter of finding the time to implement it).
#
# -- Yves Perrenoud <yves@xpand.org>
__version__ = "0.7.8"
import netsnmp, socket, string
# initialize the library (mostly involves parsing in MIBs)
netsnmp.init_snmp("snmpapp")
# define supported versions of SNMP
snmpversions = {1: netsnmp.SNMP_VERSION_1,
2: netsnmp.SNMP_VERSION_2c,
}
# define the boolean flags that influence the library operation
MIB_ERRORS = netsnmp.NETSNMP_DS_LIB_MIB_ERRORS
SAVE_MIB_DESCRS = netsnmp.NETSNMP_DS_LIB_SAVE_MIB_DESCRS
MIB_COMMENT_TERM = netsnmp.NETSNMP_DS_LIB_MIB_COMMENT_TERM
MIB_PARSE_LABEL = netsnmp.NETSNMP_DS_LIB_MIB_PARSE_LABEL
DUMP_PACKET = netsnmp.NETSNMP_DS_LIB_DUMP_PACKET
LOG_TIMESTAMP = netsnmp.NETSNMP_DS_LIB_LOG_TIMESTAMP
DONT_READ_CONFIGS = netsnmp.NETSNMP_DS_LIB_DONT_READ_CONFIGS
MIB_REPLACE = netsnmp.NETSNMP_DS_LIB_MIB_REPLACE
PRINT_NUMERIC_ENUM = netsnmp.NETSNMP_DS_LIB_PRINT_NUMERIC_ENUM
PRINT_NUMERIC_OIDS = netsnmp.NETSNMP_DS_LIB_PRINT_NUMERIC_OIDS
DONT_BREAKDOWN_OIDS = netsnmp.NETSNMP_DS_LIB_DONT_BREAKDOWN_OIDS
ALARM_DONT_USE_SIG = netsnmp.NETSNMP_DS_LIB_ALARM_DONT_USE_SIG
PRINT_FULL_OID = netsnmp.NETSNMP_DS_LIB_PRINT_FULL_OID
QUICK_PRINT = netsnmp.NETSNMP_DS_LIB_QUICK_PRINT
RANDOM_ACCESS = netsnmp.NETSNMP_DS_LIB_RANDOM_ACCESS
REGEX_ACCESS = netsnmp.NETSNMP_DS_LIB_REGEX_ACCESS
DONT_CHECK_RANGE = netsnmp.NETSNMP_DS_LIB_DONT_CHECK_RANGE
NO_TOKEN_WARNINGS = netsnmp.NETSNMP_DS_LIB_NO_TOKEN_WARNINGS
NUMERIC_TIMETICKS = netsnmp.NETSNMP_DS_LIB_NUMERIC_TIMETICKS
ESCAPE_QUOTES = netsnmp.NETSNMP_DS_LIB_ESCAPE_QUOTES
REVERSE_ENCODE = netsnmp.NETSNMP_DS_LIB_REVERSE_ENCODE
PRINT_BARE_VALUE = netsnmp.NETSNMP_DS_LIB_PRINT_BARE_VALUE
EXTENDED_INDEX = netsnmp.NETSNMP_DS_LIB_EXTENDED_INDEX
PRINT_HEX_TEXT = netsnmp.NETSNMP_DS_LIB_PRINT_HEX_TEXT
# define module specific exceptions
class MIBParseError(Exception): pass
class SendError(Exception): pass
class GetError(SendError): pass
class VersionMismatch(Exception): pass
class OIDTypeError(Exception): pass
class PDUVarAddError(Exception): pass
# a sequencing testing function
def issequence(obj):
if type(obj) == type(()) or type(obj) == type([]):
return 1
else:
return 0
# function returning a clean string without surrounding double quotes
def cleanval(str):
if str and (str[0] == str[-1] == '"'):
return str[1:-1]
else:
return str
# function transforming an ASCII OID to the appropriate oid object
def get_node(reqoid):
if reqoid[:2] == '.1':
reqoid = 'iso' + reqoid[2:]
oid = netsnmp.py_get_node(reqoid)
if oid.this == 'NULL':
raise MIBParseError, reqoid
return oid
# function translating a numeric or partial OID, to a fully qualified version
def translate(reqoid):
return netsnmp.py_sprint_objid(get_node(reqoid))
# function allowing you to set a flag to true
def set_flag(flag):
netsnmp.netsnmp_ds_set_boolean(netsnmp.NETSNMP_DS_LIBRARY_ID, flag, 1)
# function allowing you to set a flag to false
def clear_flag(flag):
netsnmp.netsnmp_ds_set_boolean(netsnmp.NETSNMP_DS_LIBRARY_ID, flag, 0)
# function allowing you to define the state of a flag
def define_flag(flag, state):
netsnmp.netsnmp_ds_set_boolean(netsnmp.NETSNMP_DS_LIBRARY_ID, flag, state)
# function allowing you to query the state of a flag
def get_flag(flag):
return netsnmp.netsnmp_ds_get_boolean(netsnmp.NETSNMP_DS_LIBRARY_ID, flag)
# function performing a translate with guaranteed translation direction
def translate_to(reqoid, type):
statetup = (netsnmp.NETSNMP_OID_OUTPUT_UCD,
netsnmp.NETSNMP_OID_OUTPUT_NUMERIC)
numstat = netsnmp.netsnmp_ds_get_int(netsnmp.NETSNMP_DS_LIBRARY_ID,
netsnmp.NETSNMP_DS_LIB_OID_OUTPUT_FORMAT)
if numstat != statetup[type]:
netsnmp.netsnmp_ds_set_int(netsnmp.NETSNMP_DS_LIBRARY_ID,
netsnmp.NETSNMP_DS_LIB_OID_OUTPUT_FORMAT,
statetup[type])
result = translate(reqoid)
if numstat != statetup[type]:
netsnmp.netsnmp_ds_set_int(netsnmp.NETSNMP_DS_LIBRARY_ID,
netsnmp.NETSNMP_DS_LIB_OID_OUTPUT_FORMAT,
numstat)
return result
# Let's set the default to return numeric timeticks and quick print
set_flag(NUMERIC_TIMETICKS)
set_flag(QUICK_PRINT)
# Reverting back to a more usable default output format
netsnmp.netsnmp_ds_set_int(netsnmp.NETSNMP_DS_LIBRARY_ID,
netsnmp.NETSNMP_DS_LIB_OID_OUTPUT_FORMAT,
netsnmp.NETSNMP_OID_OUTPUT_UCD)
# The main Class allowing the SNMP interaction
class Session:
def __init__(self, peername, version=1, community='public', port=161):
session = netsnmp.snmp_session()
netsnmp.snmp_sess_init(session)
if not peername:
raise ValueError, "expecting a non null string as host identifier"
if socket.gethostbyname(peername):
session.peername = peername
self.version = version
session.version = snmpversions[version]
session.remote_port = port
if type(community) == type(''):
session.community = community
session.community_len = len(community)
else:
raise TypeError, "expecting a community of type string"
self.session = netsnmp.snmp_sess_open(session)
def __del__(self):
try:
netsnmp.snmp_sess_close(self.session)
except AttributeError:
pass
def __sprint__(self, variable, printoid):
if printoid:
value = string.split(netsnmp.py_sprint_variable(variable), None, 1)
if len(value) < 2:
value.append('')
else:
value[1] = cleanval(value[1])
value = tuple(value)
else:
value = cleanval(netsnmp.py_sprint_value(variable))
return value
def __synch_response__(self, pdu, printoid=0):
response = netsnmp.py_snmp_sess_synch_response(self.session, pdu)
if response.this != 'NULL' and response.errstat == netsnmp.SNMP_ERR_NOERROR:
if (not self.__sequence__) and (response.variables.next_variable.this == 'NULL'):
value = self.__sprint__(response.variables, printoid)
else:
value = ()
curvar = response.variables
while curvar.this != 'NULL':
value = value + (self.__sprint__(curvar, printoid),)
curvar = curvar.next_variable
netsnmp.snmp_free_pdu(response)
return value
else:
if response.this == 'NULL':
errmsg = "couldn't obtain response"
else:
errmsg = netsnmp.snmp_errstring(response.errstat)
netsnmp.snmp_free_pdu(response)
raise GetError, errmsg
def __fill_get_pdu__(self, pdu, oidlist):
if issequence(oidlist[0]):
oidlist = oidlist[0]
self.__sequence__ = 1
else:
self.__sequence__ = 0
for argoid in oidlist:
oid = get_node(argoid)
status = netsnmp.snmp_add_null_var(pdu, oid.oid, oid.len)
if not status:
raise PDUVarAddError, "couldn't add %s to the pdu" % argoid
def __fill_set_pdu__(self, pdu, pduoid, reqoid, args):
if not issequence(reqoid):
raise TypeError, "expecting a sequence"
if issequence(reqoid[0]):
oidlist = tuple(pduoid) + tuple(reqoid)
self.__sequence__ = 1
else:
oidlist = tuple(pduoid) + (reqoid,) + args
self.__sequence__ = 0
for argoid in oidlist:
oid = get_node(argoid[0])
status = netsnmp.snmp_add_var(pdu, oid.oid, oid.len, argoid[1],
argoid[2])
if status:
raise PDUVarAddError, "couldn't add %s to the pdu" % argoid[0]
def get(self, reqoid, *args):
pdu = netsnmp.snmp_pdu_create(netsnmp.SNMP_MSG_GET)
self.__fill_get_pdu__(pdu, (reqoid,) + args)
return self.__synch_response__(pdu)
def getnext(self, reqoid, *args):
pdu = netsnmp.snmp_pdu_create(netsnmp.SNMP_MSG_GETNEXT)
self.__fill_get_pdu__(pdu, (reqoid,) + args)
return self.__synch_response__(pdu, printoid=1)
def getbulk(self, reqoid, *args, **kw):
if self.version == 1:
raise VersionMismatch, "GETBULK requires v2c and higher"
pdu = netsnmp.snmp_pdu_create(netsnmp.SNMP_MSG_GETBULK)
try:
pdu.errstat = kw['non_repeaters']
except KeyError:
pdu.errstat = 0
try:
pdu.errindex = kw['max_repetitions']
except KeyError:
pdu.errindex = 100
self.__fill_get_pdu__(pdu, (reqoid,) + args)
return self.__synch_response__(pdu, printoid=1)
def set(self, reqoid, *args):
pdu = netsnmp.snmp_pdu_create(netsnmp.SNMP_MSG_SET)
self.__fill_set_pdu__(pdu, (), reqoid, args)
return self.__synch_response__(pdu, printoid=1)
def trap(self, uptime, trapoid, reqoid, *args):
if self.version == 1:
raise VersionMismatch, "currently only supports v2 traps"
pdu = netsnmp.snmp_pdu_create(netsnmp.SNMP_MSG_TRAP2)
pduoid = (('sysUpTime.0', 't', uptime),
('snmpTrapOID.0', 'o', translate_to(trapoid, 1)))
self.__fill_set_pdu__(pdu, pduoid, reqoid, args)
status = netsnmp.snmp_sess_send(self.session, pdu)
if not status:
errmsg = py_snmp_sess_error(self.session)
netsnmp.snmp_free_pdu(pdu)
raise SendError, errmsg
def walk(self, reqoid):
lreqoid = translate_to(reqoid, 0)
walklist = []
nextoid = lreqoid
while nextoid:
if self.version == 1:
oidtuplelist = [self.getnext(nextoid)]
else:
oidtuplelist = self.getbulk(nextoid)
for oidtuple in oidtuplelist:
nextoid = oidtuple[0]
if lreqoid == nextoid[:len(lreqoid)]:
oidtuple = (reqoid + oidtuple[0][len(lreqoid):], oidtuple[1])
walklist.append(oidtuple)
else:
nextoid = ''
break
return tuple(walklist)
def table(self, reqoid):
walktuple = self.walk(reqoid)
if not walktuple:
raise OIDTypeError, "expecting a table"
posoid = string.split(walktuple[0][0], '.').index(reqoid)
headers = []
tabledict = {}
table = []
for oidtuple in walktuple:
oidlist = string.split(oidtuple[0], '.')
if oidlist[posoid + 1][-5:] != "Entry":
raise OIDTypeError, "expecting a table"
index = string.join(oidlist[posoid + 3:], '.')
varname = oidlist[posoid + 2]
if varname not in headers:
headers.append(varname)
if not tabledict.has_key(index):
tabledict[index] = {}
tabledict[index][varname] = oidtuple[1]
indexes = tabledict.keys()
indexes.sort()
for index in indexes:
table.append(tabledict[index])
return tuple(table)
|