001: /*
002: * The contents of this file are subject to the terms of the Common Development
003: * and Distribution License (the License). You may not use this file except in
004: * compliance with the License.
005: *
006: * You can obtain a copy of the License at http://www.netbeans.org/cddl.html
007: * or http://www.netbeans.org/cddl.txt.
008: *
009: * When distributing Covered Code, include this CDDL Header Notice in each file
010: * and include the License file at http://www.netbeans.org/cddl.txt.
011: * If applicable, add the following below the CDDL Header, with the fields
012: * enclosed by brackets [] replaced by your own identifying information:
013: * "Portions Copyrighted [year] [name of copyright owner]"
014: *
015: * The Original Software is NetBeans. The Initial Developer of the Original
016: * Software is Sun Microsystems, Inc. Portions Copyright 1997-2007 Sun
017: * Microsystems, Inc. All Rights Reserved.
018: */
019: package org.netbeans.modules.etl.codegen.impl;
020:
021: import java.util.Iterator;
022: import java.util.List;
023:
024: import org.netbeans.modules.etl.codegen.ETLScriptBuilderModel;
025: import org.netbeans.modules.etl.codegen.ETLStrategyBuilderContext;
026: import org.netbeans.modules.etl.codegen.PatternFinder;
027: import org.netbeans.modules.etl.utils.MessageManager;
028: import org.netbeans.modules.sql.framework.codegen.DB;
029: import org.netbeans.modules.sql.framework.codegen.DBFactory;
030: import org.netbeans.modules.sql.framework.codegen.StatementContext;
031: import org.netbeans.modules.sql.framework.codegen.Statements;
032: import org.netbeans.modules.sql.framework.model.SQLConstants;
033: import org.netbeans.modules.sql.framework.model.SourceTable;
034: import org.netbeans.modules.sql.framework.model.TargetTable;
035:
036: import com.sun.etl.engine.ETLEngine;
037: import com.sun.etl.engine.ETLTask;
038: import com.sun.etl.engine.ETLTaskNode;
039: import com.sun.sql.framework.exception.BaseException;
040: import com.sun.sql.framework.jdbc.SQLPart;
041: import com.sun.sql.framework.utils.AttributeMap;
042: import net.java.hulp.i18n.Logger;
043: import org.netbeans.modules.etl.logger.Localizer;
044: import org.netbeans.modules.etl.logger.LogUtil;
045: import org.netbeans.modules.sql.framework.model.DBConnectionDefinition;
046:
047: /**
048: * @author Girish Patil
049: * @version $Revision$
050: */
051: public class StagingStrategyBuilder extends BaseETLStrategyBuilder {
052:
053: private static final String LOG_CATEGORY = StagingStrategyBuilder.class
054: .getName();
055: private static final String SQL_INDENT = "";
056: private boolean forceStaging = false;
057: private static transient final Logger mLogger = LogUtil
058: .getLogger(StagingStrategyBuilder.class.getName());
059: private static transient final Localizer mLoc = Localizer.get();
060:
061: public StagingStrategyBuilder(ETLScriptBuilderModel model)
062: throws BaseException {
063: super (model);
064: }
065:
066: /**
067: * Before calling apply appropriate applyConnections
068: */
069: public void generateScriptForTable(ETLStrategyBuilderContext context)
070: throws BaseException {
071: mLogger
072: .infoNoloc(mLoc.t(
073: "PRSR010: In generateScriptForTable:{0}",
074: LOG_CATEGORY));
075: super .checkTargetConnectionDefinition(context);
076:
077: populateInitTask(context.getInitTask(), context
078: .getGlobalCleanUpTask(), context.getTargetTable());
079: MessageManager msgMgr = MessageManager
080: .getManager(ETLTaskNode.class);
081: TargetTable targetTable = context.getTargetTable();
082: int statementType = targetTable.getStatementType();
083: DB targetDB = getDBFor(this .builderModel
084: .getConnectionDefinition(targetTable));
085: DB statsDB = DBFactory.getInstance().getDatabase(DB.AXIONDB);
086:
087: // For each target table create a transformer task node
088: ETLTaskNode transformerTask = null;
089: String displayName = msgMgr.getString("TEMPLATE_dn", msgMgr
090: .getString("LBL_dn_transformer"), context
091: .getTargetTable().getName());
092:
093: // TODO Need to refactor/redesign interfaces between sql-codegen and etl-codgen framework
094: // such that we will avoid code like "IF ELSE" like below.
095: if ((targetDB.getDBType() == DB.JDBCDB)
096: && (targetTable.getSourceTableList().size() != 0)
097: && ((statementType == SQLConstants.UPDATE_STATEMENT) || (statementType == SQLConstants.INSERT_UPDATE_STATEMENT))) {
098: transformerTask = builderModel.getEngine()
099: .createETLTaskNode(
100: ETLEngine.CORRELATED_QUERY_EXECUTOR);
101: transformerTask.addNextETLTaskNode(ETLTask.SUCCESS, context
102: .getNextTaskOnSuccess().getId());
103: transformerTask.addNextETLTaskNode(ETLTask.EXCEPTION,
104: context.getNextTaskOnException().getId());
105: transformerTask.setDisplayName(displayName);
106: if (context.getDependentTasksForNextTask().length() > 0) {
107: context.getDependentTasksForNextTask().append(",");
108: }
109:
110: context.getDependentTasksForNextTask().append(
111: transformerTask.getId());
112:
113: createCorrelatedUpdateSQLParts(
114: targetTable,
115: transformerTask,
116: getTargetConnName(),
117: targetDB,
118: statsDB,
119: (statementType == SQLConstants.INSERT_UPDATE_STATEMENT));
120: } else {
121: transformerTask = builderModel.getEngine()
122: .createETLTaskNode(ETLEngine.TRANSFORMER);
123: transformerTask.addNextETLTaskNode(ETLTask.SUCCESS, context
124: .getNextTaskOnSuccess().getId());
125: transformerTask.addNextETLTaskNode(ETLTask.EXCEPTION,
126: context.getNextTaskOnException().getId());
127: transformerTask.setDisplayName(displayName);
128: if (context.getDependentTasksForNextTask().length() > 0) {
129: context.getDependentTasksForNextTask().append(",");
130: }
131:
132: context.getDependentTasksForNextTask().append(
133: transformerTask.getId());
134:
135: createTransformerSQLPart(targetTable, true,
136: transformerTask, getTargetConnName(), context
137: .getNextTaskOnException(), targetDB,
138: statsDB);
139: }
140:
141: ETLTaskNode xformPredecessor = null;
142: List sourceTables = context.getTargetTable()
143: .getSourceTableList();
144: String dropSQLStr = "";
145:
146: // Loop Thru the source tables to generate
147: if (sourceTables == null || sourceTables.isEmpty()) {
148: // No extraction: link transformer nodes to init wait task.
149: xformPredecessor = context.getPredecessorTask();
150: } else {
151: Iterator srcIter = sourceTables.iterator();
152: // Create a wait task node for each extraction chain, so
153: // that transformation will start only when all the participated tables
154: // has been extracted to the temp table in the workspace database.
155: ETLTaskNode extractorWait = this .builderModel.getEngine()
156: .createETLTaskNode(ETLEngine.WAIT);
157: extractorWait.addNextETLTaskNode(ETLTask.SUCCESS,
158: transformerTask.getId());
159: extractorWait.addNextETLTaskNode(ETLTask.EXCEPTION, context
160: .getNextTaskOnException().getId());
161: String waitDisplayName = msgMgr.getString("TEMPLATE_dn",
162: msgMgr.getString("LBL_dn_extractorwait"), context
163: .getTargetTable().getName());
164: extractorWait.setDisplayName(waitDisplayName);
165:
166: StringBuilder dependentTasks = new StringBuilder();
167: String waitTaskId = extractorWait.getId();
168: xformPredecessor = extractorWait;
169:
170: while (srcIter.hasNext()) {
171: SourceTable sourceTable = (SourceTable) srcIter.next();
172:
173: // Don't create an extractor node for a source table that is in the
174: // same DB as the target table.
175: if (isExtractionRequired(sourceTable, targetTable)) {
176: ETLTaskNode extractorTask = createExtractorNode(
177: sourceTable, targetDB, waitTaskId,
178: waitTaskId, getTargetConnName(),
179: targetTable.getSchema().toUpperCase());
180:
181: // Add staging table to the drop list only if user specifies
182: // that it is deleteable - otherwise preserve the contents of the
183: // staging table.
184: if (sourceTable.isDropStagingTable()) {
185: StatementContext dropContext = new StatementContext();
186: dropContext.setUsingTempTableName(sourceTable,
187: true);
188: dropContext.putClientProperty(
189: StatementContext.IF_EXISTS,
190: Boolean.TRUE);
191: SQLPart dropSQLPartTemp = getTargetStatements()
192: .getDropStatement(sourceTable,
193: dropContext);
194: if (dropSQLStr.length() > 0) {
195: dropSQLStr += SQLPart.STATEMENT_SEPARATOR;
196: }
197: dropSQLStr += dropSQLPartTemp.getSQL();
198: }
199:
200: context.getPredecessorTask().addNextETLTaskNode(
201: ETLTask.SUCCESS, extractorTask.getId());
202: if (dependentTasks.length() > 0) {
203: dependentTasks.append(",");
204: }
205: dependentTasks.append(extractorTask.getId());
206:
207: StatementContext stmtContext = new StatementContext();
208: stmtContext
209: .setUsingFullyQualifiedTablePrefix(false);
210: stmtContext.setUsingUniqueTableName(true);
211: final String statsTableName = statsDB
212: .getUnescapedName(statsDB
213: .getGeneratorFactory().generate(
214: targetTable, stmtContext));
215: extractorTask.setTableName(statsTableName);
216: }
217: } // end extractor Loop
218:
219: // set dependent List for Level1 wait node
220: extractorWait.setDependsOn(dependentTasks.toString());
221: }
222:
223: // Add drop statements for temp tables, if any, to cleanup task.
224: if (dropSQLStr != null && dropSQLStr.trim().length() != 0) {
225: // Ensure we use same connection in dropping staging tables as we used
226: // in creating them.
227: SQLPart dropSQLPart = new SQLPart(dropSQLStr,
228: SQLPart.STMT_DROP, getTargetConnName()); // NOI18N
229: SQLPart etlSQLPart = dropSQLPart;
230: context.getNextTaskOnException().addStatement(etlSQLPart);
231: }
232:
233: // Set dependent list for predecessor to transform nodes
234: xformPredecessor.addNextETLTaskNode(ETLTask.SUCCESS,
235: transformerTask.getId());
236:
237: // Add statements to create execution summary table if it does not exist, and
238: // update the assocaited execution record upon successful execution
239: addCreateIfNotExistsSummaryTableStatement(context.getInitTask());
240: addUpdateExecutionRecordPreparedStatement(context
241: .getStatsUpdateTask(), context.getTargetTable());
242: }
243:
244: public String getScriptToDisplay(ETLStrategyBuilderContext context)
245: throws BaseException {
246: super .checkTargetConnectionDefinition(context);
247: StringBuilder buffer = new StringBuilder();
248: TargetTable targetTable = context.getTargetTable();
249: DB targetDB = getDBFor(context.getModel()
250: .getConnectionDefinition(targetTable));
251:
252: List sourceTables = context.getTargetTable()
253: .getSourceTableList();
254: if (sourceTables != null && !sourceTables.isEmpty()) {
255: Iterator srcIter = sourceTables.iterator();
256: while (srcIter.hasNext()) {
257: SourceTable sourceTable = (SourceTable) srcIter.next();
258: if (isExtractionRequired(sourceTable, targetTable)) {
259: buffer.append(getExtractorSQL(sourceTable,
260: targetDB, context.getTargetTable()));
261: buffer.append("\n");
262: }
263: }
264: }
265:
266: String transformSQL = getTransformerSQL(targetTable, targetDB,
267: sourceTables, true);
268: buffer.append(SQL_INDENT).append(
269: getCommentForTransformer(targetTable)).append("\n");
270: buffer.append(transformSQL);
271:
272: return buffer.toString();
273: }
274:
275: public boolean isForceStaging() {
276: return this .forceStaging;
277: }
278:
279: public void setForceStaging(boolean forceStaging) {
280: this .forceStaging = forceStaging;
281: }
282:
283: @Override
284: protected boolean isExtractionRequired(SourceTable sourceTable,
285: TargetTable targetTable) throws BaseException {
286: if (forceStaging) {
287: return true;
288: } else {
289: return (!PatternFinder.isFromSameDB(sourceTable,
290: targetTable, this .builderModel));
291: }
292: }
293:
294: private ETLTaskNode createExtractorNode(SourceTable srcTable,
295: DB targetDB, String waitTaskId, String cleanupTaskId,
296: String trgtConnName, String trgtSchema)
297: throws BaseException {
298: DBConnectionDefinition srcConDef = this .builderModel
299: .getConnectionDefinition(srcTable);
300: String srcConnName = srcConDef.getName();
301:
302: // reset the Generator for the source table dbType
303: DB sourceDB = getDBFor(srcConDef);
304: final Statements sourceStmts = sourceDB.getStatements();
305: final Statements targetStmts = targetDB.getStatements();
306:
307: // for each source table create a extractor task node
308: ETLTaskNode extractorTask = this .builderModel.getEngine()
309: .createETLTaskNode(ETLEngine.EXTRACTOR);
310: extractorTask.addNextETLTaskNode(ETLTask.SUCCESS, waitTaskId);
311: extractorTask.addNextETLTaskNode(ETLTask.EXCEPTION,
312: cleanupTaskId);
313:
314: MessageManager msgMgr = MessageManager
315: .getManager(ETLTaskNode.class);
316: String displayName = msgMgr.getString("TEMPLATE_dn", msgMgr
317: .getString("LBL_dn_extractor"), srcTable.getName());
318: extractorTask.setDisplayName(displayName);
319:
320: // RFE-102428
321: String stgTableName = srcTable.getStagingTableName();
322: if (stgTableName == null || stgTableName.trim().length() == 0) {
323: // User has not specified the "Staging Table Name", proceed with default logic
324: if (srcTable.isDropStagingTable()) {
325: StatementContext context = new StatementContext();
326: context.setUsingTempTableName(srcTable, true);
327: context.putClientProperty("targetSchema", trgtSchema);
328:
329: SQLPart tableExistsPart = targetStmts
330: .getTableExistsStatement(srcTable, context);
331: tableExistsPart.setConnectionPoolName(trgtConnName);
332: extractorTask.addStatement(tableExistsPart);
333:
334: // Drop if exists statement for temp table
335: context.setUsingTempTableName(srcTable, true);
336: context.putClientProperty("ifExists", Boolean.TRUE);
337:
338: SQLPart dropSQLPart = targetStmts.getDropStatement(
339: srcTable, context);
340: dropSQLPart.setConnectionPoolName(trgtConnName);
341: extractorTask.addStatement(dropSQLPart);
342: }
343:
344: // Create temp table in target database
345: StatementContext context = new StatementContext();
346: context.setUsingTempTableName(srcTable, true);
347: SQLPart createSQLPart = targetStmts.getCreateStatement(
348: srcTable, context);
349: createSQLPart.setConnectionPoolName(trgtConnName);
350: extractorTask.addStatement(createSQLPart);
351: }
352:
353: if (srcTable.isTruncateStagingTable()) {
354: // User has specified the "Staging Table Name" property. Use it and truncate the data.
355: StatementContext trcontext = new StatementContext();
356: trcontext.setUsingTempTableName(srcTable, true);
357: trcontext.putClientProperty("targetSchema", trgtSchema);
358: truncateTableIfExists(srcTable, extractorTask,
359: trgtConnName, targetStmts, trcontext);
360: }
361:
362: // Select extraction set from source
363: StatementContext context = new StatementContext();
364: context.setUsingTempTableName(srcTable, false);
365: this .useUniqueNameIfRequired(srcTable, context);
366: SQLPart selectSQLPart = sourceStmts.getSelectStatement(
367: srcTable, context);
368: selectSQLPart.setConnectionPoolName(srcConnName);
369: extractorTask.addStatement(selectSQLPart);
370:
371: // Create insert prepared statement.
372: context.setUsingTempTableName(srcTable, true);
373: context.setUsingUniqueTableName(srcTable, false);
374: SQLPart insertSQLPart = targetStmts.getPreparedInsertStatement(
375: srcTable, context);
376: insertSQLPart.setConnectionPoolName(trgtConnName);
377: extractorTask.addStatement(insertSQLPart);
378:
379: AttributeMap attrMap = new AttributeMap();
380: attrMap.put("batchSize", srcTable.getBatchSize() + ""); // NOI18N
381: extractorTask.setAttributeMap(attrMap);
382:
383: return extractorTask;
384: }
385:
386: private String getExtractorSQL(SourceTable srcTable, DB targetDB,
387: TargetTable tt) throws BaseException {
388: StringBuilder buffer = new StringBuilder(50);
389:
390: // Get the Generator
391: DBConnectionDefinition srcConDefn = this .builderModel
392: .getConnectionDefinition(srcTable);
393: DB sourceDB = getDBFor(srcConDefn);
394:
395: String modelName = srcTable.getParent().getModelName();
396: String msg = "";
397:
398: // Select statement for source table
399: msg = MSG_MGR.getString("DISPLAY_SELECT", srcTable.getName(),
400: modelName, srcConDefn.getDBType());
401: buffer.append(SQL_INDENT).append(msg).append("\n");
402:
403: StatementContext context = new StatementContext();
404: SQLPart selectSQLPart = sourceDB.getStatements()
405: .getSelectStatement(srcTable, context);
406:
407: buffer.append(selectSQLPart.getSQL());
408: buffer.append("\n\n");
409:
410: // Create statement for temp table
411: msg = MSG_MGR.getString("DISPLAY_CREATE", modelName,
412: this .builderModel.getConnectionDefinition(tt)
413: .getDBType());
414: buffer.append(SQL_INDENT).append(msg).append("\n");
415:
416: context = new StatementContext();
417: context.setUsingTempTableName(srcTable, true);
418: SQLPart createSQLPart = targetDB.getStatements()
419: .getCreateStatement(srcTable, context);
420: buffer.append(createSQLPart.getSQL());
421: buffer.append("\n\n");
422:
423: // insert statement to store extracted rows.
424: buffer.append(SQL_INDENT).append(
425: MSG_MGR.getString("DISPLAY_INSERT_TEMP")).append("\n");
426: SQLPart insertSQLPart = targetDB.getStatements()
427: .getPreparedInsertStatement(srcTable, context);
428: buffer.append(insertSQLPart.getSQL());
429: buffer.append("\n\n");
430:
431: return buffer.toString();
432: }
433: }
|