
Chapter 7. List of Protocols

7.1. Properties available in every protocol
7.2. Transport
7.2.1. UDP
7.2.2. TCP
7.2.3. TUNNEL
7.3. Initial membership discovery
7.3.1. Discovery
7.3.2. PING
7.3.3. TCPPING
7.3.4. TCPGOSSIP
7.3.5. MPING
7.3.6. FILE_PING
7.3.7. JDBC_PING
7.3.8. BPING
7.3.9. RACKSPACE_PING
7.3.10. S3_PING
7.3.11. GOOGLE_PING
7.3.12. SWIFT_PING
7.3.13. AWS_PING
7.3.14. PDC - Persistent Discovery Cache
7.4. Merging after a network partition
7.4.1. MERGE2
7.4.2. MERGE3
7.5. Failure Detection
7.5.1. FD
7.5.2. FD_ALL
7.5.3. FD_ALL2
7.5.4. FD_SOCK
7.5.5. FD_PING
7.5.6. FD_HOST
7.5.7. VERIFY_SUSPECT
7.6. Reliable message transmission
7.6.1. pbcast.NAKACK
7.6.2. NAKACK2
7.6.3. UNICAST
7.6.4. UNICAST2
7.6.5. UNICAST3
7.6.6. RSVP
7.7. Message stability
7.7.1. STABLE
7.8. Group Membership
7.8.1. pbcast.GMS
7.9. Flow control
7.9.1. FC
7.9.2. MFC and UFC
7.10. Fragmentation
7.10.1. FRAG and FRAG2
7.11. Ordering
7.11.1. SEQUENCER
7.11.2. Total Order Anycast (TOA)
7.12. State Transfer
7.12.1. pbcast.STATE_TRANSFER
7.12.2. StreamingStateTransfer
7.12.3. pbcast.STATE
7.12.4. STATE_SOCK
7.12.5. BARRIER
7.13. pbcast.FLUSH
7.14. Misc
7.14.1. Statistics
7.14.2. Security
7.14.3. COMPRESS
7.14.4. SCOPE
7.14.5. RELAY
7.14.6. RELAY2
7.14.7. STOMP
7.14.8. DAISYCHAIN
7.14.9. RATE_LIMITER
7.14.10. Locking protocols
7.14.11. CENTRAL_EXECUTOR
7.14.12. COUNTER
7.14.13. SUPERVISOR
7.14.14. FORK

This chapter describes the most frequently used protocols and their configuration. Ergonomics (Section 5.15, “Ergonomics”) strives to reduce the number of properties that have to be configured, by dynamically adjusting them at run time; however, this is not yet in place.

Meanwhile, we recommend that users copy one of the predefined configurations shipped with JGroups, e.g. udp.xml or tcp.xml, and make only minimal changes to it.

This section is work in progress; we strive to update the documentation as we make changes to the code.

The table below lists properties that are available in all protocols, as they're defined in the superclass of all protocols, org.jgroups.stack.Protocol.


TP is the base class for all transports, e.g. UDP and TCP. All of the properties defined here are inherited by the subclasses. The properties for TP are:

Table 7.2. Properties

Name - Description
bind_addr - The bind address which should be used by this transport. The following special values are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL, NON_LOOPBACK, match-interface, match-host, match-address
bind_interface_str - The interface (NIC) which should be used by this transport
bind_port - The port to which the transport binds. Default of 0 binds to any (ephemeral) port
bundler_capacity - The max number of elements in a bundler if the bundler supports size limitations
bundler_type - The type of bundler used. Has to be "sender-sends-with-timer", "transfer-queue" (default) or "sender-sends"
diagnostics_addr - Address for diagnostic probing. Default is 224.0.75.75
diagnostics_bind_interfaces - Comma delimited list of interfaces (IP addresses or interface names) that the diagnostics multicast socket should bind to
diagnostics_passcode - Authorization passcode for diagnostics. If specified, every probe query will be authorized
diagnostics_port - Port for diagnostic probing. Default is 7500
diagnostics_ttl - TTL of the diagnostics multicast socket
discard_incompatible_packets - Discard packets with a different version if true
enable_batching - Allows the transport to pass received message batches up as MessageBatch instances (up(MessageBatch)), rather than individual messages. This flag will be removed in a future version when batching has been implemented by all protocols
enable_bundling - Enable bundling of smaller messages into bigger ones. Default is true
enable_diagnostics - Switch to enable diagnostic probing. Default is true
enable_unicast_bundling - Enable bundling of smaller messages into bigger ones for unicast messages. Default is true
external_addr - Use "external_addr" if you have hosts on different networks, behind firewalls. On each firewall, set up a port forwarding rule (sometimes called "virtual server") to the local IP (e.g. 192.168.1.100) of the host; then, on each host, set the "external_addr" TCP transport parameter to the external (public IP) address of the firewall.
external_port - Used to map the internal port (bind_port) to an external port. Only used if > 0
ignore_dont_bundle - Whether or not messages with the DONT_BUNDLE flag set should be ignored by default (JGRP-1737). This property will be removed in a future release, so don't use it
internal_thread_pool_enabled - Switch for enabling the thread pool for internal messages
internal_thread_pool_keep_alive_time - Timeout in ms to remove idle threads from the internal pool
internal_thread_pool_max_threads - Maximum thread pool size for the internal thread pool
internal_thread_pool_min_threads - Minimum thread pool size for the internal thread pool
internal_thread_pool_queue_enabled - Whether to use a queue to enqueue incoming internal messages
internal_thread_pool_queue_max_size - Maximum queue size for incoming internal messages
internal_thread_pool_rejection_policy - Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run
log_discard_msgs - Whether or not warnings about messages from different groups are logged
log_discard_msgs_version - Whether or not warnings about messages from members with a different version are discarded
logical_addr_cache_expiration - Time (in ms) after which entries in the logical address cache marked as removable can be removed. 0 never removes any entries (not recommended)
logical_addr_cache_max_size - Max number of elements in the logical address cache before eviction starts
logical_addr_cache_reaper_interval - Interval (in ms) at which the reaper task scans logical_addr_cache and removes entries marked as removable. 0 disables reaping.
loopback - Messages to self are looped back immediately if true
loopback_copy - Whether or not to make a copy of a message before looping it back up. Don't use this; might get removed without warning
loopback_separate_thread - Loop back the message on a separate thread or use the current thread. Don't use this; might get removed without warning
max_bundle_size - Maximum number of bytes for messages to be queued until they are sent
max_bundle_timeout - Max number of milliseconds until queued messages are sent
oob_thread_pool_enabled - Switch for enabling the thread pool for OOB messages. Default=true
oob_thread_pool_keep_alive_time - Timeout in ms to remove idle threads from the OOB pool
oob_thread_pool_max_threads - Max thread pool size for the OOB thread pool
oob_thread_pool_min_threads - Minimum thread pool size for the OOB thread pool
oob_thread_pool_queue_enabled - Whether to use a queue to enqueue incoming OOB messages
oob_thread_pool_queue_max_size - Maximum queue size for incoming OOB messages
oob_thread_pool_rejection_policy - Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run
physical_addr_max_fetch_attempts - Max number of attempts to fetch a physical address (when not in the cache) before giving up
port_range - The range of valid ports, from bind_port to end_port. 0 only binds to bind_port and fails if taken
receive_interfaces - Comma delimited list of interfaces (IP addresses or interface names) to receive multicasts on
receive_on_all_interfaces - If true, the transport should use all available interfaces to receive multicast messages
singleton_name - If assigned, enables this transport to be a singleton (shared) transport
suppress_time_different_cluster_warnings - Time during which identical warnings about messages from a member of a different cluster will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also disables this.
suppress_time_different_version_warnings - Time during which identical warnings about messages from a member with a different version will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also disables this.
thread_naming_pattern - Thread naming pattern for threads in this channel. Valid values are combinations of "pcl": "p" includes the thread name, e.g. "Incoming thread-1", "UDP ucast receiver"; "c" includes the cluster name, e.g. "MyCluster"; "l" includes the local address of the current member, e.g. "192.168.5.1:5678"
thread_pool_enabled - Switch for enabling the thread pool for regular messages
thread_pool_keep_alive_time - Timeout in milliseconds to remove idle threads from the regular pool
thread_pool_max_threads - Maximum thread pool size for the regular thread pool
thread_pool_min_threads - Minimum thread pool size for the regular thread pool
thread_pool_queue_enabled - Whether to use a queue to enqueue incoming regular messages
thread_pool_queue_max_size - Maximum queue size for incoming regular messages
thread_pool_rejection_policy - Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run
tick_time - Tick duration in the HashedTimingWheel timer. Only applicable if timer_type is "wheel"
time_service_interval - Interval (in ms) at which the time service updates its timestamp. 0 disables the time service
timer_keep_alive_time - Timeout in ms to remove idle threads from the timer pool
timer_max_threads - Max thread pool size for the timer thread pool
timer_min_threads - Minimum thread pool size for the timer thread pool
timer_queue_max_size - Max number of elements on a timer queue
timer_rejection_policy - Timer rejection policy. Possible values are Abort, Discard, DiscardOldest and Run
timer_type - Type of timer to be used. Valid values are "old" (DefaultTimeScheduler, used up to 2.10), "new" or "new2" (TimeScheduler2), "new3" (TimeScheduler3) and "wheel". Note that this property might disappear in future releases, if one of the 3 timers is chosen as default timer
wheel_size - Number of ticks in the HashedTimingWheel timer. Only applicable if timer_type is "wheel"
who_has_cache_timeout - Timeout (in ms) to determine how long to wait until a request to fetch the physical address for a given logical address will be sent again. Subsequent requests for the same physical address will therefore be spaced at least who_has_cache_timeout ms apart

bind_addr can be set to the address of a network interface, e.g. 192.168.1.5. It can also be set for the entire stack using system property -Djgroups.bind_addr, which provides a value for bind_addr unless it has already been set in the XML config.

The following special values are also recognized for bind_addr:

GLOBAL

Picks a global IP address if available. If not, falls back to a SITE_LOCAL IP address.

SITE_LOCAL

Picks a site local (non routable) IP address, e.g. from the 192.168.0.0 or 10.0.0.0 address range.

LINK_LOCAL

Picks a link-local IP address, from 169.254.1.0 through 169.254.254.255.

NON_LOOPBACK

Picks any non loopback address.

LOOPBACK

Pick a loopback address, e.g. 127.0.0.1.

match-interface

Pick an address which matches a pattern against the interface name, e.g. match-interface:eth.*

match-address

Pick an address which matches a pattern against the host address, e.g. match-address:192.168.*

match-host

Pick an address which matches a pattern against the host name, e.g. match-host:linux.*

An example of setting the bind address in UDP to use a site local address is:



            <UDP bind_addr="SITE_LOCAL" />
        

This will pick any address of any interface that's site-local, e.g. a 192.168.x.x or 10.x.x.x address.

UDP uses IP multicast for sending messages to all members of a group and UDP datagrams for unicast messages (sent to a single member). When started, it opens a unicast and multicast socket: the unicast socket is used to send/receive unicast messages, whereas the multicast socket sends and receives multicast messages. The channel's physical address will be the address and port number of the unicast socket.

A protocol stack with UDP as transport protocol is typically used with clusters whose members run in the same subnet. If running across subnets, an admin has to ensure that IP multicast is enabled across subnets. It is often the case that IP multicast is not enabled across subnets. In such cases, the stack has to either use UDP without IP multicasting or other transports such as TCP.
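
A minimal UDP transport configuration might look like the following sketch. The multicast address and port are illustrative values; the shipped udp.xml contains a complete, tuned stack:

<!-- Illustrative UDP transport: bind to a site-local address and use an
     example multicast group/port for cluster traffic -->
<UDP bind_addr="SITE_LOCAL"
     mcast_addr="228.8.8.8"
     mcast_port="45588" />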


Specifying TCP in your protocol stack tells JGroups to use TCP to send messages between cluster members. Instead of using a multicast bus, the cluster members create a mesh of TCP connections.

For example, while UDP sends 1 IP multicast packet when sending a message to a cluster of 10 members, TCP needs to send the message 9 times. It sends the same message to the first member, to the second member, and so on (excluding itself as the message is looped back internally).

This is slow, as the cost of sending a group message is O(n) with TCP, whereas it is O(1) with UDP. Since the cost of sending a group message with TCP is a function of the cluster size, it becomes higher with larger clusters.
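
A sketch of a TCP-based transport, paired with TCPPING for discovery; the host names, ports and member counts below are illustrative (see the shipped tcp.xml for a complete stack):

<!-- Illustrative TCP transport plus static discovery via TCPPING -->
<TCP bind_port="7800" />
<TCPPING initial_hosts="hostA[7800],hostB[7800]"
         port_range="1"
         timeout="3000"
         num_initial_members="2" />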


The task of discovery is to find an initial membership, which is used to determine the current coordinator. Once a coordinator is found, the joiner sends a JOIN request to the coordinator.

Discovery is also called periodically by MERGE2 (see Section 7.4.1, “MERGE2”), to see if we have diverging cluster membership information.

Discovery is the superclass for all discovery protocols and therefore its properties below can be used in any subclass.

Discovery sends a discovery request, and waits for num_initial_members discovery responses, or timeout ms, whichever occurs first, before returning. Note that break_on_coord_rsp="true" will return as soon as we have a response from a coordinator.
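
For example, a PING configuration using these attributes might look like this (the values are illustrative):

<!-- Wait up to 3000 ms for at least 3 responses, but return early
     as soon as a coordinator has answered -->
<PING timeout="3000"
      num_initial_members="3"
      break_on_coord_rsp="true" />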

Table 7.6. Properties

Name - Description
always_send_physical_addr_with_discovery_request - When sending a discovery request, always send the physical address and logical name too
async_discovery - If true, the discovery is done on a separate timer thread. Should be set to true when discovery is blocking and/or takes more than a few milliseconds
break_on_coord_rsp - Return from the discovery phase as soon as we have 1 coordinator response
discovery_rsp_expiry_time - Expiry time of discovery responses in ms
force_sending_discovery_rsps - Always sends a discovery response, no matter what
max_members_in_discovery_request - Max size of the member list shipped with a discovery request. If we have more, the mbrs field in the discovery request header is nulled and members return the entire membership, not individual members
num_initial_members - Minimum number of initial members to get a response from
num_initial_srv_members - Minimum number of server responses (PingData.isServer()=true). If this value is greater than 0, we'll ignore num_initial_members
return_entire_cache - Whether or not to return the entire logical-physical address cache mappings on a discovery request
stagger_timeout - If greater than 0, we'll wait a random number of milliseconds in the range [0..stagger_timeout] before sending a discovery response. This prevents traffic spikes in large clusters when everyone sends their discovery response at the same time
timeout - Timeout to wait for the initial members
use_disk_cache - If a persistent disk cache (PDC) is present, combine the discovery results with the contents of the disk cache before returning the results

This uses a shared directory into which all members write their addresses. New joiners read all addresses from this directory (which needs to be shared, e.g. via NFS or SMB) and ping each of the elements of the resulting set of members. When a member leaves, it deletes its corresponding file.

FILE_PING can be used instead of GossipRouter in cases where no external process is desired.
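
A minimal FILE_PING configuration pointing at a shared directory might look like this (the path is illustrative and would typically be an NFS or SMB mount visible to all members):

<!-- All members read and write discovery information under this shared directory -->
<FILE_PING location="/mnt/jgroups" />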

Since 3.5, the way FILE_PING performs discovery has changed. The following paragraphs describe the new mechanism to discover members via FILE_PING or subclasses (e.g. S3_PING or GOOGLE_PING), so this applies to all cloud-based stores as well.

Instead of storing 1 file per member in the file system or cloud store, we only store 1 file for all members. This has the advantage, especially in cloud stores, that the number of reads is not a function of the cluster size, e.g. we don't have to perform 1000 reads for member discovery in a 1000 node cluster, but just a single read. This is important as the cost of 1000 times the round trip time of a (REST) call to the cloud store is certainly higher than the cost of a single call. There may also be a charge for calls to the cloud, so a reduced number of calls leads to reduced charges for cloud store access, especially in large clusters.

The current coordinator is always in charge of writing the file; participants never write it, but only read it. When there is a split and we have multiple coordinators, we may also have multiple files.

The name of a file is always UUID.logical_name.list, e.g. 0000-0000-000000000001.m1.1.list, which has a UUID of 1, a logical name of "m1.1" and the suffix ".list".


This uses a shared database into which all members write their addresses. New joiners read all addresses from this database and ping each of the elements of the resulting set of members. When a member leaves, it deletes its corresponding record.

JDBC_PING is an alternative to S3_PING, using Amazon RDS (or any shared database) instead of S3.

Table 7.11. Properties

Name - Description
connection_driver - The JDBC connection driver name
connection_password - The JDBC connection password
connection_url - The JDBC connection URL
connection_username - The JDBC connection username
datasource_jndi_name - To use a DataSource registered in JNDI, specify the JNDI name here. This is an alternative to all connection_* configuration options: if this property is not empty, then all connection related properties must be empty.
delete_single_sql - SQL used to delete a row. Customizable, but keep the order of parameters and pick compatible types: 1) own address, as String; 2) cluster name, as String
initialize_sql - If not empty, this SQL statement will be performed at startup. Customize it to create the needed table on those databases which permit table creation attempts without losing data, such as PostgreSQL and MySQL (using IF NOT EXISTS). To allow for creation attempts, errors performing this statement will be logged but not considered fatal. To avoid any DDL operation, set this to an empty string.
insert_single_sql - SQL used to insert a new row. Customizable, but keep the order of parameters and pick compatible types: 1) own address, as String; 2) cluster name, as String; 3) serialized PingData as byte[]
select_all_pingdata_sql - SQL used to fetch all nodes' PingData. Customizable, but keep the order of parameters and pick compatible types: only one parameter needed, String compatible, representing the cluster name. Must return a byte[], the serialized PingData as it was stored by the insert_single_sql statement
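
A sketch of a JDBC_PING configuration using the connection_* properties from the table above; the URL, driver class and credentials are illustrative:

<!-- Illustrative JDBC_PING setup against a shared MySQL database -->
<JDBC_PING connection_url="jdbc:mysql://dbhost/jgroups"
           connection_username="jgroups"
           connection_password="secret"
           connection_driver="com.mysql.jdbc.Driver" />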

S3_PING uses Amazon S3 to discover initial members. New joiners read all addresses from this bucket and ping each of the elements of the resulting set of members. When a member leaves, it deletes its corresponding file.

It's designed specifically for members running on Amazon EC2, where multicast traffic is not allowed and thus MPING or PING will not work. When Amazon RDS is preferred over S3, or if a shared database is used, an alternative is to use JDBC_PING.

Each instance uploads a small file to an S3 bucket and each instance reads the files out of this bucket to determine the other members.

There are three different ways to use S3_PING, each having its own tradeoffs between security and ease-of-use. These are described in more detail below:

Pre-signed URLs are the most secure method since writing to buckets still requires authorization and you don't have to pass Amazon AWS credentials to every instance. However, they are also the most complex to setup.

Here's a configuration example for private buckets with credentials given to each instance:



<S3_PING location="my_bucket" access_key="access_key"
         secret_access_key="secret_access_key" timeout="2000"
         num_initial_members="3"/>
            

Here's an example for public buckets with no credentials:



<S3_PING location="my_bucket"
         timeout="2000" num_initial_members="3"/>
            

And finally, here's an example for public readable buckets with pre-signed URLs:



<S3_PING pre_signed_put_url="http://s3.amazonaws.com/my_bucket/DemoCluster/node1?AWSAccessKeyId=access_key&amp;Expires=1316276200&amp;Signature=it1cUUtgCT9ZJyCJDj2xTAcRTFg%3D"
         pre_signed_delete_url="http://s3.amazonaws.com/my_bucket/DemoCluster/node1?AWSAccessKeyId=access_key&amp;Expires=1316276200&amp;Signature=u4IFPRq%2FL6%2FAohykIW4QrKjR23g%3D"
         timeout="2000" num_initial_members="3"/>
            

If a cluster gets split for some reason (e.g. a network partition), this protocol merges the subclusters back into one cluster. It is only run by the coordinator (the oldest member in a cluster), which periodically multicasts its presence and view information. If another coordinator (for the same cluster) receives this message, it will initiate a merge process. Note that this merges subgroups {A,B} and {C,D,E} back into {A,B,C,D,E}, but it does not merge state. The application has to handle the callback to merge state. See Section 5.6, “Handling network partitions” for suggestions on merging state.

Following a merge, the coordinator of the merged group can shift from the typical case of "the coordinator is the member who has been up the longest." During the merge process, the coordinators of the various subgroups need to reach a common decision as to who the new coordinator is. In order to ensure a consistent result, each coordinator combines the addresses of all the members in a list and then sorts the list. The first member in the sorted list becomes the coordinator. The sort order is determined by how the address implements the Comparable interface; JGroups then compares based on the UUID. So, take a hypothetical case where two machines were running, with one machine running three separate cluster members and the other running two. If communication between the machines were cut, the following subgroups would form: {A,B} and {C,D,E}. Following the merge, the new view would be {C,D,A,B,E}, with C being the new coordinator.

Note that "A", "B" and so on are just logical names, attached to UUIDs, but the actual sorting is done on the actual UUIDs.


MERGE3 was added in JGroups 3.1.

In MERGE3, all members periodically send an INFO message with their address (UUID), logical name, physical address and ViewId. The ViewId (Section 3.7.1, “ViewId”) is used to see if we have diverging views among the cluster members: periodically, every coordinator looks at the INFO messages received so far and checks if there are any inconsistencies.

When inconsistencies are found, the merge leader will be the member with the lowest address (UUID). This is deterministic, and therefore at most one merge should be running at any time.

The merge leader then asks the senders of the inconsistent ViewIds for their full Views. Once received, it simply passes a MERGE event up the stack, where the merge will be handled (by GMS) in exactly the same way as if MERGE2 had generated the MERGE event.
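
A typical MERGE3 configuration only tunes how often INFO messages are sent; min_interval and max_interval (in ms) bound the randomized interval between INFO messages. The attribute names and values below are a sketch based on the shipped example configurations:

<!-- Each member sends an INFO message at a random interval between 10 and 30 seconds -->
<MERGE3 min_interval="10000" max_interval="30000" />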

The advantages of MERGE3 compared to MERGE2 are:

  • Sending of INFO messages is spread out over time, preventing message peaks which might cause packet loss. This is especially important in large clusters.
  • Only 1 merge should be running at any time. Competing merges, as happening with MERGE2, slow down the merge process, and don't scale to large clusters.
  • An INFO message carries the logical name and physical address of a member. Compared to MERGE2, this allows us to immediately send messages to newly merged members, and not have to solicit this information first.
  • On the downside, MERGE3 generates constant (small) traffic from all members.
  • MERGE3 was written for an IP multicast capable transport (UDP), but it also works with other transports (such as TCP), although it isn't as efficient on TCP as on UDP.


The task of failure detection is to probe members of a group and see whether they are alive. When a member is suspected (= deemed dead), then a SUSPECT message is sent to all nodes of the cluster. It is not the task of the failure detection layer to exclude a crashed member (this is done by the group membership protocol, GMS), but simply to notify everyone that a node in the cluster is suspected of having crashed.

The SUSPECT message is handled by the GMS protocol of the current coordinator only; all other members ignore it.

Failure detection protocol based on a ring of TCP sockets created between cluster members. Each member in a cluster connects to its neighbor (the last member connects to the first), thus forming a ring. Member B is suspected when its neighbor A detects the abnormal closing of B's TCP socket (presumably due to a crash of B). However, if member B is about to leave gracefully, it lets its neighbor A know, so that it does not become suspected.

If you are using a multi-NIC machine, note that JGroups versions prior to 2.2.8 have an FD_SOCK implementation that does not take this into account. The JVM may therefore select a NIC that is unreachable by the member's neighbor and set up the FD_SOCK server socket on it. The neighbor would be unable to connect to that server socket, resulting in the member being suspected immediately. The suspected member is kicked out of the group, tries to rejoin, and thus goes into a join/leave loop. JGroups 2.2.8 introduced the srv_sock_bind_addr property, so you can specify the network interface to which the FD_SOCK TCP server socket should be bound. This network interface is most likely the same interface used for other JGroups traffic. JGroups versions 2.2.9 and newer consult the bind.address system property, or you can specify the network interface directly via FD_SOCK's bind_addr property.


To detect the crash or freeze of entire hosts and all of the cluster members running on them, FD_HOST can be used. It is not meant to be used in isolation, as it doesn't detect crashed members on the local host, but in conjunction with other failure detection protocols, such as FD_ALL or FD_SOCK.

FD_HOST can be used when we have multiple cluster members running on a physical box. For example, if we have members {A,B,C,D} running on host 1 and {M,N,O,P} running on host 2, and host 1 is powered down, then A, B, C and D are suspected and removed from the cluster together, typically in one view change.

By default, FD_HOST uses InetAddress.isReachable() to perform liveness checking of other hosts, but if the cmd property is set, any script or command can be used. FD_HOST will launch the command and pass the IP address of the host to be checked as an argument. Example: cmd="ping -c 3".

A typical failure detection configuration would look like this:



...
<FD_SOCK/>
<FD_ALL timeout="60000" interval="20000"/>
<FD_HOST interval="10000" timeout="35000" />
...
            

If we have members {A,B,C} on host 192.168.1.3, {M,N,O} on 192.168.1.4 and {X,Y,Z} on 192.168.1.5, then the behavior is as follows:



NAKACK provides reliable delivery and FIFO (= First In First Out) properties for messages sent to all nodes in a cluster.

Reliable delivery means that no message sent by a sender will ever be lost, as all messages are numbered with sequence numbers (by sender) and retransmission requests are sent to the sender of a message[9] if that sequence number is not received.

FIFO order means that all messages from a given sender are received in exactly the order in which they were sent.

NAKACK provides lossless, FIFO delivery of multicast messages, using negative acks. E.g. when receiving P:1, P:3, P:4, a receiver delivers only P:1, asks P for retransmission of message 2, and queues P:3 and P:4. When P:2 is finally received, the receiver delivers P:2-P:4 to the application.

Table 7.27. Properties

Name - Description
become_server_queue_size - Size of the queue to hold messages received after creating the channel, but before being connected (is_server=false). After becoming the server, the messages in the queue are fed into up() and the queue is cleared. The motivation is to avoid retransmissions (see https://issues.jboss.org/browse/JGRP-1509 for details). 0 disables the queue.
discard_delivered_msgs - Should messages delivered to the application be discarded
exponential_backoff - The first value (in milliseconds) to use in the exponential backoff. Enabled if greater than 0
log_discard_msgs - Discards warnings about promiscuous traffic
log_not_found_msgs - If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)
max_msg_batch_size - Max number of messages to be removed from a NakReceiverWindow. This property might get removed anytime, so don't use it!
max_rebroadcast_timeout - Timeout to rebroadcast messages. Default is 2000 msec
print_stability_history_on_failed_xmit - Should the stability history be printed if we fail in retransmission. Default is false
retransmit_timeouts - Timeout before requesting retransmissions
suppress_time_non_member_warnings - Time during which identical warnings about messages from a non-member will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also disables this.
use_mcast_xmit - Send retransmission responses (messages) using multicast rather than unicast
use_mcast_xmit_req - Use a multicast to request retransmission of missing messages
use_range_based_retransmitter - Whether to use the old retransmitter, which retransmits individual messages, or the new one, which uses ranges of retransmitted messages. Default is true. Note that this property will be removed in 3.0; it is only used to switch back to the old (and proven) retransmitter mechanism if issues occur
xmit_from_random_member - Ask a random member for retransmission of a missing message. Default is false
xmit_stagger_timeout - Number of milliseconds to delay the sending of an XMIT request. We pick a random number in the range [1..xmit_stagger_timeout] and add this to the scheduling time of an XMIT request. When use_mcast_xmit is enabled, if a number of members drop messages from the same member, then chances are that, if staggering is enabled, somebody else already sent the XMIT request (via mcast) and we can cancel the XMIT request once we receive the missing messages. For unicast XMIT responses (use_mcast_xmit=false), we still have an advantage by not overwhelming the receiver with XMIT requests, all at the same time. 0 disables staggering.
xmit_table_max_compaction_time - Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts)
xmit_table_msgs_per_row - Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row
xmit_table_num_rows - Number of rows of the matrix in the retransmission table (only for experts)
xmit_table_resize_factor - Resize factor of the matrix in the retransmission table (only for experts)

NAKACK2 was introduced in 3.1 and is a successor to NAKACK (at some point it will replace NAKACK). It has the same properties as NAKACK, but its implementation is faster and uses less memory, plus it creates fewer tasks in the timer.

Some of the properties of NAKACK were deprecated in NAKACK2, but were not removed so people can simply change from NAKACK to NAKACK2 by changing the protocol name in the config.
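
Switching from NAKACK to NAKACK2 is therefore mostly a matter of changing the protocol name. A minimal example using attributes from the table below (the values are illustrative):

<!-- Retransmit missing messages every 500 ms, using unicast XMIT responses -->
<pbcast.NAKACK2 xmit_interval="500"
                use_mcast_xmit="false"
                discard_delivered_msgs="true" />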

Table 7.28. Properties

Name - Description
become_server_queue_size - Size of the queue to hold messages received after creating the channel, but before being connected (is_server=false). After becoming the server, the messages in the queue are fed into up() and the queue is cleared. The motivation is to avoid retransmissions (see https://issues.jboss.org/browse/JGRP-1509 for details). 0 disables the queue.
discard_delivered_msgs - Should messages delivered to the application be discarded
log_discard_msgs - Discards warnings about promiscuous traffic
log_not_found_msgs - If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)
max_msg_batch_size - Max number of messages to be removed from a RingBuffer. This property might get removed anytime, so don't use it!
max_rebroadcast_timeout - Timeout to rebroadcast messages. Default is 2000 msec
print_stability_history_on_failed_xmit - Should the stability history be printed if we fail in retransmission. Default is false
suppress_time_non_member_warnings - Time during which identical warnings about messages from a non-member will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also disables this.
use_mcast_xmit - Send retransmission responses (messages) using multicast rather than unicast
use_mcast_xmit_req - Use a multicast to request retransmission of missing messages
xmit_from_random_member - Ask a random member for retransmission of a missing message. Default is false
xmit_interval - Interval (in milliseconds) at which missing messages (from all retransmit buffers) are retransmitted
xmit_table_max_compaction_time - Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts)
xmit_table_msgs_per_row - Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row
xmit_table_num_rows - Number of rows of the matrix in the retransmission table (only for experts)
xmit_table_resize_factor - Resize factor of the matrix in the retransmission table (only for experts)

UNICAST provides reliable delivery and FIFO (= First In First Out) properties for point-to-point messages between one sender and one receiver.

Reliable delivery means that no message sent by a sender will ever be lost, as all messages are numbered with sequence numbers (by sender) and retransmission requests are sent to the sender of a message if that sequence number is not received.

FIFO order means that all messages from a given sender are received in exactly the order in which they were sent.

UNICAST uses positive acks for retransmission; sender A keeps sending message M until receiver B delivers M and sends an ack(M) to A, or until B leaves the cluster or A crashes.

Although JGroups attempts to send acks selectively, UNICAST will still see a lot of acks on the wire. If this is not desired, use UNICAST2 (see Section 7.6.4, “UNICAST2”).

On top of a reliable transport, such as TCP, UNICAST is not really needed. However, concurrent delivery of messages from the same sender is prevented by UNICAST by acquiring a lock on the sender's retransmission table, so unless concurrent delivery is desired, UNICAST should not be removed from the stack even if TCP is used.


UNICAST2 provides lossless, ordered, communication between 2 members. Contrary to UNICAST, it uses negative acks (similar to NAKACK) rather than positive acks. This reduces the communication overhead required for sending an ack for every message.

Negative acks have sender A simply send messages without retransmission, and receivers never ack messages, until they detect a gap: for instance, if A sends messages 1,2,4,5, then B delivers 1 and 2, but queues 4 and 5 because it is missing message 3 from A. B then asks A to retransmit 3. When 3 is received, messages 3, 4 and 5 can be delivered to the application.

Compared to a positive ack scheme as used in UNICAST, negative acks have the advantage that they generate less traffic: if all messages are received in order, we never need to do retransmission.

Table 7.30. Properties

Name - Description
conn_expiry_timeout - Time (in milliseconds) after which an idle incoming or outgoing connection is closed. The connection will get re-established when used again. 0 disables connection reaping
exponential_backoff - The first value (in milliseconds) to use in the exponential backoff. Enabled if greater than 0
log_not_found_msgs - If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)
max_bytes - Max number of bytes before a stability message is sent to the sender
max_msg_batch_size - Max number of messages to be removed from a NakReceiverWindow. This property might get removed anytime, so don't use it!
max_retransmit_time - Max number of milliseconds we try to retransmit a message to any given member. After that, the connection is removed. Any new connection to that member will start with seqno #1 again. 0 disables this
max_stable_msgs - Max number of STABLE messages sent for the same highest_received seqno. A value < 1 is invalid
stable_interval - Max number of milliseconds before a stability message is sent to the sender(s)
timeout - List of timeouts
use_range_based_retransmitter - Whether to use the old retransmitter which retransmits individual messages or the new one which uses ranges of retransmitted messages. Default is true. Note that this property will be removed in 3.0; it is only used to switch back to the old (and proven) retransmitter mechanism if issues occur
xmit_interval - Interval (in milliseconds) at which missing messages (from all retransmit buffers) are retransmitted
xmit_table_automatic_purging - If enabled, the removal of a message from the retransmission table causes an automatic purge (only for experts)
xmit_table_max_compaction_time - Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts)
xmit_table_msgs_per_row - Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row
xmit_table_num_rows - Number of rows of the matrix in the retransmission table (only for experts)
xmit_table_resize_factor - Resize factor of the matrix in the retransmission table (only for experts)

UNICAST3 (available in 3.3) is the successor to UNICAST2, but is based on UNICAST, as it uses a positive acknowledgment mechanism. However, speed-wise it is similar to UNICAST2.

Details of UNICAST3's design can be found here: UNICAST3
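
A minimal UNICAST3 configuration using attributes from the table below (the values are illustrative):

<!-- Resend unacked messages every 500 ms; never reap idle connections -->
<UNICAST3 xmit_interval="500"
          conn_expiry_timeout="0" />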

Table 7.31. Properties

Name - Description
ack_batches_immediately - Send an ack for a batch immediately instead of using a delayed ack
ack_threshold - Send an ack immediately when a batch of ack_threshold (or more) messages is received. Otherwise send delayed acks. If 1, ack single messages (similar to UNICAST)
conn_close_timeout - Time (in ms) until a connection marked to be closed will get removed. 0 disables this
conn_expiry_timeout - Time (in milliseconds) after which an idle incoming or outgoing connection is closed. The connection will get re-established when used again. 0 disables connection reaping
log_not_found_msgs - If true, trashes warnings about retransmission messages not found in the xmit_table (used for testing)
max_msg_batch_size - Max number of messages to be removed from a retransmit window. This property might get removed anytime, so don't use it!
max_retransmit_time - Max number of milliseconds we try to retransmit a message to any given member. After that, the connection is removed. Any new connection to that member will start with seqno #1 again. 0 disables this
xmit_interval - Interval (in milliseconds) at which messages in the send windows are resent
xmit_table_max_compaction_time - Number of milliseconds after which the matrix in the retransmission table is compacted (only for experts)
xmit_table_msgs_per_row - Number of elements of a row of the matrix in the retransmission table (only for experts). The capacity of the matrix is xmit_table_num_rows * xmit_table_msgs_per_row
xmit_table_num_rows - Number of rows of the matrix in the retransmission table (only for experts)
xmit_table_resize_factor - Resize factor of the matrix in the retransmission table (only for experts)

To serve potential retransmission requests, a member has to store received messages until it is known that every member in the cluster has received them. Message stability for a given message M means that M has been seen by everyone in the cluster.

The stability protocol periodically (or when a certain number of bytes have been received) initiates a consensus protocol, which multicasts a stable message containing the highest message numbers for a given member. This is called a digest.

When everyone has received everybody else's stable messages, a digest is computed which consists of the minimum sequence numbers of all received digests so far. This is the stability vector, and it contains only message sequence numbers that have been seen by everyone.

This stability vector is then broadcast to the group, and everyone can remove messages from their retransmission tables whose sequence numbers are smaller than the ones received in the stability vector. These messages can then be garbage collected.
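
A typical STABLE configuration triggers the stability round either by time or by the number of bytes received; the attribute names below appear in the shipped configurations, and the values are illustrative:

<!-- Run the stability protocol on average every 50 s, or earlier once
     roughly 4 MB of payload have been received -->
<pbcast.STABLE desired_avg_gossip="50000"
               stability_delay="1000"
               max_bytes="4000000" />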

Group membership takes care of joining new members, handling leave requests by existing members, and handling SUSPECT messages for crashed members, as emitted by failure detection protocols. The algorithm for joining a new member is essentially:

- loop
- find initial members (discovery)
- if no responses:
    - become singleton group and break out of the loop
- else:
    - determine the coordinator (oldest member) from the responses
    - send JOIN request to coordinator
    - wait for JOIN response
    - if JOIN response received:
        - install view and break out of the loop
    - else
        - sleep for 5 seconds and continue the loop
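
The join behavior sketched above can be tuned through GMS attributes such as join_timeout and max_join_attempts (see the table below); for example (values illustrative):

<!-- Wait up to 3 s per JOIN attempt, give up after 5 attempts,
     and print the local address after connecting -->
<pbcast.GMS join_timeout="3000"
            max_join_attempts="5"
            print_local_addr="true" />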
        

Table 7.34. Properties

Name - Description
flushInvokerClass
handle_concurrent_startup - Temporary switch. Default is true and should not be changed
install_view_locally_first - Whether or not to install a new view locally first before broadcasting it (only done in the coordinator role). Set to true if a state transfer protocol is detected
join_timeout - Join timeout
leave_timeout - Leave timeout
log_collect_msgs - Logs failures for collecting all view acks if true
log_view_warnings - Logs warnings for reception of views less than the current, and for views which don't include self
max_bundling_time - Max view bundling timeout if view bundling is turned on. Default is 50 msec
max_join_attempts - Number of join attempts before we give up and become a singleton. Zero means 'never give up'.
membership_change_policy - The fully qualified name of a class implementing MembershipChangePolicy.
merge_timeout - Timeout (in ms) to complete merge
num_prev_mbrs - Max number of old members to keep in history. Default is 50
num_prev_views - Number of views to store in history
print_local_addr - Print local address of this member after connect. Default is true
print_physical_addrs - Print physical address(es) on startup
resume_task_timeout - Timeout to resume ViewHandler
use_delta_views - If true, then GMS is allowed to send VIEW messages with delta views, otherwise it always sends full views. See https://issues.jboss.org/browse/JGRP-1354 for details.
use_flush_if_present - Use flush for view changes. Default is true
view_ack_collection_timeout - Time in ms to wait for all VIEW acks (0 == wait forever). Default is 2000 msec
view_bundling - View bundling toggle

Flow control takes care of adjusting the rate of a message sender to the rate of the slowest receiver over time. If a sender continuously sends messages at a rate that is faster than the receiver(s), the receivers will either queue up messages, or the messages will get discarded by the receiver(s), triggering costly retransmissions. In addition, there is spurious traffic on the cluster, causing even more retransmissions.

Flow control throttles the sender so the receivers are not overrun with messages.

Note that flow control can be bypassed by setting message flag Message.NO_FC. See Section 5.13, “Tagging messages with flags” for details.

The properties for FlowControl are shown below and can be used in MFC and UFC:
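
For example, both protocols are usually configured with just a credit limit and a replenish threshold; a sketch using the commonly used flow-control attributes (values illustrative):

<!-- Multicast and unicast flow control: 2 MB of credits per sender,
     replenished when 40% of the credits have been used up -->
<MFC max_credits="2M" min_threshold="0.4" />
<UFC max_credits="2M" min_threshold="0.4" />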


STATE was renamed from (2.x) STREAMING_STATE_TRANSFER, and refactored to extend a common superclass StreamingStateTransfer. The other state transfer protocol extending StreamingStateTransfer is STATE_SOCK (see Section 3.8.11.1.3, “STATE_SOCK”).

STATE uses a streaming approach to state transfer; the state provider writes its state to the output stream passed to it in the getState(OutputStream) callback. The output stream chunks the data up into pieces that are sent to the state requester in separate messages.

The state requester receives those chunks and feeds them into the input stream from which the state is read by the setState(InputStream) callback.

The advantage compared to STATE_TRANSFER is that state provider and requester only need small (transfer) buffers to keep a part of the state in memory, whereas STATE_TRANSFER needs to copy the entire state into memory.

If we for example have a list of 1 million elements, then STATE_TRANSFER would have to create a byte[] buffer out of it, and return the byte[] buffer, whereas a streaming approach could iterate through the list and write each list element to the output stream. Whenever the buffer capacity is reached, we'd then send a message and the buffer would be reused to receive more data.

Flushing forces group members to send all their pending messages prior to a certain event. The process of flushing acquiesces the cluster so that state transfer or a join can be done. It is also called the stop-the-world model as nobody will be able to send messages while a flush is in process. Flush is used in:

FLUSH is designed as another protocol positioned just below the channel, on top of the stack (e.g. above STATE_TRANSFER). The STATE_TRANSFER and GMS protocols request a flush by sending an event up the stack, where it is handled by the FLUSH protocol. Another event is sent back by the FLUSH protocol to let the caller know that the flush has completed. When done (e.g. the view was installed or the state transferred), the protocol sends a message which allows everyone in the cluster to resume sending.

A channel is notified that the FLUSH phase has been started by the Receiver.block() callback.

Read more about flushing in Section 5.7, “Flushing: making sure every node in the cluster received a message”.


JGroups provides protocols to encrypt cluster traffic (ENCRYPT), and to make sure that only authorized members can join a cluster (AUTH and SASL).

A detailed description of ENCRYPT is found in the JGroups source (JGroups/doc/ENCRYPT.html). Encryption by default only encrypts the message body, but doesn't encrypt message headers. To encrypt the entire message (including all headers, plus destination and source addresses), the property encrypt_entire_message has to be set to true. Also, ENCRYPT has to be below any protocols whose headers we want to encrypt, e.g.



<config ... >
    <UDP />
    <PING />
    <MERGE2 />
    <FD />
    <VERIFY_SUSPECT />
    <pbcast.NAKACK />
    <UNICAST />
    <pbcast.STABLE />
    <FRAG2 />
    <pbcast.GMS />
    <ENCRYPT encrypt_entire_message="false"
             sym_init="128" sym_algorithm="AES/ECB/PKCS5Padding"
             asym_init="512" asym_algorithm="RSA"/>
</config>
                

Note that ENCRYPT sits below NAKACK and UNICAST, so the sequence numbers for these 2 protocols will be encrypted. Had ENCRYPT been placed below UNICAST but above NAKACK, then only UNICAST's headers (including sequence numbers) would have been encrypted, but not NAKACK's.

Note that it doesn't make too much sense to place ENCRYPT even lower in the stack, because then almost all traffic (even merge or discovery traffic) will be encrypted, which may be somewhat of a performance drag.

When we encrypt an entire message, we have to marshal the message into a byte buffer first and then encrypt it. This entails marshalling and copying of the byte buffer, which is not ideal performance-wise.


SASL is an alternative to the AUTH protocol which provides a layer of authentication to JGroups by allowing the use of one of the SASL mechanisms made available by the JDK. SASL sits below the GMS protocol and listens for JOIN / MERGE REQUEST messages. When a JOIN / MERGE REQUEST is received it tries to find a SaslHeader object which contains the initial response required by the chosen SASL mech. This initiates a sequence of challenge/response messages which, if successful, culminates in allowing the new node to join the cluster. The actual validation logic required by the SASL mech must be provided by the user in the form of a standard javax.security.auth.CallbackHandler implementation.

When authentication is successful, the message is simply passed up the stack to the GMS protocol. When it fails, the SASL protocol creates a JOIN / MERGE RESPONSE message with a failure string and passes it back down the stack. This failure string informs the client of the reason for failure. Clients will then fail to join the group and will throw a SecurityException. If this error string is null then authentication is considered to have passed.

SASL can be (minimally) configured as follows:



<config ... >
    <UDP />
    <PING />
    <pbcast.NAKACK />
    <UNICAST3 />
    <pbcast.STABLE />
    <SASL mech="DIGEST-MD5" 
        client_callback_handler="org.example.ClientCallbackHandler" 
        server_callback_handler="org.example.ServerCallbackHandler"/>
    <pbcast.GMS />
    
</config>
                

The mech property specifies the SASL mech you want to use, as defined by RFC-4422. You will also need to provide two callback handlers, one used when the node is running as coordinator (server_callback_handler) and one used in all other cases (client_callback_handler). Refer to the JDK's SASL reference guide for more details: http://docs.oracle.com/javase/7/docs/technotes/guides/security/sasl/sasl-refguide.html


As discussed in Section 5.4.4, “Scopes: concurrent message delivery for messages from the same sender”, the SCOPE protocol is used to deliver updates to different scopes concurrently. It has to be placed somewhere above UNICAST and NAKACK.

SCOPE has a separate thread pool. The reason the default thread pool from the transport isn't used is that the default thread pool has a different purpose. For example, it can use a queue to which all incoming messages are added, which would defy the purpose of concurrent delivery in SCOPE. As a matter of fact, using a queue would most likely delay messages getting sent up into SCOPE!

Also, the default pool's rejection policy might not be "run", so the SCOPE implementation would have to catch rejection exceptions and engage in a retry protocol, which is complex and wastes resources.

The configuration of the thread pool is shown below. If you expect concurrent messages to N different scopes, then the max pool size would ideally be set to N. However, in most cases, this is not necessary as (a) the messages might not be to different scopes or (b) not all N scopes might get messages at the same time. So even if the max pool size is a bit smaller, the cost of this is slight delays, in the sense that a message for scope Y might wait until the thread processing message for scope X is available.

To remove unused scopes, an expiry policy is provided: expiration_time is the number of milliseconds after which an idle scope is removed. An idle scope is a scope which hasn't seen any messages for expiration_time milliseconds. The expiration_interval value defines the number of milliseconds at which the expiry task runs. Setting both values to 0 disables expiration; it would then have to be done manually (see Section 5.4.4, “Scopes: concurrent message delivery for messages from the same sender” for details).
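
A sketch of a SCOPE configuration with its own thread pool and expiration settings; expiration_time and expiration_interval are described above, while the thread_pool.* attribute names are assumptions modeled on the pool naming used by the transport, and all values are illustrative:

<!-- Assumed thread_pool.* attribute names; expire scopes idle for 30 s,
     with the expiry task running every 60 s -->
<SCOPE thread_pool.min_threads="2"
       thread_pool.max_threads="10"
       expiration_time="30000"
       expiration_interval="60000" />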


There are currently 2 locking protocols: org.jgroups.protocols.CENTRAL_LOCK and org.jgroups.protocols.PEER_LOCK. Both extend Locking, which has the following properties:
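
For example, CENTRAL_LOCK is typically added near the top of the stack; num_backups controls to how many backup nodes the coordinator replicates its lock information (a sketch, value illustrative):

<!-- Lock state is kept on the coordinator and replicated to 1 backup node -->
<CENTRAL_LOCK num_backups="1" />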




[9] Note that NAKACK can also be configured to send retransmission requests for M to anyone in the cluster, rather than only to the sender of M.