Source Code Cross Referenced for MERGE.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: MERGE.java,v 1.13.2.1 2007/04/27 08:03:51 belaban Exp $
002:
003:        package org.jgroups.protocols;
004:
005:        import org.jgroups.*;
006:        import org.jgroups.stack.Protocol;
007:        import org.jgroups.stack.RouterStub;
008:        import org.jgroups.util.Util;
009:
010:        import java.io.IOException;
011:        import java.io.ObjectInput;
012:        import java.io.ObjectOutput;
013:        import java.util.*;
014:
015:        /**
016:         * Simple and stupid MERGE protocol (does not take into account state transfer).
017:         * Periodically mcasts a HELLO message with its own address. When a HELLO message is
018:         * received from a member that has the same group (UDP discards all messages with a group
019:         * name different that our own), but is not currently in the group, a MERGE event is sent
020:         * up the stack.  The protocol starts working upon receiving a View in which it is the coordinator.
021:         *
022:         * @author Gianluca Collot, Jan 2001
023:         */
024:        public class MERGE extends Protocol implements  Runnable {
025:            final Vector members = new Vector();
026:            Address local_addr = null;
027:            String group_addr = null;
028:            final String groupname = null;
029:            Thread hello_thread = null; // thread that periodically mcasts HELLO messages
030:            long timeout = 5000; // timeout between mcasting of HELLO messages
031:
032:            String router_host = null;
033:            int router_port = 0;
034:
035:            RouterStub client = null;
036:            boolean is_server = false;
037:            boolean is_coord = false;
038:            boolean merging = false;
039:
040:            public String getName() {
041:                return "MERGE";
042:            }
043:
044:            public boolean setProperties(Properties props) {
045:                String str;
046:
047:                super .setProperties(props);
048:                str = props.getProperty("timeout"); // max time to wait for initial members
049:                if (str != null) {
050:                    timeout = Long.parseLong(str);
051:                    props.remove("timeout");
052:                }
053:
054:                str = props.getProperty("router_host"); // host to send gossip queries (if gossip enabled)
055:                if (str != null) {
056:                    router_host = str;
057:                    props.remove("router_host");
058:                }
059:
060:                str = props.getProperty("router_port");
061:                if (str != null) {
062:                    router_port = Integer.parseInt(str);
063:                    props.remove("router_port");
064:                }
065:
066:                if (router_host != null && router_port != 0)
067:                    client = new RouterStub(router_host, router_port);
068:
069:                if (props.size() > 0) {
070:                    log.error("the following properties are not recognized: "
071:                            + props);
072:                    return false;
073:                }
074:                return true;
075:            }
076:
077:            public void start() throws Exception {
078:                if (hello_thread == null) {
079:                    hello_thread = new Thread(this , "MERGE Thread");
080:                    hello_thread.setDaemon(true);
081:                    hello_thread.start();
082:                }
083:            }
084:
085:            public void stop() {
086:                Thread tmp = null;
087:                if (hello_thread != null && hello_thread.isAlive()) {
088:                    tmp = hello_thread;
089:                    hello_thread = null;
090:                    tmp.interrupt();
091:                    try {
092:                        tmp.join(1000);
093:                    } catch (Exception ex) {
094:                    }
095:                }
096:                hello_thread = null;
097:            }
098:
099:            public void up(Event evt) {
100:                Message msg;
101:                Object obj;
102:                MergeHeader hdr;
103:                Address sender;
104:                boolean contains;
105:                Vector tmp;
106:
107:                switch (evt.getType()) {
108:
109:                case Event.MSG:
110:                    msg = (Message) evt.getArg();
111:                    obj = msg.getHeader(getName());
112:                    if (obj == null || !(obj instanceof  MergeHeader)) {
113:                        passUp(evt);
114:                        return;
115:                    }
116:                    hdr = (MergeHeader) msg.removeHeader(getName());
117:
118:                    switch (hdr.type) {
119:
120:                    case MergeHeader.HELLO: // if coord: handle, else: discard
121:                        if (!is_server || !is_coord) {
122:                            return;
123:                        }
124:                        if (merging) {
125:                            return;
126:                        }
127:                        sender = msg.getSrc();
128:                        if ((sender != null) && (members.size() >= 0)) {
129:                            synchronized (members) {
130:                                contains = members.contains(sender);
131:                            }
132:                            //merge only with lower addresses :prevents cycles and ensures that the new coordinator is correct.
133:                            if (!contains && sender.compareTo(local_addr) < 0) {
134:                                if (log.isInfoEnabled())
135:                                    log.info("membership " + members
136:                                            + " does not contain " + sender
137:                                            + "; merging it");
138:                                tmp = new Vector();
139:                                tmp.addElement(sender);
140:                                merging = true;
141:                                passUp(new Event(Event.MERGE, tmp));
142:                            }
143:                        }
144:                        return;
145:
146:                    default:
147:                        if (log.isErrorEnabled())
148:                            log.error("got MERGE hdr with unknown type ("
149:                                    + hdr.type + ')');
150:                        return;
151:                    }
152:
153:                case Event.SET_LOCAL_ADDRESS:
154:                    local_addr = (Address) evt.getArg();
155:                    passUp(evt);
156:                    break;
157:
158:                default:
159:                    passUp(evt); // Pass up to the layer above us
160:                    break;
161:                }
162:            }
163:
164:            public void down(Event evt) {
165:
166:                switch (evt.getType()) {
167:
168:                case Event.TMP_VIEW:
169:                    passDown(evt);
170:                    break;
171:
172:                case Event.MERGE_DENIED:
173:                    merging = false;
174:                    passDown(evt);
175:                    break;
176:
177:                case Event.VIEW_CHANGE:
178:                    merging = false;
179:                    synchronized (members) {
180:                        members.clear();
181:                        members.addAll(((View) evt.getArg()).getMembers());
182:                        if ((members == null) || (members.size() == 0)) {
183:                            if (log.isFatalEnabled())
184:                                log
185:                                        .fatal("received VIEW_CHANGE with null or empty vector");
186:                            System.exit(6);
187:                        }
188:                    }
189:                    is_coord = members.elementAt(0).equals(local_addr);
190:                    passDown(evt);
191:                    if (is_coord) {
192:                        if (log.isInfoEnabled())
193:                            log.info("start sending Hellos");
194:                        try {
195:                            start();
196:                        } catch (Exception ex) {
197:                            if (log.isWarnEnabled())
198:                                log.warn("exception calling start(): " + ex);
199:                        }
200:                    } else {
201:                        if (log.isInfoEnabled())
202:                            log.info("stop sending Hellos");
203:                        stop();
204:                    }
205:                    break;
206:
207:                case Event.BECOME_SERVER: // called after client has join and is fully working group member
208:                    passDown(evt);
209:                    try {
210:                        start();
211:                        is_server = true;
212:                    } catch (Exception ex) {
213:                        if (log.isWarnEnabled())
214:                            log.warn("exception calling start(): " + ex);
215:                    }
216:                    break;
217:
218:                case Event.CONNECT:
219:                    group_addr = (String) evt.getArg();
220:                    passDown(evt);
221:                    break;
222:
223:                case Event.DISCONNECT:
224:                    if (local_addr != null && evt.getArg() != null
225:                            && local_addr.equals(evt.getArg()))
226:                        stop();
227:                    passDown(evt);
228:                    break;
229:
230:                default:
231:                    passDown(evt); // Pass on to the layer below us
232:                    break;
233:                }
234:            }
235:
236:            /**
237:             * If IP multicast: periodically mcast a HELLO message
238:             * If gossiping: periodically retrieve the membership. Any members not part of our
239:             * own membership are merged (passing MERGE event up).
240:             */
241:            public void run() {
242:                Message hello_msg;
243:                MergeHeader hdr;
244:                List rsps;
245:                Vector members_to_merge = new Vector(), tmp;
246:                Object mbr;
247:
248:                try {
249:                    Thread.sleep(3000);
250:                } /// initial sleep; no premature merging
251:                catch (Exception e) {
252:                }
253:
254:                while (hello_thread != null) {
255:                    Util.sleep(timeout);
256:                    if (hello_thread == null)
257:                        break;
258:
259:                    if (client == null) { // plain IP MCAST
260:                        hello_msg = new Message(null);
261:                        hdr = new MergeHeader(MergeHeader.HELLO);
262:                        hello_msg.putHeader(getName(), hdr);
263:                        passDown(new Event(Event.MSG, hello_msg));
264:                    } else { // gossiping; contact Router
265:                        rsps = client.get(group_addr);
266:
267:                        synchronized (members) {
268:                            members_to_merge.removeAllElements();
269:                            for (Iterator it = rsps.iterator(); it.hasNext();) {
270:                                mbr = it.next();
271:                                if (!members.contains(mbr)) {
272:                                    if (log.isInfoEnabled())
273:                                        log.info("membership " + members
274:                                                + " does not contain " + mbr
275:                                                + "; merging it");
276:                                    members_to_merge.addElement(mbr);
277:                                }
278:                            }
279:                            if (members_to_merge.size() > 0) {
280:                                Membership new_membership = new Membership(
281:                                        members_to_merge);
282:                                new_membership.sort();
283:                                Address coord = (Address) new_membership
284:                                        .elementAt(0);
285:                                tmp = new Vector();
286:                                tmp.addElement(coord);
287:                                if (coord.compareTo(local_addr) < 0)
288:                                    passUp(new Event(Event.MERGE, tmp));
289:                            }
290:                        }
291:                    }
292:                }
293:            }
294:
295:            /* -------------------------- Private methods ---------------------------- */
296:
297:            public static class MergeHeader extends Header {
298:                public static final int HELLO = 1; // arg = null
299:
300:                public int type = 0;
301:
302:                public MergeHeader() {
303:                } // used for externalization
304:
305:                public MergeHeader(int type) {
306:                    this .type = type;
307:                }
308:
309:                public String toString() {
310:                    return "[MERGE: type=" + type2Str(type) + ']';
311:                }
312:
313:                String type2Str(int t) {
314:                    switch (t) {
315:                    case HELLO:
316:                        return "HELLO";
317:                    default:
318:                        return "<unkown type (" + t + ")>";
319:                    }
320:                }
321:
322:                public void writeExternal(ObjectOutput out) throws IOException {
323:                    out.writeInt(type);
324:                }
325:
326:                public void readExternal(ObjectInput in) throws IOException,
327:                        ClassNotFoundException {
328:                    type = in.readInt();
329:                }
330:            }
331:
332:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.