Source Code Cross Referenced for FRAG.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        // $Id: FRAG.java,v 1.32.2.1 2007/04/27 08:03:51 belaban Exp $
002:
003:        package org.jgroups.protocols;
004:
005:        import org.jgroups.Address;
006:        import org.jgroups.Event;
007:        import org.jgroups.Message;
008:        import org.jgroups.View;
009:        import org.jgroups.stack.Protocol;
010:        import org.jgroups.util.ExposedByteArrayOutputStream;
011:        import org.jgroups.util.Util;
012:
013:        import java.io.ByteArrayInputStream;
014:        import java.io.DataInputStream;
015:        import java.io.DataOutputStream;
016:        import java.util.*;
017:
018:        /**
019:         * Fragmentation layer. Fragments messages larger than FRAG_SIZE into smaller packets.
020:         * Reassembles fragmented packets into bigger ones. The fragmentation number is prepended
021:         * to the messages as a header (and removed at the receiving side).<p>
022:         * Each fragment is identified by (a) the sender (part of the message to which the header is appended),
023:         * (b) the fragmentation ID (which is unique per FRAG layer (monotonically increasing) and (c) the
024:         * fragement ID which ranges from 0 to number_of_fragments-1.<p>
025:         * Requirement: lossless delivery (e.g. NAK, ACK). No requirement on ordering. Works for both unicast and
026:         * multicast messages.
027:         * @author Bela Ban
028:         * @author Filip Hanik
029:         * @version $Id: FRAG.java,v 1.32.2.1 2007/04/27 08:03:51 belaban Exp $
030:         */
031:        public class FRAG extends Protocol {
032:            private int frag_size = 8192; // conservative value
033:
034:            /*the fragmentation list contains a fragmentation table per sender
035:             *this way it becomes easier to clean up if a sender (member) leaves or crashes
036:             */
037:            private final FragmentationList fragment_list = new FragmentationList();
038:            private int curr_id = 1;
039:            private final ExposedByteArrayOutputStream bos = new ExposedByteArrayOutputStream(
040:                    1024);
041:            private final Vector members = new Vector(11);
042:            private final static String name = "FRAG";
043:
044:            long num_sent_msgs = 0;
045:            long num_sent_frags = 0;
046:            long num_received_msgs = 0;
047:            long num_received_frags = 0;
048:
049:            public String getName() {
050:                return name;
051:            }
052:
053:            public int getFragSize() {
054:                return frag_size;
055:            }
056:
057:            public void setFragSize(int s) {
058:                frag_size = s;
059:            }
060:
061:            public long getNumberOfSentMessages() {
062:                return num_sent_msgs;
063:            }
064:
065:            public long getNumberOfSentFragments() {
066:                return num_sent_frags;
067:            }
068:
069:            public long getNumberOfReceivedMessages() {
070:                return num_received_msgs;
071:            }
072:
073:            public long getNumberOfReceivedFragments() {
074:                return num_received_frags;
075:            }
076:
077:            /**
078:             * Setup the Protocol instance acording to the configuration string
079:             */
080:            public boolean setProperties(Properties props) {
081:                String str;
082:
083:                super .setProperties(props);
084:                str = props.getProperty("frag_size");
085:                if (str != null) {
086:                    frag_size = Integer.parseInt(str);
087:                    props.remove("frag_size");
088:                }
089:
090:                if (props.size() > 0) {
091:                    log
092:                            .error("FRAG.setProperties(): the following properties are not recognized: "
093:                                    + props);
094:                    return false;
095:                }
096:                return true;
097:            }
098:
099:            public void resetStats() {
100:                super .resetStats();
101:                num_sent_msgs = num_sent_frags = num_received_msgs = num_received_frags = 0;
102:            }
103:
104:            /**
105:             * Fragment a packet if larger than frag_size (add a header). Otherwise just pass down. Only
106:             * add a header if framentation is needed !
107:             */
108:            public void down(Event evt) {
109:                switch (evt.getType()) {
110:
111:                case Event.MSG:
112:                    Message msg = (Message) evt.getArg();
113:                    long size = msg.size();
114:                    num_sent_msgs++;
115:                    if (size > frag_size) {
116:                        if (log.isTraceEnabled()) {
117:                            StringBuffer sb = new StringBuffer(
118:                                    "message size is ");
119:                            sb.append(size).append(
120:                                    ", will fragment (frag_size=").append(
121:                                    frag_size).append(')');
122:                            log.trace(sb.toString());
123:                        }
124:                        fragment(msg); // Fragment and pass down
125:                        return;
126:                    }
127:                    break;
128:
129:                case Event.VIEW_CHANGE:
130:                    //don't do anything if this dude is sending out the view change
131:                    //we are receiving a view change,
132:                    //in here we check for the
133:                    View view = (View) evt.getArg();
134:                    Vector new_mbrs = view.getMembers(),
135:                    left_mbrs;
136:                    Address mbr;
137:
138:                    left_mbrs = Util.determineLeftMembers(members, new_mbrs);
139:                    members.clear();
140:                    members.addAll(new_mbrs);
141:
142:                    for (int i = 0; i < left_mbrs.size(); i++) {
143:                        mbr = (Address) left_mbrs.elementAt(i);
144:                        //the new view doesn't contain the sender, he must have left,
145:                        //hence we will clear all his fragmentation tables
146:                        fragment_list.remove(mbr);
147:                        if (log.isTraceEnabled())
148:                            log.trace("[VIEW_CHANGE] removed " + mbr
149:                                    + " from fragmentation table");
150:                    }
151:                    break;
152:
153:                case Event.CONFIG:
154:                    passDown(evt);
155:                    if (log.isDebugEnabled())
156:                        log.debug("received CONFIG event: " + evt.getArg());
157:                    handleConfigEvent((HashMap) evt.getArg());
158:                    return;
159:                }
160:
161:                passDown(evt); // Pass on to the layer below us
162:            }
163:
164:            /**
165:             * If event is a message, if it is fragmented, re-assemble fragments into big message and pass up the stack.
166:             */
167:            public void up(Event evt) {
168:                switch (evt.getType()) {
169:
170:                case Event.MSG:
171:                    Message msg = (Message) evt.getArg();
172:                    Object obj = msg.getHeader(name);
173:                    if (obj != null && obj instanceof  FragHeader) { // needs to be defragmented
174:                        unfragment(msg); // Unfragment and possibly pass up
175:                        return;
176:                    } else {
177:                        num_received_msgs++;
178:                    }
179:                    break;
180:
181:                case Event.CONFIG:
182:                    passUp(evt);
183:                    if (log.isDebugEnabled())
184:                        log.debug("received CONFIG event: " + evt.getArg());
185:                    handleConfigEvent((HashMap) evt.getArg());
186:                    return;
187:                }
188:
189:                passUp(evt); // Pass up to the layer above us by default
190:            }
191:
192:            /**
193:             * Send all fragments as separate messages (with same ID !).
194:             * Example:
195:             * <pre>
196:             * Given the generated ID is 2344, number of fragments=3, message {dst,src,buf}
197:             * would be fragmented into:
198:             * <p/>
199:             * [2344,3,0]{dst,src,buf1},
200:             * [2344,3,1]{dst,src,buf2} and
201:             * [2344,3,2]{dst,src,buf3}
202:             * </pre>
203:             */
204:            private void fragment(Message msg) {
205:                DataOutputStream out = null;
206:                byte[] buffer;
207:                byte[] fragments[];
208:                Event evt;
209:                FragHeader hdr;
210:                Message frag_msg;
211:                Address dest = msg.getDest(), src = msg.getSrc();
212:                long id = curr_id++; // used as seqnos
213:                int num_frags;
214:                int size;
215:
216:                try {
217:                    // Write message into a byte buffer and fragment it
218:                    // Synchronization around bos is needed for concurrent access (http://jira.jboss.com/jira/browse/JGRP-215)
219:                    synchronized (bos) {
220:                        bos.reset();
221:                        out = new DataOutputStream(bos);
222:                        msg.writeTo(out);
223:                        out.flush();
224:                        buffer = bos.getRawBuffer();
225:                        fragments = Util.fragmentBuffer(buffer, frag_size, bos
226:                                .size());
227:                    }
228:
229:                    num_frags = fragments.length;
230:                    num_sent_frags += num_frags;
231:
232:                    if (log.isTraceEnabled()) {
233:                        StringBuffer sb = new StringBuffer();
234:                        sb.append("fragmenting packet to ").append(
235:                                dest != null ? dest.toString()
236:                                        : "<all members>");
237:                        sb.append(" (size=").append(buffer.length).append(
238:                                ") into ").append(num_frags);
239:                        sb.append(" fragment(s) [frag_size=").append(frag_size)
240:                                .append(']');
241:                        log.trace(sb.toString());
242:                    }
243:
244:                    for (int i = 0; i < num_frags; i++) {
245:                        frag_msg = new Message(dest, src, fragments[i]);
246:                        hdr = new FragHeader(id, i, num_frags);
247:                        frag_msg.putHeader(name, hdr);
248:                        evt = new Event(Event.MSG, frag_msg);
249:                        passDown(evt);
250:                    }
251:                } catch (Exception e) {
252:                    log.error("exception occurred trying to fragment message",
253:                            e);
254:                } finally {
255:                    Util.close(out);
256:                }
257:            }
258:
259:            /**
260:             * 1. Get all the fragment buffers
261:             * 2. When all are received -> Assemble them into one big buffer
262:             * 3. Read headers and byte buffer from big buffer
263:             * 4. Set headers and buffer in msg
264:             * 5. Pass msg up the stack
265:             */
266:            private void unfragment(Message msg) {
267:                FragmentationTable frag_table;
268:                Address sender = msg.getSrc();
269:                Message assembled_msg;
270:                FragHeader hdr = (FragHeader) msg.removeHeader(name);
271:                byte[] m;
272:                ByteArrayInputStream bis;
273:                DataInputStream in = null;
274:
275:                frag_table = fragment_list.get(sender);
276:                if (frag_table == null) {
277:                    frag_table = new FragmentationTable(sender);
278:                    try {
279:                        fragment_list.add(sender, frag_table);
280:                    } catch (IllegalArgumentException x) { // the entry has already been added, probably in parallel from another thread
281:                        frag_table = fragment_list.get(sender);
282:                    }
283:                }
284:                num_received_frags++;
285:                m = frag_table.add(hdr.id, hdr.frag_id, hdr.num_frags, msg
286:                        .getBuffer());
287:                if (m != null) {
288:                    try {
289:                        bis = new ByteArrayInputStream(m);
290:                        in = new DataInputStream(bis);
291:                        assembled_msg = new Message(false);
292:                        assembled_msg.readFrom(in);
293:                        if (log.isTraceEnabled())
294:                            log.trace("assembled_msg is " + assembled_msg);
295:                        assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !!
296:                        num_received_msgs++;
297:                        passUp(new Event(Event.MSG, assembled_msg));
298:                    } catch (Exception e) {
299:                        log.error("failed unfragmenting a message", e);
300:                    } finally {
301:                        Util.close(in);
302:                    }
303:                }
304:            }
305:
306:            void handleConfigEvent(HashMap map) {
307:                if (map == null)
308:                    return;
309:                if (map.containsKey("frag_size")) {
310:                    frag_size = ((Integer) map.get("frag_size")).intValue();
311:                    if (log.isDebugEnabled())
312:                        log.debug("setting frag_size=" + frag_size);
313:                }
314:            }
315:
316:            /**
317:             * A fragmentation list keeps a list of fragmentation tables
318:             * sorted by an Address ( the sender ).
319:             * This way, if the sender disappears or leaves the group half way
320:             * sending the content, we can simply remove this members fragmentation
321:             * table and clean up the memory of the receiver.
322:             * We do not have to do the same for the sender, since the sender doesn't keep a fragmentation table
323:             */
324:            static class FragmentationList {
325:                /* initialize the hashtable to hold all the fragmentation tables
326:                 * 11 is the best growth capacity to start with<br/>
327:                 * HashMap<Address,FragmentationTable>
328:                 */
329:                private final HashMap frag_tables = new HashMap(11);
330:
331:                /**
332:                 * Adds a fragmentation table for this particular sender
333:                 * If this sender already has a fragmentation table, an IllegalArgumentException
334:                 * will be thrown.
335:                 * @param sender - the address of the sender, cannot be null
336:                 * @param table  - the fragmentation table of this sender, cannot be null
337:                 * @throws IllegalArgumentException if an entry for this sender already exist
338:                 */
339:                public void add(Address sender, FragmentationTable table)
340:                        throws IllegalArgumentException {
341:                    FragmentationTable healthCheck;
342:
343:                    synchronized (frag_tables) {
344:                        healthCheck = (FragmentationTable) frag_tables
345:                                .get(sender);
346:                        if (healthCheck == null) {
347:                            frag_tables.put(sender, table);
348:                        } else {
349:                            throw new IllegalArgumentException(
350:                                    "Sender <"
351:                                            + sender
352:                                            + "> already exists in the fragementation list");
353:                        }
354:                    }
355:                }
356:
357:                /**
358:                 * returns a fragmentation table for this sender
359:                 * returns null if the sender doesn't have a fragmentation table
360:                 * @return the fragmentation table for this sender, or null if no table exist
361:                 */
362:                public FragmentationTable get(Address sender) {
363:                    synchronized (frag_tables) {
364:                        return (FragmentationTable) frag_tables.get(sender);
365:                    }
366:                }
367:
368:                /**
369:                 * returns true if this sender already holds a
370:                 * fragmentation for this sender, false otherwise
371:                 * @param sender - the sender, cannot be null
372:                 * @return true if this sender already has a fragmentation table
373:                 */
374:                public boolean containsSender(Address sender) {
375:                    synchronized (frag_tables) {
376:                        return frag_tables.containsKey(sender);
377:                    }
378:                }
379:
380:                /**
381:                 * removes the fragmentation table from the list.
382:                 * after this operation, the fragementation list will no longer
383:                 * hold a reference to this sender's fragmentation table
384:                 * @param sender - the sender who's fragmentation table you wish to remove, cannot be null
385:                 * @return true if the table was removed, false if the sender doesn't have an entry
386:                 */
387:                public boolean remove(Address sender) {
388:                    synchronized (frag_tables) {
389:                        boolean result = containsSender(sender);
390:                        frag_tables.remove(sender);
391:                        return result;
392:                    }
393:                }
394:
395:                /**
396:                 * returns a list of all the senders that have fragmentation tables opened.
397:                 * @return an array of all the senders in the fragmentation list
398:                 */
399:                public Address[] getSenders() {
400:                    Address[] result;
401:                    int index = 0;
402:
403:                    synchronized (frag_tables) {
404:                        result = new Address[frag_tables.size()];
405:                        for (Iterator it = frag_tables.keySet().iterator(); it
406:                                .hasNext();) {
407:                            result[index++] = (Address) it.next();
408:                        }
409:                    }
410:                    return result;
411:                }
412:
413:                public String toString() {
414:                    Map.Entry entry;
415:                    StringBuffer buf = new StringBuffer(
416:                            "Fragmentation list contains ");
417:                    synchronized (frag_tables) {
418:                        buf.append(frag_tables.size()).append(" tables\n");
419:                        for (Iterator it = frag_tables.entrySet().iterator(); it
420:                                .hasNext();) {
421:                            entry = (Map.Entry) it.next();
422:                            buf.append(entry.getKey()).append(": ").append(
423:                                    entry.getValue()).append("\n");
424:                        }
425:                    }
426:                    return buf.toString();
427:                }
428:
429:            }
430:
431:            /**
432:             * Keeps track of the fragments that are received.
433:             * Reassembles fragements into entire messages when all fragments have been received.
434:             * The fragmentation holds a an array of byte arrays for a unique sender
435:             * The first dimension of the array is the order of the fragmentation, in case the arrive out of order
436:             */
437:            static class FragmentationTable {
438:                private final Address sender;
439:                /* the hashtable that holds the fragmentation entries for this sender*/
440:                private final Hashtable h = new Hashtable(11); // keys: frag_ids, vals: Entrys
441:
442:                FragmentationTable(Address sender) {
443:                    this .sender = sender;
444:                }
445:
446:                /**
447:                 * inner class represents an entry for a message
448:                 * each entry holds an array of byte arrays sorted
449:                 * once all the byte buffer entries have been filled
450:                 * the fragmentation is considered complete.
451:                 */
452:                static class Entry {
453:                    //the total number of fragment in this message
454:                    int tot_frags = 0;
455:                    // each fragment is a byte buffer
456:                    byte[] fragments[] = null;
457:                    //the number of fragments we have received
458:                    int number_of_frags_recvd = 0;
459:                    // the message ID
460:                    long msg_id = -1;
461:
462:                    /**
463:                     * Creates a new entry
464:                     *
465:                     * @param tot_frags the number of fragments to expect for this message
466:                     */
467:                    Entry(long msg_id, int tot_frags) {
468:                        this .msg_id = msg_id;
469:                        this .tot_frags = tot_frags;
470:                        fragments = new byte[tot_frags][];
471:                        for (int i = 0; i < tot_frags; i++) {
472:                            fragments[i] = null;
473:                        }
474:                    }
475:
476:                    /**
477:                     * adds on fragmentation buffer to the message
478:                     *
479:                     * @param frag_id the number of the fragment being added 0..(tot_num_of_frags - 1)
480:                     * @param frag    the byte buffer containing the data for this fragmentation, should not be null
481:                     */
482:                    public void set(int frag_id, byte[] frag) {
483:                        fragments[frag_id] = frag;
484:                        number_of_frags_recvd++;
485:                    }
486:
487:                    /**
488:                     * returns true if this fragmentation is complete
489:                     * ie, all fragmentations have been received for this buffer
490:                     */
491:                    public boolean isComplete() {
492:                        /*first make the simple check*/
493:                        if (number_of_frags_recvd < tot_frags) {
494:                            return false;
495:                        }
496:                        /*then double check just in case*/
497:                        for (int i = 0; i < fragments.length; i++) {
498:                            if (fragments[i] == null)
499:                                return false;
500:                        }
501:                        /*all fragmentations have been received*/
502:                        return true;
503:                    }
504:
505:                    /**
506:                     * Assembles all the fragmentations into one buffer
507:                     * this method does not check if the fragmentation is complete
508:                     *
509:                     * @return the complete message in one buffer
510:                     */
511:                    public byte[] assembleBuffer() {
512:                        return Util.defragmentBuffer(fragments);
513:                    }
514:
515:                    /**
516:                     * debug only
517:                     */
518:                    public String toString() {
519:                        StringBuffer ret = new StringBuffer();
520:                        ret.append("[tot_frags=").append(tot_frags).append(
521:                                ", number_of_frags_recvd=").append(
522:                                number_of_frags_recvd).append(']');
523:                        return ret.toString();
524:                    }
525:
526:                    public int hashCode() {
527:                        return super .hashCode();
528:                    }
529:                }
530:
531:                /**
532:                 * Creates a new entry if not yet present. Adds the fragment.
533:                 * If all fragements for a given message have been received,
534:                 * an entire message is reassembled and returned.
535:                 * Otherwise null is returned.
536:                 *
537:                 * @param id        - the message ID, unique for a sender
538:                 * @param frag_id   the index of this fragmentation (0..tot_frags-1)
539:                 * @param tot_frags the total number of fragmentations expected
540:                 * @param fragment  - the byte buffer for this fragment
541:                 */
542:                public synchronized byte[] add(long id, int frag_id,
543:                        int tot_frags, byte[] fragment) {
544:
545:                    /*initialize the return value to default not complete */
546:                    byte[] retval = null;
547:
548:                    Entry e = (Entry) h.get(new Long(id));
549:
550:                    if (e == null) { // Create new entry if not yet present
551:                        e = new Entry(id, tot_frags);
552:                        h.put(new Long(id), e);
553:                    }
554:
555:                    e.set(frag_id, fragment);
556:                    if (e.isComplete()) {
557:                        retval = e.assembleBuffer();
558:                        h.remove(new Long(id));
559:                    }
560:
561:                    return retval;
562:                }
563:
564:                public void reset() {
565:                }
566:
567:                public String toString() {
568:                    StringBuffer buf = new StringBuffer(
569:                            "Fragmentation Table Sender:").append(sender)
570:                            .append("\n\t");
571:                    java.util.Enumeration e = this .h.elements();
572:                    while (e.hasMoreElements()) {
573:                        Entry entry = (Entry) e.nextElement();
574:                        int count = 0;
575:                        for (int i = 0; i < entry.fragments.length; i++) {
576:                            if (entry.fragments[i] != null) {
577:                                count++;
578:                            }
579:                        }
580:                        buf.append("Message ID:").append(entry.msg_id).append(
581:                                "\n\t");
582:                        buf.append("Total Frags:").append(entry.tot_frags)
583:                                .append("\n\t");
584:                        buf.append("Frags Received:").append(count).append(
585:                                "\n\n");
586:                    }
587:                    return buf.toString();
588:                }
589:            }
590:
591:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.