Source Code Cross Referenced for AgentXSubagent.java in  » Net » snmp4j » org » snmp4j » agent » agentx » subagent » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » snmp4j » org.snmp4j.agent.agentx.subagent 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*_############################################################################
0002:          _##
0003:          _##  SNMP4J-AgentX - AgentXSubagent.java
0004:          _##
0005:          _##  Copyright (C) 2005-2007  Frank Fock (SNMP4J.org)
0006:          _##
0007:          _##  This program is free software; you can redistribute it and/or modify
0008:          _##  it under the terms of the GNU General Public License version 2 as
0009:          _##  published by the Free Software Foundation.
0010:          _##
0011:          _##  This program is distributed in the hope that it will be useful,
0012:          _##  but WITHOUT ANY WARRANTY; without even the implied warranty of
0013:          _##  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0014:          _##  GNU General Public License for more details.
0015:          _##
0016:          _##  You should have received a copy of the GNU General Public License
0017:          _##  along with this program; if not, write to the Free Software
0018:          _##  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
0019:          _##  MA  02110-1301  USA
0020:          _##
0021:          _##########################################################################*/
0022:
0023:        package org.snmp4j.agent.agentx.subagent;
0024:
0025:        import java.io.IOException;
0026:        import java.util.*;
0027:
0028:        import org.snmp4j.PDU;
0029:        import org.snmp4j.TransportMapping;
0030:        import org.snmp4j.agent.*;
0031:        import org.snmp4j.agent.agentx.*;
0032:        import org.snmp4j.agent.agentx.event.PingEvent;
0033:        import org.snmp4j.agent.agentx.event.PingListener;
0034:        import org.snmp4j.agent.mo.MOScalar;
0035:        import org.snmp4j.agent.mo.snmp.CoexistenceInfo;
0036:        import org.snmp4j.agent.request.*;
0037:        import org.snmp4j.log.LogAdapter;
0038:        import org.snmp4j.log.LogFactory;
0039:        import org.snmp4j.mp.SnmpConstants;
0040:        import org.snmp4j.smi.*;
0041:        import org.snmp4j.transport.ConnectionOrientedTransportMapping;
0042:        import org.snmp4j.transport.TransportMappings;
0043:        import org.snmp4j.util.ThreadPool;
0044:        import org.snmp4j.agent.mo.MOTableRow;
0045:        import org.snmp4j.agent.agentx.subagent.index.AnyNewIndexOID;
0046:        import org.snmp4j.agent.agentx.subagent.index.NewIndexOID;
0047:        import org.snmp4j.agent.mo.snmp.SysUpTime;
0048:
0049:        /**
0050:         * The <code>AgentXSubagent</code> class implements the AgentX communication
0051:         * for an AgentX subagent implementation.
0052:         *
0053:         * @author Frank Fock
0054:         * @version 1.0
0055:         */
0056:        public class AgentXSubagent implements  AgentXCommandListener,
0057:                NotificationOriginator {
0058:
0059:            private static final LogAdapter LOGGER = LogFactory
0060:                    .getLogger(AgentXSubagent.class);
0061:
0062:            private ArrayList moServers = new ArrayList();
0063:            private ThreadPool threadPool;
0064:            private RequestFactory factory;
0065:            private AgentX agentX;
0066:            protected Map requestList;
0067:
0068:            protected Map peers = new LinkedHashMap(2);
0069:            protected Map sessions = new Hashtable(2);
0070:
0071:            protected RequestHandler requestHandlerGet;
0072:            protected RequestHandler requestHandlerGetNext;
0073:            protected RequestHandler requestHandlerGetBulk;
0074:            protected RequestHandler requestHandlerTestSet;
0075:            protected RequestHandler requestHandlerCommitSet;
0076:            protected RequestHandler requestHandlerUndoSet;
0077:            protected RequestHandler requestHandlerCleanupSet;
0078:
0079:            protected int nextTransactionID = 0;
0080:
0081:            private OID subagentID;
0082:            private OctetString subagentDescr;
0083:
0084:            private long timeout = AgentXProtocol.DEFAULT_TIMEOUT_SECONDS * 1000;
0085:            private byte defaultPriority = AgentXProtocol.DEFAULT_PRIORITY;
0086:
0087:            private Timer pingTimer;
0088:            private transient Vector pingListeners;
0089:
0090:            public AgentXSubagent(AgentX agentX, OID subagentID,
0091:                    OctetString subagentDescr) {
0092:                this .requestList = Collections.synchronizedMap(new HashMap(10));
0093:                this .agentX = agentX;
0094:                this .subagentID = subagentID;
0095:                this .subagentDescr = subagentDescr;
0096:                this .factory = new DefaultAgentXRequestFactory();
0097:                requestHandlerGet = new GetRequestHandler();
0098:                requestHandlerCleanupSet = new CleanupSetHandler();
0099:                requestHandlerCommitSet = new CommitSetHandler();
0100:                requestHandlerTestSet = new TestSetHandler();
0101:                requestHandlerUndoSet = new UndoSetHandler();
0102:                requestHandlerGetNext = new GetNextHandler();
0103:                requestHandlerGetBulk = new GetBulkHandler();
0104:                agentX.addCommandResponder(this );
0105:            }
0106:
0107:            /**
0108:             * Sets the ping delay in seconds. If greater than zero, for each session
0109:             * a ping PDU is sent to the master to validate the session regularly with
0110:             * the specified delay. To monitor the ping requests, it is necessary to
0111:             * add a {@link PingListener} with {@link #addPingListener}.
0112:             *
0113:             * @param seconds
0114:             *    the delay. If zero or a negative value is supplied, no pings are sent
0115:             */
0116:            public void setPingDelay(int seconds) {
0117:                if (pingTimer != null) {
0118:                    pingTimer.cancel();
0119:                    pingTimer = null;
0120:                }
0121:                if (seconds > 0) {
0122:                    pingTimer = new Timer();
0123:                    pingTimer.schedule(new PingTask(), seconds * 1000,
0124:                            seconds * 1000);
0125:                }
0126:            }
0127:
0128:            public void processCommand(AgentXCommandEvent event) {
0129:                if (event.getCommand() != null) {
0130:                    event.setProcessed(true);
0131:                    Command command = new Command(event);
0132:                    if (threadPool != null) {
0133:                        threadPool.execute(command);
0134:                    } else {
0135:                        command.run();
0136:                    }
0137:                }
0138:            }
0139:
0140:            protected synchronized int getNextTransactionID() {
0141:                return nextTransactionID++;
0142:            }
0143:
0144:            protected synchronized int closeSession(int sessionID, byte reason)
0145:                    throws IOException {
0146:                AgentXSession session = removeSession(sessionID);
0147:                if ((session == null) || (session.isClosed())) {
0148:                    return AgentXProtocol.AGENTX_NOT_OPEN;
0149:                }
0150:                session.setClosed(true);
0151:                AgentXClosePDU closePDU = new AgentXClosePDU(
0152:                        AgentXProtocol.REASON_SHUTDOWN);
0153:                AgentXTarget target = session.createAgentXTarget();
0154:                AgentXResponseEvent resp = agentX.send(closePDU, target,
0155:                        session.getPeer().getTransport());
0156:                if (resp == null) {
0157:                    return AgentXProtocol.AGENTX_TIMEOUT;
0158:                }
0159:                return ((AgentXResponsePDU) resp.getResponse())
0160:                        .getErrorStatus();
0161:            }
0162:
0163:            protected int openSession(TransportMapping transport,
0164:                    Address masterAddress, AgentXSession session)
0165:                    throws IOException {
0166:                AgentXOpenPDU openPDU = new AgentXOpenPDU(0,
0167:                        getNextTransactionID(), 0, session.getTimeout(),
0168:                        subagentID, subagentDescr);
0169:                AgentXResponseEvent responseEvent = agentX.send(openPDU,
0170:                        session.createAgentXTarget(), transport);
0171:                if (responseEvent.getResponse() == null) {
0172:                    LOGGER.error("Timeout on connection to master "
0173:                            + masterAddress);
0174:                } else if (responseEvent.getResponse() instanceof  AgentXResponsePDU) {
0175:                    AgentXResponsePDU response = responseEvent.getResponse();
0176:                    if (response.getErrorStatus() == AgentXProtocol.AGENTX_SUCCESS) {
0177:                        session.setSessionID(response.getSessionID());
0178:                    }
0179:                    return response.getErrorStatus();
0180:                } else {
0181:                    LOGGER
0182:                            .error("Received packet on open PDU is not a response AgentX PDU: "
0183:                                    + responseEvent);
0184:                }
0185:                return AgentXProtocol.AGENTX_TIMEOUT;
0186:            }
0187:
0188:            private static int getResponseStatus(
0189:                    AgentXResponseEvent responseEvent) {
0190:                if (responseEvent.getResponse() == null) {
0191:                    LOGGER.error("Timeout on connection to master "
0192:                            + responseEvent.getTarget());
0193:                    return AgentXProtocol.AGENTX_TIMEOUT;
0194:                } else if (responseEvent.getResponse() instanceof  AgentXResponsePDU) {
0195:                    AgentXResponsePDU response = responseEvent.getResponse();
0196:                    return response.getErrorStatus();
0197:                } else {
0198:                    LOGGER
0199:                            .error("Received packet on open PDU is not a response AgentX PDU: "
0200:                                    + responseEvent);
0201:                }
0202:                return AgentXProtocol.AGENTX_ERROR;
0203:            }
0204:
0205:            public void disconnect(Address masterAddress) throws IOException {
0206:                AgentXPeer peer = (AgentXPeer) peers.remove(masterAddress);
0207:                if (peer != null) {
0208:                    TransportMapping transport = peer.getTransport();
0209:                    if (transport instanceof  ConnectionOrientedTransportMapping) {
0210:                        ((ConnectionOrientedTransportMapping) transport)
0211:                                .close(masterAddress);
0212:                    }
0213:                }
0214:            }
0215:
0216:            public int connect(Address masterAddress, Address localAddress,
0217:                    AgentXSession session) throws IOException {
0218:                AgentXPeer peer = (AgentXPeer) peers.get(masterAddress);
0219:                TransportMapping transport;
0220:                if (peer == null) {
0221:                    transport = addMaster(localAddress);
0222:                    peer = new AgentXPeer(transport, masterAddress);
0223:                } else {
0224:                    transport = peer.getTransport();
0225:                }
0226:                peer.setTimeout(session.getTimeout());
0227:                session.setPeer(peer);
0228:                int status = AgentXProtocol.AGENTX_TIMEOUT;
0229:                try {
0230:                    status = openSession(transport, masterAddress, session);
0231:                    if (status != AgentXProtocol.AGENTX_TIMEOUT) {
0232:                        peers.put(masterAddress, peer);
0233:                        LOGGER.info("Added new peer address=" + masterAddress
0234:                                + ",peer=" + peer);
0235:                    }
0236:                } catch (IOException ex) {
0237:                    LOGGER.error(ex);
0238:                    removeMaster(transport);
0239:                    return AgentXProtocol.AGENTX_ERROR;
0240:                }
0241:                if (status == AgentXProtocol.AGENTX_SUCCESS) {
0242:                    sessions.put(new Integer(session.getSessionID()), session);
0243:                    LOGGER.info("Opened subagent session successfully: "
0244:                            + session);
0245:                } else {
0246:                    removeMaster(transport);
0247:                }
0248:                return status;
0249:            }
0250:
0251:            public int close(AgentXSession session, byte reason)
0252:                    throws IOException {
0253:                return closeSession(session.getSessionID(), reason);
0254:            }
0255:
0256:            private synchronized AgentXSession getSession(int sessionID) {
0257:                return (AgentXSession) sessions.get(new Integer(sessionID));
0258:            }
0259:
0260:            private synchronized AgentXSession removeSession(int sessionID) {
0261:                return (AgentXSession) sessions.remove(new Integer(sessionID));
0262:            }
0263:
0264:            public void setDefaultPriority(byte priority) {
0265:                this .defaultPriority = priority;
0266:            }
0267:
0268:            public byte getDefaultPriority() {
0269:                return defaultPriority;
0270:            }
0271:
0272:            /**
0273:             * Gets the priority with which the supplied managed object and
0274:             * region should be registered at the master agent. Overwrite
0275:             * this method to use individual priorites depending on the registered
0276:             * region/managed object. The default implementation returns
0277:             * {@link #getDefaultPriority()}.
0278:             *
0279:             * @param mo ManagedObject
0280:             *    a managed object instance that manages <code>region</code>.
0281:             * @param region
0282:             *    the region to be registered.
0283:             * @return
0284:             *    the priority between 0 and 255 (lower value results in higher priority).
0285:             */
0286:            protected byte getPriority(ManagedObject mo, AgentXRegion region) {
0287:                return defaultPriority;
0288:            }
0289:
0290:            /**
0291:             * Registers the subagent regions at the master agent.
0292:             * @param session
0293:             *    the session on whose behalf regions are registered.
0294:             * @param context
0295:             *    the context to use for registration.
0296:             * @return
0297:             *    a List of the managed objects which failed to register.
0298:             */
0299:            public List registerRegions(AgentXSession session,
0300:                    OctetString context) {
0301:                return registerRegions(session, context, null);
0302:            }
0303:
0304:            /**
0305:             * Registers the subagent regions at the master agent.
0306:             * @param session
0307:             *    the session on whose behalf regions are registered.
0308:             * @param context
0309:             *    the context to use for registration.
0310:             * @param sysUpTime
0311:             *    if not <code>null</code>, the master agent's notion of the sysUpTime
0312:             *    for the registered context is returned. The input value is always
0313:             *    ignored!
0314:             * @return
0315:             *    a List of the managed objects which failed to register.
0316:             */
0317:            public List registerRegions(AgentXSession session,
0318:                    OctetString context, TimeTicks sysUpTime) {
0319:                LinkedList failures = new LinkedList();
0320:                MOServer server = getServer(context);
0321:                if (server == null) {
0322:                    LOGGER.warn("No MOServer found for context '" + context
0323:                            + "'");
0324:                    return null;
0325:                }
0326:                for (Iterator it = server.iterator(); it.hasNext();) {
0327:                    ManagedObject mo = (ManagedObject) it.next();
0328:                    if (mo instanceof  AgentXSharedMOTable) {
0329:                        List failedRows = registerSharedTableRows(session,
0330:                                context, (AgentXSharedMOTable) mo);
0331:                        failures.addAll(failedRows);
0332:                    } else {
0333:                        MOScope scope = mo.getScope();
0334:                        AgentXRegion region = new AgentXRegion(scope
0335:                                .getLowerBound(), scope.getUpperBound());
0336:                        if (mo instanceof  MOScalar) {
0337:                            region.setSingleOID(true);
0338:                        }
0339:                        region.setUpperIncluded(scope.isUpperIncluded());
0340:                        try {
0341:                            int status = registerRegion(session, context,
0342:                                    region, getPriority(mo, region), sysUpTime);
0343:                            if (status != AgentXProtocol.AGENTX_SUCCESS) {
0344:                                failures.add(mo);
0345:                                if (LOGGER.isWarnEnabled()) {
0346:                                    LOGGER.warn("Failed to registered MO "
0347:                                            + scope + " with status = "
0348:                                            + status);
0349:                                }
0350:                            } else {
0351:                                if (LOGGER.isInfoEnabled()) {
0352:                                    LOGGER.info("Registered MO " + scope
0353:                                            + " successfully");
0354:                                }
0355:                            }
0356:
0357:                        } catch (IOException ex) {
0358:                            LOGGER.warn("Failed to register " + mo
0359:                                    + " in context '" + context
0360:                                    + "' of session " + session);
0361:                            failures.add(mo);
0362:                        }
0363:                    }
0364:                }
0365:                return failures;
0366:            }
0367:
0368:            /**
0369:             * Registers the indexes and (row) regions of a shared table. This method
0370:             * is called on behalf of {@link #registerRegions(AgentXSession session,
0371:             * OctetString context, TimeTicks sysUpTime)} and
0372:             * {@link #registerRegions(AgentXSession session, OctetString context)}.
0373:             *
0374:             * @param session
0375:             *    the session on whose behalf regions are registered.
0376:             * @param context
0377:             *    the context to use for registration.
0378:             * @param mo
0379:             *    the <code>AgentXSharedMOTable</code> instance to register.
0380:             * @return List
0381:             *    a list of failed {@link MOTableRow} instances.
0382:             */
0383:            public List registerSharedTableRows(AgentXSession session,
0384:                    OctetString context, AgentXSharedMOTable mo) {
0385:                LinkedList failedRows = new LinkedList();
0386:                AgentXSharedMOTableSupport sharedTableSupport = new AgentXSharedMOTableSupport(
0387:                        agentX, session, context);
0388:                synchronized (mo) {
0389:                    if (mo instanceof  AgentXSharedMutableMOTable) {
0390:                        ((AgentXSharedMutableMOTable) mo)
0391:                                .setAgentXSharedMOTableSupport(sharedTableSupport);
0392:                    }
0393:                    for (Iterator it = mo.getModel().iterator(); it.hasNext();) {
0394:                        MOTableRow row = (MOTableRow) it.next();
0395:                        OID newIndex = (OID) row.getIndex().clone();
0396:                        int status = sharedTableSupport
0397:                                .allocateIndex(
0398:                                        context,
0399:                                        mo.getIndexDef(),
0400:                                        (byte) AgentXSharedMOTableSupport.INDEX_MODE_ALLOCATE,
0401:                                        newIndex);
0402:                        if (status == AgentXProtocol.AGENTX_SUCCESS) {
0403:                            if ((newIndex instanceof  AnyNewIndexOID)
0404:                                    || (newIndex instanceof  NewIndexOID)) {
0405:                                if (mo instanceof  AgentXSharedMutableMOTable) {
0406:                                    ((AgentXSharedMutableMOTable) mo)
0407:                                            .changeRowIndex(newIndex, row
0408:                                                    .getIndex());
0409:                                }
0410:                            }
0411:                            status = sharedTableSupport.registerRow(mo, row);
0412:                            if (status != AgentXProtocol.AGENTX_SUCCESS) {
0413:                                sharedTableSupport.deallocateIndex(context, mo
0414:                                        .getIndexDef(), row.getIndex());
0415:                                LOGGER.warn("Failed to register row with "
0416:                                        + status + " for " + row);
0417:                                failedRows.add(row);
0418:                            }
0419:                        } else {
0420:                            LOGGER.warn("Failed to allocate index with "
0421:                                    + status + " for row " + row);
0422:                            failedRows.add(row);
0423:                        }
0424:                    }
0425:                }
0426:                return failedRows;
0427:            }
0428:
0429:            protected int registerRegion(AgentXSession session,
0430:                    OctetString context, AgentXRegion region, byte priority,
0431:                    TimeTicks sysUpTime) throws IOException {
0432:                if ((session == null) || (session.isClosed())) {
0433:                    return AgentXProtocol.AGENTX_NOT_OPEN;
0434:                }
0435:                long t = (this .timeout == 0) ? session.getTimeout() * 1000
0436:                        : this .timeout;
0437:                AgentXRegisterPDU pdu = new AgentXRegisterPDU(context, region
0438:                        .getLowerBound(), priority, region.getRangeSubID(),
0439:                        region.getUpperBoundSubID());
0440:                pdu.setSessionAttributes(session);
0441:                AgentXResponseEvent event = agentX.send(pdu, new AgentXTarget(
0442:                        session.getPeer().getAddress(), t), session.getPeer()
0443:                        .getTransport());
0444:                if ((sysUpTime != null) && (event.getResponse() != null)) {
0445:                    sysUpTime
0446:                            .setValue(event.getResponse().getSysUpTime() & 0xFFFFFFFFL);
0447:                }
0448:                return getResponseStatus(event);
0449:            }
0450:
0451:            protected int unregisterRegion(AgentXSession session,
0452:                    OctetString context, AgentXRegion region, byte timeout)
0453:                    throws IOException {
0454:                if ((session == null) || (session.isClosed())) {
0455:                    return AgentXProtocol.AGENTX_NOT_OPEN;
0456:                }
0457:                byte t = (timeout == 0) ? session.getTimeout() : timeout;
0458:                AgentXUnregisterPDU pdu = new AgentXUnregisterPDU(context,
0459:                        region.getLowerBound(), t, region.getRangeSubID(),
0460:                        region.getUpperBoundSubID());
0461:                pdu.setSessionAttributes(session);
0462:                AgentXResponseEvent event = agentX.send(pdu, new AgentXTarget(
0463:                        session.getPeer().getAddress(), this .timeout), session
0464:                        .getPeer().getTransport());
0465:                return getResponseStatus(event);
0466:            }
0467:
0468:            protected TransportMapping addMaster(Address localAddress)
0469:                    throws IOException {
0470:                /*
0471:                 if (transport != null) {
0472:                 try {
0473:                 transport.close();
0474:                 }
0475:                 catch (IOException ex) {
0476:                 logger.error(ex);
0477:                 }
0478:                 agentX.removeTransportMapping(transport);
0479:                 }
0480:                 */
0481:                TransportMapping transport = TransportMappings.getInstance()
0482:                        .createTransportMapping(localAddress);
0483:                if (transport instanceof  ConnectionOrientedTransportMapping) {
0484:                    ConnectionOrientedTransportMapping tcpTransport = (ConnectionOrientedTransportMapping) transport;
0485:                    tcpTransport.setConnectionTimeout(0);
0486:                    tcpTransport.setMessageLengthDecoder(new AgentXProtocol());
0487:                }
0488:                agentX.addTransportMapping(transport);
0489:                transport.listen();
0490:                return transport;
0491:            }
0492:
0493:            protected void removeMaster(TransportMapping transport) {
0494:                agentX.removeTransportMapping(transport);
0495:                try {
0496:                    transport.close();
0497:                } catch (IOException ex) {
0498:                    LOGGER.warn("Closing transport mapping " + transport
0499:                            + " failed with: " + ex.getMessage());
0500:                }
0501:            }
0502:
0503:            public synchronized void addMOServer(MOServer server) {
0504:                moServers.add(server);
0505:            }
0506:
0507:            public synchronized void removeMOServer(MOServer server) {
0508:                moServers.remove(server);
0509:            }
0510:
0511:            public synchronized MOServer getServer(OctetString context) {
0512:                for (int i = 0; i < moServers.size(); i++) {
0513:                    MOServer s = (MOServer) moServers.get(i);
0514:                    if (s.isContextSupported(context)) {
0515:                        return s;
0516:                    }
0517:                }
0518:                return null;
0519:            }
0520:
0521:            public synchronized Collection getContexts() {
0522:                LinkedList allContexts = new LinkedList();
0523:                for (int i = 0; i < moServers.size(); i++) {
0524:                    MOServer s = (MOServer) moServers.get(i);
0525:                    OctetString[] contexts = s.getContexts();
0526:                    allContexts.addAll(Arrays.asList(contexts));
0527:                }
0528:                return allContexts;
0529:            }
0530:
0531:            public ThreadPool getThreadPool() {
0532:                return threadPool;
0533:            }
0534:
0535:            public void setThreadPool(ThreadPool threadPool) {
0536:                this .threadPool = threadPool;
0537:            }
0538:
0539:            public void dispatchCommand(AgentXCommandEvent cmd) {
0540:                boolean pendingSessionClose = false;
0541:                if (cmd.getCommand().isConfirmedPDU()) {
0542:                    AgentXRequest request = null;
0543:                    MOServer server = null;
0544:                    switch (cmd.getCommand().getType()) {
0545:                    case AgentXPDU.AGENTX_GET_PDU: {
0546:                        request = (AgentXRequest) factory.createRequest(cmd,
0547:                                null);
0548:                        server = getServer(request.getContext());
0549:                        requestHandlerGet.processPdu(request, server);
0550:                        break;
0551:                    }
0552:                    case AgentXPDU.AGENTX_GETNEXT_PDU: {
0553:                        request = (AgentXRequest) factory.createRequest(cmd,
0554:                                null);
0555:                        server = getServer(request.getContext());
0556:                        requestHandlerGetNext.processPdu(request, server);
0557:                        break;
0558:                    }
0559:                    case AgentXPDU.AGENTX_GETBULK_PDU: {
0560:                        request = (AgentXRequest) factory.createRequest(cmd,
0561:                                null);
0562:                        server = getServer(request.getContext());
0563:                        requestHandlerGetBulk.processPdu(request, server);
0564:                        break;
0565:                    }
0566:                    case AgentXPDU.AGENTX_TESTSET_PDU: {
0567:                        request = (AgentXRequest) factory.createRequest(cmd,
0568:                                null);
0569:                        request.setPhase(Request.PHASE_2PC_PREPARE);
0570:                        server = getServer(request.getContext());
0571:                        requestHandlerTestSet.processPdu(request, server);
0572:                        requestList.put(createRequestID(cmd), request);
0573:                        break;
0574:                    }
0575:                    case AgentXPDU.AGENTX_COMMITSET_PDU:
0576:                    case AgentXPDU.AGENTX_UNDOSET_PDU:
0577:                    case AgentXPDU.AGENTX_CLEANUPSET_PDU: {
0578:                        RequestID reqID = createRequestID(cmd);
0579:                        request = (AgentXRequest) requestList.get(reqID);
0580:                        if (request == null) {
0581:                            LOGGER.error("Request with ID " + reqID
0582:                                    + " not found in request list");
0583:                            request = new AgentXRequest(cmd);
0584:                            request
0585:                                    .setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR);
0586:                            break;
0587:                        }
0588:                        server = getServer(request.getContext());
0589:                        switch (cmd.getCommand().getType()) {
0590:                        case AgentXPDU.AGENTX_COMMITSET_PDU:
0591:                            request.setPhase(Request.PHASE_2PC_COMMIT);
0592:                            requestHandlerCommitSet.processPdu(request, server);
0593:                            break;
0594:                        case AgentXPDU.AGENTX_UNDOSET_PDU:
0595:                            request.setPhase(Request.PHASE_2PC_UNDO);
0596:                            requestHandlerUndoSet.processPdu(request, server);
0597:                            break;
0598:                        case AgentXPDU.AGENTX_CLEANUPSET_PDU:
0599:                            request.setPhase(Request.PHASE_2PC_CLEANUP);
0600:                            requestHandlerCleanupSet
0601:                                    .processPdu(request, server);
0602:                            break;
0603:                        default: {
0604:                            LOGGER.fatal("Internal error");
0605:                        }
0606:                        }
0607:                        if (cmd.getCommand().getType() != AgentXPDU.AGENTX_COMMITSET_PDU) {
0608:                            // remove request from request list
0609:                            requestList.remove(reqID);
0610:                        }
0611:                        break;
0612:                    }
0613:                    case AgentXPDU.AGENTX_CLOSE_PDU: {
0614:                        AgentXSession session = removeSession(cmd.getCommand()
0615:                                .getSessionID());
0616:                        if (session != null) {
0617:                            session.setClosed(true);
0618:                            pendingSessionClose = true;
0619:                        }
0620:                        break;
0621:                    }
0622:                    default: {
0623:                        LOGGER.error("Unhandled PDU type: " + cmd.getCommand());
0624:                        request = new AgentXRequest(cmd);
0625:                        request
0626:                                .setErrorStatus(AgentXProtocol.AGENTX_PROCESSING_ERROR);
0627:                    }
0628:                    }
0629:                    if (request != null) {
0630:                        // Since this is an AgentX subagent it only processes a single phase at
0631:                        // once.
0632:                        if (request.isPhaseComplete()) {
0633:                            // send response
0634:                            sendResponse(cmd, request);
0635:                        }
0636:                        if (server != null) {
0637:                            release(server, request);
0638:                        }
0639:                    }
0640:                    if (pendingSessionClose) {
0641:                        try {
0642:                            disconnect(cmd.getPeerAddress());
0643:                        } catch (IOException ex) {
0644:                            LOGGER.error("Failed to disconnect from master at "
0645:                                    + cmd.getPeerAddress() + ": "
0646:                                    + ex.getMessage(), ex);
0647:                        }
0648:                    }
0649:                } else {
0650:                    processResponse(cmd);
0651:                }
0652:            }
0653:
0654:            protected void sendResponse(AgentXCommandEvent cmd,
0655:                    AgentXRequest request) {
0656:                AgentXMessageDispatcher dispatcher = cmd.getDispatcher();
0657:                AgentXResponsePDU response = request.getResponsePDU();
0658:                if (response != null) {
0659:                    AgentXPDU rpdu = cmd.getCommand();
0660:                    response.setSessionID(rpdu.getSessionID());
0661:                    response.setTransactionID(rpdu.getTransactionID());
0662:                    response.setByteOrder(rpdu.getByteOrder());
0663:                    response.setPacketID(rpdu.getPacketID());
0664:                    // only send a response if required
0665:                    try {
0666:                        dispatcher.send(cmd.getPeerTransport(), cmd
0667:                                .getPeerAddress(), response, null);
0668:                    } catch (IOException ex) {
0669:                        LOGGER.warn("Failed to send AgentX response to '"
0670:                                + cmd.getPeerAddress() + "' with error: "
0671:                                + ex.getMessage());
0672:                    }
0673:                }
0674:            }
0675:
0676:            protected void release(MOServer server, Request req) {
0677:                for (Iterator it = req.iterator(); it.hasNext();) {
0678:                    SubRequest sreq = (SubRequest) it.next();
0679:                    if (sreq.getTargetMO() != null) {
0680:                        server.unlock(req, sreq.getTargetMO());
0681:                    }
0682:                }
0683:            }
0684:
0685:            private static RequestID createRequestID(AgentXCommandEvent cmd) {
0686:                return new RequestID(cmd.getPeerAddress(), cmd.getCommand()
0687:                        .getSessionID(), cmd.getCommand().getTransactionID());
0688:
0689:            }
0690:
0691:            protected void processResponse(AgentXCommandEvent cmd) {
0692:                if (LOGGER.isDebugEnabled()) {
0693:                    LOGGER.debug("Received response " + cmd);
0694:                }
0695:            }
0696:
0697:            protected void processNextSubRequest(Request request,
0698:                    MOServer server, OctetString context, SubRequest sreq)
0699:                    throws NoSuchElementException {
0700:                // We can be sure to have a default context scope here because
0701:                // the inner class AgentXSubRequest creates it!
0702:                DefaultMOContextScope scope = (DefaultMOContextScope) sreq
0703:                        .getScope();
0704:                MOQuery query = sreq.getQuery();
0705:                if (query == null) {
0706:                    query = new DefaultMOQuery(scope, false, request);
0707:                }
0708:                while (!sreq.getStatus().isProcessed()) {
0709:                    ManagedObject mo = server.lookup(query);
0710:                    if (mo == null) {
0711:                        if (LOGGER.isDebugEnabled()) {
0712:                            LOGGER.debug("EndOfMibView at scope="
0713:                                    + query.getScope() + " and query " + query);
0714:                        }
0715:                        sreq.getVariableBinding()
0716:                                .setVariable(Null.endOfMibView);
0717:                        sreq.getStatus().setPhaseComplete(true);
0718:                        break;
0719:                    }
0720:                    try {
0721:                        if (!mo.next(sreq)) {
0722:                            // We can be sure to have a default context scope here because
0723:                            // the inner class SnmpSubRequest creates it!
0724:                            // don't forget to update query:
0725:                            sreq.getVariableBinding()
0726:                                    .setVariable(Null.instance);
0727:                            scope.substractScope(mo.getScope());
0728:                            // query is updated automatically because scope is updated.
0729:                            query.substractScope(mo.getScope());
0730:                        }
0731:                    } catch (Exception moex) {
0732:                        if (LOGGER.isDebugEnabled()) {
0733:                            moex.printStackTrace();
0734:                        }
0735:                        LOGGER.warn(moex);
0736:                        if (sreq.getStatus().getErrorStatus() == PDU.noError) {
0737:                            sreq.getStatus().setErrorStatus(PDU.genErr);
0738:                        }
0739:                    }
0740:                }
0741:            }
0742:
0743:            /**
0744:             * Sends notifications (traps) to all appropriate notification targets
0745:             * through the master agent.
0746:             *
0747:             * @param context the context name of the context on whose behalf this
0748:             *   notification has been generated.
0749:             * @param notificationID the object ID that uniquely identifies this
0750:             *   notification. For SNMPv1 traps, the notification ID has to be build
0751:             *   using the rules provided by RFC 2576.
0752:             * @param vbs an array of <code>VariableBinding</code> instances
0753:             *   representing the payload of the notification.
0754:             * @return
0755:             *    an {@link AgentXResponseEvent} instance or <code>null</code> if the
0756:             *    notification request timed out.
0757:             */
0758:            public Object notify(OctetString context, OID notificationID,
0759:                    VariableBinding[] vbs) {
0760:                return notify(context, notificationID, null, vbs);
0761:            }
0762:
0763:            public Object notify(OctetString context, OID notificationID,
0764:                    TimeTicks sysUpTime, VariableBinding[] vbs) {
0765:                AgentXSession session = firstSession();
0766:                AgentXResponseEvent agentXResponse = null;
0767:                try {
0768:                    agentXResponse = notify(session, context, notificationID,
0769:                            sysUpTime, vbs);
0770:                    if ((agentXResponse == null)
0771:                            || (agentXResponse.getResponse() == null)) {
0772:                        LOGGER
0773:                                .warn("Timeout on sending notification in context '"
0774:                                        + context
0775:                                        + "' with ID '"
0776:                                        + notificationID
0777:                                        + "' and payload "
0778:                                        + Arrays.asList(vbs));
0779:                        return null;
0780:                    }
0781:                    return agentXResponse;
0782:                } catch (IOException ex) {
0783:                    LOGGER.error("Failed to send notification in context '"
0784:                            + context + "' with ID '" + notificationID
0785:                            + "' and payload " + Arrays.asList(vbs)
0786:                            + ", reason is: " + ex.getMessage());
0787:                    return null;
0788:                }
0789:            }
0790:
0791:            /**
0792:             * Returns the first session that have been opened by this subagent and is
0793:             * still open. If no open session exists, <code>null</code> is returned.
0794:             *
0795:             * @return
0796:             *    an <code>AgentXSession</code>.
0797:             */
0798:            public synchronized final AgentXSession firstSession() {
0799:                if (sessions.size() > 0) {
0800:                    return (AgentXSession) sessions.values().iterator().next();
0801:                }
0802:                return null;
0803:            }
0804:
0805:            public AgentXResponseEvent notify(AgentXSession session,
0806:                    OctetString context, OID notificationID,
0807:                    TimeTicks sysUpTime, VariableBinding[] vbs)
0808:                    throws IOException {
0809:                int offset = 1;
0810:                if (sysUpTime != null) {
0811:                    offset = 2;
0812:                }
0813:                VariableBinding[] notifyVBs = new VariableBinding[vbs.length
0814:                        + offset];
0815:                if (sysUpTime != null) {
0816:                    notifyVBs[0] = new VariableBinding(SnmpConstants.sysUpTime,
0817:                            sysUpTime);
0818:                }
0819:                notifyVBs[offset - 1] = new VariableBinding(
0820:                        SnmpConstants.snmpTrapOID, notificationID);
0821:                System.arraycopy(vbs, 0, notifyVBs, offset, vbs.length);
0822:                AgentXNotifyPDU notifyPDU = new AgentXNotifyPDU(context,
0823:                        notifyVBs);
0824:                notifyPDU.setSessionAttributes(session);
0825:                notifyPDU.setTransactionID(getNextTransactionID());
0826:                AgentXResponseEvent response = agentX
0827:                        .send(notifyPDU, session.createAgentXTarget(), session
0828:                                .getPeer().getTransport());
0829:                return response;
0830:            }
0831:
0832:            public int addAgentCaps(AgentXSession session, OctetString context,
0833:                    OID id, OctetString descr) {
0834:                AgentXAddAgentCapsPDU pdu = new AgentXAddAgentCapsPDU(context,
0835:                        id, descr);
0836:                pdu.setSessionAttributes(session);
0837:                try {
0838:                    AgentXResponseEvent resp = agentX.send(pdu, session
0839:                            .createAgentXTarget(), session.getPeer()
0840:                            .getTransport());
0841:                    if (resp.getResponse() == null) {
0842:                        return AgentXProtocol.AGENTX_TIMEOUT;
0843:                    }
0844:                    return resp.getResponse().getErrorStatus();
0845:                } catch (IOException ex) {
0846:                    LOGGER.error("Failed to send AgentX AddAgentCaps PDU "
0847:                            + pdu + " because: " + ex.getMessage(), ex);
0848:                    return AgentXProtocol.AGENTX_NOT_OPEN;
0849:                }
0850:            }
0851:
0852:            public int removeAgentCaps(AgentXSession session,
0853:                    OctetString context, OID id) {
0854:                AgentXRemoveAgentCapsPDU pdu = new AgentXRemoveAgentCapsPDU(
0855:                        context, id);
0856:                pdu.setSessionAttributes(session);
0857:                try {
0858:                    AgentXResponseEvent resp = agentX.send(pdu, session
0859:                            .createAgentXTarget(), session.getPeer()
0860:                            .getTransport());
0861:                    return resp.getResponse().getErrorStatus();
0862:                } catch (IOException ex) {
0863:                    LOGGER.error("Failed to send AgentX RemoveAgentCaps PDU "
0864:                            + pdu + " because: " + ex.getMessage(), ex);
0865:                    return AgentXProtocol.AGENTX_NOT_OPEN;
0866:                }
0867:            }
0868:
0869:            public void addPingListener(PingListener l) {
0870:                if (pingListeners == null) {
0871:                    pingListeners = new Vector();
0872:                }
0873:                pingListeners.add(l);
0874:            }
0875:
0876:            public void removePingListener(PingListener l) {
0877:                if (pingListeners != null) {
0878:                    synchronized (pingListeners) {
0879:                        pingListeners.remove(l);
0880:                    }
0881:                }
0882:            }
0883:
0884:            protected void firePinged(PingEvent event) {
0885:                if (pingListeners != null) {
0886:                    synchronized (pingListeners) {
0887:                        Vector listeners = pingListeners;
0888:                        int count = listeners.size();
0889:                        for (int i = 0; i < count; i++) {
0890:                            ((PingListener) listeners.elementAt(i))
0891:                                    .pinged(event);
0892:                        }
0893:                    }
0894:                }
0895:            }
0896:
0897:            private static void initRequestPhase(Request request) {
0898:                if (request.getPhase() == Request.PHASE_INIT) {
0899:                    request.nextPhase();
0900:                }
0901:            }
0902:
0903:            static class GetRequestHandler implements  RequestHandler {
0904:
0905:                public boolean isSupported(int pduType) {
0906:                    return pduType == AgentXPDU.AGENTX_GET_PDU;
0907:                }
0908:
0909:                public void processPdu(Request request, MOServer server) {
0910:                    initRequestPhase(request);
0911:                    try {
0912:                        SubRequestIterator it = (SubRequestIterator) request
0913:                                .iterator();
0914:                        while (it.hasNext()) {
0915:                            SubRequest sreq = it.nextSubRequest();
0916:                            DefaultMOQuery query = new DefaultMOQuery(
0917:                                    (MOContextScope) sreq.getScope(), false,
0918:                                    request);
0919:                            ManagedObject mo = server.lookup(query);
0920:                            if (mo == null) {
0921:                                sreq.getVariableBinding().setVariable(
0922:                                        Null.noSuchObject);
0923:                                sreq.getStatus().setPhaseComplete(true);
0924:                                continue;
0925:                            }
0926:                            try {
0927:                                mo.get(sreq);
0928:                            } catch (Exception moex) {
0929:                                if (LOGGER.isDebugEnabled()) {
0930:                                    moex.printStackTrace();
0931:                                }
0932:                                LOGGER.warn(moex);
0933:                                if (sreq.getStatus().getErrorStatus() == PDU.noError) {
0934:                                    sreq.getStatus().setErrorStatus(PDU.genErr);
0935:                                }
0936:                            }
0937:                        }
0938:                    } catch (NoSuchElementException nsex) {
0939:                        if (LOGGER.isDebugEnabled()) {
0940:                            nsex.printStackTrace();
0941:                        }
0942:                        LOGGER.error("SubRequest not found");
0943:                        request.setErrorStatus(PDU.genErr);
0944:                    }
0945:                }
0946:            }
0947:
0948:            class GetNextHandler implements  RequestHandler {
0949:
0950:                public void processPdu(Request request, MOServer server) {
0951:                    initRequestPhase(request);
0952:                    OctetString context = request.getContext();
0953:                    try {
0954:                        SubRequestIterator it = (SubRequestIterator) request
0955:                                .iterator();
0956:                        while (it.hasNext()) {
0957:                            SubRequest sreq = it.nextSubRequest();
0958:                            processNextSubRequest(request, server, context,
0959:                                    sreq);
0960:                        }
0961:                    } catch (NoSuchElementException nsex) {
0962:                        if (LOGGER.isDebugEnabled()) {
0963:                            nsex.printStackTrace();
0964:                        }
0965:                        LOGGER.error("SubRequest not found");
0966:                        request.setErrorStatus(PDU.genErr);
0967:                    }
0968:                }
0969:
0970:                public boolean isSupported(int pduType) {
0971:                    return (pduType == PDU.GETNEXT);
0972:                }
0973:
0974:            }
0975:
0976:            class GetBulkHandler implements  RequestHandler {
0977:
0978:                public void processPdu(Request request, MOServer server) {
0979:                    initRequestPhase(request);
0980:                    OctetString context = request.getContext();
0981:                    AgentXRequest req = (AgentXRequest) request;
0982:                    int nonRep = req.getNonRepeaters();
0983:                    try {
0984:                        SubRequestIterator it = (SubRequestIterator) request
0985:                                .iterator();
0986:                        int i = 0;
0987:                        // non repeaters
0988:                        for (; ((i < nonRep) && it.hasNext()); i++) {
0989:                            SubRequest sreq = it.nextSubRequest();
0990:                            processNextSubRequest(request, server, context,
0991:                                    sreq);
0992:                        }
0993:                        // repetitions
0994:                        for (; it.hasNext(); i++) {
0995:                            SubRequest sreq = it.nextSubRequest();
0996:                            processNextSubRequest(request, server, context,
0997:                                    sreq);
0998:                        }
0999:                    } catch (NoSuchElementException nsex) {
1000:                        if (LOGGER.isDebugEnabled()) {
1001:                            nsex.printStackTrace();
1002:                        }
1003:                        LOGGER.error("SubRequest not found");
1004:                        request.setErrorStatus(PDU.genErr);
1005:                    }
1006:
1007:                }
1008:
1009:                public boolean isSupported(int pduType) {
1010:                    return (pduType == PDU.GETBULK);
1011:                }
1012:
1013:            }
1014:
1015:            static class TestSetHandler implements  RequestHandler {
1016:
1017:                public void processPdu(Request request, MOServer server) {
1018:                    try {
1019:                        SubRequestIterator it = (SubRequestIterator) request
1020:                                .iterator();
1021:                        while ((!request.isPhaseComplete()) && (it.hasNext())) {
1022:                            SubRequest sreq = it.nextSubRequest();
1023:                            if (sreq.isComplete()) {
1024:                                continue;
1025:                            }
1026:                            DefaultMOQuery query = new DefaultMOQuery(
1027:                                    (MOContextScope) sreq.getScope(), false,
1028:                                    request);
1029:                            ManagedObject mo = server.lookup(query);
1030:                            if (mo == null) {
1031:                                sreq.getStatus().setErrorStatus(PDU.noSuchName);
1032:                                break;
1033:                            }
1034:                            sreq.setTargetMO(mo);
1035:                            server.lock(sreq.getRequest(), mo);
1036:                            try {
1037:                                mo.prepare(sreq);
1038:                                sreq.getStatus().setPhaseComplete(true);
1039:                            } catch (Exception moex) {
1040:                                moex.printStackTrace();
1041:                                if (sreq.getStatus().getErrorStatus() == PDU.noError) {
1042:                                    sreq.getStatus().setErrorStatus(PDU.genErr);
1043:                                }
1044:                                /**@todo implement error handling */
1045:                            }
1046:                        }
1047:                    } catch (NoSuchElementException nsex) {
1048:                        if (LOGGER.isDebugEnabled()) {
1049:                            nsex.printStackTrace();
1050:                        }
1051:                        LOGGER.error("Cannot find sub-request: ", nsex);
1052:                        request.setErrorStatus(PDU.genErr);
1053:                    }
1054:                }
1055:
1056:                public boolean isSupported(int pduType) {
1057:                    return (pduType == AgentXPDU.AGENTX_TESTSET_PDU);
1058:                }
1059:            }
1060:
1061:            class UndoSetHandler implements  RequestHandler {
1062:
1063:                public void processPdu(Request request, MOServer server) {
1064:                    try {
1065:                        SubRequestIterator it = (SubRequestIterator) request
1066:                                .iterator();
1067:                        while (it.hasNext()) {
1068:                            SubRequest sreq = it.nextSubRequest();
1069:                            if (sreq.isComplete()) {
1070:                                continue;
1071:                            }
1072:                            ManagedObject mo = sreq.getTargetMO();
1073:                            if (mo == null) {
1074:                                DefaultMOQuery query = new DefaultMOQuery(
1075:                                        (MOContextScope) sreq.getScope(), true);
1076:                                mo = server.lookup(query);
1077:                            }
1078:                            if (mo == null) {
1079:                                sreq.getStatus().setErrorStatus(PDU.undoFailed);
1080:                                continue;
1081:                            }
1082:                            try {
1083:                                mo.undo(sreq);
1084:                                sreq.getStatus().setPhaseComplete(true);
1085:                            } catch (Exception moex) {
1086:                                if (LOGGER.isDebugEnabled()) {
1087:                                    moex.printStackTrace();
1088:                                }
1089:                                LOGGER.error(moex);
1090:                                if (sreq.getStatus().getErrorStatus() == PDU.noError) {
1091:                                    sreq.getStatus().setErrorStatus(
1092:                                            PDU.undoFailed);
1093:                                }
1094:                            }
1095:                        }
1096:                    } catch (NoSuchElementException nsex) {
1097:                        if (LOGGER.isDebugEnabled()) {
1098:                            nsex.printStackTrace();
1099:                        }
1100:                        LOGGER.error("Cannot find sub-request: ", nsex);
1101:                        request.setErrorStatus(PDU.genErr);
1102:                    }
1103:                }
1104:
1105:                public boolean isSupported(int pduType) {
1106:                    return (pduType == AgentXPDU.AGENTX_UNDOSET_PDU);
1107:                }
1108:            }
1109:
1110:            class CommitSetHandler implements  RequestHandler {
1111:
1112:                public void processPdu(Request request, MOServer server) {
1113:                    try {
1114:                        SubRequestIterator it = (SubRequestIterator) request
1115:                                .iterator();
1116:                        while ((!request.isPhaseComplete()) && (it.hasNext())) {
1117:                            SubRequest sreq = it.nextSubRequest();
1118:                            if (sreq.isComplete()) {
1119:                                continue;
1120:                            }
1121:                            ManagedObject mo = sreq.getTargetMO();
1122:                            if (mo == null) {
1123:                                DefaultMOQuery query = new DefaultMOQuery(
1124:                                        (MOContextScope) sreq.getScope(), true);
1125:                                mo = server.lookup(query);
1126:                            }
1127:                            if (mo == null) {
1128:                                sreq.getStatus().setErrorStatus(
1129:                                        PDU.commitFailed);
1130:                                continue;
1131:                            }
1132:                            try {
1133:                                mo.commit(sreq);
1134:                                sreq.getStatus().setPhaseComplete(true);
1135:                            } catch (Exception moex) {
1136:                                if (LOGGER.isDebugEnabled()) {
1137:                                    moex.printStackTrace();
1138:                                }
1139:                                LOGGER.error(moex);
1140:                                if (sreq.getStatus().getErrorStatus() == PDU.noError) {
1141:                                    sreq.getStatus().setErrorStatus(
1142:                                            PDU.commitFailed);
1143:                                }
1144:                            }
1145:                        }
1146:                    } catch (NoSuchElementException nsex) {
1147:                        if (LOGGER.isDebugEnabled()) {
1148:                            nsex.printStackTrace();
1149:                        }
1150:                        LOGGER.error("Cannot find sub-request: ", nsex);
1151:                        request.setErrorStatus(PDU.genErr);
1152:                    }
1153:                }
1154:
1155:                public boolean isSupported(int pduType) {
1156:                    return (pduType == AgentXPDU.AGENTX_COMMITSET_PDU);
1157:                }
1158:
1159:            }
1160:
1161:            class CleanupSetHandler implements  RequestHandler {
1162:
1163:                public void processPdu(Request request, MOServer server) {
1164:                    try {
1165:                        SubRequestIterator it = (SubRequestIterator) request
1166:                                .iterator();
1167:                        while (it.hasNext()) {
1168:                            SubRequest sreq = it.nextSubRequest();
1169:                            if (sreq.isComplete()) {
1170:                                continue;
1171:                            }
1172:                            ManagedObject mo = sreq.getTargetMO();
1173:                            if (mo == null) {
1174:                                DefaultMOQuery query = new DefaultMOQuery(
1175:                                        (MOContextScope) sreq.getScope(), false);
1176:                                mo = server.lookup(query);
1177:                            }
1178:                            if (mo == null) {
1179:                                sreq.completed();
1180:                                continue;
1181:                            }
1182:                            server.unlock(sreq.getRequest(), mo);
1183:                            try {
1184:                                mo.cleanup(sreq);
1185:                                sreq.getStatus().setPhaseComplete(true);
1186:                            } catch (Exception moex) {
1187:                                if (LOGGER.isDebugEnabled()) {
1188:                                    moex.printStackTrace();
1189:                                }
1190:                                LOGGER.error(moex);
1191:                            }
1192:                        }
1193:                    } catch (NoSuchElementException nsex) {
1194:                        if (LOGGER.isDebugEnabled()) {
1195:                            nsex.printStackTrace();
1196:                        }
1197:                        LOGGER.warn("Cannot find sub-request: "
1198:                                + nsex.getMessage());
1199:                    }
1200:                }
1201:
1202:                public boolean isSupported(int pduType) {
1203:                    return (pduType == AgentXPDU.AGENTX_CLEANUPSET_PDU);
1204:                }
1205:
1206:            }
1207:
1208:            static class DefaultAgentXRequestFactory implements  RequestFactory {
1209:
1210:                public Request createRequest(EventObject initiatingEvent,
1211:                        CoexistenceInfo cinfo) {
1212:                    Request request = new AgentXRequest(
1213:                            (AgentXCommandEvent) initiatingEvent);
1214:                    if (LOGGER.isDebugEnabled()) {
1215:                        LOGGER.debug("Creating AgentX request " + request
1216:                                + " from " + initiatingEvent);
1217:                    }
1218:                    return request;
1219:                }
1220:
1221:            }
1222:
1223:            class Command implements  Runnable {
1224:
1225:                private AgentXCommandEvent request;
1226:
1227:                public Command(AgentXCommandEvent event) {
1228:                    this .request = event;
1229:                }
1230:
1231:                public void run() {
1232:                    dispatchCommand(request);
1233:                }
1234:
1235:            }
1236:
1237:            static class RequestID implements  Comparable {
1238:                private Address masterAddress;
1239:                private int sessionID;
1240:                private int transactionID;
1241:
1242:                public RequestID(Address masterAddress, int sessionID,
1243:                        int transactionID) {
1244:                    this .masterAddress = masterAddress;
1245:                    this .sessionID = sessionID;
1246:                    this .transactionID = transactionID;
1247:                }
1248:
1249:                public int compareTo(Object o) {
1250:                    RequestID other = (RequestID) o;
1251:                    int c = masterAddress.compareTo(other.masterAddress);
1252:                    if (c == 0) {
1253:                        c = sessionID - other.sessionID;
1254:                        if (c == 0) {
1255:                            c = transactionID - other.transactionID;
1256:                        }
1257:                    }
1258:                    return c;
1259:                }
1260:
1261:                public boolean equals(Object obj) {
1262:                    if (obj instanceof  RequestID) {
1263:                        return (compareTo(obj) == 0);
1264:                    }
1265:                    return false;
1266:                }
1267:
1268:                public int hashCode() {
1269:                    return transactionID;
1270:                }
1271:
1272:            }
1273:
1274:            class PingTask extends TimerTask {
1275:
1276:                public void run() {
1277:                    List l;
1278:                    synchronized (sessions) {
1279:                        l = new LinkedList(sessions.values());
1280:                    }
1281:                    for (Iterator it = l.iterator(); it.hasNext();) {
1282:                        AgentXSession session = (AgentXSession) it.next();
1283:                        for (Iterator cit = getContexts().iterator(); cit
1284:                                .hasNext();) {
1285:                            OctetString context = (OctetString) cit.next();
1286:                            AgentXPingPDU ping = new AgentXPingPDU(context);
1287:                            ping.setSessionAttributes(session);
1288:                            ping.setTransactionID(getNextTransactionID());
1289:                            PingEvent pingEvent;
1290:                            try {
1291:                                AgentXResponseEvent resp = agentX.send(ping,
1292:                                        session.createAgentXTarget(), session
1293:                                                .getPeer().getTransport());
1294:                                pingEvent = new PingEvent(this , session, resp
1295:                                        .getResponse());
1296:                            } catch (IOException ex) {
1297:                                pingEvent = new PingEvent(this , session, ex);
1298:                            }
1299:                            firePinged(pingEvent);
1300:                            if (LOGGER.isDebugEnabled()) {
1301:                                LOGGER.debug("Fired ping event " + pingEvent);
1302:                            }
1303:                            if (pingEvent.isCloseSession()
1304:                                    || pingEvent.isResetSession()) {
1305:                                try {
1306:                                    closeSession(session.getSessionID(),
1307:                                            AgentXProtocol.REASON_TIMEOUTS);
1308:                                    if (pingEvent.isResetSession()) {
1309:                                        reopenSession(session);
1310:                                    }
1311:                                } catch (IOException ex1) {
1312:                                }
1313:                            }
1314:                        }
1315:                    }
1316:                }
1317:
1318:                /**
1319:                 * Reopens a closed session.
1320:                 *
1321:                 * @param session
1322:                 *    a closed AgentXSession instance.
1323:                 * @return
1324:                 *    {@link AgentXProtocol#AGENTX_SUCCESS} if the session could be opened
1325:                 *    sucessfully. Otherwise the AgentX error status is returned.
1326:                 * @throws IOException
1327:                 *    if the session cannot be reopened due to an IO exception.
1328:                 */
1329:                public int reopenSession(AgentXSession session)
1330:                        throws IOException {
1331:                    return openSession(session.getPeer().getTransport(),
1332:                            session.getPeer().getAddress(), session);
1333:                }
1334:
1335:            }
1336:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.