001: /*_############################################################################
002: _##
003: _## SNMP4J - MultiThreadedMessageDispatcher.java
004: _##
005: _## Copyright (C) 2003-2008 Frank Fock and Jochen Katz (SNMP4J.org)
006: _##
007: _## Licensed under the Apache License, Version 2.0 (the "License");
008: _## you may not use this file except in compliance with the License.
009: _## You may obtain a copy of the License at
010: _##
011: _## http://www.apache.org/licenses/LICENSE-2.0
012: _##
013: _## Unless required by applicable law or agreed to in writing, software
014: _## distributed under the License is distributed on an "AS IS" BASIS,
015: _## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016: _## See the License for the specific language governing permissions and
017: _## limitations under the License.
018: _##
019: _##########################################################################*/
020:
021: package org.snmp4j.util;
022:
023: import org.snmp4j.MessageDispatcher;
024: import org.snmp4j.CommandResponder;
025: import org.snmp4j.mp.MessageProcessingModel;
026: import org.snmp4j.TransportMapping;
027: import java.util.Collection;
028: import org.snmp4j.smi.Address;
029: import org.snmp4j.asn1.BERInputStream;
030: import org.snmp4j.mp.PduHandle;
031: import org.snmp4j.PDU;
032: import org.snmp4j.mp.StateReference;
033: import org.snmp4j.mp.StatusInformation;
034: import org.snmp4j.MessageException;
035: import java.nio.ByteBuffer;
036: import org.snmp4j.mp.PduHandleCallback;
037:
038: /**
039: * The <code>MultiThreadedMessageDispatcher</code> class is a decorator
040: * for any <code>MessageDispatcher</code> instances that processes incoming
041: * message with a supplied <code>ThreadPool</code>. The processing is thus
042: * parallelized on up to the size of the supplied thread pool threads.
043: *
044: * @author Frank Fock
045: * @version 1.8
046: * @since 1.0.2
047: */
048: public class MultiThreadedMessageDispatcher implements
049: MessageDispatcher {
050:
051: private MessageDispatcher dispatcher;
052: private ThreadPool threadPool;
053:
054: /**
055: * Creates a multi thread message dispatcher using the provided
056: * <code>ThreadPool</code> to concurrently process incoming messages
057: * that are forwarded to the supplied decorated
058: * <code>MessageDispatcher</code>.
059: *
060: * @param threadPool
061: * a <code>ThreadPool</code> instance (that can be shared). <em>The thread
062: * pool has to be stopped externally.</em>
063: * @param decoratedDispatcher
064: * the decorated <code>MessageDispatcher</code> that must be
065: * multi-threading safe.
066: */
067: public MultiThreadedMessageDispatcher(ThreadPool threadPool,
068: MessageDispatcher decoratedDispatcher) {
069: this .threadPool = threadPool;
070: this .dispatcher = decoratedDispatcher;
071: }
072:
073: public int getNextRequestID() {
074: return dispatcher.getNextRequestID();
075: }
076:
077: public void addMessageProcessingModel(MessageProcessingModel model) {
078: dispatcher.addMessageProcessingModel(model);
079: }
080:
081: public void removeMessageProcessingModel(
082: MessageProcessingModel model) {
083: dispatcher.removeMessageProcessingModel(model);
084: }
085:
086: public MessageProcessingModel getMessageProcessingModel(
087: int messageProcessingModel) {
088: return dispatcher
089: .getMessageProcessingModel(messageProcessingModel);
090: }
091:
092: public void addTransportMapping(TransportMapping transport) {
093: dispatcher.addTransportMapping(transport);
094: }
095:
096: public TransportMapping removeTransportMapping(
097: TransportMapping transport) {
098: return dispatcher.removeTransportMapping(transport);
099: }
100:
101: public Collection getTransportMappings() {
102: return dispatcher.getTransportMappings();
103: }
104:
105: public void addCommandResponder(CommandResponder listener) {
106: dispatcher.addCommandResponder(listener);
107: }
108:
109: public void removeCommandResponder(CommandResponder listener) {
110: dispatcher.removeCommandResponder(listener);
111: }
112:
113: public PduHandle sendPdu(Address transportAddress,
114: int messageProcessingModel, int securityModel,
115: byte[] securityName, int securityLevel, PDU pdu,
116: boolean expectResponse) throws MessageException {
117: return dispatcher.sendPdu(transportAddress,
118: messageProcessingModel, securityModel, securityName,
119: securityLevel, pdu, expectResponse);
120: }
121:
122: public PduHandle sendPdu(TransportMapping transportMapping,
123: Address transportAddress, int messageProcessingModel,
124: int securityModel, byte[] securityName, int securityLevel,
125: PDU pdu, boolean expectResponse) throws MessageException {
126: return dispatcher.sendPdu(transportMapping, transportAddress,
127: messageProcessingModel, securityModel, securityName,
128: securityLevel, pdu, expectResponse);
129: }
130:
131: public PduHandle sendPdu(TransportMapping transportMapping,
132: Address transportAddress, int messageProcessingModel,
133: int securityModel, byte[] securityName, int securityLevel,
134: PDU pdu, boolean expectResponse, PduHandleCallback callback)
135: throws MessageException {
136: return dispatcher.sendPdu(transportMapping, transportAddress,
137: messageProcessingModel, securityModel, securityName,
138: securityLevel, pdu, expectResponse, callback);
139: }
140:
141: public int returnResponsePdu(int messageProcessingModel,
142: int securityModel, byte[] securityName, int securityLevel,
143: PDU pdu, int maxSizeResponseScopedPDU,
144: StateReference stateReference,
145: StatusInformation statusInformation)
146: throws MessageException {
147: return dispatcher.returnResponsePdu(messageProcessingModel,
148: securityModel, securityName, securityLevel, pdu,
149: maxSizeResponseScopedPDU, stateReference,
150: statusInformation);
151: }
152:
153: public void processMessage(TransportMapping sourceTransport,
154: Address incomingAddress, BERInputStream wholeMessage) {
155: // OK, here wo do all that what this class is all about!
156: MessageTask task = new MessageTask(sourceTransport,
157: incomingAddress, wholeMessage);
158: threadPool.execute(task);
159: }
160:
161: public void processMessage(TransportMapping sourceTransport,
162: Address incomingAddress, ByteBuffer wholeMessage) {
163: processMessage(sourceTransport, incomingAddress,
164: new BERInputStream(wholeMessage));
165: }
166:
167: public void releaseStateReference(int messageProcessingModel,
168: PduHandle pduHandle) {
169: dispatcher.releaseStateReference(messageProcessingModel,
170: pduHandle);
171: }
172:
173: public TransportMapping getTransport(Address destAddress) {
174: return dispatcher.getTransport(destAddress);
175: }
176:
177: class MessageTask implements WorkerTask {
178: private TransportMapping sourceTransport;
179: private Address incomingAddress;
180: private BERInputStream wholeMessage;
181:
182: public MessageTask(TransportMapping sourceTransport,
183: Address incomingAddress, BERInputStream wholeMessage) {
184: this .sourceTransport = sourceTransport;
185: this .incomingAddress = incomingAddress;
186: this .wholeMessage = wholeMessage;
187: }
188:
189: public void run() {
190: dispatcher.processMessage(sourceTransport, incomingAddress,
191: wholeMessage);
192: }
193:
194: public void terminate() {
195: }
196:
197: public void join() throws InterruptedException {
198: }
199:
200: public void interrupt() {
201: }
202:
203: }
204: }
|