Class TransferQueueBundler2

  • All Implemented Interfaces:
    java.lang.Runnable, Bundler

    public class TransferQueueBundler2
    extends java.lang.Object
    implements Bundler, java.lang.Runnable
    This bundler adds all (unicast or multicast) messages to a queue until the maximum size has been exceeded, but sends messages immediately when no other messages are available. See https://issues.redhat.com/browse/JGRP-1540.
    The difference from TransferQueueBundler is that a size is maintained per destination, and a byte array of max_bundle_size is kept per destination, into which a message is marshalled directly when it is sent.
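The per-destination accounting described above can be illustrated with a small, self-contained sketch. It is not the JGroups implementation: the class `PerDestinationBundlerSketch`, its methods, the use of `String` as a destination and the `sent` log are all hypothetical names chosen for illustration, and the policy of flushing a destination's buffer before a write that would overflow it is an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names are JGroups API.
class PerDestinationBundlerSketch {
    private final int maxSize; // plays the role of max_size
    // One buffer per destination; LinkedHashMap keeps flush order deterministic.
    private final Map<String, ByteArrayOutputStream> buffers = new LinkedHashMap<>();
    // Records "destination:byteCount" for every bundle that was flushed.
    final List<String> sent = new ArrayList<>();

    PerDestinationBundlerSketch(int maxSize) {
        if (maxSize <= 0)
            throw new IllegalArgumentException("maxSize must be > 0");
        this.maxSize = maxSize;
    }

    // Appends the payload to the destination's buffer. If the payload would
    // push a non-empty buffer past maxSize, that buffer is flushed first.
    void add(String dest, byte[] payload) {
        ByteArrayOutputStream buf =
            buffers.computeIfAbsent(dest, d -> new ByteArrayOutputStream());
        if (buf.size() > 0 && buf.size() + payload.length > maxSize)
            flush(dest, buf);
        buf.write(payload, 0, payload.length);
    }

    // Flushes every non-empty buffer, e.g. when no further messages are pending.
    void flushAll() {
        buffers.forEach(this::flush);
    }

    private void flush(String dest, ByteArrayOutputStream buf) {
        if (buf.size() == 0)
            return;
        sent.add(dest + ":" + buf.size());
        buf.reset(); // reuse the same buffer for the next bundle
    }
}
```

In this sketch each destination fills up independently, so a busy destination does not force a premature send for a quiet one.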
    • Field Detail

      • max_size

        protected int max_size
        Maximum number of bytes for messages to be queued until they are sent. This value needs to be smaller than the largest datagram packet size in the case of UDP.
      • capacity

        protected int capacity
      • poll_timeout

        protected long poll_timeout
      • transport

        protected TP transport
      • log

        protected Log log
      • queue

        protected java.util.concurrent.BlockingQueue<Message> queue
      • remove_queue

        protected java.util.List<Message> remove_queue
      • bundler_thread

        protected volatile java.lang.Thread bundler_thread
      • running

        protected volatile boolean running
      • num_sends_because_full_queue

        protected long num_sends_because_full_queue
      • num_sends_because_no_msgs

        protected long num_sends_because_no_msgs
      • messages

        protected final java.util.Map<Address,org.jgroups.protocols.TransferQueueBundler2.Buffer> messages
    • Constructor Detail

      • TransferQueueBundler2

        public TransferQueueBundler2()
      • TransferQueueBundler2

        protected TransferQueueBundler2(java.util.concurrent.BlockingQueue<Message> queue)
      • TransferQueueBundler2

        public TransferQueueBundler2(int capacity)
    • Method Detail

      • getThread

        public java.lang.Thread getThread()
      • getCapacity

        public int getCapacity()
        Description copied from interface: Bundler
        If the bundler implementation supports a capacity (e.g. RingBufferBundler), then return it; otherwise return -1.
        Specified by:
        getCapacity in interface Bundler
      • setCapacity

        public Bundler setCapacity(int c)
      • getMaxSize

        public int getMaxSize()
        Description copied from interface: Bundler
        Maximum number of bytes for messages to be queued until they are sent
        Specified by:
        getMaxSize in interface Bundler
      • getQueueSize

        public int getQueueSize()
        Description copied from interface: Bundler
        If the bundler has a queue and it should be managed by a queuing discipline (like Random Early Detection), then return the number of elements in the queue, else -1. In the latter case, the queue won't be managed.
        This method needs to be fast as it might get called on every message to be sent.
        Specified by:
        getQueueSize in interface Bundler
      • removeQueueSize

        public int removeQueueSize()
      • dump

        public java.lang.String dump()
      • init

        public void init(TP transport)
        Description copied from interface: Bundler
        Called after creation of the bundler
        Specified by:
        init in interface Bundler
        Parameters:
        transport - the transport, for further reference
      • resetStats

        public void resetStats()
        Specified by:
        resetStats in interface Bundler
      • stop

        public void stop()
        Specified by:
        stop in interface Bundler
      • size

        public int size()
        Description copied from interface: Bundler
        The number of unsent messages in the bundler
        Specified by:
        size in interface Bundler
      • send

        public void send(Message msg)
                  throws java.lang.Exception
        Specified by:
        send in interface Bundler
        Throws:
        java.lang.Exception
      • run

        public void run()
        Specified by:
        run in interface java.lang.Runnable
      • hasMessages

        protected boolean hasMessages()
      • addAndSendIfSizeExceeded

        protected void addAndSendIfSizeExceeded(Message msg)
      • sendBundledMessages

        protected void sendBundledMessages()
      • drain

        protected void drain()
        Takes all messages from the queue, adds them to the hash map, and then sends all bundled messages.
      • assertPositive

        protected static int assertPositive(int value,
                                            java.lang.String message)
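The behaviour documented for drain() (take everything from the queue, group it in the map, then send all bundles) can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual method body: `DrainSketch`, `Msg`, `removeQueue` and the `sent` log are hypothetical stand-ins for the `queue`, `remove_queue` and `messages` fields listed above, and plain strings replace the real `Message` and `Address` types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration only; none of these names are JGroups API.
class DrainSketch {
    static final class Msg {
        final String dest;
        final String body;
        Msg(String dest, String body) { this.dest = dest; this.body = body; }
    }

    final BlockingQueue<Msg> queue = new ArrayBlockingQueue<>(16);
    final List<Msg> removeQueue = new ArrayList<>();                  // scratch list for drainTo
    final Map<String, List<String>> messages = new LinkedHashMap<>(); // bundles by destination
    final List<String> sent = new ArrayList<>();                      // records what was "sent"

    // Moves all queued messages into the per-destination map, then sends the bundles.
    void drain() {
        removeQueue.clear();
        queue.drainTo(removeQueue); // non-blocking bulk removal
        for (Msg m : removeQueue)
            messages.computeIfAbsent(m.dest, d -> new ArrayList<>()).add(m.body);
        sendBundledMessages();
    }

    // Emits one bundle per destination and clears the map afterwards.
    void sendBundledMessages() {
        messages.forEach((dest, bodies) -> {
            if (!bodies.isEmpty())
                sent.add(dest + "=" + String.join(",", bodies));
        });
        messages.clear();
    }
}
```

Using `drainTo` with a reusable scratch list removes all pending elements in one call instead of polling the queue element by element.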