Source Code Cross Referenced for QueryExecutorImpl.java in  » Database-JDBC-Connection-Pool » postgresql » org » postgresql » core » v3 » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Sevlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database JDBC Connection Pool » postgresql » org.postgresql.core.v3 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*-------------------------------------------------------------------------
0002:         *
0003:         * Copyright (c) 2003-2005, PostgreSQL Global Development Group
0004:         * Copyright (c) 2004, Open Cloud Limited.
0005:         *
0006:         * IDENTIFICATION
0007:         *   $PostgreSQL: pgjdbc/org/postgresql/core/v3/QueryExecutorImpl.java,v 1.35 2007/07/27 10:15:29 jurka Exp $
0008:         *
0009:         *-------------------------------------------------------------------------
0010:         */
0011:        package org.postgresql.core.v3;
0012:
0013:        import org.postgresql.core.*;
0014:
0015:        import java.util.ArrayList;
0016:        import java.util.Vector;
0017:        import java.util.HashMap;
0018:        import java.util.Properties;
0019:
0020:        import java.lang.ref.*;
0021:
0022:        import java.io.IOException;
0023:        import java.sql.*;
0024:        import org.postgresql.util.PSQLException;
0025:        import org.postgresql.util.PSQLWarning;
0026:        import org.postgresql.util.PSQLState;
0027:        import org.postgresql.util.ServerErrorMessage;
0028:        import org.postgresql.util.GT;
0029:
0030:        /**
0031:         * QueryExecutor implementation for the V3 protocol.
0032:         */
0033:        public class QueryExecutorImpl implements  QueryExecutor {
0034:            public QueryExecutorImpl(ProtocolConnectionImpl protoConnection,
0035:                    PGStream pgStream, Properties info, Logger logger) {
0036:                this .protoConnection = protoConnection;
0037:                this .pgStream = pgStream;
0038:                this .logger = logger;
0039:
0040:                if (info.getProperty("allowEncodingChanges") != null) {
0041:                    this .allowEncodingChanges = Boolean.valueOf(
0042:                            info.getProperty("allowEncodingChanges"))
0043:                            .booleanValue();
0044:                } else {
0045:                    this .allowEncodingChanges = false;
0046:                }
0047:            }
0048:
0049:            //
0050:            // Query parsing
0051:            //
0052:
0053:            public Query createSimpleQuery(String sql) {
0054:                return parseQuery(sql, false);
0055:            }
0056:
0057:            public Query createParameterizedQuery(String sql) {
0058:                return parseQuery(sql, true);
0059:            }
0060:
0061:            private Query parseQuery(String query, boolean withParameters) {
0062:                // Parse query and find parameter placeholders;
0063:                // also break the query into separate statements.
0064:
0065:                ArrayList statementList = new ArrayList();
0066:                ArrayList fragmentList = new ArrayList(15);
0067:
0068:                int fragmentStart = 0;
0069:                int inParen = 0;
0070:
0071:                boolean standardConformingStrings = protoConnection
0072:                        .getStandardConformingStrings();
0073:
0074:                char[] aChars = query.toCharArray();
0075:
0076:                for (int i = 0; i < aChars.length; ++i) {
0077:                    switch (aChars[i]) {
0078:                    case '\'': // single-quotes
0079:                        i = Parser.parseSingleQuotes(aChars, i,
0080:                                standardConformingStrings);
0081:                        break;
0082:
0083:                    case '"': // double-quotes
0084:                        i = Parser.parseDoubleQuotes(aChars, i);
0085:                        break;
0086:
0087:                    case '-': // possibly -- style comment
0088:                        i = Parser.parseLineComment(aChars, i);
0089:                        break;
0090:
0091:                    case '/': // possibly /* */ style comment
0092:                        i = Parser.parseBlockComment(aChars, i);
0093:                        break;
0094:
0095:                    case '$': // possibly dollar quote start
0096:                        i = Parser.parseDollarQuotes(aChars, i);
0097:                        break;
0098:
0099:                    case '(':
0100:                        inParen++;
0101:                        break;
0102:
0103:                    case ')':
0104:                        inParen--;
0105:                        break;
0106:
0107:                    case '?':
0108:                        if (withParameters) {
0109:                            fragmentList.add(query.substring(fragmentStart, i));
0110:                            fragmentStart = i + 1;
0111:                        }
0112:                        break;
0113:
0114:                    case ';':
0115:                        if (inParen == 0) {
0116:                            fragmentList.add(query.substring(fragmentStart, i));
0117:                            fragmentStart = i + 1;
0118:                            if (fragmentList.size() > 1
0119:                                    || ((String) fragmentList.get(0)).trim()
0120:                                            .length() > 0)
0121:                                statementList
0122:                                        .add(fragmentList
0123:                                                .toArray(new String[fragmentList
0124:                                                        .size()]));
0125:                            fragmentList.clear();
0126:                        }
0127:                        break;
0128:
0129:                    default:
0130:                        break;
0131:                    }
0132:                }
0133:
0134:                fragmentList.add(query.substring(fragmentStart));
0135:                if (fragmentList.size() > 1
0136:                        || ((String) fragmentList.get(0)).trim().length() > 0)
0137:                    statementList.add(fragmentList
0138:                            .toArray(new String[fragmentList.size()]));
0139:
0140:                if (statementList.isEmpty()) // Empty query.
0141:                    return EMPTY_QUERY;
0142:
0143:                if (statementList.size() == 1) {
0144:                    // Only one statement.
0145:                    return new SimpleQuery((String[]) statementList.get(0));
0146:                }
0147:
0148:                // Multiple statements.
0149:                SimpleQuery[] subqueries = new SimpleQuery[statementList.size()];
0150:                int[] offsets = new int[statementList.size()];
0151:                int offset = 0;
0152:                for (int i = 0; i < statementList.size(); ++i) {
0153:                    String[] fragments = (String[]) statementList.get(i);
0154:                    offsets[i] = offset;
0155:                    subqueries[i] = new SimpleQuery(fragments);
0156:                    offset += fragments.length - 1;
0157:                }
0158:
0159:                return new CompositeQuery(subqueries, offsets);
0160:            }
0161:
0162:            //
0163:            // Query execution
0164:            //
0165:
0166:            public synchronized void execute(Query query,
0167:                    ParameterList parameters, ResultHandler handler,
0168:                    int maxRows, int fetchSize, int flags) throws SQLException {
0169:                if (logger.logDebug()) {
0170:                    logger.debug("simple execute, handler=" + handler
0171:                            + ", maxRows=" + maxRows + ", fetchSize="
0172:                            + fetchSize + ", flags=" + flags);
0173:                }
0174:
0175:                if (parameters == null)
0176:                    parameters = SimpleQuery.NO_PARAMETERS;
0177:
0178:                boolean describeOnly = (QUERY_DESCRIBE_ONLY & flags) != 0;
0179:
0180:                // Check parameters are all set..
0181:                if (!describeOnly)
0182:                    ((V3ParameterList) parameters).checkAllParametersSet();
0183:
0184:                try {
0185:                    try {
0186:                        handler = sendQueryPreamble(handler, flags);
0187:                        sendQuery((V3Query) query,
0188:                                (V3ParameterList) parameters, maxRows,
0189:                                fetchSize, flags);
0190:                        sendSync();
0191:                        processResults(handler, flags);
0192:                    } catch (PGBindException se) {
0193:                        // There are three causes of this error, an
0194:                        // invalid total Bind message length, a
0195:                        // BinaryStream that cannot provide the amount
0196:                        // of data claimed by the length arugment, and
0197:                        // a BinaryStream that throws an Exception
0198:                        // when reading.
0199:                        //
0200:                        // We simply do not send the Execute message
0201:                        // so we can just continue on as if nothing
0202:                        // has happened.  Perhaps we need to
0203:                        // introduce an error here to force the
0204:                        // caller to rollback if there is a
0205:                        // transaction in progress?
0206:                        //
0207:                        sendSync();
0208:                        processResults(handler, flags);
0209:                        handler
0210:                                .handleError(new PSQLException(
0211:                                        GT
0212:                                                .tr("Unable to bind parameter values for statement."),
0213:                                        PSQLState.INVALID_PARAMETER_VALUE, se
0214:                                                .getIOException()));
0215:                    }
0216:                } catch (IOException e) {
0217:                    protoConnection.close();
0218:                    handler
0219:                            .handleError(new PSQLException(
0220:                                    GT
0221:                                            .tr("An I/O error occured while sending to the backend."),
0222:                                    PSQLState.CONNECTION_FAILURE, e));
0223:                }
0224:
0225:                handler.handleCompletion();
0226:            }
0227:
0228:            // Deadlock avoidance:
0229:            //
0230:            // It's possible for the send and receive streams to get "deadlocked" against each other since
0231:            // we do not have a separate thread. The scenario is this: we have two streams:
0232:            //
0233:            //   driver -> TCP buffering -> server
0234:            //   server -> TCP buffering -> driver
0235:            //
0236:            // The server behaviour is roughly:
0237:            //  while true:
0238:            //   read message
0239:            //   execute message
0240:            //   write results
0241:            //
0242:            // If the server -> driver stream has a full buffer, the write will block.
0243:            // If the driver is still writing when this happens, and the driver -> server
0244:            // stream also fills up, we deadlock: the driver is blocked on write() waiting
0245:            // for the server to read some more data, and the server is blocked on write()
0246:            // waiting for the driver to read some more data.
0247:            //
0248:            // To avoid this, we guess at how many queries we can send before the server ->
0249:            // driver stream's buffer is full (MAX_BUFFERED_QUERIES). This is the point where
0250:            // the server blocks on write and stops reading data. If we reach this point, we
0251:            // force a Sync message and read pending data from the server until ReadyForQuery,
0252:            // then go back to writing more queries unless we saw an error.
0253:            //
0254:            // This is not 100% reliable -- it's only done in the batch-query case and only
0255:            // at a reasonably high level (per query, not per message), and it's only an estimate
0256:            // -- so it might break. To do it correctly in all cases would seem to require a
0257:            // separate send or receive thread as we can only do the Sync-and-read-results
0258:            // operation at particular points, and also as we don't really know how much data
0259:            // the server is sending.
0260:
0261:            // Assume 64k server->client buffering and 250 bytes response per query (conservative).
0262:            private static final int MAX_BUFFERED_QUERIES = (64000 / 250);
0263:
0264:            // Helper handler that tracks error status.
0265:            private static class ErrorTrackingResultHandler implements 
0266:                    ResultHandler {
0267:                private final ResultHandler delegateHandler;
0268:                private boolean sawError = false;
0269:
0270:                ErrorTrackingResultHandler(ResultHandler delegateHandler) {
0271:                    this .delegateHandler = delegateHandler;
0272:                }
0273:
0274:                public void handleResultRows(Query fromQuery, Field[] fields,
0275:                        Vector tuples, ResultCursor cursor) {
0276:                    delegateHandler.handleResultRows(fromQuery, fields, tuples,
0277:                            cursor);
0278:                }
0279:
0280:                public void handleCommandStatus(String status, int updateCount,
0281:                        long insertOID) {
0282:                    delegateHandler.handleCommandStatus(status, updateCount,
0283:                            insertOID);
0284:                }
0285:
0286:                public void handleWarning(SQLWarning warning) {
0287:                    delegateHandler.handleWarning(warning);
0288:                }
0289:
0290:                public void handleError(SQLException error) {
0291:                    sawError = true;
0292:                    delegateHandler.handleError(error);
0293:                }
0294:
0295:                public void handleCompletion() throws SQLException {
0296:                    delegateHandler.handleCompletion();
0297:                }
0298:
0299:                boolean hasErrors() {
0300:                    return sawError;
0301:                }
0302:            }
0303:
0304:            public synchronized void execute(Query[] queries,
0305:                    ParameterList[] parameterLists, ResultHandler handler,
0306:                    int maxRows, int fetchSize, int flags) throws SQLException {
0307:                if (logger.logDebug()) {
0308:                    logger.debug("batch execute " + queries.length
0309:                            + " queries, handler=" + handler + ", maxRows="
0310:                            + maxRows + ", fetchSize=" + fetchSize + ", flags="
0311:                            + flags);
0312:                }
0313:
0314:                boolean describeOnly = (QUERY_DESCRIBE_ONLY & flags) != 0;
0315:                // Check parameters and resolve OIDs.
0316:                if (!describeOnly) {
0317:                    for (int i = 0; i < parameterLists.length; ++i) {
0318:                        if (parameterLists[i] != null)
0319:                            ((V3ParameterList) parameterLists[i])
0320:                                    .checkAllParametersSet();
0321:                    }
0322:                }
0323:
0324:                try {
0325:                    int queryCount = 0;
0326:
0327:                    handler = sendQueryPreamble(handler, flags);
0328:                    ErrorTrackingResultHandler trackingHandler = new ErrorTrackingResultHandler(
0329:                            handler);
0330:
0331:                    for (int i = 0; i < queries.length; ++i) {
0332:                        ++queryCount;
0333:                        if (queryCount >= MAX_BUFFERED_QUERIES) {
0334:                            sendSync();
0335:                            processResults(trackingHandler, flags);
0336:
0337:                            // If we saw errors, don't send anything more.
0338:                            if (trackingHandler.hasErrors())
0339:                                break;
0340:
0341:                            queryCount = 0;
0342:                        }
0343:
0344:                        V3Query query = (V3Query) queries[i];
0345:                        V3ParameterList parameters = (V3ParameterList) parameterLists[i];
0346:                        if (parameters == null)
0347:                            parameters = SimpleQuery.NO_PARAMETERS;
0348:                        sendQuery(query, parameters, maxRows, fetchSize, flags);
0349:                    }
0350:
0351:                    if (!trackingHandler.hasErrors()) {
0352:                        sendSync();
0353:                        processResults(handler, flags);
0354:                    }
0355:                } catch (IOException e) {
0356:                    protoConnection.close();
0357:                    handler
0358:                            .handleError(new PSQLException(
0359:                                    GT
0360:                                            .tr("An I/O error occured while sending to the backend."),
0361:                                    PSQLState.CONNECTION_FAILURE, e));
0362:                }
0363:
0364:                handler.handleCompletion();
0365:            }
0366:
0367:            private ResultHandler sendQueryPreamble(
0368:                    final ResultHandler delegateHandler, int flags)
0369:                    throws IOException {
0370:                // First, send CloseStatements for finalized SimpleQueries that had statement names assigned.
0371:                processDeadParsedQueries();
0372:                processDeadPortals();
0373:
0374:                // Send BEGIN on first statement in transaction.
0375:                if ((flags & QueryExecutor.QUERY_SUPPRESS_BEGIN) != 0
0376:                        || protoConnection.getTransactionState() != ProtocolConnection.TRANSACTION_IDLE)
0377:                    return delegateHandler;
0378:
0379:                sendOneQuery(beginTransactionQuery, SimpleQuery.NO_PARAMETERS,
0380:                        0, 0, QueryExecutor.QUERY_NO_METADATA);
0381:
0382:                // Insert a handler that intercepts the BEGIN.
0383:                return new ResultHandler() {
0384:                    private boolean sawBegin = false;
0385:
0386:                    public void handleResultRows(Query fromQuery,
0387:                            Field[] fields, Vector tuples, ResultCursor cursor) {
0388:                        if (sawBegin)
0389:                            delegateHandler.handleResultRows(fromQuery, fields,
0390:                                    tuples, cursor);
0391:                    }
0392:
0393:                    public void handleCommandStatus(String status,
0394:                            int updateCount, long insertOID) {
0395:                        if (!sawBegin) {
0396:                            sawBegin = true;
0397:                            if (!status.equals("BEGIN"))
0398:                                handleError(new PSQLException(
0399:                                        GT
0400:                                                .tr(
0401:                                                        "Expected command status BEGIN, got {0}.",
0402:                                                        status),
0403:                                        PSQLState.PROTOCOL_VIOLATION));
0404:                        } else {
0405:                            delegateHandler.handleCommandStatus(status,
0406:                                    updateCount, insertOID);
0407:                        }
0408:                    }
0409:
0410:                    public void handleWarning(SQLWarning warning) {
0411:                        delegateHandler.handleWarning(warning);
0412:                    }
0413:
0414:                    public void handleError(SQLException error) {
0415:                        delegateHandler.handleError(error);
0416:                    }
0417:
0418:                    public void handleCompletion() throws SQLException {
0419:                        delegateHandler.handleCompletion();
0420:                    }
0421:                };
0422:            }
0423:
0424:            //
0425:            // Fastpath
0426:            //
0427:
0428:            public synchronized byte[] fastpathCall(int fnid,
0429:                    ParameterList parameters, boolean suppressBegin)
0430:                    throws SQLException {
0431:                if (protoConnection.getTransactionState() == ProtocolConnection.TRANSACTION_IDLE
0432:                        && !suppressBegin) {
0433:
0434:                    if (logger.logDebug())
0435:                        logger.debug("Issuing BEGIN before fastpath call.");
0436:
0437:                    ResultHandler handler = new ResultHandler() {
0438:                        private boolean sawBegin = false;
0439:                        private SQLException sqle = null;
0440:
0441:                        public void handleResultRows(Query fromQuery,
0442:                                Field[] fields, Vector tuples,
0443:                                ResultCursor cursor) {
0444:                        }
0445:
0446:                        public void handleCommandStatus(String status,
0447:                                int updateCount, long insertOID) {
0448:                            if (!sawBegin) {
0449:                                if (!status.equals("BEGIN"))
0450:                                    handleError(new PSQLException(
0451:                                            GT
0452:                                                    .tr(
0453:                                                            "Expected command status BEGIN, got {0}.",
0454:                                                            status),
0455:                                            PSQLState.PROTOCOL_VIOLATION));
0456:                                sawBegin = true;
0457:                            } else {
0458:                                handleError(new PSQLException(GT.tr(
0459:                                        "Unexpected command status: {0}.",
0460:                                        status), PSQLState.PROTOCOL_VIOLATION));
0461:                            }
0462:                        }
0463:
0464:                        public void handleWarning(SQLWarning warning) {
0465:                            // we don't want to ignore warnings and it would be tricky
0466:                            // to chain them back to the connection, so since we don't
0467:                            // expect to get them in the first place, we just consider
0468:                            // them errors.
0469:                            handleError(warning);
0470:                        }
0471:
0472:                        public void handleError(SQLException error) {
0473:                            if (sqle == null) {
0474:                                sqle = error;
0475:                            } else {
0476:                                sqle.setNextException(error);
0477:                            }
0478:                        }
0479:
0480:                        public void handleCompletion() throws SQLException {
0481:                            if (sqle != null)
0482:                                throw sqle;
0483:                        }
0484:                    };
0485:
0486:                    try {
0487:                        sendOneQuery(beginTransactionQuery,
0488:                                SimpleQuery.NO_PARAMETERS, 0, 0,
0489:                                QueryExecutor.QUERY_NO_METADATA);
0490:                        sendSync();
0491:                        processResults(handler, 0);
0492:                    } catch (IOException ioe) {
0493:                        throw new PSQLException(
0494:                                GT
0495:                                        .tr("An I/O error occured while sending to the backend."),
0496:                                PSQLState.CONNECTION_FAILURE, ioe);
0497:                    }
0498:                }
0499:
0500:                try {
0501:                    sendFastpathCall(fnid, (SimpleParameterList) parameters);
0502:                    return receiveFastpathResult();
0503:                } catch (IOException ioe) {
0504:                    protoConnection.close();
0505:                    throw new PSQLException(
0506:                            GT
0507:                                    .tr("An I/O error occured while sending to the backend."),
0508:                            PSQLState.CONNECTION_FAILURE, ioe);
0509:                }
0510:            }
0511:
0512:            public ParameterList createFastpathParameters(int count) {
0513:                return new SimpleParameterList(count);
0514:            }
0515:
0516:            private void sendFastpathCall(int fnid, SimpleParameterList params)
0517:                    throws SQLException, IOException {
0518:                if (logger.logDebug())
0519:                    logger.debug(" FE=> FunctionCall(" + fnid + ", "
0520:                            + params.getParameterCount() + " params)");
0521:
0522:                //
0523:                // Total size = 4 (length)
0524:                //            + 4 (function OID)
0525:                //            + 2 (format code count) + N * 2 (format codes)
0526:                //            + 2 (parameter count) + encodedSize (parameters)
0527:                //            + 2 (result format)
0528:
0529:                int paramCount = params.getParameterCount();
0530:                int encodedSize = 0;
0531:                for (int i = 1; i <= paramCount; ++i) {
0532:                    if (params.isNull(i))
0533:                        encodedSize += 4;
0534:                    else
0535:                        encodedSize += 4 + params.getV3Length(i);
0536:                }
0537:
0538:                pgStream.SendChar('F');
0539:                pgStream.SendInteger4(4 + 4 + 2 + 2 * paramCount + 2
0540:                        + encodedSize + 2);
0541:                pgStream.SendInteger4(fnid);
0542:                pgStream.SendInteger2(paramCount);
0543:                for (int i = 1; i <= paramCount; ++i)
0544:                    pgStream.SendInteger2(params.isBinary(i) ? 1 : 0);
0545:                pgStream.SendInteger2(paramCount);
0546:                for (int i = 1; i <= paramCount; i++) {
0547:                    if (params.isNull(i)) {
0548:                        pgStream.SendInteger4(-1);
0549:                    } else {
0550:                        pgStream.SendInteger4(params.getV3Length(i)); // Parameter size
0551:                        params.writeV3Value(i, pgStream);
0552:                    }
0553:                }
0554:                pgStream.SendInteger2(1); // Binary result format
0555:                pgStream.flush();
0556:            }
0557:
0558:            public synchronized void processNotifies() throws SQLException {
0559:                // Asynchronous notifies only arrive when we are not in a transaction
0560:                if (protoConnection.getTransactionState() != ProtocolConnection.TRANSACTION_IDLE)
0561:                    return;
0562:
0563:                try {
0564:                    while (pgStream.hasMessagePending()) {
0565:                        int c = pgStream.ReceiveChar();
0566:                        switch (c) {
0567:                        case 'A': // Asynchronous Notify
0568:                            receiveAsyncNotify();
0569:                            break;
0570:                        case 'E': // Error Response (response to pretty much everything; backend then skips until Sync)
0571:                            throw receiveErrorResponse();
0572:                            // break;
0573:                        case 'N': // Notice Response (warnings / info)
0574:                            SQLWarning warning = receiveNoticeResponse();
0575:                            protoConnection.addWarning(warning);
0576:                            break;
0577:                        default:
0578:                            throw new PSQLException(GT.tr(
0579:                                    "Unknown Response Type {0}.",
0580:                                    new Character((char) c)),
0581:                                    PSQLState.CONNECTION_FAILURE);
0582:                        }
0583:                    }
0584:                } catch (IOException ioe) {
0585:                    throw new PSQLException(
0586:                            GT
0587:                                    .tr("An I/O error occured while sending to the backend."),
0588:                            PSQLState.CONNECTION_FAILURE, ioe);
0589:                }
0590:            }
0591:
0592:            private byte[] receiveFastpathResult() throws IOException,
0593:                    SQLException {
0594:                boolean endQuery = false;
0595:                SQLException error = null;
0596:                byte[] returnValue = null;
0597:
0598:                while (!endQuery) {
0599:                    int c = pgStream.ReceiveChar();
0600:                    switch (c) {
0601:                    case 'A': // Asynchronous Notify
0602:                        receiveAsyncNotify();
0603:                        break;
0604:
0605:                    case 'E': // Error Response (response to pretty much everything; backend then skips until Sync)
0606:                        SQLException newError = receiveErrorResponse();
0607:                        if (error == null)
0608:                            error = newError;
0609:                        else
0610:                            error.setNextException(newError);
0611:                        // keep processing
0612:                        break;
0613:
0614:                    case 'N': // Notice Response (warnings / info)
0615:                        SQLWarning warning = receiveNoticeResponse();
0616:                        protoConnection.addWarning(warning);
0617:                        break;
0618:
0619:                    case 'Z': // Ready For Query (eventual response to Sync)
0620:                        receiveRFQ();
0621:                        endQuery = true;
0622:                        break;
0623:
0624:                    case 'V': // FunctionCallResponse
0625:                        int msgLen = pgStream.ReceiveInteger4();
0626:                        int valueLen = pgStream.ReceiveInteger4();
0627:
0628:                        if (logger.logDebug())
0629:                            logger.debug(" <=BE FunctionCallResponse("
0630:                                    + valueLen + " bytes)");
0631:
0632:                        if (valueLen != -1) {
0633:                            byte buf[] = new byte[valueLen];
0634:                            pgStream.Receive(buf, 0, valueLen);
0635:                            returnValue = buf;
0636:                        }
0637:
0638:                        break;
0639:
0640:                    default:
0641:                        throw new PSQLException(GT.tr(
0642:                                "Unknown Response Type {0}.", new Character(
0643:                                        (char) c)),
0644:                                PSQLState.CONNECTION_FAILURE);
0645:                    }
0646:
0647:                }
0648:
0649:                // did we get an error during this query?
0650:                if (error != null)
0651:                    throw error;
0652:
0653:                return returnValue;
0654:            }
0655:
0656:            /*
0657:             * Send a query to the backend.
0658:             */
0659:            private void sendQuery(V3Query query, V3ParameterList parameters,
0660:                    int maxRows, int fetchSize, int flags) throws IOException,
0661:                    SQLException {
0662:                // Now the query itself.
0663:                SimpleQuery[] subqueries = query.getSubqueries();
0664:                SimpleParameterList[] subparams = parameters.getSubparams();
0665:
0666:                if (subqueries == null) {
0667:                    sendOneQuery((SimpleQuery) query,
0668:                            (SimpleParameterList) parameters, maxRows,
0669:                            fetchSize, flags);
0670:                } else {
0671:                    for (int i = 0; i < subqueries.length; ++i) {
0672:                        // In the situation where parameters is already
0673:                        // NO_PARAMETERS it cannot know the correct
0674:                        // number of array elements to return in the
0675:                        // above call to getSubparams(), so it must
0676:                        // return null which we check for here.
0677:                        //
0678:                        SimpleParameterList subparam = SimpleQuery.NO_PARAMETERS;
0679:                        if (subparams != null) {
0680:                            subparam = subparams[i];
0681:                        }
0682:                        sendOneQuery(subqueries[i], subparam, maxRows,
0683:                                fetchSize, flags);
0684:                    }
0685:                }
0686:            }
0687:
0688:            //
0689:            // Message sending
0690:            //
0691:
0692:            private void sendSync() throws IOException {
0693:                if (logger.logDebug())
0694:                    logger.debug(" FE=> Sync");
0695:
0696:                pgStream.SendChar('S'); // Sync
0697:                pgStream.SendInteger4(4); // Length
0698:                pgStream.flush();
0699:            }
0700:
0701:            private void sendParse(SimpleQuery query,
0702:                    SimpleParameterList params, boolean oneShot)
0703:                    throws IOException {
0704:                // Already parsed, or we have a Parse pending and the types are right?
0705:                int[] typeOIDs = params.getTypeOIDs();
0706:                if (query.isPreparedFor(typeOIDs))
0707:                    return;
0708:
0709:                // Clean up any existing statement, as we can't use it.
0710:                query.unprepare();
0711:                processDeadParsedQueries();
0712:
0713:                String statementName = null;
0714:                if (!oneShot) {
0715:                    // Generate a statement name to use.
0716:                    statementName = "S_" + (nextUniqueID++);
0717:
0718:                    // And prepare the new statement.
0719:                    // NB: Must clone the OID array, as it's a direct reference to
0720:                    // the SimpleParameterList's internal array that might be modified
0721:                    // under us.
0722:                    query.setStatementName(statementName);
0723:                    query.setStatementTypes((int[]) typeOIDs.clone());
0724:                }
0725:
0726:                byte[] encodedStatementName = query.getEncodedStatementName();
0727:                String[] fragments = query.getFragments();
0728:
0729:                if (logger.logDebug()) {
0730:                    StringBuffer sbuf = new StringBuffer(" FE=> Parse(stmt="
0731:                            + statementName + ",query=\"");
0732:                    for (int i = 0; i < fragments.length; ++i) {
0733:                        if (i > 0)
0734:                            sbuf.append("$" + i);
0735:                        sbuf.append(fragments[i]);
0736:                    }
0737:                    sbuf.append("\",oids={");
0738:                    for (int i = 1; i <= params.getParameterCount(); ++i) {
0739:                        if (i != 1)
0740:                            sbuf.append(",");
0741:                        sbuf.append("" + params.getTypeOID(i));
0742:                    }
0743:                    sbuf.append("})");
0744:                    logger.debug(sbuf.toString());
0745:                }
0746:
0747:                //
0748:                // Send Parse.
0749:                //
0750:
0751:                byte[][] parts = new byte[fragments.length * 2 - 1][];
0752:                int j = 0;
0753:                int encodedSize = 0;
0754:
0755:                // Total size = 4 (size field)
0756:                //            + N + 1 (statement name, zero-terminated)
0757:                //            + N + 1 (query, zero terminated)
0758:                //            + 2 (parameter count) + N * 4 (parameter types)
0759:                // original query: "frag0 ? frag1 ? frag2"
0760:                // fragments: { "frag0", "frag1", "frag2" }
0761:                // output: "frag0 $1 frag1 $2 frag2"
0762:                for (int i = 0; i < fragments.length; ++i) {
0763:                    if (i != 0) {
0764:                        parts[j] = Utils.encodeUTF8("$" + i);
0765:                        encodedSize += parts[j].length;
0766:                        ++j;
0767:                    }
0768:
0769:                    parts[j] = Utils.encodeUTF8(fragments[i]);
0770:                    encodedSize += parts[j].length;
0771:                    ++j;
0772:                }
0773:
0774:                encodedSize = 4
0775:                        + (encodedStatementName == null ? 0
0776:                                : encodedStatementName.length) + 1
0777:                        + encodedSize + 1 + 2 + 4 * params.getParameterCount();
0778:
0779:                pgStream.SendChar('P'); // Parse
0780:                pgStream.SendInteger4(encodedSize);
0781:                if (encodedStatementName != null)
0782:                    pgStream.Send(encodedStatementName);
0783:                pgStream.SendChar(0); // End of statement name
0784:                for (int i = 0; i < parts.length; ++i) { // Query string
0785:                    pgStream.Send(parts[i]);
0786:                }
0787:                pgStream.SendChar(0); // End of query string.
0788:                pgStream.SendInteger2(params.getParameterCount()); // # of parameter types specified
0789:                for (int i = 1; i <= params.getParameterCount(); ++i)
0790:                    pgStream.SendInteger4(params.getTypeOID(i));
0791:
0792:                pendingParseQueue.add(new Object[] { query,
0793:                        query.getStatementName() });
0794:            }
0795:
0796:            private void sendBind(SimpleQuery query,
0797:                    SimpleParameterList params, Portal portal)
0798:                    throws IOException {
0799:                //
0800:                // Send Bind.
0801:                //
0802:
0803:                String statementName = query.getStatementName();
0804:                byte[] encodedStatementName = query.getEncodedStatementName();
0805:                byte[] encodedPortalName = (portal == null ? null : portal
0806:                        .getEncodedPortalName());
0807:
0808:                if (logger.logDebug()) {
0809:                    StringBuffer sbuf = new StringBuffer(" FE=> Bind(stmt="
0810:                            + statementName + ",portal=" + portal);
0811:                    for (int i = 1; i <= params.getParameterCount(); ++i) {
0812:                        sbuf.append(",$" + i + "=<" + params.toString(i) + ">");
0813:                    }
0814:                    sbuf.append(")");
0815:                    logger.debug(sbuf.toString());
0816:                }
0817:
0818:                // Total size = 4 (size field) + N + 1 (destination portal)
0819:                //            + N + 1 (statement name)
0820:                //            + 2 (param format code count) + N * 2 (format codes)
0821:                //            + 2 (param value count) + N (encoded param value size)
0822:                //            + 2 (result format code count, 0)
0823:                long encodedSize = 0;
0824:                for (int i = 1; i <= params.getParameterCount(); ++i) {
0825:                    if (params.isNull(i))
0826:                        encodedSize += 4;
0827:                    else
0828:                        encodedSize += (long) 4 + params.getV3Length(i);
0829:                }
0830:
0831:                encodedSize = 4
0832:                        + (encodedPortalName == null ? 0
0833:                                : encodedPortalName.length)
0834:                        + 1
0835:                        + (encodedStatementName == null ? 0
0836:                                : encodedStatementName.length) + 1 + 2
0837:                        + params.getParameterCount() * 2 + 2 + encodedSize + 2;
0838:
0839:                // backend's MaxAllocSize is the largest message that can
0840:                // be received from a client.  If we have a bigger value
0841:                // from either very large parameters or incorrent length
0842:                // descriptions of setXXXStream we do not send the bind
0843:                // messsage.
0844:                //
0845:                if (encodedSize > 0x3fffffff) {
0846:                    throw new PGBindException(
0847:                            new IOException(
0848:                                    GT
0849:                                            .tr(
0850:                                                    "Bind message length {0} too long.  This can be caused by very large or incorrect length specifications on InputStream parameters.",
0851:                                                    new Long(encodedSize))));
0852:                }
0853:
0854:                pgStream.SendChar('B'); // Bind
0855:                pgStream.SendInteger4((int) encodedSize); // Message size
0856:                if (encodedPortalName != null)
0857:                    pgStream.Send(encodedPortalName); // Destination portal name.
0858:                pgStream.SendChar(0); // End of portal name.
0859:                if (encodedStatementName != null)
0860:                    pgStream.Send(encodedStatementName); // Source statement name.
0861:                pgStream.SendChar(0); // End of statement name.
0862:
0863:                pgStream.SendInteger2(params.getParameterCount()); // # of parameter format codes
0864:                for (int i = 1; i <= params.getParameterCount(); ++i)
0865:                    pgStream.SendInteger2(params.isBinary(i) ? 1 : 0); // Parameter format code
0866:
0867:                pgStream.SendInteger2(params.getParameterCount()); // # of parameter values
0868:
0869:                // If an error occurs when reading a stream we have to
0870:                // continue pumping out data to match the length we
0871:                // said we would.  Once we've done that we throw
0872:                // this exception.  Multiple exceptions can occur and
0873:                // it really doesn't matter which one is reported back
0874:                // to the caller.
0875:                //
0876:                PGBindException bindException = null;
0877:
0878:                for (int i = 1; i <= params.getParameterCount(); ++i) {
0879:                    if (params.isNull(i))
0880:                        pgStream.SendInteger4(-1); // Magic size of -1 means NULL
0881:                    else {
0882:                        pgStream.SendInteger4(params.getV3Length(i)); // Parameter size
0883:                        try {
0884:                            params.writeV3Value(i, pgStream); // Parameter value
0885:                        } catch (PGBindException be) {
0886:                            bindException = be;
0887:                        }
0888:                    }
0889:                }
0890:
0891:                pgStream.SendInteger2(0); // # of result format codes (0)
0892:
0893:                pendingBindQueue.add(portal);
0894:
0895:                if (bindException != null) {
0896:                    throw bindException;
0897:                }
0898:            }
0899:
0900:            private void sendDescribePortal(Portal portal) throws IOException {
0901:                //
0902:                // Send Describe.
0903:                //
0904:
0905:                if (logger.logDebug()) {
0906:                    logger.debug(" FE=> Describe(portal=" + portal + ")");
0907:                }
0908:
0909:                byte[] encodedPortalName = (portal == null ? null : portal
0910:                        .getEncodedPortalName());
0911:
0912:                // Total size = 4 (size field) + 1 (describe type, 'P') + N + 1 (portal name)
0913:                int encodedSize = 4 + 1 + (encodedPortalName == null ? 0
0914:                        : encodedPortalName.length) + 1;
0915:
0916:                pgStream.SendChar('D'); // Describe
0917:                pgStream.SendInteger4(encodedSize); // message size
0918:                pgStream.SendChar('P'); // Describe (Portal)
0919:                if (encodedPortalName != null)
0920:                    pgStream.Send(encodedPortalName); // portal name to close
0921:                pgStream.SendChar(0); // end of portal name
0922:            }
0923:
0924:            private void sendDescribeStatement(SimpleQuery query,
0925:                    SimpleParameterList params, boolean describeOnly)
0926:                    throws IOException {
0927:                // Send Statement Describe
0928:
0929:                if (logger.logDebug()) {
0930:                    logger.debug(" FE=> Describe(statement="
0931:                            + query.getStatementName() + ")");
0932:                }
0933:
0934:                byte[] encodedStatementName = query.getEncodedStatementName();
0935:
0936:                // Total size = 4 (size field) + 1 (describe type, 'S') + N + 1 (portal name)
0937:                int encodedSize = 4 + 1 + (encodedStatementName == null ? 0
0938:                        : encodedStatementName.length) + 1;
0939:
0940:                pgStream.SendChar('D'); // Describe
0941:                pgStream.SendInteger4(encodedSize); // Message size
0942:                pgStream.SendChar('S'); // Describe (Statement);
0943:                if (encodedStatementName != null)
0944:                    pgStream.Send(encodedStatementName); // Statement name
0945:                pgStream.SendChar(0); // end message
0946:
0947:                pendingDescribeStatementQueue.add(new Object[] { query, params,
0948:                        new Boolean(describeOnly) });
0949:            }
0950:
0951:            private void sendExecute(Query query, Portal portal, int limit)
0952:                    throws IOException {
0953:                //
0954:                // Send Execute.
0955:                //
0956:
0957:                if (logger.logDebug()) {
0958:                    logger.debug(" FE=> Execute(portal=" + portal + ",limit="
0959:                            + limit + ")");
0960:                }
0961:
0962:                byte[] encodedPortalName = (portal == null ? null : portal
0963:                        .getEncodedPortalName());
0964:                int encodedSize = (encodedPortalName == null ? 0
0965:                        : encodedPortalName.length);
0966:
0967:                // Total size = 4 (size field) + 1 + N (source portal) + 4 (max rows)
0968:                pgStream.SendChar('E'); // Execute
0969:                pgStream.SendInteger4(4 + 1 + encodedSize + 4); // message size
0970:                if (encodedPortalName != null)
0971:                    pgStream.Send(encodedPortalName); // portal name
0972:                pgStream.SendChar(0); // portal name terminator
0973:                pgStream.SendInteger4(limit); // row limit
0974:
0975:                pendingExecuteQueue.add(new Object[] { query, portal });
0976:            }
0977:
0978:            private void sendClosePortal(String portalName) throws IOException {
0979:                //
0980:                // Send Close.
0981:                //
0982:
0983:                if (logger.logDebug()) {
0984:                    logger.debug(" FE=> ClosePortal(" + portalName + ")");
0985:                }
0986:
0987:                byte[] encodedPortalName = (portalName == null ? null : Utils
0988:                        .encodeUTF8(portalName));
0989:                int encodedSize = (encodedPortalName == null ? 0
0990:                        : encodedPortalName.length);
0991:
0992:                // Total size = 4 (size field) + 1 (close type, 'P') + 1 + N (portal name)
0993:                pgStream.SendChar('C'); // Close
0994:                pgStream.SendInteger4(4 + 1 + 1 + encodedSize); // message size
0995:                pgStream.SendChar('P'); // Close (Portal)
0996:                if (encodedPortalName != null)
0997:                    pgStream.Send(encodedPortalName);
0998:                pgStream.SendChar(0); // unnamed portal
0999:            }
1000:
1001:            private void sendCloseStatement(String statementName)
1002:                    throws IOException {
1003:                //
1004:                // Send Close.
1005:                //
1006:
1007:                if (logger.logDebug()) {
1008:                    logger.debug(" FE=> CloseStatement(" + statementName + ")");
1009:                }
1010:
1011:                byte[] encodedStatementName = Utils.encodeUTF8(statementName);
1012:
1013:                // Total size = 4 (size field) + 1 (close type, 'S') + N + 1 (statement name)
1014:                pgStream.SendChar('C'); // Close
1015:                pgStream.SendInteger4(4 + 1 + encodedStatementName.length + 1); // message size
1016:                pgStream.SendChar('S'); // Close (Statement)
1017:                pgStream.Send(encodedStatementName); // statement to close
1018:                pgStream.SendChar(0); // statement name terminator
1019:            }
1020:
1021:            // sendOneQuery sends a single statement via the extended query protocol.
1022:            // Per the FE/BE docs this is essentially the same as how a simple query runs
1023:            // (except that it generates some extra acknowledgement messages, and we
1024:            // can send several queries before doing the Sync)
1025:            //
1026:            //   Parse     S_n from "query string with parameter placeholders"; skipped if already done previously or if oneshot
1027:            //   Bind      C_n from S_n plus parameters (or from unnamed statement for oneshot queries)
1028:            //   Describe  C_n; skipped if caller doesn't want metadata
1029:            //   Execute   C_n with maxRows limit; maxRows = 1 if caller doesn't want results
1030:            // (above repeats once per call to sendOneQuery)
1031:            //   Sync      (sent by caller)
1032:            //
1033:            private void sendOneQuery(SimpleQuery query,
1034:                    SimpleParameterList params, int maxRows, int fetchSize,
1035:                    int flags) throws IOException {
1036:                // nb: if we decide to use a portal (usePortal == true) we must also use a named statement
1037:                // (oneShot == false) as otherwise the portal will be closed under us unexpectedly when
1038:                // the unnamed statement is next reused.
1039:
1040:                boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
1041:                boolean noMeta = (flags & QueryExecutor.QUERY_NO_METADATA) != 0;
1042:                boolean describeOnly = (flags & QueryExecutor.QUERY_DESCRIBE_ONLY) != 0;
1043:                boolean usePortal = (flags & QueryExecutor.QUERY_FORWARD_CURSOR) != 0
1044:                        && !noResults
1045:                        && !noMeta
1046:                        && fetchSize > 0
1047:                        && !describeOnly;
1048:                boolean oneShot = (flags & QueryExecutor.QUERY_ONESHOT) != 0
1049:                        && !usePortal;
1050:                boolean describeStatement = describeOnly
1051:                        || (params.hasUnresolvedTypes() && !oneShot);
1052:
1053:                // Work out how many rows to fetch in this pass.
1054:
1055:                int rows;
1056:                if (noResults) {
1057:                    rows = 1; // We're discarding any results anyway, so limit data transfer to a minimum
1058:                } else if (!usePortal) {
1059:                    rows = maxRows; // Not using a portal -- fetchSize is irrelevant
1060:                } else if (maxRows != 0 && fetchSize > maxRows) {
1061:                    rows = maxRows; // fetchSize > maxRows, use maxRows (nb: fetchSize cannot be 0 if usePortal == true)
1062:                } else {
1063:                    rows = fetchSize; // maxRows > fetchSize
1064:                }
1065:
1066:                sendParse(query, params, oneShot);
1067:
1068:                if (describeStatement) {
1069:                    sendDescribeStatement(query, params, describeOnly);
1070:                    if (describeOnly)
1071:                        return;
1072:                }
1073:
1074:                // Construct a new portal if needed.
1075:                Portal portal = null;
1076:                if (usePortal) {
1077:                    String portalName = "C_" + (nextUniqueID++);
1078:                    portal = new Portal(query, portalName);
1079:                }
1080:
1081:                sendBind(query, params, portal);
1082:
1083:                // A statement describe will also output a RowDescription,
1084:                // so don't reissue it here if we've already done so.
1085:                //
1086:                if (!noMeta && !describeStatement)
1087:                    sendDescribePortal(portal);
1088:
1089:                sendExecute(query, portal, rows);
1090:            }
1091:
1092:            //
1093:            // Garbage collection of parsed statements.
1094:            //
1095:            // When a statement is successfully parsed, registerParsedQuery is called.
1096:            // This creates a PhantomReference referring to the "owner" of the statement
1097:            // (the originating Query object) and inserts that reference as a key in
1098:            // parsedQueryMap. The values of parsedQueryMap are the corresponding allocated
1099:            // statement names. The originating Query object also holds a reference to the
1100:            // PhantomReference.
1101:            //
1102:            // When the owning Query object is closed, it enqueues and clears the associated
1103:            // PhantomReference.
1104:            //
1105:            // If the owning Query object becomes unreachable (see java.lang.ref javadoc) before
1106:            // being closed, the corresponding PhantomReference is enqueued on
1107:            // parsedQueryCleanupQueue. In the Sun JVM, phantom references are only enqueued
1108:            // when a GC occurs, so this is not necessarily prompt but should eventually happen.
1109:            //
1110:            // Periodically (currently, just before query execution), the parsedQueryCleanupQueue
1111:            // is polled. For each enqueued PhantomReference we find, we remove the corresponding
1112:            // entry from parsedQueryMap, obtaining the name of the underlying statement in the
1113:            // process. Then we send a message to the backend to deallocate that statement.
1114:            //
1115:
1116:            private final HashMap parsedQueryMap = new HashMap();
1117:            private final ReferenceQueue parsedQueryCleanupQueue = new ReferenceQueue();
1118:
1119:            private void registerParsedQuery(SimpleQuery query,
1120:                    String statementName) {
1121:                if (statementName == null)
1122:                    return;
1123:
1124:                PhantomReference cleanupRef = new PhantomReference(query,
1125:                        parsedQueryCleanupQueue);
1126:                parsedQueryMap.put(cleanupRef, statementName);
1127:                query.setCleanupRef(cleanupRef);
1128:            }
1129:
1130:            private void processDeadParsedQueries() throws IOException {
1131:                PhantomReference deadQuery;
1132:                while ((deadQuery = (PhantomReference) parsedQueryCleanupQueue
1133:                        .poll()) != null) {
1134:                    String statementName = (String) parsedQueryMap
1135:                            .remove(deadQuery);
1136:                    sendCloseStatement(statementName);
1137:                    deadQuery.clear();
1138:                }
1139:            }
1140:
1141:            //
1142:            // Essentially the same strategy is used for the cleanup of portals.
1143:            // Note that each Portal holds a reference to the corresponding Query
1144:            // that generated it, so the Query won't be collected (and the statement
1145:            // closed) until all the Portals are, too. This is required by the mechanics
1146:            // of the backend protocol: when a statement is closed, all dependent portals
1147:            // are also closed.
1148:            //
1149:
1150:            private final HashMap openPortalMap = new HashMap();
1151:            private final ReferenceQueue openPortalCleanupQueue = new ReferenceQueue();
1152:
1153:            private void registerOpenPortal(Portal portal) {
1154:                if (portal == null)
1155:                    return; // Using the unnamed portal.
1156:
1157:                String portalName = portal.getPortalName();
1158:                PhantomReference cleanupRef = new PhantomReference(portal,
1159:                        openPortalCleanupQueue);
1160:                openPortalMap.put(cleanupRef, portalName);
1161:                portal.setCleanupRef(cleanupRef);
1162:            }
1163:
1164:            private void processDeadPortals() throws IOException {
1165:                PhantomReference deadPortal;
1166:                while ((deadPortal = (PhantomReference) openPortalCleanupQueue
1167:                        .poll()) != null) {
1168:                    String portalName = (String) openPortalMap
1169:                            .remove(deadPortal);
1170:                    sendClosePortal(portalName);
1171:                    deadPortal.clear();
1172:                }
1173:            }
1174:
1175:            protected void processResults(ResultHandler handler, int flags)
1176:                    throws IOException {
1177:                boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
1178:
1179:                Field[] fields = null;
1180:                Vector tuples = null;
1181:
1182:                int len;
1183:                int c;
1184:                boolean endQuery = false;
1185:
1186:                // At the end of a command execution we have the CommandComplete
1187:                // message to tell us we're done, but with a describeOnly command
1188:                // we have no real flag to let us know we're done.  We've got to
1189:                // look for the next RowDescription or NoData message and return
1190:                // from there.
1191:                boolean doneAfterRowDescNoData = false;
1192:
1193:                int parseIndex = 0;
1194:                int describeIndex = 0;
1195:                int bindIndex = 0;
1196:                int executeIndex = 0;
1197:
1198:                while (!endQuery) {
1199:                    c = pgStream.ReceiveChar();
1200:                    switch (c) {
1201:                    case 'A': // Asynchronous Notify
1202:                        receiveAsyncNotify();
1203:                        break;
1204:
1205:                    case '1': // Parse Complete (response to Parse)
1206:                        pgStream.ReceiveInteger4(); // len, discarded
1207:
1208:                        Object[] parsedQueryAndStatement = (Object[]) pendingParseQueue
1209:                                .get(parseIndex++);
1210:
1211:                        SimpleQuery parsedQuery = (SimpleQuery) parsedQueryAndStatement[0];
1212:                        String parsedStatementName = (String) parsedQueryAndStatement[1];
1213:
1214:                        if (logger.logDebug())
1215:                            logger.debug(" <=BE ParseComplete ["
1216:                                    + parsedStatementName + "]");
1217:
1218:                        registerParsedQuery(parsedQuery, parsedStatementName);
1219:                        break;
1220:
1221:                    case 't': // ParameterDescription
1222:                        pgStream.ReceiveInteger4(); // len, discarded
1223:
1224:                        if (logger.logDebug())
1225:                            logger.debug(" <=BE ParameterDescription");
1226:
1227:                        {
1228:                            Object describeData[] = (Object[]) pendingDescribeStatementQueue
1229:                                    .get(describeIndex);
1230:                            SimpleQuery query = (SimpleQuery) describeData[0];
1231:                            SimpleParameterList params = (SimpleParameterList) describeData[1];
1232:                            boolean describeOnly = ((Boolean) describeData[2])
1233:                                    .booleanValue();
1234:
1235:                            int numParams = pgStream.ReceiveInteger2();
1236:                            for (int i = 1; i <= numParams; i++) {
1237:                                int typeOid = pgStream.ReceiveInteger4();
1238:                                params.setResolvedType(i, typeOid);
1239:                            }
1240:                            query.setStatementTypes((int[]) params
1241:                                    .getTypeOIDs().clone());
1242:
1243:                            if (describeOnly)
1244:                                doneAfterRowDescNoData = true;
1245:                            else
1246:                                describeIndex++;
1247:                        }
1248:                        break;
1249:
1250:                    case '2': // Bind Complete  (response to Bind)
1251:                        pgStream.ReceiveInteger4(); // len, discarded
1252:
1253:                        Portal boundPortal = (Portal) pendingBindQueue
1254:                                .get(bindIndex++);
1255:                        if (logger.logDebug())
1256:                            logger.debug(" <=BE BindComplete [" + boundPortal
1257:                                    + "]");
1258:
1259:                        registerOpenPortal(boundPortal);
1260:                        break;
1261:
1262:                    case '3': // Close Complete (response to Close)
1263:                        pgStream.ReceiveInteger4(); // len, discarded
1264:                        if (logger.logDebug())
1265:                            logger.debug(" <=BE CloseComplete");
1266:                        break;
1267:
1268:                    case 'n': // No Data        (response to Describe)
1269:                        pgStream.ReceiveInteger4(); // len, discarded
1270:                        if (logger.logDebug())
1271:                            logger.debug(" <=BE NoData");
1272:
1273:                        if (doneAfterRowDescNoData) {
1274:                            Object describeData[] = (Object[]) pendingDescribeStatementQueue
1275:                                    .get(describeIndex++);
1276:                            Query currentQuery = (Query) describeData[0];
1277:
1278:                            if (fields != null || tuples != null) { // There was a resultset.
1279:                                handler.handleResultRows(currentQuery, fields,
1280:                                        tuples, null);
1281:                                fields = null;
1282:                                tuples = null;
1283:                            }
1284:                        }
1285:                        break;
1286:
1287:                    case 's': // Portal Suspended (end of Execute)
1288:                        // nb: this appears *instead* of CommandStatus.
1289:                        // Must be a SELECT if we suspended, so don't worry about it.
1290:
1291:                        pgStream.ReceiveInteger4(); // len, discarded
1292:                        if (logger.logDebug())
1293:                            logger.debug(" <=BE PortalSuspended");
1294:
1295:                        {
1296:                            Object[] executeData = (Object[]) pendingExecuteQueue
1297:                                    .get(executeIndex++);
1298:                            Query currentQuery = (Query) executeData[0];
1299:                            Portal currentPortal = (Portal) executeData[1];
1300:                            handler.handleResultRows(currentQuery, fields,
1301:                                    tuples, currentPortal);
1302:                        }
1303:
1304:                        fields = null;
1305:                        tuples = null;
1306:                        break;
1307:
1308:                    case 'C': // Command Status (end of Execute)
1309:                        // Handle status.
1310:                        String status = receiveCommandStatus();
1311:
1312:                        doneAfterRowDescNoData = false;
1313:
1314:                        {
1315:                            Object[] executeData = (Object[]) pendingExecuteQueue
1316:                                    .get(executeIndex++);
1317:                            Query currentQuery = (Query) executeData[0];
1318:                            Portal currentPortal = (Portal) executeData[1];
1319:
1320:                            if (fields != null || tuples != null) { // There was a resultset.
1321:                                handler.handleResultRows(currentQuery, fields,
1322:                                        tuples, null);
1323:                                fields = null;
1324:                                tuples = null;
1325:                            } else {
1326:                                interpretCommandStatus(status, handler);
1327:                            }
1328:
1329:                            if (currentPortal != null)
1330:                                currentPortal.close();
1331:                        }
1332:                        break;
1333:
1334:                    case 'D': // Data Transfer (ongoing Execute response)
1335:                        Object tuple = null;
1336:                        try {
1337:                            tuple = pgStream.ReceiveTupleV3();
1338:                        } catch (OutOfMemoryError oome) {
1339:                            if (!noResults) {
1340:                                handler
1341:                                        .handleError(new PSQLException(
1342:                                                GT
1343:                                                        .tr("Ran out of memory retrieving query results."),
1344:                                                PSQLState.OUT_OF_MEMORY, oome));
1345:                            }
1346:                        }
1347:
1348:                        if (!noResults) {
1349:                            if (tuples == null)
1350:                                tuples = new Vector();
1351:                            tuples.addElement(tuple);
1352:                        }
1353:
1354:                        if (logger.logDebug())
1355:                            logger.debug(" <=BE DataRow");
1356:
1357:                        break;
1358:
1359:                    case 'E': // Error Response (response to pretty much everything; backend then skips until Sync)
1360:                        SQLException error = receiveErrorResponse();
1361:                        handler.handleError(error);
1362:
1363:                        // keep processing
1364:                        break;
1365:
1366:                    case 'I': // Empty Query (end of Execute)
1367:                        pgStream.ReceiveInteger4();
1368:
1369:                        if (logger.logDebug())
1370:                            logger.debug(" <=BE EmptyQuery");
1371:
1372:                        {
1373:                            Object[] executeData = (Object[]) pendingExecuteQueue
1374:                                    .get(executeIndex++);
1375:                            Query currentQuery = (Query) executeData[0];
1376:                            Portal currentPortal = (Portal) executeData[1];
1377:                            handler.handleCommandStatus("EMPTY", 0, 0);
1378:                            if (currentPortal != null)
1379:                                currentPortal.close();
1380:                        }
1381:
1382:                        break;
1383:
1384:                    case 'N': // Notice Response
1385:                        SQLWarning warning = receiveNoticeResponse();
1386:                        handler.handleWarning(warning);
1387:                        break;
1388:
1389:                    case 'S': // Parameter Status
1390:                    {
1391:                        int l_len = pgStream.ReceiveInteger4();
1392:                        String name = pgStream.ReceiveString();
1393:                        String value = pgStream.ReceiveString();
1394:                        if (logger.logDebug())
1395:                            logger.debug(" <=BE ParameterStatus(" + name
1396:                                    + " = " + value + ")");
1397:
1398:                        if (name.equals("client_encoding")
1399:                                && !value.equalsIgnoreCase("UNICODE")
1400:                                && !allowEncodingChanges) {
1401:                            protoConnection.close(); // we're screwed now; we can't trust any subsequent string.
1402:                            handler
1403:                                    .handleError(new PSQLException(
1404:                                            GT
1405:                                                    .tr(
1406:                                                            "The server''s client_encoding parameter was changed to {0}. The JDBC driver requires client_encoding to be UNICODE for correct operation.",
1407:                                                            value),
1408:                                            PSQLState.CONNECTION_FAILURE));
1409:                            endQuery = true;
1410:                        }
1411:
1412:                        if (name.equals("DateStyle")
1413:                                && !value.startsWith("ISO,")) {
1414:                            protoConnection.close(); // we're screwed now; we can't trust any subsequent date.
1415:                            handler
1416:                                    .handleError(new PSQLException(
1417:                                            GT
1418:                                                    .tr(
1419:                                                            "The server''s DateStyle parameter was changed to {0}. The JDBC driver requires DateStyle to begin with ISO for correct operation.",
1420:                                                            value),
1421:                                            PSQLState.CONNECTION_FAILURE));
1422:                            endQuery = true;
1423:                        }
1424:
1425:                        if (name.equals("standard_conforming_strings")) {
1426:                            if (value.equals("on"))
1427:                                protoConnection
1428:                                        .setStandardConformingStrings(true);
1429:                            else if (value.equals("off"))
1430:                                protoConnection
1431:                                        .setStandardConformingStrings(false);
1432:                            else {
1433:                                protoConnection.close(); // we're screwed now; we don't know how to escape string literals
1434:                                handler
1435:                                        .handleError(new PSQLException(
1436:                                                GT
1437:                                                        .tr(
1438:                                                                "The server''s standard_conforming_strings parameter was reported as {0}. The JDBC driver expected on or off.",
1439:                                                                value),
1440:                                                PSQLState.CONNECTION_FAILURE));
1441:                                endQuery = true;
1442:                            }
1443:                        }
1444:                    }
1445:                        break;
1446:
1447:                    case 'T': // Row Description (response to Describe)
1448:                        fields = receiveFields();
1449:                        tuples = new Vector();
1450:                        if (doneAfterRowDescNoData) {
1451:                            Object describeData[] = (Object[]) pendingDescribeStatementQueue
1452:                                    .get(describeIndex++);
1453:                            Query currentQuery = (Query) describeData[0];
1454:
1455:                            if (fields != null || tuples != null) { // There was a resultset.
1456:                                handler.handleResultRows(currentQuery, fields,
1457:                                        tuples, null);
1458:                                fields = null;
1459:                                tuples = null;
1460:                            }
1461:                        }
1462:                        break;
1463:
1464:                    case 'Z': // Ready For Query (eventual response to Sync)
1465:                        receiveRFQ();
1466:                        endQuery = true;
1467:
1468:                        // Reset the statement name of Parses that failed.
1469:                        while (parseIndex < pendingParseQueue.size()) {
1470:                            Object[] failedQueryAndStatement = (Object[]) pendingParseQueue
1471:                                    .get(parseIndex++);
1472:                            SimpleQuery failedQuery = (SimpleQuery) failedQueryAndStatement[0];
1473:                            failedQuery.unprepare();
1474:                        }
1475:
1476:                        pendingParseQueue.clear(); // No more ParseComplete messages expected.
1477:                        pendingDescribeStatementQueue.clear(); // No more ParameterDescription messages expected.
1478:                        pendingBindQueue.clear(); // No more BindComplete messages expected.
1479:                        pendingExecuteQueue.clear(); // No more query executions expected.
1480:                        break;
1481:
1482:                    case 'G': // CopyInResponse
1483:                    case 'H': // CopyOutResponse
1484:                    case 'c': // CopyDone
1485:                    case 'd': // CopyData
1486:                    {
1487:                        // COPY FROM STDIN / COPY TO STDOUT, neither of which are currently
1488:                        // supported.
1489:
1490:                        // CopyInResponse can only occur in response to an Execute we sent.
1491:                        // Every Execute we send is followed by either a Bind or a ClosePortal,
1492:                        // so we don't need to send a CopyFail; the server will fail the copy
1493:                        // automatically when it sees the next message.
1494:
1495:                        int l_len = pgStream.ReceiveInteger4();
1496:                        /* discard */
1497:                        pgStream.Receive(l_len);
1498:
1499:                        handler
1500:                                .handleError(new PSQLException(
1501:                                        GT
1502:                                                .tr("The driver currently does not support COPY operations."),
1503:                                        PSQLState.NOT_IMPLEMENTED));
1504:                    }
1505:                        break;
1506:
1507:                    default:
1508:                        throw new IOException("Unexpected packet type: " + c);
1509:                    }
1510:
1511:                }
1512:            }
1513:
1514:            public synchronized void fetch(ResultCursor cursor,
1515:                    ResultHandler handler, int fetchSize) throws SQLException {
1516:                final Portal portal = (Portal) cursor;
1517:
1518:                // Insert a ResultHandler that turns bare command statuses into empty datasets
1519:                // (if the fetch returns no rows, we see just a CommandStatus..)
1520:                final ResultHandler delegateHandler = handler;
1521:                handler = new ResultHandler() {
1522:                    public void handleResultRows(Query fromQuery,
1523:                            Field[] fields, Vector tuples, ResultCursor cursor) {
1524:                        delegateHandler.handleResultRows(fromQuery, fields,
1525:                                tuples, cursor);
1526:                    }
1527:
1528:                    public void handleCommandStatus(String status,
1529:                            int updateCount, long insertOID) {
1530:                        handleResultRows(portal.getQuery(), null, new Vector(),
1531:                                null);
1532:                    }
1533:
1534:                    public void handleWarning(SQLWarning warning) {
1535:                        delegateHandler.handleWarning(warning);
1536:                    }
1537:
1538:                    public void handleError(SQLException error) {
1539:                        delegateHandler.handleError(error);
1540:                    }
1541:
1542:                    public void handleCompletion() throws SQLException {
1543:                        delegateHandler.handleCompletion();
1544:                    }
1545:                };
1546:
1547:                // Now actually run it.
1548:
1549:                try {
1550:                    processDeadParsedQueries();
1551:                    processDeadPortals();
1552:
1553:                    sendExecute(portal.getQuery(), portal, fetchSize);
1554:                    sendSync();
1555:
1556:                    processResults(handler, 0);
1557:                } catch (IOException e) {
1558:                    protoConnection.close();
1559:                    handler
1560:                            .handleError(new PSQLException(
1561:                                    GT
1562:                                            .tr("An I/O error occured while sending to the backend."),
1563:                                    PSQLState.CONNECTION_FAILURE, e));
1564:                }
1565:
1566:                handler.handleCompletion();
1567:            }
1568:
1569:            /*
1570:             * Receive the field descriptions from the back end.
1571:             */
1572:            private Field[] receiveFields() throws IOException {
1573:                int l_msgSize = pgStream.ReceiveInteger4();
1574:                int size = pgStream.ReceiveInteger2();
1575:                Field[] fields = new Field[size];
1576:
1577:                if (logger.logDebug())
1578:                    logger.debug(" <=BE RowDescription(" + size + ")");
1579:
1580:                for (int i = 0; i < fields.length; i++) {
1581:                    String columnLabel = pgStream.ReceiveString();
1582:                    int tableOid = pgStream.ReceiveInteger4();
1583:                    short positionInTable = (short) pgStream.ReceiveInteger2();
1584:                    int typeOid = pgStream.ReceiveInteger4();
1585:                    int typeLength = pgStream.ReceiveInteger2();
1586:                    int typeModifier = pgStream.ReceiveInteger4();
1587:                    int formatType = pgStream.ReceiveInteger2();
1588:                    fields[i] = new Field(columnLabel, null, /* name not yet determined */
1589:                    typeOid, typeLength, typeModifier, tableOid,
1590:                            positionInTable);
1591:                    fields[i].setFormat(formatType);
1592:                }
1593:
1594:                return fields;
1595:            }
1596:
1597:            private void receiveAsyncNotify() throws IOException {
1598:                int msglen = pgStream.ReceiveInteger4();
1599:                int pid = pgStream.ReceiveInteger4();
1600:                String msg = pgStream.ReceiveString();
1601:                String param = pgStream.ReceiveString();
1602:                protoConnection
1603:                        .addNotification(new org.postgresql.core.Notification(
1604:                                msg, pid, param));
1605:
1606:                if (logger.logDebug())
1607:                    logger.debug(" <=BE AsyncNotify(" + pid + "," + msg + ","
1608:                            + param + ")");
1609:            }
1610:
1611:            private SQLException receiveErrorResponse() throws IOException {
1612:                // it's possible to get more than one error message for a query
1613:                // see libpq comments wrt backend closing a connection
1614:                // so, append messages to a string buffer and keep processing
1615:                // check at the bottom to see if we need to throw an exception
1616:
1617:                int elen = pgStream.ReceiveInteger4();
1618:                String totalMessage = pgStream.ReceiveString(elen - 4);
1619:                ServerErrorMessage errorMsg = new ServerErrorMessage(
1620:                        totalMessage, logger.getLogLevel());
1621:
1622:                if (logger.logDebug())
1623:                    logger.debug(" <=BE ErrorMessage(" + errorMsg.toString()
1624:                            + ")");
1625:
1626:                return new PSQLException(errorMsg);
1627:            }
1628:
1629:            private SQLWarning receiveNoticeResponse() throws IOException {
1630:                int nlen = pgStream.ReceiveInteger4();
1631:                ServerErrorMessage warnMsg = new ServerErrorMessage(pgStream
1632:                        .ReceiveString(nlen - 4), logger.getLogLevel());
1633:
1634:                if (logger.logDebug())
1635:                    logger.debug(" <=BE NoticeResponse(" + warnMsg.toString()
1636:                            + ")");
1637:
1638:                return new PSQLWarning(warnMsg);
1639:            }
1640:
1641:            private String receiveCommandStatus() throws IOException {
1642:                //TODO: better handle the msg len
1643:                int l_len = pgStream.ReceiveInteger4();
1644:                //read l_len -5 bytes (-4 for l_len and -1 for trailing \0)
1645:                String status = pgStream.ReceiveString(l_len - 5);
1646:                //now read and discard the trailing \0
1647:                pgStream.Receive(1);
1648:
1649:                if (logger.logDebug())
1650:                    logger.debug(" <=BE CommandStatus(" + status + ")");
1651:
1652:                return status;
1653:            }
1654:
1655:            private void interpretCommandStatus(String status,
1656:                    ResultHandler handler) {
1657:                int update_count = 0;
1658:                long insert_oid = 0;
1659:
1660:                if (status.startsWith("INSERT") || status.startsWith("UPDATE")
1661:                        || status.startsWith("DELETE")
1662:                        || status.startsWith("MOVE")) {
1663:                    try {
1664:                        update_count = Integer.parseInt(status
1665:                                .substring(1 + status.lastIndexOf(' ')));
1666:                        if (status.startsWith("INSERT"))
1667:                            insert_oid = Long.parseLong(status.substring(
1668:                                    1 + status.indexOf(' '), status
1669:                                            .lastIndexOf(' ')));
1670:                    } catch (NumberFormatException nfe) {
1671:                        handler
1672:                                .handleError(new PSQLException(
1673:                                        GT
1674:                                                .tr(
1675:                                                        "Unable to interpret the update count in command completion tag: {0}.",
1676:                                                        status),
1677:                                        PSQLState.CONNECTION_FAILURE));
1678:                        return;
1679:                    }
1680:                }
1681:
1682:                handler.handleCommandStatus(status, update_count, insert_oid);
1683:            }
1684:
1685:            private void receiveRFQ() throws IOException {
1686:                if (pgStream.ReceiveInteger4() != 5)
1687:                    throw new IOException(
1688:                            "unexpected length of ReadyForQuery message");
1689:
1690:                char tStatus = (char) pgStream.ReceiveChar();
1691:                if (logger.logDebug())
1692:                    logger.debug(" <=BE ReadyForQuery(" + tStatus + ")");
1693:
1694:                // Update connection state.
1695:                switch (tStatus) {
1696:                case 'I':
1697:                    protoConnection
1698:                            .setTransactionState(ProtocolConnection.TRANSACTION_IDLE);
1699:                    break;
1700:                case 'T':
1701:                    protoConnection
1702:                            .setTransactionState(ProtocolConnection.TRANSACTION_OPEN);
1703:                    break;
1704:                case 'E':
1705:                    protoConnection
1706:                            .setTransactionState(ProtocolConnection.TRANSACTION_FAILED);
1707:                    break;
1708:                default:
1709:                    throw new IOException(
1710:                            "unexpected transaction state in ReadyForQuery message: "
1711:                                    + (int) tStatus);
1712:                }
1713:            }
1714:
1715:            private final ArrayList pendingParseQueue = new ArrayList(); // list of SimpleQuery instances
1716:            private final ArrayList pendingBindQueue = new ArrayList(); // list of Portal instances
1717:            private final ArrayList pendingExecuteQueue = new ArrayList(); // list of {SimpleQuery,Portal} object arrays
1718:            private final ArrayList pendingDescribeStatementQueue = new ArrayList(); // list of {SimpleQuery, SimpleParameterList, Boolean} object arrays
1719:
1720:            private long nextUniqueID = 1;
1721:            private final ProtocolConnectionImpl protoConnection;
1722:            private final PGStream pgStream;
1723:            private final Logger logger;
1724:            private final boolean allowEncodingChanges;
1725:
1726:            private final SimpleQuery beginTransactionQuery = new SimpleQuery(
1727:                    new String[] { "BEGIN" });
1728:
1729:            private final static SimpleQuery EMPTY_QUERY = new SimpleQuery(
1730:                    new String[] { "" });
1731:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.