0001: /*_############################################################################
0002: _##
0003: _## SNMP4J-AgentX - AgentXSubagent.java
0004: _##
0005: _## Copyright (C) 2005-2007 Frank Fock (SNMP4J.org)
0006: _##
0007: _## This program is free software; you can redistribute it and/or modify
0008: _## it under the terms of the GNU General Public License version 2 as
0009: _## published by the Free Software Foundation.
0010: _##
0011: _## This program is distributed in the hope that it will be useful,
0012: _## but WITHOUT ANY WARRANTY; without even the implied warranty of
0013: _## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
0014: _## GNU General Public License for more details.
0015: _##
0016: _## You should have received a copy of the GNU General Public License
0017: _## along with this program; if not, write to the Free Software
0018: _## Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
0019: _## MA 02110-1301 USA
0020: _##
0021: _##########################################################################*/
0022:
0023: package org.snmp4j.agent.agentx.subagent;
0024:
0025: import java.io.IOException;
0026: import java.util.*;
0027:
0028: import org.snmp4j.PDU;
0029: import org.snmp4j.TransportMapping;
0030: import org.snmp4j.agent.*;
0031: import org.snmp4j.agent.agentx.*;
0032: import org.snmp4j.agent.agentx.event.PingEvent;
0033: import org.snmp4j.agent.agentx.event.PingListener;
0034: import org.snmp4j.agent.mo.MOScalar;
0035: import org.snmp4j.agent.mo.snmp.CoexistenceInfo;
0036: import org.snmp4j.agent.request.*;
0037: import org.snmp4j.log.LogAdapter;
0038: import org.snmp4j.log.LogFactory;
0039: import org.snmp4j.mp.SnmpConstants;
0040: import org.snmp4j.smi.*;
0041: import org.snmp4j.transport.ConnectionOrientedTransportMapping;
0042: import org.snmp4j.transport.TransportMappings;
0043: import org.snmp4j.util.ThreadPool;
0044: import org.snmp4j.agent.mo.MOTableRow;
0045: import org.snmp4j.agent.agentx.subagent.index.AnyNewIndexOID;
0046: import org.snmp4j.agent.agentx.subagent.index.NewIndexOID;
0047: import org.snmp4j.agent.mo.snmp.SysUpTime;
0048:
0049: /**
0050: * The <code>AgentXSubagent</code> class implements the AgentX communication
0051: * for an AgentX subagent implementation.
0052: *
0053: * @author Frank Fock
0054: * @version 1.0
0055: */
0056: public class AgentXSubagent implements AgentXCommandListener,
0057: NotificationOriginator {
0058:
/** Logger used by this class and by its nested request handlers. */
private static final LogAdapter LOGGER = LogFactory
.getLogger(AgentXSubagent.class);

/** MOServer instances added via addMOServer; getServer searches them in list order. */
private ArrayList moServers = new ArrayList();
/** Optional pool for command execution; when null, processCommand runs commands on the calling thread. */
private ThreadPool threadPool;
/** Factory used by dispatchCommand to turn command events into requests. */
private RequestFactory factory;
/** AgentX protocol engine used to send PDUs and to manage transport mappings. */
private AgentX agentX;
/**
 * Requests stored under their RequestID when a TestSet PDU is dispatched so
 * that the following CommitSet/UndoSet/CleanupSet PDUs can find them again.
 */
protected Map requestList;

/** AgentXPeer instances keyed by the master agent's address. */
protected Map peers = new LinkedHashMap(2);
/** AgentXSession instances keyed by their session ID (as Integer). */
protected Map sessions = new Hashtable(2);

// One handler per AgentX request PDU type; dispatchCommand selects among them.
protected RequestHandler requestHandlerGet;
protected RequestHandler requestHandlerGetNext;
protected RequestHandler requestHandlerGetBulk;
protected RequestHandler requestHandlerTestSet;
protected RequestHandler requestHandlerCommitSet;
protected RequestHandler requestHandlerUndoSet;
protected RequestHandler requestHandlerCleanupSet;

/** Counter handed out (and then incremented) by getNextTransactionID(). */
protected int nextTransactionID = 0;

/** Subagent identifier passed to the Open PDU. */
private OID subagentID;
/** Subagent description passed to the Open PDU. */
private OctetString subagentDescr;

/** Timeout passed to AgentXTarget for (un)registration requests; initialised to the protocol default seconds times 1000. */
private long timeout = AgentXProtocol.DEFAULT_TIMEOUT_SECONDS * 1000;
/** Priority returned by getDefaultPriority() and by the default getPriority implementation. */
private byte defaultPriority = AgentXProtocol.DEFAULT_PRIORITY;

/** Timer created by setPingDelay(int); null while no ping schedule is active. */
private Timer pingTimer;
/** Lazily created list of PingListener instances notified by firePinged. */
private transient Vector pingListeners;
0089:
/**
 * Creates a subagent on top of the supplied AgentX protocol engine, installs
 * the default request factory and request handlers, and registers this
 * instance as command responder at <code>agentX</code>.
 *
 * @param agentX
 *    the AgentX engine used for all communication with the master agent.
 * @param subagentID
 *    the identifier sent in Open PDUs.
 * @param subagentDescr
 *    the description sent in Open PDUs.
 */
public AgentXSubagent(AgentX agentX, OID subagentID,
        OctetString subagentDescr) {
    // Synchronized wrapper: the map is accessed from dispatchCommand.
    this.requestList = Collections.synchronizedMap(new HashMap(10));
    this.agentX = agentX;
    this.subagentID = subagentID;
    this.subagentDescr = subagentDescr;
    this.factory = new DefaultAgentXRequestFactory();
    this.requestHandlerGet = new GetRequestHandler();
    this.requestHandlerCleanupSet = new CleanupSetHandler();
    this.requestHandlerCommitSet = new CommitSetHandler();
    this.requestHandlerTestSet = new TestSetHandler();
    this.requestHandlerUndoSet = new UndoSetHandler();
    this.requestHandlerGetNext = new GetNextHandler();
    this.requestHandlerGetBulk = new GetBulkHandler();
    // From here on the engine may call back into processCommand.
    agentX.addCommandResponder(this);
}
0106:
0107: /**
0108: * Sets the ping delay in seconds. If greater than zero, for each session
0109: * a ping PDU is sent to the master to validate the session regularly with
0110: * the specified delay. To monitor the ping requests, it is necessary to
0111: * add a {@link PingListener} with {@link #addPingListener}.
0112: *
0113: * @param seconds
0114: * the delay. If zero or a negative value is supplied, no pings are sent
0115: */
0116: public void setPingDelay(int seconds) {
0117: if (pingTimer != null) {
0118: pingTimer.cancel();
0119: pingTimer = null;
0120: }
0121: if (seconds > 0) {
0122: pingTimer = new Timer();
0123: pingTimer.schedule(new PingTask(), seconds * 1000,
0124: seconds * 1000);
0125: }
0126: }
0127:
0128: public void processCommand(AgentXCommandEvent event) {
0129: if (event.getCommand() != null) {
0130: event.setProcessed(true);
0131: Command command = new Command(event);
0132: if (threadPool != null) {
0133: threadPool.execute(command);
0134: } else {
0135: command.run();
0136: }
0137: }
0138: }
0139:
0140: protected synchronized int getNextTransactionID() {
0141: return nextTransactionID++;
0142: }
0143:
0144: protected synchronized int closeSession(int sessionID, byte reason)
0145: throws IOException {
0146: AgentXSession session = removeSession(sessionID);
0147: if ((session == null) || (session.isClosed())) {
0148: return AgentXProtocol.AGENTX_NOT_OPEN;
0149: }
0150: session.setClosed(true);
0151: AgentXClosePDU closePDU = new AgentXClosePDU(
0152: AgentXProtocol.REASON_SHUTDOWN);
0153: AgentXTarget target = session.createAgentXTarget();
0154: AgentXResponseEvent resp = agentX.send(closePDU, target,
0155: session.getPeer().getTransport());
0156: if (resp == null) {
0157: return AgentXProtocol.AGENTX_TIMEOUT;
0158: }
0159: return ((AgentXResponsePDU) resp.getResponse())
0160: .getErrorStatus();
0161: }
0162:
0163: protected int openSession(TransportMapping transport,
0164: Address masterAddress, AgentXSession session)
0165: throws IOException {
0166: AgentXOpenPDU openPDU = new AgentXOpenPDU(0,
0167: getNextTransactionID(), 0, session.getTimeout(),
0168: subagentID, subagentDescr);
0169: AgentXResponseEvent responseEvent = agentX.send(openPDU,
0170: session.createAgentXTarget(), transport);
0171: if (responseEvent.getResponse() == null) {
0172: LOGGER.error("Timeout on connection to master "
0173: + masterAddress);
0174: } else if (responseEvent.getResponse() instanceof AgentXResponsePDU) {
0175: AgentXResponsePDU response = responseEvent.getResponse();
0176: if (response.getErrorStatus() == AgentXProtocol.AGENTX_SUCCESS) {
0177: session.setSessionID(response.getSessionID());
0178: }
0179: return response.getErrorStatus();
0180: } else {
0181: LOGGER
0182: .error("Received packet on open PDU is not a response AgentX PDU: "
0183: + responseEvent);
0184: }
0185: return AgentXProtocol.AGENTX_TIMEOUT;
0186: }
0187:
0188: private static int getResponseStatus(
0189: AgentXResponseEvent responseEvent) {
0190: if (responseEvent.getResponse() == null) {
0191: LOGGER.error("Timeout on connection to master "
0192: + responseEvent.getTarget());
0193: return AgentXProtocol.AGENTX_TIMEOUT;
0194: } else if (responseEvent.getResponse() instanceof AgentXResponsePDU) {
0195: AgentXResponsePDU response = responseEvent.getResponse();
0196: return response.getErrorStatus();
0197: } else {
0198: LOGGER
0199: .error("Received packet on open PDU is not a response AgentX PDU: "
0200: + responseEvent);
0201: }
0202: return AgentXProtocol.AGENTX_ERROR;
0203: }
0204:
0205: public void disconnect(Address masterAddress) throws IOException {
0206: AgentXPeer peer = (AgentXPeer) peers.remove(masterAddress);
0207: if (peer != null) {
0208: TransportMapping transport = peer.getTransport();
0209: if (transport instanceof ConnectionOrientedTransportMapping) {
0210: ((ConnectionOrientedTransportMapping) transport)
0211: .close(masterAddress);
0212: }
0213: }
0214: }
0215:
0216: public int connect(Address masterAddress, Address localAddress,
0217: AgentXSession session) throws IOException {
0218: AgentXPeer peer = (AgentXPeer) peers.get(masterAddress);
0219: TransportMapping transport;
0220: if (peer == null) {
0221: transport = addMaster(localAddress);
0222: peer = new AgentXPeer(transport, masterAddress);
0223: } else {
0224: transport = peer.getTransport();
0225: }
0226: peer.setTimeout(session.getTimeout());
0227: session.setPeer(peer);
0228: int status = AgentXProtocol.AGENTX_TIMEOUT;
0229: try {
0230: status = openSession(transport, masterAddress, session);
0231: if (status != AgentXProtocol.AGENTX_TIMEOUT) {
0232: peers.put(masterAddress, peer);
0233: LOGGER.info("Added new peer address=" + masterAddress
0234: + ",peer=" + peer);
0235: }
0236: } catch (IOException ex) {
0237: LOGGER.error(ex);
0238: removeMaster(transport);
0239: return AgentXProtocol.AGENTX_ERROR;
0240: }
0241: if (status == AgentXProtocol.AGENTX_SUCCESS) {
0242: sessions.put(new Integer(session.getSessionID()), session);
0243: LOGGER.info("Opened subagent session successfully: "
0244: + session);
0245: } else {
0246: removeMaster(transport);
0247: }
0248: return status;
0249: }
0250:
0251: public int close(AgentXSession session, byte reason)
0252: throws IOException {
0253: return closeSession(session.getSessionID(), reason);
0254: }
0255:
0256: private synchronized AgentXSession getSession(int sessionID) {
0257: return (AgentXSession) sessions.get(new Integer(sessionID));
0258: }
0259:
0260: private synchronized AgentXSession removeSession(int sessionID) {
0261: return (AgentXSession) sessions.remove(new Integer(sessionID));
0262: }
0263:
0264: public void setDefaultPriority(byte priority) {
0265: this .defaultPriority = priority;
0266: }
0267:
0268: public byte getDefaultPriority() {
0269: return defaultPriority;
0270: }
0271:
0272: /**
0273: * Gets the priority with which the supplied managed object and
0274: * region should be registered at the master agent. Overwrite
* this method to use individual priorities depending on the registered
0276: * region/managed object. The default implementation returns
0277: * {@link #getDefaultPriority()}.
0278: *
0279: * @param mo ManagedObject
0280: * a managed object instance that manages <code>region</code>.
0281: * @param region
0282: * the region to be registered.
0283: * @return
0284: * the priority between 0 and 255 (lower value results in higher priority).
0285: */
0286: protected byte getPriority(ManagedObject mo, AgentXRegion region) {
0287: return defaultPriority;
0288: }
0289:
0290: /**
0291: * Registers the subagent regions at the master agent.
0292: * @param session
0293: * the session on whose behalf regions are registered.
0294: * @param context
0295: * the context to use for registration.
0296: * @return
0297: * a List of the managed objects which failed to register.
0298: */
0299: public List registerRegions(AgentXSession session,
0300: OctetString context) {
0301: return registerRegions(session, context, null);
0302: }
0303:
0304: /**
0305: * Registers the subagent regions at the master agent.
0306: * @param session
0307: * the session on whose behalf regions are registered.
0308: * @param context
0309: * the context to use for registration.
0310: * @param sysUpTime
0311: * if not <code>null</code>, the master agent's notion of the sysUpTime
0312: * for the registered context is returned. The input value is always
0313: * ignored!
0314: * @return
0315: * a List of the managed objects which failed to register.
0316: */
0317: public List registerRegions(AgentXSession session,
0318: OctetString context, TimeTicks sysUpTime) {
0319: LinkedList failures = new LinkedList();
0320: MOServer server = getServer(context);
0321: if (server == null) {
0322: LOGGER.warn("No MOServer found for context '" + context
0323: + "'");
0324: return null;
0325: }
0326: for (Iterator it = server.iterator(); it.hasNext();) {
0327: ManagedObject mo = (ManagedObject) it.next();
0328: if (mo instanceof AgentXSharedMOTable) {
0329: List failedRows = registerSharedTableRows(session,
0330: context, (AgentXSharedMOTable) mo);
0331: failures.addAll(failedRows);
0332: } else {
0333: MOScope scope = mo.getScope();
0334: AgentXRegion region = new AgentXRegion(scope
0335: .getLowerBound(), scope.getUpperBound());
0336: if (mo instanceof MOScalar) {
0337: region.setSingleOID(true);
0338: }
0339: region.setUpperIncluded(scope.isUpperIncluded());
0340: try {
0341: int status = registerRegion(session, context,
0342: region, getPriority(mo, region), sysUpTime);
0343: if (status != AgentXProtocol.AGENTX_SUCCESS) {
0344: failures.add(mo);
0345: if (LOGGER.isWarnEnabled()) {
0346: LOGGER.warn("Failed to registered MO "
0347: + scope + " with status = "
0348: + status);
0349: }
0350: } else {
0351: if (LOGGER.isInfoEnabled()) {
0352: LOGGER.info("Registered MO " + scope
0353: + " successfully");
0354: }
0355: }
0356:
0357: } catch (IOException ex) {
0358: LOGGER.warn("Failed to register " + mo
0359: + " in context '" + context
0360: + "' of session " + session);
0361: failures.add(mo);
0362: }
0363: }
0364: }
0365: return failures;
0366: }
0367:
0368: /**
0369: * Registers the indexes and (row) regions of a shared table. This method
0370: * is called on behalf of {@link #registerRegions(AgentXSession session,
0371: * OctetString context, TimeTicks sysUpTime)} and
0372: * {@link #registerRegions(AgentXSession session, OctetString context)}.
0373: *
0374: * @param session
0375: * the session on whose behalf regions are registered.
0376: * @param context
0377: * the context to use for registration.
0378: * @param mo
0379: * the <code>AgentXSharedMOTable</code> instance to register.
0380: * @return List
0381: * a list of failed {@link MOTableRow} instances.
0382: */
0383: public List registerSharedTableRows(AgentXSession session,
0384: OctetString context, AgentXSharedMOTable mo) {
0385: LinkedList failedRows = new LinkedList();
0386: AgentXSharedMOTableSupport sharedTableSupport = new AgentXSharedMOTableSupport(
0387: agentX, session, context);
0388: synchronized (mo) {
0389: if (mo instanceof AgentXSharedMutableMOTable) {
0390: ((AgentXSharedMutableMOTable) mo)
0391: .setAgentXSharedMOTableSupport(sharedTableSupport);
0392: }
0393: for (Iterator it = mo.getModel().iterator(); it.hasNext();) {
0394: MOTableRow row = (MOTableRow) it.next();
0395: OID newIndex = (OID) row.getIndex().clone();
0396: int status = sharedTableSupport
0397: .allocateIndex(
0398: context,
0399: mo.getIndexDef(),
0400: (byte) AgentXSharedMOTableSupport.INDEX_MODE_ALLOCATE,
0401: newIndex);
0402: if (status == AgentXProtocol.AGENTX_SUCCESS) {
0403: if ((newIndex instanceof AnyNewIndexOID)
0404: || (newIndex instanceof NewIndexOID)) {
0405: if (mo instanceof AgentXSharedMutableMOTable) {
0406: ((AgentXSharedMutableMOTable) mo)
0407: .changeRowIndex(newIndex, row
0408: .getIndex());
0409: }
0410: }
0411: status = sharedTableSupport.registerRow(mo, row);
0412: if (status != AgentXProtocol.AGENTX_SUCCESS) {
0413: sharedTableSupport.deallocateIndex(context, mo
0414: .getIndexDef(), row.getIndex());
0415: LOGGER.warn("Failed to register row with "
0416: + status + " for " + row);
0417: failedRows.add(row);
0418: }
0419: } else {
0420: LOGGER.warn("Failed to allocate index with "
0421: + status + " for row " + row);
0422: failedRows.add(row);
0423: }
0424: }
0425: }
0426: return failedRows;
0427: }
0428:
0429: protected int registerRegion(AgentXSession session,
0430: OctetString context, AgentXRegion region, byte priority,
0431: TimeTicks sysUpTime) throws IOException {
0432: if ((session == null) || (session.isClosed())) {
0433: return AgentXProtocol.AGENTX_NOT_OPEN;
0434: }
0435: long t = (this .timeout == 0) ? session.getTimeout() * 1000
0436: : this .timeout;
0437: AgentXRegisterPDU pdu = new AgentXRegisterPDU(context, region
0438: .getLowerBound(), priority, region.getRangeSubID(),
0439: region.getUpperBoundSubID());
0440: pdu.setSessionAttributes(session);
0441: AgentXResponseEvent event = agentX.send(pdu, new AgentXTarget(
0442: session.getPeer().getAddress(), t), session.getPeer()
0443: .getTransport());
0444: if ((sysUpTime != null) && (event.getResponse() != null)) {
0445: sysUpTime
0446: .setValue(event.getResponse().getSysUpTime() & 0xFFFFFFFFL);
0447: }
0448: return getResponseStatus(event);
0449: }
0450:
0451: protected int unregisterRegion(AgentXSession session,
0452: OctetString context, AgentXRegion region, byte timeout)
0453: throws IOException {
0454: if ((session == null) || (session.isClosed())) {
0455: return AgentXProtocol.AGENTX_NOT_OPEN;
0456: }
0457: byte t = (timeout == 0) ? session.getTimeout() : timeout;
0458: AgentXUnregisterPDU pdu = new AgentXUnregisterPDU(context,
0459: region.getLowerBound(), t, region.getRangeSubID(),
0460: region.getUpperBoundSubID());
0461: pdu.setSessionAttributes(session);
0462: AgentXResponseEvent event = agentX.send(pdu, new AgentXTarget(
0463: session.getPeer().getAddress(), this .timeout), session
0464: .getPeer().getTransport());
0465: return getResponseStatus(event);
0466: }
0467:
0468: protected TransportMapping addMaster(Address localAddress)
0469: throws IOException {
0470: /*
0471: if (transport != null) {
0472: try {
0473: transport.close();
0474: }
0475: catch (IOException ex) {
0476: logger.error(ex);
0477: }
0478: agentX.removeTransportMapping(transport);
0479: }
0480: */
0481: TransportMapping transport = TransportMappings.getInstance()
0482: .createTransportMapping(localAddress);
0483: if (transport instanceof ConnectionOrientedTransportMapping) {
0484: ConnectionOrientedTransportMapping tcpTransport = (ConnectionOrientedTransportMapping) transport;
0485: tcpTransport.setConnectionTimeout(0);
0486: tcpTransport.setMessageLengthDecoder(new AgentXProtocol());
0487: }
0488: agentX.addTransportMapping(transport);
0489: transport.listen();
0490: return transport;
0491: }
0492:
0493: protected void removeMaster(TransportMapping transport) {
0494: agentX.removeTransportMapping(transport);
0495: try {
0496: transport.close();
0497: } catch (IOException ex) {
0498: LOGGER.warn("Closing transport mapping " + transport
0499: + " failed with: " + ex.getMessage());
0500: }
0501: }
0502:
0503: public synchronized void addMOServer(MOServer server) {
0504: moServers.add(server);
0505: }
0506:
0507: public synchronized void removeMOServer(MOServer server) {
0508: moServers.remove(server);
0509: }
0510:
0511: public synchronized MOServer getServer(OctetString context) {
0512: for (int i = 0; i < moServers.size(); i++) {
0513: MOServer s = (MOServer) moServers.get(i);
0514: if (s.isContextSupported(context)) {
0515: return s;
0516: }
0517: }
0518: return null;
0519: }
0520:
0521: public synchronized Collection getContexts() {
0522: LinkedList allContexts = new LinkedList();
0523: for (int i = 0; i < moServers.size(); i++) {
0524: MOServer s = (MOServer) moServers.get(i);
0525: OctetString[] contexts = s.getContexts();
0526: allContexts.addAll(Arrays.asList(contexts));
0527: }
0528: return allContexts;
0529: }
0530:
0531: public ThreadPool getThreadPool() {
0532: return threadPool;
0533: }
0534:
0535: public void setThreadPool(ThreadPool threadPool) {
0536: this .threadPool = threadPool;
0537: }
0538:
0539: public void dispatchCommand(AgentXCommandEvent cmd) {
0540: boolean pendingSessionClose = false;
0541: if (cmd.getCommand().isConfirmedPDU()) {
0542: AgentXRequest request = null;
0543: MOServer server = null;
0544: switch (cmd.getCommand().getType()) {
0545: case AgentXPDU.AGENTX_GET_PDU: {
0546: request = (AgentXRequest) factory.createRequest(cmd,
0547: null);
0548: server = getServer(request.getContext());
0549: requestHandlerGet.processPdu(request, server);
0550: break;
0551: }
0552: case AgentXPDU.AGENTX_GETNEXT_PDU: {
0553: request = (AgentXRequest) factory.createRequest(cmd,
0554: null);
0555: server = getServer(request.getContext());
0556: requestHandlerGetNext.processPdu(request, server);
0557: break;
0558: }
0559: case AgentXPDU.AGENTX_GETBULK_PDU: {
0560: request = (AgentXRequest) factory.createRequest(cmd,
0561: null);
0562: server = getServer(request.getContext());
0563: requestHandlerGetBulk.processPdu(request, server);
0564: break;
0565: }
0566: case AgentXPDU.AGENTX_TESTSET_PDU: {
0567: request = (AgentXRequest) factory.createRequest(cmd,
0568: null);
0569: request.setPhase(Request.PHASE_2PC_PREPARE);
0570: server = getServer(request.getContext());
0571: requestHandlerTestSet.processPdu(request, server);
0572: requestList.put(createRequestID(cmd), request);
0573: break;
0574: }
0575: case AgentXPDU.AGENTX_COMMITSET_PDU:
0576: case AgentXPDU.AGENTX_UNDOSET_PDU:
0577: case AgentXPDU.AGENTX_CLEANUPSET_PDU: {
0578: RequestID reqID = createRequestID(cmd);
0579: request = (AgentXRequest) requestList.get(reqID);
0580: if (request == null) {
0581: LOGGER.error("Request with ID " + reqID
0582: + " not found in request list");
0583: request = new AgentXRequest(cmd);
0584: request
0585: .setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR);
0586: break;
0587: }
0588: server = getServer(request.getContext());
0589: switch (cmd.getCommand().getType()) {
0590: case AgentXPDU.AGENTX_COMMITSET_PDU:
0591: request.setPhase(Request.PHASE_2PC_COMMIT);
0592: requestHandlerCommitSet.processPdu(request, server);
0593: break;
0594: case AgentXPDU.AGENTX_UNDOSET_PDU:
0595: request.setPhase(Request.PHASE_2PC_UNDO);
0596: requestHandlerUndoSet.processPdu(request, server);
0597: break;
0598: case AgentXPDU.AGENTX_CLEANUPSET_PDU:
0599: request.setPhase(Request.PHASE_2PC_CLEANUP);
0600: requestHandlerCleanupSet
0601: .processPdu(request, server);
0602: break;
0603: default: {
0604: LOGGER.fatal("Internal error");
0605: }
0606: }
0607: if (cmd.getCommand().getType() != AgentXPDU.AGENTX_COMMITSET_PDU) {
0608: // remove request from request list
0609: requestList.remove(reqID);
0610: }
0611: break;
0612: }
0613: case AgentXPDU.AGENTX_CLOSE_PDU: {
0614: AgentXSession session = removeSession(cmd.getCommand()
0615: .getSessionID());
0616: if (session != null) {
0617: session.setClosed(true);
0618: pendingSessionClose = true;
0619: }
0620: break;
0621: }
0622: default: {
0623: LOGGER.error("Unhandled PDU type: " + cmd.getCommand());
0624: request = new AgentXRequest(cmd);
0625: request
0626: .setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR);
0627: }
0628: }
0629: if (request != null) {
0630: // Since this is an AgentX subagent it only processes a single phase at
0631: // once.
0632: if (request.isPhaseComplete()) {
0633: // send response
0634: sendResponse(cmd, request);
0635: }
0636: if (server != null) {
0637: release(server, request);
0638: }
0639: }
0640: if (pendingSessionClose) {
0641: try {
0642: disconnect(cmd.getPeerAddress());
0643: } catch (IOException ex) {
0644: LOGGER.error("Failed to disconnect from master at "
0645: + cmd.getPeerAddress() + ": "
0646: + ex.getMessage(), ex);
0647: }
0648: }
0649: } else {
0650: processResponse(cmd);
0651: }
0652: }
0653:
0654: protected void sendResponse(AgentXCommandEvent cmd,
0655: AgentXRequest request) {
0656: AgentXMessageDispatcher dispatcher = cmd.getDispatcher();
0657: AgentXResponsePDU response = request.getResponsePDU();
0658: if (response != null) {
0659: AgentXPDU rpdu = cmd.getCommand();
0660: response.setSessionID(rpdu.getSessionID());
0661: response.setTransactionID(rpdu.getTransactionID());
0662: response.setByteOrder(rpdu.getByteOrder());
0663: response.setPacketID(rpdu.getPacketID());
0664: // only send a response if required
0665: try {
0666: dispatcher.send(cmd.getPeerTransport(), cmd
0667: .getPeerAddress(), response, null);
0668: } catch (IOException ex) {
0669: LOGGER.warn("Failed to send AgentX response to '"
0670: + cmd.getPeerAddress() + "' with error: "
0671: + ex.getMessage());
0672: }
0673: }
0674: }
0675:
0676: protected void release(MOServer server, Request req) {
0677: for (Iterator it = req.iterator(); it.hasNext();) {
0678: SubRequest sreq = (SubRequest) it.next();
0679: if (sreq.getTargetMO() != null) {
0680: server.unlock(req, sreq.getTargetMO());
0681: }
0682: }
0683: }
0684:
0685: private static RequestID createRequestID(AgentXCommandEvent cmd) {
0686: return new RequestID(cmd.getPeerAddress(), cmd.getCommand()
0687: .getSessionID(), cmd.getCommand().getTransactionID());
0688:
0689: }
0690:
0691: protected void processResponse(AgentXCommandEvent cmd) {
0692: if (LOGGER.isDebugEnabled()) {
0693: LOGGER.debug("Received response " + cmd);
0694: }
0695: }
0696:
0697: protected void processNextSubRequest(Request request,
0698: MOServer server, OctetString context, SubRequest sreq)
0699: throws NoSuchElementException {
0700: // We can be sure to have a default context scope here because
0701: // the inner class AgentXSubRequest creates it!
0702: DefaultMOContextScope scope = (DefaultMOContextScope) sreq
0703: .getScope();
0704: MOQuery query = sreq.getQuery();
0705: if (query == null) {
0706: query = new DefaultMOQuery(scope, false, request);
0707: }
0708: while (!sreq.getStatus().isProcessed()) {
0709: ManagedObject mo = server.lookup(query);
0710: if (mo == null) {
0711: if (LOGGER.isDebugEnabled()) {
0712: LOGGER.debug("EndOfMibView at scope="
0713: + query.getScope() + " and query " + query);
0714: }
0715: sreq.getVariableBinding()
0716: .setVariable(Null.endOfMibView);
0717: sreq.getStatus().setPhaseComplete(true);
0718: break;
0719: }
0720: try {
0721: if (!mo.next(sreq)) {
0722: // We can be sure to have a default context scope here because
0723: // the inner class SnmpSubRequest creates it!
0724: // don't forget to update query:
0725: sreq.getVariableBinding()
0726: .setVariable(Null.instance);
0727: scope.substractScope(mo.getScope());
0728: // query is updated automatically because scope is updated.
0729: query.substractScope(mo.getScope());
0730: }
0731: } catch (Exception moex) {
0732: if (LOGGER.isDebugEnabled()) {
0733: moex.printStackTrace();
0734: }
0735: LOGGER.warn(moex);
0736: if (sreq.getStatus().getErrorStatus() == PDU.noError) {
0737: sreq.getStatus().setErrorStatus(PDU.genErr);
0738: }
0739: }
0740: }
0741: }
0742:
0743: /**
0744: * Sends notifications (traps) to all appropriate notification targets
0745: * through the master agent.
0746: *
0747: * @param context the context name of the context on whose behalf this
0748: * notification has been generated.
0749: * @param notificationID the object ID that uniquely identifies this
0750: * notification. For SNMPv1 traps, the notification ID has to be build
0751: * using the rules provided by RFC 2576.
0752: * @param vbs an array of <code>VariableBinding</code> instances
0753: * representing the payload of the notification.
0754: * @return
0755: * an {@link AgentXResponseEvent} instance or <code>null</code> if the
0756: * notification request timed out.
0757: */
0758: public Object notify(OctetString context, OID notificationID,
0759: VariableBinding[] vbs) {
0760: return notify(context, notificationID, null, vbs);
0761: }
0762:
0763: public Object notify(OctetString context, OID notificationID,
0764: TimeTicks sysUpTime, VariableBinding[] vbs) {
0765: AgentXSession session = firstSession();
0766: AgentXResponseEvent agentXResponse = null;
0767: try {
0768: agentXResponse = notify(session, context, notificationID,
0769: sysUpTime, vbs);
0770: if ((agentXResponse == null)
0771: || (agentXResponse.getResponse() == null)) {
0772: LOGGER
0773: .warn("Timeout on sending notification in context '"
0774: + context
0775: + "' with ID '"
0776: + notificationID
0777: + "' and payload "
0778: + Arrays.asList(vbs));
0779: return null;
0780: }
0781: return agentXResponse;
0782: } catch (IOException ex) {
0783: LOGGER.error("Failed to send notification in context '"
0784: + context + "' with ID '" + notificationID
0785: + "' and payload " + Arrays.asList(vbs)
0786: + ", reason is: " + ex.getMessage());
0787: return null;
0788: }
0789: }
0790:
0791: /**
* Returns the first session that has been opened by this subagent and is
0793: * still open. If no open session exists, <code>null</code> is returned.
0794: *
0795: * @return
0796: * an <code>AgentXSession</code>.
0797: */
0798: public synchronized final AgentXSession firstSession() {
0799: if (sessions.size() > 0) {
0800: return (AgentXSession) sessions.values().iterator().next();
0801: }
0802: return null;
0803: }
0804:
0805: public AgentXResponseEvent notify(AgentXSession session,
0806: OctetString context, OID notificationID,
0807: TimeTicks sysUpTime, VariableBinding[] vbs)
0808: throws IOException {
0809: int offset = 1;
0810: if (sysUpTime != null) {
0811: offset = 2;
0812: }
0813: VariableBinding[] notifyVBs = new VariableBinding[vbs.length
0814: + offset];
0815: if (sysUpTime != null) {
0816: notifyVBs[0] = new VariableBinding(SnmpConstants.sysUpTime,
0817: sysUpTime);
0818: }
0819: notifyVBs[offset - 1] = new VariableBinding(
0820: SnmpConstants.snmpTrapOID, notificationID);
0821: System.arraycopy(vbs, 0, notifyVBs, offset, vbs.length);
0822: AgentXNotifyPDU notifyPDU = new AgentXNotifyPDU(context,
0823: notifyVBs);
0824: notifyPDU.setSessionAttributes(session);
0825: notifyPDU.setTransactionID(getNextTransactionID());
0826: AgentXResponseEvent response = agentX
0827: .send(notifyPDU, session.createAgentXTarget(), session
0828: .getPeer().getTransport());
0829: return response;
0830: }
0831:
0832: public int addAgentCaps(AgentXSession session, OctetString context,
0833: OID id, OctetString descr) {
0834: AgentXAddAgentCapsPDU pdu = new AgentXAddAgentCapsPDU(context,
0835: id, descr);
0836: pdu.setSessionAttributes(session);
0837: try {
0838: AgentXResponseEvent resp = agentX.send(pdu, session
0839: .createAgentXTarget(), session.getPeer()
0840: .getTransport());
0841: if (resp.getResponse() == null) {
0842: return AgentXProtocol.AGENTX_TIMEOUT;
0843: }
0844: return resp.getResponse().getErrorStatus();
0845: } catch (IOException ex) {
0846: LOGGER.error("Failed to send AgentX AddAgentCaps PDU "
0847: + pdu + " because: " + ex.getMessage(), ex);
0848: return AgentXProtocol.AGENTX_NOT_OPEN;
0849: }
0850: }
0851:
0852: public int removeAgentCaps(AgentXSession session,
0853: OctetString context, OID id) {
0854: AgentXRemoveAgentCapsPDU pdu = new AgentXRemoveAgentCapsPDU(
0855: context, id);
0856: pdu.setSessionAttributes(session);
0857: try {
0858: AgentXResponseEvent resp = agentX.send(pdu, session
0859: .createAgentXTarget(), session.getPeer()
0860: .getTransport());
0861: return resp.getResponse().getErrorStatus();
0862: } catch (IOException ex) {
0863: LOGGER.error("Failed to send AgentX RemoveAgentCaps PDU "
0864: + pdu + " because: " + ex.getMessage(), ex);
0865: return AgentXProtocol.AGENTX_NOT_OPEN;
0866: }
0867: }
0868:
0869: public void addPingListener(PingListener l) {
0870: if (pingListeners == null) {
0871: pingListeners = new Vector();
0872: }
0873: pingListeners.add(l);
0874: }
0875:
0876: public void removePingListener(PingListener l) {
0877: if (pingListeners != null) {
0878: synchronized (pingListeners) {
0879: pingListeners.remove(l);
0880: }
0881: }
0882: }
0883:
0884: protected void firePinged(PingEvent event) {
0885: if (pingListeners != null) {
0886: synchronized (pingListeners) {
0887: Vector listeners = pingListeners;
0888: int count = listeners.size();
0889: for (int i = 0; i < count; i++) {
0890: ((PingListener) listeners.elementAt(i))
0891: .pinged(event);
0892: }
0893: }
0894: }
0895: }
0896:
0897: private static void initRequestPhase(Request request) {
0898: if (request.getPhase() == Request.PHASE_INIT) {
0899: request.nextPhase();
0900: }
0901: }
0902:
0903: static class GetRequestHandler implements RequestHandler {
0904:
0905: public boolean isSupported(int pduType) {
0906: return pduType == AgentXPDU.AGENTX_GET_PDU;
0907: }
0908:
0909: public void processPdu(Request request, MOServer server) {
0910: initRequestPhase(request);
0911: try {
0912: SubRequestIterator it = (SubRequestIterator) request
0913: .iterator();
0914: while (it.hasNext()) {
0915: SubRequest sreq = it.nextSubRequest();
0916: DefaultMOQuery query = new DefaultMOQuery(
0917: (MOContextScope) sreq.getScope(), false,
0918: request);
0919: ManagedObject mo = server.lookup(query);
0920: if (mo == null) {
0921: sreq.getVariableBinding().setVariable(
0922: Null.noSuchObject);
0923: sreq.getStatus().setPhaseComplete(true);
0924: continue;
0925: }
0926: try {
0927: mo.get(sreq);
0928: } catch (Exception moex) {
0929: if (LOGGER.isDebugEnabled()) {
0930: moex.printStackTrace();
0931: }
0932: LOGGER.warn(moex);
0933: if (sreq.getStatus().getErrorStatus() == PDU.noError) {
0934: sreq.getStatus().setErrorStatus(PDU.genErr);
0935: }
0936: }
0937: }
0938: } catch (NoSuchElementException nsex) {
0939: if (LOGGER.isDebugEnabled()) {
0940: nsex.printStackTrace();
0941: }
0942: LOGGER.error("SubRequest not found");
0943: request.setErrorStatus(PDU.genErr);
0944: }
0945: }
0946: }
0947:
0948: class GetNextHandler implements RequestHandler {
0949:
0950: public void processPdu(Request request, MOServer server) {
0951: initRequestPhase(request);
0952: OctetString context = request.getContext();
0953: try {
0954: SubRequestIterator it = (SubRequestIterator) request
0955: .iterator();
0956: while (it.hasNext()) {
0957: SubRequest sreq = it.nextSubRequest();
0958: processNextSubRequest(request, server, context,
0959: sreq);
0960: }
0961: } catch (NoSuchElementException nsex) {
0962: if (LOGGER.isDebugEnabled()) {
0963: nsex.printStackTrace();
0964: }
0965: LOGGER.error("SubRequest not found");
0966: request.setErrorStatus(PDU.genErr);
0967: }
0968: }
0969:
0970: public boolean isSupported(int pduType) {
0971: return (pduType == PDU.GETNEXT);
0972: }
0973:
0974: }
0975:
0976: class GetBulkHandler implements RequestHandler {
0977:
0978: public void processPdu(Request request, MOServer server) {
0979: initRequestPhase(request);
0980: OctetString context = request.getContext();
0981: AgentXRequest req = (AgentXRequest) request;
0982: int nonRep = req.getNonRepeaters();
0983: try {
0984: SubRequestIterator it = (SubRequestIterator) request
0985: .iterator();
0986: int i = 0;
0987: // non repeaters
0988: for (; ((i < nonRep) && it.hasNext()); i++) {
0989: SubRequest sreq = it.nextSubRequest();
0990: processNextSubRequest(request, server, context,
0991: sreq);
0992: }
0993: // repetitions
0994: for (; it.hasNext(); i++) {
0995: SubRequest sreq = it.nextSubRequest();
0996: processNextSubRequest(request, server, context,
0997: sreq);
0998: }
0999: } catch (NoSuchElementException nsex) {
1000: if (LOGGER.isDebugEnabled()) {
1001: nsex.printStackTrace();
1002: }
1003: LOGGER.error("SubRequest not found");
1004: request.setErrorStatus(PDU.genErr);
1005: }
1006:
1007: }
1008:
1009: public boolean isSupported(int pduType) {
1010: return (pduType == PDU.GETBULK);
1011: }
1012:
1013: }
1014:
1015: static class TestSetHandler implements RequestHandler {
1016:
1017: public void processPdu(Request request, MOServer server) {
1018: try {
1019: SubRequestIterator it = (SubRequestIterator) request
1020: .iterator();
1021: while ((!request.isPhaseComplete()) && (it.hasNext())) {
1022: SubRequest sreq = it.nextSubRequest();
1023: if (sreq.isComplete()) {
1024: continue;
1025: }
1026: DefaultMOQuery query = new DefaultMOQuery(
1027: (MOContextScope) sreq.getScope(), false,
1028: request);
1029: ManagedObject mo = server.lookup(query);
1030: if (mo == null) {
1031: sreq.getStatus().setErrorStatus(PDU.noSuchName);
1032: break;
1033: }
1034: sreq.setTargetMO(mo);
1035: server.lock(sreq.getRequest(), mo);
1036: try {
1037: mo.prepare(sreq);
1038: sreq.getStatus().setPhaseComplete(true);
1039: } catch (Exception moex) {
1040: moex.printStackTrace();
1041: if (sreq.getStatus().getErrorStatus() == PDU.noError) {
1042: sreq.getStatus().setErrorStatus(PDU.genErr);
1043: }
1044: /**@todo implement error handling */
1045: }
1046: }
1047: } catch (NoSuchElementException nsex) {
1048: if (LOGGER.isDebugEnabled()) {
1049: nsex.printStackTrace();
1050: }
1051: LOGGER.error("Cannot find sub-request: ", nsex);
1052: request.setErrorStatus(PDU.genErr);
1053: }
1054: }
1055:
1056: public boolean isSupported(int pduType) {
1057: return (pduType == AgentXPDU.AGENTX_TESTSET_PDU);
1058: }
1059: }
1060:
1061: class UndoSetHandler implements RequestHandler {
1062:
1063: public void processPdu(Request request, MOServer server) {
1064: try {
1065: SubRequestIterator it = (SubRequestIterator) request
1066: .iterator();
1067: while (it.hasNext()) {
1068: SubRequest sreq = it.nextSubRequest();
1069: if (sreq.isComplete()) {
1070: continue;
1071: }
1072: ManagedObject mo = sreq.getTargetMO();
1073: if (mo == null) {
1074: DefaultMOQuery query = new DefaultMOQuery(
1075: (MOContextScope) sreq.getScope(), true);
1076: mo = server.lookup(query);
1077: }
1078: if (mo == null) {
1079: sreq.getStatus().setErrorStatus(PDU.undoFailed);
1080: continue;
1081: }
1082: try {
1083: mo.undo(sreq);
1084: sreq.getStatus().setPhaseComplete(true);
1085: } catch (Exception moex) {
1086: if (LOGGER.isDebugEnabled()) {
1087: moex.printStackTrace();
1088: }
1089: LOGGER.error(moex);
1090: if (sreq.getStatus().getErrorStatus() == PDU.noError) {
1091: sreq.getStatus().setErrorStatus(
1092: PDU.undoFailed);
1093: }
1094: }
1095: }
1096: } catch (NoSuchElementException nsex) {
1097: if (LOGGER.isDebugEnabled()) {
1098: nsex.printStackTrace();
1099: }
1100: LOGGER.error("Cannot find sub-request: ", nsex);
1101: request.setErrorStatus(PDU.genErr);
1102: }
1103: }
1104:
1105: public boolean isSupported(int pduType) {
1106: return (pduType == AgentXPDU.AGENTX_UNDOSET_PDU);
1107: }
1108: }
1109:
1110: class CommitSetHandler implements RequestHandler {
1111:
1112: public void processPdu(Request request, MOServer server) {
1113: try {
1114: SubRequestIterator it = (SubRequestIterator) request
1115: .iterator();
1116: while ((!request.isPhaseComplete()) && (it.hasNext())) {
1117: SubRequest sreq = it.nextSubRequest();
1118: if (sreq.isComplete()) {
1119: continue;
1120: }
1121: ManagedObject mo = sreq.getTargetMO();
1122: if (mo == null) {
1123: DefaultMOQuery query = new DefaultMOQuery(
1124: (MOContextScope) sreq.getScope(), true);
1125: mo = server.lookup(query);
1126: }
1127: if (mo == null) {
1128: sreq.getStatus().setErrorStatus(
1129: PDU.commitFailed);
1130: continue;
1131: }
1132: try {
1133: mo.commit(sreq);
1134: sreq.getStatus().setPhaseComplete(true);
1135: } catch (Exception moex) {
1136: if (LOGGER.isDebugEnabled()) {
1137: moex.printStackTrace();
1138: }
1139: LOGGER.error(moex);
1140: if (sreq.getStatus().getErrorStatus() == PDU.noError) {
1141: sreq.getStatus().setErrorStatus(
1142: PDU.commitFailed);
1143: }
1144: }
1145: }
1146: } catch (NoSuchElementException nsex) {
1147: if (LOGGER.isDebugEnabled()) {
1148: nsex.printStackTrace();
1149: }
1150: LOGGER.error("Cannot find sub-request: ", nsex);
1151: request.setErrorStatus(PDU.genErr);
1152: }
1153: }
1154:
1155: public boolean isSupported(int pduType) {
1156: return (pduType == AgentXPDU.AGENTX_COMMITSET_PDU);
1157: }
1158:
1159: }
1160:
1161: class CleanupSetHandler implements RequestHandler {
1162:
1163: public void processPdu(Request request, MOServer server) {
1164: try {
1165: SubRequestIterator it = (SubRequestIterator) request
1166: .iterator();
1167: while (it.hasNext()) {
1168: SubRequest sreq = it.nextSubRequest();
1169: if (sreq.isComplete()) {
1170: continue;
1171: }
1172: ManagedObject mo = sreq.getTargetMO();
1173: if (mo == null) {
1174: DefaultMOQuery query = new DefaultMOQuery(
1175: (MOContextScope) sreq.getScope(), false);
1176: mo = server.lookup(query);
1177: }
1178: if (mo == null) {
1179: sreq.completed();
1180: continue;
1181: }
1182: server.unlock(sreq.getRequest(), mo);
1183: try {
1184: mo.cleanup(sreq);
1185: sreq.getStatus().setPhaseComplete(true);
1186: } catch (Exception moex) {
1187: if (LOGGER.isDebugEnabled()) {
1188: moex.printStackTrace();
1189: }
1190: LOGGER.error(moex);
1191: }
1192: }
1193: } catch (NoSuchElementException nsex) {
1194: if (LOGGER.isDebugEnabled()) {
1195: nsex.printStackTrace();
1196: }
1197: LOGGER.warn("Cannot find sub-request: "
1198: + nsex.getMessage());
1199: }
1200: }
1201:
1202: public boolean isSupported(int pduType) {
1203: return (pduType == AgentXPDU.AGENTX_CLEANUPSET_PDU);
1204: }
1205:
1206: }
1207:
1208: static class DefaultAgentXRequestFactory implements RequestFactory {
1209:
1210: public Request createRequest(EventObject initiatingEvent,
1211: CoexistenceInfo cinfo) {
1212: Request request = new AgentXRequest(
1213: (AgentXCommandEvent) initiatingEvent);
1214: if (LOGGER.isDebugEnabled()) {
1215: LOGGER.debug("Creating AgentX request " + request
1216: + " from " + initiatingEvent);
1217: }
1218: return request;
1219: }
1220:
1221: }
1222:
1223: class Command implements Runnable {
1224:
1225: private AgentXCommandEvent request;
1226:
1227: public Command(AgentXCommandEvent event) {
1228: this .request = event;
1229: }
1230:
1231: public void run() {
1232: dispatchCommand(request);
1233: }
1234:
1235: }
1236:
1237: static class RequestID implements Comparable {
1238: private Address masterAddress;
1239: private int sessionID;
1240: private int transactionID;
1241:
1242: public RequestID(Address masterAddress, int sessionID,
1243: int transactionID) {
1244: this .masterAddress = masterAddress;
1245: this .sessionID = sessionID;
1246: this .transactionID = transactionID;
1247: }
1248:
1249: public int compareTo(Object o) {
1250: RequestID other = (RequestID) o;
1251: int c = masterAddress.compareTo(other.masterAddress);
1252: if (c == 0) {
1253: c = sessionID - other.sessionID;
1254: if (c == 0) {
1255: c = transactionID - other.transactionID;
1256: }
1257: }
1258: return c;
1259: }
1260:
1261: public boolean equals(Object obj) {
1262: if (obj instanceof RequestID) {
1263: return (compareTo(obj) == 0);
1264: }
1265: return false;
1266: }
1267:
1268: public int hashCode() {
1269: return transactionID;
1270: }
1271:
1272: }
1273:
1274: class PingTask extends TimerTask {
1275:
1276: public void run() {
1277: List l;
1278: synchronized (sessions) {
1279: l = new LinkedList(sessions.values());
1280: }
1281: for (Iterator it = l.iterator(); it.hasNext();) {
1282: AgentXSession session = (AgentXSession) it.next();
1283: for (Iterator cit = getContexts().iterator(); cit
1284: .hasNext();) {
1285: OctetString context = (OctetString) cit.next();
1286: AgentXPingPDU ping = new AgentXPingPDU(context);
1287: ping.setSessionAttributes(session);
1288: ping.setTransactionID(getNextTransactionID());
1289: PingEvent pingEvent;
1290: try {
1291: AgentXResponseEvent resp = agentX.send(ping,
1292: session.createAgentXTarget(), session
1293: .getPeer().getTransport());
1294: pingEvent = new PingEvent(this , session, resp
1295: .getResponse());
1296: } catch (IOException ex) {
1297: pingEvent = new PingEvent(this , session, ex);
1298: }
1299: firePinged(pingEvent);
1300: if (LOGGER.isDebugEnabled()) {
1301: LOGGER.debug("Fired ping event " + pingEvent);
1302: }
1303: if (pingEvent.isCloseSession()
1304: || pingEvent.isResetSession()) {
1305: try {
1306: closeSession(session.getSessionID(),
1307: AgentXProtocol.REASON_TIMEOUTS);
1308: if (pingEvent.isResetSession()) {
1309: reopenSession(session);
1310: }
1311: } catch (IOException ex1) {
1312: }
1313: }
1314: }
1315: }
1316: }
1317:
1318: /**
1319: * Reopens a closed session.
1320: *
1321: * @param session
1322: * a closed AgentXSession instance.
1323: * @return
1324: * {@link AgentXProtocol#AGENTX_SUCCESS} if the session could be opened
1325: * sucessfully. Otherwise the AgentX error status is returned.
1326: * @throws IOException
1327: * if the session cannot be reopened due to an IO exception.
1328: */
1329: public int reopenSession(AgentXSession session)
1330: throws IOException {
1331: return openSession(session.getPeer().getTransport(),
1332: session.getPeer().getAddress(), session);
1333: }
1334:
1335: }
1336: }
|