Source Code Cross Referenced for JournalledSystem.java in  » Database-DBMS » mckoi » com » mckoi » store » Java Source Code / Java DocumentationJava Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Database DBMS » mckoi » com.mckoi.store 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /**
0002:         * com.mckoi.store.JournalledSystem  11 Jun 2003
0003:         *
0004:         * Mckoi SQL Database ( http://www.mckoi.com/database )
0005:         * Copyright (C) 2000, 2001, 2002  Diehl and Associates, Inc.
0006:         *
0007:         * This program is free software; you can redistribute it and/or
0008:         * modify it under the terms of the GNU General Public License
0009:         * Version 2 as published by the Free Software Foundation.
0010:         *
0011:         * This program is distributed in the hope that it will be useful,
0012:         * but WITHOUT ANY WARRANTY; without even the implied warranty of
0013:         * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
0014:         * GNU General Public License Version 2 for more details.
0015:         *
0016:         * You should have received a copy of the GNU General Public License
0017:         * Version 2 along with this program; if not, write to the Free Software
0018:         * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
0019:         *
0020:         * Change Log:
0021:         * 
0022:         * 
0023:         */package com.mckoi.store;
0024:
0025:        import java.io.*;
0026:        import java.util.HashMap;
0027:        import java.util.ArrayList;
0028:        import java.util.Collections;
0029:        import java.util.Comparator;
0030:        import com.mckoi.debug.DebugLogger;
0031:        import com.mckoi.debug.Lvl;
0032:        import com.mckoi.util.ByteArrayUtil;
0033:        import com.mckoi.store.LoggingBufferManager.StoreDataAccessorFactory;
0034:
0035:        /**
0036:         * Manages a journalling data store management system.  All operations are
0037:         * written out to a log that can be easily recovered from if a crash occurs.
0038:         *
0039:         * @author Tobias Downer
0040:         */
0041:
0042:        class JournalledSystem {
0043:
0044:            /**
0045:             * Set to true for logging behaviour.
0046:             */
0047:            private final boolean ENABLE_LOGGING;
0048:
0049:            /**
0050:             * The path to the journal files.
0051:             */
0052:            private final File journal_path;
0053:
0054:            /**
0055:             * If the journal system is in read only mode.
0056:             */
0057:            private final boolean read_only;
0058:
0059:            /**
0060:             * The page size.
0061:             */
0062:            private final int page_size;
0063:
0064:            /**
0065:             * The map of all resources that are available.  (resource_name -> Resource)
0066:             */
0067:            private HashMap all_resources;
0068:
0069:            /**
0070:             * The unique sequence id counter for this session.
0071:             */
0072:            private long seq_id;
0073:
0074:            /**
0075:             * The archive of journal files currently pending (JournalFile).
0076:             */
0077:            private final ArrayList journal_archives;
0078:
0079:            /**
0080:             * The current top journal file.
0081:             */
0082:            private JournalFile top_journal_file;
0083:
0084:            /**
0085:             * The current journal file number.
0086:             */
0087:            private long journal_number;
0088:
0089:            /**
0090:             * A factory that creates StoreDataAccessor objects used to access the
0091:             * resource with the given name.
0092:             */
0093:            private final StoreDataAccessorFactory sda_factory;
0094:
0095:            /**
0096:             * Mutex when accessing the top journal file.
0097:             */
0098:            private final Object top_journal_lock = new Object();
0099:
0100:            /**
0101:             * A thread that runs in the background and persists information that is in
0102:             * the journal.
0103:             */
0104:            private JournalingThread journaling_thread;
0105:
0106:            /**
0107:             * A debug log to output information to.
0108:             */
0109:            private final DebugLogger debug;
0110:
0111:            JournalledSystem(File journal_path, boolean read_only,
0112:                    int page_size, StoreDataAccessorFactory sda_factory,
0113:                    DebugLogger debug, boolean enable_logging) {
0114:                this .journal_path = journal_path;
0115:                this .read_only = read_only;
0116:                this .page_size = page_size;
0117:                this .sda_factory = sda_factory;
0118:                all_resources = new HashMap();
0119:                journal_number = 0;
0120:                journal_archives = new ArrayList();
0121:                this .debug = debug;
0122:                this .ENABLE_LOGGING = enable_logging;
0123:            }
0124:
0125:            /**
0126:             * Returns a journal file name with the given number.  The journal number
0127:             * must be between 10 and 63
0128:             */
0129:            private static String getJournalFileName(int number) {
0130:                if (number < 10 || number > 73) {
0131:                    throw new Error("Journal file name out of range.");
0132:                }
0133:                return "jnl" + number;
0134:            }
0135:
0136:            // Lock used during initialization
0137:            private final Object init_lock = new Object();
0138:
0139:            /**
0140:             * Starts the journal system.
0141:             */
0142:            void start() throws IOException {
0143:                if (ENABLE_LOGGING) {
0144:                    synchronized (init_lock) {
0145:                        if (journaling_thread == null) {
0146:                            // Start the background journaling thread,
0147:                            journaling_thread = new JournalingThread();
0148:                            journaling_thread.start();
0149:                            // Scan for any changes and make the changes.
0150:                            rollForwardRecover();
0151:                            if (!read_only) {
0152:                                // Create a new top journal file
0153:                                newTopJournalFile();
0154:                            }
0155:                        } else {
0156:                            throw new Error(
0157:                                    "Assertion failed - already started.");
0158:                        }
0159:                    }
0160:                }
0161:            }
0162:
0163:            /**
0164:             * Stops the journal system.  This will persist any pending changes up to the
0165:             * last check point and then finish.
0166:             */
0167:            void stop() throws IOException {
0168:                if (ENABLE_LOGGING) {
0169:                    synchronized (init_lock) {
0170:                        if (journaling_thread != null) {
0171:                            // Stop the journal thread
0172:                            journaling_thread.persistArchives(0);
0173:                            journaling_thread.finish();
0174:                            journaling_thread.waitUntilFinished();
0175:                            journaling_thread = null;
0176:                        } else {
0177:                            throw new Error(
0178:                                    "Assertion failed - already stopped.");
0179:                        }
0180:                    }
0181:
0182:                    if (!read_only) {
0183:                        // Close any remaining journals and roll forward recover (shouldn't
0184:                        // actually be necessary but just incase...)
0185:                        synchronized (top_journal_lock) {
0186:                            // Close all the journals
0187:                            int sz = journal_archives.size();
0188:                            for (int i = 0; i < sz; ++i) {
0189:                                JournalFile jf = (JournalFile) journal_archives
0190:                                        .get(i);
0191:                                jf.close();
0192:                            }
0193:                            // Close the top journal
0194:                            topJournal().close();
0195:                            // Scan for journals and make the changes.
0196:                            rollForwardRecover();
0197:                        }
0198:                    }
0199:
0200:                }
0201:            }
0202:
0203:            /**
0204:             * Recovers any lost operations that are currently in the journal.  This
0205:             * retries all logged entries.  This would typically be called before any
0206:             * other IO operations.
0207:             */
0208:            void rollForwardRecover() throws IOException {
0209:                //    System.out.println("rollForwardRecover()");
0210:
0211:                // The list of all journal files,
0212:                ArrayList journal_files_list = new ArrayList();
0213:
0214:                // Scan the journal path for any journal files.
0215:                for (int i = 10; i < 74; ++i) {
0216:                    String journal_fn = getJournalFileName(i);
0217:                    File f = new File(journal_path, journal_fn);
0218:                    // If the journal exists, create a summary of the journal
0219:                    if (f.exists()) {
0220:                        if (read_only) {
0221:                            throw new IOException(
0222:                                    "Journal file "
0223:                                            + f
0224:                                            + " exists for a read-only session.  "
0225:                                            + "There may not be any pending journals for a read-only session.");
0226:                        }
0227:
0228:                        JournalFile jf = new JournalFile(f, read_only);
0229:                        // Open the journal file for recovery.  This will set various
0230:                        // information about the journal such as the last check point and the
0231:                        // id of the journal file.
0232:                        JournalSummary summary = jf.openForRecovery();
0233:                        // If the journal can be recovered from.
0234:                        if (summary.can_be_recovered) {
0235:                            if (debug.isInterestedIn(Lvl.INFORMATION)) {
0236:                                debug.write(Lvl.INFORMATION, this , "Journal "
0237:                                        + jf + " found - can be recovered.");
0238:                            }
0239:                            journal_files_list.add(summary);
0240:                        } else {
0241:                            if (debug.isInterestedIn(Lvl.INFORMATION)) {
0242:                                debug.write(Lvl.INFORMATION, this , "Journal "
0243:                                        + jf
0244:                                        + " deleting - nothing to recover.");
0245:                            }
0246:                            // Otherwise close and delete it
0247:                            jf.closeAndDelete();
0248:                        }
0249:                    }
0250:                }
0251:
0252:                //    if (journal_files_list.size() == 0) {
0253:                //      System.out.println("Nothing to recover.");
0254:                //    }
0255:
0256:                // Sort the journal file list from oldest to newest.  The oldest journals
0257:                // are recovered first.
0258:                Collections.sort(journal_files_list, journal_list_comparator);
0259:
0260:                long last_journal_number = -1;
0261:
0262:                // Persist the journals
0263:                for (int i = 0; i < journal_files_list.size(); ++i) {
0264:                    JournalSummary summary = (JournalSummary) journal_files_list
0265:                            .get(i);
0266:
0267:                    // Check the resources for this summary
0268:                    ArrayList res_list = summary.resource_list;
0269:                    for (int n = 0; n < res_list.size(); ++n) {
0270:                        String resource_name = (String) res_list.get(n);
0271:                        // This puts the resource into the hash map.
0272:                        JournalledResource resource = createResource(resource_name);
0273:                    }
0274:
0275:                    // Assert that we are recovering the journals in the correct order
0276:                    JournalFile jf = summary.journal_file;
0277:                    if (jf.journal_number < last_journal_number) {
0278:                        throw new Error("Assertion failed, sort failed.");
0279:                    }
0280:                    last_journal_number = jf.journal_number;
0281:
0282:                    if (debug.isInterestedIn(Lvl.INFORMATION)) {
0283:                        debug.write(Lvl.INFORMATION, this , "Recovering: " + jf
0284:                                + " (8 .. " + summary.last_checkpoint + ")");
0285:                    }
0286:
0287:                    jf.persist(8, summary.last_checkpoint);
0288:                    // Then close and delete.
0289:                    jf.closeAndDelete();
0290:
0291:                    // Check the resources for this summary and close them
0292:                    for (int n = 0; n < res_list.size(); ++n) {
0293:                        String resource_name = (String) res_list.get(n);
0294:                        AbstractResource resource = (AbstractResource) createResource(resource_name);
0295:                        // When we finished, make sure the resource is closed again
0296:                        // Close the resource
0297:                        resource.persistClose();
0298:                        // Post recover notification
0299:                        resource.notifyPostRecover();
0300:                    }
0301:
0302:                }
0303:
0304:            }
0305:
0306:            private Comparator journal_list_comparator = new Comparator() {
0307:
0308:                public int compare(Object ob1, Object ob2) {
0309:                    JournalSummary js1 = (JournalSummary) ob1;
0310:                    JournalSummary js2 = (JournalSummary) ob2;
0311:
0312:                    long jn1 = js1.journal_file.getJournalNumber();
0313:                    long jn2 = js2.journal_file.getJournalNumber();
0314:
0315:                    if (jn1 > jn2) {
0316:                        return 1;
0317:                    } else if (jn1 < jn2) {
0318:                        return -1;
0319:                    } else {
0320:                        return 0;
0321:                    }
0322:                }
0323:
0324:            };
0325:
0326:            /**
0327:             * Creates a new top journal file.
0328:             */
0329:            private void newTopJournalFile() throws IOException {
0330:                //    // Move the old journal to the archive?
0331:                //    if (top_journal_file != null) {
0332:                //      journal_archives.add(top_journal_file);
0333:                //    }
0334:
0335:                String journal_fn = getJournalFileName((int) ((journal_number & 63) + 10));
0336:                ++journal_number;
0337:
0338:                File f = new File(journal_path, journal_fn);
0339:                if (f.exists()) {
0340:                    throw new IOException("Journal file already exists.");
0341:                }
0342:
0343:                top_journal_file = new JournalFile(f, read_only);
0344:                top_journal_file.open(journal_number - 1);
0345:            }
0346:
0347:            /**
0348:             * Returns the current top journal file.
0349:             */
0350:            private JournalFile topJournal() {
0351:                synchronized (top_journal_lock) {
0352:                    return top_journal_file;
0353:                }
0354:            }
0355:
0356:            /**
0357:             * Creates a resource.
0358:             */
0359:            public JournalledResource createResource(String resource_name) {
0360:                AbstractResource resource;
0361:                synchronized (all_resources) {
0362:                    // Has this resource previously been open?
0363:                    resource = (AbstractResource) all_resources
0364:                            .get(resource_name);
0365:                    if (resource == null) {
0366:                        // No...
0367:                        // Create a unique id for this
0368:                        final long id = seq_id;
0369:                        ++seq_id;
0370:                        // Create the StoreDataAccessor for this resource.
0371:                        StoreDataAccessor accessor = sda_factory
0372:                                .createStoreDataAccessor(resource_name);
0373:                        if (ENABLE_LOGGING) {
0374:                            resource = new Resource(resource_name, id, accessor);
0375:                        } else {
0376:                            resource = new NonLoggingResource(resource_name,
0377:                                    id, accessor);
0378:                        }
0379:                        // Put this in the map.
0380:                        all_resources.put(resource_name, resource);
0381:                    }
0382:                }
0383:
0384:                // Return the resource
0385:                return resource;
0386:            }
0387:
0388:            /**
0389:             * Sets a check point in the log.  If 'flush_journals' is true then when the
0390:             * method returns we are guarenteed that all the journals are flushed and the
0391:             * data is absolutely current.  If 'flush_journals' is false then we can't
0392:             * assume the journals will be empty when the method returns.
0393:             */
0394:            void setCheckPoint(boolean flush_journals) throws IOException {
0395:                // No Logging
0396:                if (!ENABLE_LOGGING) {
0397:                    return;
0398:                }
0399:                // Return if read-only
0400:                if (read_only) {
0401:                    return;
0402:                }
0403:
0404:                boolean something_to_persist;
0405:
0406:                synchronized (top_journal_lock) {
0407:                    JournalFile top_j = topJournal();
0408:
0409:                    // When the journal exceeds a threshold then we cycle the top journal
0410:                    if (flush_journals || top_j.size() > (256 * 1024)) {
0411:                        // Cycle to the next journal file
0412:                        newTopJournalFile();
0413:                        // Add this to the archives
0414:                        journal_archives.add(top_j);
0415:                    }
0416:                    something_to_persist = journal_archives.size() > 0;
0417:                    top_j.setCheckPoint();
0418:                }
0419:
0420:                if (something_to_persist) {
0421:                    // Notifies the background thread that there is something to persist.
0422:                    // This will block until there are at most 10 journal files open.
0423:                    journaling_thread.persistArchives(10);
0424:                }
0425:
0426:            }
0427:
0428:            /**
0429:             * Returns the Resource with the given name.
0430:             */
0431:            private AbstractResource getResource(String resource_name) {
0432:                synchronized (all_resources) {
0433:                    return (AbstractResource) all_resources.get(resource_name);
0434:                }
0435:            }
0436:
0437:            // ---------- Inner classes ----------
0438:
0439:            /**
0440:             * A JournalFile represents a file in which modification are logged out to
0441:             * when changes are made.  A JournalFile contains instructions for rebuilding
0442:             * a resource to a known stable state.
0443:             */
0444:            private final class JournalFile {
0445:
0446:                /**
0447:                 * The File object of this journal in the file system.
0448:                 */
0449:                private File file;
0450:
0451:                /**
0452:                 * True if the journal file is read only.
0453:                 */
0454:                private boolean read_only;
0455:
0456:                /**
0457:                 * The StreamFile object for reading and writing entries to/from the
0458:                 * journal.
0459:                 */
0460:                private StreamFile data;
0461:
0462:                /**
0463:                 * A DataOutputStream object used to write entries to the journal file.
0464:                 */
0465:                private DataOutputStream data_out;
0466:
0467:                /**
0468:                 * Small buffer.
0469:                 */
0470:                private byte[] buffer;
0471:
0472:                /**
0473:                 * A map between a resource name and an id for this journal file.
0474:                 */
0475:                private HashMap resource_id_map;
0476:
0477:                /**
0478:                 * The sequence id for resources modified in this log.
0479:                 */
0480:                private long cur_seq_id;
0481:
0482:                /**
0483:                 * The journal number of this journal.
0484:                 */
0485:                private long journal_number;
0486:
0487:                /**
0488:                 * True when open.
0489:                 */
0490:                private boolean is_open;
0491:
0492:                /**
0493:                 * The number of threads currently looking at info in this journal.
0494:                 */
0495:                private int reference_count;
0496:
0497:                /**
0498:                 * Constructs the journal file.
0499:                 */
0500:                public JournalFile(File file, boolean read_only) {
0501:                    this .file = file;
0502:                    this .read_only = read_only;
0503:                    this .is_open = false;
0504:                    buffer = new byte[36];
0505:                    resource_id_map = new HashMap();
0506:                    cur_seq_id = 0;
0507:                    reference_count = 1;
0508:                }
0509:
0510:                /**
0511:                 * Returns the size of the journal file in bytes.
0512:                 */
0513:                long size() {
0514:                    return data.length();
0515:                }
0516:
0517:                /**
0518:                 * Returns the journal number assigned to this journal.
0519:                 */
0520:                long getJournalNumber() {
0521:                    return journal_number;
0522:                }
0523:
0524:                /**
0525:                 * Opens the journal file.  If the journal file exists then an error is
0526:                 * generated.
0527:                 */
0528:                void open(long journal_number) throws IOException {
0529:                    if (is_open) {
0530:                        throw new IOException("Journal file is already open.");
0531:                    }
0532:                    if (file.exists()) {
0533:                        throw new IOException("Journal file already exists.");
0534:                    }
0535:
0536:                    this .journal_number = journal_number;
0537:                    data = new StreamFile(file, read_only ? "r" : "rw");
0538:                    data_out = new DataOutputStream(new BufferedOutputStream(
0539:                            data.getOutputStream()));
0540:                    data_out.writeLong(journal_number);
0541:                    is_open = true;
0542:                }
0543:
0544:                /**
0545:                 * Opens the journal for recovery.  This scans the journal and generates
0546:                 * some statistics about the journal file such as the last check point and
0547:                 * the journal number.  If the journal file doesn't exist then an error is
0548:                 * generated.
0549:                 */
0550:                JournalSummary openForRecovery() throws IOException {
0551:                    if (is_open) {
0552:                        throw new IOException("Journal file is already open.");
0553:                    }
0554:                    if (!file.exists()) {
0555:                        throw new IOException("Journal file does not exists.");
0556:                    }
0557:
0558:                    // Open the random access file to this journal
0559:                    data = new StreamFile(file, read_only ? "r" : "rw");
0560:                    //      data_out = new DataOutputStream(
0561:                    //                           new BufferedOutputStream(data.getOutputStream()));
0562:
0563:                    is_open = true;
0564:
0565:                    // Create the summary object (by default, not recoverable).
0566:                    JournalSummary summary = new JournalSummary(this );
0567:
0568:                    long end_pointer = data.length();
0569:
0570:                    // If end_pointer < 8 then can't recovert this journal
0571:                    if (end_pointer < 8) {
0572:                        return summary;
0573:                    }
0574:
0575:                    // The input stream.
0576:                    final DataInputStream din = new DataInputStream(
0577:                            new BufferedInputStream(data.getInputStream()));
0578:
0579:                    try {
0580:                        // Set the journal number for this
0581:                        this .journal_number = din.readLong();
0582:                        long position = 8;
0583:
0584:                        ArrayList checkpoint_res_list = new ArrayList();
0585:
0586:                        // Start scan
0587:                        while (true) {
0588:
0589:                            // If we can't read 12 bytes ahead, return the summary
0590:                            if (position + 12 > end_pointer) {
0591:                                return summary;
0592:                            }
0593:
0594:                            long type = din.readLong();
0595:                            int size = din.readInt();
0596:
0597:                            //          System.out.println("Scan: " + type + " pos=" + position + " size=" + size);
0598:                            position = position + size + 12;
0599:
0600:                            boolean skip_body = true;
0601:
0602:                            // If checkpoint reached then we are recoverable
0603:                            if (type == 100) {
0604:                                summary.last_checkpoint = position;
0605:                                summary.can_be_recovered = true;
0606:
0607:                                // Add the resources in this check point
0608:                                summary.resource_list
0609:                                        .addAll(checkpoint_res_list);
0610:                                // And clear the temporary list.
0611:                                checkpoint_res_list.clear();
0612:
0613:                            }
0614:
0615:                            // If end reached, or type is not understood then return
0616:                            else if (position >= end_pointer || type < 1
0617:                                    || type > 7) {
0618:                                return summary;
0619:                            }
0620:
0621:                            // If we are resource type, then load the resource
0622:                            if (type == 2) {
0623:
0624:                                // We don't skip body for this type, we read the content
0625:                                skip_body = false;
0626:                                long id = din.readLong();
0627:                                int str_len = din.readInt();
0628:                                StringBuffer str = new StringBuffer(str_len);
0629:                                for (int i = 0; i < str_len; ++i) {
0630:                                    str.append(din.readChar());
0631:                                }
0632:
0633:                                String resource_name = new String(str);
0634:                                checkpoint_res_list.add(resource_name);
0635:
0636:                            }
0637:
0638:                            if (skip_body) {
0639:                                int to_skip = size;
0640:                                while (to_skip > 0) {
0641:                                    to_skip -= din.skip(to_skip);
0642:                                }
0643:                            }
0644:
0645:                        }
0646:
0647:                    } finally {
0648:                        din.close();
0649:                    }
0650:
0651:                }
0652:
0653:                /**
0654:                 * Closes the journal file.
0655:                 */
0656:                void close() throws IOException {
0657:                    synchronized (this ) {
0658:                        if (!is_open) {
0659:                            throw new IOException(
0660:                                    "Journal file is already closed.");
0661:                        }
0662:
0663:                        data.close();
0664:                        data = null;
0665:                        is_open = false;
0666:                    }
0667:                }
0668:
0669:                /**
0670:                 * Returns true if the journal is deleted.
0671:                 */
0672:                boolean isDeleted() {
0673:                    synchronized (this ) {
0674:                        return data == null;
0675:                    }
0676:                }
0677:
0678:                /**
0679:                 * Closes and deletes the journal file.  This may not immediately close and
0680:                 * delete the journal file if there are currently references to it (for
0681:                 * example, in the middle of a read operation).
0682:                 */
0683:                void closeAndDelete() throws IOException {
0684:                    synchronized (this ) {
0685:                        --reference_count;
0686:                        if (reference_count == 0) {
0687:                            // Close and delete the journal file.
0688:                            close();
0689:                            boolean b = file.delete();
0690:                            if (!b) {
0691:                                System.out
0692:                                        .println("Unable to delete journal file: "
0693:                                                + file);
0694:                            }
0695:                        }
0696:                    }
0697:                }
0698:
0699:                /**
0700:                 * Adds a reference preventing the journal file from being deleted.
0701:                 */
0702:                void addReference() {
0703:                    synchronized (this ) {
0704:                        if (reference_count != 0) {
0705:                            ++reference_count;
0706:                        }
0707:                    }
0708:                }
0709:
0710:                /**
0711:                 * Removes a reference, if we are at the last reference the journal file is
0712:                 * deleted.
0713:                 */
0714:                void removeReference() throws IOException {
0715:                    closeAndDelete();
0716:                }
0717:
0718:                /**
0719:                 * Plays the log from the given offset in the file to the next checkpoint.
0720:                 * This will actually persist the log.  Returns -1 if the end of the journal
0721:                 * is reached.
0722:                 * <p>
0723:                 * NOTE: This will not verify that the journal is correct.  Verification
0724:                 *   should be done before the persist.
0725:                 */
0726:                void persist(final long start, final long end)
0727:                        throws IOException {
0728:
0729:                    if (debug.isInterestedIn(Lvl.INFORMATION)) {
0730:                        debug.write(Lvl.INFORMATION, this , "Persisting: "
0731:                                + file);
0732:                    }
0733:
0734:                    final DataInputStream din = new DataInputStream(
0735:                            new BufferedInputStream(data.getInputStream()));
0736:                    long count = start;
0737:                    // Skip to the offset
0738:                    while (count > 0) {
0739:                        count -= din.skip(count);
0740:                    }
0741:
0742:                    // The list of resources we updated
0743:                    ArrayList resources_updated = new ArrayList();
0744:
0745:                    // A map from resource id to resource name for this journal.
0746:                    HashMap id_name_map = new HashMap();
0747:
0748:                    boolean finished = false;
0749:                    long position = start;
0750:
0751:                    while (!finished) {
0752:                        long type = din.readLong();
0753:                        int size = din.readInt();
0754:                        position = position + size + 12;
0755:
0756:                        if (type == 2) { // Resource id tag
0757:                            long id = din.readLong();
0758:                            int len = din.readInt();
0759:                            StringBuffer buf = new StringBuffer(len);
0760:                            for (int i = 0; i < len; ++i) {
0761:                                buf.append(din.readChar());
0762:                            }
0763:                            String resource_name = new String(buf);
0764:
0765:                            // Put this in the map
0766:                            id_name_map.put(new Long(id), resource_name);
0767:
0768:                            if (debug.isInterestedIn(Lvl.INFORMATION)) {
0769:                                debug.write(Lvl.INFORMATION, this ,
0770:                                        "Journal Command: Tag: " + id + " = "
0771:                                                + resource_name);
0772:                            }
0773:
0774:                            // Add this to the list of resources we updated.
0775:                            resources_updated.add(getResource(resource_name));
0776:
0777:                        } else if (type == 6) { // Resource delete
0778:                            long id = din.readLong();
0779:                            String resource_name = (String) id_name_map
0780:                                    .get(new Long(id));
0781:                            AbstractResource resource = getResource(resource_name);
0782:
0783:                            if (debug.isInterestedIn(Lvl.INFORMATION)) {
0784:                                debug.write(Lvl.INFORMATION, this ,
0785:                                        "Journal Command: Delete: "
0786:                                                + resource_name);
0787:                            }
0788:
0789:                            resource.persistDelete();
0790:
0791:                        } else if (type == 3) { // Resource size change
0792:                            long id = din.readLong();
0793:                            long new_size = din.readLong();
0794:                            String resource_name = (String) id_name_map
0795:                                    .get(new Long(id));
0796:                            AbstractResource resource = getResource(resource_name);
0797:
0798:                            if (debug.isInterestedIn(Lvl.INFORMATION)) {
0799:                                debug.write(Lvl.INFORMATION, this ,
0800:                                        "Journal Command: Set Size: "
0801:                                                + resource_name + " size = "
0802:                                                + new_size);
0803:                            }
0804:
0805:                            resource.persistSetSize(new_size);
0806:
0807:                        } else if (type == 1) { // Page modification
0808:                            long id = din.readLong();
0809:                            long page = din.readLong();
0810:                            int off = din.readInt();
0811:                            int len = din.readInt();
0812:
0813:                            String resource_name = (String) id_name_map
0814:                                    .get(new Long(id));
0815:                            AbstractResource resource = getResource(resource_name);
0816:
0817:                            if (debug.isInterestedIn(Lvl.INFORMATION)) {
0818:                                debug.write(Lvl.INFORMATION, this ,
0819:                                        "Journal Command: Page Modify: "
0820:                                                + resource_name + " page = "
0821:                                                + page + " off = " + off
0822:                                                + " len = " + len);
0823:                            }
0824:
0825:                            resource.persistPageChange(page, off, len, din);
0826:
0827:                        } else if (type == 100) { // Checkpoint (end)
0828:
0829:                            if (debug.isInterestedIn(Lvl.INFORMATION)) {
0830:                                debug.write(Lvl.INFORMATION, this ,
0831:                                        "Journal Command: Check Point.");
0832:                            }
0833:
0834:                            if (position == end) {
0835:                                finished = true;
0836:                            }
0837:                        }
0838:
0839:                        else {
0840:                            throw new Error("Unknown tag type: " + type
0841:                                    + " position = " + position);
0842:                        }
0843:
0844:                    } // while (!finished)
0845:
0846:                    // Synch all the resources that we have updated.
0847:                    int sz = resources_updated.size();
0848:                    for (int i = 0; i < sz; ++i) {
0849:                        AbstractResource r = (AbstractResource) resources_updated
0850:                                .get(i);
0851:                        if (debug.isInterestedIn(Lvl.INFORMATION)) {
0852:                            debug.write(Lvl.INFORMATION, this , "Synch: " + r);
0853:                        }
0854:                        r.synch();
0855:                    }
0856:
0857:                    din.close();
0858:
0859:                }
0860:
0861:                /**
0862:                 * Writes a resource identifier to the stream for the resource with the
0863:                 * given name.
0864:                 */
0865:                private Long writeResourceName(String resource_name,
0866:                        DataOutputStream out) throws IOException {
0867:                    Long v;
0868:                    synchronized (resource_id_map) {
0869:                        v = (Long) resource_id_map.get(resource_name);
0870:                        if (v == null) {
0871:                            ++cur_seq_id;
0872:
0873:                            int len = resource_name.length();
0874:
0875:                            // Write the header for this resource
0876:                            out.writeLong(2);
0877:                            out.writeInt(8 + 4 + (len * 2));
0878:                            out.writeLong(cur_seq_id);
0879:                            out.writeInt(len);
0880:                            out.writeChars(resource_name);
0881:
0882:                            // Put this id in the cache
0883:                            v = new Long(cur_seq_id);
0884:                            resource_id_map.put(resource_name, v);
0885:                        }
0886:                    }
0887:
0888:                    return v;
0889:                }
0890:
0891:                /**
0892:                 * Logs that a resource was deleted.
0893:                 */
0894:                void logResourceDelete(String resource_name) throws IOException {
0895:
0896:                    synchronized (this ) {
0897:                        // Build the header,
0898:                        Long v = writeResourceName(resource_name, data_out);
0899:
0900:                        // Write the header
0901:                        long resource_id = v.longValue();
0902:                        data_out.writeLong(6);
0903:                        data_out.writeInt(8);
0904:                        data_out.writeLong(resource_id);
0905:
0906:                    }
0907:
0908:                }
0909:
0910:                /**
0911:                 * Logs a resource size change.
0912:                 */
0913:                void logResourceSizeChange(String resource_name, long new_size)
0914:                        throws IOException {
0915:                    synchronized (this ) {
0916:                        // Build the header,
0917:                        Long v = writeResourceName(resource_name, data_out);
0918:
0919:                        // Write the header
0920:                        long resource_id = v.longValue();
0921:                        data_out.writeLong(3);
0922:                        data_out.writeInt(8 + 8);
0923:                        data_out.writeLong(resource_id);
0924:                        data_out.writeLong(new_size);
0925:
0926:                    }
0927:
0928:                }
0929:
0930:                /**
0931:                 * Sets a check point.  This will add an entry to the log.
0932:                 */
0933:                void setCheckPoint() throws IOException {
0934:                    synchronized (this ) {
0935:
0936:                        data_out.writeLong(100);
0937:                        data_out.writeInt(0);
0938:
0939:                        // Flush and synch the journal file
0940:                        flushAndSynch();
0941:                    }
0942:                }
0943:
0944:                /**
0945:                 * Logs a page modification to the end of the log and returns a pointer
0946:                 * in the file to the modification.
0947:                 */
0948:                JournalEntry logPageModification(String resource_name,
0949:                        long page_number, byte[] buf, int off, int len)
0950:                        throws IOException {
0951:
0952:                    long ref;
0953:                    synchronized (this ) {
0954:                        // Build the header,
0955:                        Long v = writeResourceName(resource_name, data_out);
0956:
0957:                        // The absolute position of the page,
0958:                        final long absolute_position = page_number * page_size;
0959:
0960:                        // Write the header
0961:                        long resource_id = v.longValue();
0962:                        data_out.writeLong(1);
0963:                        data_out.writeInt(8 + 8 + 4 + 4 + len);
0964:                        data_out.writeLong(resource_id);
0965:                        //        data_out.writeLong(page_number);
0966:                        //        data_out.writeInt(off);
0967:                        data_out.writeLong(absolute_position / 8192);
0968:                        data_out.writeInt(off
0969:                                + (int) (absolute_position & 8191));
0970:                        data_out.writeInt(len);
0971:
0972:                        data_out.write(buf, off, len);
0973:
0974:                        // Flush the changes so we can work out the pointer.
0975:                        data_out.flush();
0976:                        ref = data.length() - len - 36;
0977:                    }
0978:
0979:                    // Returns a JournalEntry object
0980:                    return new JournalEntry(resource_name, this , ref,
0981:                            page_number);
0982:                }
0983:
0984:                /**
0985:                 * Reconstructs a modification that is logged in this journal.
0986:                 */
0987:                void buildPage(long in_page_number, long position, byte[] buf,
0988:                        int off) throws IOException {
0989:                    long type;
0990:                    long resource_id;
0991:                    long page_number;
0992:                    int page_offset;
0993:                    int page_length;
0994:
0995:                    synchronized (this ) {
0996:                        data.readFully(position, buffer, 0, 36);
0997:                        type = ByteArrayUtil.getLong(buffer, 0);
0998:                        resource_id = ByteArrayUtil.getLong(buffer, 12);
0999:                        page_number = ByteArrayUtil.getLong(buffer, 20);
1000:                        page_offset = ByteArrayUtil.getInt(buffer, 28);
1001:                        page_length = ByteArrayUtil.getInt(buffer, 32);
1002:
1003:                        // Some asserts,
1004:                        if (type != 1) {
1005:                            throw new IOException("Invalid page type. type = "
1006:                                    + type + " pos = " + position);
1007:                        }
1008:                        if (page_number != in_page_number) {
1009:                            throw new IOException("Page numbers do not match.");
1010:                        }
1011:
1012:                        // Read the content.
1013:                        data.readFully(position + 36, buf, off + page_offset,
1014:                                page_length);
1015:                    }
1016:
1017:                }
1018:
1019:                /**
1020:                 * Synchronizes the log.
1021:                 */
1022:                void flushAndSynch() throws IOException {
1023:                    synchronized (this ) {
1024:                        data_out.flush();
1025:                        data.synch();
1026:                    }
1027:                }
1028:
1029:                public String toString() {
1030:                    return "[JOURNAL: " + file.getName() + "]";
1031:                }
1032:
1033:            }
1034:
1035:            /**
1036:             * A JournalEntry represents a modification that has been logging in the
1037:             * journal for a specific page of a resource.  It contains the name of the
1038:             * log file, the position in the journal of the modification, and the page
1039:             * number.
1040:             */
1041:            private static final class JournalEntry {
1042:
1043:                /**
1044:                 * The resource that this page is on.
1045:                 */
1046:                private final String resource_name;
1047:
1048:                /**
1049:                 * The journal file.
1050:                 */
1051:                private final JournalFile journal;
1052:
1053:                /**
1054:                 * The position in the journal file.
1055:                 */
1056:                private final long position;
1057:
1058:                /**
1059:                 * The page number of this modification.
1060:                 */
1061:                private final long page_number;
1062:
1063:                /**
1064:                 * The next journal entry with the same page number
1065:                 */
1066:                JournalEntry next_page;
1067:
1068:                /**
1069:                 * Constructs the entry.
1070:                 */
1071:                public JournalEntry(String resource_name, JournalFile journal,
1072:                        long position, long page_number) {
1073:                    this .resource_name = resource_name;
1074:                    this .journal = journal;
1075:                    this .position = position;
1076:                    this .page_number = page_number;
1077:                }
1078:
1079:                /**
1080:                 * Returns the journal file for this entry.
1081:                 */
1082:                public JournalFile getJournalFile() {
1083:                    return journal;
1084:                }
1085:
1086:                /**
1087:                 * Returns the position of the log entry in the journal file.
1088:                 */
1089:                public long getPosition() {
1090:                    return position;
1091:                }
1092:
1093:                /**
1094:                 * Returns the page number of this modification log entry.
1095:                 */
1096:                public long getPageNumber() {
1097:                    return page_number;
1098:                }
1099:
1100:            }
1101:
1102:            /**
1103:             * An abstract resource.
1104:             */
1105:            private abstract class AbstractResource implements 
1106:                    JournalledResource {
1107:
1108:                /**
1109:                 * The unique name given this resource (the file name).
1110:                 */
1111:                protected final String name;
1112:
1113:                /**
1114:                 * The id assigned to this resource by this session.  This id should not
1115:                 * be used in any external source.
1116:                 */
1117:                protected final long id;
1118:
1119:                /**
1120:                 * The backing object.
1121:                 */
1122:                protected final StoreDataAccessor data;
1123:
1124:                /**
1125:                 * True if this resource is read_only.
1126:                 */
1127:                protected boolean read_only;
1128:
1129:                /**
1130:                 * Constructs the resource.
1131:                 */
1132:                AbstractResource(String name, long id, StoreDataAccessor data) {
1133:                    this .name = name;
1134:                    this .id = id;
1135:                    this .data = data;
1136:                }
1137:
1138:                // ---------- Persist methods ----------
1139:
1140:                abstract void persistClose() throws IOException;
1141:
1142:                abstract void persistDelete() throws IOException;
1143:
1144:                abstract void persistSetSize(final long new_size)
1145:                        throws IOException;
1146:
1147:                abstract void persistPageChange(final long page, final int off,
1148:                        int len, DataInputStream din) throws IOException;
1149:
1150:                abstract void synch() throws IOException;
1151:
1152:                // Called after a rollForwardRecover to notify the resource to update its
1153:                // state to reflect the fact that changes have occurred.
1154:                abstract void notifyPostRecover();
1155:
1156:                // ----------
1157:
1158:                /**
1159:                 * Returns the size of the page.
1160:                 */
1161:                public int getPageSize() {
1162:                    return page_size;
1163:                }
1164:
1165:                /**
1166:                 * Returns the unique id of this page.
1167:                 */
1168:                public long getID() {
1169:                    return id;
1170:                }
1171:
1172:                public String toString() {
1173:                    return name;
1174:                }
1175:
1176:            }
1177:
1178:            /**
1179:             * An implementation of AbstractResource that doesn't log.
1180:             */
1181:            private final class NonLoggingResource extends AbstractResource {
1182:
1183:                /**
1184:                 * Constructs the resource.
1185:                 */
1186:                NonLoggingResource(String name, long id, StoreDataAccessor data) {
1187:                    super (name, id, data);
1188:                }
1189:
1190:                // ---------- Persist methods ----------
1191:
1192:                void persistClose() throws IOException {
1193:                    // No-op
1194:                }
1195:
1196:                public void persistDelete() throws IOException {
1197:                    // No-op
1198:                }
1199:
1200:                public void persistSetSize(final long new_size)
1201:                        throws IOException {
1202:                    // No-op
1203:                }
1204:
1205:                public void persistPageChange(final long page, final int off,
1206:                        int len, DataInputStream din) throws IOException {
1207:                    // No-op
1208:                }
1209:
1210:                public void synch() throws IOException {
1211:                    data.synch();
1212:                }
1213:
1214:                public void notifyPostRecover() {
1215:                    // No-op
1216:                }
1217:
1218:                // ----------
1219:
1220:                /**
1221:                 * Opens the resource.
1222:                 */
1223:                public void open(boolean read_only) throws IOException {
1224:                    this .read_only = read_only;
1225:                    data.open(read_only);
1226:                }
1227:
1228:                /**
1229:                 * Reads a page from the resource.
1230:                 */
1231:                public void read(final long page_number, final byte[] buf,
1232:                        final int off) throws IOException {
1233:                    // Read the data.
1234:                    long page_position = page_number * page_size;
1235:                    data.read(page_position + off, buf, off, page_size);
1236:                }
1237:
1238:                /**
1239:                 * Writes a page of some previously specified size.
1240:                 */
1241:                public void write(final long page_number, byte[] buf, int off,
1242:                        int len) throws IOException {
1243:                    long page_position = page_number * page_size;
1244:                    data.write(page_position + off, buf, off, len);
1245:                }
1246:
1247:                /**
1248:                 * Sets the size of the resource.
1249:                 */
1250:                public void setSize(long size) throws IOException {
1251:                    data.setSize(size);
1252:                }
1253:
1254:                /**
1255:                 * Returns the size of this resource.
1256:                 */
1257:                public long getSize() throws IOException {
1258:                    return data.getSize();
1259:                }
1260:
1261:                /**
1262:                 * Closes the resource.
1263:                 */
1264:                public void close() throws IOException {
1265:                    data.close();
1266:                }
1267:
1268:                /**
1269:                 * Deletes the resource.
1270:                 */
1271:                public void delete() throws IOException {
1272:                    data.delete();
1273:                }
1274:
1275:                /**
1276:                 * Returns true if the resource currently exists.
1277:                 */
1278:                public boolean exists() {
1279:                    return data.exists();
1280:                }
1281:
1282:            }
1283:
1284:            /**
1285:             * Represents a resource in this system.  A resource is backed by a
1286:             * StoreDataAccessor and may have one or more modifications to it in the
1287:             * journal.
1288:             */
1289:            private final class Resource extends AbstractResource {
1290:
1291:                /**
1292:                 * The size of the resource.
1293:                 */
1294:                private long size;
1295:
1296:                /**
1297:                 * True if there is actually data to be read in the above object.
1298:                 */
1299:                private boolean there_is_backing_data;
1300:
1301:                /**
1302:                 * True if the underlying resource is really open.
1303:                 */
1304:                private boolean really_open;
1305:
1306:                /**
1307:                 * True if the data store exists.
1308:                 */
1309:                private boolean data_exists;
1310:
1311:                /**
1312:                 * True if the data resource is open.
1313:                 */
1314:                private boolean data_open;
1315:
1316:                /**
1317:                 * True if the data resource was deleted.
1318:                 */
1319:                private boolean data_deleted;
1320:
1321:                /**
1322:                 * The hash of all journal entries on this resource (JournalEntry).
1323:                 */
1324:                private final JournalEntry[] journal_map;
1325:
1326:                /**
1327:                 * A temporary buffer the size of a page.
1328:                 */
1329:                private final byte[] page_buffer;
1330:
/**
 * Constructs the resource over the given backing accessor.
 * <p>
 * The resource starts out not open.  Whether the backing data exists is
 * sampled here, and when it does its current size is read as the initial
 * size of the resource; otherwise the size is left at zero.
 *
 * @throws Error if the size of existing backing data cannot be read; the
 *   IOException that caused it is attached as the cause.
 */
Resource(String name, long id, StoreDataAccessor data) {
    super(name, id, data);
    journal_map = new JournalEntry[257];
    data_open = false;
    data_exists = data.exists();
    data_deleted = false;
    if (data_exists) {
        try {
            size = data.getSize();
        } catch (IOException e) {
            // Chain the IOException so its stack trace is not lost.
            throw new Error("Error getting size of resource: "
                    + e.getMessage(), e);
        }
    }
    really_open = false;
    page_buffer = new byte[page_size];
}
1352:
1353:                // ---------- Persist methods ----------
1354:
1355:                private void persistOpen(boolean read_only) throws IOException {
1356:                    //      System.out.println(name + " Open");
1357:                    if (!really_open) {
1358:                        data.open(read_only);
1359:                        there_is_backing_data = true;
1360:                        really_open = true;
1361:                    }
1362:                }
1363:
1364:                void persistClose() throws IOException {
1365:                    //      System.out.println(name + " Close");
1366:                    if (really_open) {
1367:                        // When we close we reset the size attribute.  We do this because of
1368:                        // the roll forward recovery.
1369:                        size = data.getSize();
1370:                        data.synch();
1371:                        data.close();
1372:                        really_open = false;
1373:                    }
1374:                }
1375:
1376:                public void persistDelete() throws IOException {
1377:                    //      System.out.println(name + " Delete");
1378:                    // If open then close
1379:                    if (really_open) {
1380:                        persistClose();
1381:                    }
1382:                    data.delete();
1383:                    there_is_backing_data = false;
1384:                }
1385:
1386:                public void persistSetSize(final long new_size)
1387:                        throws IOException {
1388:                    //      System.out.println(name + " Set Size " + size);
1389:                    // If not open then open.
1390:                    if (!really_open) {
1391:                        persistOpen(false);
1392:                    }
1393:                    // Don't let us set a size that's smaller than the current size.
1394:                    if (new_size > data.getSize()) {
1395:                        data.setSize(new_size);
1396:                    }
1397:                }
1398:
1399:                public void persistPageChange(final long page, final int off,
1400:                        int len, DataInputStream din) throws IOException {
1401:                    if (!really_open) {
1402:                        persistOpen(false);
1403:                    }
1404:
1405:                    // Buffer to read the page content into
1406:                    byte[] buf;
1407:                    if (len <= page_buffer.length) {
1408:                        // If length is smaller or equal to the size of a page then use the
1409:                        // local page buffer.
1410:                        buf = page_buffer;
1411:                    } else {
1412:                        // Otherwise create a new buffer of the required size (this may happen
1413:                        // if the page size changes between sessions).
1414:                        buf = new byte[len];
1415:                    }
1416:
1417:                    // Read the change from the input stream
1418:                    din.readFully(buf, 0, len);
1419:                    // Write the change out to the underlying resource container
1420:                    long pos = page * 8192; //page_size;
1421:                    data.write(pos + off, buf, 0, len);
1422:                }
1423:
1424:                public void synch() throws IOException {
1425:                    if (really_open) {
1426:                        data.synch();
1427:                    }
1428:                }
1429:
1430:                public void notifyPostRecover() {
1431:                    data_exists = data.exists();
1432:                }
1433:
1434:                // ----------
1435:
1436:                /**
1437:                 * Opens the resource.  This method will check if the resource exists.  If
1438:                 * it doesn't exist the 'read' method will return just the journal
1439:                 * modifications of a page.  If it does exist it opens the resource and uses
1440:                 * that as the backing to any 'read' operations.
1441:                 */
1442:                public void open(boolean read_only) throws IOException {
1443:                    this .read_only = read_only;
1444:
1445:                    if (!data_deleted && data.exists()) {
1446:                        // It does exist so open it.
1447:                        persistOpen(read_only);
1448:                    } else {
1449:                        there_is_backing_data = false;
1450:                        data_deleted = false;
1451:                    }
1452:                    data_open = true;
1453:                    data_exists = true;
1454:                }
1455:
1456:                /**
1457:                 * Reads a page from the resource.  This method reconstructs the page
1458:                 * from the underlying data, and from any journal entries.  This should
1459:                 * read the data to be put into a buffer in memory.
1460:                 */
1461:                public void read(final long page_number, final byte[] buf,
1462:                        final int off) throws IOException {
1463:
1464:                    synchronized (journal_map) {
1465:                        if (!data_open) {
1466:                            throw new IOException(
1467:                                    "Assertion failed: Data file is not open.");
1468:                        }
1469:                    }
1470:
1471:                    // The list of all journal entries on this page number
1472:                    final ArrayList all_journal_entries = new ArrayList(4);
1473:                    try {
1474:                        // The map index.
1475:                        synchronized (journal_map) {
1476:                            int i = ((int) (page_number & 0x0FFFFFFF) % journal_map.length);
1477:                            JournalEntry entry = (JournalEntry) journal_map[i];
1478:                            JournalEntry prev = null;
1479:
1480:                            while (entry != null) {
1481:                                boolean deleted_hash = false;
1482:
1483:                                JournalFile file = entry.getJournalFile();
1484:                                // Note that once we have a reference the journal file can not be
1485:                                // deleted.
1486:                                file.addReference();
1487:
1488:                                // If the file is closed (or deleted)
1489:                                if (file.isDeleted()) {
1490:                                    deleted_hash = true;
1491:                                    // Deleted so remove the reference to the journal
1492:                                    file.removeReference();
1493:                                    // Remove the journal entry from the chain.
1494:                                    if (prev == null) {
1495:                                        journal_map[i] = entry.next_page;
1496:                                    } else {
1497:                                        prev.next_page = entry.next_page;
1498:                                    }
1499:                                }
1500:                                // Else if not closed then is this entry the page number?
1501:                                else if (entry.getPageNumber() == page_number) {
1502:                                    all_journal_entries.add(entry);
1503:                                } else {
1504:                                    // Not the page we are looking for so remove the reference to the
1505:                                    // file.
1506:                                    file.removeReference();
1507:                                }
1508:
1509:                                // Only move prev is we have NOT deleted a hash entry
1510:                                if (!deleted_hash) {
1511:                                    prev = entry;
1512:                                }
1513:                                entry = entry.next_page;
1514:                            }
1515:                        }
1516:
1517:                        // Read any data from the underlying file
1518:                        if (there_is_backing_data) {
1519:                            long page_position = page_number * page_size;
1520:                            // First read the page from the underlying store.
1521:                            data.read(page_position, buf, off, page_size);
1522:                        } else {
1523:                            // Clear the buffer
1524:                            for (int i = off; i < (page_size + off); ++i) {
1525:                                buf[i] = 0;
1526:                            }
1527:                        }
1528:
1529:                        // Rebuild from the journal file(s)
1530:                        final int sz = all_journal_entries.size();
1531:                        for (int i = 0; i < sz; ++i) {
1532:                            JournalEntry entry = (JournalEntry) all_journal_entries
1533:                                    .get(i);
1534:                            JournalFile file = entry.getJournalFile();
1535:                            final long position = entry.getPosition();
1536:                            synchronized (file) {
1537:                                file.buildPage(page_number, position, buf, off);
1538:                            }
1539:                        }
1540:
1541:                    } finally {
1542:
1543:                        // Make sure we remove the reference for all the journal files.
1544:                        final int sz = all_journal_entries.size();
1545:                        for (int i = 0; i < sz; ++i) {
1546:                            JournalEntry entry = (JournalEntry) all_journal_entries
1547:                                    .get(i);
1548:                            JournalFile file = entry.getJournalFile();
1549:                            file.removeReference();
1550:                        }
1551:
1552:                    }
1553:
1554:                }
1555:
1556:                /**
1557:                 * Writes a page of some previously specified size to the top log.  This
1558:                 * will add a single entry to the log and any 'read' operations after will
1559:                 * contain the written data.
1560:                 */
1561:                public void write(final long page_number, byte[] buf, int off,
1562:                        int len) throws IOException {
1563:
1564:                    synchronized (journal_map) {
1565:                        if (!data_open) {
1566:                            throw new IOException(
1567:                                    "Assertion failed: Data file is not open.");
1568:                        }
1569:
1570:                        // Make this modification in the log
1571:                        JournalEntry journal;
1572:                        synchronized (top_journal_lock) {
1573:                            journal = topJournal().logPageModification(name,
1574:                                    page_number, buf, off, len);
1575:                        }
1576:
1577:                        // This adds the modification to the END of the hash list.  This means
1578:                        // when we reconstruct the page the journals will always be in the
1579:                        // correct order - from oldest to newest.
1580:
1581:                        // The map index.
1582:                        int i = ((int) (page_number & 0x0FFFFFFF) % journal_map.length);
1583:                        JournalEntry entry = (JournalEntry) journal_map[i];
1584:                        // Make sure this entry is added to the END
1585:                        if (entry == null) {
1586:                            // Add at the head if no first entry
1587:                            journal_map[i] = journal;
1588:                            journal.next_page = null;
1589:                        } else {
1590:                            // Otherwise search to the end
1591:                            // The number of journal entries in the linked list
1592:                            int journal_entry_count = 0;
1593:                            while (entry.next_page != null) {
1594:                                entry = entry.next_page;
1595:                                ++journal_entry_count;
1596:                            }
1597:                            // and add to the end
1598:                            entry.next_page = journal;
1599:                            journal.next_page = null;
1600:
1601:                            // If there are over 35 journal entries, scan and remove all entries
1602:                            // on journals that have persisted
1603:                            if (journal_entry_count > 35) {
1604:                                int entries_cleaned = 0;
1605:                                entry = (JournalEntry) journal_map[i];
1606:                                JournalEntry prev = null;
1607:
1608:                                while (entry != null) {
1609:                                    boolean deleted_hash = false;
1610:
1611:                                    JournalFile file = entry.getJournalFile();
1612:                                    // Note that once we have a reference the journal file can not be
1613:                                    // deleted.
1614:                                    file.addReference();
1615:
1616:                                    // If the file is closed (or deleted)
1617:                                    if (file.isDeleted()) {
1618:                                        deleted_hash = true;
1619:                                        // Deleted so remove the reference to the journal
1620:                                        file.removeReference();
1621:                                        // Remove the journal entry from the chain.
1622:                                        if (prev == null) {
1623:                                            journal_map[i] = entry.next_page;
1624:                                        } else {
1625:                                            prev.next_page = entry.next_page;
1626:                                        }
1627:                                        ++entries_cleaned;
1628:                                    }
1629:                                    // Remove the reference
1630:                                    file.removeReference();
1631:
1632:                                    // Only move prev is we have NOT deleted a hash entry
1633:                                    if (!deleted_hash) {
1634:                                        prev = entry;
1635:                                    }
1636:                                    entry = entry.next_page;
1637:                                }
1638:
1639:                            }
1640:                        }
1641:                    }
1642:
1643:                }
1644:
1645:                /**
1646:                 * Sets the size of the resource.
1647:                 */
1648:                public void setSize(long size) throws IOException {
1649:                    synchronized (journal_map) {
1650:                        this .size = size;
1651:                    }
1652:                    synchronized (top_journal_lock) {
1653:                        topJournal().logResourceSizeChange(name, size);
1654:                    }
1655:                }
1656:
1657:                /**
1658:                 * Returns the size of this resource.
1659:                 */
1660:                public long getSize() throws IOException {
1661:                    synchronized (journal_map) {
1662:                        return this .size;
1663:                    }
1664:                }
1665:
1666:                /**
1667:                 * Closes the resource.  This will actually simply log that the resource
1668:                 * has been closed.
1669:                 */
1670:                public void close() throws IOException {
1671:                    synchronized (journal_map) {
1672:                        data_open = false;
1673:                    }
1674:                }
1675:
1676:                /**
1677:                 * Deletes the resource.  This will actually simply log that the resource
1678:                 * has been deleted.
1679:                 */
1680:                public void delete() throws IOException {
1681:                    // Log that this resource was deleted.
1682:                    synchronized (top_journal_lock) {
1683:                        topJournal().logResourceDelete(name);
1684:                    }
1685:                    synchronized (journal_map) {
1686:                        data_exists = false;
1687:                        data_deleted = true;
1688:                        size = 0;
1689:                    }
1690:                }
1691:
1692:                /**
1693:                 * Returns true if the resource currently exists.
1694:                 */
1695:                public boolean exists() {
1696:                    return data_exists;
1697:                }
1698:
1699:            }
1700:
1701:            /**
1702:             * Summary information about a journal.
1703:             */
1704:            private static class JournalSummary {
1705:
1706:                /**
1707:                 * The JournalFile object that is a summary of.
1708:                 */
1709:                JournalFile journal_file;
1710:
1711:                /**
1712:                 * True if the journal is recoverable (has one or more complete check
1713:                 * points available).
1714:                 */
1715:                boolean can_be_recovered = false;
1716:
1717:                /**
1718:                 * The position of the last checkpoint in the journal.
1719:                 */
1720:                long last_checkpoint;
1721:
1722:                /**
1723:                 * The list of all resource names that this journal 'touches'.
1724:                 */
1725:                ArrayList resource_list = new ArrayList();
1726:
1727:                /**
1728:                 * Constructor.
1729:                 */
1730:                public JournalSummary(JournalFile journal_file) {
1731:                    this .journal_file = journal_file;
1732:                }
1733:
1734:            }
1735:
1736:            /**
1737:             * Thread that persists the journal in the backgroudn.
1738:             */
1739:            private class JournalingThread extends Thread {
1740:
1741:                private boolean finished = false;
1742:                private boolean actually_finished;
1743:
1744:                /**
1745:                 * Constructor.
1746:                 */
1747:                JournalingThread() {
1748:                    setName("Mckoi - Background Journaling");
1749:                    // This is a daemon thread.  it should be safe if this thread
1750:                    // dies at any time.
1751:                    setDaemon(true);
1752:                }
1753:
1754:                public void run() {
1755:                    boolean local_finished = false;
1756:
1757:                    while (!local_finished) {
1758:
1759:                        ArrayList to_process = null;
1760:                        synchronized (top_journal_lock) {
1761:                            if (journal_archives.size() > 0) {
1762:                                to_process = new ArrayList();
1763:                                to_process.addAll(journal_archives);
1764:                            }
1765:                        }
1766:
1767:                        if (to_process == null) {
1768:                            // Nothing to process so wait
1769:                            synchronized (this ) {
1770:                                if (!finished) {
1771:                                    try {
1772:                                        wait();
1773:                                    } catch (InterruptedException e) { /* ignore */
1774:                                    }
1775:                                }
1776:                            }
1777:
1778:                        } else if (to_process.size() > 0) {
1779:                            // Something to process, so go ahead and process the journals,
1780:                            int sz = to_process.size();
1781:                            // For all journals
1782:                            for (int i = 0; i < sz; ++i) {
1783:                                // Pick the lowest journal to persist
1784:                                JournalFile jf = (JournalFile) to_process
1785:                                        .get(i);
1786:                                try {
1787:                                    // Persist the journal
1788:                                    jf.persist(8, jf.size());
1789:                                    // Close and then delete the journal file
1790:                                    jf.closeAndDelete();
1791:                                } catch (IOException e) {
1792:                                    debug.write(Lvl.ERROR, this ,
1793:                                            "Error persisting journal: " + jf);
1794:                                    debug.writeException(Lvl.ERROR, e);
1795:                                    // If there is an error persisting the best thing to do is
1796:                                    // finish
1797:                                    synchronized (this ) {
1798:                                        finished = true;
1799:                                    }
1800:                                }
1801:                            }
1802:                        }
1803:
1804:                        synchronized (this ) {
1805:                            local_finished = finished;
1806:                            // Remove the journals that we have just persisted.
1807:                            if (to_process != null) {
1808:                                synchronized (top_journal_lock) {
1809:                                    int sz = to_process.size();
1810:                                    for (int i = 0; i < sz; ++i) {
1811:                                        journal_archives.remove(0);
1812:                                    }
1813:                                }
1814:                            }
1815:                            // Notify any threads waiting
1816:                            notifyAll();
1817:                        }
1818:
1819:                    }
1820:
1821:                    synchronized (this ) {
1822:                        actually_finished = true;
1823:                        notifyAll();
1824:                    }
1825:                }
1826:
1827:                public synchronized void finish() {
1828:                    finished = true;
1829:                    notifyAll();
1830:                }
1831:
1832:                public synchronized void waitUntilFinished() {
1833:                    try {
1834:                        while (!actually_finished) {
1835:                            wait();
1836:                        }
1837:                    } catch (InterruptedException e) {
1838:                        throw new Error("Interrupted: " + e.getMessage());
1839:                    }
1840:                }
1841:
1842:                /**
1843:                 * Persists the journal_archives list until the list is at least the
1844:                 * given size.
1845:                 */
1846:                public synchronized void persistArchives(int until_size) {
1847:                    notifyAll();
1848:                    int sz;
1849:                    synchronized (top_journal_lock) {
1850:                        sz = journal_archives.size();
1851:                    }
1852:                    // Wait until the sz is smaller than 'until_size'
1853:                    while (sz > until_size) {
1854:                        try {
1855:                            wait();
1856:                        } catch (InterruptedException e) { /* ignore */
1857:                        }
1858:
1859:                        synchronized (top_journal_lock) {
1860:                            sz = journal_archives.size();
1861:                        }
1862:                    }
1863:                }
1864:
1865:            }
1866:
1867:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.