Source code for GroupConnection.java (EJB Server JBoss 4.2.1, jms module, package org.jboss.jms.serverless)

/*
 * JBoss, Home of Professional Open Source.
 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
 * as indicated by the @author tags. See the copyright.txt file in the
 * distribution for a full listing of individual contributors.
 *
 * This is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 *
 * This software is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this software; if not, write to the Free
 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
 */
package org.jboss.jms.serverless;

import org.jboss.logging.Logger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.ConnectionMetaData;
import javax.jms.ExceptionListener;
import javax.jms.ConnectionConsumer;
import javax.jms.ServerSessionPool;
import javax.jms.Destination;
import javax.jms.Topic;
import javax.jms.Session;
import org.jgroups.JChannel;
import org.jgroups.ChannelListener;
import org.jgroups.Channel;
import org.jgroups.Address;
import org.jgroups.ChannelException;
import java.io.Serializable;
import java.net.URL;
import javax.jms.Queue;
import org.jgroups.SetStateEvent;
import org.jgroups.util.Util;
import org.jgroups.GetStateEvent;
import org.jgroups.View;
import org.jgroups.SuspectEvent;
import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelNotConnectedException;

/**
 * The main piece of the JMS client runtime. Sits on top of a JChannel and maintains the "server
 * group" state. Delegates session management to the SessionManager instance. Deals with
 * message delivery to and from sessions. Implements the Connection interface.
 *
 * @author Ovidiu Feodorov <ovidiu@jboss.org>
 * @version $Revision: 57195 $ $Date: 2006-09-26 08:08:17 -0400 (Tue, 26 Sep 2006) $
 *
 **/
class GroupConnection implements Connection, Runnable {

    private static final Logger log = Logger.getLogger(GroupConnection.class);

    private static final String DEFAULT_SERVER_GROUP_NAME = "serverGroup";

    private URL serverChannelConfigURL;

    private SessionManager sessionManager;
    private org.jgroups.util.Queue deliveryQueue;
    private ConnectionState connState;

    //     private ChannelState channelState;
    private GroupState groupState;
    private Thread connManagementThread;
    private JChannel serverChannel;

    /**
     * The constructor leaves the Connection in a DISCONNECTED state.
     *
     * @param serverChannelConfigURL the URL of the XML file containing channel configuration.
     **/
    GroupConnection(URL serverChannelConfigURL) {

        this.serverChannelConfigURL = serverChannelConfigURL;

        deliveryQueue = new org.jgroups.util.Queue();
        sessionManager = new SessionManager(this, deliveryQueue);
        groupState = new GroupState();
        connManagementThread = new Thread(this, "Connection Management Thread");
        connState = new ConnectionState();

    }

    /**
     * Initializes the connection by connecting the channel to the server group. Should be called
     * only once, when the Connection instance is created.
     **/
    void connect() throws JMSException {

        // TO_DO: if it is already connected (stopped), just return

        try {

            serverChannel = new JChannel(serverChannelConfigURL);
            serverChannel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
            serverChannel.setChannelListener(new ChannelListener() {

                public void channelClosed(Channel channel) {
                    log.debug("channelClosed(" + channel + ")");
                }

                public void channelConnected(Channel channel) {
                    log.debug("channelConnected() to group ["
                            + channel.getChannelName() + "]");
                }

                public void channelDisconnected(Channel channel) {
                    log.debug("channelDisconnected(" + channel + ")");
                }

                public void channelReconnected(Address addr) {
                    log.debug("channelReconnected(" + addr + ")");
                }

                public void channelShunned() {
                    log.debug("channelShunned()");
                }
            });

            log.debug("channel created");
            serverChannel.connect(DEFAULT_SERVER_GROUP_NAME);
            log.debug("channel connected");
            connState.setStopped();
            connManagementThread.start();
            log.debug("Connection Management Thread started");
            boolean getStateOK = serverChannel.getState(null, 0);
            log.debug("getState(): " + getStateOK);
        } catch (ChannelException e) {
            String msg = "Failed to create an active connection";
            log.error(msg, e);
            JMSException jmse = new JMSException(msg);
            jmse.setLinkedException(e);
            throw jmse;
        }
    }

    // TO_DO: deal with the situation when this method is accessed concurrently from different threads
    void send(javax.jms.Message m) throws JMSException {

        try {
            // the Destination is already set for the message
            if (m.getJMSDestination() instanceof Topic) {
                // for topics, multicast
                serverChannel.send(null, null, (Serializable) m);
            } else {
                // for queues, unicast to the coordinator (the first member of the view)

                // TO_DO: optimization, if I am the only one in the group, don't send the
                // messages down the stack anymore
                org.jgroups.Message jgmsg = new org.jgroups.Message(
                        (Address) serverChannel.getView().getMembers().get(0),
                        null, new QueueCarrier(m));
                serverChannel.send(jgmsg);
            }
        } catch (Exception e) {
            String msg = "Failed to send message";
            log.error(msg, e);
            JMSException jmse = new JMSException(msg);
            jmse.setLinkedException(e);
            throw jmse;
        }

    }

    //
    // Runnable INTERFACE IMPLEMENTATION
    //

    /**
     * Code executed on the Connection Management Thread. It synchronously pulls JG
     * messages and events from the channel.
     **/
    public void run() {

        Object incoming = null;

        while (true) {

            try {
                incoming = serverChannel.receive(0);
            } catch (ChannelClosedException e) {
                log.debug("Channel closed, exiting");
                break;
            } catch (ChannelNotConnectedException e) {
                log.warn("TO_DO: Channel not connected, I should block the thread ...");
                continue;
            } catch (Exception e) {
                // TO_DO: use a JMS ExceptionListener and do some other things as well ....
                log.error("Failed to synchronously read from the channel", e);
                // nothing new was read; skip dispatching so that the previous
                // object is not dispatched a second time
                continue;
            }

            try {
                dispatch(incoming);
            } catch (Exception e) {
                // TO_DO: I don't want poorly written client code (dispatch() ends up running
                // MessageListener code) to throw a RuntimeException and terminate this thread;
                // use the ExceptionListener and do some other things as well ....
                log.error("Dispatching failed", e);
            }
        }
    }

    //
    //
    //

    private void dispatch(Object o) throws Exception {

        log.debug("dispatching " + o);

        if (o instanceof SetStateEvent) {
            byte[] buffer = ((SetStateEvent) o).getArg();
            if (buffer == null) {
                // that's ok if I am the coordinator, just ignore it
                log.debug("null group state, ignoring ...");
            } else {
                // update my group state
                groupState.fromByteBuffer(buffer);
            }
            return;
        } else if (o instanceof GetStateEvent) {
            // somebody is requesting the group state
            serverChannel.returnState(groupState.toByteBuffer());
            return;
        } else if (o instanceof View) {
            // no use for it for the time being
            return;
        } else if (o instanceof SuspectEvent) {
            // no use for it for the time being
            return;
        } else if (!(o instanceof org.jgroups.Message)) {
            // ignore it for the time being
            log.warn("Ignoring " + o);
            return;
        }

        org.jgroups.Message jgmsg = (org.jgroups.Message) o;
        Object payload = jgmsg.getObject();
        if (payload instanceof ServerAdminCommand) {
            // ADD_QUEUE_RECEIVER, and so on
            handleServerAdminCommand(jgmsg.getSrc(), (ServerAdminCommand) payload);
        } else if (payload instanceof QueueCarrier) {
            QueueCarrier qc = (QueueCarrier) payload;
            String sessionID = qc.getSessionID();
            // this is either an initial queue carrier that forwards the message from its
            // source to the coordinator, or a final queue carrier that forwards the message
            // from the coordinator to its final destination.
            if (sessionID == null) {
                queueForward(qc);
            } else {
                deliveryQueue.add(qc);
            }
        } else if (payload instanceof javax.jms.Message) {
            // deliver only if the connection is started, discard otherwise
            if (connState.isStarted()) {
                deliveryQueue.add((javax.jms.Message) payload);
            }
        } else {
            log.warn("JG Message with a payload other than a JMS Message: "
                    + (payload == null ? "null" : payload.getClass().getName()));
        }
    }

    private void handleServerAdminCommand(Address src, ServerAdminCommand c) {
        //log.debug("Handling "+c.getCommand());
        String comm = c.getCommand();
        if (ServerAdminCommand.ADD_QUEUE_RECEIVER.equals(comm)) {
            String queueName = (String) c.get(0);
            String sessionID = (String) c.get(1);
            String queueReceiverID = (String) c.get(2);
            groupState.addQueueReceiver(queueName, src, sessionID, queueReceiverID);
        } else if (ServerAdminCommand.REMOVE_QUEUE_RECEIVER.equals(comm)) {
            String queueName = (String) c.get(0);
            String sessionID = (String) c.get(1);
            String queueReceiverID = (String) c.get(2);
            groupState.removeQueueReceiver(queueName, src, sessionID, queueReceiverID);
        } else {
            log.error("Unknown server administration command: " + comm);
        }
    }

    void advertiseQueueReceiver(String queueName, String sessionID,
            String queueReceiverID, boolean isOn) throws ProviderException {

        try {
            // multicast the change, this will update my own state as well
            String cs = isOn ? ServerAdminCommand.ADD_QUEUE_RECEIVER
                    : ServerAdminCommand.REMOVE_QUEUE_RECEIVER;
            ServerAdminCommand comm = new ServerAdminCommand(cs,
                    queueName, sessionID, queueReceiverID);
            serverChannel.send(null, null, comm);
        } catch (ChannelException e) {
            throw new ProviderException("Failed to advertise the queue receiver", e);
        }
    }

    private void queueForward(QueueCarrier qc) throws Exception {

        Queue destQueue = (Queue) qc.getJMSMessage().getJMSDestination();
        QueueReceiverAddress ra = groupState.selectReceiver(destQueue.getQueueName());
        if (ra == null) {
            // TO_DO: no receivers for this queue, discard it for the time being
            log.warn("Discarding message for queue " + destQueue.getQueueName() + "!");
            return;
        }
        Address destAddress = ra.getAddress();
        qc.setSessionID(ra.getSessionID());
        qc.setReceiverID(ra.getReceiverID());

        // forward it to the final destination
        serverChannel.send(destAddress, null, qc);

    }

    //
    // Connection INTERFACE IMPLEMENTATION
    //

    public void start() throws JMSException {

        // It only makes sense to call this on a connection that is stopped. If called on a
        // started connection, the call is ignored. If called on a closed connection: TO_DO
        // TO_DO: throw appropriate exceptions for illegal transitions
        if (connState.isStarted()) {
            return;
        }
        synchronized (connState) {
            connState.setStarted();
            connState.notify();
        }

    }

    public void stop() throws JMSException {

        // TO_DO: throw appropriate exceptions for illegal transitions
        connState.setStopped();
    }

    public void close() throws JMSException {

        // TO_DO: throw appropriate exceptions for illegal transitions
        // TO_DO: read the rest of the specs and make sure I comply; tests
        if (connState.isClosed()) {
            return;
        }
        connState.setClosed();
        serverChannel.close();

    }

    public Session createSession(boolean transacted, int acknowledgeMode)
            throws JMSException {

        return sessionManager.createSession(transacted, acknowledgeMode);

    }

    public String getClientID() throws JMSException {
        throw new NotImplementedException();
    }

    public void setClientID(String clientID) throws JMSException {

        // Once the connection has been initialized, the runtime provides a ClientID that cannot
        // be changed by the user; according to the JMS 1.1 specs, the method should throw
        // IllegalStateException. The JMS exception type is named explicitly so that it is
        // not confused with java.lang.IllegalStateException.
        String msg = "ClientID cannot be modified";
        throw new javax.jms.IllegalStateException(msg);
    }

    public ConnectionMetaData getMetaData() throws JMSException {
        throw new NotImplementedException();
    }

    public ExceptionListener getExceptionListener() throws JMSException {
        throw new NotImplementedException();
    }

    public void setExceptionListener(ExceptionListener listener)
            throws JMSException {
        throw new NotImplementedException();
    }

    public ConnectionConsumer createConnectionConsumer(
            Destination destination, String messageSelector,
            ServerSessionPool sessionPool, int maxMessages)
            throws JMSException {
        throw new NotImplementedException();
    }

    public ConnectionConsumer createDurableConnectionConsumer(
            Topic topic, String subscriptionName,
            String messageSelector, ServerSessionPool sessionPool,
            int maxMessages) throws JMSException {
        throw new NotImplementedException();
    }

    //
    // END OF Connection INTERFACE IMPLEMENTATION
    //

    /**
     * Debugging only
     **/
    public static void main(String[] args) throws Exception {

        GroupConnection c = new GroupConnection(new URL(args[0]));
        c.connect();
    }

}
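
The two-hop queue routing described in the comments of send(), dispatch() and queueForward() can be hard to follow when read inside the JGroups plumbing. The sketch below isolates that logic under several assumptions: Carrier and ReceiverTable are hypothetical stand-ins for QueueCarrier and the receiver table held by GroupState, the round-robin selection is a guess (the policy behind GroupState.selectReceiver() is not shown in this file), and no network send takes place. It is meant as an illustration, not as the actual JBoss implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch of the coordinator-based queue routing; all names are hypothetical. */
public class QueueRoutingSketch {

    /** Simplified stand-in for QueueCarrier: a payload plus routing fields. */
    public static final class Carrier {
        public final String queueName;
        public final String body;
        public String sessionId;   // stays null until the coordinator picks a receiver
        public String receiverId;

        public Carrier(String queueName, String body) {
            this.queueName = queueName;
            this.body = body;
        }
    }

    /** Simplified stand-in for the queue receiver table kept in GroupState. */
    public static final class ReceiverTable {
        private final Map<String, List<String[]>> receivers = new HashMap<>();
        private final Map<String, Integer> counters = new HashMap<>();

        public void add(String queue, String sessionId, String receiverId) {
            receivers.computeIfAbsent(queue, q -> new ArrayList<>())
                     .add(new String[] {sessionId, receiverId});
        }

        /** Round-robin selection (an assumption); returns null when no receiver is registered. */
        public String[] select(String queue) {
            List<String[]> list = receivers.get(queue);
            if (list == null || list.isEmpty()) {
                return null;
            }
            int index = counters.merge(queue, 1, Integer::sum) - 1;
            return list.get(index % list.size());
        }
    }

    /**
     * Mirrors the QueueCarrier branch of dispatch(). A carrier without a session ID is on
     * its first hop (sender to coordinator); one with a session ID is on its second hop
     * (coordinator to receiver) and goes to the local delivery queue.
     */
    public static String route(Carrier c, ReceiverTable table, List<Carrier> deliveryQueue) {
        if (c.sessionId == null) {
            String[] receiver = table.select(c.queueName);
            if (receiver == null) {
                return "discarded";   // no receivers for this queue
            }
            c.sessionId = receiver[0];
            c.receiverId = receiver[1];
            return "forwarded";       // the real code unicasts to the receiver's address here
        }
        deliveryQueue.add(c);
        return "delivered";
    }

    public static void main(String[] args) {
        ReceiverTable table = new ReceiverTable();
        table.add("orders", "session-1", "receiver-A");
        List<Carrier> deliveryQueue = new ArrayList<>();

        Carrier c = new Carrier("orders", "hello");
        System.out.println(route(c, table, deliveryQueue)); // first hop, at the coordinator
        System.out.println(route(c, table, deliveryQueue)); // second hop, at the receiver
    }
}
```

Running main() prints "forwarded" followed by "delivered": the same carrier object is stamped with a session ID on the first pass and queued for local delivery on the second. In the real class the two passes happen on different group members, with a JGroups unicast in between.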