001: /*-
002: * See the file LICENSE for redistribution information.
003: *
004: * Copyright (c) 2002,2008 Oracle. All rights reserved.
005: *
006: * $Id: FileReader.java,v 1.99.2.8 2008/01/11 22:02:51 cwl Exp $
007: */
008:
009: package com.sleepycat.je.log;
010:
011: import java.io.IOException;
012: import java.nio.Buffer;
013: import java.nio.ByteBuffer;
014:
015: import com.sleepycat.je.DatabaseException;
016: import com.sleepycat.je.config.EnvironmentParams;
017: import com.sleepycat.je.dbi.DbConfigManager;
018: import com.sleepycat.je.dbi.EnvironmentImpl;
019: import com.sleepycat.je.log.entry.LogEntry;
020: import com.sleepycat.je.utilint.DbLsn;
021: import com.sleepycat.je.utilint.Tracer;
022:
023: /**
024: * A FileReader is an abstract class that traverses the log files, reading in
025: * chunks of the file at a time. Concrete subclasses perform a particular
026: * action to each entry.
027: */
028: public abstract class FileReader {
029:
030: protected EnvironmentImpl envImpl;
031: protected FileManager fileManager;
032:
033: /* Buffering reads */
034: private ByteBuffer readBuffer; // buffer for reading from the file
035: private ByteBuffer saveBuffer; // for piecing together data
036: private int maxReadBufferSize; // read buffer can't grow larger than this
037:
038: /* Managing the buffer reads */
039: private boolean singleFile; // if true, do not read across files
040: protected boolean eof; // true if at end of the log.
041: // XXX, use exception instead of status?
042: private boolean forward; // if true, we're reading forward
043:
044: /*
045: * ReadBufferFileNum, readBufferFileStart and readBufferFileEnd indicate
046: * how the read buffer maps to the file. For example, if the read buffer
047: * size is 100 and the read buffer was filled from file 9, starting at byte
048: * 100, then
049: * readBufferFileNum = 9
050: * readBufferFileStart = 100
051: * readBufferFileEnd = 200
052: */
053: protected long readBufferFileNum; // file number we're pointing to
054: protected long readBufferFileStart;// file position that maps to buf start
055: protected long readBufferFileEnd; // file position that maps to buf end
056:
057: /* stats */
058: private int nRead; // num entries we've seen
059:
060: /*
061: * The number of times we've tried to read in a log entry that was too
062: * large for the read buffer.
063: */
064: private long nRepeatIteratorReads;
065:
066: /* Number of reads since the last time getAndResetNReads was called. */
067: private int nReadOperations;
068:
069: /* The log entry header for the entry that was just read. */
070: protected LogEntryHeader currentEntryHeader;
071:
072: /*
073: * In general, currentEntryPrevOffset is the same as
074: * currentEntryHeader.getPrevOffset(), but it's initialized and used before
075: * a header is read.
076: */
077: protected long currentEntryPrevOffset;
078:
079: /*
080: * nextEntryOffset is used to set the currentEntryOffset after we've read
081: * an entry.
082: */
083: protected long currentEntryOffset;
084: protected long nextEntryOffset;
085: protected long startLsn; // We start reading from this LSN.
086: private long finishLsn; // If going backwards, read up to this LSN.
087:
088: /* For checking checksum on the read. */
089: protected ChecksumValidator cksumValidator;
090: private boolean doValidateChecksum; // Validate checksums
091: private boolean alwaysValidateChecksum; // Validate for all entry types
092:
093: /* True if this is the scavenger and we are expecting checksum issues. */
094: protected boolean anticipateChecksumErrors;
095:
096: /**
097: * A FileReader just needs to know what size chunks to read in.
098: * @param endOfFileLsn indicates the end of the log file
099: */
100: public FileReader(EnvironmentImpl envImpl, int readBufferSize,
101: boolean forward, long startLsn, Long singleFileNumber,
102: long endOfFileLsn, long finishLsn) throws IOException,
103: DatabaseException {
104:
105: this .envImpl = envImpl;
106: this .fileManager = envImpl.getFileManager();
107: this .doValidateChecksum = envImpl.getLogManager()
108: .getChecksumOnRead();
109:
110: /* Allocate a read buffer. */
111: this .singleFile = (singleFileNumber != null);
112: this .forward = forward;
113:
114: readBuffer = ByteBuffer.allocate(readBufferSize);
115: threadSafeBufferFlip(readBuffer);
116: saveBuffer = ByteBuffer.allocate(readBufferSize);
117:
118: DbConfigManager configManager = envImpl.getConfigManager();
119: maxReadBufferSize = configManager
120: .getInt(EnvironmentParams.LOG_ITERATOR_MAX_SIZE);
121:
122: /* Determine the starting position. */
123: this .startLsn = startLsn;
124: this .finishLsn = finishLsn;
125: initStartingPosition(endOfFileLsn, singleFileNumber);
126:
127: /* stats */
128: nRead = 0;
129: if (doValidateChecksum) {
130: cksumValidator = new ChecksumValidator();
131: }
132: anticipateChecksumErrors = false;
133: }
134:
135: /**
136: * Helper for determining the starting position and opening up a file at
137: * the desired location.
138: */
139: protected void initStartingPosition(long endOfFileLsn,
140: Long ignoreSingleFileNumber) throws IOException,
141: DatabaseException {
142:
143: eof = false;
144: if (forward) {
145:
146: /*
147: * Start off at the startLsn. If that's null, start at the
148: * beginning of the log. If there are no log files, set eof.
149: */
150: if (startLsn != DbLsn.NULL_LSN) {
151: readBufferFileNum = DbLsn.getFileNumber(startLsn);
152: readBufferFileEnd = DbLsn.getFileOffset(startLsn);
153: } else {
154: Long firstNum = fileManager.getFirstFileNum();
155: if (firstNum == null) {
156: eof = true;
157: } else {
158: readBufferFileNum = firstNum.longValue();
159: readBufferFileEnd = 0;
160: }
161: }
162:
163: /*
164: * After we read the first entry, the currentEntry will point here.
165: */
166: nextEntryOffset = readBufferFileEnd;
167: } else {
168:
169: /*
170: * Make the read buffer look like it's positioned off the end of
171: * the file. Initialize the first LSN we want to read. When
172: * traversing the log backwards, we always start at the very end.
173: */
174: assert startLsn != DbLsn.NULL_LSN;
175: readBufferFileNum = DbLsn.getFileNumber(endOfFileLsn);
176: readBufferFileStart = DbLsn.getFileOffset(endOfFileLsn);
177: readBufferFileEnd = readBufferFileStart;
178:
179: /*
180: * currentEntryPrevOffset points to the entry we want to start out
181: * reading when going backwards. If it's 0, the entry we want to
182: * read is in a different file.
183: */
184: if (DbLsn.getFileNumber(startLsn) == DbLsn
185: .getFileNumber(endOfFileLsn)) {
186: currentEntryPrevOffset = DbLsn.getFileOffset(startLsn);
187: } else {
188: currentEntryPrevOffset = 0;
189: }
190: currentEntryOffset = DbLsn.getFileOffset(endOfFileLsn);
191: }
192: }
193:
194: /**
195: * Whether to always validate the checksum, even for non-target entries.
196: */
197: public void setAlwaysValidateChecksum(boolean validate) {
198: alwaysValidateChecksum = validate;
199: }
200:
201: /**
202: * @return the number of entries processed by this reader.
203: */
204: public int getNumRead() {
205: return nRead;
206: }
207:
208: public long getNRepeatIteratorReads() {
209: return nRepeatIteratorReads;
210: }
211:
212: /**
213: * Get LSN of the last entry read.
214: */
215: public long getLastLsn() {
216: return DbLsn.makeLsn(readBufferFileNum, currentEntryOffset);
217: }
218:
219: /**
220: * Returns the total size (including header) of the last entry read.
221: */
222: public int getLastEntrySize() {
223: return currentEntryHeader.getSize()
224: + currentEntryHeader.getItemSize();
225: }
226:
227: /**
228: * A bottleneck for all calls to LogEntry.readEntry. This method ensures
229: * that setLastLogSize is called after LogEntry.readEntry, and should be
230: * called by all FileReaders instead of calling LogEntry.readEntry
231: * directly.
232: */
233: void readEntry(LogEntry entry, ByteBuffer buffer,
234: boolean readFullItem) throws DatabaseException {
235:
236: entry.readEntry(currentEntryHeader, buffer, readFullItem);
237:
238: /*
239: * Some entries (LNs) save the last logged size. However, we can only
240: * set the size if we have read the full item; if the full item is not
241: * read, the LN in the LNLogEntry may be null.
242: */
243: if (readFullItem) {
244: entry.setLastLoggedSize(getLastEntrySize());
245: }
246: }
247:
248: /**
249: * readNextEntry scans the log files until either it's reached the end of
250: * the log or has hit an invalid portion. It then returns false.
251: *
252: * @return true if an element has been read
253: */
254: public boolean readNextEntry() throws DatabaseException,
255: IOException {
256:
257: boolean foundEntry = false;
258: try {
259: while ((!eof) && (!foundEntry)) {
260:
261: /* Read the next header. */
262: getLogEntryInReadBuffer();
263: ByteBuffer dataBuffer = readData(
264: LogEntryHeader.MIN_HEADER_SIZE, true);
265:
266: readBasicHeader(dataBuffer);
267: if (currentEntryHeader.getReplicate()) {
268: dataBuffer = readData(currentEntryHeader
269: .getVariablePortionSize(), true);
270: currentEntryHeader.readVariablePortion(dataBuffer);
271: }
272:
273: boolean isTargetEntry = isTargetEntry(
274: currentEntryHeader.getType(),
275: currentEntryHeader.getVersion());
276: boolean doValidate = doValidateChecksum
277: && (isTargetEntry || alwaysValidateChecksum);
278: boolean collectData = doValidate || isTargetEntry;
279:
280: /* Initialize the checksum with the header. */
281: if (doValidate) {
282: startChecksum(dataBuffer);
283: }
284:
285: /*
286: * Read in the body of the next entry. Note that even if this
287: * isn't a targeted entry, we have to move the buffer position
288: * along.
289: */
290: dataBuffer = readData(currentEntryHeader.getItemSize(),
291: collectData);
292:
293: /*
294: * We've read an entry. Move up our offsets if we're moving
295: * forward. If we're moving backwards, we set our offset before
296: * we read the header, because we knew where the entry started.
297: */
298: if (forward) {
299: currentEntryOffset = nextEntryOffset;
300: nextEntryOffset += currentEntryHeader.getSize() + // header size
301: currentEntryHeader.getItemSize(); // item size
302: }
303:
304: /* Validate the log entry checksum. */
305: if (doValidate) {
306: validateChecksum(dataBuffer);
307: }
308:
309: if (isTargetEntry) {
310:
311: /*
312: * For a target entry, call the subclass reader's
313: * processEntry method to do whatever we need with the
314: * entry. It returns true if this entry is one that should
315: * be returned. Note that some entries, although targetted
316: * and read, are not returned.
317: */
318: if (processEntry(dataBuffer)) {
319: foundEntry = true;
320: nRead++;
321: }
322: } else if (collectData) {
323:
324: /*
325: * For a non-target entry that was validated, the buffer is
326: * positioned at the start of the entry; skip over it.
327: */
328: threadSafeBufferPosition(dataBuffer,
329: threadSafeBufferPosition(dataBuffer)
330: + currentEntryHeader.getItemSize());
331: }
332: }
333: } catch (EOFException e) {
334: eof = true;
335: } catch (DatabaseException e) {
336: eof = true;
337: /* Report on error. */
338: if (currentEntryHeader != null) {
339: LogEntryType problemType = LogEntryType.findType(
340: currentEntryHeader.getType(),
341: currentEntryHeader.getVersion());
342: Tracer
343: .trace(
344: envImpl,
345: "FileReader",
346: "readNextEntry",
347: "Halted log file reading at file 0x"
348: + Long
349: .toHexString(readBufferFileNum)
350: + " offset 0x"
351: + Long
352: .toHexString(nextEntryOffset)
353: + " offset(decimal)="
354: + nextEntryOffset
355: + ":\nentry="
356: + problemType
357: + "(typeNum="
358: + currentEntryHeader.getType()
359: + ",version="
360: + currentEntryHeader
361: .getVersion()
362: + ")\nprev=0x"
363: + Long
364: .toHexString(currentEntryPrevOffset)
365: + "\nsize="
366: + currentEntryHeader
367: .getItemSize()
368: + "\nNext entry should be at 0x"
369: + Long
370: .toHexString((nextEntryOffset
371: + currentEntryHeader
372: .getSize() + currentEntryHeader
373: .getItemSize()))
374: + "\n:", e);
375: } else {
376: Tracer
377: .trace(
378: envImpl,
379: "FileReader",
380: "readNextEntry",
381: "Halted log file reading at file 0x"
382: + Long
383: .toHexString(readBufferFileNum)
384: + " offset 0x"
385: + Long
386: .toHexString(nextEntryOffset)
387: + " offset(decimal)="
388: + nextEntryOffset
389: + " prev=0x"
390: + Long
391: .toHexString(currentEntryPrevOffset),
392: e);
393: }
394: throw e;
395: }
396: return foundEntry;
397: }
398:
399: protected boolean resyncReader(long nextGoodRecordPostCorruption,
400: boolean dumpCorruptedBounds) throws DatabaseException,
401: IOException {
402:
403: /* Resync not allowed for straight FileReader runs. */
404: return false;
405: }
406:
407: /**
408: * Make sure that the start of the target log entry is in the header. This
409: * is a no-op if we're reading forwards
410: */
411: private void getLogEntryInReadBuffer() throws IOException,
412: DatabaseException, EOFException {
413:
414: /*
415: * If we're going forward, because we read every byte sequentially,
416: * we're always sure the read buffer is positioned at the right spot.
417: * If we go backwards, we need to jump the buffer position.
418: */
419: if (!forward) {
420:
421: /*
422: * currentEntryPrevOffset is the entry before the current entry.
423: * currentEntryOffset is the entry we just read (or the end of the
424: * file if we're starting out.
425: */
426: if ((currentEntryPrevOffset != 0)
427: && (currentEntryPrevOffset >= readBufferFileStart)) {
428:
429: /* The next log entry has passed the start LSN. */
430: long nextLsn = DbLsn.makeLsn(readBufferFileNum,
431: currentEntryPrevOffset);
432: if (finishLsn != DbLsn.NULL_LSN) {
433: if (DbLsn.compareTo(nextLsn, finishLsn) == -1) {
434: throw new EOFException();
435: }
436: }
437:
438: /* This log entry starts in this buffer, just reposition. */
439: threadSafeBufferPosition(
440: readBuffer,
441: (int) (currentEntryPrevOffset - readBufferFileStart));
442: } else {
443:
444: /*
445: * If the start of the log entry is not in this read buffer,
446: * fill the buffer again. If the target log entry is in a
447: * different file from the current read buffer file, just start
448: * the read from the target LSN. If the target log entry is the
449: * same file but the log entry is larger than the read chunk
450: * size, also start the next read buffer from the target
451: * LSN. Otherwise, try to position the next buffer chunk so the
452: * target entry is held within the buffer, all the way at the
453: * end.
454: */
455: if (currentEntryPrevOffset == 0) {
456: /* Go to another file. */
457: currentEntryPrevOffset = fileManager
458: .getFileHeaderPrevOffset(readBufferFileNum);
459: Long prevFileNum = fileManager.getFollowingFileNum(
460: readBufferFileNum, false);
461: if (prevFileNum == null) {
462: throw new EOFException();
463: }
464: if (readBufferFileNum - prevFileNum.longValue() != 1) {
465:
466: if (!resyncReader(DbLsn.makeLsn(prevFileNum
467: .longValue(), DbLsn.MAX_FILE_OFFSET),
468: false)) {
469:
470: throw new DatabaseException(
471: "Cannot read backward over cleaned file"
472: + " from "
473: + readBufferFileNum
474: + " to " + prevFileNum);
475: }
476: }
477: readBufferFileNum = prevFileNum.longValue();
478: readBufferFileStart = currentEntryPrevOffset;
479: } else if ((currentEntryOffset - currentEntryPrevOffset) > readBuffer
480: .capacity()) {
481:
482: /*
483: * The entry is in the same file, but is bigger than one
484: * buffer.
485: */
486: readBufferFileStart = currentEntryPrevOffset;
487: } else {
488:
489: /* In same file, but not in this buffer. */
490: long newPosition = currentEntryOffset
491: - readBuffer.capacity();
492: readBufferFileStart = (newPosition < 0) ? 0
493: : newPosition;
494: }
495:
496: /* The next log entry has passed the start LSN. */
497: long nextLsn = DbLsn.makeLsn(readBufferFileNum,
498: currentEntryPrevOffset);
499: if (finishLsn != DbLsn.NULL_LSN) {
500: if (DbLsn.compareTo(nextLsn, finishLsn) == -1) {
501: throw new EOFException();
502: }
503: }
504:
505: /*
506: * Now that we've set readBufferFileNum and
507: * readBufferFileStart, do the read.
508: */
509: FileHandle fileHandle = fileManager
510: .getFileHandle(readBufferFileNum);
511: try {
512: readBuffer.clear();
513: fileManager.readFromFile(fileHandle.getFile(),
514: readBuffer, readBufferFileStart);
515: nReadOperations += 1;
516:
517: assert EnvironmentImpl.maybeForceYield();
518: } finally {
519: fileHandle.release();
520: }
521: readBufferFileEnd = readBufferFileStart
522: + threadSafeBufferPosition(readBuffer);
523: threadSafeBufferFlip(readBuffer);
524: threadSafeBufferPosition(
525: readBuffer,
526: (int) (currentEntryPrevOffset - readBufferFileStart));
527: }
528:
529: /* The current entry will start at this offset. */
530: currentEntryOffset = currentEntryPrevOffset;
531: } else {
532:
533: /*
534: * Going forward, and an end point has been specified. Check if
535: * we've gone past.
536: */
537: if (finishLsn != DbLsn.NULL_LSN) {
538: /* The next log entry has passed the end LSN. */
539: long nextLsn = DbLsn.makeLsn(readBufferFileNum,
540: nextEntryOffset);
541: if (DbLsn.compareTo(nextLsn, finishLsn) >= 0) {
542: throw new EOFException();
543: }
544: }
545: }
546: }
547:
548: /**
549: * Read the basic log entry header, leaving the buffer mark at the
550: * beginning of the checksummed header data.
551: */
552: private void readBasicHeader(ByteBuffer dataBuffer)
553: throws DatabaseException {
554:
555: /* Read the header for this entry. */
556: currentEntryHeader = new LogEntryHeader(envImpl, dataBuffer,
557: anticipateChecksumErrors);
558:
559: /*
560: * currentEntryPrevOffset is a separate field, and is not obtained
561: * directly from the currentEntryHeader, because is was initialized and
562: * used before any log entry was read.
563: */
564: currentEntryPrevOffset = currentEntryHeader.getPrevOffset();
565: }
566:
567: /**
568: * Reset the checksum validator and add the new header bytes. Assumes that
569: * the data buffer is positioned at the start of the log item.
570: */
571: private void startChecksum(ByteBuffer dataBuffer)
572: throws DatabaseException {
573:
574: /* Clear out any previous data. */
575: cksumValidator.reset();
576:
577: /*
578: * Move back up to the beginning of portion of the log entry header
579: * covered by the checksum.
580: */
581: int itemStart = threadSafeBufferPosition(dataBuffer);
582: int headerSizeMinusChecksum = currentEntryHeader
583: .getSizeMinusChecksum();
584: threadSafeBufferPosition(dataBuffer, itemStart
585: - headerSizeMinusChecksum);
586: cksumValidator.update(envImpl, dataBuffer,
587: headerSizeMinusChecksum, anticipateChecksumErrors);
588:
589: /* Move the data buffer back to where the log entry starts. */
590: threadSafeBufferPosition(dataBuffer, itemStart);
591: }
592:
593: /**
594: * Add the entry bytes to the checksum and check the value. This method
595: * must be called with the buffer positioned at the start of the entry.
596: */
597: private void validateChecksum(ByteBuffer entryBuffer)
598: throws DatabaseException {
599:
600: cksumValidator.update(envImpl, entryBuffer, currentEntryHeader
601: .getItemSize(), anticipateChecksumErrors);
602: cksumValidator.validate(envImpl, currentEntryHeader
603: .getChecksum(), readBufferFileNum, currentEntryOffset,
604: anticipateChecksumErrors);
605: }
606:
607: /**
608: * Try to read a specified number of bytes.
609: * @param amountToRead is the number of bytes we need
610: * @param collectData is true if we need to actually look at the data.
611: * If false, we know we're skipping this entry, and all we need to
612: * do is to count until we get to the right spot.
613: * @return a byte buffer positioned at the head of the desired portion,
614: * or null if we reached eof.
615: */
616: private ByteBuffer readData(int amountToRead, boolean collectData)
617: throws IOException, DatabaseException, EOFException {
618:
619: int alreadyRead = 0;
620: ByteBuffer completeBuffer = null;
621: saveBuffer.clear();
622:
623: while ((alreadyRead < amountToRead) && !eof) {
624:
625: int bytesNeeded = amountToRead - alreadyRead;
626: if (readBuffer.hasRemaining()) {
627:
628: /* There's data in the read buffer, process it. */
629: if (collectData) {
630: /*
631: * Save data in a buffer for processing.
632: */
633: if ((alreadyRead > 0)
634: || (readBuffer.remaining() < bytesNeeded)) {
635:
636: /* We need to piece an entry together. */
637:
638: copyToSaveBuffer(bytesNeeded);
639: alreadyRead = threadSafeBufferPosition(saveBuffer);
640: completeBuffer = saveBuffer;
641: } else {
642:
643: /* A complete entry is available in this buffer. */
644:
645: completeBuffer = readBuffer;
646: alreadyRead = amountToRead;
647: }
648: } else {
649: /*
650: * No need to save data, just move buffer positions.
651: */
652: int positionIncrement = (readBuffer.remaining() > bytesNeeded) ? bytesNeeded
653: : readBuffer.remaining();
654:
655: alreadyRead += positionIncrement;
656: threadSafeBufferPosition(readBuffer,
657: threadSafeBufferPosition(readBuffer)
658: + positionIncrement);
659: completeBuffer = readBuffer;
660: }
661: } else {
662: /*
663: * Look for more data.
664: */
665: fillReadBuffer(bytesNeeded);
666: }
667: }
668:
669: /* Flip the save buffer just in case we've been accumulating in it. */
670: threadSafeBufferFlip(saveBuffer);
671:
672: return completeBuffer;
673: }
674:
675: /**
676: * Change the read buffer size if we start hitting large log
677: * entries so we don't get into an expensive cycle of multiple reads
678: * and piecing together of log entries.
679: */
680: private void adjustReadBufferSize(int amountToRead) {
681: int readBufferSize = readBuffer.capacity();
682: /* We need to read something larger than the current buffer size. */
683: if (amountToRead > readBufferSize) {
684: /* We're not at the max yet. */
685: if (readBufferSize < maxReadBufferSize) {
686:
687: /*
688: * Make the buffer the minimum of amountToRead or a
689: * maxReadBufferSize.
690: */
691: if (amountToRead < maxReadBufferSize) {
692: readBufferSize = amountToRead;
693: /* Make it a modulo of 1K */
694: int remainder = readBufferSize % 1024;
695: readBufferSize += 1024 - remainder;
696: readBufferSize = Math.min(readBufferSize,
697: maxReadBufferSize);
698: } else {
699: readBufferSize = maxReadBufferSize;
700: }
701: readBuffer = ByteBuffer.allocate(readBufferSize);
702: }
703:
704: if (amountToRead > readBuffer.capacity()) {
705: nRepeatIteratorReads++;
706: }
707: }
708: }
709:
710: /**
711: * Copy the required number of bytes into the save buffer.
712: */
713: private void copyToSaveBuffer(int bytesNeeded) {
714: /* How much can we get from this current read buffer? */
715: int bytesFromThisBuffer;
716:
717: if (bytesNeeded <= readBuffer.remaining()) {
718: bytesFromThisBuffer = bytesNeeded;
719: } else {
720: bytesFromThisBuffer = readBuffer.remaining();
721: }
722:
723: /* Gather it all into this save buffer. */
724: ByteBuffer temp;
725:
726: /* Make sure the save buffer is big enough. */
727: if (saveBuffer.capacity()
728: - threadSafeBufferPosition(saveBuffer) < bytesFromThisBuffer) {
729: /* Grow the save buffer. */
730: temp = ByteBuffer.allocate(saveBuffer.capacity()
731: + bytesFromThisBuffer);
732: threadSafeBufferFlip(saveBuffer);
733: temp.put(saveBuffer);
734: saveBuffer = temp;
735: }
736:
737: /*
738: * Bulk copy only the required section from the read buffer into the
739: * save buffer. We need from readBuffer.position() to
740: * readBuffer.position() + bytesFromThisBuffer.
741: */
742: temp = readBuffer.slice();
743: temp.limit(bytesFromThisBuffer);
744: saveBuffer.put(temp);
745: threadSafeBufferPosition(readBuffer,
746: threadSafeBufferPosition(readBuffer)
747: + bytesFromThisBuffer);
748: }
749:
750: /**
751: * Fill up the read buffer with more data.
752: */
753: private void fillReadBuffer(int bytesNeeded)
754: throws DatabaseException, EOFException {
755:
756: FileHandle fileHandle = null;
757: try {
758: adjustReadBufferSize(bytesNeeded);
759:
760: /* Get a file handle to read in more log. */
761: fileHandle = fileManager.getFileHandle(readBufferFileNum);
762: boolean fileOk = false;
763:
764: /*
765: * Check to see if we've come to the end of the file. If so, get
766: * the next file.
767: */
768: if (readBufferFileEnd < fileHandle.getFile().length()) {
769: fileOk = true;
770: } else {
771: /* This file is done -- can we read in the next file? */
772: if (!singleFile) {
773: Long nextFile = fileManager.getFollowingFileNum(
774: readBufferFileNum, forward);
775: if (nextFile != null) {
776: readBufferFileNum = nextFile.longValue();
777: fileHandle.release();
778: fileHandle = fileManager
779: .getFileHandle(readBufferFileNum);
780: fileOk = true;
781: readBufferFileEnd = 0;
782: nextEntryOffset = 0;
783: }
784: }
785: }
786:
787: if (fileOk) {
788: readBuffer.clear();
789: fileManager.readFromFile(fileHandle.getFile(),
790: readBuffer, readBufferFileEnd);
791: nReadOperations += 1;
792:
793: assert EnvironmentImpl.maybeForceYield();
794:
795: readBufferFileStart = readBufferFileEnd;
796: readBufferFileEnd = readBufferFileStart
797: + threadSafeBufferPosition(readBuffer);
798: threadSafeBufferFlip(readBuffer);
799: } else {
800: throw new EOFException();
801: }
802: } catch (IOException e) {
803: e.printStackTrace();
804: throw new DatabaseException(
805: "Problem in fillReadBuffer, readBufferFileNum = "
806: + readBufferFileNum + ": " + e.getMessage());
807:
808: } finally {
809: if (fileHandle != null) {
810: fileHandle.release();
811: }
812: }
813: }
814:
815: /**
816: * Returns the number of reads since the last time this method was called.
817: */
818: public int getAndResetNReads() {
819: int tmp = nReadOperations;
820: nReadOperations = 0;
821: return tmp;
822: }
823:
824: /**
825: * @return true if this reader should process this entry, or just
826: * skip over it.
827: */
828: protected boolean isTargetEntry(byte logEntryTypeNumber,
829: byte logEntryTypeVersion) throws DatabaseException {
830:
831: return true;
832: }
833:
834: /**
835: * Each file reader implements this method to process the entry data.
836: * @param enteryBuffer contains the entry data and is positioned at the
837: * data
838: * @return true if this entry should be returned
839: */
840: protected abstract boolean processEntry(ByteBuffer entryBuffer)
841: throws DatabaseException;
842:
843: private static class EOFException extends Exception {
844: }
845:
846: /**
847: * Note that we catch Exception here because it is possible that another
848: * thread is modifying the state of buffer simultaneously. Specifically,
849: * this can happen if another thread is writing this log buffer out and it
850: * does (e.g.) a flip operation on it. The actual mark/pos of the buffer
851: * may be caught in an unpredictable state. We could add another latch to
852: * protect this buffer, but that's heavier weight than we need. So the
853: * easiest thing to do is to just retry the duplicate operation. See
854: * [#9822].
855: */
856: private Buffer threadSafeBufferFlip(ByteBuffer buffer) {
857: while (true) {
858: try {
859: return buffer.flip();
860: } catch (IllegalArgumentException IAE) {
861: continue;
862: }
863: }
864: }
865:
866: private int threadSafeBufferPosition(ByteBuffer buffer) {
867: while (true) {
868: try {
869: return buffer.position();
870: } catch (IllegalArgumentException IAE) {
871: continue;
872: }
873: }
874: }
875:
876: Buffer threadSafeBufferPosition(ByteBuffer buffer, int newPosition) {
877: while (true) {
878: try {
879: return buffer.position(newPosition);
880: } catch (IllegalArgumentException IAE) {
881: if (newPosition > buffer.capacity()) {
882: throw IAE;
883: }
884: continue;
885: }
886: }
887: }
888: }
|