JBoss.orgCommunity Documentation

Chapter 3. API

3.1. Utility classes
3.1.1. objectToByteBuffer(), objectFromByteBuffer()
3.1.2. objectToStream(), objectFromStream()
3.2. Interfaces
3.2.1. MessageListener
3.2.2. MembershipListener
3.2.3. Receiver
3.2.4. ReceiverAdapter
3.2.5. ChannelListener
3.3. Address
3.4. Message
3.5. Header
3.6. Event
3.7. View
3.7.1. ViewId
3.7.2. MergeView
3.8. JChannel
3.8.1. Creating a channel
3.8.2. Giving the channel a logical name
3.8.3. Generating custom addresses
3.8.4. Joining a cluster
3.8.5. Joining a cluster and getting the state in one operation
3.8.6. Getting the local address and the cluster name
3.8.7. Getting the current view
3.8.8. Sending messages
3.8.9. Receiving messages
3.8.10. Receiving view changes
3.8.11. Getting the group's state
3.8.12. Disconnecting from a channel
3.8.13. Closing a channel

This chapter explains the classes available in JGroups that will be used by applications to build reliable group communication applications. The focus is on creating and using channels.

Information in this document may not be up-to-date, but the nature of the classes in JGroups described here is the same. For the most up-to-date information refer to the Javadoc-generated documentation in the doc/javadoc directory.

All of the classes discussed here are in the org.jgroups package unless otherwise mentioned.

The org.jgroups.util.Util class contains useful common functionality which cannot be assigned to any other package.

These interfaces are used with some of the APIs presented below, therefore they are listed first.

The MembershipListener interface is similar to the MessageListener interface above: every time a new view, a suspicion message, or a block event is received, the corresponding method of the class implementing MembershipListener will be called.



public interface MembershipListener {
    public void viewAccepted(View new_view);
    public void suspect(Object suspected_mbr);
    public void block();
    public void unblock();
}
            

Oftentimes the only callback that needs to be implemented will be viewAccepted() which notifies the receiver that a new member has joined the group or that an existing member has left or crashed. The suspect() callback is invoked by JGroups whenever a member if suspected of having crashed, but not yet excluded [1].

The block() method is called to notify the member that it will soon be blocked sending messages. This is done by the FLUSH protocol, for example to ensure that nobody is sending messages while a state transfer or view installation is in progress. When block() returns, any thread sending messages will be blocked, until FLUSH unblocks the thread again, e.g. after the state has been transferred successfully.

Therefore, block() can be used to send pending messages or complete some other work. Note that block() should be brief, or else the entire FLUSH protocol is blocked.

The unblock() method is called to notify the member that the FLUSH protocol has completed and the member can resume sending messages. If the member did not stop sending messages on block(), FLUSH simply blocked them and will resume, so no action is required from a member. Implementation of the unblock() callback is optional.

Each member of a group has an address, which uniquely identifies the member. The interface for such an address is Address, which requires concrete implementations to provide methods such as comparison and sorting of addresses. JGroups addresses have to implement the following interface:



public interface Address extends Externalizable, Comparable, Cloneable {
    int size();
}
        

For marshalling purposes, size() needs to return the number of bytes an instance of an address implementation takes up in serialized form.

Please never use implementations of Address directly; Address should always be used as an opaque identifier of a cluster node !

Actual implementations of addresses are often generated by the bottommost protocol layer (e.g. UDP or TCP). This allows for all possible sorts of addresses to be used with JGroups.

Since an address uniquely identifies a channel, and therefore a group member, it can be used to send messages to that group member, e.g. in Messages (see next section).

The default implementation of Address is org.jgroups.util.UUID. It uniquely identifies a node, and when disconnecting and reconnecting to a cluster, a node is given a new UUID on reconnection.

UUIDs are never shown directly, but are usually shown as a logical name (see Section 3.8.2, “Giving the channel a logical name”). This is a name given to a node either via the user or via JGroups, and its sole purpose is to make logging output a bit more readable.

UUIDs maps to IpAddresses, which are IP addresses and ports. These are eventually used by the transport protocol to send a message.

Data is sent between members in the form of messages (org.jgroups.Message). A message can be sent by a member to a single member, or to all members of the group of which the channel is an endpoint. The structure of a message is shown in Figure 3.1, “Structure of a message”.


A message has 5 fields:

Destination address

The address of the receiver. If null, the message will be sent to all current group members. Message.getDest() returns the destination address of a message.

Source address

The address of the sender. Can be left null, and will be filled in by the transport protocol (e.g. UDP) before the message is put on the network. Message.getSrc() returns the source address, ie. the address of the sender of a message.

Flags

This is one byte used for flags. The currently recognized flags are OOB, DONT_BUNDLE, NO_FC, NO_RELIABILITY, NO_TOTAL_ORDER, NO_RELAY and RSVP. For OOB, see the discussion on the concurrent stack (Section 5.4, “The concurrent stack”). For the use of flags see Section 5.13, “Tagging messages with flags”.

Payload

The actual data (as a byte buffer). The Message class contains convenience methods to set a serializable object and to retrieve it again, using serialization to convert the object to/from a byte buffer. A message also has an offset and a length, if the buffer is only a subrange of a larger buffer.

Headers

A list of headers that can be attached to a message. Anything that should not be in the payload can be attached to a message as a header. Methods putHeader() , getHeader() and removeHeader() of Message can be used to manipulate headers.

Note that headers are only used by protocol implementers; headers should not be added or removed by application code !

A message is similar to an IP packet and consists of the payload (a byte buffer) and the addresses of the sender and receiver (as Addresses). Any message put on the network can be routed to its destination (receiver address), and replies can be returned to the sender's address.

A message usually does not need to fill in the sender's address when sending a message; this is done automatically by the protocol stack before a message is put on the network. However, there may be cases, when the sender of a message wants to give an address different from its own, so that for example, a response should be returned to some other member.

The destination address (receiver) can be an Address, denoting the address of a member, determined e.g. from a message received previously, or it can be null, which means that the message will be sent to all members of the group. A typical multicast message, sending string "Hello" to all members would look like this:



Message msg=new Message(null, "Hello");
channel.send(msg);
        

A header is a custom bit of information that can be added to each message. JGroups uses headers extensively, for example to add sequence numbers to each message (NAKACK and UNICAST), so that those messages can be delivered in the order in which they were sent.

Events are means by which JGroups protcols can talk to each other. Contrary to Messages, which travel over the network between group members, events only travel up and down the stack.

A view (org.jgroups.View) is a list of the current members of a group. It consists of a ViewId, which uniquely identifies the view (see below), and a list of members. Views are installed in a channel automatically by the underlying protocol stack whenever a new member joins or an existing one leaves (or crashes). All members of a group see the same sequence of views.

Note that the first member of a view is the coordinator (the one who emits new views). Thus, whenever the membership changes, every member can determine the coordinator easily and without having to contact other members, by picking the first member of a view.

The code below shows how to send a (unicast) message to the first member of a view (error checking code omitted):



View view=channel.getView();
Address first=view.getMembers().get(0);
Message msg=new Message(first, "Hello world");
channel.send(msg);
        

Whenever an application is notified that a new view has been installed (e.g. by Receiver.viewAccepted(), the view is already set in the channel. For example, calling Channel.getView() in a viewAccepted() callback would return the same view (or possibly the next one in case there has already been a new view !).

In order to join a group and send messages, a process has to create a channel. A channel is like a socket. When a client connects to a channel, it gives the the name of the group it would like to join. Thus, a channel is (in its connected state) always associated with a particular group. The protocol stack takes care that channels with the same group name find each other: whenever a client connects to a channel given group name G, then it tries to find existing channels with the same name, and joins them, resulting in a new view being installed (which contains the new member). If no members exist, a new group will be created.

A state transition diagram for the major states a channel can assume are shown in Figure 3.2, “Channel states”.


When a channel is first created, it is in the unconnected state. An attempt to perform certain operations which are only valid in the connected state (e.g. send/receive messages) will result in an exception. After a successful connection by a client, it moves to the connected state. Now the channel will receive messages from other members and may send messages to other members or to the group, and it will get notified when new members join or leave. Getting the local address of a channel is guaranteed to be a valid operation in this state (see below). When the channel is disconnected, it moves back to the unconnected state. Both a connected and unconnected channel may be closed, which makes the channel unusable for further operations. Any attempt to do so will result in an exception. When a channel is closed directly from a connected state, it will first be disconnected, and then closed.

The methods available for creating and manipulating channels are discussed now.

A channel is created using one of its public constructors (e.g. new JChannel()).

The most frequently used constructor of JChannel looks as follows:

public JChannel(String props) throws Exception;

The props argument points to an XML file containing the configuration of the protocol stack to be used. This can be a String, but there are also other constructors which take for example a DOM element or a URL (see the javadoc for details).

The code sample below shows how to create a channel based on an XML configuration file:

JChannel ch=new JChannel("/home/bela/udp.xml");

If the props argument is null, the default properties will be used. An exception will be thrown if the channel cannot be created. Possible causes include protocols that were specified in the property argument, but were not found, or wrong parameters to protocols.

For example, the Draw demo can be launched as follows:

java org.javagroups.demos.Draw -props file:/home/bela/udp.xml

or

java org.javagroups.demos.Draw -props http://www.jgroups.org/udp.xml

In the latter case, an application downloads its protocol stack specification from a server, which allows for central administration of application properties.

A sample XML configuration looks like this (edited from udp.xml):



<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
    <UDP
         mcast_port="${jgroups.udp.mcast_port:45588}"
         tos="8"
         ucast_recv_buf_size="20M"
         ucast_send_buf_size="640K"
         mcast_recv_buf_size="25M"
         mcast_send_buf_size="640K"
         loopback="true"
         discard_incompatible_packets="true"
         max_bundle_size="64K"
         max_bundle_timeout="30"
         ip_ttl="${jgroups.udp.ip_ttl:2}"
         enable_bundling="true"
         enable_diagnostics="true"
         thread_naming_pattern="cl"

         timer_type="new"
         timer.min_threads="4"
         timer.max_threads="10"
         timer.keep_alive_time="3000"
         timer.queue_max_size="500"

         thread_pool.enabled="true"
         thread_pool.min_threads="2"
         thread_pool.max_threads="8"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="true"
         thread_pool.queue_max_size="10000"
         thread_pool.rejection_policy="discard"

         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="Run"/>

    <PING timeout="2000"
            num_initial_members="3"/>
    <MERGE3 max_interval="30000"
            min_interval="10000"/>
    <FD_SOCK/>
    <FD_ALL/>
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK use_stats_for_retransmission="false"
                   exponential_backoff="0"
                   use_mcast_xmit="true"
                   retransmit_timeout="300,600,1200"
                   discard_delivered_msgs="true"/>
    <UNICAST timeout="300,600,1200"/>
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true" join_timeout="3000"
                view_bundling="true"/>
    <UFC max_credits="2M"
         min_threshold="0.4"/>
    <MFC max_credits="2M"
         min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <pbcast.STATE_TRANSFER />
</config>
            

A stack is wrapped by <config> and </config> elements and lists all protocols from bottom (UDP) to top (STATE_TRANSFER). Each element defines one protocol.

Each protocol is implemented as a Java class. When a protocol stack is created based on the above XML configuration, the first element ("UDP") becomes the bottom-most layer, the second one will be placed on the first, etc: the stack is created from the bottom to the top.

Each element has to be the name of a Java class that resides in the org.jgroups.protocols package. Note that only the base name has to be given, not the fully specified class name ( UDP instead of org.jgroups.protocols.UDP). If the protocol class is not found, JGroups assumes that the name given is a fully qualified classname and will therefore try to instantiate that class. If this does not work an exception is thrown. This allows for protocol classes to reside in different packages altogether, e.g. a valid protocol name could be com.sun.eng.protocols.reliable.UCAST.

Each layer may have zero or more arguments, which are specified as a list of name/value pairs in parentheses directly after the protocol name. In the example above, UDP is configured with some options, one of them being the IP multicast port (mcast_port) which is set to 45588, or to the value of the system property jgroups.udp.mcast_port, if set.

Note that all members in a group have to have the same protocol stack.

Usually, channels are created by passing the name of an XML configuration file to the JChannel() constructor. On top of this declarative configuration, JGroups provides an API to create a channel programmatically. The way to do this is to first create a JChannel, then an instance of ProtocolStack, then add all desired protocols to the stack and finally calling init() on the stack to set it up. The rest, e.g. calling JChannel.connect() is the same as with the declarative creation.

An example of how to programmatically create a channel is shown below (copied from ProgrammaticChat):



public class ProgrammaticChat {
    public static void main(String[] args) throws Exception {
        JChannel ch=new JChannel(false);         // (1)
        ProtocolStack stack=new ProtocolStack(); // (2)
        ch.setProtocolStack(stack);
        stack.addProtocol(new UDP().setValue("bind_addr",
                                              InetAddress.getByName("192.168.1.5")))
                .addProtocol(new PING())
                .addProtocol(new MERGE3())
                .addProtocol(new FD_SOCK())
                .addProtocol(new FD_ALL().setValue("timeout", 12000)
                                         .setValue("interval", 3000))
                .addProtocol(new VERIFY_SUSPECT())
                .addProtocol(new BARRIER())
                .addProtocol(new NAKACK())
                .addProtocol(new UNICAST2())
                .addProtocol(new STABLE())
                .addProtocol(new GMS())
                .addProtocol(new UFC())
                .addProtocol(new MFC())
                .addProtocol(new FRAG2());       // (3)
        stack.init();                            // (4)
        ch.setReceiver(new ReceiverAdapter() {
            public void viewAccepted(View new_view) {
                System.out.println("view: " + new_view);
            }
            public void receive(Message msg) {
                Address sender=msg.getSrc();
                System.out.println(msg.getObject() + " [" + sender + "]");
            }
        });
        ch.connect("ChatCluster");
        for(;;) {
            String line=Util.readStringFromStdin(": ");
            ch.send(null, line);
        }
    }
}
                    

First a JChannel is created (1). The 'false' argument tells the channel not to create a ProtocolStack. This is needed because we will create one ourselves later and set it in the channel (2).

Next, all protocols are added to the stack (3). Note that the order is from bottom (transport protocol) to top. So UDP as transport is added first, then PING and so on, until FRAG2, which is the top protocol. Every protocol can be configured via setters, but there is also a generic setValue(String attr_name, Object value), which can be used to configure protocols as well, as shown in the example.

Once the stack is configured, we call ProtocolStack.init() to link all protocols correctly and to call init() in every protocol instance (4). After this, the channel is ready to be used and all subsequent actions (e.g. connect()) can be executed. When the init() method returns, we have essentially the equivalent of new JChannel(config_file).

When a client wants to join a cluster, it connects to a channel giving the name of the cluster to be joined:

public void connect(String cluster) throws Exception;

The cluster name is the name of the cluster to be joined. All channels that call connect() with the same name form a cluster. Messages sent on any channel in the cluster will be received by all members (including the one who sent it [3] ).

The connect() method returns as soon as the cluster has been joined successfully. If the channel is in the closed state (see Figure 3.2, “Channel states”), an exception will be thrown. If there are no other members, i.e. no other member has connected to a cluster with this name, then a new cluster is created and the member joins it as first member. The first member of a cluster becomes its coordinator. A coordinator is in charge of installing new views whenever the membership changes [4] .

Once the channel is connected, messages can be sent using one of the send() methods:



public void send(Message msg) throws Exception;
public void send(Address dst, Serializable obj) throws Exception;
public void send(Address dst, byte[] buf) throws Exception;
public void send(Address dst, byte[] buf, int off, int len) throws Exception;
            

The first send() method has only one argument, which is the message to be sent. The message's destination should either be the address of the receiver (unicast) or null (multicast). When the destination is null, the message will be sent to all members of the cluster (including itself).

The remainaing send() methods are helper methods; they take either a byte[] buffer or a serializable, create a Message and call send(Message).

If the channel is not connected, or was closed, an exception will be thrown upon attempting to send a message.

Here's an example of sending a message to all members of a cluster:



Map data; // any serializable data
channel.send(null, data);
            

The null value as destination address means that the message will be sent to all members in the cluster. The payload is a hashmap, which will be serialized into the message's buffer and unserialized at the receiver. Alternatively, any other means of generating a byte buffer and setting the message's buffer to it (e.g. using Message.setBuffer()) also works.

Here's an example of sending a unicast message to the first member (coordinator) of a group:



Map data;
Address receiver=channel.getView().getMembers().get(0);
channel.send(receiver, "hello world");
            

The sample code determines the coordinator (first member of the view) and sends it a "hello world" message.

While JGroups guarantees that a message will eventually be delivered at all non-faulty members, sometimes this might take a while. For example, if we have a retransmission protocol based on negative acknowledgments, and the last message sent is lost, then the receiver(s) will have to wait until the stability protocol notices that the message has been lost, before it can be retransmitted.

This can be changed by setting the Message.RSVP flag in a message: when this flag is encountered, the message send blocks until all members have acknowledged reception of the message (of course excluding members which crashed or left meanwhile).

This also serves as another purpose: if we send an RSVP-tagged message, then - when the send() returns - we're guaranteed that all messages sent before will have been delivered at all members as well. So, for example, if P sends message 1-10, and marks 10 as RSVP, then, upon JChannel.send() returning, P will know that all members received messages 1-10 from P.

Note that since RSVP'ing a message is costly, and might block the sender for a while, it should be used sparingly. For example, when completing a unit of work (ie. member P sending N messages), and P needs to know that all messages were received by everyone, then RSVP could be used.

To use RSVP, 2 things have to be done:

First, the RSVP protocol has to be in the config, somewhere above the reliable transmission protocols such as NAKACK or UNICAST(2), e.g.:



<config>
    <UDP/>
    <PING />
    <FD_ALL/>
    <pbcast.NAKACK use_mcast_xmit="true"
                   discard_delivered_msgs="true"/>
    <UNICAST timeout="300,600,1200"/>
    <RSVP />
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true" join_timeout="3000"
                view_bundling="true"/>
    ...
</config>
        

Secondly, the message we want to get ack'ed must be tagged with RSVP:



Message msg=new Message(null, null, "hello world");
msg.setFlag(Message.RSVP);
ch.send(msg);
                

Here, we send a message to all cluster members (dest = null). (Note that RSVP also works for sending a message to a unicast destination). Method send() will return as soon as it has received acks from all current members. If there are 4 members A, B, C and D, and A has received acks from itself, B and C, but D's ack is missing and D crashes before the timeout kicks in, then this will nevertheless make send() return, as if D had actually sent an ack.

If the timeout property if greater than 0, and we don't receive all acks within timeout milliseconds, a TimeoutException will be thrown (if RSVP.throw_exception_on_timeout is true). The application can choose to catch this (runtime) exception and do something with it, e.g. retry.

The configuration of RSVP is described here: Section 7.6.6, “RSVP”.

Note

RSVP was added in version 3.1.

As shown above, the viewAccepted() callback of ReceiverAdapter can be used to get callbacks whenever a cluster membership change occurs. The receiver needs to be set via JChannel.setReceiver(Receiver).

As discussed in Section 3.2.4, “ReceiverAdapter”, code in callbacks must avoid anything that takes a lot of time, or blocks; JGroups invokes this callback as part of the view installation, and if this user code blocks, the view installation would block, too.

A newly joined member may want to retrieve the state of the cluster before starting work. This is done with getState():

public void getState(Address target, long timeout) throws Exception;

This method returns the state of one member (usually of the oldest member, the coordinator). The target parameter can usually be null, to ask the current coordinator for the state. If a timeout (ms) elapses before the state is fetched, an exception will be thrown. A timeout of 0 waits until the entire state has been transferred.

To participate in state transfer, both state provider and state requester have to implement the following callbacks from ReceiverAdapter (Receiver):



public void getState(OutputStream output) throws Exception;
public void setState(InputStream input) throws Exception;
            

Method getState() is invoked on the state provider (usually the coordinator). It needs to write its state to the output stream given. Note that output doesn't need to be closed when done (or when an exception is thrown); this is done by JGroups.

The setState() method is invoked on the state requester; this is the member which called JChannel.getState(). It needs to read its state from the input stream and set its internal state to it. Note that input doesn't need to be closed when done (or when an exception is thrown); this is done by JGroups.

In a cluster consisting of A, B and C, with D joining the cluster and calling Channel.getState(), the following sequence of callbacks happens:

The following code fragment shows how a group member participates in state transfers:



public void getState(OutputStream output) throws Exception {
    synchronized(state) {
        Util.objectToStream(state, new DataOutputStream(output));
    }
}
public void setState(InputStream input) throws Exception {
    List<String> list;
    list=(List<String>)Util.objectFromStream(new DataInputStream(input));
    synchronized(state) {
        state.clear();
        state.addAll(list);
    }
    System.out.println(list.size() + " messages in chat history):");
    for(String str: list)
        System.out.println(str);
    }
}
            

This code is the Chat example from the JGroups tutorial and the state here is a list of strings.

The getState() implementation synchronized on the state (so no incoming messages can modify it during the state transfer), and uses the JGroups utility method objectToStream().

The setState() implementation also uses the Util.objectFromStream() utility method to read the state from the input stream and assign it to its internal list.



[1] It could be that the member is suspected falsely, in which case the next view would still contain the suspected member (there is no unsuspect() method

[2] Note that the latter 2 methods only take the ID into account.

[3] Local delivery can be turned off using setDiscardOwnMessages(true).

[4] This is managed internally however, and an application programmer does not need to be concerned about it.