Class STABLE


  • public class STABLE
    extends Protocol
    Computes the broadcast messages that are stable; i.e., have been delivered by all members. Sends STABLE events down the stack when this is the case. This allows NAKACK{2,3} to garbage collect messages that have been seen by all members.

    Works as follows: periodically (desired_avg_gossip) or when having received a number of bytes (max_bytes), every member sends its digest (highest seqno delivered, received) to the current coordinator
    The coordinator updates a stability vector, which maintains the highest seqno delivered/receive for each member and initially contains no data, when such a message is received.
    When messages from all members have been received, a stability message is mcast, which causes all members to send a STABLE event down the stack (triggering garbage collection in the NAKACK{2,3} layer).

    Author:
    Bela Ban
    • Field Detail

      • desired_avg_gossip

        protected long desired_avg_gossip
        Sends a STABLE gossip every 20 seconds on average. 0 disables gossiping of STABLE messages
      • stability_delay

        @Deprecated
        protected long stability_delay
        Deprecated.
        delay before we send STABILITY msg (give others a change to send first). This should be set to a very small number (> 0 !) if max_bytes is used
      • max_bytes

        protected long max_bytes
        Total amount of bytes from incoming messages (default = 0 = disabled). When exceeded, a STABLE message will be broadcast andnum_bytes_received reset to 0 . If this is > 0, then ideally stability_delay should be set to a low number as well
      • send_stable_msgs_to_coord_only

        @Deprecated
        protected boolean send_stable_msgs_to_coord_only
        Deprecated.
      • num_stable_msgs_sent

        protected int num_stable_msgs_sent
      • num_stable_msgs_received

        protected int num_stable_msgs_received
      • num_stability_msgs_sent

        protected int num_stability_msgs_sent
      • num_stability_msgs_received

        protected int num_stability_msgs_received
      • local_addr

        protected Address local_addr
      • view

        protected volatile View view
      • votes

        protected FixedSizeBitSet votes
        Keeps track of who we already heard from (STABLE_GOSSIP msgs). This is all 0's, and we set the sender when a STABLE message is received. When the bitset is all 1's (responses from all members), we send a STABILITY message
      • lock

        protected final java.util.concurrent.locks.Lock lock
      • stable_task_future

        protected java.util.concurrent.Future<?> stable_task_future
      • stable_task_lock

        protected final java.util.concurrent.locks.Lock stable_task_lock
      • num_bytes_received

        protected long num_bytes_received
        The total number of bytes received from unicast and multicast messages
      • received

        protected final java.util.concurrent.locks.Lock received
      • suspended

        protected volatile boolean suspended
        When true, don't take part in garbage collection: neither send STABLE messages nor handle STABILITY messages
      • initialized

        protected boolean initialized
      • resume_task_future

        protected java.util.concurrent.Future<?> resume_task_future
      • resume_task_mutex

        protected final java.lang.Object resume_task_mutex
      • coordinator

        protected volatile Address coordinator
    • Constructor Detail

      • STABLE

        public STABLE()
    • Method Detail

      • getDesiredAverageGossip

        public long getDesiredAverageGossip()
      • setDesiredAverageGossip

        public void setDesiredAverageGossip​(long gossip_interval)
      • desiredAverageGossip

        public STABLE desiredAverageGossip​(long g)
      • getMaxBytes

        public long getMaxBytes()
      • setMaxBytes

        public void setMaxBytes​(long max_bytes)
      • getBytes

        public long getBytes()
      • getStableSent

        public int getStableSent()
      • getStableReceived

        public int getStableReceived()
      • getStabilitySent

        public int getStabilitySent()
      • getStabilityReceived

        public int getStabilityReceived()
      • getNumVotes

        public int getNumVotes()
      • getStableTaskRunning

        public boolean getStableTaskRunning()
      • gc

        public void gc()
      • printDigest

        public java.lang.String printDigest()
      • printVotes

        public java.lang.String printVotes()
      • requiredDownServices

        public java.util.List<java.lang.Integer> requiredDownServices()
        Description copied from class: Protocol
        List of events that are required to be answered by some layer below
        Overrides:
        requiredDownServices in class Protocol
      • suspend

        protected void suspend​(long timeout)
      • resume

        protected void resume()
      • init

        public void init()
                  throws java.lang.Exception
        Description copied from class: Protocol
        Called after instance has been created (null constructor) and before protocol is started. Properties are already set. Other protocols are not yet connected and events cannot yet be sent.
        Overrides:
        init in class Protocol
        Throws:
        java.lang.Exception - Thrown if protocol cannot be initialized successfully. This will cause the ProtocolStack to fail, so the channel constructor will throw an exception
      • start

        public void start()
                   throws java.lang.Exception
        Description copied from class: Protocol
        This method is called on a JChannel.connect(String). Starts work. Protocols are connected and queues are ready to receive events. Will be called from bottom to top. This call will replace the START and START_OK events.
        Overrides:
        start in class Protocol
        Throws:
        java.lang.Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so JChannel.connect(String) will throw an exception
      • stop

        public void stop()
        Description copied from class: Protocol
        This method is called on a JChannel.disconnect(). Stops work (e.g. by closing multicast socket). Will be called from top to bottom. This means that at the time of the method invocation the neighbor protocol below is still working. This method will replace the STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that when this method is called all messages in the down queue will have been flushed
        Overrides:
        stop in class Protocol
      • up

        public java.lang.Object up​(Event evt)
        Description copied from class: Protocol
        An event was received from the protocol below. Usually the current protocol will want to examine the event type and - depending on its type - perform some computation (e.g. removing headers from a MSG event type, or updating the internal membership list when receiving a VIEW_CHANGE event). Finally the event is either a) discarded, or b) an event is sent down the stack using down_prot.down() or c) the event (or another event) is sent up the stack using up_prot.up().
        Overrides:
        up in class Protocol
      • up

        public java.lang.Object up​(Message msg)
        Description copied from class: Protocol
        A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
        Overrides:
        up in class Protocol
      • up

        public void up​(MessageBatch batch)
        Description copied from class: Protocol
        Sends up a multiple messages in a MessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages.

        The default processing below sends messages up the stack individually, based on a matching criteria (calling Protocol.accept(org.jgroups.Message)), and - if true - calls Protocol.up(org.jgroups.Event) for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.

        Subclasses should check if there are any messages destined for them (e.g. using MessageBatch.getMatchingMessages(short,boolean)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.

        Overrides:
        up in class Protocol
        Parameters:
        batch - The message batch
      • down

        public java.lang.Object down​(Message msg)
        Description copied from class: Protocol
        A message is sent down the stack. Protocols may examine the message and do something (e.g. add a header) with it before passing it down.
        Overrides:
        down in class Protocol
      • down

        public java.lang.Object down​(Event evt)
        Description copied from class: Protocol
        An event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack using down_prot.down().
        Overrides:
        down in class Protocol
      • handleRegularMessage

        protected void handleRegularMessage​(Message msg)
      • maxBytesExceeded

        protected boolean maxBytesExceeded​(int len)
      • handleViewChange

        protected void handleViewChange​(View v)
      • updateLocalDigest

        protected void updateLocalDigest​(Digest d,
                                         Address sender)
        Update my own digest from a digest received by somebody else. Returns whether the update was successful. Needs to be called with a lock on digest
      • resetDigest

        protected void resetDigest()
      • addVote

        protected boolean addVote​(int rank)
        Adds mbr to votes and returns true if we have all the votes, otherwise false.
        Parameters:
        rank -
      • allVotesReceived

        protected static boolean allVotesReceived​(FixedSizeBitSet votes)
        Votes is already locked and guaranteed to be non-null
      • getRank

        protected static int getRank​(Address member,
                                     View v)
      • startStableTask

        protected void startStableTask()
      • stopStableTask

        protected void stopStableTask()
      • startResumeTask

        protected void startResumeTask​(long max_suspend_time)
      • stopResumeTask

        protected void stopResumeTask()
      • handleStableMessage

        protected void handleStableMessage​(Digest d,
                                           Address sender,
                                           ViewId view_id)
        Digest d contains (a) the highest seqnos deliverable for each sender and (b) the highest seqnos seen for each member. (Difference: with 1,2,4,5, the highest seqno seen is 5, whereas the highest seqno deliverable is 2). The minimum of all highest seqnos deliverable will be taken to send a stability message, which results in garbage collection of messages lower than the ones in the stability vector. The maximum of all seqnos will be taken to trigger possible retransmission of last missing seqno (see DESIGN for details).
      • resetNumBytes

        protected void resetNumBytes()
      • handleStabilityMessage

        protected void handleStabilityMessage​(Digest stable_digest,
                                              Address sender,
                                              ViewId view_id)
      • sendStableMessage

        protected void sendStableMessage​(boolean send_in_background)
        Broadcasts a STABLE message of the current digest to all members (or the coordinator only). The message contains the highest seqno delivered and received for all members. The seqnos are retrieved from the NAKACK layer below.
      • readDigest

        protected Digest readDigest​(byte[] buffer,
                                    int offset,
                                    int length)
      • sendStabilityMessage

        protected void sendStabilityMessage​(Digest d,
                                            ViewId view_id)
        Sends a stability message to all members except self.
        Parameters:
        d - A copy of the stability digest, so we don't need to copy it again
      • getDigest

        protected Digest getDigest()
      • printDigest

        protected java.lang.String printDigest​(Digest digest)