Source code for Horizontal.java in pegasus-2.1.0, package org.griphyn.cPlanner.partitioner

001:        /*
002:         * This file or a portion of this file is licensed under the terms of
003:         * the Globus Toolkit Public License, found in file GTPL, or at
004:         * http://www.globus.org/toolkit/download/license.html. This notice must
005:         * appear in redistributions of this file, with or without modification.
006:         *
007:         * Redistributions of this Software, with or without modification, must
008:         * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
009:         * some other similar material which is provided with the Software (if
010:         * any).
011:         *
012:         * Copyright 1999-2004 University of Chicago and The University of
013:         * Southern California. All rights reserved.
014:         */
015:        package org.griphyn.cPlanner.partitioner;
016:
017:        import org.griphyn.cPlanner.common.LogManager;
018:        import org.griphyn.cPlanner.common.PegasusProperties;
019:
020:        import org.griphyn.cPlanner.partitioner.graph.GraphNode;
021:        import org.griphyn.cPlanner.partitioner.graph.Bag;
022:        import org.griphyn.cPlanner.partitioner.graph.LabelBag;
023:
024:        import java.util.Collections;
025:        import java.util.Comparator;
026:        import java.util.Iterator;
027:        import java.util.LinkedList;
028:        import java.util.List;
029:        import java.util.ArrayList;
030:        import java.util.Map;
031:        import java.util.HashMap;
032:        import java.util.Set;
033:        import java.util.HashSet;
034:
035:        /**
036:         * Horizontal partitioning scheme that allows the user to configure the
037:         * number of partitions per transformation name per level.
038:         * To set the size of the partition per transformation, set one of the
039:         * following properties:
040:         * <pre>
041:         *       pegasus.partitioner.horizontal.collapse.[txName]
042:         *       pegasus.partitioner.horizontal.bundle.[txName]
043:         * </pre>
044:         *
045:         * The bundle value designates the number of partitions per transformation per level.
046:         * The collapse value designates the number of nodes in a partition referring
047:         * to a particular transformation. If both are specified, the bundle value takes
048:         * precedence.
049:         *
050:         * @author Karan Vahi
051:         * @version $Revision: 50 $
052:         */
053:        public class Horizontal extends BFS {
054:
055:            /**
056:             * A short description about the partitioner.
057:             */
058:            public static final String DESCRIPTION = "Configurable Level Based Partitioning";
059:
060:            /**
061:             * The default collapse factor for collapsing jobs with same logical name
062:             * scheduled onto the same execution pool.
063:             */
064:            public static final int DEFAULT_COLLAPSE_FACTOR = 3;
065:
066:            /**
067:             * A map indexed by the partition ID. Each value is a partition object.
068:             */
069:            private Map mPartitionMap;
070:
071:            /**
072:             * The GraphNode comparator instance, created on first use.
073:             */
074:            private GraphNodeComparator mNodeComparator;
075:
076:            /**
077:             * The global counter that is used to assign IDs to the partitions.
078:             */
079:            private int mIDCounter;
080:
081:            /**
082:             * Singleton access to the node comparator. The instance is created
083:             * lazily and cached, so repeated calls return the same object.
084:             * @return the node comparator.
085:             */
086:            private Comparator nodeComparator() {
087:                return (mNodeComparator == null) ? (mNodeComparator = new GraphNodeComparator())
088:                        : mNodeComparator;
089:            }
090:
091:            /**
092:             * The overloaded constructor.
093:             *
094:             * @param root   the dummy root node of the graph.
095:             * @param graph  the map containing all the nodes of the graph keyed by
096:             *               the logical id of the nodes.
097:             * @param properties the properties passed to the planner.
098:             */
099:            public Horizontal(GraphNode root, Map graph,
100:                    PegasusProperties properties) {
101:                super(root, graph, properties);
102:                mIDCounter = 0;
103:                mPartitionMap = new HashMap(10);
104:            }
105:
106:            /**
107:             * Returns a textual description of the partitioner implementation.
108:             *
109:             * @return a short textual description
110:             */
111:            public String description() {
112:                return DESCRIPTION;
113:            }
114:
115:            /**
116:             * Given a list of jobs, constructs (one or more) partitions out of it.
117:             * Calls out to the partitioner callback, for each of the partitions
118:             * constructed.
119:             *
120:             * @param c     the partitioner callback
121:             * @param nodes the list of <code>GraphNode</code> objects on a particular level.
122:             * @param level the level as determined from the root of the workflow.
123:             */
124:            protected void constructPartitions(Callback c, List nodes, int level) {
125:                //group the nodes by their logical names
126:                Collections.sort(nodes, nodeComparator());
127:                //traverse through the list and collapse jobs
128:                //referring to same logical transformation
129:                GraphNode previous = null;
130:                List clusterList = new LinkedList();
131:                GraphNode node = null;
132:
133:                for (Iterator it = nodes.iterator(); it.hasNext();) {
134:                    node = (GraphNode) it.next();
135:                    if (previous == null
136:                            || node.getName().equals(previous.getName())) {
137:                        clusterList.add(node);
138:                    } else {
139:                        //at boundary collapse jobs
140:                        constructPartitions(c, clusterList, level, previous
141:                                .getName());
142:                        clusterList = new LinkedList();
143:                        clusterList.add(node);
144:                    }
145:                    previous = node;
146:                }
147:                //cluster the last clusterList
148:                if (previous != null) {
149:                    constructPartitions(c, clusterList, level, previous
150:                            .getName());
151:                }
152:
153:            }
154:
155:            /**
156:             * Given a list of jobs, constructs (one or more) partitions out of it.
157:             * Calls out to the partitioner callback, for each of the partitions
158:             * constructed.
159:             *
160:             * @param c     the partitioner callback
161:             * @param nodes the list of <code>GraphNode</code> objects on a particular level,
162:             *              referring to the same transformation underneath.
163:             * @param level the level as determined from the root of the workflow.
164:             * @param name  the transformation name
165:             */
166:            protected void constructPartitions(Callback c, List nodes,
167:                    int level, String name) {
168:                //figure out number of jobs that go into one partition
169:                int[] cFactor = new int[2];
170:                cFactor[0] = 0;
171:                cFactor[1] = 0;
172:
173:                int size = nodes.size();
174:                cFactor = this.getCollapseFactor(name, size);
175:
176:                StringBuffer message = new StringBuffer();
177:
178:                if (cFactor[0] == 0 && cFactor[1] == 0) {
179:                    message.append("\t Collapse factor of ").append(cFactor[0])
180:                            .append(",").append(cFactor[1]).append(
181:                                    " determined for transformation ").append(
182:                                    name);
183:                    mLogger.log(message.toString(),
184:                            LogManager.DEBUG_MESSAGE_LEVEL);
185:                    return;
186:                }
187:
188:                message.append("Partitioning jobs of type ").append(name)
189:                        .append(" at level ").append(level).append(
190:                                " with collapse factor ").append(cFactor[0])
191:                        .append(",").append(cFactor[1]);
192:
193:                mLogger.log(message.toString(), LogManager.DEBUG_MESSAGE_LEVEL);
194:
195:                Partition p;
196:                if (cFactor[0] >= size) {
197:                    //means put all the nodes in one partition
198:                    //we want to ignore the dummy node partition
199:                    p = createPartition(nodes);
200:                    c.cbPartition(p);
201:                } else {
202:                    //do collapsing in chunks of cFactor
203:                    int increment = 0;
204:                    int toIndex;
205:                    for (int i = 0; i < size; i = i + increment) {
206:                        //compute the increment and decrement cFactor[1]
207:                        increment = (cFactor[1] > 0) ? cFactor[0] + 1
208:                                : cFactor[0];
209:                        cFactor[1]--;
210:
211:                        //determine the toIndex for creating the partition
212:                        toIndex = ((i + increment) < size) ? i + increment
213:                                : size;
214:
215:                        p = createPartition(nodes.subList(i, toIndex));
216:                        c.cbPartition(p);
217:                    }
218:                }
219:
220:            }
221:
222:            /**
223:             * Calls out to the callback with appropriate relations between the partitions
224:             * constructed for the levels. This is an empty implementation, as we
225:             * do our own book-keeping in this partitioner to determine the relations
226:             * between the partitions.
227:             *
228:             * @param c       the partitioner callback
229:             * @param parent  the parent level
230:             * @param child   the child level.
231:             *
232:             * @see #done( Callback )
233:             */
234:            protected void constructLevelRelations(Callback c, int parent,
235:                    int child) {
236:
237:            }
238:
239:            /**
240:             * Indicates that we are done with the traversal of the graph. Determines
241:             * the relations between the partitions constructed and calls out to the
242:             * appropriate callback function
243:             *
244:             * @param c  the partitioner callback
245:             */
246:            protected void done(Callback c) {
247:                GraphNode node;
248:                GraphNode parent;
249:
250:                mLogger.log("Determining relations between partitions",
251:                        LogManager.INFO_MESSAGE_LEVEL);
252:                //construct the relations
253:                for (Iterator it = mPartitionMap.entrySet().iterator(); it
254:                        .hasNext();) {
255:                    Map.Entry entry = (Map.Entry) it.next();
256:                    Partition p = (Partition) entry.getValue();
257:                    List roots = p.getRootNodes();
258:                    Set parentPartitions = new HashSet(roots.size());
259:
260:                    //get the Root nodes for each partition and
261:                    //for each root, determine the partitions of its parents
262:                    for (Iterator rootIt = roots.iterator(); rootIt.hasNext();) {
263:                        node = (GraphNode) rootIt.next();
264:                        for (Iterator parentsIt = node.getParents().iterator(); parentsIt
265:                                .hasNext();) {
266:                            parent = (GraphNode) parentsIt.next();
267:                            //the parent's partition id is a parent of the
268:                            //partition containing the root
269:                            parentPartitions.add(parent.getBag().get(
270:                                    LabelBag.PARTITION_KEY));
271:                        }
272:                    }
273:                    //write out all the parents of the partition
274:                    if (!parentPartitions.isEmpty()) {
275:                        c.cbParents(p.getID(), new ArrayList(parentPartitions));
276:                    }
277:                }
278:                mLogger.logCompletion(
279:                        "Determining relations between partitions",
280:                        LogManager.INFO_MESSAGE_LEVEL);
281:
282:                //done with the partitioning
283:                c.cbDone();
284:            }
285:
286:            /**
287:             * Returns the collapse factor, that is used to determine the number of nodes
288:             * going in a partition. The collapse factor is determined by
289:             * getting the collapse and the bundle values specified for the transformations
290:             * in the properties file.
291:             *
292:             * There are two orthogonal notions of bundling and collapsing. In case the
293:             * bundle key is specified, it ends up overriding the collapse key, and
294:             * the bundle value is used to generate the collapse values.
295:             *
296:             * If neither is specified, then the collapse factor is set to size.
297:             *
298:             * @param txName the logical transformation name
299:             * @param size  the number of jobs that refer to the same logical
300:             *              transformation and are scheduled on the same execution pool.
301:             *
302:             * @return an int array of size 2, where
303:             *         int[0] is the collapse factor (number of nodes in a partition)
304:             *         int[1] is the number of partitions that hold int[0] + 1 nodes.
305:             */
306:            protected int[] getCollapseFactor(String txName, int size) {
307:                String factor = null;
308:                String bundle = null;
309:                int result[] = new int[2];
310:                result[1] = 0;
311:
312:                //use the bundle or collapse value for the transformation,
313:                //if specified by the user in the properties
314:                try {
315:                    //ceiling is (x + y -1)/y
316:                    bundle = mProps.getHorizontalPartitionerBundleValue(txName);
317:                    if (bundle != null) {
318:                        int b = Integer.parseInt(bundle);
319:                        result[0] = size / b;
320:                        result[1] = size % b;
321:                        return result;
322:                        //no boundary condition checks: a bundle value of 0 makes the division above throw an ArithmeticException
323:                        //return (size + b -1)/b;
324:                    }
325:
326:                    factor = mProps
327:                            .getHorizontalPartitionerCollapseValue(txName);
328:                    //return the appropriate value
329:                    result[0] = (factor == null) ? size : //then collapse factor is same as size
330:                            Integer.parseInt(factor); //use the value in the prop file
331:                } catch (NumberFormatException e) {
332:                    //fall back to a collapse factor equal to size
333:                    StringBuffer error = new StringBuffer();
334:
335:                    if (factor == null) {
336:                        error.append("Bundle value (").append(bundle).append(
337:                                ")");
338:                    } else {
339:                        error.append("Collapse value (").append(factor).append(
340:                                ")");
341:                    }
342:                    error.append(" for transformation ").append(txName).append(
343:                            " is not a number");
344:                    mLogger.log(error.toString(),
345:                            LogManager.DEBUG_MESSAGE_LEVEL);
346:                    result[0] = size;
347:                }
348:                return result;
349:
350:            }
351:
352:            /**
353:             * Creates a partition out of a list of nodes. Also stores it in the internal
354:             * partition map to track partitions later on. Associates the partition ID
355:             * with each of the nodes making the partition also.
356:             *
357:             * @param nodes  the list of <code>GraphNodes</code> making the partition.
358:             *
359:             * @return the partition out of those nodes.
360:             */
361:            protected Partition createPartition(List nodes) {
362:                //increment the ID counter before getting the ID
363:                this.incrementIDCounter();
364:                String id = getPartitionID(this.idCounter());
365:                Partition p = new Partition(nodes, id);
366:                p.setIndex(this.idCounter());
367:                p.constructPartition();
368:
369:                mPartitionMap.put(p.getID(), p);
370:
371:                //associate the ID with all the nodes
372:                for (Iterator it = nodes.iterator(); it.hasNext();) {
373:                    GraphNode node = (GraphNode) it.next();
374:                    Bag b = new LabelBag();
375:                    b.add(LabelBag.PARTITION_KEY, id);
376:                    node.setBag(b);
377:                }
378:
379:                //log a message
380:                StringBuffer message = new StringBuffer();
381:                message.append("Partition ").append(p.getID()).append(" is :")
382:                        .append(p.getNodeIDs());
383:                mLogger.log(message.toString(), LogManager.DEBUG_MESSAGE_LEVEL);
384:
385:                return p;
386:            }
387:
388:            /**
389:             * Increments the ID counter by 1.
390:             */
391:            private void incrementIDCounter() {
392:                mIDCounter++;
393:            }
394:
395:            /**
396:             * Returns the current value of the ID counter.
397:             */
398:            private int idCounter() {
399:                return mIDCounter;
400:            }
401:
402:            /**
403:             * Constructs the id for the partition.
404:             *
405:             * @param id an integer ID.
406:             *
407:             * @return the ID for the Partition.
408:             */
409:            private String getPartitionID(int id) {
410:                StringBuffer sb = new StringBuffer(5);
411:                sb.append("ID").append(id);
412:                return sb.toString();
413:            }
414:
415:            /**
416:             * A GraphNode comparator, that allows me to compare nodes according to the
417:             * transformation logical names. It is applied to group jobs in a particular partition,
418:             * according to the underlying transformation that is referred.
419:             *
420:             */
421:            private class GraphNodeComparator implements Comparator {
422:
423:                /**
424:                 * Compares the two objects for order. Returns a
425:                 * negative integer, zero, or a positive integer if the first argument is
426:                 * less than, equal to, or greater than the second. The
427:                 * GraphNodes are compared by their transformation name.
428:                 *
429:                 * This implementation is not consistent with the
430:                 * GraphNode.equals(Object) method. Hence, it should not be used in sorted
431:                 * Sets or Maps.
432:                 *
433:                 * @param o1 is the first object to be compared.
434:                 * @param o2 is the second object to be compared.
435:                 *
436:                 * @return a negative number, zero, or a positive number, if the
437:                 * first object is less than, equal to, or greater than
438:                 * the second object.
439:                 * @exception ClassCastException if either object is not a
440:                 * GraphNode.
441:                 */
442:                public int compare(Object o1, Object o2) {
443:                    if (o1 instanceof GraphNode && o2 instanceof GraphNode) {
444:                        return ((GraphNode) o1).getName().compareTo(
445:                                ((GraphNode) o2).getName());
446:
447:                    } else {
448:                        throw new ClassCastException(
449:                                "Objects being compared are not GraphNode");
450:                    }
451:                }
452:            }
453:
454:        }
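
The partition sizes produced by getCollapseFactor and the chunking loop in constructPartitions can be hard to see from the listing alone. The following is a minimal standalone sketch of that arithmetic only, not the Pegasus code itself: the class name PartitionSizesSketch and the methods factorFromBundle and partitionSizes are hypothetical names introduced for illustration, and the guard against a non-positive bundle value is an added assumption that the original does not have.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; it does not depend on any Pegasus types.
class PartitionSizesSketch {

    // Mirrors the bundle branch of getCollapseFactor:
    // result[0] = nodes per partition, result[1] = partitions that get one extra node.
    static int[] factorFromBundle(int size, int bundle) {
        if (bundle <= 0) {
            // Assumption of this sketch: reject values the original would divide by.
            throw new IllegalArgumentException("bundle must be positive: " + bundle);
        }
        return new int[] { size / bundle, size % bundle };
    }

    // Mirrors the chunking loop of constructPartitions, but returns the
    // size of each partition instead of creating Partition objects.
    static List<Integer> partitionSizes(int size, int[] cFactor) {
        List<Integer> sizes = new ArrayList<>();
        int base = cFactor[0];
        int extra = cFactor[1];
        if (base == 0 && extra == 0) {
            return sizes;               // nothing to partition
        }
        if (base >= size) {
            sizes.add(size);            // everything fits into one partition
            return sizes;
        }
        int increment = 0;
        for (int i = 0; i < size; i += increment) {
            // the first 'extra' partitions take one node more than the rest
            increment = (extra > 0) ? base + 1 : base;
            extra--;
            int toIndex = Math.min(i + increment, size);
            sizes.add(toIndex - i);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // 10 nodes, bundle value 3: three partitions
        System.out.println(partitionSizes(10, factorFromBundle(10, 3))); // prints [4, 3, 3]
        // 10 nodes, collapse value 4 (so cFactor = {4, 0})
        System.out.println(partitionSizes(10, new int[] { 4, 0 }));      // prints [4, 4, 2]
    }
}
```

With a bundle value the nodes are spread as evenly as possible over that many partitions, whereas a collapse value fixes the partition size and leaves any remainder in a smaller final partition.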