org.jgroups.protocols
Class UNICAST2

java.lang.Object
  extended by org.jgroups.stack.Protocol
      extended by org.jgroups.protocols.UNICAST2
All Implemented Interfaces:
AgeOutCache.Handler<Address>

public class UNICAST2
extends Protocol
implements AgeOutCache.Handler<Address>

Reliable unicast layer, implemented with negative acks. Every sender keeps its messages in an AckSenderWindow. A receiver stores incoming messages in a NakReceiverWindow and asks the sender for retransmission if a gap is detected. Periodically (every stable_interval), a timer task sends a STABLE message to all senders, containing the highest received and highest delivered seqnos. On receiving it, a sender purges messages up to the highest delivered seqno and retransmits messages the STABLE sender might have missed (those above its highest received seqno). A STABLE message can also be sent when a receiver has received more than max_bytes from a given sender.

The advantage of this protocol over UNICAST is that it doesn't send an ack for every message. Instead, it sends 'acks' after receiving max_bytes and/or periodically (stable_interval).
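
The receiver side of the scheme described above can be sketched as follows. This is a minimal illustration, not the JGroups implementation: the class NakReceiveWindowSketch and all of its methods are hypothetical, and payloads are plain strings. It shows how a gap between the highest delivered and highest received seqnos yields the list of seqnos a receiver would ask the sender to retransmit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a NAK-based receive window (not a JGroups class).
class NakReceiveWindowSketch {
    private final Map<Long, String> received = new HashMap<>(); // seqno -> payload, not yet delivered
    private long highestDelivered; // all seqnos <= this value have been delivered
    private long highestReceived;  // highest seqno seen so far

    NakReceiveWindowSketch(long firstSeqno) {
        this.highestDelivered = firstSeqno - 1;
        this.highestReceived = firstSeqno - 1;
    }

    /** Stores a message; duplicates and already delivered seqnos are ignored. */
    void add(long seqno, String payload) {
        if (seqno <= highestDelivered) return;
        received.putIfAbsent(seqno, payload);
        highestReceived = Math.max(highestReceived, seqno);
    }

    /** Seqnos in (highestDelivered, highestReceived] that have not arrived yet. */
    List<Long> missing() {
        List<Long> gaps = new ArrayList<>();
        for (long s = highestDelivered + 1; s <= highestReceived; s++)
            if (!received.containsKey(s)) gaps.add(s);
        return gaps;
    }

    /** Delivers messages in seqno order until the first gap and returns them. */
    List<String> deliver() {
        List<String> out = new ArrayList<>();
        while (received.containsKey(highestDelivered + 1)) {
            out.add(received.remove(highestDelivered + 1));
            highestDelivered++;
        }
        return out;
    }

    long highestDelivered() { return highestDelivered; }
    long highestReceived()  { return highestReceived; }

    public static void main(String[] args) {
        NakReceiveWindowSketch win = new NakReceiveWindowSketch(1);
        win.add(1, "a");
        win.add(2, "b");
        win.add(5, "e");                   // seqnos 3 and 4 are missing
        System.out.println(win.deliver()); // prints [a, b]
        System.out.println(win.missing()); // prints [3, 4]
    }
}
```

In this model, highestDelivered() and highestReceived() are the two values a STABLE message would carry back to the sender.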

Author:
Bela Ban

Nested Class Summary
protected  class UNICAST2.ConnectionReaper
           
protected  class UNICAST2.ReceiverEntry
           
protected  class UNICAST2.RetransmitTask
          Retransmitter task which periodically (every xmit_interval ms) looks at all the retransmit tables and sends a retransmit request to every member from which we have missing messages
protected  class UNICAST2.SenderEntry
           
static class UNICAST2.Unicast2Header
          The following types and fields are serialized:
 
Field Summary
protected  AgeOutCache<Address> cache
           
protected  long conn_expiry_timeout
           
protected  java.util.concurrent.Future<?> connection_reaper
           
static long DEFAULT_FIRST_SEQNO
           
protected  int exponential_backoff
          Deprecated. 
protected  short last_conn_id
           
protected  Address local_addr
           
protected  boolean log_not_found_msgs
           
protected  long max_bytes
           
protected  int max_msg_batch_size
           
protected  long max_retransmit_time
           
protected  int max_stable_msgs
           
protected  java.util.List<Address> members
           
protected  int num_messages_received
           
protected  int num_messages_sent
           
protected  java.util.concurrent.ConcurrentMap<Address,UNICAST2.ReceiverEntry> recv_table
           
protected  java.util.concurrent.locks.ReentrantLock recv_table_lock
           
protected  boolean running
           
protected  java.util.concurrent.ConcurrentMap<Address,UNICAST2.SenderEntry> send_table
           
protected  long stable_interval
           
protected  java.util.concurrent.Future<?> stable_task_future
           
protected  int[] timeout
          Deprecated. 
protected  TimeScheduler timer
           
protected  boolean use_range_based_retransmitter
           
protected  long xmit_interval
           
protected  java.util.concurrent.atomic.AtomicLong xmit_reqs_received
           
protected  java.util.concurrent.atomic.AtomicLong xmit_reqs_sent
           
protected  java.util.concurrent.atomic.AtomicLong xmit_rsps_sent
           
protected  boolean xmit_table_automatic_purging
          Deprecated. 
protected  long xmit_table_max_compaction_time
           
protected  int xmit_table_msgs_per_row
           
protected  int xmit_table_num_rows
           
protected  double xmit_table_resize_factor
           
protected  java.util.concurrent.Future<?> xmit_task
          RetransmitTask running every xmit_interval ms
protected  java.util.Map<Address,java.lang.Long> xmit_task_map
          Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.jboss.org/browse/JGRP-1539)
 
Fields inherited from class org.jgroups.stack.Protocol
down_prot, ergonomics, id, log, name, stack, stats, up_prot
 
Constructor Summary
UNICAST2()
           
 
Method Summary
 java.lang.Object down(Event evt)
          An event is to be sent down the stack.
 void expired(Address key)
          Called by AgeOutCache to remove expired connections
 AgeOutCache<Address> getAgeOutCache()
           
 int getAgeOutCacheSize()
           
 java.lang.String getLocalAddress()
           
 long getMaxRetransmitTime()
           
 java.lang.String getMembers()
           
protected  short getNewConnectionId()
           
 int getNumConnections()
           
 int getNumReceiveConnections()
           
 int getNumSendConnections()
           
protected  UNICAST2.ReceiverEntry getOrCreateReceiverEntry(Address sender, long seqno, short conn_id)
           
protected  UNICAST2.ReceiverEntry getReceiverEntry(Address sender, long seqno, boolean first, short conn_id)
           
 int[] getTimeout()
          Deprecated. 
 TimeScheduler getTimer()
           
 long getXmitTableMissingMessages()
           
 int getXmitTableNumCompactions()
           
 int getXmitTableNumMoves()
           
 int getXmitTableNumPurges()
           
 int getXmitTableNumResizes()
           
 long getXmitTableUndeliveredMessages()
           
protected  void handleDataReceived(Address sender, long seqno, short conn_id, boolean first, Message msg, Event evt)
          Check whether the hashmap contains an entry e for sender (create if not).
protected  void handleResendingOfFirstMessage(Address sender, long seqno)
          We need to resend our first message with our conn_id
protected  void handleXmitRequest(Address sender, SeqnoList missing)
           
 void init()
          Called after instance has been created (null constructor) and before protocol is started.
 boolean isConnectionReaperRunning()
           
 boolean isXmitTaskRunning()
           
 java.lang.String printAgeOutCache()
           
 java.lang.String printConnections()
           
 java.lang.String printReceiveWindowMessages()
           
 java.lang.String printSendWindowMessages()
           
 void reapIdleConnections()
           
 void removeAllConnections()
          This method is public only so that unit tests can invoke it; it should not otherwise be used.
 void removeConnection(Address mbr)
          Removes and resets the connection for mbr from the connection table (which is already locked).
 void removeReceiveConnection(Address mbr)
           
 void removeSendConnection(Address mbr)
           
 void resetStats()
           
 void retransmit(SeqnoList missing, Address sender)
           
protected  void sendRequestForFirstSeqno(Address dest, long seqno_received)
           
protected  void sendStableMessage(Address dest, short conn_id, long hd, long hr)
           
 void sendStableMessages()
           
 void setMaxMessageBatchSize(int size)
           
 void setMaxRetransmitTime(long max_retransmit_time)
           
 void setTimeout(int[] val)
          Deprecated. 
 void setTimer(TimeScheduler timer)
          Only used for unit tests; do not use otherwise.
protected  void stable(Address sender, short conn_id, long hd, long hr)
          Purges all messages in the window for local_addr with seqnos <= hd.
 void start()
          This method is called on a Channel.connect(String).
protected  void startConnectionReaper()
           
protected  void startRetransmitTask()
           
protected  void startStableTask()
           
 void stop()
          This method is called on a Channel.disconnect().
protected  void stopConnectionReaper()
           
protected  void stopRetransmitTask()
           
protected  void stopStableTask()
           
 void triggerXmit()
           
 java.lang.Object up(Event evt)
          An event was received from the layer below.
 
Methods inherited from class org.jgroups.stack.Protocol
destroy, dumpStats, enableStats, getConfigurableObjects, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getName, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, printStats, providedDownServices, providedUpServices, requiredDownServices, requiredUpServices, resetStatistics, setDownProtocol, setErgonomics, setId, setLevel, setProtocolStack, setSocketFactory, setUpProtocol, setValue, setValues, statsEnabled
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DEFAULT_FIRST_SEQNO

public static final long DEFAULT_FIRST_SEQNO
See Also:
Constant Field Values

timeout

@Deprecated
protected int[] timeout
Deprecated. 

exponential_backoff

@Deprecated
protected int exponential_backoff
Deprecated. 
The first value (in milliseconds) to use in the exponential backoff retransmission mechanism. Only enabled if the value is > 0


max_msg_batch_size

protected int max_msg_batch_size

max_bytes

protected long max_bytes

stable_interval

protected long stable_interval

max_stable_msgs

protected int max_stable_msgs

xmit_table_num_rows

protected int xmit_table_num_rows

xmit_table_msgs_per_row

protected int xmit_table_msgs_per_row

xmit_table_resize_factor

protected double xmit_table_resize_factor

xmit_table_max_compaction_time

protected long xmit_table_max_compaction_time

xmit_table_automatic_purging

@Deprecated
protected boolean xmit_table_automatic_purging
Deprecated. 

use_range_based_retransmitter

protected boolean use_range_based_retransmitter

log_not_found_msgs

protected boolean log_not_found_msgs

conn_expiry_timeout

protected long conn_expiry_timeout

xmit_interval

protected long xmit_interval

num_messages_sent

protected int num_messages_sent

num_messages_received

protected int num_messages_received

send_table

protected final java.util.concurrent.ConcurrentMap<Address,UNICAST2.SenderEntry> send_table

recv_table

protected final java.util.concurrent.ConcurrentMap<Address,UNICAST2.ReceiverEntry> recv_table

xmit_task

protected java.util.concurrent.Future<?> xmit_task
RetransmitTask running every xmit_interval ms


xmit_task_map

protected final java.util.Map<Address,java.lang.Long> xmit_task_map
Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.jboss.org/browse/JGRP-1539)


recv_table_lock

protected final java.util.concurrent.locks.ReentrantLock recv_table_lock

members

protected volatile java.util.List<Address> members

local_addr

protected Address local_addr

timer

protected TimeScheduler timer

running

protected volatile boolean running

last_conn_id

protected short last_conn_id

max_retransmit_time

protected long max_retransmit_time

cache

protected AgeOutCache<Address> cache

stable_task_future

protected java.util.concurrent.Future<?> stable_task_future

connection_reaper

protected java.util.concurrent.Future<?> connection_reaper

xmit_reqs_received

protected final java.util.concurrent.atomic.AtomicLong xmit_reqs_received

xmit_reqs_sent

protected final java.util.concurrent.atomic.AtomicLong xmit_reqs_sent

xmit_rsps_sent

protected final java.util.concurrent.atomic.AtomicLong xmit_rsps_sent
Constructor Detail

UNICAST2

public UNICAST2()
Method Detail

getTimeout

@Deprecated
public int[] getTimeout()
Deprecated. 


setTimeout

@Deprecated
public void setTimeout(int[] val)
Deprecated. 


setMaxMessageBatchSize

public void setMaxMessageBatchSize(int size)

getLocalAddress

public java.lang.String getLocalAddress()

getMembers

public java.lang.String getMembers()

getNumSendConnections

public int getNumSendConnections()

getNumReceiveConnections

public int getNumReceiveConnections()

getNumConnections

public int getNumConnections()

printConnections

public java.lang.String printConnections()

isConnectionReaperRunning

public boolean isConnectionReaperRunning()

getXmitTableUndeliveredMessages

public long getXmitTableUndeliveredMessages()

getXmitTableMissingMessages

public long getXmitTableMissingMessages()

getXmitTableNumCompactions

public int getXmitTableNumCompactions()

getXmitTableNumMoves

public int getXmitTableNumMoves()

getXmitTableNumResizes

public int getXmitTableNumResizes()

getXmitTableNumPurges

public int getXmitTableNumPurges()

printReceiveWindowMessages

public java.lang.String printReceiveWindowMessages()

printSendWindowMessages

public java.lang.String printSendWindowMessages()

isXmitTaskRunning

public boolean isXmitTaskRunning()

getMaxRetransmitTime

public long getMaxRetransmitTime()

setMaxRetransmitTime

public void setMaxRetransmitTime(long max_retransmit_time)

getAgeOutCacheSize

public int getAgeOutCacheSize()

printAgeOutCache

public java.lang.String printAgeOutCache()

getAgeOutCache

public AgeOutCache<Address> getAgeOutCache()

resetStats

public void resetStats()
Overrides:
resetStats in class Protocol

getTimer

public TimeScheduler getTimer()

setTimer

public void setTimer(TimeScheduler timer)
Only used for unit tests; do not use otherwise.

Parameters:
timer -

init

public void init()
          throws java.lang.Exception
Description copied from class: Protocol
Called after instance has been created (null constructor) and before protocol is started. Properties are already set. Other protocols are not yet connected and events cannot yet be sent.

Overrides:
init in class Protocol
Throws:
java.lang.Exception - Thrown if protocol cannot be initialized successfully. This will cause the ProtocolStack to fail, so the channel constructor will throw an exception

start

public void start()
           throws java.lang.Exception
Description copied from class: Protocol
This method is called on a Channel.connect(String). Starts work. Protocols are connected and queues are ready to receive events. Will be called from bottom to top. This call will replace the START and START_OK events.

Overrides:
start in class Protocol
Throws:
java.lang.Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so Channel.connect(String) will throw an exception

stop

public void stop()
Description copied from class: Protocol
This method is called on a Channel.disconnect(). Stops work (e.g. by closing multicast socket). Will be called from top to bottom. This means that at the time of the method invocation the neighbor protocol below is still working. This method will replace the STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that when this method is called all messages in the down queue will have been flushed

Overrides:
stop in class Protocol

up

public java.lang.Object up(Event evt)
Description copied from class: Protocol
An event was received from the layer below. Usually the current layer will want to examine the event type and - depending on its type - perform some computation (e.g. removing headers from a MSG event type, or updating the internal membership list when receiving a VIEW_CHANGE event). Finally the event is either a) discarded, or b) an event is sent down the stack using down_prot.down() or c) the event (or another event) is sent up the stack using up_prot.up().

Overrides:
up in class Protocol

down

public java.lang.Object down(Event evt)
Description copied from class: Protocol
An event is to be sent down the stack. The layer may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the layer may need to add a header to it (or do nothing at all) before sending it down the stack using down_prot.down(). In case of a GET_ADDRESS event (which tries to retrieve the stack's address from one of the bottom layers), the layer may need to send a new response event back up the stack using up_prot.up().

Overrides:
down in class Protocol
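
The up/down contract described for up(Event) and down(Event) can be illustrated with a miniature layer. This is only a sketch under stated assumptions: Msg, Layer and TaggingLayer are hypothetical types invented for the example and are not JGroups classes. The layer adds a header on the way down, removes it on the way up, and discards messages that lack it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical miniature of a protocol layer; none of these types belong to JGroups.
class LayerSketch {
    static final class Msg {
        final Deque<String> headers = new ArrayDeque<>();
        final String payload;
        Msg(String payload) { this.payload = payload; }
    }

    abstract static class Layer {
        Layer upProt, downProt; // neighbors above and below (may be null in this sketch)
        Object up(Msg m)   { return upProt != null ? upProt.up(m) : m; }
        Object down(Msg m) { return downProt != null ? downProt.down(m) : m; }
    }

    /** Adds a header on the way down and strips it on the way up. */
    static final class TaggingLayer extends Layer {
        @Override Object down(Msg m) {
            m.headers.push("TAG");          // add this layer's header
            return super.down(m);           // pass the message to the layer below
        }
        @Override Object up(Msg m) {
            if (!"TAG".equals(m.headers.peek()))
                return null;                // discard: not tagged by this layer
            m.headers.pop();                // remove this layer's header
            return super.up(m);             // pass the message to the layer above
        }
    }

    public static void main(String[] args) {
        TaggingLayer layer = new TaggingLayer();
        Msg m = new Msg("hello");
        layer.down(m);
        System.out.println(m.headers.size()); // prints 1
        layer.up(m);
        System.out.println(m.headers.size()); // prints 0
    }
}
```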

stable

protected void stable(Address sender,
                      short conn_id,
                      long hd,
                      long hr)
Purges all messages in the window for local_addr with seqnos <= hd. If the window's highest seqno is greater than hr, retransmits all messages in the range [hr .. win.high] to sender.

Parameters:
sender -
hd - Highest delivered seqno
hr - Highest received seqno
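
The purge-then-retransmit step can be sketched as follows. This is a hedged illustration, not the actual method body: StableSketch and its methods are hypothetical, messages are plain strings, and the choice to include hr itself in the retransmitted range is an assumption taken from the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sender-side window; illustrates the purge and retransmit steps only.
class StableSketch {
    private final TreeMap<Long, String> sent = new TreeMap<>(); // seqno -> message kept for retransmission

    void send(long seqno, String msg) { sent.put(seqno, msg); }

    /**
     * Handles a STABLE report from a receiver.
     * hd = highest delivered seqno, hr = highest received seqno at the receiver.
     * Returns the seqnos that would be retransmitted, in ascending order.
     */
    List<Long> stable(long hd, long hr) {
        sent.headMap(hd, true).clear();                      // purge everything <= hd
        List<Long> resend = new ArrayList<>();
        if (!sent.isEmpty() && sent.lastKey() > hr)          // receiver has not seen our newest messages
            resend.addAll(sent.tailMap(hr, true).keySet());  // assumption: range [hr .. highest sent]
        return resend;
    }

    int size() { return sent.size(); }

    public static void main(String[] args) {
        StableSketch s = new StableSketch();
        for (long i = 1; i <= 6; i++) s.send(i, "m" + i);
        System.out.println(s.stable(3, 4)); // prints [4, 5, 6]
        System.out.println(s.size());       // prints 3
    }
}
```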

sendStableMessages

public void sendStableMessages()

sendStableMessage

protected void sendStableMessage(Address dest,
                                 short conn_id,
                                 long hd,
                                 long hr)

startStableTask

protected void startStableTask()

stopStableTask

protected void stopStableTask()

startConnectionReaper

protected void startConnectionReaper()

stopConnectionReaper

protected void stopConnectionReaper()

removeConnection

public void removeConnection(Address mbr)
Removes and resets the connection for mbr from the connection table (which is already locked). This method is public only so that unit tests can invoke it; it should not otherwise be used.


removeSendConnection

public void removeSendConnection(Address mbr)

removeReceiveConnection

public void removeReceiveConnection(Address mbr)

removeAllConnections

public void removeAllConnections()
This method is public only so that unit tests can invoke it; it should not otherwise be used.


retransmit

public void retransmit(SeqnoList missing,
                       Address sender)

expired

public void expired(Address key)
Called by AgeOutCache to remove expired connections

Specified by:
expired in interface AgeOutCache.Handler<Address>
Parameters:
key -

handleDataReceived

protected void handleDataReceived(Address sender,
                                  long seqno,
                                  short conn_id,
                                  boolean first,
                                  Message msg,
                                  Event evt)
Check whether the hashmap contains an entry e for sender (create if not). If e.received_msgs is null and first is true: create a new AckReceiverWindow(seqno) and add message. Set e.received_msgs to the new window. Else just add the message.
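
The get-or-create logic described above might look roughly like this. It is a simplified sketch: ReceiverTableSketch and Entry are hypothetical stand-ins, senders are identified by strings, and a plain list takes the place of the receive window. Dropping a non-first message when no window exists yet is an assumption of this example; the description above does not say what happens in that case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of a per-sender receive table (not the JGroups implementation).
class ReceiverTableSketch {
    /** Stand-in for a receiver entry: the window is created lazily by a 'first' message. */
    static final class Entry {
        List<String> receivedMsgs; // null until the first message of a connection arrives
        long firstSeqno;
    }

    private final ConcurrentMap<String, Entry> recvTable = new ConcurrentHashMap<>();

    /** Returns true if the message was stored, false if it was dropped. */
    boolean handleDataReceived(String sender, long seqno, boolean first, String msg) {
        Entry e = recvTable.computeIfAbsent(sender, k -> new Entry()); // create entry if absent
        synchronized (e) {
            if (e.receivedMsgs == null) {
                if (!first)
                    return false;              // assumption: only a 'first' message opens a window
                e.receivedMsgs = new ArrayList<>();
                e.firstSeqno = seqno;          // the new window starts at this seqno
            }
            e.receivedMsgs.add(msg);           // otherwise just add the message
            return true;
        }
    }

    /** Number of messages stored for the given sender (0 if there is no window). */
    int size(String sender) {
        Entry e = recvTable.get(sender);
        if (e == null) return 0;
        synchronized (e) {
            return e.receivedMsgs == null ? 0 : e.receivedMsgs.size();
        }
    }
}
```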


getReceiverEntry

protected UNICAST2.ReceiverEntry getReceiverEntry(Address sender,
                                                  long seqno,
                                                  boolean first,
                                                  short conn_id)

getOrCreateReceiverEntry

protected UNICAST2.ReceiverEntry getOrCreateReceiverEntry(Address sender,
                                                          long seqno,
                                                          short conn_id)

handleXmitRequest

protected void handleXmitRequest(Address sender,
                                 SeqnoList missing)

handleResendingOfFirstMessage

protected void handleResendingOfFirstMessage(Address sender,
                                             long seqno)
We need to resend our first message with our conn_id

Parameters:
sender -
seqno - Resend the non null messages in the range [lowest .. seqno]

startRetransmitTask

protected void startRetransmitTask()

stopRetransmitTask

protected void stopRetransmitTask()

getNewConnectionId

protected short getNewConnectionId()

sendRequestForFirstSeqno

protected void sendRequestForFirstSeqno(Address dest,
                                        long seqno_received)

reapIdleConnections

public void reapIdleConnections()

triggerXmit

public void triggerXmit()


Copyright © 1998-2012 Bela Ban / Red Hat. All Rights Reserved.