org.jgroups.protocols
Class SEQUENCER

java.lang.Object
  extended by org.jgroups.stack.Protocol
      extended by org.jgroups.protocols.SEQUENCER

public class SEQUENCER
extends Protocol

Implementation of total order protocol using a sequencer. Consult SEQUENCER.txt for details

Author:
Bela Ban

Nested Class Summary
protected  class SEQUENCER.Flusher
           
static class SEQUENCER.SequencerHeader
           
 
Field Summary
protected  boolean ack_mode
          When ack_mode is set, we need to wait for an ack for each forwarded message until we can send the next one
protected  Promise<java.lang.Long> ack_promise
          Used for each resent message to wait until the message has been received
protected  long bcast_msgs
           
protected  Address coord
           
protected  long delivered_bcasts
           
protected  java.util.concurrent.ConcurrentMap<Address,java.util.NavigableSet<java.lang.Long>> delivery_table
           
protected  int delivery_table_max_size
           
protected  SEQUENCER.Flusher flusher
           
protected  boolean flushing
          Set when we block all sending threads to resend all messages from forward_table
protected  java.util.NavigableMap<java.lang.Long,byte[]> forward_table
          Maintains messages forwarded to the coord which which no ack has been received yet.
protected  long forwarded_msgs
           
protected  java.util.concurrent.atomic.AtomicInteger in_flight_sends
          Keeps track of the threads sending messages
protected  boolean is_coord
           
protected  Address local_addr
           
protected  int num_acks
           
protected  long received_bcasts
           
protected  long received_forwards
           
protected  boolean running
           
protected  java.util.concurrent.locks.Condition send_cond
           
protected  java.util.concurrent.locks.Lock send_lock
           
protected  java.util.concurrent.atomic.AtomicLong seqno
           
protected  int threshold
           
protected  View view
           
 
Fields inherited from class org.jgroups.stack.Protocol
down_prot, ergonomics, id, log, name, stack, stats, up_prot
 
Constructor Summary
SEQUENCER()
           
 
Method Summary
protected  void block()
           
protected  void broadcast(Message msg, boolean copy, Address original_sender, long seqno, boolean resend)
           
protected  boolean canDeliver(Address sender, long seqno)
          Checks if seqno has already been received from sender.
protected  void deliver(Message msg, Event evt, SEQUENCER.SequencerHeader hdr)
           
 java.lang.Object down(Event evt)
          An event is to be sent down the stack.
 java.util.Map<java.lang.String,java.lang.Object> dumpStats()
           
protected  void flush(Address new_coord)
           
protected  void flushMessagesInForwardTable()
          Sends all messages currently in forward_table to the new coordinator (changing the dest field).
protected  void forward(byte[] marshalled_msg, long seqno, boolean flush)
           
protected  void forwardToCoord(byte[] marshalled_msg, long seqno)
           
 long getBroadcast()
           
 Address getCoordinator()
           
 long getForwarded()
           
 int getForwardTableSize()
           
 Address getLocalAddress()
           
 long getReceivedBroadcasts()
           
 long getReceivedForwards()
           
protected  void handleViewChange(View v)
           
 boolean isCoordinator()
           
 java.lang.String printStats()
           
 void resetStats()
           
 void setDeliveryTableMaxSize(int size)
           
 void setThreshold(int new_threshold)
           
 void start()
          This method is called on a Channel.connect(String).
protected  void startFlusher(Address new_coord)
           
 void stop()
          This method is called on a Channel.disconnect().
protected  void stopFlusher()
           
protected  void unblockAll()
           
protected  void unwrapAndDeliver(Message msg, boolean flush_ack)
          Unmarshal the original message (in the payload) and then pass it up (unless already delivered)
 java.lang.Object up(Event evt)
          An event was received from the layer below.
 
Methods inherited from class org.jgroups.stack.Protocol
destroy, enableStats, getConfigurableObjects, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getName, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, init, isErgonomics, providedDownServices, providedUpServices, requiredDownServices, requiredUpServices, resetStatistics, setDownProtocol, setErgonomics, setId, setLevel, setProtocolStack, setSocketFactory, setUpProtocol, setValue, setValues, statsEnabled
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

local_addr

protected Address local_addr

coord

protected volatile Address coord

view

protected volatile View view

is_coord

protected volatile boolean is_coord

seqno

protected final java.util.concurrent.atomic.AtomicLong seqno

forward_table

protected final java.util.NavigableMap<java.lang.Long,byte[]> forward_table
Maintains messages forwarded to the coord which which no ack has been received yet. Needs to be sorted so we resend them in the right order


send_lock

protected final java.util.concurrent.locks.Lock send_lock

send_cond

protected final java.util.concurrent.locks.Condition send_cond

ack_mode

protected volatile boolean ack_mode
When ack_mode is set, we need to wait for an ack for each forwarded message until we can send the next one


flushing

protected volatile boolean flushing
Set when we block all sending threads to resend all messages from forward_table


running

protected volatile boolean running

in_flight_sends

protected final java.util.concurrent.atomic.AtomicInteger in_flight_sends
Keeps track of the threads sending messages


delivery_table

protected final java.util.concurrent.ConcurrentMap<Address,java.util.NavigableSet<java.lang.Long>> delivery_table

flusher

protected volatile SEQUENCER.Flusher flusher

ack_promise

protected final Promise<java.lang.Long> ack_promise
Used for each resent message to wait until the message has been received


delivery_table_max_size

protected int delivery_table_max_size

threshold

protected int threshold

num_acks

protected int num_acks

forwarded_msgs

protected long forwarded_msgs

bcast_msgs

protected long bcast_msgs

received_forwards

protected long received_forwards

received_bcasts

protected long received_bcasts

delivered_bcasts

protected long delivered_bcasts
Constructor Detail

SEQUENCER

public SEQUENCER()
Method Detail

isCoordinator

public boolean isCoordinator()

getCoordinator

public Address getCoordinator()

getLocalAddress

public Address getLocalAddress()

getForwarded

public long getForwarded()

getBroadcast

public long getBroadcast()

getReceivedForwards

public long getReceivedForwards()

getReceivedBroadcasts

public long getReceivedBroadcasts()

getForwardTableSize

public int getForwardTableSize()

setThreshold

public void setThreshold(int new_threshold)

setDeliveryTableMaxSize

public void setDeliveryTableMaxSize(int size)

resetStats

public void resetStats()
Overrides:
resetStats in class Protocol

dumpStats

public java.util.Map<java.lang.String,java.lang.Object> dumpStats()
Overrides:
dumpStats in class Protocol

printStats

public java.lang.String printStats()
Overrides:
printStats in class Protocol

start

public void start()
           throws java.lang.Exception
Description copied from class: Protocol
This method is called on a Channel.connect(String). Starts work. Protocols are connected and queues are ready to receive events. Will be called from bottom to top. This call will replace the START and START_OK events.

Overrides:
start in class Protocol
Throws:
java.lang.Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so Channel.connect(String) will throw an exception

stop

public void stop()
Description copied from class: Protocol
This method is called on a Channel.disconnect(). Stops work (e.g. by closing multicast socket). Will be called from top to bottom. This means that at the time of the method invocation the neighbor protocol below is still working. This method will replace the STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that when this method is called all messages in the down queue will have been flushed

Overrides:
stop in class Protocol

down

public java.lang.Object down(Event evt)
Description copied from class: Protocol
An event is to be sent down the stack. The layer may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the layer may need to add a header to it (or do nothing at all) before sending it down the stack using down_prot.down(). In case of a GET_ADDRESS event (which tries to retrieve the stack's address from one of the bottom layers), the layer may need to send a new response event back up the stack using up_prot.up().

Overrides:
down in class Protocol

up

public java.lang.Object up(Event evt)
Description copied from class: Protocol
An event was received from the layer below. Usually the current layer will want to examine the event type and - depending on its type - perform some computation (e.g. removing headers from a MSG event type, or updating the internal membership list when receiving a VIEW_CHANGE event). Finally the event is either a) discarded, or b) an event is sent down the stack using down_prot.down() or c) the event (or another event) is sent up the stack using up_prot.up().

Overrides:
up in class Protocol

handleViewChange

protected void handleViewChange(View v)

flush

protected void flush(Address new_coord)
              throws java.lang.InterruptedException
Throws:
java.lang.InterruptedException

flushMessagesInForwardTable

protected void flushMessagesInForwardTable()
Sends all messages currently in forward_table to the new coordinator (changing the dest field). This needs to be done, so the underlying reliable unicast protocol (e.g. UNICAST) adds these messages to its retransmission mechanism
Note that we need to resend the messages in order of their seqnos ! We also need to prevent other message from being inserted until we're done, that's why there's synchronization.
Access to the forward_table doesn't need to be synchronized as there won't be any insertions during flushing (all down-threads are blocked)


forwardToCoord

protected void forwardToCoord(byte[] marshalled_msg,
                              long seqno)

forward

protected void forward(byte[] marshalled_msg,
                       long seqno,
                       boolean flush)

broadcast

protected void broadcast(Message msg,
                         boolean copy,
                         Address original_sender,
                         long seqno,
                         boolean resend)

unwrapAndDeliver

protected void unwrapAndDeliver(Message msg,
                                boolean flush_ack)
Unmarshal the original message (in the payload) and then pass it up (unless already delivered)

Parameters:
msg -

deliver

protected void deliver(Message msg,
                       Event evt,
                       SEQUENCER.SequencerHeader hdr)

canDeliver

protected boolean canDeliver(Address sender,
                             long seqno)
Checks if seqno has already been received from sender. This weeds out duplicates. Note that this method is never called concurrently for the same sender, as the sender in NAKACK will always be the coordinator.


block

protected void block()

unblockAll

protected void unblockAll()

startFlusher

protected void startFlusher(Address new_coord)

stopFlusher

protected void stopFlusher()


Copyright © 1998-2012 Bela Ban / Red Hat. All Rights Reserved.