
Chapter 7. List of Protocols

7.1. Properties available in every protocol
7.2. Transport
7.2.1. UDP
7.2.2. TCP
7.2.3. TUNNEL
7.3. Initial membership discovery
7.3.1. Discovery
7.3.2. PING
7.3.3. TCPPING
7.3.4. TCPGOSSIP
7.3.5. MPING
7.3.6. FILE_PING
7.3.7. JDBC_PING
7.3.8. BPING
7.3.9. RACKSPACE_PING
7.3.10. S3_PING
7.3.11. SWIFT_PING
7.3.12. AWS_PING
7.3.13. PDC - Persistent Discovery Cache
7.4. Merging after a network partition
7.4.1. MERGE2
7.4.2. MERGE3
7.5. Failure Detection
7.5.1. FD
7.5.2. FD_ALL
7.5.3. FD_SOCK
7.5.4. FD_PING
7.5.5. VERIFY_SUSPECT
7.6. Reliable message transmission
7.6.1. pbcast.NAKACK
7.6.2. NAKACK2
7.6.3. UNICAST
7.6.4. UNICAST2
7.6.5. UNICAST3
7.6.6. RSVP
7.7. Message stability
7.7.1. STABLE
7.8. Group Membership
7.8.1. pbcast.GMS
7.9. Flow control
7.9.1. FC
7.9.2. MFC and UFC
7.10. Fragmentation
7.10.1. FRAG and FRAG2
7.11. Ordering
7.11.1. SEQUENCER
7.11.2. Total Order Anycast (TOA)
7.12. State Transfer
7.12.1. pbcast.STATE_TRANSFER
7.12.2. StreamingStateTransfer
7.12.3. pbcast.STATE
7.12.4. STATE_SOCK
7.12.5. BARRIER
7.13. pbcast.FLUSH
7.14. Misc
7.14.1. Statistics
7.14.2. Security
7.14.3. COMPRESS
7.14.4. SCOPE
7.14.5. RELAY
7.14.6. RELAY2
7.14.7. STOMP
7.14.8. DAISYCHAIN
7.14.9. RATE_LIMITER
7.14.10. Locking protocols
7.14.11. CENTRAL_EXECUTOR
7.14.12. COUNTER
7.14.13. SUPERVISOR

This chapter describes the most frequently used protocols and their configuration. Ergonomics (Section 5.15, “Ergonomics”) strives to reduce the number of properties that have to be configured by dynamically adjusting them at run time; however, this is not yet in place.

Meanwhile, we recommend that users copy one of the predefined configurations shipped with JGroups, e.g. udp.xml or tcp.xml, and make only minimal changes to it.

This section is work in progress; we strive to update the documentation as we make changes to the code.

7.1. Properties available in every protocol

The table below lists properties that are available in all protocols, as they're defined in the superclass of all protocols, org.jgroups.stack.Protocol.


7.2. Transport

TP is the base class for all transports, e.g. UDP and TCP. All of the properties defined here are inherited by the subclasses. The properties for TP are:

Table 7.2. Properties

bind_addr: The bind address which should be used by this transport. The following special values are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL, NON_LOOPBACK, match-interface, match-host, match-address
bind_interface_str: The interface (NIC) which should be used by this transport
bind_port: The port to which the transport binds. The default of 0 binds to any (ephemeral) port
bundler_capacity: The max number of elements in a bundler if the bundler supports size limitations
bundler_type: The type of bundler used. Has to be "old" or "new" (default)
diagnostics_addr: Address for diagnostic probing. Default is 224.0.75.75
diagnostics_bind_interfaces: Comma delimited list of interfaces (IP addresses or interface names) that the diagnostics multicast socket should bind to
diagnostics_passcode: Authorization passcode for diagnostics. If specified, every probe query will be authorized
diagnostics_port: Port for diagnostic probing. Default is 7500
diagnostics_ttl: TTL of the diagnostics multicast socket
discard_incompatible_packets: Discard packets with a different version if true
enable_batching: Allows the transport to pass received message batches up as MessageBatch instances (up(MessageBatch)), rather than individual messages. This flag will be removed in a future version when batching has been implemented by all protocols
enable_bundling: Enable bundling of smaller messages into bigger ones. Default is true
enable_diagnostics: Switch to enable diagnostic probing. Default is true
enable_unicast_bundling: Enable bundling of smaller messages into bigger ones for unicast messages. Default is true
external_addr: Use "external_addr" if you have hosts on different networks behind firewalls. On each firewall, set up a port forwarding rule (sometimes called "virtual server") to the local IP (e.g. 192.168.1.100) of the host, then on each host set the "external_addr" TCP transport parameter to the external (public IP) address of the firewall
external_port: Used to map the internal port (bind_port) to an external port. Only used if > 0
internal_thread_pool_enabled: Switch for enabling the thread pool for internal messages
internal_thread_pool_keep_alive_time: Timeout in ms to remove idle threads from the internal pool
internal_thread_pool_max_threads: Maximum thread pool size for the internal thread pool
internal_thread_pool_min_threads: Minimum thread pool size for the internal thread pool
internal_thread_pool_queue_enabled: Whether to use a queue to enqueue incoming internal messages
internal_thread_pool_queue_max_size: Maximum queue size for incoming internal messages
internal_thread_pool_rejection_policy: Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run
log_discard_msgs: Whether or not warnings about messages from different groups are logged
log_discard_msgs_version: Whether or not warnings about messages from members with a different version are discarded
logical_addr_cache_expiration: Time (in ms) after which entries in the logical address cache marked as removable are removed
logical_addr_cache_max_size: Max number of elements in the logical address cache before eviction starts
loopback: Messages to self are looped back immediately if true
max_bundle_size: Maximum number of bytes for messages to be queued until they are sent
max_bundle_timeout: Max number of milliseconds until queued messages are sent
oob_thread_pool.keep_alive_time: Timeout in ms to remove idle threads from the OOB pool
oob_thread_pool.max_threads: Max thread pool size for the OOB thread pool
oob_thread_pool.min_threads: Minimum thread pool size for the OOB thread pool
oob_thread_pool_enabled: Switch for enabling the thread pool for OOB messages. Default is true
oob_thread_pool_queue_enabled: Whether to use a queue to enqueue incoming OOB messages
oob_thread_pool_queue_max_size: Maximum queue size for incoming OOB messages
oob_thread_pool_rejection_policy: Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run
physical_addr_max_fetch_attempts: Max number of attempts to fetch a physical address (when not in the cache) before giving up
port_range: The range of valid ports, from bind_port to end_port. 0 only binds to bind_port and fails if taken
receive_interfaces: Comma delimited list of interfaces (IP addresses or interface names) to receive multicasts on
receive_on_all_interfaces: If true, the transport should use all available interfaces to receive multicast messages
singleton_name: If assigned, enables this transport to be a singleton (shared) transport
suppress_time_different_cluster_warnings: Time during which identical warnings about messages from a member of a different cluster will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also disables this
suppress_time_different_version_warnings: Time during which identical warnings about messages from a member with a different version will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also disables this
thread_naming_pattern: Thread naming pattern for threads in this channel. Valid values are "pcl": "p" includes the thread name, e.g. "Incoming thread-1" or "UDP ucast receiver"; "c" includes the cluster name, e.g. "MyCluster"; "l" includes the local address of the current member, e.g. "192.168.5.1:5678"
thread_pool.keep_alive_time: Timeout in milliseconds to remove idle threads from the regular pool
thread_pool.max_threads: Maximum thread pool size for the regular thread pool
thread_pool.min_threads: Minimum thread pool size for the regular thread pool
thread_pool_enabled: Switch for enabling the thread pool for regular messages
thread_pool_queue_enabled: Whether to use a queue to enqueue incoming regular messages
thread_pool_queue_max_size: Maximum queue size for incoming regular messages
thread_pool_rejection_policy: Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run
tick_time: Tick duration in the HashedTimingWheel timer. Only applicable if timer_type is "wheel"
timer.keep_alive_time: Timeout in ms to remove idle threads from the timer pool
timer.max_threads: Max thread pool size for the timer thread pool
timer.min_threads: Minimum thread pool size for the timer thread pool
timer_queue_max_size: Max number of elements on a timer queue
timer_rejection_policy: Timer rejection policy. Possible values are Abort, Discard, DiscardOldest and Run
timer_type: Type of timer to be used. Valid values are "old" (DefaultTimeScheduler, used up to 2.10), "new" or "new2" (TimeScheduler2), "new3" (TimeScheduler3) and "wheel". Note that this property might disappear in future releases, if one of the timers is chosen as the default
wheel_size: Number of ticks in the HashedTimingWheel timer. Only applicable if timer_type is "wheel"
who_has_cache_timeout: Timeout (in ms) to determine how long to wait until a request to fetch the physical address for a given logical address will be sent again. Subsequent requests for the same physical address will therefore be spaced at least who_has_cache_timeout ms apart

bind_addr can be set to the address of a network interface, e.g. 192.168.1.5. It can also be set for the entire stack using the system property -Djgroups.bind_addr, which overrides the XML value (if given).

The following special values are also recognized for bind_addr:

GLOBAL

Picks a global IP address if available. If not, falls back to a SITE_LOCAL IP address.

SITE_LOCAL

Picks a site-local (non-routable) IP address, e.g. from the 192.168.0.0 or 10.0.0.0 address range.

LINK_LOCAL

Picks a link-local IP address, from 169.254.1.0 through 169.254.254.255.

NON_LOOPBACK

Picks any non-loopback address.

LOOPBACK

Picks a loopback address, e.g. 127.0.0.1.

match-interface

Picks an address which matches a pattern against the interface name, e.g. match-interface:eth.*

match-address

Picks an address which matches a pattern against the host address, e.g. match-address:192.168.*

match-host

Picks an address which matches a pattern against the host name, e.g. match-host:linux.*

An example of setting the bind address in UDP to use a site local address is:



<UDP bind_addr="SITE_LOCAL" />

This will pick any address of any interface that's site-local, e.g. a 192.168.x.x or 10.x.x.x address.

7.2.1. UDP

UDP uses IP multicast for sending messages to all members of a group, and UDP datagrams for unicast messages (sent to a single member). When started, it opens a unicast socket and a multicast socket: the unicast socket is used to send/receive unicast messages, whereas the multicast socket sends and receives multicast messages. The channel's physical address will be the address and port number of the unicast socket.

A protocol stack with UDP as transport protocol is typically used with clusters whose members run in the same subnet. If running across subnets, an admin has to ensure that IP multicast is enabled across them; often it is not. In such cases, the stack has to either use UDP without IP multicasting, or another transport such as TCP.
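As an illustration, a minimal UDP transport element might look as follows. Note that mcast_addr, mcast_port and ip_ttl are UDP-specific properties not listed in the TP table above; the names and values are taken from the shipped udp.xml and are illustrative only:

<UDP bind_addr="192.168.1.5"
     mcast_addr="228.8.8.8"
     mcast_port="45588"
     ip_ttl="8" />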


7.2.2. TCP

Specifying TCP in your protocol stack tells JGroups to use TCP to send messages between cluster members. Instead of using a multicast bus, the cluster members create a mesh of TCP connections.

For example, while UDP sends 1 IP multicast packet when sending a message to a cluster of 10 members, TCP needs to send the message 9 times. It sends the same message to the first member, to the second member, and so on (excluding itself, as the message is looped back internally).

This is slow, as the cost of sending a group message is O(n) with TCP, whereas it is O(1) with UDP. As the cost of sending a group message with TCP is a function of the cluster size, it becomes higher with larger clusters.
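A sketch of a TCP-based stack fragment follows. Since IP multicast is typically unavailable in such environments, TCP is commonly paired with TCPPING (Section 7.3.3, “TCPPING”) and a static list of initial hosts; the host names and port values below are placeholders:

<TCP bind_port="7800" />
<TCPPING initial_hosts="hostA[7800],hostB[7800]"
         port_range="1"
         num_initial_members="2" />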


7.3. Initial membership discovery

The task of discovery is to find an initial membership, which is used to determine the current coordinator. Once a coordinator is found, the joiner sends a JOIN request to the coordinator.

Discovery is also called periodically by MERGE2 (see Section 7.4.1, “MERGE2”), to see if we have diverging cluster membership information.

7.3.7. JDBC_PING

JDBC_PING uses a shared database into which all members write their addresses. New joiners read all addresses from this database and ping each of the elements of the resulting set of members. When a member leaves, it deletes its corresponding record.

JDBC_PING is an alternative to S3_PING, using Amazon RDS instead of S3.

Table 7.10. Properties

connection_driver: The JDBC connection driver name
connection_password: The JDBC connection password
connection_url: The JDBC connection URL
connection_username: The JDBC connection username
datasource_jndi_name: To use a DataSource registered in JNDI, specify the JNDI name here. This is an alternative to all connection_* configuration options: if this property is not empty, then all connection related properties must be empty.
delete_single_sql: SQL used to delete a row. Customizable, but keep the order of parameters and pick compatible types: 1) own Address, as String 2) cluster name, as String
initialize_sql: If not empty, this SQL statement will be performed at startup. Customize it to create the needed table on those databases which permit table creation attempts without losing data, such as PostgreSQL and MySQL (using IF NOT EXISTS). To allow for creation attempts, errors performing this statement will be logged but not considered fatal. To avoid any DDL operation, set this to an empty string.
insert_single_sql: SQL used to insert a new row. Customizable, but keep the order of parameters and pick compatible types: 1) own Address, as String 2) cluster name, as String 3) serialized PingData as byte[]
select_all_pingdata_sql: SQL used to fetch all nodes' PingData. Customizable, but keep the order of parameters and pick compatible types: only one parameter needed, String compatible, representing the cluster name. Must return a byte[], the serialized PingData as it was stored by the insert_single_sql statement
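A configuration sketch using the connection_* properties from the table above; the URL, driver and credentials are placeholders for your own database:

<JDBC_PING connection_url="jdbc:mysql://localhost/jgroups"
           connection_username="jgroups"
           connection_password="secret"
           connection_driver="com.mysql.jdbc.Driver" />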

7.3.10. S3_PING

S3_PING uses Amazon S3 to discover initial members. New joiners read all addresses from this bucket and ping each of the elements of the resulting set of members. When a member leaves, it deletes its corresponding file.

It's designed specifically for members running on Amazon EC2, where multicast traffic is not allowed and thus MPING or PING will not work. When Amazon RDS is preferred over S3, or if a shared database is used, an alternative is to use JDBC_PING.

Each instance uploads a small file to an S3 bucket and each instance reads the files out of this bucket to determine the other members.

There are three different ways to use S3_PING, each having its own tradeoffs between security and ease-of-use. These are described in more detail below:

Pre-signed URLs are the most secure method, since writing to buckets still requires authorization and you don't have to pass Amazon AWS credentials to every instance. However, they are also the most complex to set up.

Here's a configuration example for private buckets with credentials given to each instance:



<S3_PING location="my_bucket" access_key="access_key"
         secret_access_key="secret_access_key" timeout="2000"
         num_initial_members="3"/>
            

Here's an example for public buckets with no credentials:



<S3_PING location="my_bucket"
         timeout="2000" num_initial_members="3"/>
            

And finally, here's an example for public readable buckets with pre-signed URLs:



<S3_PING pre_signed_put_url="http://s3.amazonaws.com/my_bucket/DemoCluster/node1?AWSAccessKeyId=access_key&Expires=1316276200&Signature=it1cUUtgCT9ZJyCJDj2xTAcRTFg%3D"
         pre_signed_delete_url="http://s3.amazonaws.com/my_bucket/DemoCluster/node1?AWSAccessKeyId=access_key&Expires=1316276200&Signature=u4IFPRq%2FL6%2FAohykIW4QrKjR23g%3D"
         timeout="2000" num_initial_members="3"/>
            

7.4.1. MERGE2

If a cluster gets split for some reason (e.g. a network partition), this protocol merges the subclusters back into one cluster. It is only run by the coordinator (the oldest member in a cluster), which periodically multicasts its presence and view information. If another coordinator (for the same cluster) receives this message, it will initiate a merge process. Note that this merges subgroups {A,B} and {C,D,E} back into {A,B,C,D,E}, but it does not merge state. The application has to handle the callback to merge state. See Section 5.6, “Handling network partitions” for suggestions on merging state.

Following a merge, the coordinator of the merged group may deviate from the usual rule that the coordinator is the member who has been up the longest. During the merge process, the coordinators of the various subgroups need to reach a common decision as to who the new coordinator is. In order to ensure a consistent result, each coordinator combines the addresses of all the members in a list and then sorts the list; the first member in the sorted list becomes the coordinator. The sort order is determined by how the address type implements the Comparable interface; the default UUID addresses compare based on their UUID values. So, take a hypothetical case where two machines were running, one machine running three separate cluster members and the other running two. If communication between the machines were cut, the following subgroups would form: {A,B} and {C,D,E}. Following the merge, the new view would be {C,D,A,B,E}, with C being the new coordinator.

Note that "A", "B" and so on are just logical names, attached to UUIDs; the actual sorting is done on the UUIDs.


7.4.2. MERGE3

MERGE3 was added in JGroups 3.1.

In MERGE3, all members periodically send an INFO message with their address (UUID), logical name, physical address and ViewId. The ViewId (Section 3.7.1, “ViewId”) is used to see if we have diverging views among the cluster members: periodically, every coordinator looks at the INFO messages received so far and checks if there are any inconsistencies.

When inconsistencies are found, the merge leader will be the member with the lowest address (UUID). This is deterministic, and therefore at most one merge should be running at any given time.

The merge leader then asks the senders of the inconsistent ViewIds for their full Views. Once received, it simply passes a MERGE event up the stack, where the merge will be handled (by GMS) in exactly the same way as if MERGE2 had generated the MERGE event.

The advantages of MERGE3 compared to MERGE2 are:

  • Sending of INFO messages is spread out over time, preventing message peaks which might cause packet loss. This is especially important in large clusters.
  • Only one merge should be running at any time. Competing merges, as happen with MERGE2, slow down the merge process and don't scale to large clusters.
  • An INFO message carries the logical name and physical address of a member. Compared to MERGE2, this allows us to immediately send messages to newly merged members, and not have to solicit this information first.
  • On the downside, MERGE3 generates constant (small) traffic from all members.
  • MERGE3 was written for an IP multicast capable transport (UDP), but it also works with other transports (such as TCP), although it isn't as efficient on TCP as on UDP.
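A configuration sketch: min_interval and max_interval (the bounds, in milliseconds, between which the next INFO message is scheduled) are property names assumed from the MERGE3 implementation rather than listed in this excerpt, and the values are illustrative:

<MERGE3 min_interval="10000" max_interval="30000" />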


7.5. Failure Detection

The task of failure detection is to probe the members of a group and see whether they are alive. When a member is suspected (= deemed dead), a SUSPECT message is sent to all nodes of the cluster. It is not the task of the failure detection layer to exclude a crashed member (this is done by the group membership protocol, GMS), but simply to notify everyone that a node in the cluster is suspected of having crashed.

The SUSPECT message is handled by the GMS protocol of the current coordinator only; all other members ignore it.

7.5.3. FD_SOCK

FD_SOCK is a failure detection protocol based on a ring of TCP sockets created between cluster members. Each member in a cluster connects to its neighbor (the last member connects to the first), thus forming a ring. Member B is suspected when its neighbor A detects the abnormal closing of its TCP socket (presumably due to a crash of B). However, if member B is about to leave gracefully, it lets its neighbor A know, so that A does not suspect it.

If you are using a multi-NIC machine, note that JGroups versions prior to 2.2.8 have an FD_SOCK implementation that does not account for this possibility: the JVM may select a NIC that is unreachable by the member's neighbor and set up the FD_SOCK server socket on it. The neighbor would then be unable to connect to that server socket, resulting in the member being suspected immediately. The suspected member is kicked out of the group, tries to rejoin, and thus goes into a join/leave loop. JGroups 2.2.8 introduced the srv_sock_bind_addr property, so you can specify the network interface to which the FD_SOCK TCP server socket should be bound. This network interface is most likely the same interface used for other JGroups traffic. JGroups 2.2.9 and newer consult the bind.address system property, or you can specify the network interface directly via FD_SOCK's bind_addr property.
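For example, to pin the FD_SOCK server socket to a specific interface (the address is a placeholder):

<FD_SOCK bind_addr="192.168.1.5" />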


7.6. Reliable message transmission

7.6.1. pbcast.NAKACK

NAKACK provides reliable delivery and FIFO (= First In First Out) properties for messages sent to all nodes in a cluster.

Reliable delivery means that no message sent by a sender will ever be lost, as all messages are numbered with sequence numbers (by sender), and retransmission requests are sent to the sender of a message[9] if that sequence number is not received.

FIFO order means that all messages from a given sender are received in exactly the order in which they were sent.

In other words, NAKACK provides lossless, FIFO delivery of multicast messages using negative acks. E.g. when receiving P:1, P:3, P:4, a receiver delivers only P:1, and asks P for retransmission of message 2, queueing P:3 and P:4. When P:2 is finally received, the receiver will deliver P:2, P:3 and P:4 to the application.

Table 7.22. Properties

become_server_queue_size: Size of the queue to hold messages received after creating the channel, but before being connected (is_server=false). After becoming the server, the messages in the queue are fed into up() and the queue is cleared. The motivation is to avoid retransmissions (see https://issues.jboss.org/browse/JGRP-1509 for details). 0 disables the queue.
discard_delivered_msgs: Should messages delivered to the application be discarded
exponential_backoff: The first value (in milliseconds) to use in the exponential backoff. Enabled if greater than 0
log_discard_msgs: Discards warnings about promiscuous traffic
log_not_found_msgs: If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)
max_msg_batch_size: Max number of messages to be removed from a NakReceiverWindow. This property might get removed anytime, so don't use it!
max_rebroadcast_timeout: Timeout to rebroadcast messages. Default is 2000 msec
print_stability_history_on_failed_xmit: Should the stability history be printed if retransmission fails. Default is false
retransmit_timeouts: Timeout before requesting retransmissions
suppress_time_non_member_warnings: Time during which identical warnings about messages from a non-member will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also disables this
use_mcast_xmit: Send retransmit responses (messages) using multicast rather than unicast
use_mcast_xmit_req: Use a multicast to request retransmission of missing messages
use_range_based_retransmitter: Whether to use the old retransmitter which retransmits individual messages or the new one which uses ranges of retransmitted messages. Default is true. Note that this property will be removed in 3.0; it is only used to switch back to the old (and proven) retransmitter mechanism if issues occur
xmit_from_random_member: Ask a random member for retransmission of a missing message. Default is false
xmit_stagger_timeout: Number of milliseconds to delay the sending of an XMIT request. We pick a random number in the range [1 .. xmit_stagger_timeout] and add this to the scheduling time of an XMIT request. When use_mcast_xmit is enabled, if a number of members drop messages from the same member, then chances are that, if staggering is enabled, somebody else already sent the XMIT request (via mcast) and we can cancel the XMIT request once we receive the missing messages. For unicast XMIT responses (use_mcast_xmit=false), we still have an advantage by not overwhelming the receiver with XMIT requests, all at the same time. 0 disables staggering.
xmit_table_max_compaction_time: Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts)
xmit_table_msgs_per_row: Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row
xmit_table_num_rows: Number of rows of the matrix in the retransmission table (only for experts)
xmit_table_resize_factor: Resize factor of the matrix in the retransmission table (only for experts)
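As an illustration, a NAKACK configuration that retransmits via IP multicast and staggers XMIT requests might look like this, using properties from the table above (the values are examples only):

<pbcast.NAKACK use_mcast_xmit="true"
               xmit_stagger_timeout="200"
               discard_delivered_msgs="true" />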

7.6.2. NAKACK2

NAKACK2 was introduced in 3.1 and is a successor to NAKACK (at some point it will replace NAKACK). It has the same properties as NAKACK, but its implementation is faster and uses less memory; in addition, it creates fewer tasks in the timer.

Some of the properties of NAKACK were deprecated in NAKACK2, but were not removed, so people can simply change from NAKACK to NAKACK2 by changing the protocol name in the config.

Table 7.23. Properties

become_server_queue_size: Size of the queue to hold messages received after creating the channel, but before being connected (is_server=false). After becoming the server, the messages in the queue are fed into up() and the queue is cleared. The motivation is to avoid retransmissions (see https://issues.jboss.org/browse/JGRP-1509 for details). 0 disables the queue.
discard_delivered_msgs: Should messages delivered to the application be discarded
log_discard_msgs: Discards warnings about promiscuous traffic
log_not_found_msgs: If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)
max_msg_batch_size: Max number of messages to be removed from a RingBuffer. This property might get removed anytime, so don't use it!
max_rebroadcast_timeout: Timeout to rebroadcast messages. Default is 2000 msec
print_stability_history_on_failed_xmit: Should the stability history be printed if retransmission fails. Default is false
suppress_time_non_member_warnings: Time during which identical warnings about messages from a non-member will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also disables this
use_mcast_xmit: Send retransmit responses (messages) using multicast rather than unicast
use_mcast_xmit_req: Use a multicast to request retransmission of missing messages
xmit_from_random_member: Ask a random member for retransmission of a missing message. Default is false
xmit_interval: Interval (in milliseconds) at which missing messages (from all retransmit buffers) are retransmitted
xmit_table_max_compaction_time: Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts)
xmit_table_msgs_per_row: Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row
xmit_table_num_rows: Number of rows of the matrix in the retransmission table (only for experts)
xmit_table_resize_factor: Resize factor of the matrix in the retransmission table (only for experts)
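Switching is thus mostly a matter of renaming the element in the configuration, e.g. (the xmit_interval value is illustrative):

<!-- before -->
<pbcast.NAKACK retransmit_timeouts="300,600,1200" />

<!-- after -->
<pbcast.NAKACK2 xmit_interval="500" />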

7.6.3. UNICAST

UNICAST provides reliable delivery and FIFO (= First In First Out) properties for point-to-point messages between one sender and one receiver.

Reliable delivery means that no message sent by a sender will ever be lost, as all messages are numbered with sequence numbers (by sender) and retransmission requests are sent to the sender of a message if that sequence number is not received.

FIFO order means that all messages from a given sender are received in exactly the order in which they were sent.

UNICAST uses positive acks for retransmission; sender A keeps sending message M until receiver B delivers M and sends an ack(M) to A, or until B leaves the cluster or A crashes.

Although JGroups attempts to send acks selectively, UNICAST will still see a lot of acks on the wire. If this is not desired, use UNICAST2 (see Section 7.6.4, “UNICAST2”).

On top of a reliable transport, such as TCP, UNICAST is not really needed. However, concurrent delivery of messages from the same sender is prevented by UNICAST by acquiring a lock on the sender's retransmission table, so unless concurrent delivery is desired, UNICAST should not be removed from the stack even if TCP is used.


7.6.4. UNICAST2

UNICAST2 provides lossless, ordered communication between 2 members. Contrary to UNICAST, it uses negative acks (similar to NAKACK) rather than positive acks. This reduces the communication overhead required for sending an ack for every message.

With negative acks, sender A simply sends messages without expecting acks, and receivers never ack messages until they detect a gap: for instance, if A sends messages 1,2,4,5, then B delivers 1 and 2, but queues 4 and 5 because it is missing message 3 from A. B then asks A to retransmit 3. When 3 is received, messages 3, 4 and 5 can be delivered to the application.

Compared to a positive ack scheme as used in UNICAST, negative acks have the advantage that they generate less traffic: if all messages are received in order, we never need to do retransmission.

Table 7.25. Properties

conn_expiry_timeout: Time (in milliseconds) after which an idle incoming or outgoing connection is closed. The connection will get re-established when used again. 0 disables connection reaping
exponential_backoff: The first value (in milliseconds) to use in the exponential backoff. Enabled if greater than 0
log_not_found_msgs: If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)
max_bytes: Max number of bytes before a stability message is sent to the sender
max_msg_batch_size: Max number of messages to be removed from a NakReceiverWindow. This property might get removed anytime, so don't use it!
max_retransmit_time: Max number of milliseconds we try to retransmit a message to any given member. After that, the connection is removed. Any new connection to that member will start with seqno #1 again. 0 disables this
max_stable_msgs: Max number of STABLE messages sent for the same highest_received seqno. A value < 1 is invalid
stable_interval: Max number of milliseconds before a stability message is sent to the sender(s)
timeout: List of timeouts
use_range_based_retransmitter: Whether to use the old retransmitter which retransmits individual messages or the new one which uses ranges of retransmitted messages. Default is true. Note that this property will be removed in 3.0; it is only used to switch back to the old (and proven) retransmitter mechanism if issues occur
xmit_interval: Interval (in milliseconds) at which missing messages (from all retransmit buffers) are retransmitted
xmit_table_automatic_purging: If enabled, the removal of a message from the retransmission table causes an automatic purge (only for experts)
xmit_table_max_compaction_time: Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts)
xmit_table_msgs_per_row: Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row
xmit_table_num_rows: Number of rows of the matrix in the retransmission table (only for experts)
xmit_table_resize_factor: Resize factor of the matrix in the retransmission table (only for experts)

7.6.5. UNICAST3

UNICAST3 (available in 3.3) is the successor to UNICAST2, but is based on UNICAST, as it uses a positive acknowledgment mechanism. Speed-wise, however, it is similar to UNICAST2.

Details of UNICAST3's design can be found here: UNICAST3

Table 7.26. Properties

ack_batches_immediately: Send an ack for a batch immediately instead of using a delayed ack
conn_close_timeout: Time (in ms) until a connection marked to be closed will get removed. 0 disables this
conn_expiry_timeout: Time (in milliseconds) after which an idle incoming or outgoing connection is closed. The connection will get re-established when used again. 0 disables connection reaping
log_not_found_msgs: If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)
max_msg_batch_size: Max number of messages to be removed from a retransmit window. This property might get removed anytime, so don't use it!
max_retransmit_time: Max number of milliseconds we try to retransmit a message to any given member. After that, the connection is removed. Any new connection to that member will start with seqno #1 again. 0 disables this
xmit_interval: Interval (in milliseconds) at which messages in the send windows are resent
xmit_table_max_compaction_time: Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts)
xmit_table_msgs_per_row: Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row
xmit_table_num_rows: Number of rows of the matrix in the retransmission table (only for experts)
xmit_table_resize_factor: Resize factor of the matrix in the retransmission table (only for experts)
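Migrating is again a matter of renaming the element, e.g. (values illustrative, taken from the properties in the table above):

<UNICAST3 xmit_interval="500"
          conn_expiry_timeout="0" />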

7.7. Message stability

To serve potential retransmission requests, a member has to store received messages until it is known that every member in the cluster has received them. Message stability for a given message M means that M has been seen by everyone in the cluster.

The stability protocol periodically (or when a certain number of bytes have been received) initiates a consensus protocol, which multicasts a stable message containing the highest message numbers received from a given member. This is called a digest.

When everyone has received everybody else's stable messages, a digest is computed which consists of the minimum sequence numbers of all received digests so far. This is the stability vector, and it contains only message sequence numbers that have been seen by everyone.

This stability vector is then broadcast to the group, and everyone can remove from their retransmission tables those messages whose sequence numbers are smaller than the ones received in the stability vector. These messages can then be garbage collected.
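A typical STABLE configuration triggers stability both periodically and after a byte threshold. The property names desired_avg_gossip (average period, in ms, between stability rounds) and max_bytes (byte threshold) are assumed from the shipped configurations rather than listed in this excerpt, and the values are illustrative:

<pbcast.STABLE desired_avg_gossip="50000" max_bytes="4M" />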

7.8. Group Membership

Group membership takes care of joining new members, handling leave requests by existing members, and handling SUSPECT messages for crashed members, as emitted by failure detection protocols. The algorithm for joining a new member is essentially:

- loop
    - find initial members (discovery)
    - if no responses:
        - become singleton group and break out of the loop
    - else:
        - determine the coordinator (oldest member) from the responses
        - send JOIN request to coordinator
        - wait for JOIN response
        - if JOIN response received:
            - install view and break out of the loop
        - else:
            - sleep for 5 seconds and continue the loop
        
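A configuration sketch: join_timeout (how long to wait for a JOIN response before retrying) and print_local_addr are GMS properties assumed from the shipped configurations, with illustrative values:

<pbcast.GMS print_local_addr="true" join_timeout="3000" />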

7.9. Flow control

Flow control takes care of adjusting the rate of a message sender to the rate of the slowest receiver over time. If a sender continuously sends messages at a rate faster than the receiver(s) can process them, the receivers will either queue up messages, or the messages will get discarded by the receiver(s), triggering costly retransmissions. In addition, there is spurious traffic on the cluster, causing even more retransmissions.

Flow control throttles the sender so the receivers are not overrun with messages.

Note that flow control can be bypassed by setting message flag Message.NO_FC. See Section 5.13, “Tagging messages with flags” for details.

The properties defined in the common superclass FlowControl can be used in both MFC and UFC.
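For example, the shipped configurations typically set the same two FlowControl properties on both protocols: max_credits (the number of bytes a sender may send to a receiver before it must receive fresh credits) and min_threshold (the fraction of max_credits below which the receiver sends new credits). These names are assumed from the JGroups code; the values below are illustrative:

<MFC max_credits="2M" min_threshold="0.4" />
<UFC max_credits="2M" min_threshold="0.4" />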


7.12.3. pbcast.STATE

STATE was renamed from (2.x) STREAMING_STATE_TRANSFER, and refactored to extend a common superclass StreamingStateTransfer. The other state transfer protocol extending StreamingStateTransfer is STATE_SOCK (see Section 3.8.11.1.3, “STATE_SOCK”).

STATE uses a streaming approach to state transfer: the state provider writes its state to the output stream passed to it in the getState(OutputStream) callback. The output stream chunks up the data and sends the chunks to the state requester in separate messages.

The state requester receives those chunks and feeds them into the input stream from which the state is read by the setState(InputStream) callback.

The advantage compared to STATE_TRANSFER is that the state provider and requester only need small (transfer) buffers to keep a part of the state in memory, whereas STATE_TRANSFER needs to copy the entire state into memory.

If we for example have a list of 1 million elements, then STATE_TRANSFER would have to create a byte[] buffer out of it and return that byte[] buffer, whereas a streaming approach could iterate through the list and write each list element to the output stream. Whenever the buffer capacity is reached, a message would be sent and the buffer reused for more data.
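Using the streaming protocol is a drop-in change in the configuration, at the same stack position:

<!-- byte[]-based state transfer -->
<pbcast.STATE_TRANSFER />

<!-- streaming state transfer -->
<pbcast.STATE />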

7.13. pbcast.FLUSH

Flushing forces group members to send all their pending messages prior to a certain event. The process of flushing acquiesces the cluster so that a state transfer or a join can be done. It is also called the stop-the-world model, as nobody will be able to send messages while a flush is in process. Flush is used in state transfer and in view installation.

FLUSH is designed as another protocol positioned just below the channel, on top of the stack (e.g. above STATE_TRANSFER). The STATE_TRANSFER and GMS protocols request a flush by sending an event up the stack, where it is handled by the FLUSH protocol. Another event is sent back by the FLUSH protocol to let the caller know that the flush has completed. When done (e.g. view was installed or state transferred), the protocol sends a message, which will allow everyone in the cluster to resume sending.

A channel is notified that the FLUSH phase has been started by the Receiver.block() callback.

Read more about flushing in Section 5.7, “Flushing: making sure every node in the cluster received a message”.
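In the XML configuration the last protocol listed is the top of the stack, so FLUSH typically appears last, e.g. (the timeout property and its value are assumed from the shipped flush configurations):

<pbcast.GMS />
<pbcast.STATE_TRANSFER />
<pbcast.FLUSH timeout="0" />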


7.14.2. Security

JGroups provides protocols to encrypt cluster traffic (ENCRYPT), and to make sure that only authorized members can join a cluster (AUTH).

A detailed description of ENCRYPT can be found in the JGroups source (JGroups/doc/ENCRYPT.html). By default, ENCRYPT only encrypts the message body; it does not encrypt message headers. To encrypt the entire message (including all headers, plus destination and source addresses), the property encrypt_entire_message has to be set to true. Also, ENCRYPT has to be placed below any protocols whose headers we want to encrypt, e.g.



<config ... >
    <UDP />
    <PING />
    <MERGE2 />
    <FD />
    <VERIFY_SUSPECT />
    <ENCRYPT encrypt_entire_message="false"
             sym_init="128" sym_algorithm="AES/ECB/PKCS5Padding"
             asym_init="512" asym_algorithm="RSA"/>
    <pbcast.NAKACK />
    <UNICAST />
    <pbcast.STABLE />
    <FRAG2 />
    <pbcast.GMS />
</config>
                

Note that ENCRYPT sits below NAKACK and UNICAST, so the sequence numbers for these 2 protocols will be encrypted. Had ENCRYPT been placed below UNICAST but above NAKACK, then only UNICAST's headers (including sequence numbers) would have been encrypted, but not NAKACK's.

Note that it doesn't make too much sense to place ENCRYPT even lower in the stack, because then almost all traffic (even merge or discovery traffic) will be encrypted, which may be somewhat of a performance drag.

When we encrypt an entire message, we have to marshal the message into a byte buffer first and then encrypt it. This entails marshalling and copying of the byte buffer, which is not ideal performance-wise.


7.14.4. SCOPE

As discussed in Section 5.4.4, “Scopes: concurrent message delivery for messages from the same sender”, the SCOPE protocol is used to deliver updates to different scopes concurrently. It has to be placed somewhere above UNICAST and NAKACK.

SCOPE has a separate thread pool. The reason the default thread pool from the transport isn't used is that the default thread pool has a different purpose. For example, it can use a queue to which all incoming messages are added, which would defeat the purpose of concurrent delivery in SCOPE. As a matter of fact, using a queue would most likely delay messages getting sent up into SCOPE!

Also, the default pool's rejection policy might not be "run", so the SCOPE implementation would have to catch rejection exceptions and engage in a retry protocol, which is complex and wastes resources.

The configuration of the thread pool is shown below. If you expect concurrent messages to N different scopes, then the max pool size would ideally be set to N. However, in most cases this is not necessary, as (a) the messages might not be to different scopes or (b) not all N scopes might get messages at the same time. So even if the max pool size is a bit smaller, the cost is only slight delays, in the sense that a message for scope Y might wait until the thread processing a message for scope X is available.

To remove unused scopes, an expiry policy is provided: expiration_time is the number of milliseconds after which an idle scope is removed. An idle scope is a scope which hasn't seen any messages for expiration_time milliseconds. The expiration_interval value defines the number of milliseconds at which the expiry task runs. Setting both values to 0 disables expiration; it would then have to be done manually (see Section 5.4.4, “Scopes: concurrent message delivery for messages from the same sender” for details).
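A sketch of the thread pool and expiry configuration: expiration_time and expiration_interval are described above, while the thread_pool.* names follow the same pattern as the transport's pool properties and are assumed rather than taken from this excerpt; all values are illustrative:

<SCOPE thread_pool.min_threads="2"
       thread_pool.max_threads="10"
       expiration_time="30000"
       expiration_interval="60000" />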


7.14.6. RELAY2

RELAY2 provides clustering between different sites (local clusters), for multicast and unicast messages. See Section 5.11, “Relaying between multiple sites (RELAY2)” for details.

Table 7.43. Properties

async_relay_creation: If true, the creation of the relay channel (and the connect()) are done in the background. Async relay creation is recommended, so the view callback won't be blocked
can_become_site_master: Whether or not this node can become the site master. If false, and we become the coordinator, we won't start the bridge(s)
config: Name of the relay configuration
enable_address_tagging: Whether or not we generate our own addresses in which we use can_become_site_master. If this property is false, can_become_site_master is ignored
forward_sleep: The time (in milliseconds) to sleep between forward attempts
fwd_queue_max_size: Max number of messages in the forward queue. Messages are added to the forward queue when the status of a route went from UP to UNKNOWN, and the queue is flushed when the status goes to UP (resending all queued messages) or DOWN (sending SITE-UNREACHABLE messages to the senders)
max_forward_attempts: The number of tries to forward a message to a remote site
relay_multicasts: Whether or not to relay multicast (dest=null) messages
site: Name of the site (needs to be defined in the configuration)
site_down_timeout: Number of milliseconds to wait when the status for a site changed from UP to UNKNOWN before that site is declared DOWN. A site that's DOWN triggers immediate sending of a SITE-UNREACHABLE message back to the sender of a message to that site
warn_when_ftc_missing: If true, logs a warning if the FORWARD_TO_COORD protocol is not found. This property might get deprecated soon
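A configuration sketch for a site named "LON"; the site names and the bridge setup must be defined in the relay configuration file, and all names below are placeholders:

<relay.RELAY2 site="LON" config="relay2.xml" relay_multicasts="true" />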

7.14.10. Locking protocols

There are currently two locking protocols: org.jgroups.protocols.CENTRAL_LOCK and org.jgroups.protocols.PEER_LOCK. Both extend the common superclass Locking.
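For example, to use the coordinator-based implementation near the top of the stack (num_backups, the number of members to which lock state is replicated, is a property assumed from the CENTRAL_LOCK implementation and not listed in this excerpt):

<CENTRAL_LOCK num_backups="1" />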




[9] Note that NAKACK can also be configured to send retransmission requests for M to anyone in the cluster, rather than only to the sender of M.