#
# snimpy -- Interactive SNMP tool
#
# Copyright (C) Vincent Bernat <bernat@luffy.cx>
#
# Permission to use, copy, modify, and/or distribute this software for any
# purpose with or without fee is hereby granted, provided that the above
# copyright notice and this permission notice appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
"""
This module is a low-level interface to build SNMP requests, send
them and receive answers. It is built on top of pysnmp_ but the
exposed interface is far simpler. It is also far less complete and
there is an important dependency on the :mod:`basictypes` module for
type coercion.

.. _pysnmp: http://pysnmp.sourceforge.net/
"""
import re
import socket
import inspect
import threading
import asyncio
import ipaddress
from pysnmp.hlapi.v3arch.asyncio import (
SnmpEngine, CommunityData, UsmUserData,
UdpTransportTarget, Udp6TransportTarget, ContextData,
ObjectType, ObjectIdentity,
get_cmd, set_cmd, walk_cmd, bulk_walk_cmd,
usmNoAuthProtocol, usmHMACMD5AuthProtocol, usmHMACSHAAuthProtocol,
usmHMAC128SHA224AuthProtocol, usmHMAC192SHA256AuthProtocol,
usmHMAC256SHA384AuthProtocol, usmHMAC384SHA512AuthProtocol,
usmNoPrivProtocol, usmDESPrivProtocol, usm3DESEDEPrivProtocol,
usmAesCfb128Protocol, usmAesCfb192Protocol, usmAesCfb256Protocol,
)
from pysnmp.proto import rfc1902, rfc1905
from pysnmp.smi import error
class SNMPException(Exception):
    """SNMP related base exception.

    All SNMP exceptions are inherited from this one. The inherited
    exceptions are named after the name of the corresponding SNMP
    error.
    """


class SNMPTooBig(SNMPException):
    """Raised for the SNMP ``tooBig`` error status."""


class SNMPNoSuchName(SNMPException):
    """Raised for the SNMP ``noSuchName`` error status."""


class SNMPBadValue(SNMPException):
    """Raised for the SNMP ``badValue`` error status."""


class SNMPReadOnly(SNMPException):
    """Raised for the SNMP ``readOnly`` error status."""
# Dynamically build remaining (v2) exceptions: every subclass of
# ``error.MibOperationError`` whose name ends with "Error" gets a
# matching ``SNMP<Name>`` exception (the "Error" suffix is dropped)
# registered in this module's globals.
for name, obj in inspect.getmembers(error):
    if (name.endswith("Error") and
            inspect.isclass(obj) and
            issubclass(obj, error.MibOperationError) and
            obj != error.MibOperationError):
        name = str("SNMP{}".format(name[:-5]))
        globals()[name] = type(name, (SNMPException,), {})
# Do not leak the loop variables as module attributes.
del name
del obj
class Session:
    """SNMP session.

    An instance of this object will represent an SNMP session. From
    such an instance, one can get information from the associated
    agent.
    """

    # Thread-local storage shared by all sessions: holds the event loop
    # used by :meth:`_run` and the engine shared by SNMPv1/v2 sessions.
    _tls = threading.local()

    def _run(self, coro):
        """Run an async coroutine synchronously using a thread-local loop.

        :param coro: the coroutine (or awaitable) to run to completion.
        :return: whatever the coroutine returns.
        """
        if not hasattr(self._tls, "loop"):
            self._tls.loop = asyncio.new_event_loop()
        return self._tls.loop.run_until_complete(coro)

    def __init__(self, host,
                 community="public", version=2,
                 secname=None,
                 authprotocol=None,
                 authpassword=None,
                 privprotocol=None,
                 privpassword=None,
                 contextname=None,
                 bulk=40,
                 none=False):
        """Create a new SNMP session.

        :param host: The hostname or IP address of the agent to
            connect to. Optionally, the port can be specified
            separated with a colon. An IPv6 address must be enclosed
            in square brackets.
        :type host: str
        :param community: The community to transmit to the agent for
            authorization purpose. With version 3, it is only used as
            the security name when `secname` is not provided.
        :type community: str
        :param version: The SNMP version to use to talk with the
            agent. Possible values are `1`, `2` (community-based) or
            `3`.
        :type version: int
        :param secname: Security name to use for SNMPv3 only.
        :type secname: str
        :param authprotocol: Authorization protocol to use for
            SNMPv3. This can be `None` or one of the strings `SHA`,
            `SHA1`, `MD5`, `SHA224`, `SHA256`, `SHA384` or `SHA512`.
        :type authprotocol: None or str
        :param authpassword: Authorization password if authorization
            protocol is not `None`.
        :type authpassword: str
        :param privprotocol: Privacy protocol to use for SNMPv3. This
            can be `None` or either the string `AES`, `AES128`,
            `AES192`, `AES256`, `DES` or `3DES`.
        :type privprotocol: None or str
        :param privpassword: Privacy password if privacy protocol is
            not `None`.
        :type privpassword: str
        :param contextname: Context name for SNMPv3 messages.
        :type contextname: str
        :param bulk: Max repetition value for `GETBULK` requests. Set
            to `False` to disable.
        :type bulk: int or bool
        :param none: When enabled, will return None for not found
            values (instead of raising an exception)
        :type none: bool
        :raises ValueError: on unsupported version, unknown
            authentication/privacy protocol, invalid `bulk` value or
            when `none` is requested with SNMPv1.
        """
        # Validate cheap arguments first so a bad call does not leave an
        # engine behind.
        if version not in (1, 2, 3):
            raise ValueError("unsupported SNMP version {}".format(version))
        if version == 1 and none:
            raise ValueError("None-GET requests not compatible with SNMPv1")

        self._host = host
        self._version = version
        self._none = none
        if version == 3:
            # Each SNMPv3 session gets its own engine.
            self._engine = SnmpEngine()
            self._contextname = contextname
        else:
            # Community-based sessions share one engine per thread.
            if not hasattr(self._tls, "engine"):
                self._tls.engine = SnmpEngine()
            self._engine = self._tls.engine
            self._contextname = None

        # Put authentication stuff in self._auth
        if version in (1, 2):
            self._auth = CommunityData(
                community[0:30], community, version - 1)
        else:
            if secname is None:
                secname = community
            try:
                authprotocol = {
                    None: usmNoAuthProtocol,
                    "MD5": usmHMACMD5AuthProtocol,
                    "SHA": usmHMACSHAAuthProtocol,
                    "SHA1": usmHMACSHAAuthProtocol,
                    "SHA224": usmHMAC128SHA224AuthProtocol,
                    "SHA256": usmHMAC192SHA256AuthProtocol,
                    "SHA384": usmHMAC256SHA384AuthProtocol,
                    "SHA512": usmHMAC384SHA512AuthProtocol,
                }[authprotocol]
            except KeyError:
                raise ValueError("{} is not an acceptable authentication "
                                 "protocol".format(authprotocol)) from None
            try:
                privprotocol = {
                    None: usmNoPrivProtocol,
                    "DES": usmDESPrivProtocol,
                    "3DES": usm3DESEDEPrivProtocol,
                    "AES": usmAesCfb128Protocol,
                    "AES128": usmAesCfb128Protocol,
                    "AES192": usmAesCfb192Protocol,
                    "AES256": usmAesCfb256Protocol,
                }[privprotocol]
            except KeyError:
                raise ValueError("{} is not an acceptable privacy "
                                 "protocol".format(privprotocol)) from None
            self._auth = UsmUserData(secname,
                                     authpassword,
                                     privpassword,
                                     authprotocol,
                                     privprotocol)

        # Put transport stuff into self._transport. The host is either
        # a bracketed IPv6 address, a dotted IPv4 address or anything
        # else (handed to the resolver), optionally followed by ":port".
        mo = re.match(r'^(?:'
                      r'\[(?P<ipv6>[\d:A-Fa-f]+)\]|'
                      r'(?P<ipv4>[\d\.]+)|'
                      r'(?P<any>.*?))'
                      r'(?::(?P<port>\d+))?$',
                      host)
        port = int(mo.group("port")) if mo.group("port") else 161
        if mo.group("ipv6"):
            target, address = Udp6TransportTarget, mo.group("ipv6")
        elif mo.group("ipv4"):
            target, address = UdpTransportTarget, mo.group("ipv4")
        else:
            address = mo.group("any")
            results = socket.getaddrinfo(address,
                                         port,
                                         0,
                                         socket.SOCK_DGRAM,
                                         socket.IPPROTO_UDP)
            # We should try to connect to each result to determine if
            # the given family is available. However, we cannot do
            # that over UDP. Let's implement a safe choice. If we have
            # an IPv4 address, use that. If not, use IPv6. If we want
            # to add an option to force IPv6, it is a good place.
            if any(res[0] == socket.AF_INET for res in results):
                target = UdpTransportTarget
            else:
                target = Udp6TransportTarget
        self._transport = self._run(target.create((address, port)))

        # Context data
        if self._contextname:
            self._contextdata = ContextData(
                contextName=rfc1902.OctetString(self._contextname))
        else:
            self._contextdata = ContextData()

        # Bulk stuff (goes through the validating property setter)
        self.bulk = bulk

    def _check_exception(self, value):
        """Check if the given ASN1 value is an exception.

        :raises SNMPNoSuchObject, SNMPNoSuchInstance, SNMPEndOfMibView:
            when `value` is the matching SNMPv2 exception marker.
        """
        # The exception classes below are created dynamically at module
        # import time, hence the F821 suppressions.
        if isinstance(value, rfc1905.NoSuchObject):
            raise SNMPNoSuchObject("No such object was found")  # noqa: F821
        if isinstance(value, rfc1905.NoSuchInstance):
            raise SNMPNoSuchInstance("No such instance exists")  # noqa: F821
        if isinstance(value, rfc1905.EndOfMibView):
            raise SNMPEndOfMibView("End of MIB was reached")  # noqa: F821

    def _check_error(self, errorIndication, errorStatus):
        """Check for SNMP protocol errors in response.

        :param errorIndication: engine-level error, if any.
        :param errorStatus: error status of the response PDU, if any.
        :raises SNMPException: (or a more specific subclass when one
            matching the error status name exists) on error.
        """
        if errorIndication:
            self._check_exception(errorIndication)
            raise SNMPException(str(errorIndication))
        if errorStatus:
            # Map e.g. "tooBig" to the module-level class "SNMPTooBig".
            exc = str(errorStatus.prettyPrint())
            exc = re.sub(r'\W+', '', exc)
            exc = "SNMP{}".format(exc[0].upper() + exc[1:])
            if str(exc) in globals():
                raise globals()[exc]
            raise SNMPException(errorStatus.prettyPrint())

    def _convert(self, value):
        """Convert a PySNMP value to some native Python type.

        :raises NotImplementedError: when no convertor is known for the
            type of `value`.
        """
        try:
            # With PySNMP 4.3+, an OID is a ObjectIdentity. We try to
            # extract it while being compatible with earlier releases.
            value = value.getOid()
        except AttributeError:
            pass
        # NOTE(review): lookup is first-match in insertion order, so if
        # a PySNMP type subclasses an earlier entry, the earlier
        # convertor wins. Confirm before reordering.
        convertors = {rfc1902.Integer: int,
                      rfc1902.Integer32: int,
                      rfc1902.OctetString: bytes,
                      rfc1902.IpAddress: ipaddress.IPv4Address,
                      rfc1902.Counter32: int,
                      rfc1902.Counter64: int,
                      rfc1902.Gauge32: int,
                      rfc1902.Unsigned32: int,
                      rfc1902.TimeTicks: int,
                      rfc1902.Bits: str,
                      rfc1902.Opaque: str,
                      rfc1902.univ.ObjectIdentifier: tuple}
        if self._none:
            # "Not found" markers become None instead of exceptions.
            convertors[rfc1905.NoSuchObject] = lambda x: None
            convertors[rfc1905.NoSuchInstance] = lambda x: None
        for cl, fn in convertors.items():
            if isinstance(value, cl):
                return fn(value)
        self._check_exception(value)
        raise NotImplementedError("unable to convert {}".format(repr(value)))

    def get(self, *oids):
        """Retrieve an OID value using GET.

        :param oids: a list of OID to retrieve. An OID is a tuple.
        :return: a tuple of tuples with the retrieved OID and the
            converted value.
        :raises SNMPException: on protocol error or empty answer.
        """
        objecttypes = [ObjectType(ObjectIdentity(oid)) for oid in oids]
        errorIndication, errorStatus, errorIndex, varBinds = self._run(
            get_cmd(self._engine, self._auth, self._transport,
                    self._contextdata, *objecttypes, lookupMib=False))
        self._check_error(errorIndication, errorStatus)
        results = [(tuple(name), self._convert(val))
                   for name, val in varBinds]
        if not results:
            raise SNMPException("empty answer")
        return tuple(results)

    async def _drain_async(self, walker):
        """Consume one async walker and return its ``(oid, value)`` pairs.

        Each response is checked with :meth:`_check_error` before its
        variable bindings are collected; values are left unconverted.
        """
        pairs = []
        async for errorIndication, errorStatus, errorIndex, varBinds in walker:
            self._check_error(errorIndication, errorStatus)
            pairs.extend((tuple(name), val) for name, val in varBinds)
        return pairs

    async def _walk_async(self, *oids):
        """Collect results from GETNEXT-based walk."""
        results = []
        for oid in oids:
            walker = walk_cmd(
                self._engine, self._auth, self._transport,
                self._contextdata,
                ObjectType(ObjectIdentity(oid)), lookupMib=False)
            results.extend(await self._drain_async(walker))
        return results

    async def _bulkwalk_async(self, bulk, *oids):
        """Collect results from GETBULK-based walk.

        :param bulk: max-repetitions value (non-repeaters is 0).
        """
        results = []
        for oid in oids:
            walker = bulk_walk_cmd(
                self._engine, self._auth, self._transport,
                self._contextdata, 0, bulk,
                ObjectType(ObjectIdentity(oid)), lookupMib=False)
            results.extend(await self._drain_async(walker))
        return results

    @staticmethod
    def _halve_bulk(bulk):
        """Return half of `bulk` (rounded down), or `False` once it hits 0.

        Floor division keeps the result an integer; true division would
        turn a bulk of 1 into the truthy (and invalid) 0.5.
        """
        return bulk // 2 or False

    def walkmore(self, *oids):
        """Retrieve OIDs values using GETBULK or GETNEXT.

        The method is called "walk" but this is either a GETBULK or a
        GETNEXT. The latter is only used for SNMPv1 or if bulk has been
        disabled using the :attr:`bulk` property.

        :param oids: a list of OID to retrieve. An OID is a tuple.
        :return: a tuple of tuples with the retrieved OID and the
            converted value.
        """
        if self._version == 1 or not self.bulk:
            results = self._run(self._walk_async(*oids))
        else:
            try:
                results = self._run(self._bulkwalk_async(self.bulk, *oids))
            except SNMPTooBig:
                # Let's try to ask for less values. We will never increase
                # bulk again. We cannot increase it just after the walk
                # because we may end up requesting everything twice (or
                # more).
                nbulk = self._halve_bulk(self.bulk)
                if nbulk != self.bulk:
                    self.bulk = nbulk
                    # Retry through walkmore() so the caller still gets
                    # an unfiltered tuple, as on the normal path.
                    return self.walkmore(*oids)
                raise
        return tuple([(oid, self._convert(val)) for oid, val in results])

    def walk(self, *oids):
        """Walk from given OIDs but don't return any "extra" results.

        Only results in the subtree will be returned.

        :param oids: OIDs used as a start point
        :return: a generator of tuples with the retrieved OID and the
            converted value.
        """
        return ((noid, result)
                for oid in oids
                for noid, result in self.walkmore(oid)
                if (len(noid) >= len(oid) and
                    noid[:len(oid)] == oid[:len(oid)]))

    def set(self, *args):
        """Set an OID value using SET.

        This function takes an even number of arguments. They are
        working by pair. The first member is an OID and the second one
        is :class:`basictypes.Type` instance whose `pack()` method will
        be used to transform into the appropriate form.

        :return: a tuple of tuples with the retrieved OID and the
            converted value.
        :raises ValueError: when given an odd number of arguments.
        :raises SNMPException: on protocol error or empty answer.
        """
        if len(args) % 2 != 0:
            raise ValueError("expect an even number of arguments for SET")
        objecttypes = [ObjectType(ObjectIdentity(oid), val.pack())
                       for oid, val in zip(args[0::2], args[1::2])]
        errorIndication, errorStatus, errorIndex, varBinds = self._run(
            set_cmd(self._engine, self._auth, self._transport,
                    self._contextdata, *objecttypes, lookupMib=False))
        self._check_error(errorIndication, errorStatus)
        results = [(tuple(name), self._convert(val))
                   for name, val in varBinds]
        if not results:
            raise SNMPException("empty answer")
        return tuple(results)

    def __repr__(self):
        return "{}(host={},version={})".format(
            self.__class__.__name__,
            self._host,
            self._version)

    @property
    def timeout(self):
        """Get timeout value for the current session.

        :return: Timeout value in microseconds.
        """
        # The transport's timeout is multiplied by 1e6 here and divided
        # by 1e6 in the setter.
        return self._transport.timeout * 1000000

    @timeout.setter
    def timeout(self, value):
        """Set timeout value for the current session.

        :param value: Timeout value in microseconds.
        :raises ValueError: if the value is not strictly positive.
        """
        value = int(value)
        if value <= 0:
            raise ValueError("timeout is a positive integer")
        self._transport.timeout = value / 1000000.

    @property
    def retries(self):
        """Get number of times a request is retried.

        :return: Number of retries for each request.
        """
        return self._transport.retries

    @retries.setter
    def retries(self, value):
        """Set number of times a request is retried.

        :param value: Number of retries for each request.
        :raises ValueError: if the value is negative.
        """
        value = int(value)
        if value < 0:
            raise ValueError("retries is a non-negative integer")
        self._transport.retries = value

    @property
    def bulk(self):
        """Get bulk settings.

        :return: `False` if bulk is disabled or a positive integer
            for the number of repetitions.
        """
        return self._bulk

    @bulk.setter
    def bulk(self, value):
        """Set bulk settings.

        :param value: `False` to disable bulk or a positive integer
            for the number of allowed repetitions.
        :raises ValueError: if the value is an integer lower than 1.
        """
        # Only the literal False disables bulk; 0 is rejected below.
        if value is False:
            self._bulk = False
            return
        value = int(value)
        if value <= 0:
            raise ValueError("{} is not an appropriate value "
                             "for max repeater parameter".format(
                                 value))
        self._bulk = value