/*
* Author:
* Inception Date:
*
* This software is copyrighted by and the Regents of
* the University of California. The following terms apply to all
* files associated with the software unless explicitly disclaimed in
* individual files.
*
* The authors hereby grant permission to use this software without
* fee or royalty for any non-commercial purpose. The authors also
* grant permission to redistribute this software, provided this
* copyright and a copy of this license (for reference) are retained
* in all distributed copies.
*
* For commercial use of this software, contact the authors.
*
* IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
* FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
* ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
* DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
* THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT. THIS SOFTWARE
* IS PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE
* NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
* MODIFICATIONS.
*/
package ninja2.core.sn_btree.bufcache;
import ninja2.core.io_core.interfaces.*;
import ninja2.core.io_core.interfaces.disk.*;
import ninja2.core.io_core.core.*;
import ninja2.core.io_core.util.*;
import ninja2.core.io_core.thread_pool.*;
import ninja2.core.sn_btree.*;
import ninja2.util.*;
import java.util.*;
// these are imported only for the test code in main()
import ninja2.core.io_core.fs_disk.*;
/**
 * The BufCache class is a very simplistic, non-blocking, non-associative
 * cache of MemRegionIF blocks that we use to read from and write to
 * disk segments. The cache is a write-back cache that allows the caller
 * to flush individual blocks in the cache, or flush the entire cache of
 * dirty data. Callers can also register a callback that gets invoked on
 * cache eviction.
 *
 * Note that since this cache is non-blocking, it uses completions
 * to let the caller know when the block is ready. Some funny things
 * happen with memory allocation inside here - I assume that the disk
 * segment source will be alloc'ing buffers for the reads, so for now I
 * don't worry about managing my own alloc'ed buffers.
 *
 * The BufCache supports both the PassiveSourceIF and UpcallRegisterIF
 * queueing completion models.
 *
 * @author
 * @see ninja2.core.io_core.interfaces.MemRegionIF
 */
public class BufCache implements ninja2.core.io_core.interfaces.PassiveSourceIF,
ninja2.core.io_core.interfaces.UpcallRegisterIF,
ninja2.core.io_core.interfaces.UpcallHandlerIF,
BufCacheEvictionCallbackIF {
private int _num_blocks;
private int _blocksize;
private BufCacheElement[] _elements; // invalid blocks are null
private Hashtable _segmentReaders;
private Hashtable _segmentWriters;
private BufCacheEvictionCallbackIF _evictionCB;
private Queue _compQ;
public DiskSegmentIF _segIF;
private MemAllocatorIF _allocator;
private UpcallHandlerIF _devnull;
private static BufCache globBufCache = null;
private static final int NUM_THREADS_PER_SEGMENT_READER = 5;
/**
* Using this static method, all callers can get access to a single
* globally shared buffer cache, currently of size 24MB or so.
*/
public static BufCache getGlobalBufCache(ThreadPool tp) {
if (globBufCache == null) {
ByteArrayMemAllocator bama = new ByteArrayMemAllocator();
FSDiskSegmentVizier fdsv = new FSDiskSegmentVizier(tp);
// XXX these shouldn't really be magic numbers - parameterize later
globBufCache = new BufCache(bama, fdsv, SN_Btree.table_blocksize,
SN_Btree.num_blocks_in_cache);
}
return globBufCache;
}
/**
* Creates a buffer cache, allowing the caller to specify the blocksize
* and the number of blocks to keep in the cache. These numbers are
* _not_ modifyable; you get to specify them at cache creation time only.
*
* @param allocator the allocator that we'll use for grabbing mem regions
* @param segIF the DiskSegmentIF to use for interacting with disk
* @param blocksize the size of the cache blocks, in bytes
* @param num_blocks the number of blocks in the cache
*/
public BufCache(MemAllocatorIF allocator,
DiskSegmentIF segIF, int blocksize, int num_blocks) {
_segIF = segIF;
_allocator = allocator;
_num_blocks = num_blocks;
_blocksize = blocksize;
_evictionCB = null;
_compQ = new Queue();
_devnull = new DevNull_Upcall();
_elements = new BufCacheElement[_num_blocks];
_segmentReaders = new Hashtable();
_segmentWriters = new Hashtable();
}
/**
* @return the size of blocks stored in this cache
*/
public int get_blocksize() {
return _blocksize;
}
/**
* @return the maximum number of blocks stored in this cache
*/
public int get_numblocks() {
return _num_blocks;
}
/**
* Registers a callback function to call when a block gets evicted from
* the cache. This callback function will only be called once at most
* simultaneously; it will never be called a second time before the first
* call as returned. However, it may be called at any time. Also note
* that it is theoretically possible for a block to be in the completion
* queue when this callback function is called on it. If so, it is
* left on the queue, since the data won't change just by being written
* to disk. If this method is never called, then no callback is ever
* invoked. Note that the callback is evoked when the victim is selected,
* but before it is written back. The callback is evoked only if
* the victim is dirty.
*/
public void
register_callback(BufCacheEvictionCallbackIF evictionCB) {
_evictionCB = evictionCB;
}
/**
* Causes a block to be fetched from disk if it is not in the cache.
* Either way, the block (represented by a BufCacheElement) will be added
* to this cache's completion queue (accessible through the PassiveSourceIF
* interface).
*/
//MARK: pass uniqueID
public void fetch_and_cache(String segname, int blocknum ) {
if (SN_Btree.debug)
System.out.println("starting BufCache.fetch_and_cache()...old");
fetch_and_cache(segname,blocknum,-1);
}
public void fetch_and_cache(String segname, int blocknum, int id) {
if (SN_Btree.debug)
System.out.println("starting BufCache.fetch_and_cache()...ID:"+id);
BufCacheElement el = null;
boolean do_enqueue = false;
int elno = name_and_blocknum_to_offset(segname, blocknum);
if (_elements[elno] != null) {
el = _elements[elno];
if (el != null) {
if (el.segment_name.equals(segname) &&
(blocknum == el.block_num)) {
do_enqueue = true;
} else {
// here because element was wrong one. evict that puppy.
write_out_element(el, true);
_elements[elno] = null;
}
}
}
if (do_enqueue == true) {
// got the element from the cache, put it on our queue. done!
// hopefully this was the common case.
// Thread.currentThread().yield();
el.uniqueID.addElement(new Integer(id)); /* MARK: pass uniqueID */
_compQ.enqueue(el);
return;
}
if (SN_Btree.debug)
System.out.println(" Not in cache");
// here because there is nothing in the cache for this puppy,
// either because the wrong element was in there and we evicted
// it, or because the cache line was cold. job: issue the read
// to get the element into the cache.
DiskSegmentReaderIF rif = null;
DiskSegmentWriterIF wif = null;
rif = (DiskSegmentReaderIF) _segmentReaders.get(segname);
if (rif == null) {
try {
rif = _segIF.openForRead(segname, NUM_THREADS_PER_SEGMENT_READER);
rif.register_upcall(SN_Btree.main_event_queue);
wif = _segIF.openForWrite(segname, 0);
} catch (java.io.IOException ioe) {
Debug.msg("Btree singlenode", Debug.FATAL,
"Disk is dead - aborting (3)!",
ioe, Debug.FATAL);
System.exit(1);
}
_segmentReaders.put(segname, rif);
_segmentWriters.put(segname, wif);
}
try {
MemRegionIF reg = null;
try {
reg = _allocator.allocateRegion(_blocksize);
} catch (java.lang.OutOfMemoryError ome) {
Debug.msg("Btree singlenode", Debug.FATAL,
"Out of memory in buffer cache fetching block "
+ blocknum + " from segment " + segname,
ome, Debug.FATAL);
System.exit(1);
}
/* MARK: pass unique id as 3rd arg */
/* put element in cache */
rif.registerRead(blocknum*_blocksize, _blocksize, new Integer(id), reg);
if (SN_Btree.debug)
System.out.println(" DiskSegmentReader.registerRead()");
} catch (java.lang.IndexOutOfBoundsException iobe) {
// generate a null completion for this, don't put in cache
BufCacheElement els = new BufCacheElement();
els.segment_name = segname;
els.block_size = _blocksize;
els.block_num = blocknum;
els.dirty = false;
els.region = null;
els.uniqueID.addElement(new Integer(id)); /* MARK: pass uniqueID */
_compQ.enqueue(els);
Debug.msg("Btree singlenode", Debug.WARN,
"Warning: enqueuing null completion from buffer"
+ " cache, as read past end of file...better " +
"double check that my code expects this!");
}
}
/**
* Updates the data in the cache. Doesn't cause anything to be
* written to disk necessarily, but rather just causes the block to
* be marked as dirty inside the cache. If this causes another element
* to be evicted, then a write will happen. Note that the eviction callback
* will be triggered first, however.
*/
public void write_back_cache(String segname, int blocknum,
MemRegionIF data) {
if (SN_Btree.debug) System.out.println("starting BufCache.write_back_cache()...");
BufCacheElement el = null;
int elno = name_and_blocknum_to_offset(segname, blocknum);
if (_elements[elno] != null) {
el = _elements[elno];
if (el.segment_name.equals(segname) && (blocknum == el.block_num)) {
// is an element in the cache and it's the right one; replace it
// and mark it dirty.
el.region = data;
el.dirty = true;
return;
}
// here because element was wrong one. evict that puppy.
// element was wrong one
write_out_element(el, true);
_elements[elno] = null;
}
// here because there is nothing in the cache for this puppy,
// either because the wrong element was in there and we evicted
// it, or because the cache line was cold. job: create a cache
// element to hold this data.
BufCacheElement newElement = new BufCacheElement();
newElement.segment_name = segname;
newElement.block_size = _blocksize;
newElement.block_num = blocknum;
newElement.dirty = true;
newElement.region = data;
_elements[elno] = newElement;
}
/**
* Updates the data in the cache, and forces data through the cache
* onto disk. If this causes another element
* to be evicted, then a write will happen. Note that the eviction callback
* will be triggered first, however.
*/
public void write_through_cache(String segname, int blocknum,
MemRegionIF data) {
if (SN_Btree.debug)
System.out.println("starting BufCache.write_through_cache()...");
BufCacheElement el = null;
int elno = name_and_blocknum_to_offset(segname, blocknum);
if (_elements[elno] != null) {
el = _elements[elno];
if (el != null) {
if (el.segment_name.equals(segname) && (blocknum == el.block_num)) {
// is an element in the cache and it's the right one; replace it
// and mark it dirty.
el.region = data;
el.dirty = true;
write_out_element(el, true);
return;
} else {
// here because element was wrong one. evict that puppy.
_elements[elno] = null;
write_out_element(el, true);
}
}
}
// here because there is nothing in the cache for this puppy,
// either because the wrong element was in there and we evicted
// it, or because the cache line was cold. job: create a cache
// element to hold this data.
BufCacheElement newElement = new BufCacheElement();
newElement.segment_name = segname;
newElement.block_size = _blocksize;
newElement.block_num = blocknum;
newElement.dirty = true;
newElement.region = data;
_elements[elno] = newElement;
write_out_element(newElement, true);
}
/**
* Deliberately causes all blocks to be flushed from the cache back to
* disk. Causes the eviction callback to be triggered once per block
* being flushed.
*
* @param block if block is true, this method call will wait until the
* entire cache is flushed before moving on. Otherwise, the
* flush will happen in the background.
*
*/
public void flush_entire_cache(boolean block) {
QueueIF compQ = null;
if (block)
compQ = new Queue();
for (int i=0; i<_num_blocks; i++) {
BufCacheElement el = _elements[i];
if (el != null) {
write_out_element(el, false);
el.dirty = false;
}
}
int numflush = 0;
Enumeration swenum = _segmentWriters.elements();
while (swenum.hasMoreElements()) {
DiskSegmentWriterIF wif = null;
try {
wif = (DiskSegmentWriterIF) (swenum.nextElement());
if (wif != null) {
try {
if (block) {
wif.flush(compQ, null);
numflush++;
} else
wif.flush(_devnull, null);
} catch (SinkDeadException sde) {
}
}
} catch (NoSuchElementException nsee) {
}
}
// do the block until all is done
while (numflush > 0) {
QueueElementIF[] els = compQ.blocking_dequeue(0);
if (els != null)
numflush -= els.length;
}
}
/**
* Causes all blocks associated with this segment to be flushed out.
*
* @param segname the segment to flush
* @param block if block is true, this method will block until all bytes
* have hit the disk. Otherwise, the flush will happen
* in the background.
*/
public void flush_segment(String segname, boolean block) {
QueueIF compQ = null;
if (block)
compQ = new Queue();
for (int i=0; i<_num_blocks; i++) {
BufCacheElement el = _elements[i];
if ((el != null) && (el.segment_name.equals(segname))) {
write_out_element(el, false);
_elements[i] = null;
}
}
int numflush = 0;
DiskSegmentWriterIF wif = null;
wif = (DiskSegmentWriterIF)
_segmentWriters.get(segname);
if (wif != null) {
try {
if (block) {
wif.flush(compQ, null);
numflush++;
} else
wif.flush(_devnull, null);
} catch (SinkDeadException sde) {
}
}
while (numflush > 0) {
QueueElementIF[] els = compQ.blocking_dequeue(0);
if (els != null)
numflush -= els.length;
}
}
/**
* Causes all blocks associated with this particular segment to
* be flushed out, and closes all readers/writers associated
* with the segment.
*
* @param segname the segment to flush out
* @param block if block is true, this method blocks until the flush
* finishes. If not, the flush happens in the background.
*/
public void done_with_segment(String segname, boolean block) {
QueueIF compQ = null;
if (block)
compQ = new Queue();
for (int i=0; i<_num_blocks; i++) {
BufCacheElement el = _elements[i];
if ((el != null) && (el.segment_name.equals(segname))) {
_elements[i] = null;
write_out_element(el, false);
}
}
DiskSegmentReaderIF rif = null;
DiskSegmentWriterIF wif = null;
int numflush = 0;
rif = (DiskSegmentReaderIF) _segmentReaders.remove(segname);
wif = (DiskSegmentWriterIF) _segmentWriters.remove(segname);
if (rif != null) {
rif.doneReading();
}
if (wif != null) {
try {
if (block) {
wif.close(compQ, null);
numflush++;
} else {
wif.close(_devnull, null);
}
} catch (SinkDeadException sde) {
}
}
rif = null;
wif = null;
while (numflush > 0) {
QueueElementIF[] els = compQ.blocking_dequeue(0);
if (els != null)
numflush -= els.length;
}
}
/**
* Sometimes its useful to grab blocks synchronously. This method
* allows you to do exactly that; if the blocko is in the cache, it
* returns it, otherwise it reads it from disk and returns it, and
* puts it in the cache.
*/
public MemRegionIF fetch_sync_and_cache(String segname, int blocknum)
throws java.io.IOException {
if (SN_Btree.debug)
System.out.println("starting BufCache.fetch_sync_and_cache()...");
// is it in the cache?
BufCacheElement bcel = null;
boolean do_return = false;
int elno = name_and_blocknum_to_offset(segname, blocknum);
if (_elements[elno] != null) {
bcel = _elements[elno];
if (bcel != null) {
if (bcel.segment_name.equals(segname) &&
(blocknum == bcel.block_num)) {
do_return = true;
} else {
// here because element was wrong one. evict that puppy.
write_out_element(bcel, true);
_elements[elno] = null;
}
}
}
if (do_return == true) {
// got the element from the cache, put it on our queue. done!
// hopefully this was the common case.
// Thread.currentThread().yield();
return bcel.region;
}
if (SN_Btree.debug)
System.out.println(" Sync:Not in cache");
// not in the cache, need to read it in synchronously
MemRegionIF reg = null;
DiskSegmentReaderIF rif = null;
QueueIF fsncq = (QueueIF) new Queue();
try {
rif = _segIF.openForRead(segname, NUM_THREADS_PER_SEGMENT_READER);
reg = _allocator.allocateRegion(_blocksize);
rif.register_upcall(fsncq);
rif.registerRead(blocknum*_blocksize, _blocksize, null, reg);
} catch (java.io.IOException ioe) {
Debug.msg("Btree singlenode", Debug.FATAL,
"disk is dead - aborting (3.994)!",
ioe, Debug.FATAL);
System.exit(1);
} catch (java.lang.OutOfMemoryError ome) {
Debug.msg("Btree singlenode", Debug.FATAL,
"out of memory in buf cache (3.994)",
ome, Debug.FATAL);
System.exit(1);
}
QueueElementIF el[];
int num_tries = 0;
el = fsncq.blocking_dequeue(0);
if (el == null) {
Debug.msg("Btree singlenode", Debug.FATAL,
"uh oh - fsncq dequeue is null in BufCache!");
System.exit(1);
}
if (el.length != 1) {
Debug.msg("Btree singlenode", Debug.FATAL,
"disk read returned more than one completion " +
"event in buf cache");
System.exit(1);
}
if (!(el[0] instanceof DiskSegmentReadFinished)) {
Debug.msg("Btree singlenode", Debug.FATAL,
"disk read failed in buf cache (3.99) - " + el[0] +
", was for (" + segname + "," + blocknum + ")");
System.exit(1);
}
DiskSegmentReadFinished dsrg = (DiskSegmentReadFinished) el[0];
if ((dsrg.read_amount_requested != _blocksize) ||
(dsrg.read_amount_achieved != _blocksize)) {
Debug.msg("Btree singlenode", Debug.FATAL,
"read completion not blocksize (3.99)");
System.exit(1);
}
BufCacheElement bce = new BufCacheElement();
if ((dsrg.read_amount_requested != _blocksize) ||
(dsrg.read_amount_achieved != _blocksize)) {
Debug.msg("Btree singlenode", Debug.FATAL,
"Bogus read completion - not blocksize!!");
System.exit(1);
}
rif.doneReading();
bce.segment_name = dsrg.segname;
bce.block_size = _blocksize;
bce.block_num = dsrg.starting_offset / _blocksize;
bce.dirty = false;
bce.region = dsrg.filled_data;
int offset = name_and_blocknum_to_offset(bce.segment_name,
bce.block_num);
_elements[offset] = bce;
return reg;
}
// the PassiveSourceIF methods
/**
* dequeues a bufcache completion (i.e. a read). A bufcache
* completion QueueElementIF is of type BufCacheElement.
*/
public QueueElementIF dequeue() {
return _compQ.dequeue();
}
/**
* dequeues all bufcache completions (i.e. all reads). A bufcache
* completion QueueElementIF is of type BufCacheElement.
*/
public QueueElementIF[] dequeue_all() {
return _compQ.dequeue_all();
}
public void register_upcall(UpcallHandlerIF upcall_handler) {
_compQ.register_upcall(upcall_handler);
}
/**
* This is here just as a simple debugging eviction callback
*/
public void about_to_evict(BufCacheElement element) {
System.out.println("Evicting: " + element);
}
// here are upcall functions from the underlying disk readers/writers
public void enqueue_many(QueueElementIF[] els) {
int els_len = els.length;
for (int i=0; i