Class StreamingStateTransfer

  • All Implemented Interfaces:
    Lifecycle, ProcessingQueue.Handler<Address>
    Direct Known Subclasses:
    STATE, STATE_SOCK

    public abstract class StreamingStateTransfer
    extends Protocol
    implements ProcessingQueue.Handler<Address>
    Base class for state transfer protocols which use streaming (or chunking) to transfer state between two members.

    The major advantage of this approach is that transferring application state to a joining member of a group does not require loading the complete application state into memory. The application state might, for example, be located entirely on some form of disk-based storage. The default STATE_TRANSFER protocol requires this state to be loaded entirely into memory before it is transferred to a group member, while the streaming state transfer protocols do not. The streaming state transfer protocols are therefore able to transfer very large application state (more than 1 GB) with little risk of the transfer resulting in an OutOfMemoryError.
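    The constant-memory behavior described above can be illustrated with a plain java.io copy loop that moves state through a fixed-size buffer. This is only a minimal sketch: the class ChunkedStateCopy, the copy method, and the bufferSize parameter are hypothetical names chosen for illustration and are not part of JGroups.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical helper, not a JGroups class: copies state in fixed-size chunks. */
public class ChunkedStateCopy {

    /**
     * Copies everything from 'in' to 'out' using a buffer of bufferSize bytes,
     * so memory use stays at bufferSize regardless of the total state size.
     * Returns the total number of bytes copied.
     */
    public static long copy(InputStream in, OutputStream out, int bufferSize) throws IOException {
        byte[] buf = new byte[bufferSize];
        long total = 0;
        int n;
        // read() returns -1 at end of stream; each pass handles at most one chunk
        while ((n = in.read(buf, 0, buf.length)) != -1) {
            out.write(buf, 0, n);
            total += n;
        }
        out.flush();
        return total;
    }
}
```

    With a file-backed InputStream as the source, only one chunk of the state is held in memory at a time, whichever side of the transfer runs the loop.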

    Note that prior to 3.0, there was only one streaming protocol: STATE. In 3.0, the functionality was split between STATE and STATE_SOCK, and the common functionality was moved up into StreamingStateTransfer.

    Since:
    3.0
    Author:
    Bela Ban, Vladimir Blagojevic
    See Also:
    STATE_TRANSFER, STATE, STATE_SOCK
    • Field Detail

      • buffer_size

        protected int buffer_size
      • max_pool

        protected int max_pool
      • pool_thread_keep_alive

        protected long pool_thread_keep_alive
      • num_state_reqs

        protected final java.util.concurrent.atomic.LongAdder num_state_reqs
      • num_bytes_sent

        protected final java.util.concurrent.atomic.LongAdder num_bytes_sent
      • avg_state_size

        protected double avg_state_size
      • state_provider

        protected volatile Address state_provider
      • members

        protected final java.util.List<Address> members
      • flushProtocolInStack

        protected volatile boolean flushProtocolInStack
      • state_requesters

        protected final ProcessingQueue<Address> state_requesters
        List of members requesting state. Only a single state request is handled at any time.
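        The "one request at a time" behavior noted for state_requesters can be sketched with a small queue that lets only one caller drain pending elements. This is an illustrative assumption about the general pattern, not the actual ProcessingQueue implementation; the class SerialRequestQueue and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical sketch, not the JGroups ProcessingQueue: handles elements one at a time. */
public class SerialRequestQueue<T> {
    private final Queue<T> pending = new ArrayDeque<>();
    private final Consumer<T> handler;
    private boolean processing; // guarded by 'this'

    public SerialRequestQueue(Consumer<T> handler) {
        this.handler = handler;
    }

    /** Queues the element; the first caller to find the queue idle drains it. */
    public void add(T element) {
        synchronized (this) {
            pending.add(element);
            if (processing)
                return;        // another thread is already draining the queue
            processing = true;
        }
        drain();
    }

    private void drain() {
        while (true) {
            T next;
            synchronized (this) {
                next = pending.poll();
                if (next == null) {
                    processing = false; // queue is empty, give up the drainer role
                    return;
                }
            }
            try {
                handler.accept(next);   // runs outside the lock, one element at a time
            } catch (RuntimeException e) {
                // a failing request must not block the requests queued behind it
            }
        }
    }
}
```

        Elements are handled in arrival order, and a handler that fails does not prevent later requests from being served.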
    • Constructor Detail

      • StreamingStateTransfer

        public StreamingStateTransfer()
    • Method Detail

      • getNumberOfStateRequests

        public long getNumberOfStateRequests()
      • getNumberOfStateBytesSent

        public long getNumberOfStateBytesSent()
      • getAverageStateSize

        public double getAverageStateSize()
      • getThreadPoolSize

        public int getThreadPoolSize()
      • getThreadPoolCompletedTasks

        public long getThreadPoolCompletedTasks()
      • requiredDownServices

        public java.util.List<java.lang.Integer> requiredDownServices()
        Description copied from class: Protocol
        List of events that are required to be answered by some layer below
        Overrides:
        requiredDownServices in class Protocol
      • init

        public void init()
                  throws java.lang.Exception
        Description copied from class: Protocol
        Called after a protocol has been created and before the protocol is started. Attributes are already set. Other protocols are not yet connected and events cannot yet be sent.
        Specified by:
        init in interface Lifecycle
        Overrides:
        init in class Protocol
        Throws:
        java.lang.Exception - Thrown if the protocol cannot be initialized successfully. This will cause the ProtocolStack to fail, so the channel constructor will throw an exception
      • start

        public void start()
                   throws java.lang.Exception
        Description copied from class: Protocol
        This method is called on JChannel.connect(String) and starts work. Protocols are connected and ready to receive events. Will be called from bottom to top.
        Specified by:
        start in interface Lifecycle
        Overrides:
        start in class Protocol
        Throws:
        java.lang.Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so JChannel.connect(String) will throw an exception
      • down

        public java.lang.Object down​(Event evt)
        Description copied from class: Protocol
        An event is to be sent down the stack. A protocol may want to examine its type and perform some action on it, depending on the event's type. If the event is a message MSG, then the protocol may need to add a header to it (or do nothing at all) before sending it down the stack using down_prot.down().
        Overrides:
        down in class Protocol
      • up

        public java.lang.Object up​(Event evt)
        Description copied from class: Protocol
        An event was received from the protocol below. Usually the current protocol will want to examine the event type and - depending on its type - perform some computation (e.g. removing headers from a MSG event type, or updating the internal membership list when receiving a VIEW_CHANGE event). Finally, the event is either a) discarded, or b) an event is sent down the stack using down_prot.down() or c) the event (or another event) is sent up the stack using up_prot.up().
        Overrides:
        up in class Protocol
      • up

        public java.lang.Object up​(Message msg)
        Description copied from class: Protocol
        A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
        Overrides:
        up in class Protocol
      • up

        public void up​(MessageBatch batch)
        Description copied from class: Protocol
        Sends up multiple messages in a MessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages.

        The default processing below sends messages up the stack individually, based on a matching criterion (calling Protocol.accept(Message)), and, if it returns true, calls Protocol.up(org.jgroups.Event) for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.

        Subclasses should check if there are any messages destined for them (e.g. using MessageBatch.iterator(Predicate)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.

        Overrides:
        up in class Protocol
        Parameters:
        batch - The message batch
      • isDigestNeeded

        protected boolean isDigestNeeded()
        When FLUSH is used we do not need to pass digests between members (see JGroups/doc/design/FLUSH.txt)
        Returns:
        true if use of digests is required, false otherwise
      • handleConfig

        protected void handleConfig​(java.util.Map<java.lang.String,​java.lang.Object> config)
      • handleStateChunk

        protected void handleStateChunk​(Address sender,
                                        byte[] buffer,
                                        int offset,
                                        int length)
      • handleEOF

        protected void handleEOF​(Address sender)
      • handleException

        protected void handleException​(java.lang.Throwable exception)
      • getStateFromApplication

        protected void getStateFromApplication​(Address requester,
                                               java.io.OutputStream out,
                                               boolean use_separate_thread)
      • setStateInApplication

        protected void setStateInApplication​(java.io.InputStream in,
                                             java.lang.Object resource,
                                             Address provider)
      • closeBarrierAndSuspendStable

        public void closeBarrierAndSuspendStable()
      • openBarrierAndResumeStable

        public void openBarrierAndResumeStable()
      • openBarrier

        protected void openBarrier()
      • resumeStable

        protected void resumeStable()
      • sendEof

        protected void sendEof​(Address requester)
      • sendException

        protected void sendException​(Address requester,
                                     java.lang.Throwable exception)
      • createThreadPool

        protected java.util.concurrent.ThreadPoolExecutor createThreadPool()
      • determineCoordinator

        protected Address determineCoordinator()
      • handleViewChange

        protected void handleViewChange​(View v)
      • handleStateReq

        protected void handleStateReq​(Address requester)
                               throws java.lang.Exception
        Throws:
        java.lang.Exception
      • createStreamToRequester

        protected void createStreamToRequester​(Address requester)
        Creates an OutputStream to the state requester to write the state
      • createStreamToProvider

        protected abstract Tuple<java.io.InputStream,​java.lang.Object> createStreamToProvider​(Address provider,
                                                                                                    StreamingStateTransfer.StateHeader hdr)
                                                                                             throws java.lang.Exception
        Creates an InputStream to the state provider to read the state. Returns the input stream and a handback object as a tuple. The handback object is handed back to the subclass when done, or in case of an error (e.g. to clean up resources).
        Throws:
        java.lang.Exception
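        The handback idea described for createStreamToProvider can be sketched in isolation: a stream is returned together with an opaque resource, and the caller hands that resource back for cleanup whether reading succeeds or fails. Everything below is hypothetical and for illustration only (HandbackSketch, Pair, Connection, openStream, transfer, release); it does not reproduce the JGroups Tuple class or the actual control flow of this protocol.

```java
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical sketch of the "stream plus handback object" pattern. */
public class HandbackSketch {

    /** Hypothetical stand-in for a two-element tuple. */
    public static final class Pair<A, B> {
        public final A first;
        public final B second;
        public Pair(A first, B second) { this.first = first; this.second = second; }
    }

    /** Hypothetical resource that records whether it was released. */
    public static final class Connection implements Closeable {
        public boolean closed;
        @Override public void close() { closed = true; }
    }

    /** Returns the stream to read from, plus the resource to hand back later. */
    public static Pair<InputStream, Object> openStream(byte[] remoteState, Connection conn) {
        return new Pair<>(new ByteArrayInputStream(remoteState), conn);
    }

    /** Reads the whole stream and always hands the resource back afterwards. */
    public static int transfer(byte[] remoteState, Connection conn) throws IOException {
        Pair<InputStream, Object> pair = openStream(remoteState, conn);
        try {
            int count = 0;
            byte[] buf = new byte[256];
            int n;
            while ((n = pair.first.read(buf)) != -1)
                count += n;
            return count;
        } finally {
            release(pair.second); // runs on success and on error
        }
    }

    /** Cleans up the handback object if it is closeable. */
    public static void release(Object resource) throws IOException {
        if (resource instanceof Closeable)
            ((Closeable) resource).close();
    }
}
```

        Keeping the resource opaque (java.lang.Object) lets each subclass decide what it needs to clean up without the caller knowing the concrete type.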
      • close

        protected void close​(java.lang.Object resource)
      • useAsyncStateDelivery

        protected boolean useAsyncStateDelivery()
      • punchHoleFor

        protected void punchHoleFor​(Address member)
      • closeHoleFor

        protected void closeHoleFor​(Address member)