Source Code Cross Referenced for VIEW_SYNC.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » Java Source Code / Java Documentation - Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.jgroups.protocols;
002:
003:        import org.jgroups.*;
004:        import org.jgroups.stack.Protocol;
005:        import org.jgroups.util.Streamable;
006:        import org.jgroups.util.TimeScheduler;
007:        import org.jgroups.util.Util;
008:
009:        import java.io.*;
010:        import java.util.Properties;
011:        import java.util.Vector;
012:
013:        /**
014:         * Periodically sends the view to the group. When a view is received which is greater than the current view, we
015:         * install it. Otherwise we simply discard it. This is used to solve the problem for unreliable view
016:         * dissemination outlined in JGroups/doc/ReliableViewInstallation.txt. This protocol is supposed to be just below GMS.
017:         * @author Bela Ban
018:         * @version $Id: VIEW_SYNC.java,v 1.10.2.1 2007/04/27 08:03:52 belaban Exp $
019:         */
020:        public class VIEW_SYNC extends Protocol {
021:            Address local_addr = null; // set from the SET_LOCAL_ADDRESS event in up()
022:            final Vector mbrs = new Vector(); // members copied from the latest view in handleViewChange()
023:            View my_view = null; // clone of the most recent view seen in handleViewChange()
024:            ViewId my_vid = null; // view id of my_view
025:
026:            /** Average send interval used to randomize the periodic VIEW_SYNC multicast; each member sleeps a random time in [0, 2 * members * avg_send_interval). Defaults to 60000 (presumably milliseconds, i.e. 60 seconds -- TODO confirm unit). 0 is meant to disable sending of VIEW_SYNC messages (NOTE(review): confirm that the sender task enforces this) */
027:            long avg_send_interval = 60000;
028:
029:            private int num_views_sent = 0; // incremented by sendView() for every VIEW_SYNC message passed down
030:            private int num_views_adjusted = 0; // incremented by handleView() whenever a greater foreign view is passed up
031:
032:            private volatile ViewSendTask view_send_task = null; // periodically multicasts the current view (added to timer below)
033:            final Object view_send_task_mutex = new Object(); // to sync on view_send_task
034:            TimeScheduler timer = null; // taken from the protocol stack in start(); runs view_send_task
035:            static final String name = "VIEW_SYNC"; // protocol name, also used as the key for ViewSyncHeader
036:
037:            public String getName() {
038:                return name;
039:            }
040:
041:            public long getAverageSendInterval() {
042:                return avg_send_interval;
043:            }
044:
045:            public void setAverageSendInterval(long gossip_interval) {
046:                avg_send_interval = gossip_interval;
047:            }
048:
049:            public int getNumViewsSent() {
050:                return num_views_sent;
051:            }
052:
053:            public int getNumViewsAdjusted() {
054:                return num_views_adjusted;
055:            }
056:
057:            public void resetStats() {
058:                super .resetStats();
059:                num_views_adjusted = num_views_sent = 0;
060:            }
061:
062:            public boolean setProperties(Properties props) {
063:                String str;
064:
065:                super .setProperties(props);
066:
067:                str = props.getProperty("avg_send_interval");
068:                if (str != null) {
069:                    avg_send_interval = Long.parseLong(str);
070:                    props.remove("avg_send_interval");
071:                }
072:
073:                if (props.size() > 0) {
074:                    log.error("these properties are not recognized: " + props);
075:                    return false;
076:                }
077:                return true;
078:            }
079:
080:            public void start() throws Exception {
081:                if (stack != null && stack.timer != null)
082:                    timer = stack.timer;
083:                else
084:                    throw new Exception(
085:                            "timer cannot be retrieved from protocol stack");
086:            }
087:
088:            public void stop() {
089:                stopViewSender();
090:            }
091:
092:            /** Sends a VIEW_SYNC_REQ to all members, every member replies with a VIEW multicast */
093:            public void sendViewRequest() {
094:                Message msg = new Message(null);
095:                ViewSyncHeader hdr = new ViewSyncHeader(
096:                        ViewSyncHeader.VIEW_SYNC_REQ, null);
097:                msg.putHeader(name, hdr);
098:                passDown(new Event(Event.MSG, msg));
099:            }
100:
101:            //    public void sendFakeViewForTestingOnly() {
102:            //        ViewId fake_vid=new ViewId(local_addr, my_vid.getId() +2);
103:            //        View fake_view=new View(fake_vid, new Vector(my_view.getMembers()));
104:            //        System.out.println("sending fake view " + fake_view);
105:            //        my_view=fake_view;
106:            //        my_vid=fake_vid;
107:            //        sendView();
108:            //    }
109:
110:            public void up(Event evt) {
111:                Message msg;
112:                ViewSyncHeader hdr;
113:                int type = evt.getType();
114:
115:                switch (type) {
116:
117:                case Event.MSG:
118:                    msg = (Message) evt.getArg();
119:                    hdr = (ViewSyncHeader) msg.removeHeader(name);
120:                    if (hdr == null)
121:                        break;
122:                    Address sender = msg.getSrc();
123:                    switch (hdr.type) {
124:                    case ViewSyncHeader.VIEW_SYNC:
125:                        handleView(hdr.view, sender);
126:                        break;
127:                    case ViewSyncHeader.VIEW_SYNC_REQ:
128:                        if (!sender.equals(local_addr))
129:                            sendView();
130:                        break;
131:                    default:
132:                        if (log.isErrorEnabled())
133:                            log.error("ViewSyncHeader type " + hdr.type
134:                                    + " not known");
135:                    }
136:                    return;
137:
138:                case Event.VIEW_CHANGE:
139:                    View view = (View) evt.getArg();
140:                    handleViewChange(view);
141:                    break;
142:
143:                case Event.SET_LOCAL_ADDRESS:
144:                    local_addr = (Address) evt.getArg();
145:                    break;
146:                }
147:                passUp(evt);
148:            }
149:
150:            public void down(Event evt) {
151:                switch (evt.getType()) {
152:                case Event.VIEW_CHANGE:
153:                    View v = (View) evt.getArg();
154:                    handleViewChange(v);
155:                    break;
156:                }
157:                passDown(evt);
158:            }
159:
160:            /* --------------------------------------- Private Methods ---------------------------------------- */
161:
162:            private void handleView(View v, Address sender) {
163:                Vector members = v.getMembers();
164:                if (!members.contains(local_addr)) {
165:                    if (log.isWarnEnabled())
166:                        log.warn("discarding view as I (" + local_addr
167:                                + ") am not member of view (" + v + ")");
168:                    return;
169:                }
170:
171:                ViewId vid = v.getVid();
172:                int rc = vid.compareTo(my_vid);
173:                if (rc > 0) { // foreign view is greater than my own view; update my own view !
174:                    if (log.isTraceEnabled())
175:                        log.trace("view from " + sender + " (" + vid
176:                                + ") is greater than my own view (" + my_vid
177:                                + ");" + " will update my own view");
178:
179:                    Message view_change = new Message(local_addr, local_addr,
180:                            null);
181:                    org.jgroups.protocols.pbcast.GMS.GmsHeader hdr;
182:                    hdr = new org.jgroups.protocols.pbcast.GMS.GmsHeader(
183:                            org.jgroups.protocols.pbcast.GMS.GmsHeader.VIEW, v);
184:                    view_change.putHeader(GMS.name, hdr);
185:                    passUp(new Event(Event.MSG, view_change));
186:                    num_views_adjusted++;
187:                }
188:            }
189:
190:            private void handleViewChange(View view) {
191:                Vector tmp = view.getMembers();
192:                if (tmp != null) {
193:                    mbrs.clear();
194:                    mbrs.addAll(tmp);
195:                }
196:                my_view = (View) view.clone();
197:                my_vid = my_view.getVid();
198:                if (my_view.size() > 1
199:                        && (view_send_task == null || !view_send_task.running()))
200:                    startViewSender();
201:            }
202:
203:            private void sendView() {
204:                View tmp = (View) (my_view != null ? my_view.clone() : null);
205:                if (tmp == null)
206:                    return;
207:                Message msg = new Message(null); // send to the group
208:                ViewSyncHeader hdr = new ViewSyncHeader(
209:                        ViewSyncHeader.VIEW_SYNC, tmp);
210:                msg.putHeader(name, hdr);
211:                passDown(new Event(Event.MSG, msg));
212:                num_views_sent++;
213:            }
214:
215:            void startViewSender() {
216:                // Here, double-checked locking works: we don't want to synchronize if the task already runs (which is the case
217:                // 99% of the time). If stable_task gets nulled after the condition check, we return anyways, but just miss
218:                // 1 cycle: on the next message or view, we will start the task
219:                if (view_send_task != null)
220:                    return;
221:                synchronized (view_send_task_mutex) {
222:                    if (view_send_task != null && view_send_task.running()) {
223:                        return; // already running
224:                    }
225:                    view_send_task = new ViewSendTask();
226:                    timer.add(view_send_task, true); // fixed-rate scheduling
227:                }
228:                if (log.isTraceEnabled())
229:                    log.trace("view send task started");
230:            }
231:
232:            void stopViewSender() {
233:                // contrary to startViewSender(), we don't need double-checked locking here because this method is not
234:                // called frequently
235:                synchronized (view_send_task_mutex) {
236:                    if (view_send_task != null) {
237:                        view_send_task.stop();
238:                        if (log.isTraceEnabled())
239:                            log.trace("view send task stopped");
240:                        view_send_task = null;
241:                    }
242:                }
243:            }
244:
245:            /* ------------------------------------End of Private Methods ------------------------------------- */
246:
247:            public static class ViewSyncHeader extends Header implements 
248:                    Streamable {
249:                public static final int VIEW_SYNC = 1; // contains a view
250:                public static final int VIEW_SYNC_REQ = 2; // request to all members to send their views
251:
252:                int type = 0;
253:                View view = null;
254:
255:                public ViewSyncHeader() {
256:                }
257:
258:                public ViewSyncHeader(int type, View view) {
259:                    this .type = type;
260:                    this .view = view;
261:                }
262:
263:                public int getType() {
264:                    return type;
265:                }
266:
267:                public View getView() {
268:                    return view;
269:                }
270:
271:                static String type2String(int t) {
272:                    switch (t) {
273:                    case VIEW_SYNC:
274:                        return "VIEW_SYNC";
275:                    case VIEW_SYNC_REQ:
276:                        return "VIEW_SYNC_REQ";
277:                    default:
278:                        return "<unknown>";
279:                    }
280:                }
281:
282:                public String toString() {
283:                    StringBuffer sb = new StringBuffer();
284:                    sb.append('[');
285:                    sb.append(type2String(type));
286:                    sb.append("]");
287:                    if (view != null)
288:                        sb.append(", view= ").append(view);
289:                    return sb.toString();
290:                }
291:
292:                public void writeExternal(ObjectOutput out) throws IOException {
293:                    out.writeInt(type);
294:                    if (view == null) {
295:                        out.writeBoolean(false);
296:                        return;
297:                    }
298:                    out.writeBoolean(true);
299:                    view.writeExternal(out);
300:                }
301:
302:                public void readExternal(ObjectInput in) throws IOException,
303:                        ClassNotFoundException {
304:                    type = in.readInt();
305:                    boolean available = in.readBoolean();
306:                    if (available) {
307:                        view = new View();
308:                        view.readExternal(in);
309:                    }
310:                }
311:
312:                public long size() {
313:                    long retval = Global.INT_SIZE + Global.BYTE_SIZE
314:                            + Global.BYTE_SIZE; // type + view type + presence for digest
315:                    if (view != null)
316:                        retval += view.serializedSize();
317:                    return retval;
318:                }
319:
320:                public void writeTo(DataOutputStream out) throws IOException {
321:                    out.writeInt(type);
322:                    // 0 == null, 1 == View, 2 == MergeView
323:                    byte b = (byte) (view == null ? 0
324:                            : (view instanceof  MergeView ? 2 : 1));
325:                    out.writeByte(b);
326:                    Util.writeStreamable(view, out);
327:                }
328:
329:                public void readFrom(DataInputStream in) throws IOException,
330:                        IllegalAccessException, InstantiationException {
331:                    type = in.readInt();
332:                    byte b = in.readByte();
333:                    Class clazz = b == 2 ? MergeView.class : View.class;
334:                    view = (View) Util.readStreamable(clazz, in);
335:                }
336:
337:            }
338:
339:            /**
340:             Periodically multicasts a View_SYNC message
341:             */
342:            private class ViewSendTask implements  TimeScheduler.Task {
343:                boolean stopped = false;
344:
345:                public void stop() {
346:                    stopped = true;
347:                }
348:
349:                public boolean running() { // syntactic sugar
350:                    return !stopped;
351:                }
352:
353:                public boolean cancelled() {
354:                    return stopped;
355:                }
356:
357:                public long nextInterval() {
358:                    long interval = computeSleepTime();
359:                    if (interval <= 0)
360:                        return 10000;
361:                    else
362:                        return interval;
363:                }
364:
365:                public void run() {
366:                    sendView();
367:                }
368:
369:                long computeSleepTime() {
370:                    int num_mbrs = Math.max(mbrs.size(), 1);
371:                    return getRandom((num_mbrs * avg_send_interval * 2));
372:                }
373:
374:                long getRandom(long range) {
375:                    return (long) ((Math.random() * range) % range);
376:                }
377:            }
378:
379:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.