/*
 * Author: Barbara Hohlt <hohltb@cs.berkeley.edu> 
 * Inception Date: July 27, 2000 
 *
 * This software is copyrighted by Barbara Hohlt and the Regents of
 * the University of California.  The following terms apply to all
 * files associated with the software unless explicitly disclaimed in
 * individual files.
 * 
 * The authors hereby grant permission to use this software without
 * fee or royalty for any non-commercial purpose.  The authors also
 * grant permission to redistribute this software, provided this
 * copyright and a copy of this license (for reference) are retained
 * in all distributed copies.
 *
 * For commercial use of this software, contact the authors.
 * 
 * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
 * DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT.  THIS SOFTWARE
 * IS PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE
 * NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
 * MODIFICATIONS.
 */

package ninja2.core.sn_btree.static_table;

import ninja2.core.io_core.interfaces.*;
import ninja2.core.io_core.interfaces.disk.*;
import ninja2.core.io_core.core.*;
import ninja2.core.io_core.util.*;
import ninja2.core.sn_btree.*;
import ninja2.core.sn_btree.lockmgr.*;
import ninja2.core.sn_btree.bufcache.*;
import ninja2.core.sn_btree.recordconverters.*;
import ninja2.util.*;

import java.util.*;
import java.io.IOException;

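/**
 * The StaticTableEngine is the table engine for a single-node B-link tree
 * (see Lehman &amp; Yao, "Efficient Locking for Concurrent Operations on
 * B-Trees").
 *
 * @author Barbara Hohlt 
 */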

public class StaticTableEngine implements TableEngineIF {
  /* Lower bound applied to the requested node count in
   * DUMMYcreate_sn_btree(); the one node is the root. */
  public static final int         initial_num_buckets = 1; /* root node */ 
  /* Class-wide counter; the constructor hands out the next value. */
  private static int tablenum = 0;

  /* Value of tablenum captured when this engine was constructed. */
  private int            this_table_num = 0;
  /* Owning SN_Btree: source of the shared managers below, and the sink
   * for completions (see clean_fsm()). */
  private SN_Btree  	_compTable = null;
  /* The following are copied from _compTable in the constructor. */
  private LockMgr        globLockMgr = null;
  private BufCache       globBufCache = null;
  private DiskSegmentIF  globDSegIF = null;
  private MemAllocatorIF globMemAllocIF = null;
  /* _compTable.main_event_queue; handed to the FSM created by put(). */
  private UpcallHandlerIF ht_upcall = null;
  /* Segment names; _overflowname is only referenced from commented-out
   * code in this file and is never assigned. */
  private String        _tablename, _dataname, _overflowname; 
  //private int            num_buckets;

  /* Monitor object; clean_fsm() decrements _workOutstanding while
   * holding it.  Created at the very end of the constructor. */
  private Object        _workAvailable = null;
  /* Number of get/put/remove requests started but not yet completed;
   * close() waits for this to reach zero. */
  private int           _workOutstanding = 0;

  /* Never assigned in the visible code (the assignment lives in a
   * commented-out block of the constructor), so null is what gets
   * passed on to every ST_FSM. */
  private TableMetadata   _tmd = null;
  // MARK: We probably don't require this
  /* Same as _tmd: never assigned in the visible code. */
  private OverflowBitmap  _obm[] = null;

  /* addLockGrant() removes the waiting FSM from here, keyed by
   * Integer(lock request id).  Nothing in this file inserts into it. */
  private Hashtable       _lockRequestTable = null;  // outstanding lock reqs
  /* Keyed by Integer(request id) in get()/put()/remove() and by
   * Integer(lock request id) in addLockGrant(); values are ST_FSM. */
  public Hashtable       _activeFSMTable = null;   // maps request id --> FSM
  /* Created in the constructor; not otherwise used in this file. */
  private StaticHashFunction _shf = null;

/* MARK: This is a DUMMY create_sn_btree() which creates a
 * test btree with 9 nodes like the example in "Efficient Locking for 
 * Concurrent Operations on B-Trees", Lehman & Yao, Fig 7. 
 *
 * What the method does, in order:
 *   1. Refuses to run (IOException "Table already exists") when
 *      globDSegIF.stat(name) succeeds.
 *   2. Creates the segments `name` and `dataname` and opens a writer
 *      on each.
 *   3. Builds nine hard-coded nodes (node 0 is a BtreeRoot, the rest
 *      are BtreeNodes) and enqueues each one on the `name` writer.
 *   4. Builds four hard-coded BtreeData blocks (keys 48, 51, 53, 56)
 *      and enqueues each one on the `dataname` writer.
 *   5. Closes both writers and blocks until two completion elements
 *      have been dequeued.
 *
 * Parameters:
 *   name, dataname  - names of the node segment and the data segment.
 *   globDSegIF      - used for stat/create/openForWrite.
 *   globMemAllocIF  - used to allocate one region per node / data block.
 *   globBufCache    - not used by this method.
 *   num_bucks       - effectively ignored: the node count is forced to 9
 *                     below regardless of the value passed in.
 *
 * A SinkDeadException from any writer is reported through Debug.msg at
 * FATAL level and followed by System.exit(1).
 */
  public static void DUMMYcreate_sn_btree(String name, String dataname,
					 DiskSegmentIF globDSegIF,
					 MemAllocatorIF globMemAllocIF,
					 BufCache globBufCache,
					 int num_bucks)
    throws IOException {
	if (SN_Btree.debug) 
		System.out.println("Starting StaticTableEngine.create_sn_btree().");
    
    int num_buckets;

	/* for our purposes num_buckets is '9' ie for testing */ 
    if (num_bucks < initial_num_buckets)
      num_buckets = initial_num_buckets;
    else 
      num_buckets = num_bucks;

      /* Overrides whatever was chosen just above: the switch statement
       * and the blink[] table below only know about nodes 0..8. */
      num_buckets = 9;		/*  create 9 nodes */ 

    // table doesn't exist - create a new empty one - a long and
    // involved process, unfortunately.  These will bonk with an
    // IOException if the table already exists.
    boolean there = false;
    try {
      globDSegIF.stat(name);
      there = true;
    } catch (java.io.IOException ioe) {
      // Deliberately ignored: a failing stat() is taken to mean that
      // the segment does not exist yet, which is the expected case.
    }
    if (there)
      throw new java.io.IOException("Table already exists");

    globDSegIF.create(name, 0);
    globDSegIF.create(dataname, 0);

    DiskSegmentWriterIF mainW, dataW; 
    mainW = globDSegIF.openForWrite(name, 0);
    dataW = globDSegIF.openForWrite(dataname, 0);


	int max_slots; 
	/* blink[j] is passed as the third constructor argument of node j
	 * (the "blink" of the comment further down). */
	int blink[] = {0,2,0,4,5,6,0,8,0};
	for (int j=0; j<num_buckets; j++) {

	BtreeNode.BtreeEntry thbr, thbr_arr[];
   	MemRegionIF thb_m = globMemAllocIF.allocateRegion(SN_Btree.table_blocksize)
	;

	// MARK: The nodes use only a few slots, the rest are dummies 
	// The root uses only 2.
	max_slots = (j==0) ? SN_Btree.max_root_slots : SN_Btree.max_node_slots;
    thbr_arr = new BtreeNode.BtreeEntry[max_slots];
	/* Start with every slot holding a BtreeEntry(false) placeholder ... */
	for (int i=0; i<max_slots; i++) 
	{
    	thbr = new BtreeNode.BtreeEntry(false);
    	thbr_arr[i] = thbr;
	}
	/* ... then overwrite the leading slots with the hard-coded entries
	 * of node j.  The three constructor arguments are (true, key, ptr)
	 * -- presumably "in use", key value and child/block pointer, with -1
	 * meaning "no pointer"; confirm against BtreeNode.BtreeEntry.
	 * NOTE(review): case 5 writes slot 3, so this relies on
	 * SN_Btree.max_node_slots being at least 4. */
	switch(j)
	{
		case 0 :
			thbr_arr[0] = new BtreeNode.BtreeEntry(true,(long)40,1); 
			thbr_arr[1] = new BtreeNode.BtreeEntry(true,(long)99,2); 
			break;
		case 1 :
			thbr_arr[0] = new BtreeNode.BtreeEntry(true,(long)25,-1); 
			thbr_arr[1] = new BtreeNode.BtreeEntry(true,(long)35,-1); 
			thbr_arr[2] = new BtreeNode.BtreeEntry(true,(long)40,3); 
			break;
		case 2 :
			thbr_arr[0] = new BtreeNode.BtreeEntry(true,(long)47,4); 
			thbr_arr[1] = new BtreeNode.BtreeEntry(true,(long)62,5); 
			thbr_arr[2] = new BtreeNode.BtreeEntry(true,(long)99,6); 
			break;
		case 3 :
			thbr_arr[0] = new BtreeNode.BtreeEntry(true,(long)36,-1); 
			thbr_arr[1] = new BtreeNode.BtreeEntry(true,(long)40,-1); 
			break;
		case 4 :
			thbr_arr[0] = new BtreeNode.BtreeEntry(true,(long)41,-1); 
			thbr_arr[1] = new BtreeNode.BtreeEntry(true,(long)47,-1); 
			break;
		case 5 :
			thbr_arr[0] = new BtreeNode.BtreeEntry(true,(long)51,7); 
			thbr_arr[1] = new BtreeNode.BtreeEntry(true,(long)56,8); 
			thbr_arr[2] = new BtreeNode.BtreeEntry(true,(long)57,-1); 
			thbr_arr[3] = new BtreeNode.BtreeEntry(true,(long)62,-1); 
			break;
		case 6 :
			thbr_arr[0] = new BtreeNode.BtreeEntry(true,(long)78,-1); 
			thbr_arr[1] = new BtreeNode.BtreeEntry(true,(long)99,-1); 
			break;
		case 7 :
			thbr_arr[0] = new BtreeNode.BtreeEntry(true,(long)48,0); 
			thbr_arr[1] = new BtreeNode.BtreeEntry(true,(long)51,1); 
			break;
		case 8 :
			thbr_arr[0] = new BtreeNode.BtreeEntry(true,(long)53,2); 
			thbr_arr[1] = new BtreeNode.BtreeEntry(true,(long)56,3); 
			break;
	}

	/* LSN=0 Node=j blink=blink LEAF=l slotsArray[] MemRegion */
	/* Only nodes 7 and 8 are flagged as leaves; their entries carry the
	 * numbers 0..3, matching the four data blocks written further down. */
	boolean l;
	l = (j==7 || j==8) ? true : false ;
	BtreeNode thb = null;
	/* set min/max in root node */
	if (j==0)
    {
    	thb = new BtreeRoot(0,j,blink[j],l,thbr_arr, thb_m);
		((BtreeRoot)thb).set_min(48,7);
    	((BtreeRoot)thb).set_max(56,8);
    	((BtreeRoot)thb).set_numnodes(9);
    	((BtreeRoot)thb).set_numblocks(4);
		// System.out.println("min ptr is "+thb.get_min_ptr());
		// System.out.println("max ptr is "+thb.get_max_ptr());
	} else {
    	thb = new BtreeNode(0,j,blink[j],l,thbr_arr, thb_m);
	}


    /* Queue the node on the node-segment writer; once more than 50
     * elements are queued, flush and wait for the flush to complete. */
    try {
	mainW.sync_enqueue(new GenericSinkElement(thb.get_byte_array(),
						  thb.get_byte_array().getSize(),
						  0,
						  null), null);
	if (mainW.num_in_queue() > 50) {
	  QueueIF fcomp = new Queue();
	  mainW.flush(fcomp, null);
	  fcomp.blocking_dequeue(0);
	}
      } catch (SinkDeadException sde) {
		  Debug.msg("Btree singlenode",
		  Debug.FATAL, "sink dead in constructor", sde,
		  Debug.FATAL);
		  System.exit(1);
      }
	} /* end for loop */


	/* add some dummy data to leaf nodes 7 & 8 */
	BtreeData btdcore[] = new BtreeData[4];
   	MemRegionIF datacore; 
	int block_num = -1;
	long datakey = -1;
	for (int i=0; i<4; i++)
	{
	datacore = globMemAllocIF.allocateRegion(SN_Btree.table_blocksize);
	int[] record_arr = null; 
	/* Block i gets block number i, one of the keys 48/51/53/56 and a
	 * five-int record. */
	switch (i)
	{
		case 0:
			block_num = 0;
			datakey = 48;
			int tmp0[] = {20,19,18,17,16};
			record_arr = tmp0; 
			break;
		case 1:
			block_num = 1;
			datakey = 51;
			int tmp1[] = {15,14,13,12,11};
			record_arr = tmp1; 
			break;
		case 2:
			block_num = 2;
			datakey = 53;
			int tmp2[] = {10,9,8,7,6};
			record_arr = tmp2; 
			break;
		case 3:
			block_num = 3;
			datakey = 56;
			int tmp3[] = {5,4,3,2,1};
			record_arr = tmp3; 
			break;
	}

	btdcore[i] = new BtreeData(0,block_num,datakey,record_arr,datacore);
	/* Same enqueue / flush-above-50 pattern as for the nodes, but on the
	 * data-segment writer. */
	try {
      dataW.sync_enqueue(new GenericSinkElement(btdcore[i].get_byte_array(),
                        btdcore[i].get_byte_array().getSize(),
                        0,
                        null),
             null);

	  if (dataW.num_in_queue() > 50) {
      	QueueIF fcomp = new Queue();
      	dataW.flush(fcomp, null);
     	fcomp.blocking_dequeue(0);
      } 
    } catch (SinkDeadException sde) {
      Debug.msg("Btree singlenode",
        Debug.FATAL, "sink dead in constructor", sde,
        Debug.FATAL);
      System.exit(1);
    }
	} /* end adding Btree data */

      /* Both close() calls report on the same queue ... */
      QueueIF tcompQ = new Queue();
      try {
		mainW.close(tcompQ, null);
		dataW.close(tcompQ, null);
      } catch (SinkDeadException sde) {
		Debug.msg("Btree singlenode",
		Debug.FATAL, "sink dead in constructor", sde,
		Debug.FATAL);
		System.exit(1);
      }
      /* ... so keep dequeuing until two elements in total have arrived. */
      int numdone = 0;
	  //MARK: not sure if while should test 2, 1 or 0 ??
      while (numdone != 2) {
		// System.out.println("numdone: "+numdone);
		QueueElementIF[] tcomps = tcompQ.blocking_dequeue(0);
	  if (tcomps != null)
	  	numdone += tcomps.length;
      }
	if (SN_Btree.debug)
		System.out.println("Finished StaticTableEngine.create_sn_btree().");
  }

/**
 * The REAL create_sn_btree: creates a new, empty tree consisting of a
 * single root node.
 *
 * The segments <code>name</code> and <code>dataname</code> are both
 * created, but only <code>name</code> is written to: it receives one
 * BtreeRoot (node 0, flagged as a leaf) whose slots are all
 * BtreeEntry(false) placeholders, with numnodes=1, numblocks=0,
 * min=(Long.MAX_VALUE,-1) and max=(Long.MIN_VALUE,-1).  The method
 * returns once the writer's close() completion has been dequeued.
 *
 * A SinkDeadException from the writer is reported through Debug.msg at
 * FATAL level and followed by System.exit(1).
 *
 * @param name            name of the node segment
 * @param dataname        name of the data segment
 * @param globDSegIF      used for stat/create/openForWrite
 * @param globMemAllocIF  used to allocate the root's memory region
 * @param globBufCache    not used by this method
 * @param num_bucks       not used by this method; only the root is created
 * @throws java.io.IOException "Table already exists" when stat(name)
 *         succeeds, or whatever the disk-segment calls throw
 */
  public static void create_sn_btree(String name, String dataname,
                                     DiskSegmentIF globDSegIF,
                                     MemAllocatorIF globMemAllocIF,
                                     BufCache globBufCache,
                                     int num_bucks)
    throws java.io.IOException {
    if (SN_Btree.debug) {
      System.out.println("Starting StaticTableEngine.create_sn_btree().");
    }

    // A successful stat() means the table is already there; a failing
    // one is the expected "does not exist yet" case.
    boolean alreadyPresent;
    try {
      globDSegIF.stat(name);
      alreadyPresent = true;
    } catch (java.io.IOException statFailed) {
      alreadyPresent = false;
    }
    if (alreadyPresent) {
      throw new java.io.IOException("Table already exists");
    }

    globDSegIF.create(name, 0);
    globDSegIF.create(dataname, 0);

    DiskSegmentWriterIF rootWriter = globDSegIF.openForWrite(name, 0);

    MemRegionIF rootRegion =
      globMemAllocIF.allocateRegion(SN_Btree.table_blocksize);

    // Every root slot starts out as a BtreeEntry(false) placeholder.
    BtreeNode.BtreeEntry[] emptySlots =
      new BtreeNode.BtreeEntry[SN_Btree.max_root_slots];
    int slot = 0;
    while (slot < SN_Btree.max_root_slots) {
      emptySlots[slot] = new BtreeNode.BtreeEntry(false);
      slot++;
    }

    // LSN=0 Node=0 Blink(N/A)=-1 LEAF=true slotsArray[] MemRegion
    BtreeRoot root = new BtreeRoot(0, 0, -1, true, emptySlots, rootRegion);
    root.set_numnodes(1);                 /* # nodes */
    root.set_numblocks(0);                /* data pages */
    root.set_min(Long.MAX_VALUE, -1);     /* init min value */
    root.set_max(Long.MIN_VALUE, -1);     /* init max value */

    try {
      rootWriter.sync_enqueue(
        new GenericSinkElement(root.get_byte_array(),
                               root.get_byte_array().getSize(),
                               0,
                               null),
        null);
      if (rootWriter.num_in_queue() > 50) {
        QueueIF flushDone = new Queue();
        rootWriter.flush(flushDone, null);
        flushDone.blocking_dequeue(0);
      }
    } catch (SinkDeadException sde) {
      Debug.msg("Btree singlenode",
                Debug.FATAL, "sink dead in constructor", sde,
                Debug.FATAL);
      System.exit(1);
    }

    QueueIF closeDone = new Queue();
    try {
      rootWriter.close(closeDone, null);
    } catch (SinkDeadException sde) {
      Debug.msg("Btree singlenode",
                Debug.FATAL, "sink dead in constructor", sde,
                Debug.FATAL);
      System.exit(1);
    }

    // Exactly one writer was closed, so wait for one completion.
    int closesCompleted = 0;
    do {
      QueueElementIF[] finished = closeDone.blocking_dequeue(0);
      closesCompleted += (finished == null) ? 0 : finished.length;
    } while (closesCompleted != 1);

  } /* end create_sn_btree */

	/**
	 * Wrapper for get: starts the lookup at node 0 using the request's
	 * own id.
	 *
	 * @param rr the read request
	 */
    public void get(SN_BtreeReadRequest rr) {
        get(rr, rr.id, 0);
    }

    /**
     * Main B-tree search routine.  Creates a GET-type ST_FSM for the
     * request, files it in _activeFSMTable under <code>id</code>, counts
     * it as outstanding work and asks the buffer cache to fetch
     * <code>node</code>; processing continues in addBufCacheElement()
     * when the fetched element is delivered there.
     *
     * @param rr   the read request
     * @param id   request id; key into _activeFSMTable and passed on to
     *             the buffer cache with the fetch
     * @param node number of the node to fetch
     */
    public void get(SN_BtreeReadRequest rr, int id, int node) {
        ST_FSM fsm = new ST_FSM(_tablename, _dataname,
                                globLockMgr, globBufCache, globMemAllocIF,
                                _tmd, _obm, ST_FSM.GET_TYPE, rr, node, id,
                                _compTable);

        if (SN_Btree.debug) {
            System.out.println("	GET:putting "+id+" into _activeFSMTable");
        }
        fsm.fsm_state = ST_FSM.GET_FETCHING_DATA;
        _activeFSMTable.put(new Integer(id), fsm);

        // clean_fsm() decrements this counter while holding
        // _workAvailable; take the same monitor here so that the two
        // read-modify-write updates cannot interleave and lose a count.
        synchronized (_workAvailable) {
            _workOutstanding++;
        }
        globBufCache.fetch_and_cache(_tablename, node, id); /* MARK: pass ID */
    }


  /**
   * Simple wrapper for put: starts the insert at node 0.
   *
   * @param key   key to insert
   * @param bytes record stored with the key
   * @param id    request id
   */
  public void put(long key, int[] bytes, int id) {
    put(key, bytes, id, 0);
  }

  /**
   * Main insert routine.  Creates a PUT-type ST_FSM for the request,
   * files it in _activeFSMTable under <code>id</code>, counts it as
   * outstanding work and asks the buffer cache to fetch
   * <code>node</code>; processing continues in addBufCacheElement()
   * when the fetched element is delivered there.
   *
   * @param key   key to insert
   * @param bytes record stored with the key
   * @param id    request id; key into _activeFSMTable and passed on to
   *              the buffer cache with the fetch
   * @param node  number of the node to fetch first
   *              (MARK: should always start at node 0)
   */
  public void put(long key, int[] bytes, int id, int node) {
    ST_FSM fsm = new ST_FSM(_tablename, _dataname,
                            globLockMgr, globBufCache, globMemAllocIF,
                            _tmd, _obm,
                            ST_FSM.PUT_TYPE, key, node,
                            bytes, id, ht_upcall, _compTable);

    if (SN_Btree.debug) {
      System.out.println("	PUT:putting "+id+" into _activeFSMTable");
    }
    fsm.fsm_state = ST_FSM.PUT_FETCHING_DATA;
    _activeFSMTable.put(new Integer(id), fsm);

    // clean_fsm() decrements this counter while holding _workAvailable;
    // take the same monitor here so that the two read-modify-write
    // updates cannot interleave and lose a count.
    synchronized (_workAvailable) {
      _workOutstanding++;
    }
    globBufCache.fetch_and_cache(_tablename, node, id); /* MARK: pass ID */
  }

  /**
   * Starts a delete.  Creates a DEL-type ST_FSM for the request, files
   * it in _activeFSMTable under <code>id</code>, counts it as
   * outstanding work and asks the buffer cache to fetch node 0;
   * processing continues in addBufCacheElement() when the fetched
   * element is delivered there.
   *
   * @param key   key to remove
   * @param bytes record passed through to the FSM
   * @param id    request id; key into _activeFSMTable and passed on to
   *              the buffer cache with the fetch
   */
  public void remove(long key, int[] bytes, int id) {
    int node = 0;   // deletes always start at node 0
    ST_FSM fsm = new ST_FSM(_tablename, _dataname,
                            globLockMgr, globBufCache, globMemAllocIF,
                            _tmd, _obm,
                            ST_FSM.DEL_TYPE, key, node, bytes, id);

    if (SN_Btree.debug) {
      System.out.println("	DEL:putting "+id+" into _activeFSMTable");
    }
    fsm.fsm_state = ST_FSM.DEL_FETCHING_DATA;
    _activeFSMTable.put(new Integer(id), fsm);

    // clean_fsm() decrements this counter while holding _workAvailable;
    // take the same monitor here so that the two read-modify-write
    // updates cannot interleave and lose a count.
    synchronized (_workAvailable) {
      _workOutstanding++;
    }
    globBufCache.fetch_and_cache(_tablename, node, id); /* MARK: pass ID */
  }

  /**
   * Destroys a tree's two segments.  For the node segment first and
   * the data segment second, the buffer cache is told it is done with
   * the segment and the segment is then destroyed on disk.
   *
   * @param name            name of the node segment
   * @param dataname        name of the data segment
   * @param globDSegIF      used to destroy the segments
   * @param globMemAllocIF  not used by this method
   * @param globBufCache    buffer cache to release the segments from
   * @throws java.io.IOException passed through from the calls made here
   */
  public static void destroy_sn_btree(String name, String dataname,
                                      DiskSegmentIF globDSegIF,
                                      MemAllocatorIF globMemAllocIF,
                                      BufCache globBufCache)
    throws java.io.IOException {

    final String[] segments = { name, dataname };
    for (int s = 0; s < segments.length; s++) {
      globBufCache.done_with_segment(segments[s], true);
      globDSegIF.destroy(segments[s]);
    }
  }

  /**
   * Builds the engine for an existing table owned by
   * <code>completionTable</code>.
   *
   * The shared managers and the two segment names are copied from the
   * SN_Btree, this engine is registered as the sub-table for both
   * segments, and the root node (node 0 of the node segment) is read
   * synchronously so that numnodes, numblocks and the min/max key and
   * pointer can be copied into the SN_Btree.
   *
   * If either segment cannot be stat'ed the engine is deregistered
   * again before the exception is thrown, so the SN_Btree is not left
   * holding a reference to a half-constructed engine, and the original
   * exception is attached as the cause.  A failure while reading the
   * root node is reported through Debug.msg at FATAL level and
   * followed by System.exit(1).
   *
   * @param completionTable the owning SN_Btree
   * @throws java.io.IOException "table &lt;name&gt; doesn't exist" when
   *         stat() fails for either segment
   */
  public StaticTableEngine(SN_Btree completionTable) 
    throws java.io.IOException {

    this_table_num = tablenum++;

    // initialize some private globals
    _compTable = completionTable;
    globLockMgr = _compTable.globLockMgr;
    globBufCache = _compTable.globBufCache;
    globDSegIF = _compTable.globDSegIF;
    globMemAllocIF = _compTable.globMemAllocIF;
    ht_upcall = _compTable.main_event_queue;
    _tablename = _compTable._tablename;
    _dataname   = _compTable._dataname;

    _compTable.register_subtable(_tablename, this);
    _compTable.register_subtable(_dataname, this);

    _lockRequestTable = new Hashtable();
    _activeFSMTable = new Hashtable();
    _shf = new StaticHashFunction();

    // if table exists, great, otherwise bonk out
    try {
      globDSegIF.stat(_tablename);
      globDSegIF.stat(_dataname);
    } catch (java.io.IOException ioe) {
      // table doesn't exist yet: undo the registrations made above so
      // nothing can be routed to this unusable instance, and keep the
      // underlying failure reachable as the cause.
      _compTable.deregister_subtable(_tablename);
      _compTable.deregister_subtable(_dataname);
      java.io.IOException missing =
        new java.io.IOException("table " + _tablename + " doesn't exist");
      missing.initCause(ioe);
      throw missing;
    }

    // table exists; read in the table metadata from the BtreeRoot.
    // (The TableMetadata / OverflowBitmap loading of the hash-table
    // engine has no counterpart in B-trees, so _tmd and _obm stay null.)
    try {
      MemRegionIF mdcore =
        globBufCache.fetch_sync_and_cache(_tablename, 0);
      BtreeRoot btr = new BtreeRoot(mdcore);
      _compTable.numnodes = btr.get_numnodes();
      _compTable.numblocks = btr.get_numblocks();
      _compTable.min_key = btr.get_min_key();
      _compTable.min_ptr = btr.get_min_ptr();
      _compTable.max_key = btr.get_max_key();
      _compTable.max_ptr = btr.get_max_ptr();
    } catch (java.io.IOException ioeinner) {
      Debug.msg("Btree singlenode",
                Debug.FATAL, "static read of metadata failed..aborting ",
                ioeinner, Debug.FATAL);
      System.exit(1);
    }

    // ok, we now know the table exists, and is on stable storage.
    _workAvailable = new Object();
  }


  /**
   * Copies the tree metadata held by the owning SN_Btree (numnodes,
   * numblocks, min/max key and pointer) into the root node and writes
   * the root back to the buffer cache.
   *
   * If the root node cannot be read, the failure is reported and the
   * method returns without writing anything back.  Previously the
   * exception was only printed and execution carried on into a
   * NullPointerException on the unset root.
   */
  public void sync_metadata() { 

    BtreeRoot btr;
    try {
      MemRegionIF mdcore = globBufCache.fetch_sync_and_cache(_tablename,0);
      btr = new BtreeRoot(mdcore);
    } catch (IOException e) {
      e.printStackTrace();
      Debug.msg("Btree singlenode", Debug.ERR,
                "sync_metadata: could not read root node; metadata not synced");
      return;   // nothing to update without the root
    }

    btr.set_numnodes(_compTable.numnodes);
    btr.set_numblocks(_compTable.numblocks);
    btr.set_min(_compTable.min_key,_compTable.min_ptr);
    btr.set_max(_compTable.max_key,_compTable.max_ptr);

    globBufCache.write_back_cache(_tablename,0,btr.get_byte_array());
  }

  /**
   * Writes the current metadata into the root node, then asks the
   * buffer cache to flush the node segment followed by the data
   * segment.
   */
  public void sync() {
    sync_metadata();

    final String[] segments = { _tablename, _dataname };
    for (int s = 0; s < segments.length; s++) {
      globBufCache.flush_segment(segments[s], true);
    }
  }

  /**
   * Waits for all outstanding work to finish, then releases the
   * engine's registrations and cached segments via cleanup().
   *
   * The caller is expected to have waited for all completions already,
   * in which case the loop exits on its first check.  Otherwise the
   * first passes only yield, and after that each pass also sleeps for
   * 15 ms.
   *
   * The outstanding-work counter is read under the same monitor that
   * clean_fsm() holds while decrementing it, so the decrement is
   * guaranteed to become visible to this thread.  An interrupt
   * received while sleeping does not cut the wait short, but the
   * thread's interrupt status is restored before returning so the
   * caller can still observe it.
   */
  public void close() {
    boolean interrupted = false;
    try {
      int looptimes = 0;
      while (true) {
        int pending;
        synchronized (_workAvailable) {
          pending = _workOutstanding;
        }
        if (pending == 0)
          break;

        if (looptimes++ > 10) {
          try {
            Thread.sleep(15);
            Thread.yield();
          } catch (InterruptedException ie) {
            // sleep() cleared the flag; remember it and keep waiting
            interrupted = true;
          }
        }
        Thread.yield();
      }
      cleanup();
    } finally {
      if (interrupted)
        Thread.currentThread().interrupt();
    }
  }

  /**
   * Called by an SN_Btree when it receives a LockGrant destined for
   * this table.
   *
   * Table-lock grants are currently ignored.  For any other grant, the
   * FSM waiting on the grant's request id is taken out of
   * _lockRequestTable, filed in _activeFSMTable under that id and
   * handed the grant.  When no FSM is waiting, the grant cannot be
   * delivered: this is logged and the process exits.  An FSM whose
   * lock_request_id differs from the grant's id is logged as
   * unexpected but still receives the grant.
   *
   * @param lg the lock grant
   */
  public void addLockGrant(LockGrant lg) {
    if (lg.grant_type == LockGrant.TABLE_LOCK) {
      // table locks: nothing to do here yet
      return;
    }

    // bucket lock: find the FSM that asked for it
    ST_FSM waiter = (ST_FSM) _lockRequestTable.remove(new Integer(lg.reqid));

    if (waiter == null) {
      Debug.msg("Btree singlenode", Debug.ERR,
                "unexpected lock grant for this node");
      Debug.msg("Btree singlenode", Debug.ERR,
                "dropping lock grant on floor...");
      System.exit(1);
      return;
    }

    if (waiter.lock_request_id != lg.reqid) {
      Debug.msg("Btree singlenode", Debug.ERR,
                "unexpected lock grant for this node");
    }
    _activeFSMTable.put(new Integer(lg.reqid), waiter);
    waiter.process_lockgrant(lg);
  }

  /**
   * Handles a buffer-cache element delivered for this table.
   *
   * The element's region is decoded as a BtreeRoot when the int at
   * offset 4 is 0 and as a BtreeNode otherwise.  The request id is
   * taken from the front of <code>bce.uniqueID</code> (-1 when that
   * list is empty) and used to look up the FSM in _activeFSMTable.  The
   * FSM then processes the node according to its type (GET, DEL or
   * PUT); when it returns a completion object the request is finished
   * and clean_fsm() is called.
   *
   * Fatal conditions, each logged and followed by System.exit(1): the
   * region cannot be decoded, no FSM is registered for the id (the
   * whole cache is flushed first), or the FSM throws an IOException.
   *
   * @param bce the buffer-cache element
   */
  public void addBufCacheElement(BufCacheElement bce ) {
    BtreeNode thb = null;

    if (SN_Btree.debug) 
      System.out.println("starting StaticTableEngine.addBufCacheElement()...");
    try {
      if (bce.region.getInt(4) == 0)
        thb = new BtreeRoot(bce.region);
      else
        thb = new BtreeNode(bce.region);

    } catch (java.io.IOException ioe) {
      Debug.msg("Btree singlenode", Debug.FATAL,
                "Bogus bufcache element read in ste grab-work(): " + bce,
                ioe, Debug.FATAL);
      System.exit(1);
    }

    /* MARK: retrieve fsm by uniqueID  */
    Integer uniqueID = bce.uniqueID.isEmpty() ? 
        new Integer(-1) : (Integer)bce.uniqueID.remove(0);
    if (SN_Btree.debug)
      System.out.println("	getting fsm "+uniqueID+ " from _activeFSMTable");
    ST_FSM fsm = (ST_FSM) _activeFSMTable.get(uniqueID); 

    if (fsm == null) {
      Debug.msg("Btree singlenode", Debug.FATAL,
                "unexpected bce for this bucket: " + bce +
                ", the thb is: " + thb);
      globBufCache.flush_entire_cache(true);
      System.exit(1);
      return;
    }

    try {
      // Each FSM type is handled exactly once; the former second
      // DEL_TYPE branch could never be reached and has been removed.
      if (fsm.fsm_type == ST_FSM.GET_TYPE) {
        SN_BtreeReadComplete rc = fsm.get_process_bce(thb);
        if (rc != null) {
          clean_fsm(thb, rc);
        }

      } else if (fsm.fsm_type == ST_FSM.DEL_TYPE) {
        SN_BtreeRemoveComplete dc = fsm.remove_process_bce(thb);
        if (dc != null) {
          clean_fsm(thb, dc);
        }

      } else if (fsm.fsm_type == ST_FSM.PUT_TYPE) {
        SN_BtreeWriteComplete wc = fsm.put_process_bce(thb);
        if (wc != null) {
          if (SN_Btree.debug) 
            System.out.println("	SN_BtreeWriteComplete.success = " 
                               +  new Boolean(wc.success)); 
          clean_fsm(thb, wc);
        }
      }
    } catch (java.io.IOException ioe) {
      Debug.msg("Btree singlenode", Debug.FATAL,
                "FSM io exception!",
                ioe, Debug.FATAL);
      System.exit(1);
    }
  }

  /**
   * Finishes a request: removes its FSM from _activeFSMTable, hands the
   * completion to the owning SN_Btree, releases the bucket lock the FSM
   * held on <code>thb</code>'s main bucket and decrements the
   * outstanding-work counter under the _workAvailable monitor.
   *
   * @param thb        the node the request ended on
   * @param completion an SN_BtreeReadComplete or SN_BtreeWriteComplete;
   *                   anything else is cast to SN_BtreeRemoveComplete
   */
  private void clean_fsm(BtreeNode thb, QueueElementIF completion) {
    // The request id lives in a field of the concrete completion type.
    int requestId =
      (completion instanceof SN_BtreeReadComplete)
        ? ((SN_BtreeReadComplete) completion).uniqueID
        : (completion instanceof SN_BtreeWriteComplete)
            ? ((SN_BtreeWriteComplete) completion).uniqueID
            : ((SN_BtreeRemoveComplete) completion).uniqueID;

    ST_FSM finished = (ST_FSM) _activeFSMTable.remove(new Integer(requestId));
    _compTable.addcompletion(completion);
    globLockMgr.release_bucket_lock(finished.lock_request_id,
                                    _tablename,
                                    thb.get_main_bucket_num());

    synchronized (_workAvailable) {
      _workOutstanding -= 1;
    }
  }

  /**
   * Releases what the engine holds: deregisters both segments from the
   * owning SN_Btree, then tells the buffer cache it is done with both.
   */
  private void cleanup() {
    final String[] segments = { _tablename, _dataname };

    for (int s = 0; s < segments.length; s++) {
      _compTable.deregister_subtable(segments[s]);
    }
    for (int s = 0; s < segments.length; s++) {
      globBufCache.done_with_segment(segments[s], true);
    }
  }
}

