Source code: Windward.java, from pegasus-2.1.0, package org.griphyn.cPlanner.transfer.implementation



/*
 * This file or a portion of this file is licensed under the terms of
 * the Globus Toolkit Public License, found in file GTPL, or at
 * http://www.globus.org/toolkit/download/license.html. This notice must
 * appear in redistributions of this file, with or without modification.
 *
 * Redistributions of this Software, with or without modification, must
 * reproduce the GTPL in: (1) the Software, or (2) the Documentation or
 * some other similar material which is provided with the Software (if
 * any).
 *
 * Copyright 1999-2004 University of Chicago and The University of
 * Southern California. All rights reserved.
 */

package org.griphyn.cPlanner.transfer.implementation;

import org.griphyn.cPlanner.classes.ADag;
import org.griphyn.cPlanner.classes.SubInfo;
import org.griphyn.cPlanner.classes.TransferJob;
import org.griphyn.cPlanner.classes.PlannerOptions;
import org.griphyn.cPlanner.classes.FileTransfer;
import org.griphyn.cPlanner.classes.SiteInfo;
import org.griphyn.cPlanner.classes.JobManager;
import org.griphyn.cPlanner.classes.NameValue;
import org.griphyn.cPlanner.classes.PegasusBag;
import org.griphyn.cPlanner.classes.Profile;

import org.griphyn.cPlanner.common.LogManager;
import org.griphyn.cPlanner.common.PegasusProperties;

import org.griphyn.cPlanner.transfer.Implementation;
import org.griphyn.cPlanner.transfer.MultipleFTPerXFERJob;
import org.griphyn.cPlanner.transfer.Refiner;

import org.griphyn.common.catalog.TransformationCatalogEntry;

import org.griphyn.common.classes.TCType;

import org.griphyn.common.util.Separator;

import org.griphyn.cPlanner.cluster.aggregator.JobAggregatorFactory;
import org.griphyn.cPlanner.cluster.JobAggregator;

import java.io.File;
import java.io.FileWriter;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

/**
 * A Windward implementation that uses the seqexec client to execute:
 *
 *  - the DC Transfer client, to fetch the raw data sources, and
 *  - the Pegasus transfer client, to fetch the patterns from the pattern catalog.
 *
 * @author Karan Vahi
 * @version $Revision: 450 $
 */

public class Windward extends Abstract implements MultipleFTPerXFERJob {

    /**
     * The prefix to identify the raw data sources.
     */
    public static final String DATA_SOURCE_PREFIX = "DS";

    /**
     * A short description of the transfer implementation.
     */
    public static final String DESCRIPTION = "Seqexec Transfer Wrapper around Pegasus Transfer and DC Transfer Client";

    /**
     * The transformation namespace for the transfer job.
     */
    public static final String TRANSFORMATION_NAMESPACE = "windward";

    /**
     * The name of the underlying transformation that is queried for in the
     * Transformation Catalog.
     */
    public static final String TRANSFORMATION_NAME = "dc-transfer";

    /**
     * The version number for the transfer job.
     */
    public static final String TRANSFORMATION_VERSION = null;

    /**
     * The derivation namespace for the transfer job.
     */
    public static final String DERIVATION_NAMESPACE = "windward";

    /**
     * The name of the underlying derivation.
     */
    public static final String DERIVATION_NAME = "dc-transfer";

    /**
     * The derivation version number for the transfer job.
     */
    public static final String DERIVATION_VERSION = null;

    /**
     * The handle to the Pegasus transfer implementation.
     */
    private Transfer mPegasusTransfer;

    /**
     * The seqexec job aggregator.
     */
    private JobAggregator mSeqExecAggregator;

    /**
     * The overloaded constructor that is called by the factory to load the
     * class.
     *
     * @param properties  the properties object.
     * @param options     the options passed to the Planner.
     */
    public Windward(PegasusProperties properties, PlannerOptions options) {
        super(properties, options);

        //should probably go through the factory
        mPegasusTransfer = new Transfer(properties, options);

        //just to pass the label we have to send an empty ADag.
        //this should be fixed
        ADag dag = new ADag();
        dag.dagInfo.setLabel("windward");
        PegasusBag bag = new PegasusBag();
        bag.add(PegasusBag.PEGASUS_PROPERTIES, properties);
        bag.add(PegasusBag.PEGASUS_LOGMANAGER, mLogger);
        bag.add(PegasusBag.PLANNER_OPTIONS, options);

        mSeqExecAggregator = JobAggregatorFactory.loadInstance(
                JobAggregatorFactory.SEQ_EXEC_CLASS, dag, bag);
    }

    /**
     * Sets the callback to the refiner that has loaded this implementation.
     *
     * @param refiner  the transfer refiner that loaded the implementation.
     */
    public void setRefiner(Refiner refiner) {
        super.setRefiner(refiner);
        //also set the refiner for the internal pegasus transfer
        mPegasusTransfer.setRefiner(refiner);
    }

    /**
     * Constructs a transfer job that stages the files associated with a
     * compute job. Pattern files are staged via the Pegasus transfer client,
     * raw data sources via the DC transfer client, and the resulting
     * invocations are merged into a single seqexec job.
     *
     * @param job         the SubInfo object for the job, in relation to which
     *                    the transfer node is being added. The transfer node
     *                    can either be transferring this job's input files to
     *                    the execution pool, or transferring this job's output
     *                    files to the output pool.
     * @param files       collection of <code>FileTransfer</code> objects
     *                    representing the data files and staged executables to be
     *                    transferred.
     * @param execFiles   subset collection of the files parameter, that identifies
     *                    the executable files that are being transferred.
     * @param txJobName   the name of the transfer node.
     * @param jobClass    the job class for the newly added job. Can be one of the
     *                    following:
     *                              stage-in
     *                              stage-out
     *                              inter-pool transfer
     *
     * @return  the created TransferJob.
     */
    public TransferJob createTransferJob(SubInfo job, Collection files,
            Collection execFiles, String txJobName, int jobClass) {

        //iterate through all the files and separate the patterns
        //from the raw data sources
        Collection rawDataSources = new LinkedList();
        Collection patterns = new LinkedList();

        for (Iterator it = files.iterator(); it.hasNext();) {
            FileTransfer ft = (FileTransfer) it.next();
            if (ft.getLFN().startsWith(DATA_SOURCE_PREFIX)
                    && !ft.getSourceURL().getValue().endsWith(".zip")) {
                //it is a raw data source that will have to be ingested
                rawDataSources.add(ft);
            } else {
                //everything else is a pattern
                patterns.add(ft);
            }
        }

        List txJobs = new LinkedList();

        //use the Pegasus transfer client to handle the patterns
        TransferJob patternTXJob = null;
        String patternTXJobStdin = null;
        if (!patterns.isEmpty()) {
            patternTXJob = mPegasusTransfer.createTransferJob(job,
                    patterns, null, txJobName, jobClass);

            //get the stdin (the list-of-filenames file) and pass it
            //as an argument instead
            patternTXJobStdin = patternTXJob.getStdIn();
            StringBuffer patternArgs = new StringBuffer();
            patternArgs.append(patternTXJob.getArguments()).append(" ")
                    .append(patternTXJobStdin);
            patternTXJob.setArguments(patternArgs.toString());
            patternTXJob.setStdIn("");
            txJobs.add(patternTXJob);
        }

        TransformationCatalogEntry tcEntry =
                this.getTransformationCatalogEntry(job.getSiteHandle());
        if (tcEntry == null) {
            //should throw a TC specific exception
            StringBuffer error = new StringBuffer();
            error.append("Could not find entry in tc for lfn ").append(
                    getCompleteTCName()).append(" at site ").append(
                    job.getSiteHandle());
            mLogger.log(error.toString(),
                    LogManager.ERROR_MESSAGE_LEVEL);
            throw new RuntimeException(error.toString());
        }

        //this should in fact only be set for non third party pools.
        //we first check if there is an entry for the transfer universe;
        //if not, we fall back to the globus jobmanager
        SiteInfo ePool = mSCHandle.getTXPoolEntry(job.getSiteHandle());
        JobManager jobmanager = ePool.selectJobManager(
                this.TRANSFER_UNIVERSE, true);

        //use the DC transfer client to handle the raw data sources
        for (Iterator it = rawDataSources.iterator(); it.hasNext();) {
            FileTransfer ft = (FileTransfer) it.next();
            TransferJob dcTXJob = new TransferJob();

            dcTXJob.namespace = tcEntry.getLogicalNamespace();
            dcTXJob.logicalName = tcEntry.getLogicalName();
            dcTXJob.version = tcEntry.getLogicalVersion();

            dcTXJob.dvNamespace = this.DERIVATION_NAMESPACE;
            dcTXJob.dvName = this.DERIVATION_NAME;
            dcTXJob.dvVersion = this.DERIVATION_VERSION;

            dcTXJob.setRemoteExecutable(tcEntry
                    .getPhysicalTransformation());

            dcTXJob.globusScheduler = (jobmanager == null) ? null
                    : jobmanager.getInfo(JobManager.URL);

            dcTXJob.setArguments(quote(((NameValue) ft.getSourceURL())
                    .getValue())
                    + " "
                    + quote(((NameValue) ft.getDestURL()).getValue()));
            dcTXJob.setStdIn("");
            dcTXJob.setStdOut("");
            dcTXJob.setStdErr("");
            dcTXJob.setSiteHandle(job.getSiteHandle());

            //the profile information from the transformation
            //catalog needs to be assimilated into the job
            dcTXJob.updateProfiles(tcEntry);

            txJobs.add(dcTXJob);
        }

        //now merge all these jobs into a single seqexec job
        SubInfo merged = mSeqExecAggregator.construct(txJobs,
                "transfer", txJobName);
        TransferJob txJob = new TransferJob(merged);

        //set the name of the merged job back to the name of the
        //transfer job passed in the function call
        txJob.setName(txJobName);
        txJob.setJobType(jobClass);

        //if a pattern job was constructed add the pattern stdin
        //as an input file for condor to transfer
        if (patternTXJobStdin != null) {
            txJob.condorVariables
                    .addIPFileForTransfer(patternTXJobStdin);
        }

        //take care of transfer of proxies
        this.checkAndTransferProxy(txJob);

        //apply the priority to the transfer job
        this.applyPriority(txJob);

        if (execFiles != null) {
            //we need to add setup jobs to change the XBit
            super.addSetXBitJobs(job, txJob, execFiles);
        }

        return txJob;
    }

    /**
     * Returns a textual description of the transfer implementation.
     *
     * @return a short textual description
     */
    public String getDescription() {
        return this.DESCRIPTION;
    }

    /**
     * Returns a boolean indicating whether the transfer protocol being used by
     * the implementation preserves the X bit while staging.
     *
     * @return boolean
     */
    public boolean doesPreserveXBit() {
        return false;
    }

    /**
     * Returns a boolean indicating whether the transfers are always to be done
     * in third party transfer mode. A value of false results in direct or
     * peer-to-peer transfers being done.
     * <p>
     * A value of false does not preclude third party transfers. They can still
     * be done, by setting the property "pegasus.transfer.*.thirdparty.sites".
     *
     * @return boolean indicating whether to always use third party transfers
     *         or not.
     *
     * @see PegasusProperties#getThirdPartySites(String)
     */
    public boolean useThirdPartyTransferAlways() {
        return false;
    }

    /**
     * Retrieves the transformation catalog entry for the executable that is
     * being used to transfer the files in the implementation.
     *
     * @param siteHandle  the handle of the site where the transformation is
     *                    to be searched.
     *
     * @return  the transformation catalog entry if found, else null.
     */
    public TransformationCatalogEntry getTransformationCatalogEntry(
            String siteHandle) {
        List tcentries = null;
        try {
            //namespace and version are null for the time being
            tcentries = mTCHandle.getTCEntries(
                    this.TRANSFORMATION_NAMESPACE,
                    this.TRANSFORMATION_NAME,
                    this.TRANSFORMATION_VERSION, siteHandle,
                    TCType.INSTALLED);
        } catch (Exception e) {
            mLogger.log("Unable to retrieve entry from TC for "
                    + getCompleteTCName() + " Cause: " + e,
                    LogManager.DEBUG_MESSAGE_LEVEL);
        }

        //if no entry was found, try constructing a default one
        return (tcentries == null) ? this.defaultTCEntry(
                this.TRANSFORMATION_NAMESPACE,
                this.TRANSFORMATION_NAME, this.TRANSFORMATION_VERSION,
                siteHandle)
                : (TransformationCatalogEntry) tcentries.get(0);
    }

    /**
     * Quotes a URL and returns it.
     *
     * @param url String
     * @return the quoted url
     */
    protected String quote(String url) {
        StringBuffer q = new StringBuffer();
        q.append("'").append(url).append("'");
        return q.toString();
    }

    /**
     * Returns a default TC entry to be used for the DC transfer client.
     *
     * @param namespace  the namespace of the transfer transformation
     * @param name       the logical name of the transfer transformation
     * @param version    the version of the transfer transformation
     * @param site       the site for which the default entry is required.
     *
     * @return  the default entry.
     */
    protected TransformationCatalogEntry defaultTCEntry(
            String namespace, String name, String version, String site) {

        TransformationCatalogEntry defaultTCEntry = null;
        //check if DC_HOME is set
        String dcHome = mSCHandle.getEnvironmentVariable(site,
                "DC_HOME");

        mLogger.log("Creating a default TC entry for "
                + Separator.combine(namespace, name, version)
                + " at site " + site, LogManager.DEBUG_MESSAGE_LEVEL);

        //if home is still null
        if (dcHome == null) {
            //cannot create a default TC entry
            mLogger.log("Unable to create a default entry for "
                    + Separator.combine(namespace, name, version)
                    + " as DC_HOME is not set in Site Catalog",
                    LogManager.DEBUG_MESSAGE_LEVEL);
            return defaultTCEntry;
        }

        //get the essential environment variables required to get
        //it to work correctly
        List envs = this.getEnvironmentVariables(site);
        if (envs == null) {
            //cannot create a default TC entry
            mLogger.log("Unable to create a default entry for "
                    + Separator.combine(namespace, name, version)
                    + " as the necessary environment could not be constructed",
                    LogManager.DEBUG_MESSAGE_LEVEL);
            return defaultTCEntry;
        }
        //add the DC home to the environments
        envs.add(new Profile(Profile.ENV, "DC_HOME", dcHome));

        //remove trailing / if specified
        dcHome = (dcHome.charAt(dcHome.length() - 1) == File.separatorChar)
                ? dcHome.substring(0, dcHome.length() - 1)
                : dcHome;

        //construct the path to the dc-transfer executable
        StringBuffer path = new StringBuffer();
        path.append(dcHome).append(File.separator).append("bin")
                .append(File.separator).append("dc-transfer");

        defaultTCEntry = new TransformationCatalogEntry(namespace,
                name, version);

        defaultTCEntry.setPhysicalTransformation(path.toString());
        defaultTCEntry.setResourceId(site);
        defaultTCEntry.setType(TCType.INSTALLED);
        defaultTCEntry.setProfiles(envs);

        //register back into the transformation catalog
        //so that we do not need to worry about creating it again
        try {
            mTCHandle.addTCEntry(defaultTCEntry, false);
        } catch (Exception e) {
            //just log as debug, as this is more of a performance improvement
            //than anything else
            mLogger.log(
                    "Unable to register in the TC the default entry "
                            + defaultTCEntry.getLogicalTransformation()
                            + " for site " + site, e,
                    LogManager.DEBUG_MESSAGE_LEVEL);
        }
        mLogger.log("Created entry with path "
                + defaultTCEntry.getPhysicalTransformation(),
                LogManager.DEBUG_MESSAGE_LEVEL);
        return defaultTCEntry;
    }

    /**
     * Returns the environment profiles that are required for the default
     * entry to sensibly work.
     *
     * @param site the site where the job is going to run.
     *
     * @return List of environment variables, else null in case the
     *         required environment variables could not be found.
     */
    protected List getEnvironmentVariables(String site) {
        List result = new ArrayList(1);

        //look up JAVA_HOME for the site in the site catalog
        String java = mSCHandle.getEnvironmentVariable(site,
                "JAVA_HOME");
        if (java == null) {
            mLogger.log("JAVA_HOME not set in site catalog for site "
                    + site, LogManager.DEBUG_MESSAGE_LEVEL);
            return null;
        }

        //we have the required environment variable
        result.add(new Profile(Profile.ENV, "JAVA_HOME", java));

        return result;
    }

    /**
     * Returns the complete name for the transformation.
     *
     * @return the complete name.
     */
    protected String getCompleteTCName() {
        return Separator.combine(this.TRANSFORMATION_NAMESPACE,
                this.TRANSFORMATION_NAME, this.TRANSFORMATION_VERSION);
    }

}
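
Usage note: the sketch below shows, at a high level, how a transfer refiner might drive this implementation, using only the constructor and public methods defined above. It is an illustration, not part of the Pegasus code base; the wrapper class WindwardUsageSketch, the method buildTransferJob, the job name "stage_in_example_0", and the assumption that the caller already holds valid PegasusProperties, PlannerOptions, Refiner, SubInfo and FileTransfer objects are all hypothetical.

import java.util.Collection;

import org.griphyn.cPlanner.classes.PlannerOptions;
import org.griphyn.cPlanner.classes.SubInfo;
import org.griphyn.cPlanner.classes.TransferJob;
import org.griphyn.cPlanner.common.PegasusProperties;
import org.griphyn.cPlanner.transfer.Refiner;
import org.griphyn.cPlanner.transfer.implementation.Windward;

/**
 * Hypothetical helper illustrating how the Windward implementation above
 * might be driven. All inputs are assumed to be supplied by the planner.
 */
public class WindwardUsageSketch {

    /**
     * Builds a single merged (seqexec) transfer job that stages the given
     * files for a compute job.
     *
     * @param properties the planner properties, assumed already loaded.
     * @param options    the planner options, assumed already parsed.
     * @param refiner    the transfer refiner adding the transfer node.
     * @param job        the compute job whose files are being staged.
     * @param files      FileTransfer objects for patterns and raw data sources.
     * @param execFiles  the subset of files that are executables, or null.
     * @param jobClass   the job class (stage-in, stage-out, or inter-pool),
     *                   as described in createTransferJob above.
     */
    public static TransferJob buildTransferJob(PegasusProperties properties,
                                               PlannerOptions options,
                                               Refiner refiner,
                                               SubInfo job,
                                               Collection files,
                                               Collection execFiles,
                                               int jobClass) {

        //normally the transfer implementation factory does this; the
        //implementation is constructed directly here for illustration
        Windward windward = new Windward(properties, options);

        //wire the implementation back to the refiner that loaded it,
        //which also sets the refiner on the internal Pegasus transfer
        windward.setRefiner(refiner);

        //an arbitrary, illustrative name for the transfer node
        String txJobName = "stage_in_example_0";

        //split the files into patterns and raw data sources, wrap them
        //with the appropriate clients, and merge into one seqexec job
        return windward.createTransferJob(job, files, execFiles,
                txJobName, jobClass);
    }
}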