Source Code Cross Referenced for Horizontal.java in Workflow-Engines » pegasus-2.1.0 » org.griphyn.cPlanner.cluster

/**
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */
package org.griphyn.cPlanner.cluster;

import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.SubInfo;
import org.griphyn.cPlanner.classes.PCRelation;
import org.griphyn.cPlanner.classes.AggregatedJob;

import org.griphyn.cPlanner.common.PegasusProperties;
import org.griphyn.cPlanner.common.LogManager;

import org.griphyn.cPlanner.cluster.aggregator.JobAggregatorInstanceFactory;

import org.griphyn.cPlanner.namespace.VDS;

import org.griphyn.cPlanner.partitioner.Partition;

import org.griphyn.cPlanner.provenance.pasoa.XMLProducer;
import org.griphyn.cPlanner.provenance.pasoa.producer.XMLProducerFactory;

import org.griphyn.cPlanner.provenance.pasoa.PPS;
import org.griphyn.cPlanner.provenance.pasoa.pps.PPSFactory;

import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Comparator;
import java.util.Set;
import java.util.StringTokenizer;
import org.griphyn.cPlanner.classes.PegasusBag;

/**
 * The horizontal clusterer, which clusters jobs on the same level.
 *
 * @author Karan Vahi
 * @version $Revision: 450 $
 */

public class Horizontal implements Clusterer,
        org.griphyn.cPlanner.engine.Refiner { //required for PASOA integration

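    /*
     * A minimal usage sketch of the Clusterer lifecycle implemented by this
     * class (assumed driver code; the actual caller inside Pegasus may wire
     * things up differently):
     *
     *   Clusterer clusterer = new Horizontal();
     *   clusterer.initialize( dag, bag );           //scheduled workflow + PegasusBag
     *   clusterer.determineClusters( partition );   //once per partition/level
     *   ADag clusteredDAG = clusterer.getClusteredDAG();
     */
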
    /**
     * The default collapse factor for collapsing jobs with the same logical
     * name scheduled onto the same execution pool.
     */
    public static final int DEFAULT_COLLAPSE_FACTOR = 3;

    /**
     * A short description of the clusterer.
     */
    public static final String DESCRIPTION = "Horizontal Clustering";

    /**
     * A singleton access to the job comparator.
     */
    private static Comparator mJobComparator = null;

    /**
     * The handle to the logger object.
     */
    protected LogManager mLogger;

    /**
     * The handle to the properties object holding all the properties.
     */
    protected PegasusProperties mProps;

    /**
     * The handle to the job aggregator factory.
     */
    protected JobAggregatorInstanceFactory mJobAggregatorFactory;

    /**
     * ADag object containing the jobs that have been scheduled by the site
     * selector.
     */
    private ADag mScheduledDAG;

    /**
     * Map to hold the jobs sorted by the label of the jobs in the dax.
     * The key is the logical job name and the value is the list of jobs with
     * that logical name.
     *
     * This is no longer used, and will be removed later.
     */
    private Map mJobMap;

    /**
     * A Map to store all the job (SubInfo) objects indexed by their logical ID
     * found in the dax. This should actually be in the ADag structure.
     */
    private Map mSubInfoMap;

    /**
     * Map to hold the collapse values for the various execution pools. The
     * values are obtained from the properties file or from the resource
     * information catalog, a.k.a. MDS.
     */
    private Map mCollapseMap;

    /**
     * Replacement table, which identifies the corresponding fat job for a job.
     */
    private Map mReplacementTable;

    /**
     * The XML Producer object that records the actions.
     */
    private XMLProducer mXMLStore;

    /**
     * The handle to the provenance store implementation.
     */
    private PPS mPPS;

    /**
     * Singleton access to the job comparator.
     *
     * @return the job comparator.
     */
    private Comparator jobComparator() {
        //lazily create and cache the comparator on first use
        if (mJobComparator == null) {
            mJobComparator = new JobComparator();
        }
        return mJobComparator;
    }

    /**
     * The default constructor.
     */
    public Horizontal() {
        mLogger = LogManager.getInstance();
        mJobAggregatorFactory = new JobAggregatorInstanceFactory();
    }

    /**
     * Returns a reference to the workflow that is being refined by the refiner.
     *
     * @return ADag object.
     */
    public ADag getWorkflow() {
        return this.mScheduledDAG;
    }

    /**
     * Returns a reference to the XMLProducer that generates the XML fragment
     * capturing the actions of the refiner. This is used for provenance
     * purposes.
     *
     * @return XMLProducer
     */
    public XMLProducer getXMLProducer() {
        return this.mXMLStore;
    }

    /**
     * Initializes the Clusterer implementation.
     *
     * @param dag  the workflow that is being clustered.
     * @param bag  the bag of objects that is useful for initialization.
     *
     * @throws ClustererException in case of error.
     */
    public void initialize(ADag dag, PegasusBag bag)
            throws ClustererException {
        mScheduledDAG = dag;
        mProps = bag.getPegasusProperties();
        mJobAggregatorFactory.initialize(dag, bag);

        mJobMap = new HashMap();
        mCollapseMap = this.constructMap(mProps.getCollapseFactors());
        mReplacementTable = new HashMap();
        mSubInfoMap = new HashMap();

        //index the jobs in the workflow by their logical ID
        for (Iterator it = mScheduledDAG.vJobSubInfos.iterator(); it.hasNext();) {
            SubInfo job = (SubInfo) it.next();
            mSubInfoMap.put(job.getLogicalID(), job);
        }

        //load the XML Producer and the PPS implementation
        mXMLStore = XMLProducerFactory.loadXMLProducer(mProps);
        mPPS = PPSFactory.loadPPS(this.mProps);

        mXMLStore.add("<workflow url=\"" + null + "\">");

        //call the begin workflow method
        try {
            mPPS.beginWorkflowRefinementStep(this,
                    PPS.REFINEMENT_CLUSTER, false);
        } catch (Exception e) {
            throw new ClustererException("PASOA Exception", e);
        }

        //clear the XML store
        mXMLStore.clear();

    }

    /**
     * Determines the clusters for a partition. The partition is assumed to
     * contain independent jobs, and multiple clusters may be created for the
     * partition. Internally the jobs are grouped according to transformation
     * name and then according to the execution site. Each group (having the
     * same transformation name and scheduled on the same site) is then
     * clustered.
     * The number of clustered jobs created for each group depends on the
     * following VDS profiles that can be associated with the jobs.
     * <pre>
     *       1) bundle   (dictates the number of clustered jobs that are created)
     *       2) collapse (the number of jobs that make a single clustered job)
     * </pre>
     *
     * If both parameters are associated with the jobs in a group, the
     * bundle parameter overrides the collapse parameter.
     *
     * @param partition   the partition for which the clusters need to be
     *                    determined.
     *
     * @throws ClustererException in case of error.
     *
     * @see VDS#BUNDLE_KEY
     * @see VDS#COLLAPSE_KEY
     */
    public void determineClusters(Partition partition)
            throws ClustererException {
        Set s = partition.getNodeIDs();
        List l = new ArrayList(s.size());
        mLogger.log("Collapsing jobs in partition " + partition.getID()
                + " " + s, LogManager.DEBUG_MESSAGE_LEVEL);

        for (Iterator it = s.iterator(); it.hasNext();) {
            SubInfo job = (SubInfo) mSubInfoMap.get(it.next());
            l.add(job);
        }
        //group the jobs by their transformation names
        Collections.sort(l, jobComparator());
        //traverse through the list and collapse jobs
        //referring to the same logical transformation
        SubInfo previous = null;
        List clusterList = new LinkedList();
        SubInfo job = null;
        for (Iterator it = l.iterator(); it.hasNext();) {
            job = (SubInfo) it.next();
            if (previous == null
                    || job.getCompleteTCName().equals(
                            previous.getCompleteTCName())) {
                clusterList.add(job);
            } else {
                //at the boundary, collapse the accumulated jobs
                collapseJobs(previous.getStagedExecutableBaseName(),
                        clusterList, partition.getID());
                clusterList = new LinkedList();
                clusterList.add(job);
            }
            previous = job;
        }
        //cluster the last clusterList
        if (previous != null) {
            collapseJobs(previous.getStagedExecutableBaseName(),
                    clusterList, partition.getID());
        }

    }
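
    /*
     * An illustration of the grouping above (hypothetical jobs): a partition
     * holding jobs {a1, a2, a3} that refer to transformation A and {b1, b2}
     * that refer to transformation B is sorted by transformation name, so
     * collapseJobs() is called once with {a1, a2, a3} and once with {b1, b2}.
     * Each call then further groups its jobs by execution site.
     */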

    /**
     * An empty implementation of the callout, as state is maintained
     * internally to determine the relations between the jobs.
     *
     * @param partitionID   the id of a partition.
     * @param parents       the list of <code>String</code> objects that contain
     *                      the ids of the parents of the partition.
     *
     * @throws ClustererException in case of error.
     */
    public void parents(String partitionID, List parents)
            throws ClustererException {

    }

    /**
     * Collapses the jobs having the same logical name according to the sites
     * where they are scheduled.
     *
     * @param name         the logical name of the jobs in the list passed to
     *                     this function.
     * @param jobs         the list of <code>SubInfo</code> objects corresponding
     *                     to the jobs that have the same logical name.
     * @param partitionID  the ID of the partition to which the jobs belong.
     */
    private void collapseJobs(String name, List jobs, String partitionID) {
        String key = null;
        SubInfo job = null;
        List l = null;
        //internal map that keeps the jobs according to the execution pool
        Map tempMap = new java.util.HashMap();
        int[] cFactor = new int[2]; //the collapse factor for collapsing the jobs
        cFactor[0] = 0;
        cFactor[1] = 0;
        AggregatedJob fatJob = null;

        mLogger.log("Collapsing jobs of type " + name,
                LogManager.DEBUG_MESSAGE_LEVEL);

        //traverse through all the jobs and order them by the
        //pool on which they are scheduled
        for (Iterator it = jobs.iterator(); it.hasNext();) {

            job = (SubInfo) it.next();
            key = job.executionPool;
            //check if the execution pool is already in the map
            if (tempMap.containsKey(key)) {
                //add the job to the corresponding list.
                l = (List) tempMap.get(key);
                l.add(job);
            } else {
                //first job seen for this execution pool
                l = new java.util.LinkedList();
                l.add(job);
                tempMap.put(key, l);
            }
        }

        //iterate through the built up temp map to get jobs per execution pool
        String factor = null;
        int size = -1;
        //the id for the fat jobs. we want ids
        //unique across the execution pools for a
        //particular type of job being merged.
        int id = 1;

        for (Iterator it = tempMap.entrySet().iterator(); it.hasNext();) {
            Map.Entry entry = (Map.Entry) it.next();
            l = (List) entry.getValue();
            size = l.size();
            //the pool name on which the job is to run is the key
            key = (String) entry.getKey();

            if (size <= 1) {
                //no need to collapse one job. go to the next iteration
                mLogger.log("\t No collapsing for execution pool "
                        + key, LogManager.DEBUG_MESSAGE_LEVEL);
                continue;
            }

            JobAggregator aggregator = mJobAggregatorFactory
                    .loadInstance((SubInfo) l.get(0));
            if (aggregator.entryNotInTC(key)) {
                //the aggregator entry is not in the transformation catalog.
                //go to the next iteration
                mLogger.log(
                        "\t No collapsing for execution pool because job aggregator entry not in tc "
                                + key, LogManager.DEBUG_MESSAGE_LEVEL);
                continue;
            }

            //checks made ensure that l is not empty at this point
            cFactor = getCollapseFactor(key, (SubInfo) l.get(0), size);
            if (cFactor[0] == 1 && cFactor[1] == 0) {
                mLogger.log("\t Collapse factor of (" + cFactor[0]
                        + "," + cFactor[1] + ") determined for pool "
                        + key + ". Skipping collapsing",
                        LogManager.DEBUG_MESSAGE_LEVEL);
                continue;
            }

            mLogger.log("\t Collapsing jobs at execution pool " + key
                    + " with collapse factor " + cFactor[0] + ","
                    + cFactor[1], LogManager.DEBUG_MESSAGE_LEVEL);

            //we do collapsing in chunks of 3 instead of picking up
            //from the properties file. ceiling is (x + y -1)/y
            //cFactor = (size + 2)/3;

            if (cFactor[0] >= size) {
                //means collapse all the jobs in the list as a fat node
                //Note: Passing a link to iterator might be more efficient, as
                //this would only require a single traversal through the list
                fatJob = aggregator.construct(l.subList(0, size), name,
                        constructID(partitionID, id));
                updateReplacementTable(l.subList(0, size), fatJob);

                //increment the id
                id++;
                //add the fat job to the dag
                //use the method to add, else add explicitly to DagInfo
                mScheduledDAG.add(fatJob);

                //log the refiner action capturing the creation of the job
                this.logRefinerAction(fatJob, aggregator);
            } else {
                //do collapsing in chunks of cFactor
                int increment = 0;
                for (int i = 0; i < size; i = i + increment) {
                    //compute the increment and decrement cFactor[1]
                    increment = (cFactor[1] > 0) ? cFactor[0] + 1
                            : cFactor[0];
                    cFactor[1]--;

                    if (increment == 1) {
                        //we can exit out of the loop as we do not want
                        //any merging for single jobs
                        break;
                    } else if ((i + increment) < size) {
                        fatJob = aggregator.construct(l.subList(i, i
                                + increment), name, constructID(
                                partitionID, id));

                        updateReplacementTable(l.subList(i, i
                                + increment), fatJob);
                    } else {
                        fatJob = aggregator.construct(l
                                .subList(i, size), name, constructID(
                                partitionID, id));
                        updateReplacementTable(l.subList(i, size),
                                fatJob);
                    }

                    //increment the id
                    id++;

                    //add the fat job to the dag
                    //use the method to add, else add explicitly to DagInfo
                    mScheduledDAG.add(fatJob);

                    //log the refiner action capturing the creation of the job
                    this.logRefinerAction(fatJob, aggregator);
                }
            }

        }

        //explicitly free the map
        tempMap = null;
    }
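
    /*
     * A worked illustration of the chunking loop above (hypothetical numbers):
     * for 7 jobs scheduled on one site, a collapse factor of (3,1) results in
     * increments of 4 and then 3, i.e. one clustered job with 4 constituents
     * followed by one with 3.
     */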

    /**
     * Returns the clustered workflow.
     *
     * @return  the <code>ADag</code> object corresponding to the clustered workflow.
     *
     * @throws ClustererException in case of error.
     */
    public ADag getClusteredDAG() throws ClustererException {
        //do all the replacement of jobs in the main data structure
        //that needs to be returned
        replaceJobs();

        //should be in the done method, which is currently not there in the
        //Clusterer API
        try {
            mPPS.endWorkflowRefinementStep(this);
        } catch (Exception e) {
            throw new ClustererException(
                    "PASOA Exception while logging end of clustering refinement",
                    e);
        }

        return mScheduledDAG;
    }

    /**
     * Returns a textual description of the clustering implementation.
     *
     * @return a short textual description
     */
    public String description() {
        return DESCRIPTION;
    }

    /**
     * Records the refiner action into the Provenance Store as an XML fragment.
     *
     * @param clusteredJob  the clustered job
     * @param aggregator    the aggregator that was used to create this clustered job
     */
    protected void logRefinerAction(AggregatedJob clusteredJob,
            JobAggregator aggregator) {
        StringBuffer sb = new StringBuffer();
        String indent = "\t";
        sb.append(indent);
        sb.append("<clustered ");
        appendAttribute(sb, "job", clusteredJob.getName());
        appendAttribute(sb, "type", aggregator.getCollapserLFN());
        sb.append(">").append("\n");

        //traverse through all the constituent jobs
        String newIndent = indent + "\t";
        List jobs = new ArrayList();
        for (Iterator it = clusteredJob.constituentJobsIterator(); it.hasNext();) {
            SubInfo job = (SubInfo) it.next();
            jobs.add(job.getName());
            sb.append(newIndent);
            sb.append("<constitutent ");
            appendAttribute(sb, "job", job.getName());
            sb.append("/>");
            sb.append("\n");
        }
        sb.append(indent);
        sb.append("</clustered>");
        sb.append("\n");

        //log the action for creating the relationship assertions
        try {
            mPPS.clusteringOf(clusteredJob.getName(), jobs);
        } catch (Exception e) {
            throw new RuntimeException(
                    "PASOA Exception while logging relationship assertion for clustering ",
                    e);
        }

        mXMLStore.add(sb.toString());

    }

    /**
     * Appends an xml attribute to the xml feed.
     *
     * @param xmlFeed  the xmlFeed to which xml is being written
     * @param key   the attribute key
     * @param value the attribute value
     */
    protected void appendAttribute(StringBuffer xmlFeed, String key,
            String value) {
        xmlFeed.append(key).append("=").append("\"").append(value)
                .append("\" ");
    }

    /**
     * A callback that triggers the collapsing of a partition/level of a graph.
     *
     * @param partition the partition that needs to be collapsed.
     *
     */
    /*
    private void collapseJobs(Partition partition){
        Set s = partition.getNodeIDs();
        List l = new ArrayList(s.size());
        mLogger.log("Collapsing jobs in partition " + partition.getID() +
                    " " +  s,
                    LogManager.DEBUG_MESSAGE_LEVEL);

       for(Iterator it = s.iterator();it.hasNext();){
           SubInfo job = (SubInfo)mSubInfoMap.get(it.next());
           l.add(job);
       }
       //group the jobs by their transformation names
       Collections.sort(l,jobComparator());
       //traverse through the list and collapse jobs
       //referring to same logical transformation
       SubInfo previous = null;
       List clusterList = new LinkedList();
       SubInfo job = null;
       for(Iterator it = l.iterator();it.hasNext();){
           job = (SubInfo)it.next();
           if(previous == null ||
              job.getCompleteTCName().equals(previous.getCompleteTCName())){
               clusterList.add(job);
           }
           else{
               //at boundary collapse jobs
               collapseJobs(previous.getStagedExecutableBaseName(),clusterList,partition.getID());
               clusterList = new LinkedList();
               clusterList.add(job);
           }
           previous = job;
       }
       //cluster the last clusterList
       if(previous != null){
           collapseJobs(previous.getStagedExecutableBaseName(), clusterList, partition.getID());
       }

       //collapse the jobs in list l
    //       collapseJobs(job.logicalName,l,partition.getID());
    }
     */

    /**
     * Returns the collapse factor that is used to chunk up the jobs of a
     * particular type on a pool. The collapse factor is determined by
     * getting the collapse key in the VDS namespace/profile associated with
     * the job in the transformation catalog. Right now the value in the
     * transformation catalog overrides the per-pool value specified in the
     * properties file.
     * There are two orthogonal notions of bundling and collapsing. In case the
     * bundle key is specified, it ends up overriding the collapse key, and
     * the bundle value is used to generate the collapse values.
     *
     * @param pool  the pool where the chunking up is occurring
     * @param job   the <code>SubInfo</code> object containing the job that
     *              is to be chunked up together.
     * @param size  the number of jobs that refer to the same logical
     *              transformation and are scheduled on the same execution pool.
     *
     * @return an int array of size 2, where int[0] is the collapse factor and
     *         int[1] is the number of jobs for whom collapsing is int[0] + 1.
     */
    public int[] getCollapseFactor(String pool, SubInfo job, int size) {
        String factor = null;
        int result[] = new int[2];
        result[1] = 0;

        //the job should have the collapse key from the TC if
        //specified by the user
        factor = (String) job.vdsNS.get(VDS.COLLAPSE_KEY);

        //ceiling is (x + y -1)/y
        String bundle = (String) job.vdsNS.get(VDS.BUNDLE_KEY);
        if (bundle != null) {
            int b = Integer.parseInt(bundle);
            result[0] = size / b;
            result[1] = size % b;
            return result;
            //doing no boundary condition checks
            //return (size + b -1)/b;
        }

        //use the value found in the TC, else the per-pool value from the
        //properties file, else the default value
        result[0] = (factor == null)
                ? (((factor = (String) mCollapseMap.get(pool)) == null)
                        ? DEFAULT_COLLAPSE_FACTOR //the default value
                        : Integer.parseInt(factor)) //use the value in the prop file
                : Integer.parseInt(factor); //use the value found in the TC
        return result;

    }
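
    /*
     * A worked example (hypothetical values): for 10 jobs of the same
     * transformation scheduled on a site,
     *   - a bundle value of 3 gives {10/3, 10%3} = {3, 1}, which collapseJobs()
     *     turns into chunks of 4, 3 and 3, i.e. exactly 3 clustered jobs;
     *   - no bundle value and a collapse value of 4 (from the TC, the per-pool
     *     property, or DEFAULT_COLLAPSE_FACTOR) gives {4, 0}, i.e. chunks of
     *     4, 4 and 2.
     */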

    /**
     * Given an integer id, returns a string id that is used for the clustered
     * job.
     *
     * @param partitionID  the id of the partition.
     * @param id           the integer id from which the string id has to be
     *                     constructed. The id should be unique for all the
     *                     clustered jobs that are formed for a particular
     *                     partition.
     *
     * @return the id of the clustered job
     */
    public String constructID(String partitionID, int id) {
        StringBuffer sb = new StringBuffer(8);
        sb.append("P").append(partitionID).append("_");
        sb.append("ID").append(id);

        return sb.toString();
    }
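
    //For example, constructID("3", 2) returns "P3_ID2", the id for the second
    //clustered job created for partition 3.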

    /**
     * Updates the replacement table.
     *
     * @param jobs       the List of jobs that are being replaced.
     * @param mergedJob  the merged job that is replacing the jobs in the list.
     */
    private void updateReplacementTable(List jobs, SubInfo mergedJob) {
        if (jobs == null || jobs.isEmpty())
            return;
        String mergedJobName = mergedJob.jobName;
        for (Iterator it = jobs.iterator(); it.hasNext();) {
            SubInfo job = (SubInfo) it.next();
            //put the entry in the replacement table
            mReplacementTable.put(job.jobName, mergedJobName);
        }

    }

    /**
     * Puts the jobs in the abstract workflow into the map that is indexed
     * by the logical name of the jobs.
     */
    private void assimilateJobs() {
        Iterator it = mScheduledDAG.vJobSubInfos.iterator();
        SubInfo job = null;
        List l = null;
        String key = null;

        while (it.hasNext()) {
            job = (SubInfo) it.next();
            key = job.logicalName;
            //check if the job logical name is already in the map
            if (mJobMap.containsKey(key)) {
                //add the job to the corresponding list.
                l = (List) mJobMap.get(key);
                l.add(job);
            } else {
                //first instance of this logical name
                l = new java.util.LinkedList();
                l.add(job);
                mJobMap.put(key, l);
            }
        }
    }

    /**
     * Constructs a map with the numbers/values for the collapsing factors to
     * collapse the nodes of the same type. The user ends up specifying these
     * through the properties file. The value of the property is of the form
     * poolname1=value,poolname2=value....
     *
     * @param propValue the value of the property obtained from the properties file.
     *
     * @return the constructed map.
     */
    private Map constructMap(String propValue) {
        Map map = new java.util.TreeMap();

        if (propValue != null) {
            StringTokenizer st = new StringTokenizer(propValue, ",");
            while (st.hasMoreTokens()) {
                String raw = st.nextToken();
                int pos = raw.indexOf('=');
                if (pos > 0) {
                    map.put(raw.substring(0, pos).trim(), raw
                            .substring(pos + 1).trim());
                }
            }
        }

        return map;
    }
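
    //For example, a property value of "isi=5,ncsa=10" (hypothetical pool names)
    //yields a map of {isi -> 5, ncsa -> 10}, with whitespace around the keys
    //and values trimmed.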

    /**
     * Replaces the clustered jobs in the local graph structure. The
     * relations/edges are changed accordingly.
     */
    private void replaceJobs() {
        boolean val = false;
        List l = null;
        List nl = null;
        SubInfo sub = new SubInfo();
        String msg;

        for (Iterator it = mReplacementTable.entrySet().iterator(); it.hasNext();) {
            Map.Entry entry = (Map.Entry) it.next();
            String key = (String) entry.getKey();
            mLogger.log("Replacing job " + key + " with "
                    + entry.getValue(), LogManager.DEBUG_MESSAGE_LEVEL);
            //remove the old job
            //remove by just creating a subinfo object with the same key
            sub.jobName = key;
            val = mScheduledDAG.remove(sub);
            if (val == false) {
                throw new RuntimeException("Removal of job " + key
                        + " while clustering not successful");
            }
        }
        mLogger.log("All clustered jobs removed from the workflow",
                LogManager.DEBUG_MESSAGE_LEVEL);

        //Set mergedEdges = new java.util.HashSet();
        //this is a temporary measure till the hashing is sorted out correctly
        List mergedEdges = new java.util.ArrayList(
                mScheduledDAG.vJobSubInfos.size());

        //traverse the edges and do appropriate replacements
        String parent = null;
        String child = null;
        String value = null;
        for (Iterator it = mScheduledDAG.dagInfo.relations.iterator(); it.hasNext();) {
            PCRelation rel = (PCRelation) it.next();
            //replace the parent and child if there is a need
            parent = rel.parent;
            child = rel.child;

            msg = ("\n Replacing " + rel);

            value = (String) mReplacementTable.get(parent);
            if (value != null) {
                rel.parent = value;
            }
            value = (String) mReplacementTable.get(child);
            if (value != null) {
                rel.child = value;
            }
            msg += (" with " + rel);

            //put in the merged edges set
            if (!mergedEdges.contains(rel)) {
                val = mergedEdges.add(rel);
                msg += "Add to set : " + val;
            } else {
                msg += "\t Duplicate Entry for " + rel;
            }
            mLogger.log(msg, LogManager.DEBUG_MESSAGE_LEVEL);
        }

        //the final edges need to be updated
        mScheduledDAG.dagInfo.relations = null;
        mScheduledDAG.dagInfo.relations = new java.util.Vector(
                mergedEdges);
    }

    /**
     * A utility method to print a short description of the jobs in a list.
     *
     * @param l the list of <code>SubInfo</code> objects
     */
    private void printList(List l) {
        for (Iterator it = l.iterator(); it.hasNext();) {
            SubInfo job = (SubInfo) it.next();
            System.out.print(" " + /*job.getCompleteTCName() +*/
            "[" + job.logicalId + "]");
        }

    }

    /**
     * A job comparator that compares jobs according to their transformation
     * names. It is applied to group the jobs in a particular partition
     * according to the underlying transformation that is referred to.
     * <p>
     * This comparator is not consistent with the SubInfo.equals(Object) method.
     * Hence, it should not be used in sorted sets or Maps.
     */
    private class JobComparator implements Comparator {

        /**
         * Compares this object with the specified object for order. Returns a
         * negative integer, zero, or a positive integer if the first argument is
         * less than, equal to, or greater than the specified object. The
         * SubInfo objects are compared by their transformation names.
         *
         * This implementation is not consistent with the
         * SubInfo.equals(Object) method. Hence, it should not be used in sorted
         * Sets or Maps.
         *
         * @param o1 is the first object to be compared.
         * @param o2 is the second object to be compared.
         *
         * @return a negative number, zero, or a positive number, if the
         * object compared against is less than, equal to or greater than
         * this object.
         * @exception ClassCastException if the specified object's type
         * prevents it from being compared to this Object.
         */
        public int compare(Object o1, Object o2) {
            if (o1 instanceof SubInfo && o2 instanceof SubInfo) {
                return ((SubInfo) o1).getCompleteTCName().compareTo(
                        ((SubInfo) o2).getCompleteTCName());

            } else {
                throw new ClassCastException(
                        "Objects being compared are not SubInfo");
            }
        }
    }

}