/* Author: Barbara Hohlt <hohtlb@cs.berkeley.edu> 
 * Inception Date: July 27, 2000 
 *
 * This software is copyrighted by Barbara Hohlt and the Regents of
 * the University of California.  The following terms apply to all
 * files associated with the software unless explicitly disclaimed in
 * individual files.
 * 
 * The authors hereby grant permission to use this software without
 * fee or royalty for any non-commercial purpose.  The authors also
 * grant permission to redistribute this software, provided this
 * copyright and a copy of this license (for reference) are retained
 * in all distributed copies.
 *
 * For commercial use of this software, contact the authors.
 * 
 * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
 * FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
 * ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
 * DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE, AND NON-INFRINGEMENT.  THIS SOFTWARE
 * IS PROVIDED ON AN "AS IS" BASIS, AND THE AUTHORS AND DISTRIBUTORS HAVE
 * NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR
 * MODIFICATIONS.
 */

package ninja2.core.sn_btree.static_table;

import ninja2.core.io_core.interfaces.*;
import ninja2.core.io_core.interfaces.disk.*;
import ninja2.core.io_core.core.*;
import ninja2.core.io_core.util.*;
import ninja2.core.sn_btree.*;
import ninja2.core.sn_btree.lockmgr.*;
import ninja2.core.sn_btree.bufcache.*;
import ninja2.core.sn_btree.recordconverters.*;
import ninja2.util.*;
import java.util.*;
import java.io.IOException;
import java.lang.InterruptedException;
import java.lang.ArrayIndexOutOfBoundsException;

/**
 * A ST_FSM captures the state of a particular task through an FSM,
 * as well as declaring the states that are possible
 * 
 * @author Barbara Hohlt 
 */

public class ST_FSM {

  public static final int GET_TYPE = 0, PUT_TYPE = 1, DEL_TYPE = 2;

  public static final int LT=0, LE=1, GT=2, GE=3, EQ=4, GE_MIN=5, LE_MAX=6;

  public static final int GET_REQUEST_RECEIVED = 0,
                          GET_FETCHING_DATA = 1,
						  GET_MOVING_RIGHT = 11;
			
  
  public static final int PUT_REQUEST_RECEIVED = 2,
                          PUT_FETCHING_DATA = 3,
      					  PUT_REBALANCING = 4,
      					  PUT_GOING_DOWN = 8,
      					  PUT_MOVING_RIGHT = 9,
      					  PUT_DOING_INSERTION = 10,
      					  PUT_MOVING_UP = 11;
  
  public static final int DEL_REQUEST_RECEIVED = 5,
                          DEL_FETCHING_DATA = 6,
                          DEL_DOING_DELETION = 12,
                          DEL_REBALANCING = 7;
  
  public String    segname;
  public String    overflowname;
  public String    dataname;
  public LockMgr   globLockMgr;
  public BufCache  globBufCache;
  public MemAllocatorIF globMemAllocIF;
  public TableMetadata   _tmd;
  public OverflowBitmap  _obm[];
  public int       fsm_type;
  public int       fsm_state;
  public Vector    blocks_read;
  public Vector    bucket_records_read;
  public long      key;
  public int	   ptr;
  public long      min_key;
  public int	   min_ptr;
  public int	   min_pred;
  public long      max_key;
  public int	   max_ptr;
  public int	   max_pred;
  public int[]     get_bytes; // MARK: data are int IDs 
  public int       bucket;   // 0 is root 
  public int       lock_request_id;
  public int[]     put_bytes; // assume the data to put are integers
  public int[]     del_bytes; // assume the data to del are integers
  public LockGrant lock_held;

  // put/del specific data
  public int[]		old_data = null;
  public long		aKey;
  public int 		aPtr;  // points to A child, <-> 'w' in L&Y 
  public int 		newptr;  // points to new child, <-> 'w' in L&Y 
  public int      	old_data_vecoffset;
  public Stack		traversed_nodes = null;
  public UpcallHandlerIF ht_upcall = null;
  public SN_Btree	sn_btree;
  public BtreeData	update_btd; 

  /**
   * Builds the state machine for a put request.  The machine starts in
   * PUT_REQUEST_RECEIVED and waits for its lock grant.
   *
   * @param segment_name   name under which tree nodes are fetched and written
   * @param data_name      name under which data pages are fetched and written
   * @param lm             lock manager
   * @param bc             buffer cache
   * @param ma             memory allocator used for new data pages
   * @param tmd            table metadata
   * @param obm            overflow bitmaps
   * @param type           must be PUT_TYPE; anything else logs a fatal
   *                       message and terminates the JVM
   * @param request_key    key to insert
   * @param request_bucket node number to start at
   * @param put_bs         data to store for the key
   * @param lock_reqid     request id handed to the buffer cache
   * @param upcall         upcall handler, stored in ht_upcall
   * @param sn_Btree       owning tree
   */
  public ST_FSM(String segment_name, String data_name,
		LockMgr lm, BufCache bc, MemAllocatorIF ma,
		TableMetadata tmd, OverflowBitmap[] obm,
		int type, long request_key, int request_bucket,
		int[] put_bs, int lock_reqid, UpcallHandlerIF upcall, SN_Btree sn_Btree) {

    // collaborators and names
    this.segname = segment_name;
    this.dataname = data_name;
    this.globLockMgr = lm;
    this.globBufCache = bc;
    this.globMemAllocIF = ma;
    this._tmd = tmd;
    this._obm = obm;

    // this constructor only serves put requests
    if (type != PUT_TYPE) {
      Debug.msg("Btree singlenode", Debug.FATAL,
		"Bogus type for this put constructor, dummy");
      System.exit(1);
    } else {
      this.fsm_type = type;
      this.fsm_state = PUT_REQUEST_RECEIVED;
      this.put_bytes = put_bs;
      this.ptr = -1; // link ptr, used by create-thb; -1 = pointing nowhere
    }

    // per-request state
    this.key = request_key;
    this.bucket = request_bucket;
    this.lock_request_id = lock_reqid;
    this.lock_held = null;
    this.ht_upcall = upcall;
    this.sn_btree = sn_Btree;
    this.min_pred = -1;

    // working collections
    this.blocks_read = new Vector();
    this.traversed_nodes = new Stack();
  }


/**
   * Builds the state machine for a delete request.  The machine starts in
   * DEL_REQUEST_RECEIVED and waits for its lock grant.  Unlike the put
   * constructor, no SN_Btree or upcall handler is stored.
   *
   * @param segment_name   name under which tree nodes are fetched
   * @param data_name      name under which data pages are fetched
   * @param lm             lock manager
   * @param bc             buffer cache
   * @param ma             memory allocator
   * @param tmd            table metadata
   * @param obm            overflow bitmaps
   * @param type           must be DEL_TYPE; anything else logs a fatal
   *                       message and terminates the JVM
   * @param request_key    key to delete
   * @param request_bucket node number to start at
   * @param del_bs         data to delete
   * @param lock_reqid     request id handed to the buffer cache
   */

  public ST_FSM(String segment_name, String data_name,
		LockMgr lm, BufCache bc, MemAllocatorIF ma,
		TableMetadata tmd, OverflowBitmap[] obm,
		int type, long request_key, int request_bucket,
		int[] del_bs, int lock_reqid ) {

    // collaborators and names
    this.segname = segment_name;
    this.dataname = data_name;
    this.globLockMgr = lm;
    this.globBufCache = bc;
    this.globMemAllocIF = ma;
    this._tmd = tmd;
    this._obm = obm;

    // this constructor only serves delete requests
    if (type != DEL_TYPE) {
      Debug.msg("Btree singlenode", Debug.FATAL,
		"Bogus type for this delete constructor, dummy");
      System.exit(1);
    } else {
      this.fsm_type = type;
      this.fsm_state = DEL_REQUEST_RECEIVED;
      this.del_bytes = del_bs;
      this.ptr = -1; // link ptr, used by create-thb; -1 = pointing nowhere
    }

    // per-request state
    this.key = request_key;
    this.bucket = request_bucket;
    this.lock_request_id = lock_reqid;
    this.lock_held = null;

    // working collections
    this.blocks_read = new Vector();
    this.bucket_records_read = new Vector();
    this.traversed_nodes = new Stack();
  }


  /**
   * Builds the state machine for a read request described by an
   * SN_BtreeReadRequest (bounds and predicates are copied from it).
   * DEL_TYPE is accepted as well, in which case the machine starts in
   * DEL_REQUEST_RECEIVED; note that this constructor does not set
   * <code>key</code> or <code>traversed_nodes</code>.
   *
   * @param segment_name   name under which tree nodes are fetched
   * @param data_name      name under which data pages are fetched
   * @param lm             lock manager
   * @param bc             buffer cache
   * @param ma             memory allocator
   * @param tmd            table metadata
   * @param obm            overflow bitmaps
   * @param type           GET_TYPE or DEL_TYPE; anything else logs a fatal
   *                       message and terminates the JVM
   * @param rr             read request supplying min/max key and predicates;
   *                       must not be null
   * @param request_bucket node number to start at
   * @param lock_reqid     request id handed to the buffer cache
   * @param sn_Btree       owning tree
   */
  public ST_FSM(String segment_name, String data_name,
		LockMgr lm, BufCache bc, MemAllocatorIF ma,
		TableMetadata tmd, OverflowBitmap[] obm,
		int type, SN_BtreeReadRequest rr, int request_bucket,
		int lock_reqid, SN_Btree sn_Btree ) {

    // collaborators and names
    this.segname = segment_name;
    this.dataname = data_name;
    this.globLockMgr = lm;
    this.globBufCache = bc;
    this.globMemAllocIF = ma;
    this._tmd = tmd;
    this._obm = obm;

    // pick the start state from the request kind
    switch (type) {
      case GET_TYPE:
        this.fsm_type = type;
        this.fsm_state = GET_REQUEST_RECEIVED;
        break;
      case DEL_TYPE:
        this.fsm_type = type;
        this.fsm_state = DEL_REQUEST_RECEIVED;
        break;
      default:
        Debug.msg("Btree singlenode", Debug.FATAL,
		"Bogus type for this get constructor, dummy");
        System.exit(1);
    }

    // working collections
    this.blocks_read = new Vector();
    this.bucket_records_read = new Vector();
    this.lock_held = null;

    // range description copied from the request
    this.min_key = rr.min_key;
    this.max_key = rr.max_key;
    this.min_pred = rr.min_pred;
    this.max_pred = rr.max_pred;

    // per-request state
    this.bucket = request_bucket;
    this.lock_request_id = lock_reqid;
    this.sn_btree = sn_Btree;
  }

  /**
   * Receives the lock grant for this request and starts the first node
   * fetch.
   *
   * The grant is remembered in <code>lock_held</code>, <code>old_data</code>
   * is cleared, the machine moves from its *_REQUEST_RECEIVED state to the
   * matching *_FETCHING_DATA state, and an asynchronous fetch of node
   * <code>bucket</code> is issued on the buffer cache.
   *
   * Receiving a grant while one is already held, or while the machine is
   * not in its *_REQUEST_RECEIVED state, is treated as fatal: a message is
   * logged and the JVM is terminated.
   *
   * @param lg the lock grant delivered for this request
   */
  public void process_lockgrant(LockGrant lg) {
    if (lock_held != null) {
      Debug.msg("Btree singlenode", Debug.FATAL,
		"lockgrant given at bad time. :(");
      System.exit(1);
    }
    lock_held = lg;
    old_data = null;

    // in all cases, transition and read bucket
    switch(fsm_type) {
      case GET_TYPE:
	if (fsm_state != GET_REQUEST_RECEIVED) {
	  Debug.msg("Btree singlenode", Debug.FATAL,
		    "get FSM, but not in GET_REQUEST_RECEIVED");
	  System.exit(1);
	}
	fsm_state = GET_FETCHING_DATA;
	break;
      case PUT_TYPE:
	if (fsm_state != PUT_REQUEST_RECEIVED) {
	  Debug.msg("Btree singlenode", Debug.FATAL,
		    "put FSM, but not in PUT_REQUEST_RECEIVED");
	  System.exit(1);
	}
	fsm_state = PUT_FETCHING_DATA;
	break;
      case DEL_TYPE:
	if (fsm_state != DEL_REQUEST_RECEIVED) {
	  // the message used to say "get FSM" here, which pointed at the
	  // wrong kind of machine when diagnosing a failure
	  Debug.msg("Btree singlenode", Debug.FATAL,
		    "del FSM, but not in DEL_REQUEST_RECEIVED");
	  System.exit(1);
	}
	fsm_state = DEL_FETCHING_DATA;
	break;
    }

    // because it's a btree the node (bucket) should always be 0 (root)
    globBufCache.fetch_and_cache(segname, bucket, lock_request_id ); 
  }


  /**
   * Feeds a fetched tree node to a PUT machine (the insert routine).
   *
   * While in PUT_FETCHING_DATA the machine descends: a root that is also a
   * leaf is inserted into directly; otherwise find_min_node() decides
   * whether to go down (0), go right (-1), or stop at this leaf (positive
   * node number).  Node numbers are pushed on <code>traversed_nodes</code>
   * unless a blink was followed.  In PUT_DOING_INSERTION and PUT_MOVING_UP
   * the node goes straight to doInsertion().
   *
   * @param el the node delivered by the buffer cache
   * @return the completion event when the put has finished, or null when
   *         another node fetch is outstanding
   * @throws IOException if the machine is not in PUT_FETCHING_DATA,
   *         PUT_DOING_INSERTION or PUT_MOVING_UP
   */
    public SN_BtreeWriteComplete put_process_bce(BtreeNode el)
					throws IOException {
	if (SN_Btree.debug) System.out.println("starting ST_FSM.put_process_bce()...");

	// already positioned on the target node: just (re)try the insertion
	if (fsm_state == PUT_DOING_INSERTION || fsm_state == PUT_MOVING_UP) {
		return doInsertion(el);
	}
	if (fsm_state != PUT_FETCHING_DATA) {
		throw new IOException("not in PUT_TYPE");
	}

	int nodeNum = el.get_main_bucket_num();
	boolean isLeaf = el.get_is_leaf_node();

	// MARK: root that is still a leaf, nothing to descend through
	if (nodeNum == 0 && isLeaf == true) {
		fsm_state = PUT_DOING_INSERTION;
		return doInsertion(el);
	}

	min_key = key;
	bucket = find_min_node(el);	/* like scan_node() */

	// remember the path, except when we merely followed a blink
	if (bucket >= 0) {
		traversed_nodes.push(new Integer(nodeNum));
	}

	// a child or sibling fetch is in flight
	if (bucket == 0 || bucket == -1) {
		return null;	/* more work to do */
	}

	/* el is the leaf holding our insertion point; back up one so that
	 * the stack top is this leaf's parent again */
	bucket = ((Integer) traversed_nodes.pop()).intValue();
	fsm_state = PUT_DOING_INSERTION;
	return doInsertion(el);
    }

  /**
   * Feeds a fetched tree node to a DEL machine.
   *
   * The node is searched for <code>key</code> via search().  While search()
   * is still descending it returns null and so does this method.  Once
   * search() produces a result, the machine moves to DEL_DOING_DELETION
   * and hands the value found to doDeletion().
   *
   * @param el the node delivered by the buffer cache
   * @return the SN_BtreeRemoveComplete produced by doDeletion(), or null
   *         when another node fetch is outstanding
   * @throws IOException if the machine is not in DEL_FETCHING_DATA
   */
    public SN_BtreeRemoveComplete remove_process_bce(BtreeNode el)
					throws IOException {
	if (SN_Btree.debug) System.out.println("starting ST_FSM.remove_process_bce()...");
	if (fsm_state != DEL_FETCHING_DATA) {
	    throw new IOException("not in DEL_TYPE");
	}

	min_key = key;
	SN_BtreeReadComplete lookup = search(el);

	if (lookup != null) {
		/* el is the leaf node that decides our deletion point */
		fsm_state = DEL_DOING_DELETION;
		return doDeletion(lookup.value);
	}
	return null;		/* more work to do */
    }


    /**
     * Rebuilds <code>bucket_records_read</code> as the used entries of node
     * <code>el</code>, in slot order, with the pending entry merged in at
     * the first slot whose key is &gt;= <code>key</code>.
     *
     * What "the pending entry" is depends on the node and state:
     * <ul>
     *  <li>leaf, key absent: a new data page is written via insertData()
     *      and an entry pointing at it is added;</li>
     *  <li>leaf, key present: put_bytes is appended to the existing data
     *      page (the previous contents are kept in <code>old_data</code>)
     *      and the page is written through the cache;</li>
     *  <li>inner node in PUT_MOVING_UP: the entries (aKey, aPtr) and
     *      (key, newptr) produced by a child split are installed;</li>
     *  <li>inner node otherwise: an entry (key, newptr) is installed.</li>
     * </ul>
     * If every used key in the node is smaller than <code>key</code>, the
     * entry is appended when this is the rightmost node (blink == -1) and
     * key exceeds sn_btree.max_key; otherwise the blink is followed: the
     * sibling is fetched asynchronously and false is returned.  A missing
     * blink in that situation terminates the JVM.
     *
     * The node itself is not written here; callers work from
     * bucket_records_read.
     *
     * @param el the node to merge into
     * @return true if bucket_records_read now holds the merged entries,
     *         false if the machine has to move right first
     * @throws IOException declared for callers; raised by callees, not
     *         directly by this method
     */
    public boolean insertKey(BtreeNode el)
            throws IOException {
        int i;
        BtreeNode.BtreeEntry thbr = null;
        BtreeNode.BtreeEntry newRec = null;
        bucket_records_read = new Vector();
        int num_recs = el.get_num_bucket_records();
        int num_used = el.get_num_buckets_used();
        long reckey = 0;
        if (SN_Btree.debug)
            System.out.println("Starting insertKey()...");

        /* add key & data if this is an empty leaf */
        if (num_used == 0 && el.get_is_leaf_node()) {
            if (SN_Btree.debug)
                System.out.println("Inserting very first record...");

            newRec = insertData(el);
            bucket_records_read.addElement(newRec);
            return true;
        }

        MemRegionIF data;
        BtreeData btd;
        /* first pass: copy entries with smaller keys, stop at the slot
         * where the pending entry belongs */
        for (i = 0; i < num_recs; i++) {
            data = null;
            btd = null;
            thbr = el.get_bucket_record(i);
            if (thbr.get_is_used()) {
                reckey = thbr.get_record_key();
                if (reckey >= key) {
                    // We have found a proper place to insert
                    if (reckey == key) {
                        if (SN_Btree.debug)
                            System.out.println("Key already exists for key = "
                                +reckey +" Replacing ptr/appending new data");
                        if (el.get_is_leaf_node()) {
                            /* update leaf node and data page */

                            /* get data which is pointed to by leaf node */
                            data = globBufCache.fetch_sync_and_cache(dataname,
                                                        thbr.get_ptr());
                            btd = new BtreeData(data);
                            int block_num = btd.get_block_num();
                            if (SN_Btree.debug)
                                System.out.println("	data block "+block_num);
                            old_data = btd.get_record_data();
                            /* only report a missing old value when it
                             * really is missing */
                            if (SN_Btree.debug && old_data == null)
                                System.out.println("	old data is null!");
                            btd.append_record_data(put_bytes);
                            /* write data to disk */
                            globBufCache.write_through_cache( dataname,
                                    block_num, btd.get_byte_array());

                            newRec = new BtreeNode.BtreeEntry(true, key, block_num);

                        } else if (fsm_state == PUT_MOVING_UP) {
                            /* the slot being replaced must be the one that
                             * pointed at the child that just split */
                            if (aPtr != thbr.get_ptr()) {
                                System.out.println("ST_FSM.insertKey(): FATAL ERROR while PUT_MOVING_UP");
                                System.exit(-1);
                            }
                            /* update two slots due to split in child */
                            newRec = new BtreeNode.BtreeEntry(true, aKey, aPtr);
                            bucket_records_read.addElement(newRec);
                            newRec = new BtreeNode.BtreeEntry(true, key, newptr);
                        } else {
                            /* update this slot */
                            newRec = new BtreeNode.BtreeEntry(true, key, newptr);
                        }

                        /* add the new record; the existing slot is replaced */
                        bucket_records_read.addElement(newRec);
                        break; // out of the for loop
                    } else {
                        if (SN_Btree.debug) {
                            System.out.println("Inserting the key now " + key +
                                            " node "+ el.get_main_bucket_num());
                            if (fsm_state == PUT_MOVING_UP)
                                System.out.println("ST_FSM.insertKey(): WARNING while PUT_MOVING_UP");
                        }
                        if (el.get_is_leaf_node()) {
                            /* insert the key into the node and add new data page */
                            newRec = insertData(el);
                        } else if (fsm_state == PUT_MOVING_UP) {
                            if ((aPtr != thbr.get_ptr()) &&
                                    (key != sn_btree.max_key)) {
                                System.out.println("ST_FSM.insertKey(): FATAL ERROR while PUT_MOVING_UP updating max_key");
                                System.exit(-1);
                            }
                            /* update two slots due to split in child */
                            /* special case when high_key > sn_btree.max_key */
                            newRec = new BtreeNode.BtreeEntry(true, aKey, aPtr);
                            thbr.set_ptr(newptr);
                        } else {
                            newRec = new BtreeNode.BtreeEntry(true, key, newptr);
                        }

                        /* new entry goes in front of the larger existing one */
                        bucket_records_read.addElement(newRec);
                        bucket_records_read.addElement(thbr);
                        break; // out of the for loop
                    }
                } else {
                    bucket_records_read.addElement(thbr);
                }
            }
        } /* end for loop */

        /* second pass: add the remaining records; when the first pass ran
         * to completion i is already past the end and this is a no-op */
        for (i++; i < num_recs; i++) {
            thbr = el.get_bucket_record(i);
            if (thbr.get_is_used()) {
                reckey = thbr.get_record_key();
                bucket_records_read.addElement(thbr);
            }
        }

        // We now have to check if we have to follow blinks
        // i.e. move right
        if (reckey < key) {
            int blink = el.get_overflow_bucket_num();

            /* first check if key > max_key in btree */
            if ( (blink == -1) && (key > sn_btree.max_key) ) {
                if (el.get_is_leaf_node()) {
                    /* insert the key into the node and add new data page */
                    newRec = insertData(el);
                } else {
                    newRec = new BtreeNode.BtreeEntry(true, key, newptr);
                }
                bucket_records_read.addElement(newRec);
                newRec = null;

                return true;
            }

            bucket = blink;
            if ((bucket == 0) || (bucket == -1)) {
                System.out.println("ST_FSM.insertKey()....FATAL ERROR");
                System.exit(-1); /* error */
            } else {
                if (SN_Btree.debug)
                    System.out.println("		following blink: "+bucket);
                globBufCache.fetch_and_cache(segname, bucket, lock_request_id);
                return false;
            }
        }

        // We have successfully inserted the key
        // in the node
        return true;
    }

	/**
	 * Creates a new data page holding <code>put_bytes</code> for
	 * <code>key</code> and writes it through the cache under
	 * <code>dataname</code>.
	 *
	 * The page number is taken from sn_btree.numblocks, which is then
	 * incremented.  checkMetaData() is given the node number and key
	 * before the page is built (per the original comment, to update the
	 * Btree min/max values if necessary).
	 *
	 * @param el the leaf node the new entry is destined for
	 * @return the entry (used, key, new page number) to place in the leaf,
	 *         or null (after printing a warning) if el is not a leaf
	 */
	private BtreeNode.BtreeEntry insertData(BtreeNode el)
	{
		if (SN_Btree.debug) {
			System.out.println("Inserting the data for " + key +
							" node "+ el.get_main_bucket_num());
		}

		// data pages only hang off leaves
		if (!el.get_is_leaf_node()) {
			System.out.println("ST_FSM.insertData()....WARNING NOT LEAF");
			return null;
		}

		// claim the next free data page number
		int pageNum = sn_btree.numblocks++;
		MemRegionIF region =
			globMemAllocIF.allocateRegion(SN_Btree.table_blocksize);

		/* update Btree min/max values if necessary */
		checkMetaData(el.get_main_bucket_num(), key);

		BtreeData page = new BtreeData(0, pageNum, key, put_bytes, region);

		/* write data to cache and disk */
		globBufCache.write_through_cache(dataname, pageNum,
							page.get_byte_array());

		return new BtreeNode.BtreeEntry(true, key, pageNum);
   }

	/**
	 * Inserts the pending entry into node el, splitting the node when it
	 * overflows.
	 *
	 * MARK: We differ from Lehman Yao in that we do insertion
	 * irrespective of whether data for a certain key is present.
	 * Data is appended to the data page.
	 *
	 * Steps:
	 *  1. insertKey(el) builds the merged entry list in
	 *     bucket_records_read; if it returns false a sibling fetch is in
	 *     flight and null is returned.
	 *  2. If the merged list fits (root and non-root nodes have different
	 *     limits), the result of simplePut(el) is returned.
	 *  3. Otherwise the list is split in half into nodes A (lower half)
	 *     and B (upper half, a newly numbered node).  B is written back
	 *     first, then A, with A's blink pointing at B.  When el is the
	 *     root, A also receives a new node number and a new root built by
	 *     create_root() is written to node 0.
	 *  4. For a non-root split the parent is popped from traversed_nodes,
	 *     the state becomes PUT_MOVING_UP, the parent is fetched and null
	 *     is returned.  For a root split the completion event is built and
	 *     returned, and the machine drops its references.
	 *
	 * Side effects of a split: aKey, aPtr, key and newptr are overwritten
	 * (key becomes B's high key), and sn_btree.numnodes / max_ptr may be
	 * updated.
	 *
	 * @param el the node delivered by the buffer cache
	 * @return the completion event, or null when more node fetches are
	 *         outstanding
	 * @throws IOException if the machine is in neither PUT_DOING_INSERTION
	 *         nor PUT_MOVING_UP
	 */
    public SN_BtreeWriteComplete doInsertion (BtreeNode el)
										throws IOException {
		if ( fsm_state != PUT_DOING_INSERTION &&
			 fsm_state != PUT_MOVING_UP ) { 
	    throw new IOException("not in PUT_DOING_INSERTION");
		}

		/* NOTE(review): thbr is never used in this method */
		BtreeNode.BtreeEntry thbr = null;
		/* insertKey() allocates a fresh Vector again, so this reset is
		 * only a safeguard */
		bucket_records_read = null;
		bucket_records_read = new Vector();	
		if (SN_Btree.debug) 
			System.out.println("ST_FSM.doInsertion(): num slots used " +
											el.get_num_buckets_used());
		/* capacity depends on whether el is the root (node 0) */
		int max_slots = 
			(el.get_main_bucket_num()==0) ?  
				SN_Btree.max_root_slots : SN_Btree.max_node_slots ; 

		if (!insertKey(el))
			return null;	/* move right */

		/* entry count after the merge, not the count currently stored in el */
	    int num_used = bucket_records_read.size(); 
		/* max_slots = 4;	for TESTING */
	    if (num_used <= max_slots) 
			return simplePut(el);	/* return SN_BtreeWriteComplete */
		else {
			if (SN_Btree.debug)
				System.out.println("ST_FSM.doInsertion()...SPLITTING");

			// We are not safe and have to split the node.
			// Rearrange this node into nodes A and B.
			/* SPLIT */

			// MARK: these two statements need to be atomic!
			// (read-then-increment of the shared node counter)
			int nextNode = sn_btree.numnodes;   
	    	sn_btree.numnodes++; 		

			// create A and B

	  		if (SN_Btree.debug) 
				System.out.println("	node "+bucket+" has "
					+num_used+" records.");
			/* A gets entries [0, half), B gets entries [half, size) */
			int half = bucket_records_read.size()/2; 

	    	aKey = ((BtreeNode.BtreeEntry) bucket_records_read.elementAt(half-1)).get_record_key(); // A's key
			aPtr = el.get_main_bucket_num(); // A's ptr
			/* from here on 'key' is B's high key, no longer the key that
			 * was originally requested */
	    	key = ((BtreeNode.BtreeEntry) bucket_records_read.lastElement()).get_record_key(); // B's key 
	    	newptr =  nextNode;	// ptr to B
			/* B now holds the tree-wide maximum key */
			if (key == sn_btree.max_key && el.get_is_leaf_node()) 
					sn_btree.max_ptr = newptr;

			/* root is a special case,
			 * it needs to have a parent before
			 * it can "split"
			 */ 	
			if (el.get_main_bucket_num() == 0)
			{
				/* A moves to a fresh node number so node 0 stays the root */
				aPtr = sn_btree.numnodes;
	    		sn_btree.numnodes++; 	

				// Create new Root node
				BtreeRoot root  = create_root((BtreeRoot)el);
	    		globBufCache.write_back_cache(segname, 0, root.get_byte_array()); 
			}
		// First Create B
		// start index, end index, node, blink, isleaf
		// (B inherits el's old blink)
			BtreeNode B = 
				create_thb(half, bucket_records_read.size(), nextNode,
						el.get_overflow_bucket_num(), el.get_is_leaf_node());
	    	globBufCache.write_back_cache(segname,nextNode,B.get_byte_array()); 

		// Then Create A
		// start index, end index, node, blink, isleaf
		// (A's blink points at B)
			BtreeNode A = create_thb(0,half, aPtr, nextNode, el.get_is_leaf_node());

		// invariant: bucket = el.get_bucket_num, replace with this later
	    	globBufCache.write_back_cache(segname, aPtr, A.get_byte_array()); 


		// finished writing back A and B
		A = null;
		B = null;
	    
		// Get Parent Node for inserting new ptr to Node B
		// and updating new high key for Node A
		if (el.get_main_bucket_num() != 0)
		{
	    	bucket = ((Integer) traversed_nodes.pop()).intValue(); // Parent ptr
	    	fsm_state = PUT_MOVING_UP;  
	    	globBufCache.fetch_and_cache(segname, bucket, lock_request_id);

	    	return null;
		} else {
			/* the root itself was split: the put is complete */
	    	SN_BtreeWriteComplete hwc = new SN_BtreeWriteComplete();
	    	hwc.uniqueID = lock_request_id;
	    	hwc.success = true;
	    	hwc.table = segname;
			/* NOTE(review): key was overwritten with B's high key above,
			 * so this may not be the key the caller asked to put - confirm */
	    	hwc.key = key;
	    	hwc.old_value = old_data; 
	    	hwc.new_value = put_bytes;

			/* drop references held by this finished machine */
			old_data = null;
			put_bytes = null;
			traversed_nodes = null;
			ht_upcall = null;
			sn_btree = null;
			blocks_read = null;
			bucket_records_read = null;
			lock_held = null;

	    	return hwc;
			
		}

	    } /* end else split */
    } 

  /**
   * Feeds a fetched tree node to a GET machine.
   *
   * Dispatch is on <code>min_pred</code>:
   * <ul>
   *  <li>EQ: point lookup of min_key through search().</li>
   *  <li>GE_MIN: only valid on the root.  The lower bound is replaced by
   *      the tree-wide minimum (sn_btree.min_key / min_ptr, predicate GE),
   *      the machine switches to GET_MOVING_RIGHT, the leaf at min_ptr is
   *      fetched and null is returned.  On any other node this is fatal:
   *      an error is printed and the JVM is terminated.</li>
   *  <li>GT / GE: an upper predicate of LE_MAX is replaced by LE against
   *      sn_btree.max_key.  A root that is also a leaf is scanned
   *      directly; otherwise, while in GET_FETCHING_DATA, find_min_node()
   *      descends towards the first qualifying leaf and null is returned
   *      until it is reached.  Control then falls through to the scan.</li>
   *  <li>anything else: the node is passed to between().</li>
   * </ul>
   *
   * @param el the node delivered by the buffer cache
   * @return the completion event when the read has finished, or null when
   *         another node fetch is outstanding
   * @throws IOException if a callee finds the machine in a state it does
   *         not accept
   */
  public SN_BtreeReadComplete get_process_bce(BtreeNode el) 
   throws IOException {
	if (SN_Btree.debug) 
		System.out.println("starting ST_FSM.get_process_bce()...");
	switch (min_pred)
	{
		/* x == some value */
		case ST_FSM.EQ:
			key = min_key;
			return search(el);

		/* x >= smallest key in the tree */
		case ST_FSM.GE_MIN:
			if (el.get_main_bucket_num() == 0)
			{
				min_key = sn_btree.min_key;
				min_ptr = sn_btree.min_ptr; 
				min_pred = ST_FSM.GE;

				/* btree MIN <= x < max_key */
				fsm_state = GET_MOVING_RIGHT;	/* scanning left to right */
				/* MARK:   careful, test tree not fully populated! */ 
				globBufCache.fetch_and_cache(segname, min_ptr, lock_request_id);
				return null; /* more work to do */
			} else {
				/* fatal, so always say why before terminating; this
				 * used to be printed only when debugging was enabled */
				System.out.println("Error in get_process_bce().");
				System.exit(-1);
			}
			/* not reached: System.exit() does not return */

		case ST_FSM.GT:
		case ST_FSM.GE:
			/* x > some value */ 
			if (max_pred == ST_FSM.LE_MAX)
			{
				max_key = sn_btree.max_key;
				max_ptr = sn_btree.max_ptr; /* maybe useful to know */
				max_pred = ST_FSM.LE;
			}
			/* otherwise: min_key < x < max_key as given by the request */
		
			if ( (el.get_main_bucket_num()==0) && 
								(el.get_is_leaf_node() == true) )
			{
				/* already there! */	
				fsm_state = GET_MOVING_RIGHT;
			}
			else if (fsm_state == GET_FETCHING_DATA)
			{
				min_ptr = find_min_node(el);
				if (min_ptr <= 0) /* child ptr or blink ptr */
					return null; /* more work to do */
				else
					fsm_state = GET_MOVING_RIGHT;
			}
			/* intentional fall through: el is a leaf to scan */

		default:			
			/* We're finally where we want to be....
			 * 	el is some leaf node pointing to our desired data 
			 */
			return between(el);	
	}
  }
  
  /**
   * Scans one leaf of a range read and decides whether the scan is done.
   *
   * read_node() collects the qualifying data of el and returns the node to
   * visit next.  A result of 0 or -1 ends the scan and the completion
   * event carrying <code>get_bytes</code> is returned; any other value is
   * fetched asynchronously and null is returned.
   *
   * A non-leaf node is fatal: a message is printed and the JVM terminated.
   *
   * @param el the leaf delivered by the buffer cache
   * @return the completion event, or null when the next leaf is being
   *         fetched
   * @throws IOException if the machine is not in GET_MOVING_RIGHT
   */
  protected SN_BtreeReadComplete between(BtreeNode el) 
		throws IOException {
	  if (fsm_state != GET_MOVING_RIGHT) {
		throw new IOException("not in GET_MOVING_RIGHT");
	  }

	  boolean leaf = el.get_is_leaf_node();
	  if (!leaf) {
		System.out.println("ST_FSM.between(): FATAL ERROR not a leaf node");
		System.exit(-1);
	  }

	  /* use move-right routine here */
	  bucket = read_node(el);

	  boolean scanFinished = (bucket == 0) || (bucket == -1);
	  if (!scanFinished) {
		// keep walking the leaf chain
		fsm_state = GET_MOVING_RIGHT; 
		globBufCache.fetch_and_cache(segname, bucket, lock_request_id); 
		return null; /* more work to do */
	  }

	  // nothing further to the right qualifies: report what was gathered
	  SN_BtreeReadComplete result = new SN_BtreeReadComplete();
	  result.uniqueID = lock_request_id;
	  result.table = segname;
	  result.success = true;
	  result.key = key;
	  result.value = get_bytes; 
	  return result;
  } 
	    
    /**
     * Gathers the data of one leaf node that lies between min_key and
     * max_key and appends it to <code>get_bytes</code>.
     *
     * On the node numbered <code>min_ptr</code> the entries that fail the
     * lower-bound predicate (GT or GE against min_key) are skipped first.
     * Every remaining used entry is then checked against the upper-bound
     * predicate (LT or LE against max_key); its data page is fetched
     * synchronously and appended while the bound holds.  The first entry
     * beyond the upper bound ends the whole scan.
     *
     * The first entry that passes the lower bound is checked against the
     * upper bound like every other entry.  Previously it was collected
     * unconditionally, so a range containing no key could still return the
     * data of the next larger key.
     *
     * Entries are ignored when max_pred is neither LT nor LE, and no entry
     * passes the lower bound when min_pred is neither GT nor GE.
     *
     * @param el the leaf to scan
     * @return -1 if an entry beyond the upper bound was seen (scan is
     *         finished), otherwise el's blink (overflow bucket number) so
     *         the caller can continue to the right
     * @throws IOException declared for callers; raised by callees, not
     *         directly by this method
     */
    protected int read_node (BtreeNode el) 
			throws IOException {
	int num_records = el.get_num_bucket_records();
	long reckey = -1;
	BtreeNode.BtreeEntry thbr = null;
	int thisNode = el.get_main_bucket_num();
	if (SN_Btree.debug)
		System.out.println("starting ST_FSM.read_node() node"+thisNode);

	/* First, on the starting leaf, skip everything below the lower bound.
	 * The loop stops ON the first qualifying entry so that the second
	 * loop applies the upper bound to it as well. */
	int i = 0;
	if (thisNode == min_ptr) {
		for ( ; i<num_records; i++) {
			thbr = el.get_bucket_record(i);
			if (!thbr.get_is_used())
				continue;
			reckey = thbr.get_record_key();
			boolean aboveMin =
				((min_pred == ST_FSM.GT) && (reckey > min_key)) ||
				((min_pred == ST_FSM.GE) && (reckey >= min_key));
			if (aboveMin)
				break;
		}
	}

	/* Second, collect data while the upper bound holds */
	for ( ; i<num_records; i++) {
		thbr = el.get_bucket_record(i);
		if (!thbr.get_is_used())
			continue;
		reckey = thbr.get_record_key();

		boolean belowMax;
		if (max_pred == ST_FSM.LT)
			belowMax = (reckey < max_key);
		else if (max_pred == ST_FSM.LE)
			belowMax = (reckey <= max_key);
		else
			continue;	/* no usable upper predicate: entry is ignored */

		if (!belowMax)
			return -1;	/* past the range, nothing more to read */

		/* get data which is pointed to by leaf node, synchronous */
		if (SN_Btree.debug)
			System.out.println("ST_FSM.read_node() getting..."
									+dataname+" key "+reckey);
		MemRegionIF data = globBufCache.fetch_sync_and_cache(dataname,
									thbr.get_ptr());
		BtreeData btd = new BtreeData(data);
		int[] rec_data = btd.get_record_data();
		/* the first hit starts the result, later hits are appended */
		get_bytes = (get_bytes == null)
				? rec_data
				: concatList(get_bytes, rec_data);
	}

	// we scanned the node, and collected the data, now return blink 
	return el.get_overflow_bucket_num();
    }

  /* Looks for the leaf node which contains the smallest key >= min_key
   * (> min_key when min_pred is GT, implemented by searching for
   * min_key+1).  Processes one node per call; when the search has to
   * continue, the next node is requested from the buffer cache and the
   * 'bucket' field is set to its number.
   *
   * returns: 
   *	0 = follow next ptr (moving down), child fetch issued
   * 	-1 = follow blink (moving right), sibling fetch issued
   *    pos value = ptr to leaf (el's own node number)
   * exits on error
   *
   * Note: the leaf result is el.get_main_bucket_num(), which is also 0 for
   * a root that is a leaf; the callers in this class handle the root-leaf
   * case before calling.
   *
   * Side effect: when el is the rightmost inner node (blink == -1) and the
   * search key exceeds sn_btree.max_key, the key of the last entry
   * examined is raised to the search key and el is written back.
   *
   * throws IOException if the machine is in neither GET_FETCHING_DATA nor
   * PUT_FETCHING_DATA
   */ 
  protected int find_min_node(BtreeNode el) 
       throws IOException {
	if (SN_Btree.debug) System.out.println("ST_FSM.find_min_node() called.");
    if ( (fsm_state != GET_FETCHING_DATA) && (fsm_state != PUT_FETCHING_DATA)) 
    {
      throw new IOException("not in XXX_FETCHING_DATA");
    }

    // look through records for our key
    // NOTE(review): the loop below is bounded by the number of USED slots
    // while other scans in this class are bounded by get_num_bucket_records();
    // presumably used slots are packed at the front - confirm
    int num_used = el.get_num_buckets_used();
	// NOTE(review): min_key+1 wraps around if min_key is Long.MAX_VALUE
	long theKey = (min_pred == ST_FSM.GT) ? min_key+1 : min_key;
    long reckey = 0;
    BtreeNode.BtreeEntry thbr = null; 

	int i=0;
    for (i=0; i<num_used; i++) {
    thbr = el.get_bucket_record(i);
      if (thbr.get_is_used()) {
	// Print records in node for debugging...
	  if (SN_Btree.debug) thbr.printString();

	reckey = thbr.get_record_key();

	if ((reckey == theKey) && el.get_is_leaf_node() )
 	{
	  // found our key!  
	  return el.get_main_bucket_num(); /* return ptr to this leaf */

	} else if (reckey >= theKey) {
	    // Is this a leaf?
	    if (el.get_is_leaf_node()) { 
		/* return ptr to this leaf */
	  	return el.get_main_bucket_num(); 
	    } else {
		// It is a parent, and we do not have to follow the 
		//	link pointers because highkey >= reckey.
		bucket = thbr.get_ptr(); // bucket indicates the current value
		if (SN_Btree.debug) 
			System.out.println("		following ptr: "+bucket + " node "+el.get_main_bucket_num());
		/* MARK: pass lock_request_id to find fsm */
		globBufCache.fetch_and_cache(segname, bucket, lock_request_id);
		// we will have more work to do next time time we are called.
		return 0;
	    }
	} else {
		/* check next slot */
	}
      }
    }
    // Every key examined was smaller than the search key.
    // We now have to check if we have to follow blinks
    // i.e. move right
    if (reckey < theKey) {
		int blink = el.get_overflow_bucket_num();

		/* first check if min_key > max_key in btree */
        if ( (blink == -1) && (theKey > sn_btree.max_key) )
        {
            if (el.get_is_leaf_node())
            {
                /* return ptr to this leaf */ 
	  			return el.get_main_bucket_num(); 
            } else {
				// It is a parent, and we need to 
				// a) update the high key in this node
				// b) continue down the btree 

				// step back to the last slot visited by the loop above
				// NOTE(review): thbr is null here if the loop never ran
				// (node without used slots) - confirm this cannot happen
				i--;
				thbr.set_record_key(theKey);
				el.set_bucket_record(i,thbr);
	    		globBufCache.write_back_cache( segname, 
											el.get_main_bucket_num(), 
											el.get_byte_array());
				
				bucket = thbr.get_ptr(); // bucket indicates the current value
				
				/* MARK: pass lock_request_id to find fsm */
				if (SN_Btree.debug) 
					System.out.println("		following ptr: "+bucket + " node "+el.get_main_bucket_num());
				globBufCache.fetch_and_cache(segname, bucket, lock_request_id);
				// we will have more work to do next time time we are called.
				return 0;
            }

        }  
		bucket = blink;
		// a larger key must live to the right, so a missing blink is fatal
		if ((bucket == 0) || (bucket == -1))
		{
    		System.out.println("ST_FSM.find_mind_node(): FATAL ERROR");
			System.exit(-1); /* error */
		} else {
			if (SN_Btree.debug) 
				System.out.println("		following blink: "+bucket);
			globBufCache.fetch_and_cache(segname, bucket, lock_request_id); 
			return -1; 
		}
    }
    // Reached when the loop ended without a decision although the last key
    // seen was not smaller than the search key (or after a fatal exit above).
    System.out.println("ST_FSM.find_mind_node(): FATAL ERROR");
	System.exit(-1); /* error */
	return 0; /* shouldn't make it here, need for compile */
  }
  /**
   * Examines one btree node on behalf of a GET or DELETE FSM, looking for
   * this FSM's key.
   *
   * Every used slot that is inspected is appended to bucket_records_read.
   * Slots are scanned in order until one whose key is >= the search key:
   *   - leaf node, key equal: the data block the slot points to is fetched
   *     synchronously, remembered in update_btd, and a completion carrying
   *     its record data is returned;
   *   - leaf node, key greater: a completion with a null value is returned
   *     (the key is not in the tree; success is still set to true);
   *   - non-leaf node: an asynchronous fetch of the child is issued and
   *     null is returned.
   * If every used key is smaller than the search key, the node's overflow
   * (link) bucket is fetched asynchronously and null is returned.
   *
   * @param el the node to examine
   * @return a completion when the search ended at a leaf, or null when
   *         another node fetch has been issued and more work remains
   * @throws IOException if the FSM is not in GET_FETCHING_DATA or
   *         DEL_FETCHING_DATA
   */
  public SN_BtreeReadComplete search(BtreeNode el) 
       throws IOException {
    boolean fetching =
        (fsm_state == GET_FETCHING_DATA) || (fsm_state == DEL_FETCHING_DATA);
    if (!fetching) {
      throw new IOException("not in GET/DEL_FETCHING_DATA");
    }

    final int slotCount = el.get_num_bucket_records();
    long lastKeySeen = 0;   // key of the last used slot inspected; 0 if none

    for (int slot = 0; slot < slotCount; slot++) {
      BtreeNode.BtreeEntry entry = el.get_bucket_record(slot);
      if (!entry.get_is_used()) {
        continue;   // dummy slot
      }

      bucket_records_read.addElement(entry);
      lastKeySeen = entry.get_record_key();
      if (lastKeySeen < key) {
        continue;   // still to the left of our key; check next slot
      }

      if ((lastKeySeen == key) && el.get_is_leaf_node()) {
        // Found our key. The leaf slot points at the data block, which
        // is fetched synchronously.
        int dataBlock = entry.get_ptr();
        if (SN_Btree.debug) {
          System.out.println("ST_FSM.search() getting..."+dataname+" "+"#"+dataBlock);
        }
        MemRegionIF region =
            globBufCache.fetch_sync_and_cache(dataname, dataBlock);
        BtreeData leafData = new BtreeData(region);
        /* MARK: ugly hack, will fix later */
        update_btd = leafData;   // doDeletion() writes through this reference

        SN_BtreeReadComplete hit = new SN_BtreeReadComplete();
        hit.uniqueID = lock_request_id;
        hit.table = segname;
        hit.success = true;
        hit.key = key;
        hit.value = leafData.get_record_data();
        return hit;
      }

      if (el.get_is_leaf_node()) {
        // Leaf whose key is already past ours: key not found.
        SN_BtreeReadComplete miss = new SN_BtreeReadComplete();
        miss.uniqueID = lock_request_id;
        miss.success = true;
        miss.table = segname;
        miss.key = key;
        miss.value = null;
        return miss;
      }

      // Parent node: no need to follow link pointers because this slot's
      // key already covers ours. Descend into the child.
      bucket = entry.get_ptr();   // bucket indicates the current node
      /* MARK: pass lock_request_id to find fsm */
      globBufCache.fetch_and_cache(segname, bucket, lock_request_id);
      return null;   // more work to do the next time we are called
    }

    // Nothing in this node reached our key: move right along the link.
    if (lastKeySeen < key) {
      bucket = el.get_overflow_bucket_num();
      if ((bucket != 0) && (bucket != -1)) {
        globBufCache.fetch_and_cache(segname, bucket, lock_request_id);
        return null;
      }
      System.exit(-1);   /* error: no usable link to follow */
    }

    System.out.println("ST_FSM.search(): FATAL ERROR");
    System.exit(-1);   /* error */
    return null;       /* shouldn't make it here, need for compile */
  }

  /**
   * Here's where a REMOVE FSM deletes values from the record it found.
   * This event triggers the completion of the FSM; it always returns a
   * non-null SN_BtreeRemoveComplete.
   *
   * Every element of old_values that also occurs in del_bytes is dropped.
   * If at least one element was dropped, the surviving values are stored
   * into update_btd and that block is written back to the buffer cache
   * under dataname; otherwise nothing is written and success is false.
   *
   * @param old_values the values currently stored for the key, or null
   *                   if the key does not exist
   * @return the completion event; success is false when old_values is
   *         null or when none of the values to delete were present
   * @throws IOException if this element shouldn't have been dispatched
   *         to this FSM (state is not DEL_DOING_DELETION), and therefore
   *         the FSM breaks
   */
  public SN_BtreeRemoveComplete doDeletion(int[] old_values)
       throws IOException {

    if (fsm_state != DEL_DOING_DELETION) {
      throw new IOException("not in DEL_DOING_DELETION");
    }

    // Fields common to every outcome.
    SN_BtreeRemoveComplete hrc = new SN_BtreeRemoveComplete();
    hrc.uniqueID = lock_request_id;
    hrc.table = segname;
    hrc.key = key;

    if (old_values == null) {
      // The key for this value does not exist!
      hrc.success = false;
      hrc.value = null;
      return hrc;
    }

    // Copy across every old value that is not listed in del_bytes.
    final int oldCount = old_values.length;
    int[] kept = new int[oldCount];
    int keptCount = 0;
    for (int idx = 0; idx < oldCount; idx++) {
      int candidate = old_values[idx];
      boolean marked = false;
      for (int d = 0; d < del_bytes.length && !marked; d++) {
        marked = (del_bytes[d] == candidate);
      }
      if (!marked) {
        kept[keptCount] = candidate;
        keptCount++;
      }
    }

    // NOTE(review): the completion carries the full-length scratch array
    // (unused tail slots are 0), not the trimmed copy written back below.
    // Confirm whether callers rely on this.
    hrc.value = kept;

    if (keptCount == oldCount) {
      hrc.success = false; /* value not found */
      return hrc;
    }

    int[] trimmed = new int[keptCount];
    System.arraycopy(kept, 0, trimmed, 0, keptCount);

    /* write data to cache */
    hrc.success = true;
    update_btd.set_record_data(trimmed);
    globBufCache.write_back_cache(dataname, update_btd.get_block_num(),
        update_btd.get_byte_array());
    return hrc;
  }


  // Utility: builds a BtreeNode (a BtreeRoot when node == 0) from the
  // entries bucket_records_read[start_hb .. stop_hb-1] and pads the
  // remaining slots with unused dummy entries.
  //
  //   start_hb  index of the first entry to copy (inclusive)
  //   stop_hb   index one past the last entry to copy
  //   node      node number; 0 selects the root and its slot capacity
  //   blink     link pointer stored in the new node
  //   isLeaf    leaf flag stored in the new node
  private BtreeNode create_thb(int start_hb, int stop_hb, int node, int blink, boolean isLeaf) {
    final boolean isRoot = (node == 0);
    final int capacity;
    if (isRoot) {
      capacity = SN_Btree.max_root_slots;
    } else {
      capacity = SN_Btree.max_node_slots;
    }

    // Copy the requested range of records into the front of the slot array.
    BtreeNode.BtreeEntry[] slots = new BtreeNode.BtreeEntry[capacity];
    int filled = 0;
    for (int src = start_hb; src < stop_hb; src++) {
      slots[filled++] =
          (BtreeNode.BtreeEntry) bucket_records_read.elementAt(src);
    }

    // Every remaining slot gets its own unused placeholder entry.
    while (filled < capacity) {
      slots[filled] = new BtreeNode.BtreeEntry(false);
      filled++;
    }

    // Backing memory for the node image.
    MemRegionIF region = globMemAllocIF.allocateRegion(SN_Btree.table_blocksize);

    /* LSN=0 Node=node BLinkPtr=blink LEAF=isLeaf slotsArray[] MemRegion */
    if (!isRoot) {
      return new BtreeNode(0, node, blink, isLeaf, slots, region);
    }

    // The root additionally carries the tree-wide metadata.
    BtreeRoot root = new BtreeRoot(0, 0, blink, isLeaf, slots, region);
    root.set_metadata(sn_btree.numnodes, sn_btree.numblocks,
        sn_btree.min_key, sn_btree.min_ptr,
        sn_btree.max_key, sn_btree.max_ptr);
    return root;
  }

  // Builds the new non-leaf BtreeRoot needed after the root has split.
  // Slot 0 holds (aKey, aPtr) and slot 1 holds (key, newptr) -- the
  // entries for "A" and "B", presumably the two halves of the split
  // (TODO confirm against the caller); all other slots are dummies.
  // When the old root `el` was a leaf, the min/max pointers in sn_btree
  // are updated before the metadata is copied into the new root.
  private BtreeRoot create_root(BtreeRoot el) {
    if (SN_Btree.debug) {
      System.out.println("ST_FSM.create_root()...");
    }

    final int capacity = SN_Btree.max_root_slots;
    BtreeNode.BtreeEntry[] slots = new BtreeNode.BtreeEntry[capacity];
    slots[0] = new BtreeNode.BtreeEntry(true, aKey, aPtr);    // entry for A
    slots[1] = new BtreeNode.BtreeEntry(true, key, newptr);   // entry for B

    // fill the rest with dummy slots
    for (int s = 2; s < capacity; s++) {
      slots[s] = new BtreeNode.BtreeEntry(false);
    }

    MemRegionIF region = globMemAllocIF.allocateRegion(SN_Btree.table_blocksize);

    /* LSN=0 Node=0 BLinkPtr=-1 LEAF=false slotsArray[] MemRegion */
    BtreeRoot newRoot = new BtreeRoot(0, 0, -1, false, slots, region);

    /* set metadata in root node */
    if (el.get_is_leaf_node()) {
      sn_btree.min_ptr = aPtr;
    }
    if (key == sn_btree.max_key && el.get_is_leaf_node()) {
      sn_btree.max_ptr = newptr;
    }
    newRoot.set_metadata(sn_btree.numnodes, sn_btree.numblocks,
        sn_btree.min_key, sn_btree.min_ptr,
        sn_btree.max_key, sn_btree.max_ptr);

    return newRoot;
  }


	/**
	 * Widens the key range recorded in sn_btree so that it covers thisKey.
	 * If thisKey is below the recorded minimum, min_key/min_ptr are set to
	 * thisKey/node; if it is above the recorded maximum, max_key/max_ptr
	 * are set likewise. Both checks are always performed.
	 *
	 * @param node    node number to record as the new min/max pointer
	 * @param thisKey key being compared against the recorded range
	 * @return true if either bound was changed, false otherwise
	 */
	private boolean checkMetaData(int node, long thisKey)
	{
		final boolean belowMin = thisKey < sn_btree.min_key;
		if (belowMin)
		{
			sn_btree.min_key = thisKey;
			sn_btree.min_ptr = node;
		}

		final boolean aboveMax = thisKey > sn_btree.max_key;
		if (aboveMax)
		{
			sn_btree.max_key = thisKey;
			sn_btree.max_ptr = node;
		}

		return belowMin || aboveMax;
	}

	/**
	 * Simple put: rebuilds the current node (number `bucket`) from every
	 * entry in bucket_records_read and writes it back to globBufCache.
	 *
	 * The node keeps the link pointer and leaf flag of `el`. Before
	 * returning, the FSM's working references (old_data, put_bytes,
	 * traversed_nodes, ht_upcall, sn_btree, blocks_read,
	 * bucket_records_read, lock_held) are set to null.
	 *
	 * @param el the node as it was read; supplies the link pointer and
	 *           leaf flag for the rebuilt node
	 * @return the write completion; success is false only if the node
	 *         could not be built
	 */
	private SN_BtreeWriteComplete simplePut(BtreeNode el) {

		int recordCount = bucket_records_read.size();
		if (SN_Btree.debug)
		{
			System.out.println("ST_FSM.simplePut:	node "+bucket+" has "+recordCount+" records.");
			System.out.flush();
		}

		ptr = el.get_overflow_bucket_num();
		boolean leaf = el.get_is_leaf_node();
		BtreeNode rebuilt = create_thb(0, recordCount, bucket, ptr, leaf);

		// Fields that are the same whether or not the write happens.
		SN_BtreeWriteComplete result = new SN_BtreeWriteComplete();
		result.uniqueID = lock_request_id;
		result.table = segname;
		result.key = key;
		result.old_value = old_data;

		if (rebuilt == null) {
			// fail on write in put
			result.success = false;
			result.new_value = null;
		} else {
			result.success = true;
			result.new_value = put_bytes;
			globBufCache.write_back_cache(segname, bucket, rebuilt.get_byte_array());
		}

		// Drop the references held by this FSM.
		old_data = null;
		put_bytes = null;
		traversed_nodes = null;
		ht_upcall = null;
		sn_btree = null;
		blocks_read = null;
		bucket_records_read = null;
		lock_held = null;

		return result;
	}
    
	/**
	 * Concatenates two lists of ints.
	 *
	 * If either argument is null the other one is returned as-is (the
	 * same array, not a copy), so the result is null only when both are
	 * null. Otherwise a new array holding the elements of list1 followed
	 * by the elements of list2 is returned.
	 *
	 * @param list1 first list, may be null
	 * @param list2 second list, may be null
	 * @return list1 followed by list2, or the non-null argument when the
	 *         other is null
	 */
	protected int[] concatList(int[] list1, int[] list2)
	{
		if (list1 == null)
			return list2;
		// Mirror the null-list1 case instead of failing on list2.length.
		if (list2 == null)
			return list1;

		int[] joined = new int[list1.length + list2.length];
		System.arraycopy(list1, 0, joined, 0, list1.length);
		System.arraycopy(list2, 0, joined, list1.length, list2.length);
		return joined;
	}
}

