/*
 * Author: 
 * Inception Date: 
 *
 * This software is copyrighted by 		and the Regents of
 * the University of California.  The following terms apply to all
 * files associated with the software unless explicitly disclaimed in
 * individual files.
 * 
 * The authors hereby grant permission to use this software without
 * fee or royalty for any non-commercial purpose.  The authors also
 * grant permission to redistribute this software, provided this
 * copyright and a copy of this license (for reference) are retained
 * in all distributed copies.
 *
 * For commercial use of this software, contact the authors.
 * 
 * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
 * DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT.  THIS SOFTWARE
 * IS PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE
 * NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
 * MODIFICATIONS.
 */

package ninja2.core.sn_btree.bufcache;

import ninja2.core.io_core.interfaces.*;
import ninja2.core.io_core.interfaces.disk.*;
import ninja2.core.io_core.core.*;
import ninja2.core.io_core.util.*;
import ninja2.core.io_core.thread_pool.*;
import ninja2.core.sn_btree.*;
import ninja2.util.*;
import java.util.*;

// these are imported only for the test code in main()
import ninja2.core.io_core.fs_disk.*;

/**
 * The BufCache class is a very simplistic, non-blocking non-associative cache
 * of <code>MemRegionIF</code> blocks that we use to read from and write to
 * disk segments.  The cache is a write-back cache that allows the caller
 * to flush individual blocks in the cache, or flush the entire cache of
 * dirty data.  Callers can also register a callback that gets invoked on
 * cache eviction.
 * <P>
 * Note that since this cache is non-blocking, it uses completions
 * to let the caller know when the block is ready.  Some funny things
 * happen with memory allocation inside here - I assume that the disk
 * segment source will be alloc'ing buffers for the reads, so for now I
 * don't worry about managing my own alloc'ed buffers.
 * <P>
 * The BufCache supports both the PassiveSourceIF and UpcallRegisterIF
 * queueing completion models.
 *
 * @author 
 * @see ninja2.core.io_core.interfaces.MemRegionIF 
 */

public class BufCache implements ninja2.core.io_core.interfaces.PassiveSourceIF,
                                 ninja2.core.io_core.interfaces.UpcallRegisterIF,
		                 ninja2.core.io_core.interfaces.UpcallHandlerIF,
                                 BufCacheEvictionCallbackIF {
  // Number of slots in _elements; fixed at construction time.
  private int               _num_blocks;
  // Size in bytes of every block held in the cache; fixed at construction.
  private int               _blocksize;
  // One slot per cache line, indexed by name_and_blocknum_to_offset().
  private BufCacheElement[] _elements;         // invalid blocks are null
  // Segment name (String) -> DiskSegmentReaderIF opened for that segment.
  private Hashtable         _segmentReaders;
  // Segment name (String) -> DiskSegmentWriterIF opened for that segment.
  private Hashtable         _segmentWriters;
  // Optional eviction callback; null until register_callback() is called.
  private BufCacheEvictionCallbackIF _evictionCB;
  // Completion queue of BufCacheElements, drained via dequeue()/dequeue_all().
  private Queue             _compQ;
  // Disk layer used to open segments for reading and writing.
  public  DiskSegmentIF     _segIF;
  // Allocator used to obtain block-sized regions for reads.
  private MemAllocatorIF    _allocator;
  // Completion target handed to background flush/close calls
  // (a DevNull_Upcall; presumably it discards what it receives - confirm).
  private UpcallHandlerIF   _devnull;

  // Lazily created shared instance returned by getGlobalBufCache().
  private static BufCache   globBufCache = null;
  // Second argument passed to DiskSegmentIF.openForRead() for every reader.
  private static final int  NUM_THREADS_PER_SEGMENT_READER = 5;

  /**
   * Returns the single buffer cache shared by all callers, building it
   * on the first call.
   * <P>
   * The shared cache is backed by a <code>ByteArrayMemAllocator</code> and
   * an <code>FSDiskSegmentVizier</code>, and is dimensioned from
   * <code>SN_Btree.table_blocksize</code> and
   * <code>SN_Btree.num_blocks_in_cache</code>.
   *
   * @param tp thread pool handed to the <code>FSDiskSegmentVizier</code>;
   *           only used by the call that actually creates the cache
   * @return the shared <code>BufCache</code> instance
   */
  public static BufCache getGlobalBufCache(ThreadPool tp) {
    // Already built by an earlier call: hand it straight back.
    if (globBufCache != null) {
      return globBufCache;
    }

    // XXX these shouldn't really be magic numbers - parameterize later
    globBufCache = new BufCache(new ByteArrayMemAllocator(),
                                new FSDiskSegmentVizier(tp),
                                SN_Btree.table_blocksize,
                                SN_Btree.num_blocks_in_cache);
    return globBufCache;
  }

  /**
   * Builds an empty buffer cache with a fixed geometry.  Neither the
   * block size nor the number of blocks can be changed after construction.
   *
   * @param allocator  the allocator that we'll use for grabbing mem regions
   * @param segIF      the DiskSegmentIF to use for interacting with disk
   * @param blocksize  the size of the cache blocks, in bytes
   * @param num_blocks the number of blocks in the cache
   */
  public BufCache(MemAllocatorIF allocator,
                  DiskSegmentIF segIF, int blocksize, int num_blocks) {
    // collaborators and geometry supplied by the caller
    this._allocator = allocator;
    this._segIF = segIF;
    this._blocksize = blocksize;
    this._num_blocks = num_blocks;

    // no eviction callback until register_callback() is called
    this._evictionCB = null;

    // internal plumbing: completion queue and the sink for background flushes
    this._compQ = new Queue();
    this._devnull = new DevNull_Upcall();

    // every slot starts out empty (null); no segments are open yet
    this._elements = new BufCacheElement[num_blocks];
    this._segmentReaders = new Hashtable();
    this._segmentWriters = new Hashtable();
  }
  
  /**
   * Reports the block size this cache was created with.
   *
   * @return the size, in bytes, of each block stored in this cache
   */
  public int get_blocksize() {
    return this._blocksize;
  }
  
  /**
   * Reports the capacity this cache was created with.
   *
   * @return the maximum number of blocks stored in this cache
   */
  public int get_numblocks() {
    return this._num_blocks;
  }

  /**
   * Installs the callback that is notified when a dirty block is about
   * to be written out of the cache.  Any previously registered callback
   * is replaced; if this method is never called (or is passed
   * <code>null</code>), no callback is ever invoked.
   * <P>
   * The callback is invoked after the victim has been selected but
   * before its data is handed to the segment writer, and only if the
   * victim is dirty.  Because the write does not change the block's
   * data, a block that is still sitting in the completion queue when
   * the callback fires is simply left on the queue.
   * <P>
   * Per the original design notes the callback is not expected to be
   * re-entered while an earlier invocation is still running, but it may
   * be called at any time; nothing in this class enforces either point.
   *
   * @param evictionCB the callback to notify, or <code>null</code> for none
   */
  public void
     register_callback(BufCacheEvictionCallbackIF evictionCB) {
    this._evictionCB = evictionCB;
  }

  /**
   * Requests a block without supplying a request id.  This is the older
   * entry point; it simply forwards to the three-argument form with an
   * id of -1.
   * <P>
   * As with the three-argument form, the block (represented by a
   * BufCacheElement) is delivered through this cache's completion queue
   * (accessible through the PassiveSourceIF interface), being read from
   * disk first if it is not already cached.
   *
   * @param segname  name of the segment the block belongs to
   * @param blocknum index of the block within the segment
   */
   //MARK: pass uniqueID
  public void fetch_and_cache(String segname, int blocknum ) {
    if (SN_Btree.debug) {
      System.out.println("starting BufCache.fetch_and_cache()...old");
    }
    // The delegation is unconditional; only the trace line above is
    // guarded by the debug flag.
    final int noRequestId = -1;
    fetch_and_cache(segname, blocknum, noRequestId);
  }
  /**
   * Requests a block, tagging the request with a caller-chosen id.
   * <P>
   * On a cache hit the id is appended to the element's
   * <code>uniqueID</code> list and the element is placed on the
   * completion queue immediately.  On a miss (cold slot, or a slot that
   * held a different block, which is written out and dropped first) a
   * read of the block is registered with the segment's reader, passing
   * the id along as the read's id object.  Readers are registered to
   * deliver their completions to <code>SN_Btree.main_event_queue</code>.
   * <P>
   * If registering the read throws <code>IndexOutOfBoundsException</code>,
   * an element with a <code>null</code> region is enqueued instead and
   * nothing is cached.  I/O failures opening the segment and
   * out-of-memory conditions are treated as fatal and exit the process.
   *
   * @param segname  name of the segment the block belongs to
   * @param blocknum index of the block within the segment
   * @param id       request id attached to the resulting completion
   */
  public void fetch_and_cache(String segname, int blocknum, int id) {
    if (SN_Btree.debug) {
      System.out.println("starting BufCache.fetch_and_cache()...ID:"+id);
    }

    final int slot = name_and_blocknum_to_offset(segname, blocknum);
    final BufCacheElement resident = _elements[slot];

    if (resident != null) {
      boolean sameBlock = resident.segment_name.equals(segname)
                          && (resident.block_num == blocknum);
      if (sameBlock) {
        // got the element from the cache, put it on our queue. done!
        // hopefully this was the common case.
        resident.uniqueID.addElement(new Integer(id)); /* MARK: pass uniqueID */
        _compQ.enqueue(resident);
        return;
      }
      // the slot holds some other block: evict that puppy.
      write_out_element(resident, true);
      _elements[slot] = null;
    }

    if (SN_Btree.debug) {
      System.out.println("		Not in cache");
    }

    // Nothing usable in the cache for this block, either because the slot
    // was cold or because we just evicted its previous occupant.  Make
    // sure the segment is open, then issue the read.
    DiskSegmentReaderIF reader =
      (DiskSegmentReaderIF) _segmentReaders.get(segname);
    if (reader == null) {
      DiskSegmentWriterIF writer = null;
      try {
        reader = _segIF.openForRead(segname, NUM_THREADS_PER_SEGMENT_READER);
        reader.register_upcall(SN_Btree.main_event_queue);
        writer = _segIF.openForWrite(segname, 0);
      } catch (java.io.IOException ioe) {
        Debug.msg("Btree singlenode", Debug.FATAL,
                  "Disk is dead - aborting (3)!",
                  ioe, Debug.FATAL);
        System.exit(1);
      }
      // reader and writer are always opened and remembered as a pair
      _segmentReaders.put(segname, reader);
      _segmentWriters.put(segname, writer);
    }

    try {
      MemRegionIF buffer = null;

      try {
        buffer = _allocator.allocateRegion(_blocksize);
      } catch (java.lang.OutOfMemoryError ome) {
        Debug.msg("Btree singlenode", Debug.FATAL,
                  "Out of memory in buffer cache fetching block "
                  + blocknum + " from segment " + segname,
                  ome, Debug.FATAL);
        System.exit(1);
      }

      /* MARK: pass unique id as 3rd arg */
      /* the element is put in the cache when the read completes */
      reader.registerRead(blocknum*_blocksize, _blocksize,
                          new Integer(id), buffer);
      if (SN_Btree.debug) {
        System.out.println("		DiskSegmentReader.registerRead()");
      }
    } catch (java.lang.IndexOutOfBoundsException iobe) {
      // generate a null completion for this, don't put in cache
      BufCacheElement nullCompletion = new BufCacheElement();
      nullCompletion.segment_name = segname;
      nullCompletion.block_size = _blocksize;
      nullCompletion.block_num = blocknum;
      nullCompletion.dirty = false;
      nullCompletion.region = null;
      nullCompletion.uniqueID.addElement(new Integer(id)); /* MARK: pass uniqueID */
      _compQ.enqueue(nullCompletion);
      Debug.msg("Btree singlenode", Debug.WARN,
                "Warning: enqueuing null completion from buffer"
                + " cache, as read past end of file...better " +
                "double check that my code expects this!");
    }
  }

  /**
   * Stores new data for a block in the cache and marks it dirty, without
   * itself forcing that block to disk.
   * <P>
   * If the block's slot currently holds a different block, that occupant
   * is written out first (triggering the eviction callback if it is
   * dirty) and then replaced.
   *
   * @param segname  name of the segment the block belongs to
   * @param blocknum index of the block within the segment
   * @param data     the new contents of the block
   */
  public void write_back_cache(String segname, int blocknum,
                               MemRegionIF data) {
    if (SN_Btree.debug) {
      System.out.println("starting BufCache.write_back_cache()...");
    }

    final int slot = name_and_blocknum_to_offset(segname, blocknum);
    final BufCacheElement resident = _elements[slot];

    if (resident != null) {
      boolean sameBlock = resident.segment_name.equals(segname)
                          && (resident.block_num == blocknum);
      if (sameBlock) {
        // the right block is already cached: swap in the new data
        // and mark it dirty.
        resident.region = data;
        resident.dirty = true;
        return;
      }

      // the slot holds some other block: evict that puppy.
      write_out_element(resident, true);
      _elements[slot] = null;
    }

    // The slot is now empty (cold, or just vacated); build a dirty
    // element to hold the caller's data.
    BufCacheElement fresh = new BufCacheElement();
    fresh.segment_name = segname;
    fresh.block_size = _blocksize;
    fresh.block_num = blocknum;
    fresh.dirty = true;
    fresh.region = data;
    _elements[slot] = fresh;
  }

  /**
   * Stores new data for a block in the cache and immediately writes it
   * out to the segment writer with a flush.
   * <P>
   * If the block's slot currently holds a different block, that occupant
   * is written out first (triggering the eviction callback if it is
   * dirty) and then replaced.
   * <P>
   * NOTE(review): the element is left marked dirty after it has been
   * written through, so a later eviction or flush will write it again and
   * fire the eviction callback - confirm whether that is intended.
   *
   * @param segname  name of the segment the block belongs to
   * @param blocknum index of the block within the segment
   * @param data     the new contents of the block
   */
  public void write_through_cache(String segname, int blocknum,
                                  MemRegionIF data) {
    if (SN_Btree.debug) {
      System.out.println("starting BufCache.write_through_cache()...");
    }

    final int slot = name_and_blocknum_to_offset(segname, blocknum);
    final BufCacheElement resident = _elements[slot];

    if (resident != null) {
      boolean sameBlock = resident.segment_name.equals(segname)
                          && (resident.block_num == blocknum);
      if (sameBlock) {
        // the right block is already cached: swap in the new data,
        // mark it dirty and push it out straight away.
        resident.region = data;
        resident.dirty = true;
        write_out_element(resident, true);
        return;
      }

      // the slot holds some other block: evict that puppy.
      _elements[slot] = null;
      write_out_element(resident, true);
    }

    // The slot is now empty (cold, or just vacated); build a dirty
    // element for the caller's data, cache it and write it out.
    BufCacheElement fresh = new BufCacheElement();
    fresh.segment_name = segname;
    fresh.block_size = _blocksize;
    fresh.block_num = blocknum;
    fresh.dirty = true;
    fresh.region = data;
    _elements[slot] = fresh;
    write_out_element(fresh, true);
  }

  /**
   * Writes every dirty block in the cache out to its segment writer and
   * then flushes every open segment writer.  Blocks stay in the cache and
   * are marked clean.  The eviction callback is triggered once per dirty
   * block written.
   * <P>
   * A writer whose sink is already dead is skipped with a warning rather
   * than aborting the flush of the remaining writers.
   *
   * @param block if block is true, this method call will wait until every
   *        writer that accepted the flush has reported completion.
   *        Otherwise, the flush will happen in the background.
   */
  public void flush_entire_cache(boolean block) {

    // Completions are only collected when the caller wants to wait.
    QueueIF compQ = null;
    if (block) {
      compQ = new Queue();
    }

    // Hand every resident block to its writer (a no-op for clean ones).
    for (int i = 0; i < _num_blocks; i++) {
      BufCacheElement el = _elements[i];

      if (el != null) {
        write_out_element(el, false);
        el.dirty = false;
      }
    }

    // Ask every open writer to flush, counting the ones we must wait for.
    int         numflush = 0;
    Enumeration swenum = _segmentWriters.elements();
    while (swenum.hasMoreElements()) {
      DiskSegmentWriterIF wif = null;
      try {
        wif = (DiskSegmentWriterIF) (swenum.nextElement());
        if (wif != null) {
          try {
            if (block) {
              wif.flush(compQ, null);
              numflush++;   // only counted once the flush was accepted
            } else {
              wif.flush(_devnull, null);
            }
          } catch (SinkDeadException sde) {
            // Best effort: keep flushing the other writers, but do not
            // hide the failure from the operator.
            Debug.msg("Btree singlenode", Debug.WARN,
                      "BufCache.flush_entire_cache: segment writer is dead, "
                      + "its flush was skipped: " + sde);
          }
        }
      } catch (NoSuchElementException ignored) {
        // The enumeration ran dry between hasMoreElements() and
        // nextElement(); the loop condition ends the loop for us.
      }
    }

    // do the block until all is done
    while (numflush > 0) {
      QueueElementIF[] els = compQ.blocking_dequeue(0);
      if (els != null) {
        numflush -= els.length;
      }
    }
  }

  /**
   * Writes out every cached block belonging to the given segment, drops
   * those blocks from the cache, and flushes the segment's writer (if one
   * is open).
   * <P>
   * If the writer's sink is already dead, the flush is skipped with a
   * warning.
   *
   * @param segname the segment to flush
   * @param block if block is true, this method will block until the
   *              writer reports that the flush completed. Otherwise, the
   *              flush will happen in the background.
   */
  public void flush_segment(String segname, boolean block) {

    // Completions are only collected when the caller wants to wait.
    QueueIF compQ = null;
    if (block) {
      compQ = new Queue();
    }

    // Write out and drop every block that belongs to this segment.
    for (int i = 0; i < _num_blocks; i++) {
      BufCacheElement el = _elements[i];

      if ((el != null) && (el.segment_name.equals(segname))) {
        write_out_element(el, false);
        _elements[i] = null;
      }
    }

    int numflush = 0;
    DiskSegmentWriterIF wif =
      (DiskSegmentWriterIF) _segmentWriters.get(segname);
    if (wif != null) {
      try {
        if (block) {
          wif.flush(compQ, null);
          numflush++;   // only counted once the flush was accepted
        } else {
          wif.flush(_devnull, null);
        }
      } catch (SinkDeadException sde) {
        // Best effort, but do not hide the failure from the operator.
        Debug.msg("Btree singlenode", Debug.WARN,
                  "BufCache.flush_segment: writer for segment " + segname
                  + " is dead, its flush was skipped: " + sde);
      }
    }

    // wait for the flush completion, if we asked for one
    while (numflush > 0) {
      QueueElementIF[] els = compQ.blocking_dequeue(0);
      if (els != null) {
        numflush -= els.length;
      }
    }
  }

  /**
   * Writes out every cached block belonging to the given segment, drops
   * those blocks from the cache, and then releases the reader and writer
   * that this cache holds for the segment.
   * <P>
   * If the writer's sink is already dead, the close is skipped with a
   * warning.
   *
   * @param segname the segment to flush out
   * @param block if block is true, this method blocks until the writer
   *              reports that the close completed.  If not, the close
   *              happens in the background.
   */
  public void done_with_segment(String segname, boolean block) {
    // Completions are only collected when the caller wants to wait.
    QueueIF compQ = null;
    if (block) {
      compQ = new Queue();
    }

    // Drop and write out every block that belongs to this segment.
    for (int i = 0; i < _num_blocks; i++) {
      BufCacheElement el = _elements[i];

      if ((el != null) && (el.segment_name.equals(segname))) {
        _elements[i] = null;
        write_out_element(el, false);
      }
    }

    // Forget the segment's reader/writer pair before shutting them down.
    DiskSegmentReaderIF rif =
      (DiskSegmentReaderIF) _segmentReaders.remove(segname);
    DiskSegmentWriterIF wif =
      (DiskSegmentWriterIF) _segmentWriters.remove(segname);
    int numflush = 0;

    if (rif != null) {
      rif.doneReading();
    }
    if (wif != null) {
      try {
        if (block) {
          wif.close(compQ, null);
          numflush++;   // only counted once the close was accepted
        } else {
          wif.close(_devnull, null);
        }
      } catch (SinkDeadException sde) {
        // Best effort, but do not hide the failure from the operator.
        Debug.msg("Btree singlenode", Debug.WARN,
                  "BufCache.done_with_segment: writer for segment " + segname
                  + " is dead, its close was skipped: " + sde);
      }
    }

    // wait for the close completion, if we asked for one
    while (numflush > 0) {
      QueueElementIF[] els = compQ.blocking_dequeue(0);
      if (els != null) {
        numflush -= els.length;
      }
    }
  }

  /**
   * Sometimes it's useful to grab blocks synchronously.  This method
   * allows you to do exactly that: if the block is in the cache, its
   * region is returned; otherwise the block is read from disk, placed in
   * the cache, and its region is returned.
   * <P>
   * On a miss, a slot occupied by a different block is written out and
   * vacated first.  The read uses a reader opened just for this call and
   * released again once the read has completed.  Any failure of the read
   * (I/O error, out of memory, unexpected or short completion) is treated
   * as fatal and exits the process.
   * <P>
   * NOTE(review): an unchecked exception from <code>registerRead</code>
   * (for example the <code>IndexOutOfBoundsException</code> that
   * <code>fetch_and_cache</code> catches) is not handled here and would
   * propagate to the caller - confirm callers never read past the end of
   * a segment through this method.
   *
   * @param segname  name of the segment the block belongs to
   * @param blocknum index of the block within the segment
   * @return the region holding the block's data
   * @throws java.io.IOException declared for callers' benefit; I/O
   *         failures seen by this method currently terminate the process
   */
  public MemRegionIF fetch_sync_and_cache(String segname, int blocknum)
    throws java.io.IOException {

    if (SN_Btree.debug) {
      System.out.println("starting BufCache.fetch_sync_and_cache()...");
    }

    // is it in the cache?
    int elno = name_and_blocknum_to_offset(segname, blocknum);
    BufCacheElement bcel = _elements[elno];
    if (bcel != null) {
      if (bcel.segment_name.equals(segname) &&
          (blocknum == bcel.block_num)) {
        // got the element from the cache. done!
        // hopefully this was the common case.
        return bcel.region;
      }
      // here because element was wrong one.  evict that puppy.
      write_out_element(bcel, true);
      _elements[elno] = null;
    }

    if (SN_Btree.debug) {
      System.out.println("		Sync:Not in cache");
    }

    // not in the cache, need to read it in synchronously: issue the read
    // with a private completion queue and wait on that queue.
    DiskSegmentReaderIF rif = null;
    QueueIF fsncq = new Queue();
    try {
      rif = _segIF.openForRead(segname, NUM_THREADS_PER_SEGMENT_READER);
      MemRegionIF reg = _allocator.allocateRegion(_blocksize);
      rif.register_upcall(fsncq);
      rif.registerRead(blocknum*_blocksize, _blocksize, null, reg);
    } catch (java.io.IOException ioe) {
      Debug.msg("Btree singlenode", Debug.FATAL,
                "disk is dead - aborting (3.994)!",
                ioe, Debug.FATAL);
      System.exit(1);
    } catch (java.lang.OutOfMemoryError ome) {
      Debug.msg("Btree singlenode", Debug.FATAL,
                "out of memory in buf cache (3.994)",
                ome, Debug.FATAL);
      System.exit(1);
    }

    QueueElementIF el[] = fsncq.blocking_dequeue(0);
    if (el == null) {
      Debug.msg("Btree singlenode", Debug.FATAL,
                "uh oh - fsncq dequeue is null in BufCache!");
      System.exit(1);
    }
    if (el.length != 1) {
      // exactly one read was issued, so exactly one completion is expected
      Debug.msg("Btree singlenode", Debug.FATAL,
                "disk read returned " + el.length +
                " completion events in buf cache (expected exactly 1)");
      System.exit(1);
    }
    if (!(el[0] instanceof DiskSegmentReadFinished)) {
      Debug.msg("Btree singlenode", Debug.FATAL,
                "disk read failed in buf cache (3.99) - " + el[0] +
                ", was for (" + segname + "," + blocknum + ")");
      System.exit(1);
    }
    DiskSegmentReadFinished dsrg = (DiskSegmentReadFinished) el[0];
    if ((dsrg.read_amount_requested != _blocksize) ||
        (dsrg.read_amount_achieved != _blocksize)) {
      Debug.msg("Btree singlenode", Debug.FATAL,
                "read completion not blocksize (3.99)");
      System.exit(1);
    }

    // the one-shot reader is no longer needed
    rif.doneReading();

    // build the cache entry from what the completion reports
    BufCacheElement bce = new BufCacheElement();
    bce.segment_name = dsrg.segname;
    bce.block_size = _blocksize;
    bce.block_num = dsrg.starting_offset / _blocksize;
    bce.dirty = false;
    bce.region = dsrg.filled_data;
    int offset = name_and_blocknum_to_offset(bce.segment_name,
                                             bce.block_num);
    _elements[offset] = bce;

    // Return the very region that was cached, so that the miss path and
    // the hit path above hand out the same object for the same block.
    return bce.region;
  }

  // the PassiveSourceIF methods
  /**
   * Removes one bufcache completion (i.e. a read) from the completion
   * queue.  A bufcache completion QueueElementIF is of type
   * BufCacheElement.
   *
   * @return whatever the underlying completion queue's
   *         <code>dequeue()</code> returns
   */
  public QueueElementIF dequeue() {
    final QueueElementIF completion = this._compQ.dequeue();
    return completion;
  }

  /**
   * Removes all pending bufcache completions (i.e. all reads) from the
   * completion queue.  A bufcache completion QueueElementIF is of type
   * BufCacheElement.
   *
   * @return whatever the underlying completion queue's
   *         <code>dequeue_all()</code> returns
   */
  public QueueElementIF[] dequeue_all() {
    final QueueElementIF[] completions = this._compQ.dequeue_all();
    return completions;
  }

  /**
   * Registers an upcall handler on this cache's completion queue; the
   * registration is passed straight through to that queue.
   *
   * @param upcall_handler the handler to register with the completion queue
   */
  public void register_upcall(UpcallHandlerIF upcall_handler) {
    this._compQ.register_upcall(upcall_handler);
  }

  /**
   * A minimal eviction callback for debugging: prints the element that
   * is about to be evicted to standard output.
   *
   * @param element the cache element being evicted
   */
  public void about_to_evict(BufCacheElement element) {
    final String notice = "Evicting: " + element;
    System.out.println(notice);
  }


  // here are upcall functions from the underlying disk readers/writers
  /**
   * Delivers a batch of events by handing each one, in array order, to
   * <code>enqueue</code>.
   *
   * @param els the events to deliver
   */
  public void enqueue_many(QueueElementIF[] els) {
    final int count = els.length;
    int idx = 0;
    while (idx < count) {
      enqueue(els[idx]);
      idx++;
    }
  }
  /**
   * Handles one event coming up from the disk readers/writers.
   * <P>
   * A <code>DiskSegmentReadFinished</code> event is turned into a clean
   * <code>BufCacheElement</code>, installed in its cache slot (writing out
   * a different block occupying that slot first) and placed on the
   * completion queue.  A read failure or a clogged write is treated as
   * fatal and exits the process.  A drained write, or any other event
   * type, is ignored.
   *
   * @param queue_el the event to handle
   */
  public void enqueue(QueueElementIF queue_el) {

    if (SN_Btree.debug) {
      System.out.println("starting BufCache.enqueue()...");
    }

    // Deal with everything that is not a finished read first.
    if (!(queue_el instanceof DiskSegmentReadFinished)) {
      if (queue_el instanceof DiskSegmentReadFailed) {
        // uh oh - disk dead?
        Debug.msg("Btree singlenode", Debug.FATAL,
                  "disk seems dead(1) - aborting!!");
        System.exit(1);
      } else if (queue_el instanceof DiskSegmentWriteCloggedEvent) {
        // uh oh - disk dead?
        Debug.msg("Btree singlenode", Debug.FATAL,
                  "disk seems dead - aborting!");
        System.exit(1);
      }
      // DiskSegmentWriteDrainedEvent: great!  our flush/eviction worked.
      // Nothing to do for it, nor for any unrecognised event.
      return;
    }

    // A read finished: wrap it in a cache element, stuff it in the
    // cache and add it to our local completion queue.
    BufCacheElement arrived = new BufCacheElement();
    DiskSegmentReadFinished done = (DiskSegmentReadFinished) queue_el;

    boolean wholeBlock = (done.read_amount_requested == _blocksize)
                         && (done.read_amount_achieved == _blocksize);
    if (!wholeBlock) {
      Debug.msg("Btree singlenode", Debug.FATAL,
                "Bogus read completion - not blocksize!!");
      System.exit(1);
    }

    arrived.segment_name = done.segname;
    arrived.block_size = _blocksize;
    arrived.block_num = done.starting_offset / _blocksize;
    arrived.dirty = false;
    arrived.region = done.filled_data;
    /* MARK: pass uniqueID */
    arrived.uniqueID.addElement(done.id_object);

    final int slot = name_and_blocknum_to_offset(arrived.segment_name,
                                                 arrived.block_num);

    // careful logic here - multiple reads may be issued but
    // not completed, that, once completed, will conflict in
    // the cache.  It's ok to overwrite the cache, as
    // long as we enqueue both.  But, the cache may be dirty
    // by the time we read it, so we may need to evict.
    final BufCacheElement occupant = _elements[slot];
    if (occupant != null) {
      boolean sameBlock = occupant.segment_name.equals(arrived.segment_name)
                          && (occupant.block_num == arrived.block_num);
      if (!sameBlock) {
        write_out_element(occupant, true);
      }
    }

    _elements[slot] = arrived;
    if (SN_Btree.debug) {
      System.out.println("		Now in cache");
    }
    _compQ.enqueue(arrived);
  }

  /**
   * Hands a cache element's data to its segment writer if, and only if,
   * the element is dirty.  A <code>null</code> or clean element is left
   * alone.
   * <P>
   * For a dirty element the registered eviction callback (if any) is
   * invoked first.  The segment's reader/writer pair is opened and
   * remembered if no writer is open yet, the writer is positioned at
   * <code>block_size * block_num</code>, and the element's region is
   * enqueued on the writer.  The element's dirty flag is not modified
   * here.  Any I/O failure or dead sink is treated as fatal and exits
   * the process.
   *
   * @param el       the element to write out; may be <code>null</code>
   * @param do_flush whether to also request a background flush of the
   *                 writer after enqueueing the data
   */
  private void write_out_element(BufCacheElement el, boolean do_flush) {
    if (el == null) {
      return;
    }

    if (el.dirty && (_evictionCB != null)) {
      _evictionCB.about_to_evict(el);
    }

    // The dirty flag is deliberately read again after the callback ran.
    if (!el.dirty) {
      return;
    }

    DiskSegmentWriterIF writer =
      (DiskSegmentWriterIF) _segmentWriters.get(el.segment_name);

    if (writer == null) {
      // First contact with this segment: open and remember both a
      // reader and a writer for it.
      DiskSegmentReaderIF reader = null;
      try {
        reader = _segIF.openForRead(el.segment_name,
                                    NUM_THREADS_PER_SEGMENT_READER);
        reader.register_upcall(SN_Btree.main_event_queue);
        writer = _segIF.openForWrite(el.segment_name, 0);
      } catch (java.io.IOException ioe) {
        Debug.msg("Btree singlenode", Debug.FATAL,
                  "Disk is dead - aborting (3.5)!" +
                  "filename " + el.segment_name,
                  ioe, Debug.FATAL);
        System.exit(1);
      }
      _segmentReaders.put(el.segment_name, reader);
      _segmentWriters.put(el.segment_name, writer);
    }

    // position the writer at the start of this block
    try {
      writer.seek(el.block_size*el.block_num);
    } catch (java.io.IOException ioe) {
      Debug.msg("Btree singlenode", Debug.FATAL,
                "segment is dead!(1)", ioe, Debug.FATAL);
      System.exit(1);
    }

    // offset 0 is start of write offset
    SinkElementIF pendingWrite =
      new GenericSinkElement(el.region, el.block_size, 0, this);
    try {
      writer.sync_enqueue(pendingWrite, null);
      if (do_flush) {
        writer.flush(_devnull, null);
      }
    } catch (SinkDeadException sde) {
      Debug.msg("Btree singlenode", Debug.FATAL,
                "segment " + el.segment_name +
                ", Segment is dead!(2)",
                sde, Debug.FATAL);
      System.exit(1);
    }
  }

  /**
   * Maps a (segment name, block number) pair onto a slot of the cache
   * array.
   * <P>
   * The slot is <code>(|hash(name) * 13| + blocknum) mod _num_blocks</code>.
   * The arithmetic is carried out in <code>long</code> and the remainder
   * is normalised, so the result is always a valid index in
   * <code>[0, _num_blocks)</code>, including when the scaled hash equals
   * <code>Integer.MIN_VALUE</code>, when adding the block number would
   * overflow an <code>int</code>, or when the block number is negative.
   * For all inputs where the sum stays in the non-negative <code>int</code>
   * range, the slot chosen is the same as with plain <code>int</code> math.
   *
   * @param name     the segment name
   * @param blocknum the block number within the segment
   * @return the index into <code>_elements</code> for this block
   */
  private int name_and_blocknum_to_offset(String name, int blocknum) {
    // int wrap-around in the multiplication is fine: it is only a hash mix
    int scaled = name.hashCode() * 13;

    // take the magnitude in long so that Integer.MIN_VALUE has one
    long magnitude = Math.abs((long) scaled);

    long slot = (magnitude + blocknum) % _num_blocks;
    if (slot < 0) {
      // Java's % keeps the sign of the dividend; fold back into range
      slot += _num_blocks;
    }
    return (int) slot;
  }


  /**
   * Allocates a region of the given size and sets each of its first
   * <code>size</code> bytes to the same value.  Used by the test code in
   * <code>main()</code>.
   *
   * @param bama     allocator the region is obtained from
   * @param fillbyte value written at every position
   * @param size     number of bytes to allocate and fill
   * @return the filled region
   */
  public static MemRegionIF getNewReg(ByteArrayMemAllocator bama,
                                      byte fillbyte,
                                      int size) {
    final MemRegionIF region = bama.allocateRegion(size);
    int pos = 0;
    while (pos < size) {
      try {
        region.putByte(fillbyte, pos);
      } catch (ArrayIndexOutOfBoundsException aiobe) {
        // a position that cannot be written is skipped; keep going
      }
      pos++;
    }
    return region;
  }

  /**
   * Stand-alone smoke test for the buffer cache.  It creates two segments
   * through an FSDiskSegmentVizier, builds a 40-block cache of 8192-byte
   * blocks with itself as the eviction callback, writes and flushes a few
   * blocks, then issues a series of fetches and waits until one
   * completion per issued fetch has been seen on its completion queue.
   * The process exits with status 0 on success and 1 on any failure.
   *
   * @param args command-line arguments; not used
   */
  public static void main(String args[]) {
    ThreadPool            tp = new ThreadPool(30,35,40);
    ByteArrayMemAllocator bama = new ByteArrayMemAllocator();
    FSDiskSegmentVizier fdsv = new FSDiskSegmentVizier(tp);
    String testCache1 = "testcache1";
    String testCache2 = "testcache2";
    MemRegionIF newData;
    // completions from the cache under test are delivered to this queue
    QueueIF compQ = new Queue();

    System.out.println("Creating some files to bufcacherize..");
    try {
      fdsv.create(testCache1, 0);
      fdsv.create(testCache2, 8192);
    } catch (java.io.IOException ioe) {
      ioe.printStackTrace();
      System.exit(1);
      }

    System.out.println("Creating a bufcache, registering self as evictCB..");
    BufCache myCache = new BufCache(bama, fdsv, 8192, 40);  // 40 blocks of 8192 bytes = 320KB
    myCache.register_callback(myCache);
    myCache.register_upcall(compQ);

    // this is kind of icky - the bufcache is hardwired to send completions
    // to the SN_Btree now, so that single threading through all fsms
    // works.
    SN_Btree.init_sn_btree();
    SN_Btree.globBufCache = myCache;

    // phase 1: dirty two blocks, then force them out with a blocking flush
    System.out.println("Writing a few blocks to the bufcache");
    newData = getNewReg(bama, (byte) 0x41, 8192);
    myCache.write_back_cache(testCache1, 0, newData);
    myCache.write_back_cache(testCache1, 2, newData);
    System.out.println("flushing cache");
    myCache.flush_entire_cache(true);

    // phase 2: write 20 blocks to each segment (each pair shares one
    // region), then fetch them back
    System.out.println("Getting serious now....");
    // number of fetches issued, i.e. number of completions to wait for
    int issuenum = 0;
    for (int i=0; i<20; i++) {
      newData = getNewReg(bama, (byte) (0x40 + i), 8192);
      myCache.write_back_cache(testCache1, 0+i, newData);
      myCache.write_back_cache(testCache2, 0+i, newData);
    }
    for (int i=0; i<20; i++) {
      System.out.println("issuing read block " + i);
      myCache.fetch_and_cache(testCache1, i);
      issuenum++;
      // odd-numbered blocks are additionally fetched from the second segment
      if (i % 2 == 1) {
	System.out.println("issuing read2 block " + i);
	myCache.fetch_and_cache(testCache2, i);
	issuenum++;
      }
    }
    
    // phase 3: wait until every issued fetch has produced a completion
    int gotnum = 0;
    while (gotnum < issuenum) {
      QueueElementIF[] foo = compQ.blocking_dequeue(0);
      if (foo == null)
	continue;

      for (int i=0; i<foo.length; i++) {
	if (foo[i] instanceof BufCacheElement) {
	  BufCacheElement bce = (BufCacheElement) foo[i];
	  System.out.println("** got BufCacheElement " + bce);
	} else {
	  // anything other than a BufCacheElement means the test failed
	  System.err.println("Bogus instance of returned from bufCache..");
	  System.out.println("foo is " + foo[i]);
	  System.exit(1);
	}
	gotnum++;
      }
    }
    System.out.println("doing flush");
    myCache.flush_entire_cache(true);
    System.exit(0);
  }
}

