001: package prefuse.data.io.sql;
002:
003: import java.sql.Connection;
004: import java.sql.ResultSet;
005: import java.sql.ResultSetMetaData;
006: import java.sql.SQLException;
007: import java.sql.Statement;
008: import java.util.logging.Logger;
009:
010: import prefuse.data.Schema;
011: import prefuse.data.Table;
012: import prefuse.data.io.DataIOException;
013: import prefuse.data.util.Index;
014:
015: /**
016: * Sends queries to a relational database and processes the results, storing
017: * the results in prefuse Table instances. This class should not be
018: * instantiated directly. To access a database, the {@link ConnectionFactory}
019: * class should be used to retrieve an appropriate instance of this class.
020: *
021: * @author <a href="http://jheer.org">jeffrey heer</a>
022: */
023: public class DatabaseDataSource {
024:
025: // logger
026: private static final Logger s_logger = Logger
027: .getLogger(DatabaseDataSource.class.getName());
028:
029: protected Connection m_conn;
030: protected Statement m_stmt;
031: protected SQLDataHandler m_handler;
032:
033: // ------------------------------------------------------------------------
034:
035: /**
036: * Creates a new DatabaseDataSource for reading data from a SQL relational
037: * database. This constructor is only package visible and is not intended
038: * for use by application level code. Instead, the
039: * {@link ConnectionFactory} class should be used to create any number of
040: * DatabaseDataSource connections.
041: */
042: DatabaseDataSource(Connection conn, SQLDataHandler handler) {
043: m_conn = conn;
044: m_handler = handler;
045: }
046:
047: // ------------------------------------------------------------------------
048: // Synchronous Data Retrieval
049:
050: /**
051: * Executes a query and returns the results in a Table instance.
052: * @param query the text SQL query to execute
053: * @return a Table of the query results
054: * @throws DataIOException if an error occurs while executing the query
055: * or adding the query results in a prefuse Table.
056: */
057: public synchronized Table getData(String query)
058: throws DataIOException {
059: return getData(null, query, null);
060: }
061:
062: /**
063: * Executes a query and returns the results in a Table instance.
064: * @param query the text SQL query to execute
065: * @param keyField the field to treat as a primary key, ensuring that this
066: * field is indexed in the resulting table instance.
067: * @return a Table of the query results
068: * @throws DataIOException if an error occurs while executing the query
069: * or adding the query results in a prefuse Table.
070: */
071: public synchronized Table getData(String query, String keyField)
072: throws DataIOException {
073: return getData(null, query, keyField);
074: }
075:
076: /**
077: * Executes a query and returns the results in a Table instance.
078: * @param t the Table to store the results in. If this value is null, a
079: * new table will automatically be created.
080: * @param query the text SQL query to execute
081: * @return a Table of the query results
082: * @throws DataIOException if an error occurs while executing the query
083: * or adding the query results in a prefuse Table.
084: */
085: public synchronized Table getData(Table t, String query)
086: throws DataIOException {
087: return getData(t, query, null);
088: }
089:
090: /**
091: * Executes a query and returns the results in a Table instance.
092: * @param t the Table to store the results in. If this value is null, a
093: * new table will automatically be created.
094: * @param query the text SQL query to execute
095: * @param keyField used to determine if the row already exists in the table
096: * @return a Table of the query results
097: * @throws DataIOException if an error occurs while executing the query
098: * or adding the query results in a prefuse Table.
099: */
100: public synchronized Table getData(Table t, String query,
101: String keyField) throws DataIOException {
102: return getData(t, query, keyField, null);
103: }
104:
105: /**
106: * Executes a query and returns the results in a Table instance.
107: * @param t the Table to store the results in. If this value is null, a
108: * new table will automatically be created.
109: * @param query the text SQL query to execute
110: * @param keyField used to determine if the row already exists in the table
111: * @param lock an optional Object to use as a lock when performing data
112: * processing. This lock will be synchronized on whenever the Table is
113: * modified.
114: * @return a Table of the query results
115: * @throws DataIOException if an error occurs while executing the query
116: * or adding the query results in a prefuse Table.
117: */
118: public synchronized Table getData(Table t, String query,
119: String keyField, Object lock) throws DataIOException {
120: ResultSet rs;
121: try {
122: rs = executeQuery(query);
123: } catch (SQLException e) {
124: throw new DataIOException(e);
125: }
126: return process(t, rs, keyField, lock);
127: }
128:
129: // ------------------------------------------------------------------------
130: // Asynchronous Data Retrieval
131:
132: /**
133: * Asynchronously executes a query and stores the results in the given
134: * table instance. All data processing is done in a separate thread of
135: * execution.
136: * @param t the Table in which to store the results
137: * @param query the query to execute
138: */
139: public void loadData(Table t, String query) {
140: loadData(t, query, null, null, null);
141: }
142:
143: /**
144: * Asynchronously executes a query and stores the results in the given
145: * table instance. All data processing is done in a separate thread of
146: * execution.
147: * @param t the Table in which to store the results
148: * @param query the query to execute
149: * @param keyField the primary key field, comparisons on this field are
150: * performed to recognize data records already present in the table.
151: */
152: public void loadData(Table t, String query, String keyField) {
153: loadData(t, query, keyField, null, null);
154: }
155:
156: /**
157: * Asynchronously executes a query and stores the results in the given
158: * table instance. All data processing is done in a separate thread of
159: * execution.
160: * @param t the Table in which to store the results
161: * @param query the query to execute
162: * @param lock an optional Object to use as a lock when performing data
163: * processing. This lock will be synchronized on whenever the Table is
164: * modified.
165: */
166: public void loadData(Table t, String query, Object lock) {
167: loadData(t, query, null, lock, null);
168: }
169:
170: /**
171: * Asynchronously executes a query and stores the results in the given
172: * table instance. All data processing is done in a separate thread of
173: * execution.
174: * @param t the Table in which to store the results
175: * @param query the query to execute
176: * @param keyField the primary key field, comparisons on this field are
177: * performed to recognize data records already present in the table.
178: * @param lock an optional Object to use as a lock when performing data
179: * processing. This lock will be synchronized on whenever the Table is
180: * modified.
181: */
182: public void loadData(Table t, String query, String keyField,
183: Object lock) {
184: loadData(t, query, keyField, lock, null);
185: }
186:
187: /**
188: * Asynchronously executes a query and stores the results in the given
189: * table instance. All data processing is done in a separate thread of
190: * execution.
191: * @param t the Table in which to store the results
192: * @param query the query to execute
193: * @param keyField the primary key field, comparisons on this field are
194: * performed to recognize data records already present in the table.
195: * A null value will result in no key checking.
196: * @param lock an optional Object to use as a lock when performing data
197: * processing. This lock will be synchronized on whenever the Table is
198: * modified. A null value will result in no locking.
199: * @param listener an optional listener that will provide notifications
200: * before the query has been issued and after the query has been
201: * processed. This is most useful for post-processing operations.
202: */
203: public void loadData(Table t, String query, String keyField,
204: Object lock, DataSourceWorker.Listener listener) {
205: DataSourceWorker.Entry e = new DataSourceWorker.Entry(this , t,
206: query, keyField, lock, listener);
207: DataSourceWorker.submit(e);
208: }
209:
210: // ------------------------------------------------------------------------
211:
212: /**
213: * Execute a query and return the corresponding result set
214: * @param query the text SQL query to execute
215: * @return the ResultSet of the query
216: * @throws SQLException if an error occurs issuing the query
217: */
218: private ResultSet executeQuery(String query) throws SQLException {
219: if (m_stmt == null)
220: m_stmt = m_conn.createStatement();
221:
222: // clock in
223: long timein = System.currentTimeMillis();
224:
225: s_logger.info("Issuing query: " + query);
226: ResultSet rset = m_stmt.executeQuery(query);
227:
228: // clock out
229: long time = System.currentTimeMillis() - timein;
230: s_logger.info("External query processing completed: "
231: + (time / 1000) + "." + (time % 1000) + " seconds.");
232:
233: return rset;
234: }
235:
236: // ------------------------------------------------------------------------
237:
238: /**
239: * Process the results of a SQL query, putting retrieved data into a
240: * Table instance. If a null table is provided, a new table with the
241: * appropriate schema will be created.
242: * @param t the Table to store results in
243: * @param rset the SQL query result set
244: * @return a Table containing the query results
245: */
246: protected Table process(Table t, ResultSet rset, String key,
247: Object lock) throws DataIOException {
248: // clock in
249: int count = 0;
250: long timein = System.currentTimeMillis();
251:
252: try {
253: ResultSetMetaData metadata = rset.getMetaData();
254: int ncols = metadata.getColumnCount();
255:
256: // create a new table if necessary
257: if (t == null) {
258: t = getSchema(metadata, m_handler).instantiate();
259: if (key != null) {
260: try {
261: t.index(key);
262: s_logger.info("Indexed field: " + key);
263: } catch (Exception e) {
264: s_logger
265: .warning("Error indexing field: " + key);
266: }
267: }
268: }
269:
270: // set the lock, lock on the table itself if nothing else provided
271: lock = (lock == null ? t : lock);
272:
273: // process the returned rows
274: while (rset.next()) {
275: synchronized (lock) {
276: // determine the table row index to use
277: int row = getExistingRow(t, rset, key);
278: if (row < 0) {
279: row = t.addRow();
280: }
281:
282: //process each value in the current row
283: for (int i = 1; i <= ncols; ++i) {
284: m_handler.process(t, row, rset, i);
285: }
286: }
287:
288: // increment row count
289: ++count;
290: }
291: } catch (SQLException e) {
292: throw new DataIOException(e);
293: }
294:
295: // clock out
296: long time = System.currentTimeMillis() - timein;
297: s_logger.info("Internal query processing completed: " + count
298: + " rows, " + (time / 1000) + "." + (time % 1000)
299: + " seconds.");
300:
301: return t;
302: }
303:
304: /**
305: * See if a retrieved database row is already represented in the given
306: * Table.
307: * @param t the prefuse Table to check for an existing row
308: * @param rset the ResultSet, set to a particular row, which may or
309: * may not have a matching row in the prefuse Table
310: * @param keyField the key field to look up to check for an existing row
311: * @return the index of the existing row, or -1 if no match is found
312: * @throws SQLException
313: */
314: protected int getExistingRow(Table t, ResultSet rset,
315: String keyField) throws SQLException {
316: // check if we have a keyField, bail if not
317: if (keyField == null)
318: return -1;
319:
320: // retrieve the column data type, bail if column is not found
321: Class type = t.getColumnType(keyField);
322: if (type == null)
323: return -1;
324:
325: // get the index and perform the lookup
326: Index index = t.index(keyField);
327: if (type == int.class) {
328: return index.get(rset.getInt(keyField));
329: } else if (type == long.class) {
330: return index.get(rset.getLong(keyField));
331: } else if (type == float.class) {
332: return index.get(rset.getFloat(keyField));
333: } else if (type == double.class) {
334: return index.get(rset.getDouble(keyField));
335: } else if (!type.isPrimitive()) {
336: return index.get(rset.getObject(keyField));
337: } else {
338: return -1;
339: }
340: }
341:
342: /**
343: * Given the metadata for a SQL result set and a data value handler for that
344: * result set, returns a corresponding schema for a prefuse table.
345: * @param metadata the SQL result set metadata
346: * @param handler the data value handler
347: * @return the schema determined by the metadata and handler
348: * @throws SQLException if an error occurs accessing the metadata
349: */
350: public Schema getSchema(ResultSetMetaData metadata,
351: SQLDataHandler handler) throws SQLException {
352: int ncols = metadata.getColumnCount();
353: Schema schema = new Schema(ncols);
354:
355: // determine the table schema
356: for (int i = 1; i <= ncols; ++i) {
357: String name = metadata.getColumnName(i);
358: int sqlType = metadata.getColumnType(i);
359: Class type = handler.getDataType(name, sqlType);
360: if (type != null)
361: schema.addColumn(name, type);
362: }
363:
364: return schema;
365: }
366:
367: } // end of class DatabaseDataSource
|