Source Code Cross Referenced for DataReceiverAdapter.java in  » Groupware » Data-share » org » datashare » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Groupware » Data share » org.datashare 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        /* ----- BEGIN LICENSE BLOCK -----
002:         * Version: MPL 1.1
003:         *
004:         * The contents of this file are subject to the Mozilla Public License Version
005:         * 1.1 (the "License"); you may not use this file except in compliance with
006:         * the License. You may obtain a copy of the License at
007:         * http://www.mozilla.org/MPL/
008:         *
009:         * Software distributed under the License is distributed on an "AS IS" basis,
010:         * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
011:         * for the specific language governing rights and limitations under the
012:         * License.
013:         *
014:         * The Original Code is the DataShare server.
015:         *
016:         * The Initial Developer of the Original Code is
017:         * Ball Aerospace & Technologies Corp, Fairborn, Ohio
018:         * Portions created by the Initial Developer are Copyright (C) 2001
019:         * the Initial Developer. All Rights Reserved.
020:         *
021:         * Contributor(s): Charles Wood <cwood@ball.com>
022:         *
023:         * ----- END LICENSE BLOCK ----- */
024:        /* RCS $Id: DataReceiverAdapter.java,v 1.4 2002/02/20 14:12:57 lizellaman Exp $
025:         * $Log: DataReceiverAdapter.java,v $
026:         * Revision 1.4  2002/02/20 14:12:57  lizellaman
027:         * changes to improve history retrieval
028:         *
029:         * Revision 1.3  2002/02/04 13:51:39  lizellaman
030:         * Remove all references to past product names (or)
031:         * Add PublicAPI for creating Rendezvous Sessions
032:         *
033:         * Revision 1.2  2002/01/29 20:50:17  lizellaman
034:         * Added LoggingInterface, modified the PropertiesInterface implementation
035:         *
036:         * Revision 1.1.1.1  2001/10/23 13:37:15  lizellaman
037:         * initial sourceforge release
038:         *
039:         */
040:
041:        package org.datashare;
042:
043:        import java.util.Hashtable;
044:        import java.util.Enumeration;
045:        import java.util.Vector;
046:
047:        import org.datashare.FifoQueue;
048:        import org.datashare.FifoConsumer;
049:        import org.datashare.objects.DataShareObject;
050:        import org.datashare.objects.ActivateConnectionObject;
051:        import org.datashare.objects.ChannelDescription;
052:        import org.datashare.objects.RequestHistory;
053:        import org.datashare.objects.HistoryFinishedObject;
054:
055:        /**
056:         * Implements the standard behavior we expect for DataShare connections on the
057:         * server side, including taking care of activating the connection (initiated by the client
058:         * sending the ActivateConnectionObject) and automatically sending any received data to
059:         * the specified recipients.  This Adapter is only suitable for the DataShare function
060:         * Channels, not the commandStatus connection.  There is one instance of this class
061:         * for every function/channel, not one for every consumer.
062:         *
063:         * @date March 01, 2001
064:         * @author Charles Wood
065:         * @version 1.0
066:         */
067:        public class DataReceiverAdapter implements  DataReceiverInterface {
068:            /**
069:             * table of SocketServers, indexed by clientKey (cannot have two connections from same client on
070:             * same connection).  This is populated by displaying the consumer.
071:             */
072:            public Hashtable allMySocketHandlers = new Hashtable();
073:
074:            /**
075:             * the databaseKey for this instance's Channel, used only if data is persisted by the Server
076:             */
077:            private String channelDatabaseKey = null;
078:
079:            /**
080:             * provides the sequence for the Data objects, incremented as they are created, first data
081:             * object has sequence number of zero
082:             */
083:            protected int sequence = 0;
084:
085:            private SocketAdapter sockets[] = new SocketAdapter[0]; // lots of remote clients, comes from allMySocketHandlers
086:            private Vector allSockets = new Vector(); // populated by call to .newConnection()
087:
088:            private SocketServerInterface threadedServer = null; // our socket server, set externally
089:            private TreeViewServerInterface server;
090:            private SessionInfo sessionInfo;
091:            private ChannelInfo channelInfo;
092:            private boolean neverCreateBean;
093:            private Hashtable historyThreads = new Hashtable();
094:
095:            public DataReceiverAdapter(TreeViewServerInterface server) {
096:                this .server = server;
097:            }
098:
099:            /**
100:             * must be set after constructor called
101:             */
102:            public void setSessionInfo(SessionInfo sessionInfo) {
103:                this .sessionInfo = sessionInfo;
104:            }
105:
106:            /**
107:             * must be set after constructor called
108:             */
109:            public void setChannelInfo(ChannelInfo channelInfo) {
110:                this .channelInfo = channelInfo;
111:                neverCreateBean = !server.getPersistData()
112:                        || !channelInfo.getSaveDataForThisChannel();
113:            }
114:
115:            /**
116:             * must be set after constructor called
117:             */
118:            public void setSocketServer(SocketServerInterface threadedServer) {
119:                this .threadedServer = threadedServer;
120:            }
121:
122:            public SocketServerInterface getSocketServer() {
123:                return threadedServer;
124:            }
125:
126:            // if any data is received, check to see if it is an activation object or a RequestHistory object,
127:            // else send it to specified others if the channel is active
128:            public synchronized void clientDataReceived(DataShareObject dso,
129:                    SocketAdapter ts) {
130:                boolean forwardableData = false;
131:                boolean createBean = false; // used when decided by individual circumstance
132:
133:                SessionUtilities.getLoggingInterface().debugMsg(
134:                        SessionUtilities.getLoggingInterface().DEBUG,
135:                        SessionUtilities.getLoggingInterface().NETWORK,
136:                        "1-Rcvd " + (dso.isControlObject ? "control" : "data")
137:                                + " object from " + ts.getClientKey() + " on "
138:                                + ts.getKeyValue());
139:
140:                if (dso.isControlObject) // server only needs to look inside control objects
141:                {
142:                    try {
143:                        Object object = SessionUtilities
144:                                .retrieveObject(dso.objectBytes);
145:                        if (object instanceof  ActivateConnectionObject) {
146:                            ActivateConnectionObject aco = (ActivateConnectionObject) object;
147:                            if (ts.getActive()) {
148:                                SessionUtilities
149:                                        .getLoggingInterface()
150:                                        .debugMsg(
151:                                                SessionUtilities
152:                                                        .getLoggingInterface().DEBUG,
153:                                                SessionUtilities
154:                                                        .getLoggingInterface().NETWORK,
155:                                                "Consumer requested activation on channel that is already active");
156:                            }
157:                            connectionActivated(ts, aco);
158:                        } else if (object instanceof  RequestHistory) {
159:                            RequestHistory rh = (RequestHistory) object;
160:                            handleHistory(rh, ts); // asking for history or cancelling history
161:                        }
162:                    } catch (Exception e) {
163:                        e.printStackTrace();
164:                    }
165:                } else // not control object
166:                {
167:                    try {
168:                        if (ts.getActive()) {
169:                            if (!neverCreateBean)
170:                                createBean = true;
171:                            if (ts.getType() != ChannelDescription.MULTICAST)
172:                                forwardableData = true;
173:                        } else {
174:                            SessionUtilities
175:                                    .getLoggingInterface()
176:                                    .debugMsg(
177:                                            SessionUtilities
178:                                                    .getLoggingInterface().WARNING,
179:                                            SessionUtilities
180:                                                    .getLoggingInterface().NETWORK,
181:                                            "DataReceiverAdapter received non-control object over not-activated Channel");
182:                        }
183:                    } catch (Exception e) {
184:                        e.printStackTrace();
185:                    }
186:                }
187:
188:                if (createBean) {
189:                    try {
190:                        Hashtable props = new Hashtable();
191:                        // firsttime we need to get the channel key
192:                        if (channelDatabaseKey == null)
193:                            setChannelDatabaseKey(channelInfo.getDatabaseID());
194:
195:                        props.put("channelKey", channelDatabaseKey); // need to make ADSKey for ADS
196:                        props.put("userObject", SessionUtilities
197:                                .convertObjectToByteArray(dso));
198:                        props.put("sequence", new Integer(sequence));
199:                        // no callback for Data (use null)
200:                        server.saveDataToDatabase("DSData3Home", props,
201:                                (PersistDataCallbackInterface) null);
202:                        sequence++;
203:                        // for Data objects, we don't try to get a reference to them now
204:                    } catch (Exception e) {
205:                        e.printStackTrace();
206:                    }
207:                }
208:
209:                if (forwardableData) {
210:                    try {
211:                        switch (dso.type) {
212:                        case DataShareObject.SENDTOALL:
213:                            // send data to all sockets...
214:                            for (int x = 0; x < sockets.length; x++) {
215:                                if (sockets[x].getActive()) {
216:                                    SessionUtilities
217:                                            .getLoggingInterface()
218:                                            .debugMsg(
219:                                                    SessionUtilities
220:                                                            .getLoggingInterface().DEBUG,
221:                                                    SessionUtilities
222:                                                            .getLoggingInterface().NETWORK,
223:                                                    "2-Sending object to "
224:                                                            + sockets[x]
225:                                                                    .getClientKey());
226:                                    sockets[x].sendData(dso);
227:                                }
228:                            }
229:                            break;
230:                        case DataShareObject.SENDTOCLIENT:
231:                            // send data to one socket...
232:                            for (int x = 0; x < sockets.length; x++) {
233:                                if (sockets[x].getActive()
234:                                        && sockets[x].getClientKey().equals(
235:                                                dso.destinationClientKey)) {
236:                                    SessionUtilities
237:                                            .getLoggingInterface()
238:                                            .debugMsg(
239:                                                    SessionUtilities
240:                                                            .getLoggingInterface().DEBUG,
241:                                                    SessionUtilities
242:                                                            .getLoggingInterface().NETWORK,
243:                                                    "2-Sending object to "
244:                                                            + sockets[x]
245:                                                                    .getClientKey());
246:                                    sockets[x].sendData(dso);
247:                                    break;
248:                                }
249:                            }
250:                            break;
251:                        case DataShareObject.SENDTOOTHERS:
252:                            // send data to all but one socket...
253:                            for (int x = 0; x < sockets.length; x++) {
254:                                if (sockets[x].getActive()
255:                                        && !sockets[x].getClientKey().equals(
256:                                                dso.sendingClientKey)) {
257:                                    SessionUtilities
258:                                            .getLoggingInterface()
259:                                            .debugMsg(
260:                                                    SessionUtilities
261:                                                            .getLoggingInterface().DEBUG,
262:                                                    SessionUtilities
263:                                                            .getLoggingInterface().NETWORK,
264:                                                    "2-Sending object to "
265:                                                            + sockets[x]
266:                                                                    .getClientKey());
267:                                    sockets[x].sendData(dso);
268:                                }
269:                            }
270:                            break;
271:                        default:
272:                            SessionUtilities
273:                                    .getLoggingInterface()
274:                                    .debugMsg(
275:                                            SessionUtilities
276:                                                    .getLoggingInterface().WARNING,
277:                                            SessionUtilities
278:                                                    .getLoggingInterface().NETWORK,
279:                                            "DataReceiverAdapter received unknown type field value-> "
280:                                                    + dso.type);
281:                        }
282:                    } catch (Exception e) {
283:                        e.printStackTrace();
284:                    }
285:                }
286:            }
287:
288:            /**
289:             * tries to find the socket for a particular consumer, returns null if not successful
290:             */
291:            public SocketAdapter findConsumerSocket(ConsumerInfo ci) {
292:                SocketAdapter sa = null;
293:                for (int x = 0; x < sockets.length; x++) {
294:                    if (sockets[x].getClientKey().equals(ci.getKeyValue())) {
295:                        sa = sockets[x];
296:                        break;
297:                    }
298:                }
299:                return sa;
300:            }
301:
302:            /**
303:             * This is called through the DataReceiverInterface when the connection has been lost/closed
304:             * so that the connection may be removed from our tables and other clients notified
305:             */
306:            public void connectionLost(SocketAdapter ts) {
307:                SessionUtilities.getLoggingInterface().debugMsg(
308:                        SessionUtilities.getLoggingInterface().DEBUG,
309:                        SessionUtilities.getLoggingInterface().NETWORK,
310:                        "Lost the connection to " + ts.getKeyValue());
311:                // find this ts in our hashtable and remove it and tell others...
312:                synchronized (allMySocketHandlers) {
313:                    SessionUtilities.getLoggingInterface().debugMsg(
314:                            SessionUtilities.getLoggingInterface().DEBUG,
315:                            SessionUtilities.getLoggingInterface().NETWORK,
316:                            "connectionLost(), removing consumer "
317:                                    + ts.getClientKey()
318:                                    + " from DataReceiverAdapter "
319:                                    + this .threadedServer.getKeyValue());
320:
321:                    HistoryThread ht = (HistoryThread) historyThreads.get(ts
322:                            .getKeyValue());
323:                    if (ht != null)
324:                        ht.cancel();
325:
326:                    allMySocketHandlers.remove(ts.getClientKey());
327:                    allSockets.remove(ts);
328:                    convertAllMySocketHandlersToSockets();
329:                    server.removeConsumer(ts.getClientKey(), sessionInfo
330:                            .getKeyValue(), channelInfo.getKeyValue());
331:                }
332:            }
333:
334:            /**
335:             * This is called locally when the Multicast connection has been lost/closed
336:             * so that the connection may be removed from our tables and other clients notified.
337:             * This is different from other types of connections in that we have only one connection
338:             * that is shared among all Multicast clients.
339:             */
340:            private void connectionLost(String clientKey) {
341:                SessionUtilities.getLoggingInterface().debugMsg(
342:                        SessionUtilities.getLoggingInterface().DEBUG,
343:                        SessionUtilities.getLoggingInterface().NETWORK,
344:                        "Lost the Multicast connection to " + clientKey);
345:                // find this ts in our hashtable and remove it and tell others...
346:                synchronized (allMySocketHandlers) {
347:                    SessionUtilities.getLoggingInterface().debugMsg(
348:                            SessionUtilities.getLoggingInterface().DEBUG,
349:                            SessionUtilities.getLoggingInterface().NETWORK,
350:                            "connectionLost (multicast), removing consumer "
351:                                    + clientKey + " from DataReceiverAdapter "
352:                                    + this .threadedServer.getKeyValue());
353:                    SocketAdapter ts = (SocketAdapter) allMySocketHandlers
354:                            .remove(clientKey);
355:                    if (ts != null)
356:                        allSockets.remove(ts);
357:                    convertAllMySocketHandlersToSockets();
358:                    server.removeConsumer(clientKey, sessionInfo.getKeyValue(),
359:                            channelInfo.getKeyValue());
360:                    SessionUtilities
361:                            .getLoggingInterface()
362:                            .debugMsg(
363:                                    SessionUtilities.getLoggingInterface().DEBUG,
364:                                    SessionUtilities.getLoggingInterface().NETWORK,
365:                                    "***** client should have been told to shutdown the function for this socket!!");
366:                }
367:            }
368:
369:            public void newConnection(SocketAdapter ts) {
370:                SessionUtilities.getLoggingInterface().debugMsg(
371:                        SessionUtilities.getLoggingInterface().DEBUG,
372:                        SessionUtilities.getLoggingInterface().NETWORK,
373:                        "New connection - " + ts.getKeyValue());
374:                allSockets.add(ts);
375:            }
376:
377:            /**
378:             * attempts to close all connections and stop all Threads associated with our Channel.
379:             * This should only be called prior to exiting as it is not guaranteed to be the most
380:             * graceful way to shutdown.
381:             */
382:            public void
383:   stopAllConnectionsAndThreads()
384:      {
385:      getSocketServer().close();  // close our socket server so no more connections are made
386:      // shutdown all clients getting History
387:      for( Enumeration enum = historyThreads.elements(); enum.hasMoreElements(); )
388:         {
389:         HistoryThread historyThread = (HistoryThread)enum.nextElement();
390:         historyThread.cancel();
391:         historyThread.queue.reset();
392:         }
393:      this .removeAllConsumerConnections(); // get rid of all activated consumers
394:      // now just in case we forgot any clients...
395:      for( Enumeration enum = allSockets.elements(); enum.hasMoreElements(); )
396:         {
397:         SocketAdapter sa = (SocketAdapter)enum.nextElement();
398:         sa.close();
399:         }
400:      }
401:
402:            /**
403:             * called when a client 'activates' this ThreadedSocket
404:             * for Multicast, we will have multiple copies of same socket
405:             */
406:            public synchronized void connectionActivated(SocketAdapter ts,
407:                    ActivateConnectionObject aco) {
408:                SessionUtilities.getLoggingInterface().debugMsg(
409:                        SessionUtilities.getLoggingInterface().DEBUG,
410:                        SessionUtilities.getLoggingInterface().CLIENT,
411:                        "Consumer " + aco.clientKeyValue
412:                                + " requested us to activate Connection "
413:                                + ts.getKeyValue());
414:
415:                // test to see if we know about this client...
416:                if (server.isClientRegistered(aco.clientKeyValue)) {
417:                    // yes, we know about this client
418:                    // need to set clientClass here so we know class of client for this data connection
419:                    String clientClass = server.getClientInfo(
420:                            aco.clientKeyValue).getClientClass();
421:                    ts.setClientClass(clientClass);
422:
423:                    // activate connection
424:                    ts.setActive(true);
425:
426:                    // display the consumer, if channel has history, they may already be displayed
427:                    displayConsumer(ts, aco.clientKeyValue);
428:                } else {
429:                    // no, we do not know about this client
430:                    SessionUtilities.getLoggingInterface().debugMsg(
431:                            SessionUtilities.getLoggingInterface().DEBUG,
432:                            SessionUtilities.getLoggingInterface().CLIENT,
433:                            "Ignoring ActivateCommand, non-current consumer-> "
434:                                    + aco.clientKeyValue);
435:                }
436:            }
437:
438:            /**
439:             * puts the user into the tree for this channel/function, and sets the SocketAdapter's owner's name
440:             */
441:            private void displayConsumer(SocketAdapter ts, String clientKeyValue) {
442:                // save the client key in the Session
443:                ts.setClientKey(clientKeyValue);
444:
445:                // let server send another update and make the clients overwrite this consumer entry (activate may change for consumer)
446:                if (allMySocketHandlers.containsKey(clientKeyValue)) {
447:                    SessionUtilities.getLoggingInterface().debugMsg(
448:                            SessionUtilities.getLoggingInterface().DEBUG,
449:                            SessionUtilities.getLoggingInterface().CLIENT,
450:                            "DRA.displayClient() already has client "
451:                                    + clientKeyValue + " in channel "
452:                                    + ts.getKeyValue());
453:                } else {
454:                    SessionUtilities.getLoggingInterface().debugMsg(
455:                            SessionUtilities.getLoggingInterface().DEBUG,
456:                            SessionUtilities.getLoggingInterface().CLIENT,
457:                            "DRA.displayClient() adding consumer "
458:                                    + clientKeyValue + " to channel "
459:                                    + ts.getKeyValue());
460:                    allMySocketHandlers.put(clientKeyValue, ts);
461:                    convertAllMySocketHandlersToSockets();
462:                }
463:                server.addConsumer(clientKeyValue, sessionInfo.getKeyValue(),
464:                        channelInfo.getKeyValue(), ts.getActive());
465:            }
466:
467:            /**
468:             * called when we loose our ServerSocket, need to figure out what to do about it...
469:             */
470:            public void lostServerSocket(String keyValue) {
471:                SessionUtilities.getLoggingInterface().debugMsg(
472:                        SessionUtilities.getLoggingInterface().DEBUG,
473:                        SessionUtilities.getLoggingInterface().NETWORK,
474:                        "DataRecieverAdapter:Lost our socketServer - "
475:                                + keyValue);
476:            }
477:
478:            /**
479:             * Take our hashtable of connections and convert it to an array
480:             */
481:            private void convertAllMySocketHandlersToSockets() {
482:                synchronized (sockets) {
483:                    Object[] objects = allMySocketHandlers.values().toArray();
484:                    sockets = new SocketAdapter[objects.length];
485:                    for (int x = 0; x < sockets.length; x++)
486:                        sockets[x] = (SocketAdapter) objects[x];
487:                }
488:            }
489:
490:            //   /**
491:            //    * returns the key for this instance's Session
492:            //    */
493:            //   public String
494:            //   getSessionKey()
495:            //      {
496:            //      return sessionInfo.getDatabaseID();
497:            //      }
498:
499:            /**
500:             * retrieves the channelDatabaseKey for this channel
501:             */
502:            public String getChannelDatabaseKey() {
503:                return this .channelDatabaseKey;
504:            }
505:
506:            /**
507:             * sets the channelDatabaseKey for this channel
508:             */
509:            public void setChannelDatabaseKey(String channelDatabaseKey) {
510:                if (this .channelDatabaseKey == null) // can only set it once
511:                {
512:                    this .channelDatabaseKey = channelDatabaseKey;
513:                }
514:            }
515:
516:            /**
517:             * returns the next port to use for a connection
518:             */
519:            public int getNextPort() {
520:                return server.getNextPort();
521:            }
522:
523:            /**
524:             * returns the key for this instance's Channel
525:             */
526:            public String getChannelKey() {
527:                return channelInfo.getDatabaseID();
528:            }
529:
530:            /**
531:             * call this when a consumer needs to be removed...takes care to close the connection
532:             */
533:            public void removeConsumerConnection(String clientKeyValue) {
534:                SocketAdapter this SA = null;
535:                if (allMySocketHandlers != null) {
536:                    try {
537:                        if (allMySocketHandlers.containsKey(clientKeyValue)) {
538:                            SessionUtilities
539:                                    .getLoggingInterface()
540:                                    .debugMsg(
541:                                            SessionUtilities
542:                                                    .getLoggingInterface().DEBUG,
543:                                            SessionUtilities
544:                                                    .getLoggingInterface().CLIENT,
545:                                            "Removing consumer "
546:                                                    + clientKeyValue
547:                                                    + " from DataReceiverAdapter "
548:                                                    + this .threadedServer
549:                                                            .getKeyValue());
550:                            this SA = (SocketAdapter) allMySocketHandlers
551:                                    .remove(clientKeyValue);
552:                            allSockets.remove(this SA);
553:                            if (this SA.getType() == ChannelDescription.MULTICAST) {
554:                                // for multicast, only one socket server and one socket, don't close it
555:                                connectionLost(clientKeyValue);
556:                            } else {
557:                                // close connection and it will notify us
558:                                this SA.close();
559:                            }
560:                        }
561:                    } catch (Exception e) {
562:                        SessionUtilities
563:                                .getLoggingInterface()
564:                                .debugMsg(
565:                                        SessionUtilities.getLoggingInterface().ERROR,
566:                                        SessionUtilities.getLoggingInterface().CLIENT,
567:                                        "Problem removing client or closing connection...");
568:                        e.printStackTrace();
569:                    }
570:                }
571:            }
572:
573:            /**
574:             * call this when all consumers need to be removed...takes care to close the connection,
575:             */
576:            public void removeAllConsumerConnections()
577:      {
578:      SocketAdapter this SA = null;
579:      if(allMySocketHandlers != null)
580:         {
581:         for(Enumeration enum = allMySocketHandlers.elements(); enum.hasMoreElements();)
582:            {
583:            this SA = (SocketAdapter)enum.nextElement();
584:            SessionUtilities.getLoggingInterface().debugMsg(SessionUtilities.getLoggingInterface().DEBUG,
585:               SessionUtilities.getLoggingInterface().CLIENT,
586:               "Removing consumer " + this SA.getClientKey());
587:            SocketAdapter ts = (SocketAdapter)allMySocketHandlers.remove(this SA.getClientKey());
588:            if(ts != null)
589:               allSockets.remove(this SA);
590:            if(this SA.getType() == ChannelDescription.MULTICAST)
591:               {
592:               // for multicast, only one socket server and one socket, don't close it
593:               connectionLost(this SA.getClientKey());
594:               }
595:            else
596:               {
597:               // close connection and it will notify us
598:               this SA.close();
599:               }
600:            }
601:         }
602:      }
603:
604:            /**
605:             * this method handles RequestHistory objects, and either sends the history objects or
606:             * cancels the history that is being sent
607:             */
608:            private void handleHistory(RequestHistory rh, SocketAdapter ts) {
609:                if (rh.typeOfRequest == RequestHistory.DATA) {
610:                    HistoryThread historyThread = new HistoryThread(
611:                            this ,
612:                            ts,
613:                            server.getPersistenceInterface(),
614:                            channelInfo.getConnectionDescriptor().channelDescription);
615:                    historyThreads.put(ts.getKeyValue(), historyThread);
616:                    ts.setClientKey(rh.userName); // this will be written over later with the activation, but lets us see who is getting history for now
617:                    historyThread.start();
618:                } else if (rh.typeOfRequest == RequestHistory.CANCEL) {
619:                    HistoryThread historyThread = (HistoryThread) historyThreads
620:                            .remove(ts.getKeyValue());
621:                    historyThread.cancel();
622:                }
623:            }
624:        }
625:
626:        /**
627:         * Sends the EJBs for this channel to the client
628:         */
629:        class HistoryThread extends Thread implements  FifoConsumer {
630:            boolean finished = false;
631:            boolean canceled = false;
632:            DataReceiverAdapter dra;
633:            SocketAdapter ts;
634:            String threadName;
635:            int sequenceNumber = 0;
636:            int incrementSize = 100; // how many to retrieve at one time, will be set to value passed in by ChannelDescription
637:            boolean retrieveMultipleEJBs = false;
638:            int delayCount = 0;
639:            ChannelDescription channelDescription;
640:            FifoQueue queue = new FifoQueue();
641:            RetreiveDataBeans getDataBeans = new RetreiveDataBeans();
642:            PersistenceInterface persistenceInterface = null;
643:            // these attributes used when 'data pacing' is in effect (use original data times to help pace data)
644:            long lastDataSentSystemTime = 0l; // when we sent the last chunk of data
645:            long previousDataTime = 0l; // original time of last chuck of data sent
646:            int preSendThisMany = 20; // send this many packets to pre-charge our buffers before actual pacing begins
647:            int sentWithOutPacing = 0; // how many we have sent without pacing, compared to above value to determine if we should pace yet
648:
649:            public HistoryThread(DataReceiverAdapter parent, SocketAdapter sa,
650:                    PersistenceInterface pi, ChannelDescription cd) {
651:                dra = parent;
652:                ts = sa;
653:                persistenceInterface = pi;
654:                channelDescription = cd;
655:                threadName = "DataShare.HistoryThead-" + sa.getKeyValue();
656:                this .setName(threadName);
657:            }
658:
659:            /**
660:             * only called if user cancels
661:             */
662:            public void cancel() {
663:                canceled = true;
664:                this .setName("Stopped--" + threadName);
665:                finished = true;
666:                getDataBeans.cancelHistory();
667:                queue.reset();
668:                this .sendMoreData(); // just to release the Thread so it can exit
669:            }
670:
671:            public void run() {
672:                this .setPriority(Thread.currentThread().getPriority() + 1);
673:                DataShareObject dso = null;
674:                DataShareObject[] dsoArray = null;
675:                incrementSize = channelDescription.historyCountInc; // how many to retrieve at one time
676:                if (incrementSize > 0)
677:                    retrieveMultipleEJBs = true;
678:                delayCount = channelDescription.historyDelay;
679:
680:                if (SessionUtilities.getVerbose()) {
681:                    String delayType;
682:                    if (delayCount > 0)
683:                        delayType = ", with a delay between packets of "
684:                                + delayCount + "msec";
685:                    else if (delayCount < 0)
686:                        delayType = ", with the data timing similar to original data ('paced')";
687:                    else
688:                        delayType = " with no delay between data packets";
689:
690:                    SessionUtilities.getLoggingInterface().debugMsg(
691:                            SessionUtilities.getLoggingInterface().DEBUG,
692:                            SessionUtilities.getLoggingInterface().DATABASE,
693:                            threadName + " is retrieving/sending "
694:                                    + incrementSize + " EJBs per call"
695:                                    + delayType);
696:                }
697:
698:                // true only if retrieving multiple EJBs per call
699:                if (retrieveMultipleEJBs) {
700:                    queue.setConsumer(this ); // the queue will now call our newFifoDataAvailable from it's Thread whenever data is available
701:                    getDataBeans.retrieveAllData(persistenceInterface, dra
702:                            .getChannelDatabaseKey());
703:                }
704:
705:                while (!finished) {
706:                    try {
707:                        if (retrieveMultipleEJBs) // if getting multiple EJBs at a time, use the threading of FifoQueue
708:                        {
709:                            dsoArray = getDataBeans.getNextData(incrementSize);
710:
711:                            // dsoArray of null means no more data available, zero length means none right now
712:                            if (dsoArray == null) {
713:                                SessionUtilities
714:                                        .getLoggingInterface()
715:                                        .debugMsg(
716:                                                SessionUtilities
717:                                                        .getLoggingInterface().DEBUG,
718:                                                SessionUtilities
719:                                                        .getLoggingInterface().DATABASE,
720:                                                "No more history available for Channel "
721:                                                        + channelDescription.channelName);
722:                                finished = true;
723:                                break;
724:                            }
725:
726:                            if (dsoArray.length > 0) {
727:                                //sequenceNumber += incrementSize;
728:                                //sequenceNumber += dsoArray.length;  // increment sequence number by what was returned
729:                                for (int x = 0; x < dsoArray.length; x++) {
730:                                    try {
731:                                        queue.write(dsoArray[x]);
732:                                    } catch (Exception ee) {
733:                                        ee.printStackTrace();
734:                                    }
735:                                }
736:                                this .waitToGetMoreData(); // wait for thread to ask for more data
737:                            }
738:                        } else // retreive EJBs one at a time and send them one at a time
739:                        {
740:                            dso = RetreiveDataBeans.getData(
741:                                    persistenceInterface, dra
742:                                            .getChannelDatabaseKey(),
743:                                    sequenceNumber++);
744:                            if (dso != null) {
745:                                ts.sendData(dso);
746:                                yield();
747:                            } else
748:                                finished = true;
749:                        }
750:                    } catch (Exception e) {
751:                        e.printStackTrace();
752:                        finished = true;
753:                    }
754:                } // end of while(!finished)
755:
756:                try {
757:                    // tell client that no more history will be sent
758:                    HistoryFinishedObject hfo = new HistoryFinishedObject(dra
759:                            .getChannelDatabaseKey());
760:                    dso = new DataShareObject(SessionUtilities
761:                            .convertObjectToByteArray(hfo),
762:                            DataShareObject.SENDTOALL, ts.getClientKey());
763:                    SessionUtilities.getLoggingInterface().debugMsg(
764:                            SessionUtilities.getLoggingInterface().DEBUG,
765:                            SessionUtilities.getLoggingInterface().DATABASE,
766:                            "Sending HistoryFinishedObject to client "
767:                                    + ts.getClientKey() + " for Channel "
768:                                    + channelDescription.channelName);
769:
770:                    // send history finished; if this is an unreliable channel, send a few more end of history objects...
771:                    int sendThisManyHistoryFinishedObjects = 1;
772:                    if (ts.getType() == ChannelDescription.UDP
773:                            || ts.getType() == ChannelDescription.MULTICAST)
774:                        sendThisManyHistoryFinishedObjects = 3;
775:                    for (int x = 0; x < sendThisManyHistoryFinishedObjects; x++) {
776:                        if (retrieveMultipleEJBs)
777:                            queue.write(dso); // put HistoryFinishedObject at end of queue
778:                        else
779:                            ts.sendData(dso); //  send HistoryFinishedObject normally
780:                    }
781:                } catch (Exception e) {
782:                    SessionUtilities.getLoggingInterface().debugMsg(
783:                            SessionUtilities.getLoggingInterface().ERROR,
784:                            SessionUtilities.getLoggingInterface().DATABASE,
785:                            "Problem sending HistoryFinishedObject to client "
786:                                    + ts.getClientKey() + " for Channel "
787:                                    + channelDescription.channelName);
788:                    e.printStackTrace();
789:                }
790:
791:                // if xmitting from queue thread, must wait until queue is empty or we cancel before shutting down
792:                // this thread, or the HistoryFinishedObject will not get sent to client...
793:                while (retrieveMultipleEJBs && queue.size() > 0 && !canceled) {
794:                    SessionUtilities.getLoggingInterface().debugMsg(
795:                            SessionUtilities.getLoggingInterface().DEBUG,
796:                            SessionUtilities.getLoggingInterface().DATABASE,
797:                            "Thread named " + this .getName()
798:                                    + " will exit when " + queue.size()
799:                                    + " more beans are sent");
800:                    SessionUtilities.delay(1000);
801:                }
802:
803:                SessionUtilities.getLoggingInterface().debugMsg(
804:                        SessionUtilities.getLoggingInterface().DEBUG,
805:                        SessionUtilities.getLoggingInterface().DATABASE,
806:                        "Thread named " + this .getName() + " has stopped");
807:            }
808:
809:            // releases the blocked thread so it can exit or look for more data
810:            public synchronized void sendMoreData() {
811:                SessionUtilities
812:                        .getLoggingInterface()
813:                        .debugMsg(
814:                                SessionUtilities.getLoggingInterface().DEBUG,
815:                                SessionUtilities.getLoggingInterface().DATABASE,
816:                                "More History data has been requested by history xmitter thread...");
817:                notifyAll();
818:            }
819:
820:            public synchronized void waitToGetMoreData() {
821:                try {
822:                    wait();
823:                } catch (InterruptedException e) {
824:                }
825:            }
826:
827:            /**
828:             * called when data is available from the FIFO, used only when getting multiple EJBs at a time.
829:             * This method is called from the FifoQueue thread so any delay here will affect it only.
830:             */
831:            public void newFifoDataAvailable(Object object) {
832:
833:                if (delayCount >= 0) // send data after a fixed delay time
834:                {
835:                    if (object != null)
836:                        ts.sendData((DataShareObject) object);
837:                    if (delayCount > 0) {
838:                        SessionUtilities
839:                                .getLoggingInterface()
840:                                .debugMsg(
841:                                        SessionUtilities.getLoggingInterface().DEBUG,
842:                                        SessionUtilities.getLoggingInterface().DATABASE,
843:                                        "delaying " + delayCount
844:                                                + " in newFifoDataAvailable...");
845:                        SessionUtilities.delay(delayCount);
846:                    }
847:                } else if (delayCount < 0) // send data paced by original data (sort of...)
848:                {
849:                    paceTheData((DataShareObject) object); // will calculate a delay, if needed
850:                }
851:
852:                if (queue.size() == incrementSize / 2 || queue.size() == 0) // wait until half our queue has been sent (test for zero if last chunk less than half)
853:                    sendMoreData(); // cause the next chunk of EJBs to be put into our queue
854:            }
855:
856:            /**
857:             * calculates how long to delay before returning so that the next data can be sent, assumes
858:             * that data was just sent and that data will be sent again soon after the return.  Note that if the
859:             * original data had packets seperated by long delays, this algorithm will remove all the delays.
860:             * The goal is to send the data with small delays with about the same pacing as it had originally,
861:             * but to remove the long delays altogether.  Long delay is currently defined below.
862:             */
863:            private void paceTheData(DataShareObject dso) {
864:                long originalDataDelay = 0;
865:                long originalDataTime = 0;
866:                long systemDelta = 0;
867:                long dataSentSystemTime = 0;
868:                long longDelay = 2000l; // 2 seconds is a long delay
869:
870:                if (dso != null) {
871:                    originalDataTime = dso.getCreationDate().getTime();
872:                    originalDataDelay = originalDataTime - previousDataTime; // delta time between original datum
873:                    systemDelta = System.currentTimeMillis()
874:                            - lastDataSentSystemTime; // milliseconds since we sent last data
875:
876:                    if (originalDataDelay > longDelay || // if there was a long delay in original data,
877:                            systemDelta >= originalDataDelay) // or we have already waited long enough
878:                    {
879:                        // then send data immediately
880:                        ts.sendData(dso);
881:                        sentWithOutPacing++;
882:                        dataSentSystemTime = System.currentTimeMillis();
883:                    } else // we may be delaying before sending data...
884:                    {
885:                        int delayTime = new Long(originalDataDelay
886:                                - systemDelta).intValue();
887:                        if (sentWithOutPacing++ > preSendThisMany)
888:                            SessionUtilities.delay(delayTime);
889:                        ts.sendData(dso);
890:                        dataSentSystemTime = System.currentTimeMillis();
891:                    }
892:
893:                    SessionUtilities
894:                            .getLoggingInterface()
895:                            .debugMsg(
896:                                    SessionUtilities.getLoggingInterface().DEBUG,
897:                                    SessionUtilities.getLoggingInterface().DATABASE,
898:                                    "paceTheData: originalDataTime = "
899:                                            + originalDataTime
900:                                            + ", data sent at "
901:                                            + lastDataSentSystemTime
902:                                            + ", dataDelta = "
903:                                            + originalDataDelay
904:                                            + ", our delay = "
905:                                            + (dataSentSystemTime - lastDataSentSystemTime));
906:
907:                    previousDataTime = originalDataTime; // save it for the next time
908:                    lastDataSentSystemTime = dataSentSystemTime; // save it for next time too
909:                } else
910:                    SessionUtilities.getLoggingInterface().debugMsg(
911:                            SessionUtilities.getLoggingInterface().DEBUG,
912:                            SessionUtilities.getLoggingInterface().DATABASE,
913:                            "paceTheData received a null DataShareObject");
914:            }
915:
916:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.