Class Table<T>

  • All Implemented Interfaces:
    java.lang.Iterable<T>

    public class Table<T>
    extends java.lang.Object
    implements java.lang.Iterable<T>
    A store for elements (typically messages) to be retransmitted or delivered. Used on sender and receiver side, as a replacement for HashMap. Table should use less memory than HashMap, as HashMap.Entry has 4 fields, plus arrays for storage.

    Table maintains a matrix (an array of arrays) of elements, which are stored in the matrix by mapping their seqno to an index. E.g. when we have 10 rows of 1000 elements each, and first_seqno is 3000, then an element with seqno=5600, will be stored in the 3rd row, at index 600.

    Rows are removed when all elements in that row have been received.

    Table started out as a copy of RetransmitTable, but is synchronized and maintains its own low, hd and hr pointers, so it can be used as a replacement for NakReceiverWindow. The idea is to put messages into Table, deliver them in order of seqnos, and periodically scan over all tables in NAKACK2 to do retransmission.

    Version:
    3.1
    Author:
    Bela Ban
    • Constructor Summary

      Constructors 
      Constructor Description
      Table()  
      Table​(int num_rows, int elements_per_row, long offset)  
      Table​(int num_rows, int elements_per_row, long offset, double resize_factor)  
      Table​(int num_rows, int elements_per_row, long offset, double resize_factor, long max_compaction_time)
      Creates a new table
      Table​(long offset)  
    • Method Summary

      All Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      protected boolean _add​(long seqno, T element, boolean check_if_resize_needed, java.util.function.Predicate<T> remove_filter)  
      protected void _compact()
      Moves the contents of matrix down by the number of purged rows and resizes the matrix accordingly.
      T _get​(long seqno)
      To be used only for testing; doesn't do any index or sanity checks
      boolean add​(long seqno, T element)
      Adds an element if the element at the given index is null.
      boolean add​(long seqno, T element, java.util.function.Predicate<T> remove_filter)
      Adds an element if the element at the given index is null.
      boolean add​(java.util.List<LongTuple<T>> list)
      Adds elements from list to the table
      boolean add​(java.util.List<LongTuple<T>> list, boolean remove_added_elements)
      Adds elements from list to the table, removes elements from list that were not added to the table
      boolean add​(java.util.List<LongTuple<T>> list, boolean remove_added_elements, T const_value)
      Adds elements from the list to the table
      int capacity()
      Returns the total capacity in the matrix
      void compact()  
      protected int computeIndex​(long seqno)
      Computes and returns the index within a row for seqno
      protected int computeRow​(long seqno)
      Computes and returns the row index for seqno.
      int computeSize()
      Iterate from low to hr and add up non-null values.
      java.lang.String dump()
      Dumps the seqnos in the table as a list
      protected long findHighestSeqno​(java.util.List<LongTuple<T>> list)  
      void forEach​(long from, long to, Table.Visitor<T> visitor)
      Iterates over the matrix with range [from ..
      T get​(long seqno)
      Returns an element at seqno
      java.util.concurrent.atomic.AtomicInteger getAdders()  
      long[] getDigest()  
      int getElementsPerRow()  
      long getHighestDeliverable()
      Returns the highest deliverable (= removable) seqno.
      long getHighestDelivered()  
      long getHighestReceived()  
      long getLow()  
      long getMaxCompactionTime()  
      SeqnoList getMissing()
      Returns a list of missing (= null) elements
      SeqnoList getMissing​(int max_msgs)
      Returns a list of missing messages
      int getNumCompactions()  
      int getNumDeliverable()
      Returns the number of messages that can be delivered
      int getNumMissing()
      Returns the number of null elements in the range [hd+1 ..
      int getNumMoves()  
      int getNumPurges()  
      int getNumResizes()  
      int getNumRows()  
      long getOffset()  
      protected T[] getRow​(int index)
      Returns a row.
      boolean isEmpty()  
      java.util.Iterator<T> iterator()  
      java.util.Iterator<T> iterator​(long from, long to)  
      protected void move​(int num_rows)
      Moves contents of matrix num_rows down.
      void purge​(long seqno)
      Removes all elements less than or equal to seqno from the table.
      void purge​(long seqno, boolean force)
      Removes all elements less than or equal to seqno from the table.
      T remove()  
      T remove​(boolean nullify)
      Removes the next non-null element and nulls the index if nullify=true
      java.util.List<T> removeMany​(boolean nullify, int max_results)  
      java.util.List<T> removeMany​(boolean nullify, int max_results, java.util.function.Predicate<T> filter)  
      <R> R removeMany​(boolean nullify, int max_results, java.util.function.Predicate<T> filter, java.util.function.Supplier<R> result_creator, java.util.function.BiConsumer<R,​T> accumulator)
      Removes elements from the table and adds them to the result created by result_creator.
      void resetStats()  
      protected void resize​(long seqno)
      Moves rows down the matrix, by removing purged rows.
      void setHighestDelivered​(long seqno)
      Only used internally by JGroups on a state transfer.
      void setMaxCompactionTime​(long max_compaction_time)  
      int size()
      Returns an appromximation of the number of elements in the table
      java.util.stream.Stream<T> stream()  
      java.util.stream.Stream<T> stream​(long from, long to)  
      java.lang.String toString()  
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
      • Methods inherited from interface java.lang.Iterable

        forEach, spliterator
    • Field Detail

      • num_rows

        protected final int num_rows
      • elements_per_row

        protected final int elements_per_row
        Must be a power of 2 for efficient modular arithmetic
      • resize_factor

        protected final double resize_factor
      • matrix

        protected T[][] matrix
      • offset

        protected long offset
        The first seqno, at matrix[0][0]
      • size

        protected int size
      • low

        protected long low
        The highest seqno purged
      • hr

        protected long hr
        The highest received seqno
      • hd

        protected long hd
        The highest delivered (= removed) seqno
      • max_compaction_time

        protected long max_compaction_time
        Time (in nanoseconds) after which a compaction should take place. 0 disables compaction
      • last_compaction_timestamp

        protected long last_compaction_timestamp
        The time when the last compaction took place. If a compact() takes place and sees that the last compaction is more than max_compaction_time nanoseconds ago, a compaction will take place
      • lock

        protected final java.util.concurrent.locks.Lock lock
      • adders

        protected final java.util.concurrent.atomic.AtomicInteger adders
      • num_compactions

        protected int num_compactions
      • num_resizes

        protected int num_resizes
      • num_moves

        protected int num_moves
      • num_purges

        protected int num_purges
      • DEFAULT_MAX_COMPACTION_TIME

        protected static final long DEFAULT_MAX_COMPACTION_TIME
        See Also:
        Constant Field Values
      • DEFAULT_RESIZE_FACTOR

        protected static final double DEFAULT_RESIZE_FACTOR
        See Also:
        Constant Field Values
    • Constructor Detail

      • Table

        public Table()
      • Table

        public Table​(long offset)
      • Table

        public Table​(int num_rows,
                     int elements_per_row,
                     long offset)
      • Table

        public Table​(int num_rows,
                     int elements_per_row,
                     long offset,
                     double resize_factor)
      • Table

        public Table​(int num_rows,
                     int elements_per_row,
                     long offset,
                     double resize_factor,
                     long max_compaction_time)
        Creates a new table
        Parameters:
        num_rows - the number of rows in the matrix
        elements_per_row - the number of elements per row
        offset - the seqno before the first seqno to be inserted. E.g. if 0 then the first seqno will be 1
        resize_factor - teh factor with which to increase the number of rows
        max_compaction_time - the max time in milliseconds after we attempt a compaction
    • Method Detail

      • getAdders

        public java.util.concurrent.atomic.AtomicInteger getAdders()
      • getOffset

        public long getOffset()
      • getElementsPerRow

        public int getElementsPerRow()
      • capacity

        public int capacity()
        Returns the total capacity in the matrix
      • getNumCompactions

        public int getNumCompactions()
      • getNumMoves

        public int getNumMoves()
      • getNumResizes

        public int getNumResizes()
      • getNumPurges

        public int getNumPurges()
      • size

        public int size()
        Returns an appromximation of the number of elements in the table
      • isEmpty

        public boolean isEmpty()
      • getLow

        public long getLow()
      • getHighestDelivered

        public long getHighestDelivered()
      • getHighestReceived

        public long getHighestReceived()
      • getMaxCompactionTime

        public long getMaxCompactionTime()
      • setMaxCompactionTime

        public void setMaxCompactionTime​(long max_compaction_time)
      • getNumRows

        public int getNumRows()
      • resetStats

        public void resetStats()
      • getHighestDeliverable

        public long getHighestDeliverable()
        Returns the highest deliverable (= removable) seqno. This may be higher than getHighestDelivered(), e.g. if elements have been added but not yet removed
      • getNumDeliverable

        public int getNumDeliverable()
        Returns the number of messages that can be delivered
      • setHighestDelivered

        public void setHighestDelivered​(long seqno)
        Only used internally by JGroups on a state transfer. Please don't use this in application code, or you're on your own !
        Parameters:
        seqno -
      • add

        public boolean add​(long seqno,
                           T element)
        Adds an element if the element at the given index is null. Returns true if no element existed at the given index, else returns false and doesn't set the element.
        Parameters:
        seqno -
        element -
        Returns:
        True if the element at the computed index was null, else false
      • add

        public boolean add​(long seqno,
                           T element,
                           java.util.function.Predicate<T> remove_filter)
        Adds an element if the element at the given index is null. Returns true if no element existed at the given index, else returns false and doesn't set the element.
        Parameters:
        seqno -
        element -
        remove_filter - If not null, a filter used to remove all consecutive messages passing the filter
        Returns:
        True if the element at the computed index was null, else false
      • add

        public boolean add​(java.util.List<LongTuple<T>> list)
        Adds elements from list to the table
        Parameters:
        list -
        Returns:
        True if at least 1 element was added successfully
      • add

        public boolean add​(java.util.List<LongTuple<T>> list,
                           boolean remove_added_elements)
        Adds elements from list to the table, removes elements from list that were not added to the table
        Parameters:
        list -
        Returns:
        True if at least 1 element was added successfully. This guarantees that the list has at least 1 element
      • add

        public boolean add​(java.util.List<LongTuple<T>> list,
                           boolean remove_added_elements,
                           T const_value)
        Adds elements from the list to the table
        Parameters:
        list - The list of tuples of seqnos and elements. If remove_added_elements is true, if elements could not be added to the table (e.g. because they were already present or the seqno was < HD), those elements will be removed from list
        remove_added_elements - If true, elements that could not be added to the table are removed from list
        const_value - If non-null, this value should be used rather than the values of the list tuples
        Returns:
        True if at least 1 element was added successfully, false otherwise.
      • get

        public T get​(long seqno)
        Returns an element at seqno
        Parameters:
        seqno -
        Returns:
      • _get

        public T _get​(long seqno)
        To be used only for testing; doesn't do any index or sanity checks
        Parameters:
        seqno -
        Returns:
      • remove

        public T remove()
      • remove

        public T remove​(boolean nullify)
        Removes the next non-null element and nulls the index if nullify=true
      • removeMany

        public java.util.List<T> removeMany​(boolean nullify,
                                            int max_results)
      • removeMany

        public java.util.List<T> removeMany​(boolean nullify,
                                            int max_results,
                                            java.util.function.Predicate<T> filter)
      • removeMany

        public <R> R removeMany​(boolean nullify,
                                int max_results,
                                java.util.function.Predicate<T> filter,
                                java.util.function.Supplier<R> result_creator,
                                java.util.function.BiConsumer<R,​T> accumulator)
        Removes elements from the table and adds them to the result created by result_creator. Between 0 and max_results elements are removed. If no elements were removed, processing will be set to true while the table lock is held.
        Type Parameters:
        R - the type of the result
        Parameters:
        nullify - if true, the x,y location of the removed element in the matrix will be nulled
        max_results - the max number of results to be returned, even if more elements would be removable
        filter - a filter which accepts (or rejects) elements into the result. If null, all elements will be accepted
        result_creator - a supplier required to create the result, e.g. ArrayList::new
        accumulator - an accumulator accepting the result and an element, e.g. ArrayList::add
        Returns:
        the result
      • purge

        public void purge​(long seqno)
        Removes all elements less than or equal to seqno from the table. Does this by nulling entire rows in the matrix and nulling all elements < index(seqno) of the first row that cannot be removed
        Parameters:
        seqno -
      • purge

        public void purge​(long seqno,
                          boolean force)
        Removes all elements less than or equal to seqno from the table. Does this by nulling entire rows in the matrix and nulling all elements < index(seqno) of the first row that cannot be removed.
        Parameters:
        seqno - All elements <= seqno will be nulled
        force - If true, we only ensure that seqno <= hr, but don't care about hd, and set hd=low=seqno.
      • compact

        public void compact()
      • forEach

        public void forEach​(long from,
                            long to,
                            Table.Visitor<T> visitor)
        Iterates over the matrix with range [from .. to] (including from and to), and calls Table.Visitor.visit(long,Object,int,int). If the visit() method returns false, the iteration is terminated.

        This method must be called with the lock held

        Parameters:
        from - The starting seqno
        to - The ending seqno, the range is [from .. to] including from and to
        visitor - An instance of Visitor
      • iterator

        public java.util.Iterator<T> iterator()
        Specified by:
        iterator in interface java.lang.Iterable<T>
      • iterator

        public java.util.Iterator<T> iterator​(long from,
                                              long to)
      • stream

        public java.util.stream.Stream<T> stream()
      • stream

        public java.util.stream.Stream<T> stream​(long from,
                                                 long to)
      • _add

        protected boolean _add​(long seqno,
                               T element,
                               boolean check_if_resize_needed,
                               java.util.function.Predicate<T> remove_filter)
      • findHighestSeqno

        protected long findHighestSeqno​(java.util.List<LongTuple<T>> list)
      • resize

        protected void resize​(long seqno)
        Moves rows down the matrix, by removing purged rows. If resizing to accommodate seqno is still needed, computes a new size. Then either moves existing rows down, or copies them into a new array (if resizing took place). The lock must be held by the caller of resize().
      • move

        protected void move​(int num_rows)
        Moves contents of matrix num_rows down. Avoids a System.arraycopy(). Caller must hold the lock.
      • _compact

        protected void _compact()
        Moves the contents of matrix down by the number of purged rows and resizes the matrix accordingly. The capacity of the matrix should be size * resize_factor. Caller must hold the lock.
      • computeSize

        public int computeSize()
        Iterate from low to hr and add up non-null values. Caller must hold the lock.
      • getNumMissing

        public int getNumMissing()
        Returns the number of null elements in the range [hd+1 .. hr-1] excluding hd and hr
      • getMissing

        public SeqnoList getMissing()
        Returns a list of missing (= null) elements
        Returns:
        A SeqnoList of missing messages, or null if no messages are missing
      • getMissing

        public SeqnoList getMissing​(int max_msgs)
        Returns a list of missing messages
        Parameters:
        max_msgs - If > 0, the max number of missing messages to be returned (oldest first), else no limit
        Returns:
        A SeqnoList of missing messages, or null if no messages are missing
      • getDigest

        public long[] getDigest()
      • toString

        public java.lang.String toString()
        Overrides:
        toString in class java.lang.Object
      • dump

        public java.lang.String dump()
        Dumps the seqnos in the table as a list
      • getRow

        protected T[] getRow​(int index)
        Returns a row. Creates a new row and inserts it at index if the row at index doesn't exist
        Parameters:
        index -
        Returns:
        A row
      • computeRow

        protected int computeRow​(long seqno)
        Computes and returns the row index for seqno. The caller must hold the lock.
      • computeIndex

        protected int computeIndex​(long seqno)
        Computes and returns the index within a row for seqno