0001: /*-------------------------------------------------------------------------
0002: *
0003: * Copyright (c) 2003-2005, PostgreSQL Global Development Group
0004: * Copyright (c) 2004, Open Cloud Limited.
0005: *
0006: * IDENTIFICATION
0007: * $PostgreSQL: pgjdbc/org/postgresql/core/v3/QueryExecutorImpl.java,v 1.35 2007/07/27 10:15:29 jurka Exp $
0008: *
0009: *-------------------------------------------------------------------------
0010: */
0011: package org.postgresql.core.v3;
0012:
0013: import org.postgresql.core.*;
0014:
0015: import java.util.ArrayList;
0016: import java.util.Vector;
0017: import java.util.HashMap;
0018: import java.util.Properties;
0019:
0020: import java.lang.ref.*;
0021:
0022: import java.io.IOException;
0023: import java.sql.*;
0024: import org.postgresql.util.PSQLException;
0025: import org.postgresql.util.PSQLWarning;
0026: import org.postgresql.util.PSQLState;
0027: import org.postgresql.util.ServerErrorMessage;
0028: import org.postgresql.util.GT;
0029:
0030: /**
0031: * QueryExecutor implementation for the V3 protocol.
0032: */
0033: public class QueryExecutorImpl implements QueryExecutor {
0034: public QueryExecutorImpl(ProtocolConnectionImpl protoConnection,
0035: PGStream pgStream, Properties info, Logger logger) {
0036: this .protoConnection = protoConnection;
0037: this .pgStream = pgStream;
0038: this .logger = logger;
0039:
0040: if (info.getProperty("allowEncodingChanges") != null) {
0041: this .allowEncodingChanges = Boolean.valueOf(
0042: info.getProperty("allowEncodingChanges"))
0043: .booleanValue();
0044: } else {
0045: this .allowEncodingChanges = false;
0046: }
0047: }
0048:
0049: //
0050: // Query parsing
0051: //
0052:
0053: public Query createSimpleQuery(String sql) {
0054: return parseQuery(sql, false);
0055: }
0056:
0057: public Query createParameterizedQuery(String sql) {
0058: return parseQuery(sql, true);
0059: }
0060:
0061: private Query parseQuery(String query, boolean withParameters) {
0062: // Parse query and find parameter placeholders;
0063: // also break the query into separate statements.
0064:
0065: ArrayList statementList = new ArrayList();
0066: ArrayList fragmentList = new ArrayList(15);
0067:
0068: int fragmentStart = 0;
0069: int inParen = 0;
0070:
0071: boolean standardConformingStrings = protoConnection
0072: .getStandardConformingStrings();
0073:
0074: char[] aChars = query.toCharArray();
0075:
0076: for (int i = 0; i < aChars.length; ++i) {
0077: switch (aChars[i]) {
0078: case '\'': // single-quotes
0079: i = Parser.parseSingleQuotes(aChars, i,
0080: standardConformingStrings);
0081: break;
0082:
0083: case '"': // double-quotes
0084: i = Parser.parseDoubleQuotes(aChars, i);
0085: break;
0086:
0087: case '-': // possibly -- style comment
0088: i = Parser.parseLineComment(aChars, i);
0089: break;
0090:
0091: case '/': // possibly /* */ style comment
0092: i = Parser.parseBlockComment(aChars, i);
0093: break;
0094:
0095: case '$': // possibly dollar quote start
0096: i = Parser.parseDollarQuotes(aChars, i);
0097: break;
0098:
0099: case '(':
0100: inParen++;
0101: break;
0102:
0103: case ')':
0104: inParen--;
0105: break;
0106:
0107: case '?':
0108: if (withParameters) {
0109: fragmentList.add(query.substring(fragmentStart, i));
0110: fragmentStart = i + 1;
0111: }
0112: break;
0113:
0114: case ';':
0115: if (inParen == 0) {
0116: fragmentList.add(query.substring(fragmentStart, i));
0117: fragmentStart = i + 1;
0118: if (fragmentList.size() > 1
0119: || ((String) fragmentList.get(0)).trim()
0120: .length() > 0)
0121: statementList
0122: .add(fragmentList
0123: .toArray(new String[fragmentList
0124: .size()]));
0125: fragmentList.clear();
0126: }
0127: break;
0128:
0129: default:
0130: break;
0131: }
0132: }
0133:
0134: fragmentList.add(query.substring(fragmentStart));
0135: if (fragmentList.size() > 1
0136: || ((String) fragmentList.get(0)).trim().length() > 0)
0137: statementList.add(fragmentList
0138: .toArray(new String[fragmentList.size()]));
0139:
0140: if (statementList.isEmpty()) // Empty query.
0141: return EMPTY_QUERY;
0142:
0143: if (statementList.size() == 1) {
0144: // Only one statement.
0145: return new SimpleQuery((String[]) statementList.get(0));
0146: }
0147:
0148: // Multiple statements.
0149: SimpleQuery[] subqueries = new SimpleQuery[statementList.size()];
0150: int[] offsets = new int[statementList.size()];
0151: int offset = 0;
0152: for (int i = 0; i < statementList.size(); ++i) {
0153: String[] fragments = (String[]) statementList.get(i);
0154: offsets[i] = offset;
0155: subqueries[i] = new SimpleQuery(fragments);
0156: offset += fragments.length - 1;
0157: }
0158:
0159: return new CompositeQuery(subqueries, offsets);
0160: }
0161:
0162: //
0163: // Query execution
0164: //
0165:
0166: public synchronized void execute(Query query,
0167: ParameterList parameters, ResultHandler handler,
0168: int maxRows, int fetchSize, int flags) throws SQLException {
0169: if (logger.logDebug()) {
0170: logger.debug("simple execute, handler=" + handler
0171: + ", maxRows=" + maxRows + ", fetchSize="
0172: + fetchSize + ", flags=" + flags);
0173: }
0174:
0175: if (parameters == null)
0176: parameters = SimpleQuery.NO_PARAMETERS;
0177:
0178: boolean describeOnly = (QUERY_DESCRIBE_ONLY & flags) != 0;
0179:
0180: // Check parameters are all set..
0181: if (!describeOnly)
0182: ((V3ParameterList) parameters).checkAllParametersSet();
0183:
0184: try {
0185: try {
0186: handler = sendQueryPreamble(handler, flags);
0187: sendQuery((V3Query) query,
0188: (V3ParameterList) parameters, maxRows,
0189: fetchSize, flags);
0190: sendSync();
0191: processResults(handler, flags);
0192: } catch (PGBindException se) {
0193: // There are three causes of this error, an
0194: // invalid total Bind message length, a
0195: // BinaryStream that cannot provide the amount
0196: // of data claimed by the length arugment, and
0197: // a BinaryStream that throws an Exception
0198: // when reading.
0199: //
0200: // We simply do not send the Execute message
0201: // so we can just continue on as if nothing
0202: // has happened. Perhaps we need to
0203: // introduce an error here to force the
0204: // caller to rollback if there is a
0205: // transaction in progress?
0206: //
0207: sendSync();
0208: processResults(handler, flags);
0209: handler
0210: .handleError(new PSQLException(
0211: GT
0212: .tr("Unable to bind parameter values for statement."),
0213: PSQLState.INVALID_PARAMETER_VALUE, se
0214: .getIOException()));
0215: }
0216: } catch (IOException e) {
0217: protoConnection.close();
0218: handler
0219: .handleError(new PSQLException(
0220: GT
0221: .tr("An I/O error occured while sending to the backend."),
0222: PSQLState.CONNECTION_FAILURE, e));
0223: }
0224:
0225: handler.handleCompletion();
0226: }
0227:
0228: // Deadlock avoidance:
0229: //
0230: // It's possible for the send and receive streams to get "deadlocked" against each other since
0231: // we do not have a separate thread. The scenario is this: we have two streams:
0232: //
0233: // driver -> TCP buffering -> server
0234: // server -> TCP buffering -> driver
0235: //
0236: // The server behaviour is roughly:
0237: // while true:
0238: // read message
0239: // execute message
0240: // write results
0241: //
0242: // If the server -> driver stream has a full buffer, the write will block.
0243: // If the driver is still writing when this happens, and the driver -> server
0244: // stream also fills up, we deadlock: the driver is blocked on write() waiting
0245: // for the server to read some more data, and the server is blocked on write()
0246: // waiting for the driver to read some more data.
0247: //
0248: // To avoid this, we guess at how many queries we can send before the server ->
0249: // driver stream's buffer is full (MAX_BUFFERED_QUERIES). This is the point where
0250: // the server blocks on write and stops reading data. If we reach this point, we
0251: // force a Sync message and read pending data from the server until ReadyForQuery,
0252: // then go back to writing more queries unless we saw an error.
0253: //
0254: // This is not 100% reliable -- it's only done in the batch-query case and only
0255: // at a reasonably high level (per query, not per message), and it's only an estimate
0256: // -- so it might break. To do it correctly in all cases would seem to require a
0257: // separate send or receive thread as we can only do the Sync-and-read-results
0258: // operation at particular points, and also as we don't really know how much data
0259: // the server is sending.
0260:
0261: // Assume 64k server->client buffering and 250 bytes response per query (conservative).
0262: private static final int MAX_BUFFERED_QUERIES = (64000 / 250);
0263:
0264: // Helper handler that tracks error status.
0265: private static class ErrorTrackingResultHandler implements
0266: ResultHandler {
0267: private final ResultHandler delegateHandler;
0268: private boolean sawError = false;
0269:
0270: ErrorTrackingResultHandler(ResultHandler delegateHandler) {
0271: this .delegateHandler = delegateHandler;
0272: }
0273:
0274: public void handleResultRows(Query fromQuery, Field[] fields,
0275: Vector tuples, ResultCursor cursor) {
0276: delegateHandler.handleResultRows(fromQuery, fields, tuples,
0277: cursor);
0278: }
0279:
0280: public void handleCommandStatus(String status, int updateCount,
0281: long insertOID) {
0282: delegateHandler.handleCommandStatus(status, updateCount,
0283: insertOID);
0284: }
0285:
0286: public void handleWarning(SQLWarning warning) {
0287: delegateHandler.handleWarning(warning);
0288: }
0289:
0290: public void handleError(SQLException error) {
0291: sawError = true;
0292: delegateHandler.handleError(error);
0293: }
0294:
0295: public void handleCompletion() throws SQLException {
0296: delegateHandler.handleCompletion();
0297: }
0298:
0299: boolean hasErrors() {
0300: return sawError;
0301: }
0302: }
0303:
0304: public synchronized void execute(Query[] queries,
0305: ParameterList[] parameterLists, ResultHandler handler,
0306: int maxRows, int fetchSize, int flags) throws SQLException {
0307: if (logger.logDebug()) {
0308: logger.debug("batch execute " + queries.length
0309: + " queries, handler=" + handler + ", maxRows="
0310: + maxRows + ", fetchSize=" + fetchSize + ", flags="
0311: + flags);
0312: }
0313:
0314: boolean describeOnly = (QUERY_DESCRIBE_ONLY & flags) != 0;
0315: // Check parameters and resolve OIDs.
0316: if (!describeOnly) {
0317: for (int i = 0; i < parameterLists.length; ++i) {
0318: if (parameterLists[i] != null)
0319: ((V3ParameterList) parameterLists[i])
0320: .checkAllParametersSet();
0321: }
0322: }
0323:
0324: try {
0325: int queryCount = 0;
0326:
0327: handler = sendQueryPreamble(handler, flags);
0328: ErrorTrackingResultHandler trackingHandler = new ErrorTrackingResultHandler(
0329: handler);
0330:
0331: for (int i = 0; i < queries.length; ++i) {
0332: ++queryCount;
0333: if (queryCount >= MAX_BUFFERED_QUERIES) {
0334: sendSync();
0335: processResults(trackingHandler, flags);
0336:
0337: // If we saw errors, don't send anything more.
0338: if (trackingHandler.hasErrors())
0339: break;
0340:
0341: queryCount = 0;
0342: }
0343:
0344: V3Query query = (V3Query) queries[i];
0345: V3ParameterList parameters = (V3ParameterList) parameterLists[i];
0346: if (parameters == null)
0347: parameters = SimpleQuery.NO_PARAMETERS;
0348: sendQuery(query, parameters, maxRows, fetchSize, flags);
0349: }
0350:
0351: if (!trackingHandler.hasErrors()) {
0352: sendSync();
0353: processResults(handler, flags);
0354: }
0355: } catch (IOException e) {
0356: protoConnection.close();
0357: handler
0358: .handleError(new PSQLException(
0359: GT
0360: .tr("An I/O error occured while sending to the backend."),
0361: PSQLState.CONNECTION_FAILURE, e));
0362: }
0363:
0364: handler.handleCompletion();
0365: }
0366:
0367: private ResultHandler sendQueryPreamble(
0368: final ResultHandler delegateHandler, int flags)
0369: throws IOException {
0370: // First, send CloseStatements for finalized SimpleQueries that had statement names assigned.
0371: processDeadParsedQueries();
0372: processDeadPortals();
0373:
0374: // Send BEGIN on first statement in transaction.
0375: if ((flags & QueryExecutor.QUERY_SUPPRESS_BEGIN) != 0
0376: || protoConnection.getTransactionState() != ProtocolConnection.TRANSACTION_IDLE)
0377: return delegateHandler;
0378:
0379: sendOneQuery(beginTransactionQuery, SimpleQuery.NO_PARAMETERS,
0380: 0, 0, QueryExecutor.QUERY_NO_METADATA);
0381:
0382: // Insert a handler that intercepts the BEGIN.
0383: return new ResultHandler() {
0384: private boolean sawBegin = false;
0385:
0386: public void handleResultRows(Query fromQuery,
0387: Field[] fields, Vector tuples, ResultCursor cursor) {
0388: if (sawBegin)
0389: delegateHandler.handleResultRows(fromQuery, fields,
0390: tuples, cursor);
0391: }
0392:
0393: public void handleCommandStatus(String status,
0394: int updateCount, long insertOID) {
0395: if (!sawBegin) {
0396: sawBegin = true;
0397: if (!status.equals("BEGIN"))
0398: handleError(new PSQLException(
0399: GT
0400: .tr(
0401: "Expected command status BEGIN, got {0}.",
0402: status),
0403: PSQLState.PROTOCOL_VIOLATION));
0404: } else {
0405: delegateHandler.handleCommandStatus(status,
0406: updateCount, insertOID);
0407: }
0408: }
0409:
0410: public void handleWarning(SQLWarning warning) {
0411: delegateHandler.handleWarning(warning);
0412: }
0413:
0414: public void handleError(SQLException error) {
0415: delegateHandler.handleError(error);
0416: }
0417:
0418: public void handleCompletion() throws SQLException {
0419: delegateHandler.handleCompletion();
0420: }
0421: };
0422: }
0423:
0424: //
0425: // Fastpath
0426: //
0427:
0428: public synchronized byte[] fastpathCall(int fnid,
0429: ParameterList parameters, boolean suppressBegin)
0430: throws SQLException {
0431: if (protoConnection.getTransactionState() == ProtocolConnection.TRANSACTION_IDLE
0432: && !suppressBegin) {
0433:
0434: if (logger.logDebug())
0435: logger.debug("Issuing BEGIN before fastpath call.");
0436:
0437: ResultHandler handler = new ResultHandler() {
0438: private boolean sawBegin = false;
0439: private SQLException sqle = null;
0440:
0441: public void handleResultRows(Query fromQuery,
0442: Field[] fields, Vector tuples,
0443: ResultCursor cursor) {
0444: }
0445:
0446: public void handleCommandStatus(String status,
0447: int updateCount, long insertOID) {
0448: if (!sawBegin) {
0449: if (!status.equals("BEGIN"))
0450: handleError(new PSQLException(
0451: GT
0452: .tr(
0453: "Expected command status BEGIN, got {0}.",
0454: status),
0455: PSQLState.PROTOCOL_VIOLATION));
0456: sawBegin = true;
0457: } else {
0458: handleError(new PSQLException(GT.tr(
0459: "Unexpected command status: {0}.",
0460: status), PSQLState.PROTOCOL_VIOLATION));
0461: }
0462: }
0463:
0464: public void handleWarning(SQLWarning warning) {
0465: // we don't want to ignore warnings and it would be tricky
0466: // to chain them back to the connection, so since we don't
0467: // expect to get them in the first place, we just consider
0468: // them errors.
0469: handleError(warning);
0470: }
0471:
0472: public void handleError(SQLException error) {
0473: if (sqle == null) {
0474: sqle = error;
0475: } else {
0476: sqle.setNextException(error);
0477: }
0478: }
0479:
0480: public void handleCompletion() throws SQLException {
0481: if (sqle != null)
0482: throw sqle;
0483: }
0484: };
0485:
0486: try {
0487: sendOneQuery(beginTransactionQuery,
0488: SimpleQuery.NO_PARAMETERS, 0, 0,
0489: QueryExecutor.QUERY_NO_METADATA);
0490: sendSync();
0491: processResults(handler, 0);
0492: } catch (IOException ioe) {
0493: throw new PSQLException(
0494: GT
0495: .tr("An I/O error occured while sending to the backend."),
0496: PSQLState.CONNECTION_FAILURE, ioe);
0497: }
0498: }
0499:
0500: try {
0501: sendFastpathCall(fnid, (SimpleParameterList) parameters);
0502: return receiveFastpathResult();
0503: } catch (IOException ioe) {
0504: protoConnection.close();
0505: throw new PSQLException(
0506: GT
0507: .tr("An I/O error occured while sending to the backend."),
0508: PSQLState.CONNECTION_FAILURE, ioe);
0509: }
0510: }
0511:
0512: public ParameterList createFastpathParameters(int count) {
0513: return new SimpleParameterList(count);
0514: }
0515:
0516: private void sendFastpathCall(int fnid, SimpleParameterList params)
0517: throws SQLException, IOException {
0518: if (logger.logDebug())
0519: logger.debug(" FE=> FunctionCall(" + fnid + ", "
0520: + params.getParameterCount() + " params)");
0521:
0522: //
0523: // Total size = 4 (length)
0524: // + 4 (function OID)
0525: // + 2 (format code count) + N * 2 (format codes)
0526: // + 2 (parameter count) + encodedSize (parameters)
0527: // + 2 (result format)
0528:
0529: int paramCount = params.getParameterCount();
0530: int encodedSize = 0;
0531: for (int i = 1; i <= paramCount; ++i) {
0532: if (params.isNull(i))
0533: encodedSize += 4;
0534: else
0535: encodedSize += 4 + params.getV3Length(i);
0536: }
0537:
0538: pgStream.SendChar('F');
0539: pgStream.SendInteger4(4 + 4 + 2 + 2 * paramCount + 2
0540: + encodedSize + 2);
0541: pgStream.SendInteger4(fnid);
0542: pgStream.SendInteger2(paramCount);
0543: for (int i = 1; i <= paramCount; ++i)
0544: pgStream.SendInteger2(params.isBinary(i) ? 1 : 0);
0545: pgStream.SendInteger2(paramCount);
0546: for (int i = 1; i <= paramCount; i++) {
0547: if (params.isNull(i)) {
0548: pgStream.SendInteger4(-1);
0549: } else {
0550: pgStream.SendInteger4(params.getV3Length(i)); // Parameter size
0551: params.writeV3Value(i, pgStream);
0552: }
0553: }
0554: pgStream.SendInteger2(1); // Binary result format
0555: pgStream.flush();
0556: }
0557:
0558: public synchronized void processNotifies() throws SQLException {
0559: // Asynchronous notifies only arrive when we are not in a transaction
0560: if (protoConnection.getTransactionState() != ProtocolConnection.TRANSACTION_IDLE)
0561: return;
0562:
0563: try {
0564: while (pgStream.hasMessagePending()) {
0565: int c = pgStream.ReceiveChar();
0566: switch (c) {
0567: case 'A': // Asynchronous Notify
0568: receiveAsyncNotify();
0569: break;
0570: case 'E': // Error Response (response to pretty much everything; backend then skips until Sync)
0571: throw receiveErrorResponse();
0572: // break;
0573: case 'N': // Notice Response (warnings / info)
0574: SQLWarning warning = receiveNoticeResponse();
0575: protoConnection.addWarning(warning);
0576: break;
0577: default:
0578: throw new PSQLException(GT.tr(
0579: "Unknown Response Type {0}.",
0580: new Character((char) c)),
0581: PSQLState.CONNECTION_FAILURE);
0582: }
0583: }
0584: } catch (IOException ioe) {
0585: throw new PSQLException(
0586: GT
0587: .tr("An I/O error occured while sending to the backend."),
0588: PSQLState.CONNECTION_FAILURE, ioe);
0589: }
0590: }
0591:
0592: private byte[] receiveFastpathResult() throws IOException,
0593: SQLException {
0594: boolean endQuery = false;
0595: SQLException error = null;
0596: byte[] returnValue = null;
0597:
0598: while (!endQuery) {
0599: int c = pgStream.ReceiveChar();
0600: switch (c) {
0601: case 'A': // Asynchronous Notify
0602: receiveAsyncNotify();
0603: break;
0604:
0605: case 'E': // Error Response (response to pretty much everything; backend then skips until Sync)
0606: SQLException newError = receiveErrorResponse();
0607: if (error == null)
0608: error = newError;
0609: else
0610: error.setNextException(newError);
0611: // keep processing
0612: break;
0613:
0614: case 'N': // Notice Response (warnings / info)
0615: SQLWarning warning = receiveNoticeResponse();
0616: protoConnection.addWarning(warning);
0617: break;
0618:
0619: case 'Z': // Ready For Query (eventual response to Sync)
0620: receiveRFQ();
0621: endQuery = true;
0622: break;
0623:
0624: case 'V': // FunctionCallResponse
0625: int msgLen = pgStream.ReceiveInteger4();
0626: int valueLen = pgStream.ReceiveInteger4();
0627:
0628: if (logger.logDebug())
0629: logger.debug(" <=BE FunctionCallResponse("
0630: + valueLen + " bytes)");
0631:
0632: if (valueLen != -1) {
0633: byte buf[] = new byte[valueLen];
0634: pgStream.Receive(buf, 0, valueLen);
0635: returnValue = buf;
0636: }
0637:
0638: break;
0639:
0640: default:
0641: throw new PSQLException(GT.tr(
0642: "Unknown Response Type {0}.", new Character(
0643: (char) c)),
0644: PSQLState.CONNECTION_FAILURE);
0645: }
0646:
0647: }
0648:
0649: // did we get an error during this query?
0650: if (error != null)
0651: throw error;
0652:
0653: return returnValue;
0654: }
0655:
0656: /*
0657: * Send a query to the backend.
0658: */
0659: private void sendQuery(V3Query query, V3ParameterList parameters,
0660: int maxRows, int fetchSize, int flags) throws IOException,
0661: SQLException {
0662: // Now the query itself.
0663: SimpleQuery[] subqueries = query.getSubqueries();
0664: SimpleParameterList[] subparams = parameters.getSubparams();
0665:
0666: if (subqueries == null) {
0667: sendOneQuery((SimpleQuery) query,
0668: (SimpleParameterList) parameters, maxRows,
0669: fetchSize, flags);
0670: } else {
0671: for (int i = 0; i < subqueries.length; ++i) {
0672: // In the situation where parameters is already
0673: // NO_PARAMETERS it cannot know the correct
0674: // number of array elements to return in the
0675: // above call to getSubparams(), so it must
0676: // return null which we check for here.
0677: //
0678: SimpleParameterList subparam = SimpleQuery.NO_PARAMETERS;
0679: if (subparams != null) {
0680: subparam = subparams[i];
0681: }
0682: sendOneQuery(subqueries[i], subparam, maxRows,
0683: fetchSize, flags);
0684: }
0685: }
0686: }
0687:
0688: //
0689: // Message sending
0690: //
0691:
0692: private void sendSync() throws IOException {
0693: if (logger.logDebug())
0694: logger.debug(" FE=> Sync");
0695:
0696: pgStream.SendChar('S'); // Sync
0697: pgStream.SendInteger4(4); // Length
0698: pgStream.flush();
0699: }
0700:
0701: private void sendParse(SimpleQuery query,
0702: SimpleParameterList params, boolean oneShot)
0703: throws IOException {
0704: // Already parsed, or we have a Parse pending and the types are right?
0705: int[] typeOIDs = params.getTypeOIDs();
0706: if (query.isPreparedFor(typeOIDs))
0707: return;
0708:
0709: // Clean up any existing statement, as we can't use it.
0710: query.unprepare();
0711: processDeadParsedQueries();
0712:
0713: String statementName = null;
0714: if (!oneShot) {
0715: // Generate a statement name to use.
0716: statementName = "S_" + (nextUniqueID++);
0717:
0718: // And prepare the new statement.
0719: // NB: Must clone the OID array, as it's a direct reference to
0720: // the SimpleParameterList's internal array that might be modified
0721: // under us.
0722: query.setStatementName(statementName);
0723: query.setStatementTypes((int[]) typeOIDs.clone());
0724: }
0725:
0726: byte[] encodedStatementName = query.getEncodedStatementName();
0727: String[] fragments = query.getFragments();
0728:
0729: if (logger.logDebug()) {
0730: StringBuffer sbuf = new StringBuffer(" FE=> Parse(stmt="
0731: + statementName + ",query=\"");
0732: for (int i = 0; i < fragments.length; ++i) {
0733: if (i > 0)
0734: sbuf.append("$" + i);
0735: sbuf.append(fragments[i]);
0736: }
0737: sbuf.append("\",oids={");
0738: for (int i = 1; i <= params.getParameterCount(); ++i) {
0739: if (i != 1)
0740: sbuf.append(",");
0741: sbuf.append("" + params.getTypeOID(i));
0742: }
0743: sbuf.append("})");
0744: logger.debug(sbuf.toString());
0745: }
0746:
0747: //
0748: // Send Parse.
0749: //
0750:
0751: byte[][] parts = new byte[fragments.length * 2 - 1][];
0752: int j = 0;
0753: int encodedSize = 0;
0754:
0755: // Total size = 4 (size field)
0756: // + N + 1 (statement name, zero-terminated)
0757: // + N + 1 (query, zero terminated)
0758: // + 2 (parameter count) + N * 4 (parameter types)
0759: // original query: "frag0 ? frag1 ? frag2"
0760: // fragments: { "frag0", "frag1", "frag2" }
0761: // output: "frag0 $1 frag1 $2 frag2"
0762: for (int i = 0; i < fragments.length; ++i) {
0763: if (i != 0) {
0764: parts[j] = Utils.encodeUTF8("$" + i);
0765: encodedSize += parts[j].length;
0766: ++j;
0767: }
0768:
0769: parts[j] = Utils.encodeUTF8(fragments[i]);
0770: encodedSize += parts[j].length;
0771: ++j;
0772: }
0773:
0774: encodedSize = 4
0775: + (encodedStatementName == null ? 0
0776: : encodedStatementName.length) + 1
0777: + encodedSize + 1 + 2 + 4 * params.getParameterCount();
0778:
0779: pgStream.SendChar('P'); // Parse
0780: pgStream.SendInteger4(encodedSize);
0781: if (encodedStatementName != null)
0782: pgStream.Send(encodedStatementName);
0783: pgStream.SendChar(0); // End of statement name
0784: for (int i = 0; i < parts.length; ++i) { // Query string
0785: pgStream.Send(parts[i]);
0786: }
0787: pgStream.SendChar(0); // End of query string.
0788: pgStream.SendInteger2(params.getParameterCount()); // # of parameter types specified
0789: for (int i = 1; i <= params.getParameterCount(); ++i)
0790: pgStream.SendInteger4(params.getTypeOID(i));
0791:
0792: pendingParseQueue.add(new Object[] { query,
0793: query.getStatementName() });
0794: }
0795:
0796: private void sendBind(SimpleQuery query,
0797: SimpleParameterList params, Portal portal)
0798: throws IOException {
0799: //
0800: // Send Bind.
0801: //
0802:
0803: String statementName = query.getStatementName();
0804: byte[] encodedStatementName = query.getEncodedStatementName();
0805: byte[] encodedPortalName = (portal == null ? null : portal
0806: .getEncodedPortalName());
0807:
0808: if (logger.logDebug()) {
0809: StringBuffer sbuf = new StringBuffer(" FE=> Bind(stmt="
0810: + statementName + ",portal=" + portal);
0811: for (int i = 1; i <= params.getParameterCount(); ++i) {
0812: sbuf.append(",$" + i + "=<" + params.toString(i) + ">");
0813: }
0814: sbuf.append(")");
0815: logger.debug(sbuf.toString());
0816: }
0817:
0818: // Total size = 4 (size field) + N + 1 (destination portal)
0819: // + N + 1 (statement name)
0820: // + 2 (param format code count) + N * 2 (format codes)
0821: // + 2 (param value count) + N (encoded param value size)
0822: // + 2 (result format code count, 0)
0823: long encodedSize = 0;
0824: for (int i = 1; i <= params.getParameterCount(); ++i) {
0825: if (params.isNull(i))
0826: encodedSize += 4;
0827: else
0828: encodedSize += (long) 4 + params.getV3Length(i);
0829: }
0830:
0831: encodedSize = 4
0832: + (encodedPortalName == null ? 0
0833: : encodedPortalName.length)
0834: + 1
0835: + (encodedStatementName == null ? 0
0836: : encodedStatementName.length) + 1 + 2
0837: + params.getParameterCount() * 2 + 2 + encodedSize + 2;
0838:
0839: // backend's MaxAllocSize is the largest message that can
0840: // be received from a client. If we have a bigger value
0841: // from either very large parameters or incorrent length
0842: // descriptions of setXXXStream we do not send the bind
0843: // messsage.
0844: //
0845: if (encodedSize > 0x3fffffff) {
0846: throw new PGBindException(
0847: new IOException(
0848: GT
0849: .tr(
0850: "Bind message length {0} too long. This can be caused by very large or incorrect length specifications on InputStream parameters.",
0851: new Long(encodedSize))));
0852: }
0853:
0854: pgStream.SendChar('B'); // Bind
0855: pgStream.SendInteger4((int) encodedSize); // Message size
0856: if (encodedPortalName != null)
0857: pgStream.Send(encodedPortalName); // Destination portal name.
0858: pgStream.SendChar(0); // End of portal name.
0859: if (encodedStatementName != null)
0860: pgStream.Send(encodedStatementName); // Source statement name.
0861: pgStream.SendChar(0); // End of statement name.
0862:
0863: pgStream.SendInteger2(params.getParameterCount()); // # of parameter format codes
0864: for (int i = 1; i <= params.getParameterCount(); ++i)
0865: pgStream.SendInteger2(params.isBinary(i) ? 1 : 0); // Parameter format code
0866:
0867: pgStream.SendInteger2(params.getParameterCount()); // # of parameter values
0868:
0869: // If an error occurs when reading a stream we have to
0870: // continue pumping out data to match the length we
0871: // said we would. Once we've done that we throw
0872: // this exception. Multiple exceptions can occur and
0873: // it really doesn't matter which one is reported back
0874: // to the caller.
0875: //
0876: PGBindException bindException = null;
0877:
0878: for (int i = 1; i <= params.getParameterCount(); ++i) {
0879: if (params.isNull(i))
0880: pgStream.SendInteger4(-1); // Magic size of -1 means NULL
0881: else {
0882: pgStream.SendInteger4(params.getV3Length(i)); // Parameter size
0883: try {
0884: params.writeV3Value(i, pgStream); // Parameter value
0885: } catch (PGBindException be) {
0886: bindException = be;
0887: }
0888: }
0889: }
0890:
0891: pgStream.SendInteger2(0); // # of result format codes (0)
0892:
0893: pendingBindQueue.add(portal);
0894:
0895: if (bindException != null) {
0896: throw bindException;
0897: }
0898: }
0899:
0900: private void sendDescribePortal(Portal portal) throws IOException {
0901: //
0902: // Send Describe.
0903: //
0904:
0905: if (logger.logDebug()) {
0906: logger.debug(" FE=> Describe(portal=" + portal + ")");
0907: }
0908:
0909: byte[] encodedPortalName = (portal == null ? null : portal
0910: .getEncodedPortalName());
0911:
0912: // Total size = 4 (size field) + 1 (describe type, 'P') + N + 1 (portal name)
0913: int encodedSize = 4 + 1 + (encodedPortalName == null ? 0
0914: : encodedPortalName.length) + 1;
0915:
0916: pgStream.SendChar('D'); // Describe
0917: pgStream.SendInteger4(encodedSize); // message size
0918: pgStream.SendChar('P'); // Describe (Portal)
0919: if (encodedPortalName != null)
0920: pgStream.Send(encodedPortalName); // portal name to close
0921: pgStream.SendChar(0); // end of portal name
0922: }
0923:
0924: private void sendDescribeStatement(SimpleQuery query,
0925: SimpleParameterList params, boolean describeOnly)
0926: throws IOException {
0927: // Send Statement Describe
0928:
0929: if (logger.logDebug()) {
0930: logger.debug(" FE=> Describe(statement="
0931: + query.getStatementName() + ")");
0932: }
0933:
0934: byte[] encodedStatementName = query.getEncodedStatementName();
0935:
0936: // Total size = 4 (size field) + 1 (describe type, 'S') + N + 1 (portal name)
0937: int encodedSize = 4 + 1 + (encodedStatementName == null ? 0
0938: : encodedStatementName.length) + 1;
0939:
0940: pgStream.SendChar('D'); // Describe
0941: pgStream.SendInteger4(encodedSize); // Message size
0942: pgStream.SendChar('S'); // Describe (Statement);
0943: if (encodedStatementName != null)
0944: pgStream.Send(encodedStatementName); // Statement name
0945: pgStream.SendChar(0); // end message
0946:
0947: pendingDescribeStatementQueue.add(new Object[] { query, params,
0948: new Boolean(describeOnly) });
0949: }
0950:
0951: private void sendExecute(Query query, Portal portal, int limit)
0952: throws IOException {
0953: //
0954: // Send Execute.
0955: //
0956:
0957: if (logger.logDebug()) {
0958: logger.debug(" FE=> Execute(portal=" + portal + ",limit="
0959: + limit + ")");
0960: }
0961:
0962: byte[] encodedPortalName = (portal == null ? null : portal
0963: .getEncodedPortalName());
0964: int encodedSize = (encodedPortalName == null ? 0
0965: : encodedPortalName.length);
0966:
0967: // Total size = 4 (size field) + 1 + N (source portal) + 4 (max rows)
0968: pgStream.SendChar('E'); // Execute
0969: pgStream.SendInteger4(4 + 1 + encodedSize + 4); // message size
0970: if (encodedPortalName != null)
0971: pgStream.Send(encodedPortalName); // portal name
0972: pgStream.SendChar(0); // portal name terminator
0973: pgStream.SendInteger4(limit); // row limit
0974:
0975: pendingExecuteQueue.add(new Object[] { query, portal });
0976: }
0977:
0978: private void sendClosePortal(String portalName) throws IOException {
0979: //
0980: // Send Close.
0981: //
0982:
0983: if (logger.logDebug()) {
0984: logger.debug(" FE=> ClosePortal(" + portalName + ")");
0985: }
0986:
0987: byte[] encodedPortalName = (portalName == null ? null : Utils
0988: .encodeUTF8(portalName));
0989: int encodedSize = (encodedPortalName == null ? 0
0990: : encodedPortalName.length);
0991:
0992: // Total size = 4 (size field) + 1 (close type, 'P') + 1 + N (portal name)
0993: pgStream.SendChar('C'); // Close
0994: pgStream.SendInteger4(4 + 1 + 1 + encodedSize); // message size
0995: pgStream.SendChar('P'); // Close (Portal)
0996: if (encodedPortalName != null)
0997: pgStream.Send(encodedPortalName);
0998: pgStream.SendChar(0); // unnamed portal
0999: }
1000:
1001: private void sendCloseStatement(String statementName)
1002: throws IOException {
1003: //
1004: // Send Close.
1005: //
1006:
1007: if (logger.logDebug()) {
1008: logger.debug(" FE=> CloseStatement(" + statementName + ")");
1009: }
1010:
1011: byte[] encodedStatementName = Utils.encodeUTF8(statementName);
1012:
1013: // Total size = 4 (size field) + 1 (close type, 'S') + N + 1 (statement name)
1014: pgStream.SendChar('C'); // Close
1015: pgStream.SendInteger4(4 + 1 + encodedStatementName.length + 1); // message size
1016: pgStream.SendChar('S'); // Close (Statement)
1017: pgStream.Send(encodedStatementName); // statement to close
1018: pgStream.SendChar(0); // statement name terminator
1019: }
1020:
1021: // sendOneQuery sends a single statement via the extended query protocol.
1022: // Per the FE/BE docs this is essentially the same as how a simple query runs
1023: // (except that it generates some extra acknowledgement messages, and we
1024: // can send several queries before doing the Sync)
1025: //
1026: // Parse S_n from "query string with parameter placeholders"; skipped if already done previously or if oneshot
1027: // Bind C_n from S_n plus parameters (or from unnamed statement for oneshot queries)
1028: // Describe C_n; skipped if caller doesn't want metadata
1029: // Execute C_n with maxRows limit; maxRows = 1 if caller doesn't want results
1030: // (above repeats once per call to sendOneQuery)
1031: // Sync (sent by caller)
1032: //
1033: private void sendOneQuery(SimpleQuery query,
1034: SimpleParameterList params, int maxRows, int fetchSize,
1035: int flags) throws IOException {
1036: // nb: if we decide to use a portal (usePortal == true) we must also use a named statement
1037: // (oneShot == false) as otherwise the portal will be closed under us unexpectedly when
1038: // the unnamed statement is next reused.
1039:
1040: boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
1041: boolean noMeta = (flags & QueryExecutor.QUERY_NO_METADATA) != 0;
1042: boolean describeOnly = (flags & QueryExecutor.QUERY_DESCRIBE_ONLY) != 0;
1043: boolean usePortal = (flags & QueryExecutor.QUERY_FORWARD_CURSOR) != 0
1044: && !noResults
1045: && !noMeta
1046: && fetchSize > 0
1047: && !describeOnly;
1048: boolean oneShot = (flags & QueryExecutor.QUERY_ONESHOT) != 0
1049: && !usePortal;
1050: boolean describeStatement = describeOnly
1051: || (params.hasUnresolvedTypes() && !oneShot);
1052:
1053: // Work out how many rows to fetch in this pass.
1054:
1055: int rows;
1056: if (noResults) {
1057: rows = 1; // We're discarding any results anyway, so limit data transfer to a minimum
1058: } else if (!usePortal) {
1059: rows = maxRows; // Not using a portal -- fetchSize is irrelevant
1060: } else if (maxRows != 0 && fetchSize > maxRows) {
1061: rows = maxRows; // fetchSize > maxRows, use maxRows (nb: fetchSize cannot be 0 if usePortal == true)
1062: } else {
1063: rows = fetchSize; // maxRows > fetchSize
1064: }
1065:
1066: sendParse(query, params, oneShot);
1067:
1068: if (describeStatement) {
1069: sendDescribeStatement(query, params, describeOnly);
1070: if (describeOnly)
1071: return;
1072: }
1073:
1074: // Construct a new portal if needed.
1075: Portal portal = null;
1076: if (usePortal) {
1077: String portalName = "C_" + (nextUniqueID++);
1078: portal = new Portal(query, portalName);
1079: }
1080:
1081: sendBind(query, params, portal);
1082:
1083: // A statement describe will also output a RowDescription,
1084: // so don't reissue it here if we've already done so.
1085: //
1086: if (!noMeta && !describeStatement)
1087: sendDescribePortal(portal);
1088:
1089: sendExecute(query, portal, rows);
1090: }
1091:
1092: //
1093: // Garbage collection of parsed statements.
1094: //
1095: // When a statement is successfully parsed, registerParsedQuery is called.
1096: // This creates a PhantomReference referring to the "owner" of the statement
1097: // (the originating Query object) and inserts that reference as a key in
1098: // parsedQueryMap. The values of parsedQueryMap are the corresponding allocated
1099: // statement names. The originating Query object also holds a reference to the
1100: // PhantomReference.
1101: //
1102: // When the owning Query object is closed, it enqueues and clears the associated
1103: // PhantomReference.
1104: //
1105: // If the owning Query object becomes unreachable (see java.lang.ref javadoc) before
1106: // being closed, the corresponding PhantomReference is enqueued on
1107: // parsedQueryCleanupQueue. In the Sun JVM, phantom references are only enqueued
1108: // when a GC occurs, so this is not necessarily prompt but should eventually happen.
1109: //
1110: // Periodically (currently, just before query execution), the parsedQueryCleanupQueue
1111: // is polled. For each enqueued PhantomReference we find, we remove the corresponding
1112: // entry from parsedQueryMap, obtaining the name of the underlying statement in the
1113: // process. Then we send a message to the backend to deallocate that statement.
1114: //
1115:
// Maps each PhantomReference created in registerParsedQuery to the name of
// the server-side statement that was parsed for the referenced query.
private final HashMap parsedQueryMap = new HashMap();
// Reference queue those PhantomReferences are registered with; it is polled
// by processDeadParsedQueries to find statements that need closing.
private final ReferenceQueue parsedQueryCleanupQueue = new ReferenceQueue();
1118:
1119: private void registerParsedQuery(SimpleQuery query,
1120: String statementName) {
1121: if (statementName == null)
1122: return;
1123:
1124: PhantomReference cleanupRef = new PhantomReference(query,
1125: parsedQueryCleanupQueue);
1126: parsedQueryMap.put(cleanupRef, statementName);
1127: query.setCleanupRef(cleanupRef);
1128: }
1129:
1130: private void processDeadParsedQueries() throws IOException {
1131: PhantomReference deadQuery;
1132: while ((deadQuery = (PhantomReference) parsedQueryCleanupQueue
1133: .poll()) != null) {
1134: String statementName = (String) parsedQueryMap
1135: .remove(deadQuery);
1136: sendCloseStatement(statementName);
1137: deadQuery.clear();
1138: }
1139: }
1140:
1141: //
1142: // Essentially the same strategy is used for the cleanup of portals.
1143: // Note that each Portal holds a reference to the corresponding Query
1144: // that generated it, so the Query won't be collected (and the statement
1145: // closed) until all the Portals are, too. This is required by the mechanics
1146: // of the backend protocol: when a statement is closed, all dependent portals
1147: // are also closed.
1148: //
1149:
// Maps each PhantomReference created in registerOpenPortal to the name of
// the portal it refers to.
private final HashMap openPortalMap = new HashMap();
// Reference queue those PhantomReferences are registered with; it is polled
// by processDeadPortals to find portals that need closing.
private final ReferenceQueue openPortalCleanupQueue = new ReferenceQueue();
1152:
1153: private void registerOpenPortal(Portal portal) {
1154: if (portal == null)
1155: return; // Using the unnamed portal.
1156:
1157: String portalName = portal.getPortalName();
1158: PhantomReference cleanupRef = new PhantomReference(portal,
1159: openPortalCleanupQueue);
1160: openPortalMap.put(cleanupRef, portalName);
1161: portal.setCleanupRef(cleanupRef);
1162: }
1163:
1164: private void processDeadPortals() throws IOException {
1165: PhantomReference deadPortal;
1166: while ((deadPortal = (PhantomReference) openPortalCleanupQueue
1167: .poll()) != null) {
1168: String portalName = (String) openPortalMap
1169: .remove(deadPortal);
1170: sendClosePortal(portalName);
1171: deadPortal.clear();
1172: }
1173: }
1174:
1175: protected void processResults(ResultHandler handler, int flags)
1176: throws IOException {
1177: boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
1178:
1179: Field[] fields = null;
1180: Vector tuples = null;
1181:
1182: int len;
1183: int c;
1184: boolean endQuery = false;
1185:
1186: // At the end of a command execution we have the CommandComplete
1187: // message to tell us we're done, but with a describeOnly command
1188: // we have no real flag to let us know we're done. We've got to
1189: // look for the next RowDescription or NoData message and return
1190: // from there.
1191: boolean doneAfterRowDescNoData = false;
1192:
1193: int parseIndex = 0;
1194: int describeIndex = 0;
1195: int bindIndex = 0;
1196: int executeIndex = 0;
1197:
1198: while (!endQuery) {
1199: c = pgStream.ReceiveChar();
1200: switch (c) {
1201: case 'A': // Asynchronous Notify
1202: receiveAsyncNotify();
1203: break;
1204:
1205: case '1': // Parse Complete (response to Parse)
1206: pgStream.ReceiveInteger4(); // len, discarded
1207:
1208: Object[] parsedQueryAndStatement = (Object[]) pendingParseQueue
1209: .get(parseIndex++);
1210:
1211: SimpleQuery parsedQuery = (SimpleQuery) parsedQueryAndStatement[0];
1212: String parsedStatementName = (String) parsedQueryAndStatement[1];
1213:
1214: if (logger.logDebug())
1215: logger.debug(" <=BE ParseComplete ["
1216: + parsedStatementName + "]");
1217:
1218: registerParsedQuery(parsedQuery, parsedStatementName);
1219: break;
1220:
1221: case 't': // ParameterDescription
1222: pgStream.ReceiveInteger4(); // len, discarded
1223:
1224: if (logger.logDebug())
1225: logger.debug(" <=BE ParameterDescription");
1226:
1227: {
1228: Object describeData[] = (Object[]) pendingDescribeStatementQueue
1229: .get(describeIndex);
1230: SimpleQuery query = (SimpleQuery) describeData[0];
1231: SimpleParameterList params = (SimpleParameterList) describeData[1];
1232: boolean describeOnly = ((Boolean) describeData[2])
1233: .booleanValue();
1234:
1235: int numParams = pgStream.ReceiveInteger2();
1236: for (int i = 1; i <= numParams; i++) {
1237: int typeOid = pgStream.ReceiveInteger4();
1238: params.setResolvedType(i, typeOid);
1239: }
1240: query.setStatementTypes((int[]) params
1241: .getTypeOIDs().clone());
1242:
1243: if (describeOnly)
1244: doneAfterRowDescNoData = true;
1245: else
1246: describeIndex++;
1247: }
1248: break;
1249:
1250: case '2': // Bind Complete (response to Bind)
1251: pgStream.ReceiveInteger4(); // len, discarded
1252:
1253: Portal boundPortal = (Portal) pendingBindQueue
1254: .get(bindIndex++);
1255: if (logger.logDebug())
1256: logger.debug(" <=BE BindComplete [" + boundPortal
1257: + "]");
1258:
1259: registerOpenPortal(boundPortal);
1260: break;
1261:
1262: case '3': // Close Complete (response to Close)
1263: pgStream.ReceiveInteger4(); // len, discarded
1264: if (logger.logDebug())
1265: logger.debug(" <=BE CloseComplete");
1266: break;
1267:
1268: case 'n': // No Data (response to Describe)
1269: pgStream.ReceiveInteger4(); // len, discarded
1270: if (logger.logDebug())
1271: logger.debug(" <=BE NoData");
1272:
1273: if (doneAfterRowDescNoData) {
1274: Object describeData[] = (Object[]) pendingDescribeStatementQueue
1275: .get(describeIndex++);
1276: Query currentQuery = (Query) describeData[0];
1277:
1278: if (fields != null || tuples != null) { // There was a resultset.
1279: handler.handleResultRows(currentQuery, fields,
1280: tuples, null);
1281: fields = null;
1282: tuples = null;
1283: }
1284: }
1285: break;
1286:
1287: case 's': // Portal Suspended (end of Execute)
1288: // nb: this appears *instead* of CommandStatus.
1289: // Must be a SELECT if we suspended, so don't worry about it.
1290:
1291: pgStream.ReceiveInteger4(); // len, discarded
1292: if (logger.logDebug())
1293: logger.debug(" <=BE PortalSuspended");
1294:
1295: {
1296: Object[] executeData = (Object[]) pendingExecuteQueue
1297: .get(executeIndex++);
1298: Query currentQuery = (Query) executeData[0];
1299: Portal currentPortal = (Portal) executeData[1];
1300: handler.handleResultRows(currentQuery, fields,
1301: tuples, currentPortal);
1302: }
1303:
1304: fields = null;
1305: tuples = null;
1306: break;
1307:
1308: case 'C': // Command Status (end of Execute)
1309: // Handle status.
1310: String status = receiveCommandStatus();
1311:
1312: doneAfterRowDescNoData = false;
1313:
1314: {
1315: Object[] executeData = (Object[]) pendingExecuteQueue
1316: .get(executeIndex++);
1317: Query currentQuery = (Query) executeData[0];
1318: Portal currentPortal = (Portal) executeData[1];
1319:
1320: if (fields != null || tuples != null) { // There was a resultset.
1321: handler.handleResultRows(currentQuery, fields,
1322: tuples, null);
1323: fields = null;
1324: tuples = null;
1325: } else {
1326: interpretCommandStatus(status, handler);
1327: }
1328:
1329: if (currentPortal != null)
1330: currentPortal.close();
1331: }
1332: break;
1333:
1334: case 'D': // Data Transfer (ongoing Execute response)
1335: Object tuple = null;
1336: try {
1337: tuple = pgStream.ReceiveTupleV3();
1338: } catch (OutOfMemoryError oome) {
1339: if (!noResults) {
1340: handler
1341: .handleError(new PSQLException(
1342: GT
1343: .tr("Ran out of memory retrieving query results."),
1344: PSQLState.OUT_OF_MEMORY, oome));
1345: }
1346: }
1347:
1348: if (!noResults) {
1349: if (tuples == null)
1350: tuples = new Vector();
1351: tuples.addElement(tuple);
1352: }
1353:
1354: if (logger.logDebug())
1355: logger.debug(" <=BE DataRow");
1356:
1357: break;
1358:
1359: case 'E': // Error Response (response to pretty much everything; backend then skips until Sync)
1360: SQLException error = receiveErrorResponse();
1361: handler.handleError(error);
1362:
1363: // keep processing
1364: break;
1365:
1366: case 'I': // Empty Query (end of Execute)
1367: pgStream.ReceiveInteger4();
1368:
1369: if (logger.logDebug())
1370: logger.debug(" <=BE EmptyQuery");
1371:
1372: {
1373: Object[] executeData = (Object[]) pendingExecuteQueue
1374: .get(executeIndex++);
1375: Query currentQuery = (Query) executeData[0];
1376: Portal currentPortal = (Portal) executeData[1];
1377: handler.handleCommandStatus("EMPTY", 0, 0);
1378: if (currentPortal != null)
1379: currentPortal.close();
1380: }
1381:
1382: break;
1383:
1384: case 'N': // Notice Response
1385: SQLWarning warning = receiveNoticeResponse();
1386: handler.handleWarning(warning);
1387: break;
1388:
1389: case 'S': // Parameter Status
1390: {
1391: int l_len = pgStream.ReceiveInteger4();
1392: String name = pgStream.ReceiveString();
1393: String value = pgStream.ReceiveString();
1394: if (logger.logDebug())
1395: logger.debug(" <=BE ParameterStatus(" + name
1396: + " = " + value + ")");
1397:
1398: if (name.equals("client_encoding")
1399: && !value.equalsIgnoreCase("UNICODE")
1400: && !allowEncodingChanges) {
1401: protoConnection.close(); // we're screwed now; we can't trust any subsequent string.
1402: handler
1403: .handleError(new PSQLException(
1404: GT
1405: .tr(
1406: "The server''s client_encoding parameter was changed to {0}. The JDBC driver requires client_encoding to be UNICODE for correct operation.",
1407: value),
1408: PSQLState.CONNECTION_FAILURE));
1409: endQuery = true;
1410: }
1411:
1412: if (name.equals("DateStyle")
1413: && !value.startsWith("ISO,")) {
1414: protoConnection.close(); // we're screwed now; we can't trust any subsequent date.
1415: handler
1416: .handleError(new PSQLException(
1417: GT
1418: .tr(
1419: "The server''s DateStyle parameter was changed to {0}. The JDBC driver requires DateStyle to begin with ISO for correct operation.",
1420: value),
1421: PSQLState.CONNECTION_FAILURE));
1422: endQuery = true;
1423: }
1424:
1425: if (name.equals("standard_conforming_strings")) {
1426: if (value.equals("on"))
1427: protoConnection
1428: .setStandardConformingStrings(true);
1429: else if (value.equals("off"))
1430: protoConnection
1431: .setStandardConformingStrings(false);
1432: else {
1433: protoConnection.close(); // we're screwed now; we don't know how to escape string literals
1434: handler
1435: .handleError(new PSQLException(
1436: GT
1437: .tr(
1438: "The server''s standard_conforming_strings parameter was reported as {0}. The JDBC driver expected on or off.",
1439: value),
1440: PSQLState.CONNECTION_FAILURE));
1441: endQuery = true;
1442: }
1443: }
1444: }
1445: break;
1446:
1447: case 'T': // Row Description (response to Describe)
1448: fields = receiveFields();
1449: tuples = new Vector();
1450: if (doneAfterRowDescNoData) {
1451: Object describeData[] = (Object[]) pendingDescribeStatementQueue
1452: .get(describeIndex++);
1453: Query currentQuery = (Query) describeData[0];
1454:
1455: if (fields != null || tuples != null) { // There was a resultset.
1456: handler.handleResultRows(currentQuery, fields,
1457: tuples, null);
1458: fields = null;
1459: tuples = null;
1460: }
1461: }
1462: break;
1463:
1464: case 'Z': // Ready For Query (eventual response to Sync)
1465: receiveRFQ();
1466: endQuery = true;
1467:
1468: // Reset the statement name of Parses that failed.
1469: while (parseIndex < pendingParseQueue.size()) {
1470: Object[] failedQueryAndStatement = (Object[]) pendingParseQueue
1471: .get(parseIndex++);
1472: SimpleQuery failedQuery = (SimpleQuery) failedQueryAndStatement[0];
1473: failedQuery.unprepare();
1474: }
1475:
1476: pendingParseQueue.clear(); // No more ParseComplete messages expected.
1477: pendingDescribeStatementQueue.clear(); // No more ParameterDescription messages expected.
1478: pendingBindQueue.clear(); // No more BindComplete messages expected.
1479: pendingExecuteQueue.clear(); // No more query executions expected.
1480: break;
1481:
1482: case 'G': // CopyInResponse
1483: case 'H': // CopyOutResponse
1484: case 'c': // CopyDone
1485: case 'd': // CopyData
1486: {
1487: // COPY FROM STDIN / COPY TO STDOUT, neither of which are currently
1488: // supported.
1489:
1490: // CopyInResponse can only occur in response to an Execute we sent.
1491: // Every Execute we send is followed by either a Bind or a ClosePortal,
1492: // so we don't need to send a CopyFail; the server will fail the copy
1493: // automatically when it sees the next message.
1494:
1495: int l_len = pgStream.ReceiveInteger4();
1496: /* discard */
1497: pgStream.Receive(l_len);
1498:
1499: handler
1500: .handleError(new PSQLException(
1501: GT
1502: .tr("The driver currently does not support COPY operations."),
1503: PSQLState.NOT_IMPLEMENTED));
1504: }
1505: break;
1506:
1507: default:
1508: throw new IOException("Unexpected packet type: " + c);
1509: }
1510:
1511: }
1512: }
1513:
1514: public synchronized void fetch(ResultCursor cursor,
1515: ResultHandler handler, int fetchSize) throws SQLException {
1516: final Portal portal = (Portal) cursor;
1517:
1518: // Insert a ResultHandler that turns bare command statuses into empty datasets
1519: // (if the fetch returns no rows, we see just a CommandStatus..)
1520: final ResultHandler delegateHandler = handler;
1521: handler = new ResultHandler() {
1522: public void handleResultRows(Query fromQuery,
1523: Field[] fields, Vector tuples, ResultCursor cursor) {
1524: delegateHandler.handleResultRows(fromQuery, fields,
1525: tuples, cursor);
1526: }
1527:
1528: public void handleCommandStatus(String status,
1529: int updateCount, long insertOID) {
1530: handleResultRows(portal.getQuery(), null, new Vector(),
1531: null);
1532: }
1533:
1534: public void handleWarning(SQLWarning warning) {
1535: delegateHandler.handleWarning(warning);
1536: }
1537:
1538: public void handleError(SQLException error) {
1539: delegateHandler.handleError(error);
1540: }
1541:
1542: public void handleCompletion() throws SQLException {
1543: delegateHandler.handleCompletion();
1544: }
1545: };
1546:
1547: // Now actually run it.
1548:
1549: try {
1550: processDeadParsedQueries();
1551: processDeadPortals();
1552:
1553: sendExecute(portal.getQuery(), portal, fetchSize);
1554: sendSync();
1555:
1556: processResults(handler, 0);
1557: } catch (IOException e) {
1558: protoConnection.close();
1559: handler
1560: .handleError(new PSQLException(
1561: GT
1562: .tr("An I/O error occured while sending to the backend."),
1563: PSQLState.CONNECTION_FAILURE, e));
1564: }
1565:
1566: handler.handleCompletion();
1567: }
1568:
1569: /*
1570: * Receive the field descriptions from the back end.
1571: */
1572: private Field[] receiveFields() throws IOException {
1573: int l_msgSize = pgStream.ReceiveInteger4();
1574: int size = pgStream.ReceiveInteger2();
1575: Field[] fields = new Field[size];
1576:
1577: if (logger.logDebug())
1578: logger.debug(" <=BE RowDescription(" + size + ")");
1579:
1580: for (int i = 0; i < fields.length; i++) {
1581: String columnLabel = pgStream.ReceiveString();
1582: int tableOid = pgStream.ReceiveInteger4();
1583: short positionInTable = (short) pgStream.ReceiveInteger2();
1584: int typeOid = pgStream.ReceiveInteger4();
1585: int typeLength = pgStream.ReceiveInteger2();
1586: int typeModifier = pgStream.ReceiveInteger4();
1587: int formatType = pgStream.ReceiveInteger2();
1588: fields[i] = new Field(columnLabel, null, /* name not yet determined */
1589: typeOid, typeLength, typeModifier, tableOid,
1590: positionInTable);
1591: fields[i].setFormat(formatType);
1592: }
1593:
1594: return fields;
1595: }
1596:
1597: private void receiveAsyncNotify() throws IOException {
1598: int msglen = pgStream.ReceiveInteger4();
1599: int pid = pgStream.ReceiveInteger4();
1600: String msg = pgStream.ReceiveString();
1601: String param = pgStream.ReceiveString();
1602: protoConnection
1603: .addNotification(new org.postgresql.core.Notification(
1604: msg, pid, param));
1605:
1606: if (logger.logDebug())
1607: logger.debug(" <=BE AsyncNotify(" + pid + "," + msg + ","
1608: + param + ")");
1609: }
1610:
1611: private SQLException receiveErrorResponse() throws IOException {
1612: // it's possible to get more than one error message for a query
1613: // see libpq comments wrt backend closing a connection
1614: // so, append messages to a string buffer and keep processing
1615: // check at the bottom to see if we need to throw an exception
1616:
1617: int elen = pgStream.ReceiveInteger4();
1618: String totalMessage = pgStream.ReceiveString(elen - 4);
1619: ServerErrorMessage errorMsg = new ServerErrorMessage(
1620: totalMessage, logger.getLogLevel());
1621:
1622: if (logger.logDebug())
1623: logger.debug(" <=BE ErrorMessage(" + errorMsg.toString()
1624: + ")");
1625:
1626: return new PSQLException(errorMsg);
1627: }
1628:
1629: private SQLWarning receiveNoticeResponse() throws IOException {
1630: int nlen = pgStream.ReceiveInteger4();
1631: ServerErrorMessage warnMsg = new ServerErrorMessage(pgStream
1632: .ReceiveString(nlen - 4), logger.getLogLevel());
1633:
1634: if (logger.logDebug())
1635: logger.debug(" <=BE NoticeResponse(" + warnMsg.toString()
1636: + ")");
1637:
1638: return new PSQLWarning(warnMsg);
1639: }
1640:
1641: private String receiveCommandStatus() throws IOException {
1642: //TODO: better handle the msg len
1643: int l_len = pgStream.ReceiveInteger4();
1644: //read l_len -5 bytes (-4 for l_len and -1 for trailing \0)
1645: String status = pgStream.ReceiveString(l_len - 5);
1646: //now read and discard the trailing \0
1647: pgStream.Receive(1);
1648:
1649: if (logger.logDebug())
1650: logger.debug(" <=BE CommandStatus(" + status + ")");
1651:
1652: return status;
1653: }
1654:
1655: private void interpretCommandStatus(String status,
1656: ResultHandler handler) {
1657: int update_count = 0;
1658: long insert_oid = 0;
1659:
1660: if (status.startsWith("INSERT") || status.startsWith("UPDATE")
1661: || status.startsWith("DELETE")
1662: || status.startsWith("MOVE")) {
1663: try {
1664: update_count = Integer.parseInt(status
1665: .substring(1 + status.lastIndexOf(' ')));
1666: if (status.startsWith("INSERT"))
1667: insert_oid = Long.parseLong(status.substring(
1668: 1 + status.indexOf(' '), status
1669: .lastIndexOf(' ')));
1670: } catch (NumberFormatException nfe) {
1671: handler
1672: .handleError(new PSQLException(
1673: GT
1674: .tr(
1675: "Unable to interpret the update count in command completion tag: {0}.",
1676: status),
1677: PSQLState.CONNECTION_FAILURE));
1678: return;
1679: }
1680: }
1681:
1682: handler.handleCommandStatus(status, update_count, insert_oid);
1683: }
1684:
1685: private void receiveRFQ() throws IOException {
1686: if (pgStream.ReceiveInteger4() != 5)
1687: throw new IOException(
1688: "unexpected length of ReadyForQuery message");
1689:
1690: char tStatus = (char) pgStream.ReceiveChar();
1691: if (logger.logDebug())
1692: logger.debug(" <=BE ReadyForQuery(" + tStatus + ")");
1693:
1694: // Update connection state.
1695: switch (tStatus) {
1696: case 'I':
1697: protoConnection
1698: .setTransactionState(ProtocolConnection.TRANSACTION_IDLE);
1699: break;
1700: case 'T':
1701: protoConnection
1702: .setTransactionState(ProtocolConnection.TRANSACTION_OPEN);
1703: break;
1704: case 'E':
1705: protoConnection
1706: .setTransactionState(ProtocolConnection.TRANSACTION_FAILED);
1707: break;
1708: default:
1709: throw new IOException(
1710: "unexpected transaction state in ReadyForQuery message: "
1711: + (int) tStatus);
1712: }
1713: }
1714:
// Queues of requests that have been sent but not yet acknowledged by the
// backend. processResults consumes them in order and clears all four when
// ReadyForQuery arrives.
private final ArrayList pendingParseQueue = new ArrayList(); // list of {SimpleQuery, String statement name} object arrays
private final ArrayList pendingBindQueue = new ArrayList(); // list of Portal instances
private final ArrayList pendingExecuteQueue = new ArrayList(); // list of {SimpleQuery,Portal} object arrays
private final ArrayList pendingDescribeStatementQueue = new ArrayList(); // list of {SimpleQuery, SimpleParameterList, Boolean} object arrays

// Counter used by sendOneQuery to build unique portal names ("C_" + id).
// NOTE(review): it may also be used for statement names elsewhere in the file -- confirm.
private long nextUniqueID = 1;
// Collaborators and settings assigned once in the constructor.
private final ProtocolConnectionImpl protoConnection;
private final PGStream pgStream;
private final Logger logger;
// When false, processResults closes the connection if the server reports a
// client_encoding other than UNICODE.
private final boolean allowEncodingChanges;

// Pre-built query for the literal statement "BEGIN".
// NOTE(review): presumably sent to open a transaction; its use is outside this view.
private final SimpleQuery beginTransactionQuery = new SimpleQuery(
new String[] { "BEGIN" });

// Pre-built query with an empty statement string; its use is outside this view.
private final static SimpleQuery EMPTY_QUERY = new SimpleQuery(
new String[] { "" });
1731: }
|