Package org.jgroups.protocols
Class ReliableMulticast
java.lang.Object
  org.jgroups.stack.Protocol
    org.jgroups.protocols.ReliableMulticast

All Implemented Interfaces:
Lifecycle, DiagnosticsHandler.ProbeHandler
public abstract class ReliableMulticast extends Protocol implements DiagnosticsHandler.ProbeHandler
Base class for reliable multicast protocols
Since: 5.4
Author: Bela Ban
-
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
protected static class | ReliableMulticast.Entry |
protected class | ReliableMulticast.LastSeqnoResender | Class which is called by RetransmitTask to resend the last seqno sent (if resend_last_seqno is enabled)
protected class | ReliableMulticast.RetransmitTask | Retransmitter task which periodically (every xmit_interval ms) looks at all the retransmit tables and sends retransmit request to all members from which we have missing messages
-
Field Summary
Fields
Modifier and Type | Field | Description
protected AverageMinMax | avg_batch_size | The average number of messages in a received MessageBatch
protected static java.util.function.BiConsumer<MessageBatch,Message> | BATCH_ACCUMULATOR |
protected java.util.Queue<Message> | become_server_queue |
protected int | become_server_queue_size |
protected java.util.Map<Address,MessageBatch> | cached_batches |
protected BoundedList<java.lang.String> | digest_history | Keeps a bounded list of the last N digest sets
protected boolean | discard_delivered_msgs | Messages that have been received in order are sent up the stack (=delivered to the application) and removed from the retransmit table, so they can get GC'ed.
protected static Message | DUMMY_OOB_MSG |
protected java.util.function.Predicate<Message> | HAS_HEADER |
protected boolean | is_server |
protected boolean | is_trace |
protected ReliableMulticast.LastSeqnoResender | last_seqno_resender |
protected boolean | leaving |
protected ReliableMulticast.Entry | local_send_entry |
protected Buffer<Message> | local_xmit_table |
protected boolean | log_discard_msgs | If true, logs messages that were discarded because they were received from other members
protected boolean | log_not_found_msgs |
protected int | max_batch_size |
protected int | max_xmit_req_size |
protected java.util.List<Address> | members |
protected java.util.function.Predicate<Message> | no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs |
protected java.util.concurrent.atomic.LongAdder | num_messages_received |
protected java.util.concurrent.atomic.LongAdder | num_messages_sent |
protected static java.util.function.Predicate<Message> | remove_filter |
protected boolean | reuse_message_batches |
protected boolean | running |
protected boolean | send_atomically |
protected boolean | sends_can_block |
protected java.util.concurrent.atomic.AtomicLong | seqno |
protected java.util.function.Function<Message,java.lang.Long> | SEQNO_GETTER |
protected BoundedList<java.lang.String> | stability_msgs | Keeps the last N stability messages
protected java.util.Map<Address,java.lang.Long> | stable_xmit_map |
protected SuppressLog<Address> | suppress_log_non_member | Log to suppress identical warnings for messages from non-members
protected long | suppress_time_non_member_warnings |
protected TimeScheduler | timer |
protected boolean | use_mcast_xmit | Retransmit messages using multicast rather than unicast.
protected boolean | use_mcast_xmit_req | Use an mcast to request retransmission of missing messages.
protected View | view |
protected boolean | xmit_from_random_member | Ask a random member for retransmission of a missing message.
protected long | xmit_interval |
protected java.util.concurrent.atomic.LongAdder | xmit_reqs_received |
protected java.util.concurrent.atomic.LongAdder | xmit_reqs_sent |
protected java.util.concurrent.atomic.LongAdder | xmit_rsps_received |
protected java.util.concurrent.atomic.LongAdder | xmit_rsps_sent |
protected java.util.Map<Address,ReliableMulticast.Entry> | xmit_table | Map to store sent and received messages (keyed by sender)
protected java.util.concurrent.Future<?> | xmit_task | RetransmitTask running every xmit_interval ms
protected java.util.Map<Address,java.lang.Long> | xmit_task_map | Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.redhat.com/browse/JGRP-1539)
-
Fields inherited from class org.jgroups.stack.Protocol
after_creation_hook, down_prot, ergonomics, id, local_addr, log, policies, stack, stats, up_prot
-
-
Constructor Summary
Constructors
Constructor | Description
ReliableMulticast() |
-
Method Summary
Modifier and Type | Method | Description
protected boolean | addToSendBuffer(Buffer<Message> win, long seq, Message msg, java.util.function.Predicate<Message> filter) | Adds the message to the send buffer.
protected void | adjustReceivers(java.util.List<Address> members) | Removes old members from xmit-table and adds new members to xmit-table (at seqnos hd=0, hr=0).
protected java.util.Queue<Message> | becomeServerQueue() |
ReliableMulticast | clearCachedBatches() |
void | clearNonMemberCache() |
protected abstract Buffer<Message> | createXmitWindow(long initial_seqno) |
protected void | deliver(Message msg, Address sender, long seqno, ReliableMulticast.Entry entry, java.lang.String error_msg) |
protected void | deliverBatch(MessageBatch batch, ReliableMulticast.Entry entry) |
boolean | discardDeliveredMsgs() |
ReliableMulticast | discardDeliveredMsgs(boolean d) |
java.lang.Object | down(Event evt) | Callback.
java.lang.Object | down(Message msg) | A message is sent down the stack.
protected void | flushBecomeServerQueue() | Flushes the queue.
int | getBecomeServerQueueSize() |
int | getBecomeServerQueueSizeActual() |
<T extends Buffer<Message>> T | getBuf(Address sender) | Returns the receive window for sender; only used for testing.
long | getCurrentSeqno() |
Digest | getDigest() | Returns a message digest: for each member P the highest delivered and received seqno is added
Digest | getDigest(Address mbr) |
protected ReliableMulticast.Entry | getEntry(Address sender) |
int | getMaxXmitReqSize() |
int | getNonMemberMessages() |
long | getNumMessagesReceived() |
long | getNumMessagesSent() |
long | getSizeOfAllMessages() |
long | getSizeOfAllMessagesInclHeaders() |
long | getSuppressTimeNonMemberWarnings() |
long | getXmitInterval() |
long | getXmitRequestsReceived() |
long | getXmitRequestsSent() |
long | getXmitResponsesReceived() |
long | getXmitResponsesSent() |
long | getXmitTableCapacity() |
int | getXmitTableMissingMessages() |
int | getXmitTableUndeliveredMsgs() |
protected void | handleAck(Address sender, long ack) |
protected void | handleHighestSeqno(Address sender, long seqno) | Compares the sender's highest seqno with my highest seqno: if the sender's is higher, ask sender for retransmission
protected void | handleMessage(Message msg, NakAckHeader hdr) | Finds the corresponding retransmit buffer and adds the message to it (according to seqno).
protected void | handleMessageBatch(MessageBatch mb) |
java.util.Map<java.lang.String,java.lang.String> | handleProbe(java.lang.String... keys) | Handles a probe.
protected void | handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Address original_sender) | Retransmits messages first_seqno to last_seqno from original_sender from xmit_table to xmit_requester, called when XMIT_REQ is received.
protected void | handleXmitRsp(Message msg, NakAckHeader hdr) |
void | init() | Called after a protocol has been created and before the protocol is started.
boolean | isTrace() |
ReliableMulticast | isTrace(boolean i) |
boolean | isXmitTaskRunning() |
boolean | logDiscardMessages() |
ReliableMulticast | logDiscardMessages(boolean l) |
boolean | logNotFoundMessages() |
ReliableMulticast | logNotFoundMessages(boolean flag) |
protected void | mergeDigest(Digest digest) | For all members of the digest, adjust the retransmit buffers in xmit_table.
protected Message | msgFromXmitRsp(Message msg, NakAckHeader hdr) |
protected boolean | needToSendAck(ReliableMulticast.Entry __) |
protected boolean | needToSendAck(ReliableMulticast.Entry e, int num_acks) |
protected void | overwriteDigest(Digest digest) | Overwrites existing entries, but does NOT remove entries not found in the digest
java.lang.String | printBatches() |
java.lang.String | printCachedBatches() |
java.lang.String | printDigestHistory() |
java.lang.String | printMessages() |
java.lang.String | printStabilityMessages() |
java.util.List<java.lang.Integer> | providedUpServices() | List of events that are provided to layers above (they will be handled when sent down from above)
protected void | queueMessage(Message msg, long seqno) |
protected void | removeAndDeliver(Buffer<Message> win, ReliableMulticast.Entry e, Address sender, boolean loopback, AsciiString cluster) | Efficient way of checking whether another thread is already processing messages from sender.
protected void | resend(Message msg) |
protected void | reset() |
void | resetStats() |
protected void | retransmit(long first_seqno, long last_seqno, Address sender, boolean multicast_xmit_request) |
protected void | retransmit(SeqnoList missing_msgs, Address sender, boolean multicast_xmit_request) |
boolean | reuseMessageBatches() |
ReliableMulticast | reuseMessageBatches(boolean b) |
protected boolean | send(Message msg, Buffer<Message> win, boolean dont_loopback_set) |
protected void | sendAck(Address to, Buffer<Message> win) |
boolean | sendAtomically() |
ReliableMulticast | sendAtomically(boolean f) |
protected Buffer<Message> | sendBuf() |
boolean | sendBufferCanBlock() | Whether addition of a message to the send buffer can block
protected ReliableMulticast.Entry | sendEntry() |
boolean | sendsCanBlock() |
ReliableMulticast | sendsCanBlock(boolean s) |
protected void | sendXmitRsp(Address dest, Message msg) | Sends a message msg to the requester.
ReliableMulticast | setBecomeServerQueueSize(int b) |
protected void | setDigest(Digest digest) | Creates a retransmit buffer for each sender in the digest according to the sender's seqno.
protected void | setDigest(Digest digest, boolean merge) | Sets or merges the digest.
ReliableMulticast | setDiscardDeliveredMsgs(boolean d) |
<T extends Protocol> T | setLevel(java.lang.String level) | Sets the level of a logger.
ReliableMulticast | setMaxXmitReqSize(int m) |
ReliableMulticast | setSuppressTimeNonMemberWarnings(long s) |
void | setTimer(TimeScheduler timer) | Only used for unit tests, don't use!
ReliableMulticast | setXmitFromRandomMember(boolean r) |
ReliableMulticast | setXmitInterval(long x) |
protected static long | sizeOfAllMessages(Buffer<Message> win, boolean include_headers) |
protected void | stable(Digest digest) | Garbage collect messages that have been seen by all members.
void | start() | This method is called on a JChannel.connect(String); starts work.
protected void | startRetransmitTask() |
void | stop() | Called on a JChannel.disconnect(); stops work (e.g. by closing multicast socket).
protected void | stopRetransmitTask() |
java.lang.String[] | supportedKeys() | Returns a list of supported keys
void | triggerXmit() |
protected void | unknownMember(Address sender, java.lang.Object message) |
java.lang.Object | up(Event evt) | Callback.
java.lang.Object | up(Message msg) | A single message was received.
void | up(MessageBatch mb) | Sends up multiple messages in a MessageBatch.
boolean | useMcastXmit() |
ReliableMulticast | useMcastXmit(boolean u) |
boolean | useMcastXmitReq() |
ReliableMulticast | useMcastXmitReq(boolean flag) |
boolean | xmitFromRandomMember() |
ReliableMulticast | xmitFromRandomMember(boolean x) |
-
Methods inherited from class org.jgroups.stack.Protocol
accept, addPolicy, addr, addr, afterCreationHook, destroy, down, enableStats, getAddress, getComponents, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getLog, getName, getPolicies, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, level, parse, policies, providedDownServices, removePolicy, requiredDownServices, requiredUpServices, resetStatistics, setAddress, setDownProtocol, setErgonomics, setId, setPolicies, setProtocolStack, setSocketFactory, setUpProtocol, setValue, statsEnabled, toString
-
-
-
-
Field Detail
-
use_mcast_xmit
protected boolean use_mcast_xmit
Retransmit messages using multicast rather than unicast. This has the advantage that, if many receivers lost a message, the sender only retransmits once
-
use_mcast_xmit_req
protected boolean use_mcast_xmit_req
Use an mcast to request retransmission of missing messages. May be costly as every member will send a response
-
xmit_from_random_member
protected boolean xmit_from_random_member
Ask a random member for retransmission of a missing message. If true, discard_delivered_msgs is set to false
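The coupling between the two flags can be sketched as follows. This is an illustrative stand-in, not the JGroups source: the class XmitConfigSketch is hypothetical, the starting value of discard_delivered_msgs is made up, and enforcing the rule inside the fluent setter is an assumption (the description above only states that discard_delivered_msgs ends up false).

```java
/** Hypothetical stand-in for the two flags; not the JGroups implementation. */
final class XmitConfigSketch {
    private boolean xmitFromRandomMember;
    private boolean discardDeliveredMsgs = true; // assumed starting value, for illustration only

    /** Fluent setter; assumption: the coupling is enforced here. */
    XmitConfigSketch xmitFromRandomMember(boolean flag) {
        this.xmitFromRandomMember = flag;
        if (flag) {
            // Presumably a random member can only answer a retransmit request if it
            // still holds messages it has already delivered, so discarding is turned off.
            this.discardDeliveredMsgs = false;
        }
        return this;
    }

    boolean xmitFromRandomMember() { return xmitFromRandomMember; }

    boolean discardDeliveredMsgs() { return discardDeliveredMsgs; }
}
```

Calling new XmitConfigSketch().xmitFromRandomMember(true) leaves discardDeliveredMsgs() returning false in this sketch.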
-
discard_delivered_msgs
protected boolean discard_delivered_msgs
Messages that have been received in order are sent up the stack (=delivered to the application) and removed from the retransmit table, so they can get GC'ed. When this property is true, everyone (except the sender of a message) removes the message from their retransmit table as soon as it has been delivered to the application
-
log_discard_msgs
protected boolean log_discard_msgs
If true, logs messages that were discarded because they were received from other members
-
log_not_found_msgs
protected boolean log_not_found_msgs
-
xmit_interval
protected long xmit_interval
-
become_server_queue_size
protected int become_server_queue_size
-
suppress_time_non_member_warnings
protected long suppress_time_non_member_warnings
-
max_xmit_req_size
protected int max_xmit_req_size
-
max_batch_size
protected int max_batch_size
-
reuse_message_batches
protected boolean reuse_message_batches
-
send_atomically
protected boolean send_atomically
-
sends_can_block
protected boolean sends_can_block
-
num_messages_sent
protected final java.util.concurrent.atomic.LongAdder num_messages_sent
-
num_messages_received
protected final java.util.concurrent.atomic.LongAdder num_messages_received
-
DUMMY_OOB_MSG
protected static final Message DUMMY_OOB_MSG
-
no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs
protected final java.util.function.Predicate<Message> no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs
-
remove_filter
protected static final java.util.function.Predicate<Message> remove_filter
-
BATCH_ACCUMULATOR
protected static final java.util.function.BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR
-
SEQNO_GETTER
protected final java.util.function.Function<Message,java.lang.Long> SEQNO_GETTER
-
HAS_HEADER
protected final java.util.function.Predicate<Message> HAS_HEADER
-
xmit_reqs_received
protected final java.util.concurrent.atomic.LongAdder xmit_reqs_received
-
xmit_reqs_sent
protected final java.util.concurrent.atomic.LongAdder xmit_reqs_sent
-
xmit_rsps_received
protected final java.util.concurrent.atomic.LongAdder xmit_rsps_received
-
xmit_rsps_sent
protected final java.util.concurrent.atomic.LongAdder xmit_rsps_sent
-
avg_batch_size
protected final AverageMinMax avg_batch_size
The average number of messages in a received MessageBatch
-
is_trace
protected boolean is_trace
-
is_server
protected volatile boolean is_server
-
members
protected volatile java.util.List<Address> members
-
view
protected volatile View view
-
seqno
protected final java.util.concurrent.atomic.AtomicLong seqno
-
xmit_table
protected final java.util.Map<Address,ReliableMulticast.Entry> xmit_table
Map to store sent and received messages (keyed by sender)
-
local_send_entry
protected ReliableMulticast.Entry local_send_entry
-
xmit_task
protected java.util.concurrent.Future<?> xmit_task
RetransmitTask running every xmit_interval ms
-
xmit_task_map
protected final java.util.Map<Address,java.lang.Long> xmit_task_map
Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.redhat.com/browse/JGRP-1539)
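One way such a per-sender map could be used is sketched below. The description above only says that the map keeps the last retransmitted seqno per sender; the policy shown here (a gap is requested only once it has been seen in an earlier round) is an assumption made for illustration, and XmitTaskMapSketch, seqnosToRequest and the String sender keys are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of "remember the last retransmitted seqno per sender". */
final class XmitTaskMapSketch {
    private final Map<String, Long> lastSeqnoPerSender = new HashMap<>();

    /**
     * Called once per retransmit round with the seqnos currently missing from
     * sender (ascending order). Returns the seqnos to request in this round.
     */
    List<Long> seqnosToRequest(String sender, List<Long> missing) {
        List<Long> request = new ArrayList<>();
        if (missing.isEmpty()) {
            lastSeqnoPerSender.remove(sender); // no gaps left for this sender
            return request;
        }
        long highest = missing.get(missing.size() - 1);
        Long previous = lastSeqnoPerSender.get(sender);
        if (previous == null) {
            // First round with gaps: only record the mark, request nothing yet.
            lastSeqnoPerSender.put(sender, highest);
            return request;
        }
        for (long seqno : missing) {
            if (seqno <= previous) {
                request.add(seqno); // only gaps already seen in an earlier round
            }
        }
        if (highest > previous) {
            lastSeqnoPerSender.put(sender, highest);
        }
        return request;
    }
}
```

Under this assumed policy, a message that is merely still in transit when the task runs is not requested immediately; it is requested only if it is still missing in the next round.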
-
stable_xmit_map
protected final java.util.Map<Address,java.lang.Long> stable_xmit_map
-
cached_batches
protected final java.util.Map<Address,MessageBatch> cached_batches
-
leaving
protected volatile boolean leaving
-
running
protected volatile boolean running
-
timer
protected TimeScheduler timer
-
last_seqno_resender
protected final ReliableMulticast.LastSeqnoResender last_seqno_resender
-
stability_msgs
protected final BoundedList<java.lang.String> stability_msgs
Keeps the last N stability messages
-
digest_history
protected final BoundedList<java.lang.String> digest_history
Keeps a bounded list of the last N digest sets
-
become_server_queue
protected java.util.Queue<Message> become_server_queue
-
suppress_log_non_member
protected SuppressLog<Address> suppress_log_non_member
Log to suppress identical warnings for messages from non-members
-
-
Method Detail
-
isXmitTaskRunning
public boolean isXmitTaskRunning()
-
getNonMemberMessages
public int getNonMemberMessages()
-
sendBufferCanBlock
public boolean sendBufferCanBlock()
Whether addition of a message to the send buffer can block
-
clearNonMemberCache
public void clearNonMemberCache()
-
printCachedBatches
public java.lang.String printCachedBatches()
-
clearCachedBatches
public ReliableMulticast clearCachedBatches()
-
getXmitRequestsReceived
public long getXmitRequestsReceived()
-
getXmitRequestsSent
public long getXmitRequestsSent()
-
getXmitResponsesReceived
public long getXmitResponsesReceived()
-
getXmitResponsesSent
public long getXmitResponsesSent()
-
useMcastXmit
public boolean useMcastXmit()
-
useMcastXmit
public ReliableMulticast useMcastXmit(boolean u)
-
useMcastXmitReq
public boolean useMcastXmitReq()
-
useMcastXmitReq
public ReliableMulticast useMcastXmitReq(boolean flag)
-
xmitFromRandomMember
public boolean xmitFromRandomMember()
-
xmitFromRandomMember
public ReliableMulticast xmitFromRandomMember(boolean x)
-
discardDeliveredMsgs
public boolean discardDeliveredMsgs()
-
discardDeliveredMsgs
public ReliableMulticast discardDeliveredMsgs(boolean d)
-
logDiscardMessages
public boolean logDiscardMessages()
-
logDiscardMessages
public ReliableMulticast logDiscardMessages(boolean l)
-
logNotFoundMessages
public boolean logNotFoundMessages()
-
logNotFoundMessages
public ReliableMulticast logNotFoundMessages(boolean flag)
-
setXmitFromRandomMember
public ReliableMulticast setXmitFromRandomMember(boolean r)
-
setDiscardDeliveredMsgs
public ReliableMulticast setDiscardDeliveredMsgs(boolean d)
-
getXmitInterval
public long getXmitInterval()
-
setXmitInterval
public ReliableMulticast setXmitInterval(long x)
-
getBecomeServerQueueSize
public int getBecomeServerQueueSize()
-
setBecomeServerQueueSize
public ReliableMulticast setBecomeServerQueueSize(int b)
-
getSuppressTimeNonMemberWarnings
public long getSuppressTimeNonMemberWarnings()
-
setSuppressTimeNonMemberWarnings
public ReliableMulticast setSuppressTimeNonMemberWarnings(long s)
-
getMaxXmitReqSize
public int getMaxXmitReqSize()
-
setMaxXmitReqSize
public ReliableMulticast setMaxXmitReqSize(int m)
-
sendsCanBlock
public boolean sendsCanBlock()
-
sendsCanBlock
public ReliableMulticast sendsCanBlock(boolean s)
-
getNumMessagesSent
public long getNumMessagesSent()
-
getNumMessagesReceived
public long getNumMessagesReceived()
-
reuseMessageBatches
public boolean reuseMessageBatches()
-
reuseMessageBatches
public ReliableMulticast reuseMessageBatches(boolean b)
-
sendAtomically
public boolean sendAtomically()
-
sendAtomically
public ReliableMulticast sendAtomically(boolean f)
-
isTrace
public boolean isTrace()
-
isTrace
public ReliableMulticast isTrace(boolean i)
-
setLevel
public <T extends Protocol> T setLevel(java.lang.String level)
Description copied from class: Protocol
Sets the level of a logger. This method is used to dynamically change the logging level of a running system, e.g. via JMX. The appender of a level needs to exist.
-
getBecomeServerQueueSizeActual
public int getBecomeServerQueueSizeActual()
-
getBuf
public <T extends Buffer<Message>> T getBuf(Address sender)
Returns the receive window for sender; only used for testing. Do not use!
-
getEntry
protected ReliableMulticast.Entry getEntry(Address sender)
-
setTimer
public void setTimer(TimeScheduler timer)
Only used for unit tests, don't use!
-
getXmitTableUndeliveredMsgs
public int getXmitTableUndeliveredMsgs()
-
getXmitTableMissingMessages
public int getXmitTableMissingMessages()
-
getXmitTableCapacity
public long getXmitTableCapacity()
-
getSizeOfAllMessages
public long getSizeOfAllMessages()
-
getSizeOfAllMessagesInclHeaders
public long getSizeOfAllMessagesInclHeaders()
-
printMessages
public java.lang.String printMessages()
-
printBatches
public java.lang.String printBatches()
-
getCurrentSeqno
public long getCurrentSeqno()
-
printStabilityMessages
public java.lang.String printStabilityMessages()
-
printDigestHistory
public java.lang.String printDigestHistory()
-
sendEntry
protected ReliableMulticast.Entry sendEntry()
-
resetStats
public void resetStats()
Overrides: resetStats in class Protocol
-
init
public void init() throws java.lang.Exception
Description copied from class: Protocol
Called after a protocol has been created and before the protocol is started. Attributes are already set. Other protocols are not yet connected and events cannot yet be sent.
-
providedUpServices
public java.util.List<java.lang.Integer> providedUpServices()
Description copied from class: Protocol
List of events that are provided to layers above (they will be handled when sent down from above)
Overrides: providedUpServices in class Protocol
-
start
public void start() throws java.lang.Exception
Description copied from class: Protocol
This method is called on a JChannel.connect(String); starts work. Protocols are connected ready to receive events. Will be called from bottom to top.
Specified by: start in interface Lifecycle
Overrides: start in class Protocol
Throws: java.lang.Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so JChannel.connect(String) will throw an exception
-
stop
public void stop()
Description copied from class: Protocol
Called on a JChannel.disconnect(); stops work (e.g. by closing multicast socket). Will be called from top to bottom.
-
down
public java.lang.Object down(Event evt)
Callback. Called by superclass when event may be handled.
Do not use down_prot.down() in this method as the event is passed down by default by the superclass after this method returns!
-
down
public java.lang.Object down(Message msg)
Description copied from class: Protocol
A message is sent down the stack. Protocols may examine the message and do something (e.g. add a header) with it, before passing it down.
-
up
public java.lang.Object up(Event evt)
Callback. Called by superclass when event may be handled.
Do not use passUp in this method as the event is passed up by default by the superclass after this method returns!
-
up
public java.lang.Object up(Message msg)
Description copied from class: Protocol
A single message was received. Protocols may examine the message and do something (e.g. add a header) with it before passing it up.
-
up
public void up(MessageBatch mb)
Description copied from class: Protocol
Sends up multiple messages in a MessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages.
The default processing below sends messages up the stack individually, based on a matching criterion (calling Protocol.accept(Message)), and - if true - calls Protocol.up(org.jgroups.Event) for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.
Subclasses should check if there are any messages destined for them (e.g. using MessageBatch.iterator(Predicate)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.
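The "remove what is destined for this layer, pass the rest up" pattern described for subclasses might look roughly like this. The sketch uses a plain List of strings and a Predicate in place of MessageBatch and Message, so it does not depend on the JGroups classes; BatchFilterSketch, handledHere and passedUp are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical stand-in for a protocol layer handling a batch; strings replace messages. */
final class BatchFilterSketch {
    final List<String> handledHere = new ArrayList<>();
    final List<List<String>> passedUp = new ArrayList<>();

    /** Removes the messages this layer accepts, then passes the rest up unless the batch is empty. */
    void up(List<String> batch, Predicate<String> accept) {
        for (Iterator<String> it = batch.iterator(); it.hasNext();) {
            String msg = it.next();
            if (accept.test(msg)) {
                handledHere.add(msg); // process the message in this layer
                it.remove();          // and take it out of the batch
            }
        }
        if (!batch.isEmpty()) {
            passedUp.add(batch);      // stands in for handing the batch to the next protocol
        }
    }
}
```

A batch that becomes empty after filtering is dropped rather than passed up, matching the rule stated above.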
-
handleProbe
public java.util.Map<java.lang.String,java.lang.String> handleProbe(java.lang.String... keys)
Description copied from interface: DiagnosticsHandler.ProbeHandler
Handles a probe. For each key that is handled, the key and its result should be in the returned map.
Specified by: handleProbe in interface DiagnosticsHandler.ProbeHandler
Returns: A map of keys and values. A null return value is permissible.
-
supportedKeys
public java.lang.String[] supportedKeys()
Description copied from interface: DiagnosticsHandler.ProbeHandler
Returns a list of supported keys
Specified by: supportedKeys in interface DiagnosticsHandler.ProbeHandler
-
becomeServerQueue
protected java.util.Queue<Message> becomeServerQueue()
-
queueMessage
protected void queueMessage(Message msg, long seqno)
-
unknownMember
protected void unknownMember(Address sender, java.lang.Object message)
-
addToSendBuffer
protected boolean addToSendBuffer(Buffer<Message> win, long seq, Message msg, java.util.function.Predicate<Message> filter)
Adds the message to the send buffer. The loop tries to handle temporary OOMEs by retrying if add() failed
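A retry loop of the kind described might be sketched as below. This is not the JGroups code: RetryAddSketch and addWithRetry are hypothetical, the add operation is modelled as a BooleanSupplier, and the bounded number of attempts and the fixed pause are assumptions chosen so the example terminates.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of a retrying add; not the JGroups implementation. */
final class RetryAddSketch {
    /**
     * Calls add until it completes without an OutOfMemoryError, pausing between
     * attempts. Returns add's result, or false once maxAttempts is used up.
     */
    static boolean addWithRetry(BooleanSupplier add, int maxAttempts, long pauseMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return add.getAsBoolean();
            } catch (OutOfMemoryError oome) {
                // Possibly temporary: wait a little so memory can be reclaimed, then retry.
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    return false;
                }
            }
        }
        return false;
    }
}
```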
-
resend
protected void resend(Message msg)
-
handleMessage
protected void handleMessage(Message msg, NakAckHeader hdr)
Finds the corresponding retransmit buffer and adds the message to it (according to seqno). Then removes as many messages as possible and passes them up the stack. Discards messages from non-members.
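The "add by seqno, then remove as many messages as possible" step can be illustrated with a small receive window. This is a simplified sketch, not the Buffer implementation: ReceiveWindowSketch is a hypothetical class, messages are plain strings, and the membership check that discards messages from non-members is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical per-sender receive window that delivers messages in seqno order. */
final class ReceiveWindowSketch {
    private final TreeMap<Long, String> buffered = new TreeMap<>();
    private long highestDelivered; // seqno of the last message passed up

    ReceiveWindowSketch(long initialSeqno) {
        this.highestDelivered = initialSeqno;
    }

    /**
     * Stores msg under seqno (duplicates and already delivered seqnos are ignored),
     * then returns every message that is now deliverable without a gap.
     */
    List<String> add(long seqno, String msg) {
        if (seqno > highestDelivered) {
            buffered.putIfAbsent(seqno, msg);
        }
        List<String> deliverable = new ArrayList<>();
        String next;
        while ((next = buffered.remove(highestDelivered + 1)) != null) {
            deliverable.add(next);
            highestDelivered++;
        }
        return deliverable;
    }

    long highestDelivered() {
        return highestDelivered;
    }
}
```

A message that arrives ahead of a gap stays buffered; it is returned only once the missing seqnos before it have been added.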
-
handleMessageBatch
protected void handleMessageBatch(MessageBatch mb)
-
removeAndDeliver
protected void removeAndDeliver(Buffer<Message> win, ReliableMulticast.Entry e, Address sender, boolean loopback, AsciiString cluster)
Efficient way of checking whether another thread is already processing messages from sender. If that's the case, we return immediately and let the existing thread process our message (https://issues.redhat.com/browse/JGRP-829). Benefit: fewer threads blocked on the same lock, these threads can be returned to the thread pool
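The hand-off described above, where a thread returns immediately if another thread is already processing, is commonly built on an atomic counter. The following is a minimal sketch of that general technique under stated assumptions; SingleDrainerSketch, addAndDeliver and the queue of strings are hypothetical, and the real method works on a Buffer and an Entry rather than on this stand-in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: at most one thread at a time drains and delivers. */
final class SingleDrainerSketch {
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final AtomicInteger adders = new AtomicInteger();
    final List<String> delivered = new ArrayList<>(); // only touched by the draining thread

    /** Returns true if the caller became the draining thread, false if it handed off its message. */
    boolean addAndDeliver(String msg) {
        pending.add(msg);
        if (adders.getAndIncrement() != 0) {
            return false; // another thread is draining and will pick up our message
        }
        do {
            String next;
            while ((next = pending.poll()) != null) {
                delivered.add(next);
            }
        } while (adders.decrementAndGet() != 0); // someone added meanwhile: go around again
        return true;
    }
}
```

Because each caller enqueues its message before incrementing the counter, the draining thread cannot leave the loop while a handed-off message is still pending.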
-
handleXmitReq
protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Address original_sender)
Retransmits messages first_seqno to last_seqno from original_sender from xmit_table to xmit_requester, called when XMIT_REQ is received.
Parameters:
xmit_requester - The sender of the XMIT_REQ, we have to send the requested copy of the message to this address
missing_msgs - A list of seqnos that have to be retransmitted
original_sender - The member who originally sent the message. Guaranteed to be non-null
-
deliver
protected void deliver(Message msg, Address sender, long seqno, ReliableMulticast.Entry entry, java.lang.String error_msg)
-
deliverBatch
protected void deliverBatch(MessageBatch batch, ReliableMulticast.Entry entry)
-
flushBecomeServerQueue
protected void flushBecomeServerQueue()
Flushes the queue. Done in a separate thread as we don't want to block the GMS.installView(View, Digest) method (called when a view is installed).
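The idea of queueing until the node is a server and then flushing off the caller's thread can be sketched like this. Everything here is illustrative: BecomeServerQueueSketch is a hypothetical class, passing an Executor is an assumption that stands in for the separate thread, and dropping messages when the bounded queue is full is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;

/** Hypothetical sketch of a become-server queue; not the JGroups implementation. */
final class BecomeServerQueueSketch {
    private final Queue<String> queue;
    private volatile boolean isServer;
    private final List<String> delivered = new ArrayList<>();

    BecomeServerQueueSketch(int capacity) {
        queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Queues the message until the node is a server; afterwards delivers directly. */
    void receive(String msg) {
        if (isServer) {
            deliver(msg);
        } else {
            queue.offer(msg); // assumption: a full queue silently drops the message
        }
    }

    /** Marks the node as server and drains the queue on the given executor. */
    void becomeServer(Executor flusher) {
        isServer = true;
        flusher.execute(() -> {
            String msg;
            while ((msg = queue.poll()) != null) {
                deliver(msg);
            }
        });
    }

    private synchronized void deliver(String msg) {
        delivered.add(msg);
    }

    synchronized List<String> delivered() {
        return new ArrayList<>(delivered);
    }
}
```

With an executor backed by its own thread, becomeServer returns without waiting for the flush. The sketch does not address ordering between flushed messages and messages that arrive while the flush is running.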
-
sendXmitRsp
protected void sendXmitRsp(Address dest, Message msg)
Sends a message msg to the requester. We have to wrap the original message into a retransmit message, as we need to preserve the original message's properties, such as src, headers etc.
Parameters:
dest -
msg -
-
handleXmitRsp
protected void handleXmitRsp(Message msg, NakAckHeader hdr)
-
handleHighestSeqno
protected void handleHighestSeqno(Address sender, long seqno)
Compares the sender's highest seqno with my highest seqno: if the sender's is higher, ask sender for retransmission
Parameters:
sender - The sender
seqno - The highest seqno sent by sender
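The comparison can be reduced to a small range calculation. The sketch below is an assumption about what "ask sender for retransmission" covers, namely every seqno above the local highest up to the sender's highest; HighestSeqnoSketch and rangeToRequest are hypothetical names.

```java
/** Hypothetical sketch of the highest-seqno comparison. */
final class HighestSeqnoSketch {
    /**
     * Returns {first, last} (inclusive) of the seqnos to request from the sender,
     * or an empty array if the local side is already up to date.
     */
    static long[] rangeToRequest(long myHighestReceived, long sendersHighest) {
        if (sendersHighest <= myHighestReceived) {
            return new long[0]; // nothing missing
        }
        return new long[] {myHighestReceived + 1, sendersHighest};
    }
}
```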
-
handleAck
protected void handleAck(Address sender, long ack)
-
needToSendAck
protected boolean needToSendAck(ReliableMulticast.Entry __)
-
needToSendAck
protected boolean needToSendAck(ReliableMulticast.Entry e, int num_acks)
-
msgFromXmitRsp
protected Message msgFromXmitRsp(Message msg, NakAckHeader hdr)
-
adjustReceivers
protected void adjustReceivers(java.util.List<Address> members)
Removes old members from xmit-table and adds new members to xmit-table (at seqnos hd=0, hr=0). This method is not called concurrently
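A membership adjustment of this kind might be sketched as follows, with a plain map standing in for xmit_table. AdjustReceiversSketch and its Window class are hypothetical; hd and hr denote the highest delivered and highest received seqnos, as in the description above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of adjusting the per-sender table to a new member list. */
final class AdjustReceiversSketch {
    /** Per-sender state: highest delivered (hd) and highest received (hr) seqno. */
    static final class Window {
        long hd;
        long hr;
    }

    final Map<String, Window> xmitTable = new HashMap<>();

    void adjustReceivers(List<String> members) {
        // Drop the state of senders that are no longer members.
        xmitTable.keySet().retainAll(members);
        // New members start with a fresh window at hd=0, hr=0; existing ones are kept.
        for (String member : members) {
            xmitTable.computeIfAbsent(member, m -> new Window());
        }
    }
}
```

The sketch uses a non-concurrent HashMap, which is only safe because, as stated above, the method is not called concurrently.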
-
getDigest
public Digest getDigest()
Returns a message digest: for each member P the highest delivered and received seqno is added
-
setDigest
protected void setDigest(Digest digest)
Creates a retransmit buffer for each sender in the digest according to the sender's seqno. If a buffer already exists, it resets it.
-
mergeDigest
protected void mergeDigest(Digest digest)
For all members of the digest, adjust the retransmit buffers in xmit_table. If no entry exists, create one with the initial seqno set to the seqno of the member in the digest. If the member already exists, and is not the local address, replace it with the new entry (https://issues.redhat.com/browse/JGRP-699) if the digest's seqno is greater than the seqno in the window.
-
overwriteDigest
protected void overwriteDigest(Digest digest)
Overwrites existing entries, but does NOT remove entries not found in the digest
-
setDigest
protected void setDigest(Digest digest, boolean merge)
Sets or merges the digest. If there is no entry for a given member in xmit_table, create a new buffer. Else skip the existing entry, unless it is a merge. In this case, skip the existing entry if its seqno is greater than or equal to the one in the digest, or reset the window and create a new one if not.
Parameters:
digest - The digest
merge - Whether to merge the new digest with our own, or not
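The set-or-merge rule can be written out as a small decision per member. In this sketch a single seqno per sender stands in for the retransmit buffer, and the special handling of the local address mentioned for mergeDigest is omitted; SetDigestSketch and the String member keys are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of setDigest(digest, merge); a seqno stands in for each buffer. */
final class SetDigestSketch {
    final Map<String, Long> xmitTable = new HashMap<>();

    void setDigest(Map<String, Long> digest, boolean merge) {
        for (Map.Entry<String, Long> e : digest.entrySet()) {
            String member = e.getKey();
            long digestSeqno = e.getValue();
            Long existing = xmitTable.get(member);
            if (existing == null) {
                xmitTable.put(member, digestSeqno); // no entry yet: create one
            } else if (merge && existing < digestSeqno) {
                xmitTable.put(member, digestSeqno); // merge and we are behind: reset to the digest's seqno
            }
            // Otherwise the existing entry is skipped and left as it is.
        }
    }
}
```

Without merge, an existing entry is never touched; with merge, it is replaced only when the digest is ahead of it.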
-
stable
protected void stable(Digest digest)
Garbage collect messages that have been seen by all members. Update sent_msgs: for the sender P in the digest which is equal to the local address, garbage collect all messages <= seqno at digest[P]. Update xmit_table: for each sender P in the digest and its highest seqno seen SEQ, garbage collect all delivered_msgs in the retransmit buffer corresponding to P which are <= seqno at digest[P].
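The purge step can be illustrated with a sorted map per sender. This is a simplified sketch: StableSketch is hypothetical, messages are strings keyed by seqno, and it removes every stored message up to the digest's seqno without distinguishing delivered from undelivered messages or treating the local sender separately.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of purging stable messages from per-sender buffers. */
final class StableSketch {
    /** sender -> (seqno -> message) */
    final Map<String, TreeMap<Long, String>> xmitTable = new HashMap<>();

    /** For every sender in the digest, removes all stored messages with seqno <= digest[sender]. */
    void stable(Map<String, Long> digest) {
        digest.forEach((sender, seqno) -> {
            TreeMap<Long, String> buf = xmitTable.get(sender);
            if (buf != null) {
                // headMap is a live view, so clearing it removes the entries from buf.
                buf.headMap(seqno, true).clear();
            }
        });
    }
}
```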
-
retransmit
protected void retransmit(long first_seqno, long last_seqno, Address sender, boolean multicast_xmit_request)
-
retransmit
protected void retransmit(SeqnoList missing_msgs, Address sender, boolean multicast_xmit_request)
-
reset
protected void reset()
-
sizeOfAllMessages
protected static long sizeOfAllMessages(Buffer<Message> win, boolean include_headers)
-
startRetransmitTask
protected void startRetransmitTask()
-
stopRetransmitTask
protected void stopRetransmitTask()
-
triggerXmit
public void triggerXmit()
-
-