Source Code Cross Referenced for TP.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        package org.jgroups.protocols;
0002:
0003:        import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
0004:        import org.jgroups.*;
0005:        import org.jgroups.stack.Protocol;
0006:        import org.jgroups.stack.IpAddress;
0007:        import org.jgroups.util.*;
0008:        import org.jgroups.util.List;
0009:        import org.jgroups.util.Queue;
0010:
0011:        import java.io.DataInputStream;
0012:        import java.io.IOException;
0013:        import java.net.*;
0014:        import java.text.NumberFormat;
0015:        import java.util.*;
0016:
0017:        /**
0018:         * Generic transport - specific implementations should extend this abstract class.
0019:         * Features which are provided to the subclasses include
0020:         * <ul>
0021:         * <li>version checking
0022:         * <li>marshalling and unmarshalling
0023:         * <li>message bundling (handling single messages, and message lists)
0024:         * <li>incoming packet handler
0025:         * <li>loopback
0026:         * </ul>
0027:         * A subclass has to override
0028:         * <ul>
0029:         * <li>{@link #sendToAllMembers(byte[], int, int)}
0030:         * <li>{@link #sendToSingleMember(org.jgroups.Address, byte[], int, int)}
0031:         * <li>{@link #init()}
0032:         * <li>{@link #start()}: subclasses <em>must</em> call super.start() <em>after</em> they initialize themselves
0033:         * (e.g., created their sockets).
0034:         * <li>{@link #stop()}: subclasses <em>must</em> call super.stop() after they deinitialized themselves
0035:         * <li>{@link #destroy()}
0036:         * </ul>
0037:         * The create() or start() method has to create a local address.<br>
0038:         * The {@link #receive(Address, Address, byte[], int, int)} method must
0039:         * be called by subclasses when a unicast or multicast message has been received.
0040:         * @author Bela Ban
0041:         * @version $Id: TP.java,v 1.77.2.3 2007/04/27 08:03:51 belaban Exp $
0042:         */
0043:        public abstract class TP extends Protocol {
0044:
0045:            /** The address (host and port) of this member */
0046:            protected Address local_addr = null;
0047:
0048:            /** The name of the group to which this member is connected */
0049:            protected String channel_name = null;
0050:
0051:            /** The interface (NIC) which should be used by this transport */
0052:            protected InetAddress bind_addr = null;
0053:
0054:            /** Overrides bind_addr, -Djgroups.bind_addr and -Dbind.address: let's the OS return the local host address */
0055:            boolean use_local_host = false;
0056:
0057:            /** If true, the transport should use all available interfaces to receive multicast messages
0058:             * @deprecated  Use {@link receive_on_all_interfaces} instead */
0059:            boolean bind_to_all_interfaces = false;
0060:
0061:            /** If true, the transport should use all available interfaces to receive multicast messages */
0062:            boolean receive_on_all_interfaces = false;
0063:
0064:            /** List<NetworkInterface> of interfaces to receive multicasts on. The multicast receive socket will listen
0065:             * on all of these interfaces. This is a comma-separated list of IP addresses or interface names. E.g.
0066:             * "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded; we only bind to an interface once.
0067:             * If this property is set, it override receive_on_all_interfaces.
0068:             */
0069:            java.util.List receive_interfaces = null;
0070:
0071:            /** If true, the transport should use all available interfaces to send multicast messages. This means
0072:             * the same multicast message is sent N times, so use with care */
0073:            boolean send_on_all_interfaces = false;
0074:
0075:            /** List<NetworkInterface> of interfaces to send multicasts on. The multicast send socket will send the
0076:             * same multicast message on all of these interfaces. This is a comma-separated list of IP addresses or
0077:             * interface names. E.g. "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded.
0078:             * If this property is set, it override send_on_all_interfaces.
0079:             */
0080:            java.util.List send_interfaces = null;
0081:
0082:            /** The port to which the transport binds. 0 means to bind to any (ephemeral) port */
0083:            int bind_port = 0;
0084:            int port_range = 1; // 27-6-2003 bgooren, Only try one port by default
0085:
0086:            /** The members of this group (updated when a member joins or leaves) */
0087:            final protected Vector members = new Vector(11);
0088:
0089:            protected View view = null;
0090:
0091:            /** Pre-allocated byte stream. Used for marshalling messages. Will grow as needed */
0092:            final ExposedByteArrayOutputStream out_stream = new ExposedByteArrayOutputStream(
0093:                    1024);
0094:            final ExposedBufferedOutputStream buf_out_stream = new ExposedBufferedOutputStream(
0095:                    out_stream, 1024);
0096:            final ExposedDataOutputStream dos = new ExposedDataOutputStream(
0097:                    buf_out_stream);
0098:
0099:            final ExposedByteArrayInputStream in_stream = new ExposedByteArrayInputStream(
0100:                    new byte[] { '0' });
0101:            final ExposedBufferedInputStream buf_in_stream = new ExposedBufferedInputStream(
0102:                    in_stream);
0103:            final DataInputStream dis = new DataInputStream(buf_in_stream);
0104:
0105:            /** If true, messages sent to self are treated specially: unicast messages are
0106:             * looped back immediately, multicast messages get a local copy first and -
0107:             * when the real copy arrives - it will be discarded. Useful for Window
0108:             * media (non)sense */
0109:            boolean loopback = false;
0110:
0111:            /** Discard packets with a different version. Usually minor version differences are okay. Setting this property
0112:             * to true means that we expect the exact same version on all incoming packets */
0113:            boolean discard_incompatible_packets = false;
0114:
0115:            /** Sometimes receivers are overloaded (they have to handle de-serialization etc).
0116:             * Packet handler is a separate thread taking care of de-serialization, receiver
0117:             * thread(s) simply put packet in queue and return immediately. Setting this to
0118:             * true adds one more thread */
0119:            boolean use_incoming_packet_handler = true;
0120:
0121:            /** Used by packet handler to store incoming DatagramPackets */
0122:            Queue incoming_packet_queue = null;
0123:
0124:            /** Dequeues DatagramPackets from packet_queue, unmarshalls them and
0125:             * calls <tt>handleIncomingUdpPacket()</tt> */
0126:            IncomingPacketHandler incoming_packet_handler = null;
0127:
0128:            /** Used by packet handler to store incoming Messages */
0129:            Queue incoming_msg_queue = null;
0130:
0131:            IncomingMessageHandler incoming_msg_handler;
0132:
0133:            /** Packets to be sent are stored in outgoing_queue and sent by a separate thread. Enabling this
0134:             * value uses an additional thread */
0135:            boolean use_outgoing_packet_handler = false;
0136:
0137:            /** Used by packet handler to store outgoing DatagramPackets */
0138:            BoundedLinkedQueue outgoing_queue = null;
0139:
0140:            /** max number of elements in the bounded outgoing_queue */
0141:            int outgoing_queue_max_size = 2000;
0142:
0143:            OutgoingPacketHandler outgoing_packet_handler = null;
0144:
0145:            /** If set it will be added to <tt>local_addr</tt>. Used to implement
0146:             * for example transport independent addresses */
0147:            byte[] additional_data = null;
0148:
0149:            /** Maximum number of bytes for messages to be queued until they are sent. This value needs to be smaller
0150:                than the largest datagram packet size in case of UDP */
0151:            int max_bundle_size = AUTOCONF.senseMaxFragSizeStatic();
0152:
0153:            /** Max number of milliseconds until queued messages are sent. Messages are sent when max_bundle_size or
0154:             * max_bundle_timeout has been exceeded (whichever occurs faster)
0155:             */
0156:            long max_bundle_timeout = 20;
0157:
0158:            /** Enabled bundling of smaller messages into bigger ones */
0159:            boolean enable_bundling = false;
0160:
0161:            private Bundler bundler = null;
0162:
0163:            protected TimeScheduler timer = null;
0164:
0165:            private DiagnosticsHandler diag_handler = null;
0166:            boolean enable_diagnostics = true;
0167:            String diagnostics_addr = "224.0.0.75";
0168:            int diagnostics_port = 7500;
0169:
0170:            /** HashMap<Address, Address>. Keys=senders, values=destinations. For each incoming message M with sender S, adds
0171:             * an entry with key=S and value= sender's IP address and port.
0172:             */
0173:            HashMap addr_translation_table = new HashMap();
0174:
0175:            boolean use_addr_translation = false;
0176:
0177:            TpHeader header;
0178:
0179:            final String name = getName();
0180:
0181:            static final byte LIST = 1; // we have a list of messages rather than a single message when set
0182:            static final byte MULTICAST = 2; // message is a multicast (versus a unicast) message when set
0183:
0184:            long num_msgs_sent = 0, num_msgs_received = 0, num_bytes_sent = 0,
0185:                    num_bytes_received = 0;
0186:
0187:            static NumberFormat f;
0188:
0189:            static {
0190:                f = NumberFormat.getNumberInstance();
0191:                f.setGroupingUsed(false);
0192:                f.setMaximumFractionDigits(2);
0193:            }
0194:
0195:            /**
0196:             * Creates the TP protocol, and initializes the
0197:             * state variables, does however not start any sockets or threads.
0198:             */
0199:            protected TP() {
0200:            }
0201:
0202:            /**
0203:             * debug only
0204:             */
0205:            public String toString() {
0206:                return name + "(local address: " + local_addr + ')';
0207:            }
0208:
0209:            public void resetStats() {
0210:                num_msgs_sent = num_msgs_received = num_bytes_sent = num_bytes_received = 0;
0211:            }
0212:
0213:            public long getNumMessagesSent() {
0214:                return num_msgs_sent;
0215:            }
0216:
0217:            public long getNumMessagesReceived() {
0218:                return num_msgs_received;
0219:            }
0220:
0221:            public long getNumBytesSent() {
0222:                return num_bytes_sent;
0223:            }
0224:
0225:            public long getNumBytesReceived() {
0226:                return num_bytes_received;
0227:            }
0228:
0229:            public String getBindAddress() {
0230:                return bind_addr != null ? bind_addr.toString() : "null";
0231:            }
0232:
0233:            public void setBindAddress(String bind_addr)
0234:                    throws UnknownHostException {
0235:                this .bind_addr = InetAddress.getByName(bind_addr);
0236:            }
0237:
0238:            /** @deprecated Use {@link #isReceiveOnAllInterfaces()} instead */
0239:            public boolean getBindToAllInterfaces() {
0240:                return receive_on_all_interfaces;
0241:            }
0242:
0243:            public void setBindToAllInterfaces(boolean flag) {
0244:                this .receive_on_all_interfaces = flag;
0245:            }
0246:
0247:            public boolean isReceiveOnAllInterfaces() {
0248:                return receive_on_all_interfaces;
0249:            }
0250:
0251:            public java.util.List getReceiveInterfaces() {
0252:                return receive_interfaces;
0253:            }
0254:
0255:            public boolean isSendOnAllInterfaces() {
0256:                return send_on_all_interfaces;
0257:            }
0258:
0259:            public java.util.List getSendInterfaces() {
0260:                return send_interfaces;
0261:            }
0262:
0263:            public boolean isDiscardIncompatiblePackets() {
0264:                return discard_incompatible_packets;
0265:            }
0266:
0267:            public void setDiscardIncompatiblePackets(boolean flag) {
0268:                discard_incompatible_packets = flag;
0269:            }
0270:
0271:            public boolean isEnableBundling() {
0272:                return enable_bundling;
0273:            }
0274:
0275:            public void setEnableBundling(boolean flag) {
0276:                enable_bundling = flag;
0277:            }
0278:
0279:            public int getMaxBundleSize() {
0280:                return max_bundle_size;
0281:            }
0282:
0283:            public void setMaxBundleSize(int size) {
0284:                max_bundle_size = size;
0285:            }
0286:
0287:            public long getMaxBundleTimeout() {
0288:                return max_bundle_timeout;
0289:            }
0290:
0291:            public void setMaxBundleTimeout(long timeout) {
0292:                max_bundle_timeout = timeout;
0293:            }
0294:
0295:            public int getOutgoingQueueSize() {
0296:                return outgoing_queue != null ? outgoing_queue.size() : 0;
0297:            }
0298:
0299:            public int getIncomingQueueSize() {
0300:                return incoming_packet_queue != null ? incoming_packet_queue
0301:                        .size() : 0;
0302:            }
0303:
0304:            public Address getLocalAddress() {
0305:                return local_addr;
0306:            }
0307:
0308:            public String getChannelName() {
0309:                return channel_name;
0310:            }
0311:
0312:            public boolean isLoopback() {
0313:                return loopback;
0314:            }
0315:
0316:            public void setLoopback(boolean b) {
0317:                loopback = b;
0318:            }
0319:
0320:            public boolean isUseIncomingPacketHandler() {
0321:                return use_incoming_packet_handler;
0322:            }
0323:
0324:            public boolean isUseOutgoingPacketHandler() {
0325:                return use_outgoing_packet_handler;
0326:            }
0327:
0328:            public int getOutgoingQueueMaxSize() {
0329:                return outgoing_queue != null ? outgoing_queue_max_size : 0;
0330:            }
0331:
0332:            public void setOutgoingQueueMaxSize(int new_size) {
0333:                if (outgoing_queue != null) {
0334:                    outgoing_queue.setCapacity(new_size);
0335:                    outgoing_queue_max_size = new_size;
0336:                }
0337:            }
0338:
0339:            public Map dumpStats() {
0340:                Map retval = super .dumpStats();
0341:                if (retval == null)
0342:                    retval = new HashMap();
0343:                retval.put("num_msgs_sent", new Long(num_msgs_sent));
0344:                retval.put("num_msgs_received", new Long(num_msgs_received));
0345:                retval.put("num_bytes_sent", new Long(num_bytes_sent));
0346:                retval.put("num_bytes_received", new Long(num_bytes_received));
0347:                return retval;
0348:            }
0349:
0350:            /**
0351:             * Send to all members in the group. UDP would use an IP multicast message, whereas TCP would send N
0352:             * messages, one for each member
0353:             * @param data The data to be sent. This is not a copy, so don't modify it
0354:             * @param offset
0355:             * @param length
0356:             * @throws Exception
0357:             */
0358:            public abstract void sendToAllMembers(byte[] data, int offset,
0359:                    int length) throws Exception;
0360:
0361:            /**
0362:             * Send to all members in the group. UDP would use an IP multicast message, whereas TCP would send N
0363:             * messages, one for each member
0364:             * @param dest Must be a non-null unicast address
0365:             * @param data The data to be sent. This is not a copy, so don't modify it
0366:             * @param offset
0367:             * @param length
0368:             * @throws Exception
0369:             */
0370:            public abstract void sendToSingleMember(Address dest, byte[] data,
0371:                    int offset, int length) throws Exception;
0372:
0373:            public abstract String getInfo();
0374:
0375:            public abstract void postUnmarshalling(Message msg, Address dest,
0376:                    Address src, boolean multicast);
0377:
0378:            public abstract void postUnmarshallingList(Message msg,
0379:                    Address dest, boolean multicast);
0380:
0381:            private StringBuffer _getInfo() {
0382:                StringBuffer sb = new StringBuffer();
0383:                sb.append(local_addr).append(" (").append(channel_name).append(
0384:                        ") ").append("\n");
0385:                sb.append("local_addr=").append(local_addr).append("\n");
0386:                sb.append("group_name=").append(channel_name).append("\n");
0387:                sb.append("version=").append(Version.description).append(
0388:                        ", cvs=\"").append(Version.cvs).append("\"\n");
0389:                sb.append("view: ").append(view).append('\n');
0390:                sb.append(getInfo());
0391:                return sb;
0392:            }
0393:
0394:            private void handleDiagnosticProbe(SocketAddress sender,
0395:                    DatagramSocket sock, String request) {
0396:                try {
0397:                    StringTokenizer tok = new StringTokenizer(request);
0398:                    String req = tok.nextToken();
0399:                    StringBuffer info = new StringBuffer("n/a");
0400:                    if (req.trim().toLowerCase().startsWith("query")) {
0401:                        ArrayList l = new ArrayList(tok.countTokens());
0402:                        while (tok.hasMoreTokens())
0403:                            l.add(tok.nextToken().trim().toLowerCase());
0404:
0405:                        info = _getInfo();
0406:
0407:                        if (l.contains("jmx")) {
0408:                            Channel ch = stack.getChannel();
0409:                            if (ch != null) {
0410:                                Map m = ch.dumpStats();
0411:                                StringBuffer sb = new StringBuffer();
0412:                                sb.append("stats:\n");
0413:                                for (Iterator it = m.entrySet().iterator(); it
0414:                                        .hasNext();) {
0415:                                    sb.append(it.next()).append("\n");
0416:                                }
0417:                                info.append(sb);
0418:                            }
0419:                        }
0420:                        if (l.contains("props")) {
0421:                            String p = stack.printProtocolSpecAsXML();
0422:                            info.append("\nprops:\n").append(p);
0423:                        }
0424:                    }
0425:
0426:                    byte[] diag_rsp = info.toString().getBytes();
0427:                    if (log.isDebugEnabled())
0428:                        log.debug("sending diag response to " + sender);
0429:                    sendResponse(sock, sender, diag_rsp);
0430:                } catch (Throwable t) {
0431:                    if (log.isErrorEnabled())
0432:                        log.error("failed sending diag rsp to " + sender, t);
0433:                }
0434:            }
0435:
0436:            private static void sendResponse(DatagramSocket sock,
0437:                    SocketAddress sender, byte[] buf) throws IOException {
0438:                DatagramPacket p = new DatagramPacket(buf, 0, buf.length,
0439:                        sender);
0440:                sock.send(p);
0441:            }
0442:
0443:            /* ------------------------------------------------------------------------------- */
0444:
0445:            /*------------------------------ Protocol interface ------------------------------ */
0446:
0447:            public void init() throws Exception {
0448:                super .init();
0449:                if (bind_addr != null) {
0450:                    Map m = new HashMap(1);
0451:                    m.put("bind_addr", bind_addr);
0452:                    passUp(new Event(Event.CONFIG, m));
0453:                }
0454:            }
0455:
0456:            /**
0457:             * Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads
0458:             */
0459:            public void start() throws Exception {
0460:                timer = stack.timer;
0461:                if (timer == null)
0462:                    throw new Exception("timer is null");
0463:
0464:                if (enable_diagnostics) {
0465:                    diag_handler = new DiagnosticsHandler();
0466:                    diag_handler.start();
0467:                }
0468:
0469:                if (use_incoming_packet_handler) {
0470:                    incoming_packet_queue = new Queue();
0471:                    incoming_packet_handler = new IncomingPacketHandler();
0472:                    incoming_packet_handler.start();
0473:                }
0474:
0475:                if (loopback) {
0476:                    incoming_msg_queue = new Queue();
0477:                    incoming_msg_handler = new IncomingMessageHandler();
0478:                    incoming_msg_handler.start();
0479:                }
0480:
0481:                if (use_outgoing_packet_handler) {
0482:                    outgoing_queue = new BoundedLinkedQueue(
0483:                            outgoing_queue_max_size);
0484:                    outgoing_packet_handler = new OutgoingPacketHandler();
0485:                    outgoing_packet_handler.start();
0486:                }
0487:
0488:                if (enable_bundling) {
0489:                    bundler = new Bundler();
0490:                }
0491:
0492:                passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
0493:            }
0494:
0495:            public void stop() {
0496:                if (diag_handler != null) {
0497:                    diag_handler.stop();
0498:                    diag_handler = null;
0499:                }
0500:
0501:                // 1. Stop the outgoing packet handler thread
0502:                if (outgoing_packet_handler != null)
0503:                    outgoing_packet_handler.stop();
0504:
0505:                // 2. Stop the incoming packet handler thread
0506:                if (incoming_packet_handler != null)
0507:                    incoming_packet_handler.stop();
0508:
0509:                // 3. Finally stop the incoming message handler
0510:                if (incoming_msg_handler != null)
0511:                    incoming_msg_handler.stop();
0512:            }
0513:
0514:            /**
0515:             * Setup the Protocol instance according to the configuration string
0516:             * @return true if no other properties are left.
0517:             *         false if the properties still have data in them, ie ,
0518:             *         properties are left over and not handled by the protocol stack
0519:             */
0520:            public boolean setProperties(Properties props) {
0521:                super .setProperties(props);
0522:
0523:                boolean ignore_systemprops = Util
0524:                        .isBindAddressPropertyIgnored();
0525:                String str = Util.getProperty(new String[] { Global.BIND_ADDR,
0526:                        Global.BIND_ADDR_OLD }, props, "bind_addr",
0527:                        ignore_systemprops, null);
0528:
0529:                if (str != null) {
0530:                    try {
0531:                        bind_addr = InetAddress.getByName(str);
0532:                    } catch (UnknownHostException unknown) {
0533:                        if (log.isFatalEnabled())
0534:                            log
0535:                                    .fatal("(bind_addr): host " + str
0536:                                            + " not known");
0537:                        return false;
0538:                    }
0539:                    props.remove("bind_addr");
0540:                }
0541:
0542:                str = props.getProperty("use_local_host");
0543:                if (str != null) {
0544:                    use_local_host = new Boolean(str).booleanValue();
0545:                    props.remove("use_local_host");
0546:                }
0547:
0548:                str = props.getProperty("bind_to_all_interfaces");
0549:                if (str != null) {
0550:                    receive_on_all_interfaces = new Boolean(str).booleanValue();
0551:                    props.remove("bind_to_all_interfaces");
0552:                    log
0553:                            .warn("bind_to_all_interfaces has been deprecated; use receive_on_all_interfaces instead");
0554:                }
0555:
0556:                str = props.getProperty("receive_on_all_interfaces");
0557:                if (str != null) {
0558:                    receive_on_all_interfaces = new Boolean(str).booleanValue();
0559:                    props.remove("receive_on_all_interfaces");
0560:                }
0561:
0562:                str = props.getProperty("receive_interfaces");
0563:                if (str != null) {
0564:                    try {
0565:                        receive_interfaces = parseInterfaceList(str);
0566:                        props.remove("receive_interfaces");
0567:                    } catch (Exception e) {
0568:                        log.error("error determining interfaces (" + str + ")",
0569:                                e);
0570:                        return false;
0571:                    }
0572:                }
0573:
0574:                str = props.getProperty("send_on_all_interfaces");
0575:                if (str != null) {
0576:                    send_on_all_interfaces = new Boolean(str).booleanValue();
0577:                    props.remove("send_on_all_interfaces");
0578:                }
0579:
0580:                str = props.getProperty("send_interfaces");
0581:                if (str != null) {
0582:                    try {
0583:                        send_interfaces = parseInterfaceList(str);
0584:                        props.remove("send_interfaces");
0585:                    } catch (Exception e) {
0586:                        log.error("error determining interfaces (" + str + ")",
0587:                                e);
0588:                        return false;
0589:                    }
0590:                }
0591:
0592:                str = props.getProperty("bind_port");
0593:                if (str != null) {
0594:                    bind_port = Integer.parseInt(str);
0595:                    props.remove("bind_port");
0596:                }
0597:
0598:                str = props.getProperty("port_range");
0599:                if (str != null) {
0600:                    port_range = Integer.parseInt(str);
0601:                    props.remove("port_range");
0602:                }
0603:
0604:                str = props.getProperty("loopback");
0605:                if (str != null) {
0606:                    loopback = Boolean.valueOf(str).booleanValue();
0607:                    props.remove("loopback");
0608:                }
0609:
0610:                str = props.getProperty("discard_incompatible_packets");
0611:                if (str != null) {
0612:                    discard_incompatible_packets = Boolean.valueOf(str)
0613:                            .booleanValue();
0614:                    props.remove("discard_incompatible_packets");
0615:                }
0616:
0617:                // this is deprecated, just left for compatibility (use use_incoming_packet_handler)
0618:                str = props.getProperty("use_packet_handler");
0619:                if (str != null) {
0620:                    use_incoming_packet_handler = Boolean.valueOf(str)
0621:                            .booleanValue();
0622:                    props.remove("use_packet_handler");
0623:                    if (log.isWarnEnabled())
0624:                        log
0625:                                .warn("'use_packet_handler' is deprecated; use 'use_incoming_packet_handler' instead");
0626:                }
0627:
0628:                str = props.getProperty("use_incoming_packet_handler");
0629:                if (str != null) {
0630:                    use_incoming_packet_handler = Boolean.valueOf(str)
0631:                            .booleanValue();
0632:                    props.remove("use_incoming_packet_handler");
0633:                }
0634:
0635:                str = props.getProperty("use_outgoing_packet_handler");
0636:                if (str != null) {
0637:                    use_outgoing_packet_handler = Boolean.valueOf(str)
0638:                            .booleanValue();
0639:                    props.remove("use_outgoing_packet_handler");
0640:                }
0641:
0642:                str = props.getProperty("outgoing_queue_max_size");
0643:                if (str != null) {
0644:                    outgoing_queue_max_size = Integer.parseInt(str);
0645:                    props.remove("outgoing_queue_max_size");
0646:                    if (outgoing_queue_max_size <= 0) {
0647:                        if (log.isWarnEnabled())
0648:                            log.warn("outgoing_queue_max_size of "
0649:                                    + outgoing_queue_max_size
0650:                                    + " is invalid, setting it to 1");
0651:                        outgoing_queue_max_size = 1;
0652:                    }
0653:                }
0654:
0655:                str = props.getProperty("max_bundle_size");
0656:                if (str != null) {
0657:                    int bundle_size = Integer.parseInt(str);
0658:                    if (bundle_size > max_bundle_size) {
0659:                        if (log.isErrorEnabled())
0660:                            log
0661:                                    .error("max_bundle_size ("
0662:                                            + bundle_size
0663:                                            + ") is greater than largest TP fragmentation size ("
0664:                                            + max_bundle_size + ')');
0665:                        return false;
0666:                    }
0667:                    if (bundle_size <= 0) {
0668:                        if (log.isErrorEnabled())
0669:                            log.error("max_bundle_size (" + bundle_size
0670:                                    + ") is <= 0");
0671:                        return false;
0672:                    }
0673:                    max_bundle_size = bundle_size;
0674:                    props.remove("max_bundle_size");
0675:                }
0676:
0677:                str = props.getProperty("max_bundle_timeout");
0678:                if (str != null) {
0679:                    max_bundle_timeout = Long.parseLong(str);
0680:                    if (max_bundle_timeout <= 0) {
0681:                        if (log.isErrorEnabled())
0682:                            log.error("max_bundle_timeout of "
0683:                                    + max_bundle_timeout + " is invalid");
0684:                        return false;
0685:                    }
0686:                    props.remove("max_bundle_timeout");
0687:                }
0688:
0689:                str = props.getProperty("enable_bundling");
0690:                if (str != null) {
0691:                    enable_bundling = Boolean.valueOf(str).booleanValue();
0692:                    props.remove("enable_bundling");
0693:                }
0694:
0695:                str = props.getProperty("use_addr_translation");
0696:                if (str != null) {
0697:                    use_addr_translation = Boolean.valueOf(str).booleanValue();
0698:                    props.remove("use_addr_translation");
0699:                }
0700:
0701:                str = props.getProperty("enable_diagnostics");
0702:                if (str != null) {
0703:                    enable_diagnostics = Boolean.valueOf(str).booleanValue();
0704:                    props.remove("enable_diagnostics");
0705:                }
0706:
0707:                str = props.getProperty("diagnostics_addr");
0708:                if (str != null) {
0709:                    diagnostics_addr = str;
0710:                    props.remove("diagnostics_addr");
0711:                }
0712:
0713:                str = props.getProperty("diagnostics_port");
0714:                if (str != null) {
0715:                    diagnostics_port = Integer.parseInt(str);
0716:                    props.remove("diagnostics_port");
0717:                }
0718:
0719:                if (enable_bundling) {
0720:                    //if (use_outgoing_packet_handler == false)
0721:                    //  if(log.isWarnEnabled()) log.warn("enable_bundling is true; setting use_outgoing_packet_handler=true");
0722:                    // use_outgoing_packet_handler=true;
0723:                }
0724:
0725:                return true;
0726:            }
0727:
0728:            /**
0729:             * This prevents the up-handler thread to be created, which essentially is superfluous:
0730:             * messages are received from the network rather than from a layer below.
0731:             * DON'T REMOVE !
0732:             */
0733:            public void startUpHandler() {
0734:            }
0735:
0736:            /**
0737:             * handle the UP event.
0738:             * @param evt - the event being send from the stack
0739:             */
0740:            public void up(Event evt) {
0741:                switch (evt.getType()) {
0742:                case Event.CONFIG:
0743:                    passUp(evt);
0744:                    if (log.isDebugEnabled())
0745:                        log.debug("received CONFIG event: " + evt.getArg());
0746:                    handleConfigEvent((HashMap) evt.getArg());
0747:                    return;
0748:                }
0749:                passUp(evt);
0750:            }
0751:
0752:            /**
0753:             * Caller by the layer above this layer. Usually we just put this Message
0754:             * into the send queue and let one or more worker threads handle it. A worker thread
0755:             * then removes the Message from the send queue, performs a conversion and adds the
0756:             * modified Message to the send queue of the layer below it, by calling down()).
0757:             */
0758:            public void down(Event evt) {
0759:                if (evt.getType() != Event.MSG) { // unless it is a message handle it and respond
0760:                    handleDownEvent(evt);
0761:                    return;
0762:                }
0763:
0764:                Message msg = (Message) evt.getArg();
0765:                if (header != null) {
0766:                    // added patch by Roland Kurmann (March 20 2003)
0767:                    // msg.putHeader(name, new TpHeader(channel_name));
0768:                    msg.putHeader(name, header);
0769:                }
0770:
0771:                // Because we don't call Protocol.passDown(), we notify the observer directly (e.g. PerfObserver).
0772:                // This way, we still have performance numbers for TP
0773:                if (observer != null)
0774:                    observer.passDown(evt);
0775:
0776:                setSourceAddress(msg); // very important !! listToBuffer() will fail with a null src address !!
0777:                if (log.isTraceEnabled()) {
0778:                    StringBuffer sb = new StringBuffer("sending msg to ")
0779:                            .append(msg.getDest()).append(" (src=").append(
0780:                                    msg.getSrc()).append("), headers are ")
0781:                            .append(msg.getHeaders());
0782:                    log.trace(sb.toString());
0783:                }
0784:
0785:                // Don't send if destination is local address. Instead, switch dst and src and put in up_queue.
0786:                // If multicast message, loopback a copy directly to us (but still multicast). Once we receive this,
0787:                // we will discard our own multicast message
0788:                Address dest = msg.getDest();
0789:                boolean multicast = dest == null || dest.isMulticastAddress();
0790:                if (loopback && (multicast || dest.equals(local_addr))) {
0791:                    Message copy = msg.copy();
0792:
0793:                    // copy.removeHeader(name); // we don't remove the header
0794:                    copy.setSrc(local_addr);
0795:                    // copy.setDest(dest);
0796:
0797:                    if (log.isTraceEnabled())
0798:                        log.trace(new StringBuffer("looping back message ")
0799:                                .append(copy));
0800:                    try {
0801:                        incoming_msg_queue.add(copy);
0802:                    } catch (QueueClosedException e) {
0803:                        // log.error("failed adding looped back message to incoming_msg_queue", e);
0804:                    }
0805:
0806:                    if (!multicast)
0807:                        return;
0808:                }
0809:
0810:                try {
0811:                    if (use_outgoing_packet_handler)
0812:                        outgoing_queue.put(msg);
0813:                    else
0814:                        send(msg, dest, multicast);
0815:                } catch (QueueClosedException closed_ex) {
0816:                } catch (InterruptedException interruptedEx) {
0817:                } catch (Throwable e) {
0818:                    if (log.isErrorEnabled()) {
0819:                        String dst = msg.getDest() == null ? "null" : msg
0820:                                .getDest().toString();
0821:                        log.error("failed sending message to " + dst + " ("
0822:                                + msg.getLength() + " bytes)", e.getCause());
0823:                    }
0824:                }
0825:            }
0826:
0827:            /*--------------------------- End of Protocol interface -------------------------- */
0828:
0829:            /* ------------------------------ Private Methods -------------------------------- */
0830:
0831:            /**
0832:             * If the sender is null, set our own address. We cannot just go ahead and set the address
0833:             * anyway, as we might be sending a message on behalf of someone else ! E.gin case of
0834:             * retransmission, when the original sender has crashed, or in a FLUSH protocol when we
0835:             * have to return all unstable messages with the FLUSH_OK response.
0836:             */
0837:            private void setSourceAddress(Message msg) {
0838:                if (msg.getSrc() == null)
0839:                    msg.setSrc(local_addr);
0840:            }
0841:
0842:            /**
0843:             * Subclasses must call this method when a unicast or multicast message has been received.
0844:             * Declared final so subclasses cannot override this method.
0845:             *
0846:             * @param dest
0847:             * @param sender
0848:             * @param data
0849:             * @param offset
0850:             * @param length
0851:             */
0852:            protected final void receive(Address dest, Address sender,
0853:                    byte[] data, int offset, int length) {
0854:                if (data == null)
0855:                    return;
0856:
0857:                //        if(length == 4) {  // received a diagnostics probe
0858:                //            if(data[offset] == 'd' && data[offset+1] == 'i' && data[offset+2] == 'a' && data[offset+3] == 'g') {
0859:                //                handleDiagnosticProbe(sender);
0860:                //                return;
0861:                //            }
0862:                //        }
0863:
0864:                boolean mcast = dest == null || dest.isMulticastAddress();
0865:                if (log.isTraceEnabled()) {
0866:                    StringBuffer sb = new StringBuffer("received (");
0867:                    sb.append(mcast ? "mcast) " : "ucast) ").append(length)
0868:                            .append(" bytes from ").append(sender);
0869:                    log.trace(sb.toString());
0870:                }
0871:
0872:                try {
0873:                    if (use_incoming_packet_handler) {
0874:                        byte[] tmp = new byte[length];
0875:                        System.arraycopy(data, offset, tmp, 0, length);
0876:                        incoming_packet_queue.add(new IncomingQueueEntry(dest,
0877:                                sender, tmp, 0, length));
0878:                    } else
0879:                        handleIncomingPacket(dest, sender, data, offset, length);
0880:                } catch (Throwable t) {
0881:                    if (log.isErrorEnabled())
0882:                        log.error(
0883:                                new StringBuffer("failed handling data from ")
0884:                                        .append(sender), t);
0885:                }
0886:            }
0887:
0888:            /**
0889:             * Processes a packet read from either the multicast or unicast socket. Needs to be synchronized because
0890:             * mcast or unicast socket reads can be concurrent.
0891:             * Correction (bela April 19 2005): we access no instance variables, all vars are allocated on the stack, so
0892:             * this method should be reentrant: removed 'synchronized' keyword
0893:             */
0894:            private void handleIncomingPacket(Address dest, Address sender,
0895:                    byte[] data, int offset, int length) {
0896:                Message msg = null;
0897:                List l = null; // used if bundling is enabled
0898:                short version;
0899:                boolean is_message_list, multicast;
0900:                byte flags;
0901:
0902:                try {
0903:                    synchronized (in_stream) {
0904:                        in_stream.setData(data, offset, length);
0905:                        buf_in_stream.reset(length);
0906:                        version = dis.readShort();
0907:                        if (Version.isBinaryCompatible(version) == false) {
0908:                            if (log.isWarnEnabled()) {
0909:                                StringBuffer sb = new StringBuffer();
0910:                                sb.append("packet from ").append(sender)
0911:                                        .append(" has different version (")
0912:                                        .append(version);
0913:                                sb.append(") from ours (").append(
0914:                                        Version.printVersion()).append("). ");
0915:                                if (discard_incompatible_packets)
0916:                                    sb.append("Packet is discarded");
0917:                                else
0918:                                    sb.append("This may cause problems");
0919:                                log.warn(sb);
0920:                            }
0921:                            if (discard_incompatible_packets)
0922:                                return;
0923:                        }
0924:
0925:                        flags = dis.readByte();
0926:                        is_message_list = (flags & LIST) == LIST;
0927:                        multicast = (flags & MULTICAST) == MULTICAST;
0928:
0929:                        if (is_message_list)
0930:                            l = bufferToList(dis, dest, multicast);
0931:                        else
0932:                            msg = bufferToMessage(dis, dest, sender, multicast);
0933:                    }
0934:
0935:                    LinkedList msgs = new LinkedList();
0936:                    if (is_message_list) {
0937:                        for (Enumeration en = l.elements(); en
0938:                                .hasMoreElements();)
0939:                            msgs.add(en.nextElement());
0940:                    } else
0941:                        msgs.add(msg);
0942:
0943:                    Address src;
0944:                    for (Iterator it = msgs.iterator(); it.hasNext();) {
0945:                        msg = (Message) it.next();
0946:                        src = msg.getSrc();
0947:                        if (loopback) {
0948:                            if (multicast && src != null
0949:                                    && local_addr.equals(src)) { // discard own loopback multicast packets
0950:                                it.remove();
0951:                            }
0952:                        } else
0953:                            handleIncomingMessage(msg);
0954:                    }
0955:                    if (incoming_msg_queue != null && msgs.size() > 0)
0956:                        incoming_msg_queue.addAll(msgs);
0957:                } catch (QueueClosedException closed_ex) {
0958:                    ; // swallow exception
0959:                } catch (Throwable t) {
0960:                    if (log.isErrorEnabled())
0961:                        log.error("failed unmarshalling message", t);
0962:                }
0963:            }
0964:
0965:            private void handleIncomingMessage(Message msg) {
0966:                Event evt;
0967:                TpHeader hdr;
0968:
0969:                if (stats) {
0970:                    num_msgs_received++;
0971:                    num_bytes_received += msg.getLength();
0972:                }
0973:
0974:                evt = new Event(Event.MSG, msg);
0975:                if (log.isTraceEnabled()) {
0976:                    StringBuffer sb = new StringBuffer("message is ").append(
0977:                            msg).append(", headers are ").append(
0978:                            msg.getHeaders());
0979:                    log.trace(sb);
0980:                }
0981:
0982:                /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
0983:                 * This allows e.g. PerfObserver to get the time of reception of a message */
0984:                if (observer != null)
0985:                    observer.up(evt, up_queue.size());
0986:
0987:                hdr = (TpHeader) msg.getHeader(name); // replaced removeHeader() with getHeader()
0988:                if (hdr != null) {
0989:
0990:                    /* Discard all messages destined for a channel with a different name */
0991:                    String ch_name = hdr.channel_name;
0992:
0993:                    // Discard if message's group name is not the same as our group name unless the
0994:                    // message is a diagnosis message (special group name DIAG_GROUP)
0995:                    if (ch_name != null && channel_name != null
0996:                            && !channel_name.equals(ch_name)
0997:                            && !ch_name.equals(Util.DIAG_GROUP)) {
0998:                        if (log.isWarnEnabled())
0999:                            log
1000:                                    .warn(new StringBuffer(
1001:                                            "discarded message from different group \"")
1002:                                            .append(ch_name).append(
1003:                                                    "\" (our group is \"")
1004:                                            .append(channel_name).append(
1005:                                                    "\"). Sender was ").append(
1006:                                                    msg.getSrc()));
1007:                        return;
1008:                    }
1009:                } else {
1010:                    if (log.isTraceEnabled())
1011:                        log
1012:                                .trace(new StringBuffer(
1013:                                        "message does not have a transport header, msg is ")
1014:                                        .append(msg).append(", headers are ")
1015:                                        .append(msg.getHeaders()).append(
1016:                                                ", will be discarded"));
1017:                    return;
1018:                }
1019:                passUp(evt);
1020:            }
1021:
1022:            /** Internal method to serialize and send a message. This method is not reentrant */
1023:            private void send(Message msg, Address dest, boolean multicast)
1024:                    throws Exception {
1025:                if (enable_bundling) {
1026:                    bundler.send(msg, dest);
1027:                    return;
1028:                }
1029:
1030:                // Needs to be synchronized because we can have possible concurrent access, e.g.
1031:                // Discovery uses a separate thread to send out discovery messages
1032:                // We would *not* need to sync between send(), OutgoingPacketHandler and BundlingOutgoingPacketHandler,
1033:                // because only *one* of them is enabled
1034:                Buffer buf;
1035:                synchronized (out_stream) {
1036:                    buf = messageToBuffer(msg, multicast);
1037:                    doSend(buf, dest, multicast);
1038:                }
1039:            }
1040:
1041:            private void doSend(Buffer buf, Address dest, boolean multicast)
1042:                    throws Exception {
1043:                if (stats) {
1044:                    num_msgs_sent++;
1045:                    num_bytes_sent += buf.getLength();
1046:                }
1047:                if (multicast) {
1048:                    sendToAllMembers(buf.getBuf(), buf.getOffset(), buf
1049:                            .getLength());
1050:                } else {
1051:                    sendToSingleMember(dest, buf.getBuf(), buf.getOffset(), buf
1052:                            .getLength());
1053:                }
1054:            }
1055:
1056:            /**
1057:             * This method needs to be synchronized on out_stream when it is called
1058:             * @param msg
1059:             * @return
1060:             * @throws java.io.IOException
1061:             */
1062:            private Buffer messageToBuffer(Message msg, boolean multicast)
1063:                    throws Exception {
1064:                Buffer retval;
1065:                byte flags = 0;
1066:
1067:                out_stream.reset();
1068:                buf_out_stream.reset(out_stream.getCapacity());
1069:                dos.reset();
1070:                dos.writeShort(Version.version); // write the version
1071:                if (multicast)
1072:                    flags += MULTICAST;
1073:                dos.writeByte(flags);
1074:                // preMarshalling(msg, dest, src);  // allows for optimization by subclass
1075:                msg.writeTo(dos);
1076:                // postMarshalling(msg, dest, src); // allows for optimization by subclass
1077:                dos.flush();
1078:                retval = new Buffer(out_stream.getRawBuffer(), 0, out_stream
1079:                        .size());
1080:                return retval;
1081:            }
1082:
1083:            private Message bufferToMessage(DataInputStream instream,
1084:                    Address dest, Address sender, boolean multicast)
1085:                    throws Exception {
1086:                Message msg = new Message(false); // don't create headers, readFrom() will do this
1087:                msg.readFrom(instream);
1088:                postUnmarshalling(msg, dest, sender, multicast); // allows for optimization by subclass
1089:                return msg;
1090:            }
1091:
1092:            private Buffer listToBuffer(List l, boolean multicast)
1093:                    throws Exception {
1094:                Buffer retval;
1095:                Address src;
1096:                Message msg;
1097:                byte flags = 0;
1098:                int len = l != null ? l.size() : 0;
1099:                boolean src_written = false;
1100:                out_stream.reset();
1101:                buf_out_stream.reset(out_stream.getCapacity());
1102:                dos.reset();
1103:                dos.writeShort(Version.version);
1104:                flags += LIST;
1105:                if (multicast)
1106:                    flags += MULTICAST;
1107:                dos.writeByte(flags);
1108:                dos.writeInt(len);
1109:                for (Enumeration en = l.elements(); en.hasMoreElements();) {
1110:                    msg = (Message) en.nextElement();
1111:                    src = msg.getSrc();
1112:                    if (!src_written) {
1113:                        Util.writeAddress(src, dos);
1114:                        src_written = true;
1115:                    }
1116:                    // msg.setSrc(null);
1117:                    msg.writeTo(dos);
1118:                    // msg.setSrc(src);
1119:                }
1120:                dos.flush();
1121:                retval = new Buffer(out_stream.getRawBuffer(), 0, out_stream
1122:                        .size());
1123:                return retval;
1124:            }
1125:
1126:            private List bufferToList(DataInputStream instream, Address dest,
1127:                    boolean multicast) throws Exception {
1128:                List l = new List();
1129:                DataInputStream in = null;
1130:                int len;
1131:                Message msg;
1132:                Address src;
1133:
1134:                try {
1135:                    len = instream.readInt();
1136:                    src = Util.readAddress(instream);
1137:                    for (int i = 0; i < len; i++) {
1138:                        msg = new Message(false); // don't create headers, readFrom() will do this
1139:                        msg.readFrom(instream);
1140:                        postUnmarshallingList(msg, dest, multicast);
1141:                        msg.setSrc(src);
1142:                        l.add(msg);
1143:                    }
1144:                    return l;
1145:                } finally {
1146:                    Util.close(in);
1147:                }
1148:            }
1149:
1150:            /**
1151:             *
1152:             * @param s
1153:             * @return List<NetworkInterface>
1154:             */
1155:            private java.util.List parseInterfaceList(String s)
1156:                    throws Exception {
1157:                java.util.List interfaces = new ArrayList(10);
1158:                if (s == null)
1159:                    return null;
1160:
1161:                StringTokenizer tok = new StringTokenizer(s, ",");
1162:                String interface_name;
1163:                NetworkInterface intf;
1164:
1165:                while (tok.hasMoreTokens()) {
1166:                    interface_name = tok.nextToken();
1167:
1168:                    // try by name first (e.g. (eth0")
1169:                    intf = NetworkInterface.getByName(interface_name);
1170:
1171:                    // next try by IP address or symbolic name
1172:                    if (intf == null)
1173:                        intf = NetworkInterface.getByInetAddress(InetAddress
1174:                                .getByName(interface_name));
1175:
1176:                    if (intf == null)
1177:                        throw new Exception("interface " + interface_name
1178:                                + " not found");
1179:                    if (interfaces.contains(intf)) {
1180:                        log.warn("did not add interface " + interface_name
1181:                                + " (already present in " + print(interfaces)
1182:                                + ")");
1183:                    } else {
1184:                        interfaces.add(intf);
1185:                    }
1186:                }
1187:                return interfaces;
1188:            }
1189:
1190:            private static String print(java.util.List interfaces) {
1191:                StringBuffer sb = new StringBuffer();
1192:                boolean first = true;
1193:                NetworkInterface intf;
1194:                for (Iterator it = interfaces.iterator(); it.hasNext();) {
1195:                    intf = (NetworkInterface) it.next();
1196:                    if (first) {
1197:                        first = false;
1198:                    } else {
1199:                        sb.append(", ");
1200:                    }
1201:                    sb.append(intf.getName());
1202:                }
1203:                return sb.toString();
1204:            }
1205:
1206:            protected void handleDownEvent(Event evt) {
1207:                switch (evt.getType()) {
1208:
1209:                case Event.TMP_VIEW:
1210:                case Event.VIEW_CHANGE:
1211:                    synchronized (members) {
1212:                        view = (View) evt.getArg();
1213:                        members.clear();
1214:                        Vector tmpvec = view.getMembers();
1215:                        members.addAll(tmpvec);
1216:                    }
1217:                    break;
1218:
1219:                case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local)
1220:                    passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
1221:                    break;
1222:
1223:                case Event.CONNECT:
1224:                    channel_name = (String) evt.getArg();
1225:                    header = new TpHeader(channel_name);
1226:                    setThreadNames();
1227:
1228:                    // removed March 18 2003 (bela), not needed (handled by GMS)
1229:                    // changed July 2 2003 (bela): we discard CONNECT_OK at the GMS level anyway, this might
1230:                    // be needed if we run without GMS though
1231:                    passUp(new Event(Event.CONNECT_OK));
1232:                    break;
1233:
1234:                case Event.DISCONNECT:
1235:                    unsetThreadNames();
1236:                    passUp(new Event(Event.DISCONNECT_OK));
1237:                    break;
1238:
1239:                case Event.CONFIG:
1240:                    if (log.isDebugEnabled())
1241:                        log.debug("received CONFIG event: " + evt.getArg());
1242:                    handleConfigEvent((HashMap) evt.getArg());
1243:                    break;
1244:                }
1245:            }
1246:
1247:            protected void setThreadNames() {
1248:                if (channel_name != null) {
1249:                    String tmp, prefix = Global.THREAD_PREFIX;
1250:                    if (incoming_packet_handler != null) {
1251:                        tmp = incoming_packet_handler.getName();
1252:                        if (tmp != null && tmp.indexOf(prefix) == -1) {
1253:                            tmp += prefix + channel_name + ")";
1254:                            incoming_packet_handler.setName(tmp);
1255:                        }
1256:                    }
1257:                    if (incoming_msg_handler != null) {
1258:                        tmp = incoming_msg_handler.getName();
1259:                        if (tmp != null && tmp.indexOf(prefix) == -1) {
1260:                            tmp += prefix + channel_name + ")";
1261:                            incoming_msg_handler.setName(tmp);
1262:                        }
1263:                    }
1264:                    if (outgoing_packet_handler != null) {
1265:                        tmp = outgoing_packet_handler.getName();
1266:                        if (tmp != null && tmp.indexOf(prefix) == -1) {
1267:                            tmp += prefix + channel_name + ")";
1268:                            outgoing_packet_handler.setName(tmp);
1269:                        }
1270:                    }
1271:                    if (diag_handler != null) {
1272:                        tmp = diag_handler.getName();
1273:                        if (tmp != null && tmp.indexOf(prefix) == -1) {
1274:                            tmp += prefix + channel_name + ")";
1275:                            diag_handler.setName(tmp);
1276:                        }
1277:                    }
1278:                }
1279:            }
1280:
1281:            protected void unsetThreadNames() {
1282:                if (channel_name != null) {
1283:                    String tmp, prefix = Global.THREAD_PREFIX;
1284:                    int index;
1285:
1286:                    tmp = incoming_packet_handler != null ? incoming_packet_handler
1287:                            .getName()
1288:                            : null;
1289:                    if (tmp != null) {
1290:                        index = tmp.indexOf(prefix);
1291:                        if (index > -1) {
1292:                            tmp = tmp.substring(0, index);
1293:                            incoming_packet_handler.setName(tmp);
1294:                        }
1295:                    }
1296:
1297:                    tmp = incoming_msg_handler != null ? incoming_msg_handler
1298:                            .getName() : null;
1299:                    if (tmp != null) {
1300:                        index = tmp.indexOf(prefix);
1301:                        if (index > -1) {
1302:                            tmp = tmp.substring(0, index);
1303:                            incoming_msg_handler.setName(tmp);
1304:                        }
1305:                    }
1306:
1307:                    tmp = outgoing_packet_handler != null ? outgoing_packet_handler
1308:                            .getName()
1309:                            : null;
1310:                    if (tmp != null) {
1311:                        index = tmp.indexOf(prefix);
1312:                        if (index > -1) {
1313:                            tmp = tmp.substring(0, index);
1314:                            outgoing_packet_handler.setName(tmp);
1315:                        }
1316:                    }
1317:                    tmp = diag_handler != null ? diag_handler.getName() : null;
1318:                    if (tmp != null) {
1319:                        index = tmp.indexOf(prefix);
1320:                        if (index > -1) {
1321:                            tmp = tmp.substring(0, index);
1322:                            diag_handler.setName(tmp);
1323:                        }
1324:                    }
1325:                }
1326:            }
1327:
1328:            protected void handleConfigEvent(HashMap map) {
1329:                if (map == null)
1330:                    return;
1331:                if (map.containsKey("additional_data")) {
1332:                    additional_data = (byte[]) map.get("additional_data");
1333:                    if (local_addr instanceof  IpAddress)
1334:                        ((IpAddress) local_addr)
1335:                                .setAdditionalData(additional_data);
1336:                }
1337:            }
1338:
1339:            /* ----------------------------- End of Private Methods ---------------------------------------- */
1340:
1341:            /* ----------------------------- Inner Classes ---------------------------------------- */
1342:
1343:            static class IncomingQueueEntry {
1344:                Address dest = null;
1345:                Address sender = null;
1346:                byte[] buf;
1347:                int offset, length;
1348:
1349:                IncomingQueueEntry(Address dest, Address sender, byte[] buf,
1350:                        int offset, int length) {
1351:                    this .dest = dest;
1352:                    this .sender = sender;
1353:                    this .buf = buf;
1354:                    this .offset = offset;
1355:                    this .length = length;
1356:                }
1357:            }
1358:
1359:            /**
1360:             * This thread fetches byte buffers from the packet_queue, converts them into messages and passes them up
1361:             * to the higher layer (done in handleIncomingUdpPacket()).
1362:             */
1363:            class IncomingPacketHandler implements  Runnable {
1364:                Thread t = null;
1365:
1366:                String getName() {
1367:                    return t != null ? t.getName() : null;
1368:                }
1369:
1370:                void setName(String thread_name) {
1371:                    if (t != null)
1372:                        t.setName(thread_name);
1373:                }
1374:
1375:                void start() {
1376:                    if (t == null || !t.isAlive()) {
1377:                        t = new Thread(Util.getGlobalThreadGroup(), this ,
1378:                                "IncomingPacketHandler");
1379:                        t.setDaemon(true);
1380:                        t.start();
1381:                    }
1382:                }
1383:
1384:                void stop() {
1385:                    Thread tmp = t;
1386:                    t = null;
1387:                    incoming_packet_queue.close(true); // should terminate the packet_handler thread too
1388:                    if (tmp != null) {
1389:                        try {
1390:                            tmp.join(10000);
1391:                        } catch (InterruptedException e) {
1392:                        }
1393:                        if (tmp.isAlive()) {
1394:                            if (log.isWarnEnabled())
1395:                                log
1396:                                        .warn("IncomingPacketHandler thread was interrupted, but is still alive");
1397:                        }
1398:                    }
1399:                }
1400:
1401:                public void run() {
1402:                    IncomingQueueEntry entry;
1403:                    while (!incoming_packet_queue.closed()
1404:                            && Thread.currentThread().equals(t)) {
1405:                        try {
1406:                            entry = (IncomingQueueEntry) incoming_packet_queue
1407:                                    .remove();
1408:                            handleIncomingPacket(entry.dest, entry.sender,
1409:                                    entry.buf, entry.offset, entry.length);
1410:                        } catch (QueueClosedException closed_ex) {
1411:                            break;
1412:                        } catch (Throwable ex) {
1413:                            if (log.isErrorEnabled())
1414:                                log.error("error processing incoming packet",
1415:                                        ex);
1416:                        }
1417:                    }
1418:                    if (log.isTraceEnabled())
1419:                        log.trace("incoming packet handler terminating");
1420:                }
1421:            }
1422:
1423:            class IncomingMessageHandler implements  Runnable {
1424:                Thread t;
1425:                int i = 0;
1426:
1427:                String getName() {
1428:                    return t != null ? t.getName() : null;
1429:                }
1430:
1431:                void setName(String thread_name) {
1432:                    if (t != null)
1433:                        t.setName(thread_name);
1434:                }
1435:
1436:                public void start() {
1437:                    if (t == null || !t.isAlive()) {
1438:                        t = new Thread(Util.getGlobalThreadGroup(), this ,
1439:                                "IncomingMessageHandler");
1440:                        t.setDaemon(true);
1441:                        t.start();
1442:                    }
1443:                }
1444:
1445:                public void stop() {
1446:                    incoming_msg_queue.close(true);
1447:                    t = null;
1448:                }
1449:
1450:                public void run() {
1451:                    Message msg;
1452:                    while (!incoming_msg_queue.closed()
1453:                            && Thread.currentThread().equals(t)) {
1454:                        try {
1455:                            msg = (Message) incoming_msg_queue.remove();
1456:                            handleIncomingMessage(msg);
1457:                        } catch (QueueClosedException closed_ex) {
1458:                            break;
1459:                        } catch (Throwable ex) {
1460:                            if (log.isErrorEnabled())
1461:                                log.error("error processing incoming message",
1462:                                        ex);
1463:                        }
1464:                    }
1465:                    if (log.isTraceEnabled())
1466:                        log.trace("incoming message handler terminating");
1467:                }
1468:            }
1469:
1470:            /**
1471:             * This thread fetches byte buffers from the outgoing_packet_queue, converts them into messages and sends them
1472:             * using the unicast or multicast socket
1473:             */
1474:            class OutgoingPacketHandler implements  Runnable {
1475:                Thread t = null;
1476:                byte[] buf;
1477:                DatagramPacket packet;
1478:
1479:                String getName() {
1480:                    return t != null ? t.getName() : null;
1481:                }
1482:
1483:                void setName(String thread_name) {
1484:                    if (t != null)
1485:                        t.setName(thread_name);
1486:                }
1487:
1488:                void start() {
1489:                    if (t == null || !t.isAlive()) {
1490:                        t = new Thread(Util.getGlobalThreadGroup(), this ,
1491:                                "OutgoingPacketHandler");
1492:                        t.setDaemon(true);
1493:                        t.start();
1494:                    }
1495:                }
1496:
1497:                void stop() {
1498:                    Thread tmp = t;
1499:                    t = null;
1500:                    if (tmp != null) {
1501:                        tmp.interrupt();
1502:                    }
1503:                }
1504:
1505:                public void run() {
1506:                    Message msg;
1507:
1508:                    while (t != null && Thread.currentThread().equals(t)) {
1509:                        try {
1510:                            msg = (Message) outgoing_queue.take();
1511:                            handleMessage(msg);
1512:                        } catch (QueueClosedException closed_ex) {
1513:                            break;
1514:                        } catch (InterruptedException interruptedEx) {
1515:                        } catch (Throwable th) {
1516:                            if (log.isErrorEnabled())
1517:                                log.error("exception sending packet", th);
1518:                        }
1519:                        msg = null; // let's give the garbage collector a hand... this is probably useless though
1520:                    }
1521:                    if (log.isTraceEnabled())
1522:                        log.trace("outgoing message handler terminating");
1523:                }
1524:
1525:                protected void handleMessage(Message msg) throws Throwable {
1526:                    Address dest = msg.getDest();
1527:                    send(msg, dest, dest == null || dest.isMulticastAddress());
1528:                }
1529:
1530:            }
1531:
1532:            /**
1533:             * Bundles smaller messages into bigger ones. Collects messages in a list until
1534:             * messages of a total of <tt>max_bundle_size bytes</tt> have accumulated, or until
1535:             * <tt>max_bundle_timeout</tt> milliseconds have elapsed, whichever is first. Messages
1536:             * are unbundled at the receiver.
1537:             */
1538:            //    private class BundlingOutgoingPacketHandler extends OutgoingPacketHandler {
1539:            //        /** HashMap<Address, List<Message>>. Keys are destinations, values are lists of Messages */
1540:            //        final HashMap       msgs=new HashMap(11);
1541:            //        long                count=0;    // current number of bytes accumulated
1542:            //        int                 num_msgs=0;
1543:            //        long                start=0;
1544:            //        long                wait_time=0; // wait for removing messages from the queue
1545:            //
1546:            //
1547:            //
1548:            //        private void init() {
1549:            //            wait_time=start=count=0;
1550:            //        }
1551:            //
1552:            //        void start() {
1553:            //            init();
1554:            //            super.start();
1555:            //            t.setName("BundlingOutgoingPacketHandler");
1556:            //        }
1557:            //
1558:            //        void stop() {
1559:            //            // bundleAndSend();
1560:            //            super.stop();
1561:            //        }
1562:            //
1563:            //        public void run() {
1564:            //            Message msg;
1565:            //            long    length;
1566:            //            while(t != null && Thread.currentThread().equals(t)) {
1567:            //                try {
1568:            //                    msg=(Message)outgoing_queue.poll(wait_time);
1569:            //                    if(msg == null)
1570:            //                        throw new TimeoutException();
1571:            //                    length=msg.size();
1572:            //                    checkLength(length);
1573:            //                    if(start == 0)
1574:            //                        start=System.currentTimeMillis();
1575:            //
1576:            //                    if(count + length >= max_bundle_size) {
1577:            //                        bundleAndSend();
1578:            //                        count=0;
1579:            //                        start=System.currentTimeMillis();
1580:            //                    }
1581:            //
1582:            //                    addMessage(msg);
1583:            //                    count+=length;
1584:            //
1585:            //                    wait_time=max_bundle_timeout - (System.currentTimeMillis() - start);
1586:            //                    if(wait_time <= 0) {
1587:            //                        bundleAndSend();
1588:            //                        init();
1589:            //                    }
1590:            //                }
1591:            //                catch(QueueClosedException queue_closed_ex) {
1592:            //                    bundleAndSend();
1593:            //                    break;
1594:            //                }
1595:            //                catch(TimeoutException timeout_ex) {
1596:            //                    bundleAndSend();
1597:            //                    init();
1598:            //                }
1599:            //                catch(Throwable ex) {
1600:            //                    log.error("failure in bundling", ex);
1601:            //                }
1602:            //            }
1603:            //            if(log.isTraceEnabled()) log.trace("BundlingOutgoingPacketHandler thread terminated");
1604:            //        }
1605:            //
1606:            //
1607:            //
1608:            //
1609:            //        private void checkLength(long len) throws Exception {
1610:            //            if(len > max_bundle_size)
1611:            //                throw new Exception("message size (" + len + ") is greater than max bundling size (" + max_bundle_size +
1612:            //                        "). Set the fragmentation/bundle size in FRAG and TP correctly");
1613:            //        }
1614:            //
1615:            //
1616:            //        private void addMessage(Message msg) { // no sync needed, never called by multiple threads concurrently
1617:            //            List    tmp;
1618:            //            Address dst=msg.getDest();
1619:            //            tmp=(List)msgs.get(dst);
1620:            //            if(tmp == null) {
1621:            //                tmp=new List();
1622:            //                msgs.put(dst, tmp);
1623:            //            }
1624:            //            tmp.add(msg);
1625:            //            num_msgs++;
1626:            //        }
1627:            //
1628:            //
1629:            //
1630:            //        private void bundleAndSend() {
1631:            //            Map.Entry      entry;
1632:            //            Address        dst;
1633:            //            Buffer         buffer;
1634:            //            List           l;
1635:            //            long           stop_time=System.currentTimeMillis();
1636:            //
1637:            //            if(msgs.size() == 0)
1638:            //                return;
1639:            //
1640:            //            try {
1641:            //                if(log.isTraceEnabled()) {
1642:            //                    StringBuffer sb=new StringBuffer("sending ").append(num_msgs).append(" msgs (");
1643:            //                    sb.append(count).append(" bytes, ").append(stop_time-start).append("ms)");
1644:            //                    sb.append(" to ").append(msgs.size()).append(" destination(s)");
1645:            //                    if(msgs.size() > 1) sb.append(" (dests=").append(msgs.keySet()).append(")");
1646:            //                    log.trace(sb.toString());
1647:            //                }
1648:            //                boolean multicast;
1649:            //                for(Iterator it=msgs.entrySet().iterator(); it.hasNext();) {
1650:            //                    entry=(Map.Entry)it.next();
1651:            //                    l=(List)entry.getValue();
1652:            //                    if(l.size() == 0)
1653:            //                        continue;
1654:            //                    dst=(Address)entry.getKey();
1655:            //                    multicast=dst == null || dst.isMulticastAddress();
1656:            //                    synchronized(out_stream) {
1657:            //                        try {
1658:            //                            buffer=listToBuffer(l, multicast);
1659:            //                            doSend(buffer, dst, multicast);
1660:            //                        }
1661:            //                        catch(Throwable e) {
1662:            //                            if(log.isErrorEnabled()) log.error("exception sending msg", e);
1663:            //                        }
1664:            //                    }
1665:            //                }
1666:            //            }
1667:            //            finally {
1668:            //                msgs.clear();
1669:            //                num_msgs=0;
1670:            //            }
1671:            //        }
1672:            //    }
1673:
1674:            private class Bundler {
1675:                /** HashMap<Address, List<Message>>. Keys are destinations, values are lists of Messages */
1676:                final HashMap msgs = new HashMap(36);
1677:                long count = 0; // current number of bytes accumulated
1678:                int num_msgs = 0;
1679:                long start = 0;
1680:                BundlingTimer bundling_timer = null;
1681:
1682:                private synchronized void send(Message msg, Address dest)
1683:                        throws Exception {
1684:                    long length = msg.size();
1685:                    checkLength(length);
1686:
1687:                    if (start == 0)
1688:                        start = System.currentTimeMillis();
1689:                    if (count + length >= max_bundle_size) {
1690:                        cancelTimer();
1691:                        bundleAndSend(); // clears msgs and resets num_msgs
1692:                    }
1693:
1694:                    addMessage(msg, dest);
1695:                    count += length;
1696:                    startTimer(); // start timer if not running
1697:                }
1698:
1699:                /** Never called concurrently with cancelTimer - no need for synchronization */
1700:                private void startTimer() {
1701:                    if (bundling_timer == null || bundling_timer.cancelled()) {
1702:                        bundling_timer = new BundlingTimer();
1703:                        timer.add(bundling_timer);
1704:                    }
1705:                }
1706:
1707:                /** Never called concurrently with startTimer() - no need for synchronization */
1708:                private void cancelTimer() {
1709:                    if (bundling_timer != null) {
1710:                        bundling_timer.cancel();
1711:                        bundling_timer = null;
1712:                    }
1713:                }
1714:
1715:                private void addMessage(Message msg, Address dest) { // no sync needed, never called by multiple threads concurrently
1716:                    List tmp;
1717:                    synchronized (msgs) {
1718:                        tmp = (List) msgs.get(dest);
1719:                        if (tmp == null) {
1720:                            tmp = new List();
1721:                            msgs.put(dest, tmp);
1722:                        }
1723:                        tmp.add(msg);
1724:                        num_msgs++;
1725:                    }
1726:                }
1727:
1728:                private void bundleAndSend() {
1729:                    Map.Entry entry;
1730:                    Address dst;
1731:                    Buffer buffer;
1732:                    List l;
1733:                    Map copy;
1734:
1735:                    synchronized (msgs) {
1736:                        if (msgs.size() == 0)
1737:                            return;
1738:                        copy = new HashMap(msgs);
1739:                        if (log.isTraceEnabled()) {
1740:                            long stop = System.currentTimeMillis();
1741:                            double percentage = 100.0 / max_bundle_size * count;
1742:                            StringBuffer sb = new StringBuffer("sending ")
1743:                                    .append(num_msgs).append(" msgs (");
1744:                            num_msgs = 0;
1745:                            sb
1746:                                    .append(count)
1747:                                    .append(
1748:                                            " bytes ("
1749:                                                    + f.format(percentage)
1750:                                                    + "% of max_bundle_size), collected in "
1751:                                                    + +(stop - start)
1752:                                                    + "ms) to ").append(
1753:                                            copy.size()).append(
1754:                                            " destination(s)");
1755:                            if (copy.size() > 1)
1756:                                sb.append(" (dests=").append(copy.keySet())
1757:                                        .append(")");
1758:                            log.trace(sb.toString());
1759:                        }
1760:                        msgs.clear();
1761:                        count = 0;
1762:                    }
1763:
1764:                    try {
1765:                        boolean multicast;
1766:                        for (Iterator it = copy.entrySet().iterator(); it
1767:                                .hasNext();) {
1768:                            entry = (Map.Entry) it.next();
1769:                            l = (List) entry.getValue();
1770:                            if (l.size() == 0)
1771:                                continue;
1772:                            dst = (Address) entry.getKey();
1773:                            multicast = dst == null || dst.isMulticastAddress();
1774:                            synchronized (out_stream) {
1775:                                try {
1776:                                    buffer = listToBuffer(l, multicast);
1777:                                    doSend(buffer, dst, multicast);
1778:                                } catch (Throwable e) {
1779:                                    if (log.isErrorEnabled())
1780:                                        log.error("exception sending msg: "
1781:                                                + e.toString(), e.getCause());
1782:                                }
1783:                            }
1784:                        }
1785:                    } finally {
1786:                        start = 0;
1787:                    }
1788:                }
1789:
1790:                private void checkLength(long len) throws Exception {
1791:                    if (len > max_bundle_size)
1792:                        throw new Exception(
1793:                                "message size ("
1794:                                        + len
1795:                                        + ") is greater than max bundling size ("
1796:                                        + max_bundle_size
1797:                                        + "). Set the fragmentation/bundle size in FRAG and TP correctly");
1798:                }
1799:
1800:                private class BundlingTimer implements  TimeScheduler.Task {
1801:                    boolean cancelled = false;
1802:
1803:                    void cancel() {
1804:                        cancelled = true;
1805:                    }
1806:
1807:                    public boolean cancelled() {
1808:                        return cancelled;
1809:                    }
1810:
1811:                    public long nextInterval() {
1812:                        return max_bundle_timeout;
1813:                    }
1814:
1815:                    public void run() {
1816:                        bundleAndSend();
1817:                        cancelled = true;
1818:                    }
1819:                }
1820:            }
1821:
1822:            private class DiagnosticsHandler implements  Runnable {
1823:                Thread t = null;
1824:                MulticastSocket diag_sock = null;
1825:
1826:                DiagnosticsHandler() {
1827:                }
1828:
1829:                String getName() {
1830:                    return t != null ? t.getName() : null;
1831:                }
1832:
1833:                void setName(String thread_name) {
1834:                    if (t != null)
1835:                        t.setName(thread_name);
1836:                }
1837:
1838:                void start() throws IOException {
1839:                    diag_sock = new MulticastSocket(diagnostics_port);
1840:                    java.util.List interfaces = Util
1841:                            .getAllAvailableInterfaces();
1842:                    bindToInterfaces(interfaces, diag_sock);
1843:
1844:                    if (t == null || !t.isAlive()) {
1845:                        t = new Thread(Util.getGlobalThreadGroup(), this ,
1846:                                "DiagnosticsHandler");
1847:                        t.setDaemon(true);
1848:                        t.start();
1849:                    }
1850:                }
1851:
1852:                void stop() {
1853:                    if (diag_sock != null)
1854:                        diag_sock.close();
1855:                    t = null;
1856:                }
1857:
1858:                public void run() {
1859:                    byte[] buf = new byte[1500]; // MTU on most LANs
1860:                    DatagramPacket packet;
1861:                    while (!diag_sock.isClosed()
1862:                            && Thread.currentThread().equals(t)) {
1863:                        packet = new DatagramPacket(buf, 0, buf.length);
1864:                        try {
1865:                            diag_sock.receive(packet);
1866:                            handleDiagnosticProbe(packet.getSocketAddress(),
1867:                                    diag_sock, new String(packet.getData(),
1868:                                            packet.getOffset(), packet
1869:                                                    .getLength()));
1870:                        } catch (IOException e) {
1871:                        }
1872:                    }
1873:                }
1874:
1875:                private void bindToInterfaces(java.util.List interfaces,
1876:                        MulticastSocket s) {
1877:                    SocketAddress group_addr = new InetSocketAddress(
1878:                            diagnostics_addr, diagnostics_port);
1879:                    for (Iterator it = interfaces.iterator(); it.hasNext();) {
1880:                        NetworkInterface i = (NetworkInterface) it.next();
1881:                        try {
1882:                            if (i.getInetAddresses().hasMoreElements()) { // fix for VM crash - suggested by JJalenak@netopia.com
1883:                                s.joinGroup(group_addr, i);
1884:                                if (log.isTraceEnabled())
1885:                                    log.trace("joined " + group_addr + " on "
1886:                                            + i.getName());
1887:                            }
1888:                        } catch (IOException e) {
1889:                            log.warn("failed to join " + group_addr + " on "
1890:                                    + i.getName() + ": " + e);
1891:                        }
1892:                    }
1893:                }
1894:            }
1895:
1896:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.