Source Code Cross Referenced for FC.java in  » Net » JGroups-2.4.1-sp3 » org » jgroups » protocols » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » JGroups 2.4.1 sp3 » org.jgroups.protocols 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


001:        package org.jgroups.protocols;
002:
003:        import EDU.oswego.cs.dl.util.concurrent.*;
004:        import org.jgroups.*;
005:        import org.jgroups.stack.Protocol;
006:        import org.jgroups.util.BoundedList;
007:        import org.jgroups.util.Streamable;
008:        import org.jgroups.util.Util;
009:
010:        import java.io.*;
011:        import java.util.*;
012:        import java.util.Map.Entry;
013:
014:        /**
015:         * Simple flow control protocol based on a credit system. Each sender has a number of credits (bytes
016:         * to send). When the credits have been exhausted, the sender blocks. Each receiver also keeps track of
017:         * how many credits it has received from a sender. When credits for a sender fall below a threshold,
018:         * the receiver sends more credits to the sender. Works for both unicast and multicast messages.
019:         * <p/>
020:         * Note that this protocol must be located towards the top of the stack, or all down_threads from JChannel to this
021:         * protocol must be set to false ! This is in order to block JChannel.send()/JChannel.down().
022:         * <br/>This is the second simplified implementation of the same model. The algorithm is sketched out in
023:         * doc/FlowControl.txt
024:         * <br/>
025:         * Changes (Brian) April 2006:
026:         * <ol>
027:         * <li>Receivers now send credits to a sender when more than min_credits have been received (rather than when min_credits
028:         * are left)
029:         * <li>Receivers don't send the full credits (max_credits), but rather tha actual number of bytes received
030:         * <ol/>
031:         * @author Bela Ban
032:         * @version $Id: FC.java,v 1.53.2.11 2007/04/27 08:03:51 belaban Exp $
033:         */
034:        public class FC extends Protocol {
035:
036:            /**
037:             * HashMap<Address,Long>: keys are members, values are credits left. For each send, the
038:             * number of credits is decremented by the message size
039:             */
040:            final Map sent = new HashMap(11);
041:            // final Map sent=new ConcurrentHashMap(11);
042:
043:            /**
044:             * HashMap<Address,Long>: keys are members, values are credits left (in bytes).
045:             * For each receive, the credits for the sender are decremented by the size of the received message.
046:             * When the credits are 0, we refill and send a CREDIT message to the sender. Sender blocks until CREDIT
047:             * is received after reaching <tt>min_credits</tt> credits.
048:             */
049:            final Map received = new ConcurrentReaderHashMap(11);
050:            // final Map received=new ConcurrentHashMap(11);
051:
052:            /**
053:             * List of members from whom we expect credits
054:             */
055:            final List creditors = new ArrayList(11);
056:
057:            /** Peers who have asked for credit that we didn't have */
058:            final Set pending_requesters = new HashSet(11);
059:
060:            /**
061:             * Max number of bytes to send per receiver until an ack must
062:             * be received before continuing sending
063:             */
064:            private long max_credits = 500000;
065:            private Long max_credits_constant = new Long(max_credits);
066:
067:            /**
068:             * Max time (in milliseconds) to block. If credit hasn't been received after max_block_time, we send
069:             * a REPLENISHMENT request to the members from which we expect credits. A value <= 0 means to
070:             * wait forever.
071:             */
072:            private long max_block_time = 5000;
073:
074:            /**
075:             * If credits fall below this limit, we send more credits to the sender. (We also send when
076:             * credits are exhausted (0 credits left))
077:             */
078:            private double min_threshold = 0.25;
079:
080:            /**
081:             * Computed as <tt>max_credits</tt> times <tt>min_theshold</tt>. If explicitly set, this will
082:             * override the above computation
083:             */
084:            private long min_credits = 0;
085:
086:            /**
087:             * Whether FC is still running, this is set to false when the protocol terminates (on stop())
088:             */
089:            private boolean running = true;
090:
091:            /**
092:             * Determines whether or not to block on down(). Set when not enough credit is available to send a message
093:             * to all or a single member
094:             */
095:            private boolean insufficient_credit = false;
096:
097:            /**
098:             * the lowest credits of any destination (sent_msgs)
099:             */
100:            private long lowest_credit = max_credits;
101:
102:            /**
103:             * Lock to be used with the CondVar below.
104:             */
105:            final Sync lock = new ReentrantLock();
106:
107:            /**
108:             * Mutex to block on down()
109:             */
110:            final CondVar mutex = new CondVar(lock);
111:
112:            /**
113:             * Whether an up thread that comes back down should be allowed to
114:             * bypass blocking if all credits are exhausted. Avoids JGRP-465.
115:             */
116:            private boolean ignore_synchronous_response = true;
117:
118:            /**
119:             * Thread that carries messages through up() and shouldn't be blocked
120:             * in down() if ignore_synchronous_response==true. JGRP-465.
121:             */
122:            private Thread ignore_thread;
123:
124:            static final String name = "FC";
125:
126:            private long start_blocking = 0;
127:
128:            /**
129:             * Map<Address, Long> of the last time we requested credit
130:             */
131:            private final Map last_credit_request = new ConcurrentHashMap();
132:
133:            private int num_blockings = 0;
134:            private int num_credit_requests_received = 0,
135:                    num_credit_requests_sent = 0;
136:            private int num_credit_responses_sent = 0,
137:                    num_credit_responses_received = 0;
138:            private long total_time_blocking = 0;
139:
140:            final BoundedList last_blockings = new BoundedList(50);
141:
142:            final static FcHeader REPLENISH_HDR = new FcHeader(
143:                    FcHeader.REPLENISH);
144:            final static FcHeader CREDIT_REQUEST_HDR = new FcHeader(
145:                    FcHeader.CREDIT_REQUEST);
146:
147:            public final String getName() {
148:                return name;
149:            }
150:
151:            public void resetStats() {
152:                super .resetStats();
153:                num_blockings = 0;
154:                num_credit_responses_sent = num_credit_responses_received = num_credit_requests_received = num_credit_requests_sent = 0;
155:                total_time_blocking = 0;
156:                last_blockings.removeAll();
157:            }
158:
159:            public long getMaxCredits() {
160:                return max_credits;
161:            }
162:
163:            public void setMaxCredits(long max_credits) {
164:                this .max_credits = max_credits;
165:                max_credits_constant = new Long(this .max_credits);
166:            }
167:
168:            public double getMinThreshold() {
169:                return min_threshold;
170:            }
171:
172:            public void setMinThreshold(double min_threshold) {
173:                this .min_threshold = min_threshold;
174:            }
175:
176:            public long getMinCredits() {
177:                return min_credits;
178:            }
179:
180:            public void setMinCredits(long min_credits) {
181:                this .min_credits = min_credits;
182:            }
183:
184:            public boolean isBlocked() {
185:                return insufficient_credit;
186:            }
187:
188:            public int getNumberOfBlockings() {
189:                return num_blockings;
190:            }
191:
192:            public long getMaxBlockTime() {
193:                return max_block_time;
194:            }
195:
196:            public void setMaxBlockTime(long t) {
197:                max_block_time = t;
198:            }
199:
200:            public long getTotalTimeBlocked() {
201:                return total_time_blocking;
202:            }
203:
204:            public double getAverageTimeBlocked() {
205:                return num_blockings == 0 ? 0.0 : total_time_blocking
206:                        / (double) num_blockings;
207:            }
208:
209:            public int getNumberOfCreditRequestsReceived() {
210:                return num_credit_requests_received;
211:            }
212:
213:            public int getNumberOfCreditRequestsSent() {
214:                return num_credit_requests_sent;
215:            }
216:
217:            public int getNumberOfCreditResponsesReceived() {
218:                return num_credit_responses_received;
219:            }
220:
221:            public int getNumberOfCreditResponsesSent() {
222:                return num_credit_responses_sent;
223:            }
224:
225:            public String printSenderCredits() {
226:                return printMap(sent);
227:            }
228:
229:            public String printReceiverCredits() {
230:                return printMap(received);
231:            }
232:
233:            public String printCredits() {
234:                StringBuffer sb = new StringBuffer();
235:                sb.append("senders:\n").append(printMap(sent)).append(
236:                        "\n\nreceivers:\n").append(printMap(received));
237:                return sb.toString();
238:            }
239:
240:            public Map dumpStats() {
241:                Map retval = super .dumpStats();
242:                if (retval == null)
243:                    retval = new HashMap();
244:                retval.put("senders", printMap(sent));
245:                retval.put("receivers", printMap(received));
246:                retval.put("num_blockings", new Integer(this .num_blockings));
247:                retval.put("avg_time_blocked", new Double(
248:                        getAverageTimeBlocked()));
249:                retval.put("num_replenishments", new Integer(
250:                        this .num_credit_responses_received));
251:                retval.put("total_time_blocked", new Long(total_time_blocking));
252:                return retval;
253:            }
254:
255:            public String showLastBlockingTimes() {
256:                return last_blockings.toString();
257:            }
258:
259:            /**
260:             * Allows to unblock a blocked sender from an external program, e.g. JMX
261:             */
262:            public void unblock() {
263:                if (Util.acquire(lock)) {
264:                    try {
265:                        if (log.isTraceEnabled())
266:                            log
267:                                    .trace("unblocking the sender and replenishing all members, creditors are "
268:                                            + creditors);
269:
270:                        Map.Entry entry;
271:                        for (Iterator it = sent.entrySet().iterator(); it
272:                                .hasNext();) {
273:                            entry = (Map.Entry) it.next();
274:                            entry.setValue(max_credits_constant);
275:                        }
276:
277:                        lowest_credit = computeLowestCredit(sent);
278:                        creditors.clear();
279:                        insufficient_credit = false;
280:                        mutex.broadcast();
281:                    } finally {
282:                        Util.release(lock);
283:                    }
284:                }
285:            }
286:
287:            public boolean setProperties(Properties props) {
288:                String str;
289:                boolean min_credits_set = false;
290:
291:                super .setProperties(props);
292:                str = props.getProperty("max_credits");
293:                if (str != null) {
294:                    max_credits = Long.parseLong(str);
295:                    props.remove("max_credits");
296:                }
297:
298:                str = props.getProperty("min_threshold");
299:                if (str != null) {
300:                    min_threshold = Double.parseDouble(str);
301:                    props.remove("min_threshold");
302:                }
303:
304:                str = props.getProperty("min_credits");
305:                if (str != null) {
306:                    min_credits = Long.parseLong(str);
307:                    props.remove("min_credits");
308:                    min_credits_set = true;
309:                }
310:
311:                if (!min_credits_set)
312:                    min_credits = (long) ((double) max_credits * min_threshold);
313:
314:                str = props.getProperty("max_block_time");
315:                if (str != null) {
316:                    max_block_time = Long.parseLong(str);
317:                    props.remove("max_block_time");
318:                }
319:
320:                str = props.getProperty("ignore_synchronous_response");
321:                if (str != null) {
322:                    ignore_synchronous_response = Boolean.valueOf(str)
323:                            .booleanValue();
324:                    props.remove("ignore_synchronous_response");
325:                }
326:
327:                if (!props.isEmpty()) {
328:                    log.error("the following properties are not recognized: "
329:                            + props);
330:                    return false;
331:                }
332:                max_credits_constant = new Long(max_credits);
333:                return true;
334:            }
335:
336:            public void start() throws Exception {
337:                super .start();
338:                lock.acquire();
339:                try {
340:                    running = true;
341:                    insufficient_credit = false;
342:                    lowest_credit = max_credits;
343:                } finally {
344:                    lock.release();
345:                }
346:            }
347:
348:            public void stop() {
349:                super .stop();
350:                if (Util.acquire(lock)) {
351:                    try {
352:                        running = false;
353:                        ignore_thread = null;
354:                        mutex.broadcast(); // notify all threads waiting on the mutex that we are done
355:                    } finally {
356:                        Util.release(lock);
357:                    }
358:                }
359:            }
360:
361:            /**
362:             * We need to receive view changes concurrent to messages on the down events: a message might blocks, e.g.
363:             * because we don't have enough credits to send to member P. However, if member P crashed, we need to unblock !
364:             * @param evt
365:             */
366:            protected void receiveDownEvent(Event evt) {
367:                if (evt.getType() == Event.VIEW_CHANGE) {
368:                    View v = (View) evt.getArg();
369:                    Vector mbrs = v.getMembers();
370:                    handleViewChange(mbrs);
371:                }
372:                super .receiveDownEvent(evt);
373:            }
374:
375:            public void down(Event evt) {
376:                switch (evt.getType()) {
377:                case Event.MSG:
378:                    handleDownMessage(evt);
379:                    return;
380:                }
381:                passDown(evt); // this could potentially use the lower protocol's thread which may block
382:            }
383:
384:            public void up(Event evt) {
385:                switch (evt.getType()) {
386:
387:                case Event.MSG:
388:
389:                    // JGRP-465. We only deal with msgs to avoid having to use
390:                    // a concurrent collection; ignore views, suspicions, etc 
391:                    // which can come up on unusual threads.
392:                    if (ignore_thread == null && ignore_synchronous_response)
393:                        ignore_thread = Thread.currentThread();
394:
395:                    Message msg = (Message) evt.getArg();
396:                    FcHeader hdr = (FcHeader) msg.removeHeader(name);
397:                    if (hdr != null) {
398:                        switch (hdr.type) {
399:                        case FcHeader.REPLENISH:
400:                            num_credit_responses_received++;
401:                            handleCredit(msg.getSrc(), (Number) msg.getObject());
402:                            break;
403:                        case FcHeader.CREDIT_REQUEST:
404:                            num_credit_requests_received++;
405:                            Address sender = msg.getSrc();
406:                            Long sent_credits = (Long) msg.getObject();
407:                            handleCreditRequest(sender, sent_credits);
408:                            break;
409:                        default:
410:                            log.error("header type " + hdr.type + " not known");
411:                            break;
412:                        }
413:                        return; // don't pass message up
414:                    } else {
415:                        adjustCredit(msg);
416:                    }
417:                    break;
418:
419:                case Event.VIEW_CHANGE:
420:                    handleViewChange(((View) evt.getArg()).getMembers());
421:                    break;
422:                }
423:
424:                passUp(evt);
425:            }
426:
427:            private void handleDownMessage(Event evt) {
428:                Message msg = (Message) evt.getArg();
429:                int length = msg.getLength();
430:                Address dest = msg.getDest();
431:
432:                if (Util.acquire(lock)) {
433:                    try {
434:                        if (lowest_credit <= length) {
435:                            if (ignore_synchronous_response
436:                                    && ignore_thread == Thread.currentThread()) { // JGRP-465
437:                                if (log.isTraceEnabled())
438:                                    log
439:                                            .trace("Bypassing blocking to avoid deadlocking "
440:                                                    + Thread.currentThread());
441:                            } else {
442:                                determineCreditors(dest, length);
443:
444:                                long blockStart = System.currentTimeMillis();
445:                                if (!insufficient_credit) {
446:                                    insufficient_credit = true;
447:                                    start_blocking = blockStart;
448:                                    if (log.isTraceEnabled()) {
449:                                        log
450:                                                .trace("Starting blocking. lowest_credit="
451:                                                        + lowest_credit
452:                                                        + "; msg length ="
453:                                                        + length);
454:                                    }
455:                                }
456:                                num_blockings++;
457:
458:                                while (insufficient_credit && running) {
459:                                    try {
460:                                        mutex.timedwait(max_block_time);
461:                                    } catch (InterruptedException e) {
462:                                    }
463:                                    if (insufficient_credit && running) {
464:                                        long waitTime = System
465:                                                .currentTimeMillis()
466:                                                - blockStart;
467:                                        if (log.isTraceEnabled()) {
468:                                            log
469:                                                    .trace("Still waiting for credits -- waiting "
470:                                                            + waitTime + " ms");
471:                                        }
472:
473:                                        // Only ask for credit if we blocked over max_block_time,
474:                                        // otherwise it's not an emergency
475:                                        if (waitTime >= max_block_time) {
476:
477:                                            // Creditors may have been cleared but credit
478:                                            // receipt was insufficient to let all
479:                                            // blocked threads proceed. So, redetermine
480:                                            determineCreditors(dest, length);
481:
482:                                            Map sent_copy = new HashMap(sent);
483:                                            sent_copy.keySet().retainAll(
484:                                                    creditors);
485:                                            // we need to send the credit requests down *without* holding the lock, otherwise we might
486:                                            // run into the deadlock described in http://jira.jboss.com/jira/browse/JGRP-292
487:                                            Util.release(lock);
488:                                            try {
489:                                                for (Iterator it = sent_copy
490:                                                        .entrySet().iterator(); it
491:                                                        .hasNext();) {
492:                                                    Map.Entry e = (Entry) it
493:                                                            .next();
494:                                                    sendCreditRequest(
495:                                                            (Address) e
496:                                                                    .getKey(),
497:                                                            (Long) e.getValue());
498:                                                }
499:                                            } finally {
500:                                                Util.acquire(lock);
501:                                            }
502:                                        }
503:                                    }
504:                                }
505:
506:                                long block_time = System.currentTimeMillis()
507:                                        - blockStart;
508:                                if (log.isTraceEnabled())
509:                                    log.trace("total time blocked: "
510:                                            + block_time + " ms");
511:                                total_time_blocking += block_time;
512:                                last_blockings.add(new Long(block_time));
513:                            }
514:                        }
515:
516:                        long tmp = decrementCredit(sent, dest, length);
517:                        if (tmp != -1)
518:                            lowest_credit = Math.min(tmp, lowest_credit);
519:
520:                    } finally {
521:                        Util.release(lock);
522:                    }
523:                }
524:
525:                // send message - either after regular processing, or after blocking (when enough credits available again)
526:                passDown(evt);
527:            }
528:
529:            /**
530:             * Checks whether one member (unicast msg) or all members (multicast msg) have enough credits. Add those
531:             * that don't to the creditors list
532:             * @param dest
533:             * @param length
534:             */
535:            private void determineCreditors(Address dest, int length) {
536:                boolean multicast = dest == null || dest.isMulticastAddress();
537:                Address mbr;
538:                Long credits;
539:                if (multicast) {
540:                    Map.Entry entry;
541:                    for (Iterator it = sent.entrySet().iterator(); it.hasNext();) {
542:                        entry = (Map.Entry) it.next();
543:                        mbr = (Address) entry.getKey();
544:                        credits = (Long) entry.getValue();
545:                        if (credits.longValue() <= length) {
546:                            if (!creditors.contains(mbr))
547:                                creditors.add(mbr);
548:                        }
549:                    }
550:                } else {
551:                    credits = (Long) sent.get(dest);
552:                    if (credits != null && credits.longValue() <= length) {
553:                        if (!creditors.contains(dest))
554:                            creditors.add(dest);
555:                    }
556:                }
557:            }
558:
559:            /**
560:             * Decrements credits from a single member, or all members in sent_msgs, depending on whether it is a multicast
561:             * or unicast message. No need to acquire mutex (must already be held when this method is called)
562:             * @param dest
563:             * @param credits
564:             * @return The lowest number of credits left, or -1 if a unicast member was not found
565:             */
566:            private long decrementCredit(Map m, Address dest, long credits) {
567:                boolean multicast = dest == null || dest.isMulticastAddress();
568:                long lowest = max_credits, tmp;
569:                Long val;
570:
571:                if (multicast) {
572:                    if (m.isEmpty())
573:                        return -1;
574:                    Map.Entry entry;
575:                    for (Iterator it = m.entrySet().iterator(); it.hasNext();) {
576:                        entry = (Map.Entry) it.next();
577:                        val = (Long) entry.getValue();
578:                        tmp = val.longValue();
579:                        tmp -= credits;
580:                        entry.setValue(new Long(tmp));
581:                        lowest = Math.min(tmp, lowest);
582:                    }
583:                    return lowest;
584:                } else {
585:                    val = (Long) m.get(dest);
586:                    if (val != null) {
587:                        lowest = val.longValue();
588:                        lowest -= credits;
589:                        m.put(dest, new Long(lowest));
590:                        return lowest;
591:                    }
592:                }
593:                return -1;
594:            }
595:
596:            private void handleCredit(Address sender, Number increase) {
597:                if (sender == null)
598:                    return;
599:                StringBuffer sb = null;
600:
601:                if (Util.acquire(lock)) {
602:                    try {
603:                        Long old_credit = (Long) sent.get(sender);
604:                        long increased = old_credit.longValue()
605:                                + increase.longValue();
606:                        Long new_credit = new Long(Math.min(max_credits,
607:                                increased));
608:
609:                        if (log.isTraceEnabled()) {
610:                            sb = new StringBuffer();
611:                            sb.append("received " + increase + " credit from ")
612:                                    .append(sender).append(", old credit was ")
613:                                    .append(old_credit).append(
614:                                            ", new credits are ").append(
615:                                            new_credit);
616:                            if (increased > max_credits)
617:                                sb.append(" ignored over-credit of "
618:                                        + (increased - max_credits));
619:                        }
620:
621:                        sent.put(sender, new_credit);
622:                        lowest_credit = computeLowestCredit(sent);
623:                        if (!creditors.isEmpty()) { // we are blocked because we expect credit from one or more members
624:
625:                            if (log.isTraceEnabled())
626:                                sb.append(".\nCreditors before are: ").append(
627:                                        creditors);
628:
629:                            creditors.remove(sender);
630:
631:                            if (log.isTraceEnabled()) {
632:                                sb.append("\nCreditors after removal of ")
633:                                        .append(sender).append(" are: ")
634:                                        .append(creditors).append(
635:                                                "; lowest_credit=").append(
636:                                                lowest_credit);
637:                            }
638:                        }
639:
640:                        if (insufficient_credit && lowest_credit > 0
641:                                && creditors.isEmpty()) {
642:                            insufficient_credit = false;
643:                            mutex.broadcast();
644:                            if (log.isTraceEnabled())
645:                                sb
646:                                        .append("\nTotal block time = "
647:                                                + (System.currentTimeMillis() - start_blocking));
648:                        }
649:
650:                        if (log.isTraceEnabled())
651:                            log.trace(sb.toString());
652:                    } finally {
653:                        Util.release(lock);
654:                    }
655:                } else {
656:                    if (log.isWarnEnabled())
657:                        log.warn(increase + " credits from " + sender
658:                                + " were dropped, lock could not be acquired");
659:                }
660:            }
661:
662:            private static long computeLowestCredit(Map m) {
663:                Collection credits = m.values(); // List of Longs (credits)
664:                Long retval = (Long) Collections.min(credits);
665:                return retval.longValue();
666:            }
667:
668:            /**
669:             * Check whether sender has enough credits left. If not, send him some more
670:             * @param msg
671:             */
672:            private void adjustCredit(Message msg) {
673:                Address src = msg.getSrc();
674:                long length = msg.getLength(); // we don't care about headers for the purpose of flow control
675:
676:                if (src == null) {
677:                    if (log.isErrorEnabled())
678:                        log.error("src is null");
679:                    return;
680:                }
681:
682:                if (length == 0)
683:                    return; // no effect
684:
685:                long remaining_cred = decrementCredit(received, src, length);
686:                long credit_response = max_credits - remaining_cred;
687:                if (credit_response >= min_credits) {
688:                    received.put(src, max_credits_constant);
689:                    if (!pending_requesters.isEmpty())
690:                        pending_requesters.remove(src);
691:                    if (log.isTraceEnabled())
692:                        log.trace("sending " + credit_response
693:                                + " replenishment credits to " + src);
694:                    sendCredit(src, credit_response);
695:                }
696:            }
697:
698:            private void handleCreditRequest(Address sender, Long sender_credit) {
699:                if (sender == null)
700:                    return;
701:
702:                if (Util.acquire(lock)) {
703:                    long credit_response = 0;
704:                    try {
705:                        Long old_credit = (Long) received.get(sender);
706:                        if (old_credit != null) {
707:                            credit_response = max_credits
708:                                    - old_credit.longValue();
709:                        }
710:
711:                        if (credit_response > 0) {
712:                            if (log.isTraceEnabled())
713:                                log.trace("received credit request from "
714:                                        + sender + ": sending "
715:                                        + credit_response + " credits");
716:                            received.put(sender, max_credits_constant);
717:                            pending_requesters.remove(sender);
718:                        } else {
719:                            if (pending_requesters.contains(sender)) {
720:                                // a sender might have negative credits, e.g. -20000. If we subtracted -20000 from max_credits,
721:                                // we'd end up with max_credits + 20000, and send too many credits back. So if the sender's
722:                                // credits is negative, we simply send max_credits back
723:                                long credits_left = sender_credit.longValue();
724:                                if (credits_left < 0)
725:                                    credits_left = 0;
726:                                credit_response = max_credits - credits_left;
727:                                // credit_response = max_credits;
728:                                received.put(sender, max_credits_constant);
729:                                pending_requesters.remove(sender);
730:                                if (log.isWarnEnabled())
731:                                    log
732:                                            .warn("Received two credit requests from "
733:                                                    + sender
734:                                                    + " without any intervening messages; sending "
735:                                                    + credit_response
736:                                                    + " credits");
737:                            } else {
738:                                pending_requesters.add(sender);
739:                                if (log.isTraceEnabled())
740:                                    log.trace("received credit request from "
741:                                            + sender
742:                                            + " but have no credits available");
743:                            }
744:                        }
745:                    } finally {
746:                        Util.release(lock);
747:                    }
748:
749:                    if (credit_response > 0)
750:                        sendCredit(sender, credit_response);
751:                }
752:            }
753:
754:            /**
755:             * Returns the max credits. Handling a credit request should be the exception, not the normal case.
756:             * @param sender
757:             * todo: see if this solves Brian's deadlock problems. If not, use the (commented) method above !
758:             */
759:            //    private void handleCreditRequest(Address sender) {
760:            //        if(sender == null) {
761:            //            if(log.isWarnEnabled())
762:            //                log.warn("sender is null, not able to send credits");
763:            //            return;
764:            //        }
765:            //        
766:            //        if(log.isTraceEnabled()) {
767:            //           Long recL = (Long) received.get(sender);
768:            //           long rec = recL == null ? 0 : max_credits - recL.longValue();
769:            //           log.trace("received credit request from " + sender + ": sending " + 
770:            //                      max_credits + " credits: had received " + rec + " bytes");
771:            //        }
772:            //        
773:            //        received.put(sender, max_credits_constant);
774:            //        sendCredit(sender, max_credits);
775:            //    }
776:
777:            private void sendCredit(Address dest, long credit) {
778:                Number number;
779:                if (credit < Integer.MAX_VALUE)
780:                    number = new Integer((int) credit);
781:                else
782:                    number = new Long(credit);
783:                Message msg = new Message(dest, null, number);
784:                msg.putHeader(name, REPLENISH_HDR);
785:                passDown(new Event(Event.MSG, msg));
786:                num_credit_responses_sent++;
787:            }
788:
789:            /**
790:             * Sends a credit request to dest. If the last credit request was sent shortly before (less than max_block_time
791:             * milliseconds ago), then we discard the request. This ensures that credit requests are not sent more frequently
792:             * than every max_block_time milliseconds, preventing credit request storms
793:             * @param dest
794:             * @param credit_balance
795:             */
796:            private void sendCreditRequest(final Address dest,
797:                    final Long credit_balance) {
798:                if (max_block_time > 0) {
799:                    // This call is made with the lock released, so ensure the get/put is atomic
800:                    long now = System.currentTimeMillis();
801:                    Long last = (Long) last_credit_request.get(dest);
802:                    if (last != null && now - last.longValue() < max_block_time) {
803:                        return;
804:                    }
805:                    last_credit_request.put(dest, new Long(now));
806:                }
807:
808:                if (log.isTraceEnabled())
809:                    log.trace("sending credit request to " + dest
810:                            + "; balance=" + credit_balance);
811:
812:                Message msg = new Message(dest, null, credit_balance);
813:                msg.putHeader(name, CREDIT_REQUEST_HDR);
814:                passDown(new Event(Event.MSG, msg));
815:                num_credit_requests_sent++;
816:            }
817:
818:            private void handleViewChange(Vector mbrs) {
819:                Address addr;
820:                if (mbrs == null)
821:                    return;
822:                if (log.isTraceEnabled())
823:                    log.trace("new membership: " + mbrs);
824:
825:                if (Util.acquire(lock)) {
826:                    try {
827:                        // add members not in membership to received and sent hashmap (with full credits)
828:                        for (int i = 0; i < mbrs.size(); i++) {
829:                            addr = (Address) mbrs.elementAt(i);
830:                            if (!received.containsKey(addr))
831:                                received.put(addr, max_credits_constant);
832:                            if (!sent.containsKey(addr))
833:                                sent.put(addr, max_credits_constant);
834:                        }
835:                        // remove members that left
836:                        for (Iterator it = received.keySet().iterator(); it
837:                                .hasNext();) {
838:                            addr = (Address) it.next();
839:                            if (!mbrs.contains(addr))
840:                                it.remove();
841:                        }
842:
843:                        // remove members that left
844:                        for (Iterator it = sent.keySet().iterator(); it
845:                                .hasNext();) {
846:                            addr = (Address) it.next();
847:                            if (!mbrs.contains(addr))
848:                                it.remove(); // modified the underlying map
849:                        }
850:
851:                        // remove all creditors which are not in the new view
852:                        for (int i = 0; i < creditors.size(); i++) {
853:                            Address creditor = (Address) creditors.get(i);
854:                            if (!mbrs.contains(creditor))
855:                                creditors.remove(creditor);
856:                        }
857:
858:                        if (log.isTraceEnabled())
859:                            log.trace("creditors are " + creditors);
860:                        if (insufficient_credit && creditors.isEmpty()) {
861:                            lowest_credit = computeLowestCredit(sent);
862:                            insufficient_credit = false;
863:                            mutex.broadcast();
864:                        }
865:
866:                        // keep it simple and just clear the last_credit_request Map
867:                        // at worst we get an extra credit request
868:                        last_credit_request.clear();
869:                    } finally {
870:                        Util.release(lock);
871:                    }
872:                }
873:            }
874:
875:            private static String printMap(Map m) {
876:                Map.Entry entry;
877:                StringBuffer sb = new StringBuffer();
878:                for (Iterator it = m.entrySet().iterator(); it.hasNext();) {
879:                    entry = (Map.Entry) it.next();
880:                    sb.append(entry.getKey()).append(": ").append(
881:                            entry.getValue()).append("\n");
882:                }
883:                return sb.toString();
884:            }
885:
886:            public static class FcHeader extends Header implements  Streamable {
887:                public static final byte REPLENISH = 1;
888:                public static final byte CREDIT_REQUEST = 2; // the sender of the message is the requester
889:
890:                byte type = REPLENISH;
891:
892:                public FcHeader() {
893:
894:                }
895:
896:                public FcHeader(byte type) {
897:                    this .type = type;
898:                }
899:
900:                public long size() {
901:                    return Global.BYTE_SIZE;
902:                }
903:
904:                public void writeExternal(ObjectOutput out) throws IOException {
905:                    out.writeByte(type);
906:                }
907:
908:                public void readExternal(ObjectInput in) throws IOException,
909:                        ClassNotFoundException {
910:                    type = in.readByte();
911:                }
912:
913:                public void writeTo(DataOutputStream out) throws IOException {
914:                    out.writeByte(type);
915:                }
916:
917:                public void readFrom(DataInputStream in) throws IOException,
918:                        IllegalAccessException, InstantiationException {
919:                    type = in.readByte();
920:                }
921:
922:                public String toString() {
923:                    switch (type) {
924:                    case REPLENISH:
925:                        return "REPLENISH";
926:                    case CREDIT_REQUEST:
927:                        return "CREDIT_REQUEST";
928:                    default:
929:                        return "<invalid type>";
930:                    }
931:                }
932:            }
933:
934:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.