Chapter 3. API

This chapter explains the JGroups classes that applications use to build reliable group communication. The focus is on creating and using channels.

Information in this document may not be up-to-date, but the nature of the classes in the JGroups toolkit described here is the same. For the most up-to-date information refer to the Javadoc-generated documentation in the doc/javadoc directory.

All of the classes discussed below reside in the org.jgroups package unless otherwise mentioned.

3.1. Utility classes

The org.jgroups.util.Util class contains a collection of useful functionality which cannot be assigned to any particular package.

3.1.1. objectToByteBuffer(), objectFromByteBuffer()

The first method takes an object as its argument and serializes it into a byte array, which is then returned (the object has to be serializable or externalizable). This method is often used to serialize objects into the byte buffer of a message. The second method returns a reconstructed object from a buffer. Both methods throw an exception if the object cannot be serialized or deserialized.
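
The round trip can be pictured with plain JDK serialization. The sketch below is not the JGroups implementation (the real Util methods may use a different, more compact encoding); the class SerializationSketch is hypothetical and only borrows the two method names so that the example runs without JGroups on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Hypothetical stand-in for the two Util methods, built on JDK serialization only.
final class SerializationSketch {

    // Serializes obj into a byte array; throws an IOException subclass
    // (NotSerializableException) if obj is not serializable.
    static byte[] objectToByteBuffer(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Reconstructs an object from a byte array produced by objectToByteBuffer().
    static Object objectFromByteBuffer(byte[] buffer)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(buffer))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] buf = objectToByteBuffer("Hello");
        System.out.println(objectFromByteBuffer(buf));
    }
}
```

In an application, the byte array produced by the first method would typically become the payload of a message, and the receiver would apply the second method to the payload it gets.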

3.2. Interfaces

These interfaces are used with some of the APIs presented below, therefore they are listed first.

3.2.1. MessageListener

Contrary to the pull-style of channels, some building blocks (e.g. PullPushAdapter) provide an event-like push-style message delivery model. In this case, the entity to be notified of message reception needs to provide a callback to be invoked whenever a message has been received. The MessageListener interface below provides a method to do so:

          public interface MessageListener {
              public void receive(Message msg);
              byte[] getState();
              void setState(byte[] state);
          }
          

Method receive() will be called when a message is received. The getState() and setState() methods are used to fetch and set the group state (e.g. when joining). Refer to Section 3.6.13, “Getting the group's state” for a discussion of state transfer.

3.2.2. ExtendedMessageListener

JGroups release 2.3 introduced ExtendedMessageListener, enabling partial state transfer (refer to Section 3.6.15, “Partial state transfer”), while release 2.4 further expands ExtendedMessageListener with streaming state transfer callbacks:

         public interface ExtendedMessageListener extends MessageListener {
             byte[] getState(String state_id);
             void setState(String state_id, byte[] state);
                      
             /*** since JGroups 2.4 *****/
             void getState(OutputStream ostream);
             void getState(String state_id, OutputStream ostream);
             void setState(InputStream istream);
             void setState(String state_id, InputStream istream);
          }
       

3.2.3. MembershipListener

The MembershipListener interface is similar to the MessageListener interface above: every time a new view, a suspicion message, or a block event is received, the corresponding method of the class implementing MembershipListener will be called.

            public interface MembershipListener {
                public void viewAccepted(View new_view);
                public void suspect(Object suspected_mbr);
                public void block();
            }
          

Oftentimes the only method containing any functionality will be viewAccepted(), which notifies the receiver that a new member has joined the group or that an existing member has left or crashed. The suspect() callback is invoked by JGroups whenever a member is suspected of having crashed, but not yet excluded [1].

The block() method is called to notify the member that it will soon be blocked from sending messages. This is done by the FLUSH protocol, for example to ensure that nobody is sending messages while a state transfer is in progress. When block() returns, any thread sending messages will be blocked until FLUSH unblocks the thread again, e.g. after the state has been transferred successfully.

Therefore, block() can be used to send pending messages or complete some other work.

Note that block() should be brief, or else the entire FLUSH protocol is blocked.

Sending messages in callbacks

Note that anything that could block should not be done in a callback. This includes sending messages: if we have FLUSH on the stack and send a message in a viewAccepted() callback, then the following happens. The FLUSH protocol blocks all (multicast) messages before installing a view, then installs the view, then unblocks. However, because installation of the view triggers the viewAccepted() callback, sending messages inside viewAccepted() will block. This in turn blocks the viewAccepted() thread, so the flush will never return!

If we need to send a message in a callback, the sending should be done on a separate thread, or a timer task should be submitted to the timer.
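
A minimal sketch of the separate-thread approach, using only JDK classes, might look as follows. The class CallbackSendSketch is hypothetical, and the sender passed to it is an assumption that stands in for a call such as channel.send(); the point is only that the callback returns immediately while the potentially blocking send runs elsewhere.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical listener: 'sender' stands in for a call such as channel.send(...).
final class CallbackSendSketch {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Consumer<String> sender;

    CallbackSendSketch(Consumer<String> sender) {
        this.sender = sender;
    }

    // Mirrors viewAccepted(): it only queues the work and returns, so the
    // (possibly blocking) send runs on the worker thread, not the callback thread.
    void viewAccepted(String newView) {
        worker.execute(() -> sender.accept("saw view " + newView));
    }

    // Stops the worker and waits for already queued sends to finish.
    boolean shutdown(long timeoutMillis) throws InterruptedException {
        worker.shutdown();
        return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        List<String> sent = new CopyOnWriteArrayList<>();
        CallbackSendSketch listener = new CallbackSendSketch(sent::add);
        listener.viewAccepted("[A|1] [A, B]");
        listener.shutdown(1000);
        System.out.println(sent);
    }
}
```

A single worker thread keeps the sends in the order in which the callbacks arrived; a larger pool would give up that ordering.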

3.2.4. ExtendedMembershipListener

The ExtendedMembershipListener interface extends MembershipListener:

              public interface ExtendedMembershipListener extends MembershipListener {
                  public void unblock();
              }
          

The unblock() method is called to notify the member that the FLUSH protocol has completed and the member can resume sending messages. If the member did not stop sending messages on block(), FLUSH simply blocked them and will resume, so no action is required from a member. Implementation of the unblock() callback is optional.

3.2.5. ChannelListener

           public interface ChannelListener {
               void channelConnected(Channel channel);
               void channelDisconnected(Channel channel);
               void channelClosed(Channel channel);
               void channelShunned(); // deprecated in 2.8
               void channelReconnected(Address addr); // deprecated in 2.8
           }
        

A class implementing ChannelListener can use the Channel.setChannelListener() method to register with a channel to obtain information about state changes in a channel. Whenever a channel is connected, disconnected or closed, the corresponding callback will be invoked.

3.2.6. Receiver

         public interface Receiver extends MessageListener, MembershipListener {
         }
      

A Receiver can be used to receive messages and view changes in push-style; rather than having to pull these events from a channel, they will be dispatched to the receiver as soon as they have been received. This saves one thread (the application thread pulling messages from a channel, or the PullPushAdapter thread).

Note that JChannel.receive() has been deprecated and will be removed in 3.0. The preferred way of receiving messages is now via a Receiver callback (push style).

3.2.7. ExtendedReceiver

      public interface ExtendedReceiver extends ExtendedMessageListener, MembershipListener {
      }
      

This is a receiver that is also able to handle partial state transfer.

3.2.8. ReceiverAdapter and ExtendedReceiverAdapter

These classes implement Receiver and ExtendedReceiver. When implementing a callback, one can simply extend ReceiverAdapter and override receive(), without having to implement all callbacks of the interface.

Merging of Extended interfaces with their super interfaces

The Extended- interfaces (ExtendedMessageListener, ExtendedReceiver) will be merged with their parents in the 3.0 release of JGroups. The merge is deferred to 3.0 because it breaks backwards compatibility of the API, which we didn't want to do in the 2.x series.

3.3. Address

Each member of a group has an address, which uniquely identifies the member. The interface for such an address is Address, which requires concrete implementations to provide methods for comparison and sorting of addresses, and for determination whether the address is a multicast address. JGroups addresses have to implement the following interface:

       public interface Address extends Externalizable, Comparable, Cloneable {
           boolean isMulticastAddress();
           int size();
       }
    

Please never use implementations of Address directly; Address should always be used as an opaque identifier of a cluster node!

Actual implementations of addresses are often generated by the bottommost protocol layer (e.g. UDP or TCP). This allows for all possible sorts of addresses to be used with JGroups, e.g. ATM.

In JChannel, it is the IP address of the host on which the stack is running and the port on which the stack is receiving incoming messages; it is represented by the concrete class org.jgroups.stack.IpAddress. Instances of this class are only used within the JChannel protocol stack; users of a channel see addresses (of any kind) only as Addresses. Since an address uniquely identifies a channel, and therefore a group member, it can be used to send messages to that group member, e.g. in Messages (see next section).

In 2.8, the default implementation of Address was changed from IpAddress to org.jgroups.util.UUID.

3.4. Message

Data is sent between members in the form of messages (org.jgroups.Message). A message can be sent by a member to a single member, or to all members of the group of which the channel is an endpoint. The structure of a message is shown in Figure 3.1, “Structure of a message”.

Figure 3.1. Structure of a message

A message contains 5 fields:

Destination address

The address of the receiver. If null, the message will be sent to all current group members.

Source address

The address of the sender. Can be left null, and will be filled in by the transport protocol (e.g. UDP) before the message is put on the network.

Flags

This is one byte used for flags. The currently recognized flags are OOB, LOW_PRIO and HIGH_PRIO. See the discussion on the concurrent stack for OOB.

Payload

The actual data (as a byte buffer). The Message class contains convenience methods to set a serializable object and to retrieve it again, using serialization to convert the object to/from a byte buffer.

Headers

A list of headers that can be attached to a message. Anything that should not be in the payload can be attached to a message as a header. Methods putHeader() , getHeader() and removeHeader() of Message can be used to manipulate headers.

A message is similar to an IP packet and consists of the payload (a byte buffer) and the addresses of the sender and receiver (as Addresses). Any message put on the network can be routed to its destination (receiver address), and replies can be returned to the sender's address.

An application usually does not need to fill in the sender's address when sending a message; this is done automatically by the protocol stack before the message is put on the network. However, there may be cases in which the sender of a message wants to give an address different from its own, so that, for example, a response is returned to some other member.

The destination address (receiver) can be an Address, denoting the address of a member, determined e.g. from a message received previously, or it can be null , which means that the message will be sent to all members of the group. A typical multicast message, sending string "Hello" to all members would look like this:

         Message msg=new Message(null, null, "Hello");
         channel.send(msg);
      

3.5. View

A view (View) is a list of the current members of a group. It consists of a ViewId, which uniquely identifies the view (see below), and a list of members. Views are set in a channel automatically by the underlying protocol stack whenever a new member joins or an existing one leaves (or crashes). All members of a group see the same sequence of views.

Note that there is a comparison function which orders all the members of a group in the same way. Usually, the first member of the list is the coordinator (the one who emits new views). Thus, whenever the membership changes, every member can determine the coordinator easily and without having to contact other members.
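
The local coordinator check described above can be sketched without any JGroups classes. In the hypothetical CoordinatorSketch below, plain strings stand in for Address instances and a List stands in for the ordered member list of a view; both are assumptions made so that the example is self-contained.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: member names stand in for Address instances.
final class CoordinatorSketch {

    // By convention the first member of the ordered membership list
    // is the coordinator; an empty list has none.
    static String coordinator(List<String> members) {
        return members.isEmpty() ? null : members.get(0);
    }

    // A member is the coordinator if it is first in the view it has installed.
    static boolean isCoordinator(List<String> members, String self) {
        return self != null && self.equals(coordinator(members));
    }

    public static void main(String[] args) {
        List<String> view = Arrays.asList("A", "B", "C");
        System.out.println(coordinator(view));
        // After A leaves, every remaining member sees the same list [B, C]
        // and can pick B locally, without asking anyone.
        System.out.println(coordinator(Arrays.asList("B", "C")));
    }
}
```

Because all members see the same ordered list, each of them reaches the same answer on its own.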

The code below shows how to send a (unicast) message to the first member of a view (error checking code omitted):

       View view=channel.getView();
       // the member list is ordered; index 0 is the first member
       Address first=view.getMembers().get(0);
       Message msg=new Message(first, null, "Hello world");
       channel.send(msg);
    

Whenever an application is notified that a new view has been installed (e.g. by Receiver.viewAccepted()), the view is already set in the channel. For example, calling Channel.getView() in a viewAccepted() callback would return the same view (or possibly the next one, in case a newer view has already been installed).

3.5.1. ViewId

The ViewId is used to uniquely number views. It consists of the address of the view creator and a sequence number. ViewIds can be compared for equality and put in a hashtable as they implement equals() and hashCode() methods.[2]
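
To illustrate why equals() and hashCode() matter here, the following sketch models a view ID as a small value class and uses it as a hash map key. ViewIdSketch is hypothetical and is not the JGroups ViewId class; a String stands in for the creator's Address.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical value class: a String stands in for the creator's Address.
final class ViewIdSketch {
    private final String creator;
    private final long id;

    ViewIdSketch(String creator, long id) {
        this.creator = creator;
        this.id = id;
    }

    // Two IDs are equal if both the creator and the sequence number match.
    @Override public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ViewIdSketch)) return false;
        ViewIdSketch other = (ViewIdSketch) o;
        return id == other.id && creator.equals(other.creator);
    }

    // Consistent with equals(), so equal IDs land in the same hash bucket.
    @Override public int hashCode() {
        return Objects.hash(creator, id);
    }

    @Override public String toString() {
        return "[" + creator + "|" + id + "]";
    }

    public static void main(String[] args) {
        Map<ViewIdSketch, String> seen = new HashMap<>();
        seen.put(new ViewIdSketch("A", 3), "installed");
        // A different but equal instance finds the same entry.
        System.out.println(seen.get(new ViewIdSketch("A", 3)));
    }
}
```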

3.5.2. MergeView

Whenever a group splits into subgroups, e.g. due to a network partition, and later the subgroups merge back together, a MergeView instead of a View will be received by the application. The MergeView class is a subclass of View and contains as an additional instance variable the list of views that were merged. As an example, if the group denoted by view V1:(p,q,r,s,t) split into subgroups V2:(p,q,r) and V2:(s,t), the merged view might be V3:(p,q,r,s,t). In this case the MergeView would contain a list of 2 views: V2:(p,q,r) and V2:(s,t).

3.6. JChannel

In order to join a group and send messages, a process has to create a channel. A channel is like a socket. When a client connects to a channel, it gives the name of the group it would like to join. Thus, a channel is (in its connected state) always associated with a particular group. The protocol stack takes care that channels with the same group name find each other: whenever a client connects to a channel with group name G, it tries to find existing channels with the same name and joins them, resulting in a new view being installed (which contains the new member). If no members exist, a new group will be created.

A state transition diagram for the major states a channel can assume is shown in Figure 3.2, “Channel states”.

Figure 3.2. Channel states

When a channel is first created, it is in the unconnected state. An attempt to perform operations that are only valid in the connected state (e.g. sending or receiving messages) will result in an exception. After a client connects successfully, the channel moves to the connected state. It will now receive messages, views and suspicions from other members and may send messages to other members or to the group. Getting the local address of a channel is guaranteed to be a valid operation in this state (see below). When the channel is disconnected, it moves back to the unconnected state. Both a connected and an unconnected channel may be closed, which makes the channel unusable for further operations; any attempt to use a closed channel will result in an exception. When a channel is closed directly from the connected state, it will first be disconnected, and then closed.
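
The life cycle can be summarized as a small state machine. The sketch below is a hypothetical model, not the JChannel code: the class name, the State enum and the use of IllegalStateException are assumptions (a real channel throws its own exception types), and no networking is involved.

```java
// Hypothetical model of the channel life cycle; it is not the JChannel code.
final class ChannelStateSketch {
    enum State { UNCONNECTED, CONNECTED, CLOSED }

    private State state = State.UNCONNECTED;

    State state() {
        return state;
    }

    // unconnected -> connected; a closed channel cannot be used any more.
    void connect() {
        if (state == State.CLOSED) throw new IllegalStateException("channel is closed");
        state = State.CONNECTED;
    }

    // connected -> unconnected.
    void disconnect() {
        if (state == State.CLOSED) throw new IllegalStateException("channel is closed");
        state = State.UNCONNECTED;
    }

    // Closing a connected channel disconnects it first, then closes it.
    void close() {
        if (state == State.CONNECTED) disconnect();
        state = State.CLOSED;
    }

    // Sending is only valid in the connected state.
    void send(String msg) {
        if (state != State.CONNECTED)
            throw new IllegalStateException("cannot send in state " + state);
    }

    public static void main(String[] args) {
        ChannelStateSketch ch = new ChannelStateSketch();
        ch.connect();
        ch.send("Hello");
        ch.close();
        System.out.println(ch.state());
    }
}
```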

The methods available for creating and manipulating channels are discussed now.

3.6.1. Creating a channel

A channel can be created in two ways: an instance of a subclass of Channel is created directly using its public constructor (e.g. new JChannel()), or a channel factory is created, which -- upon request -- creates instances of channels. We will only look at the first way of creating a channel: direct instantiation.

The public constructor of JChannel looks as follows:

          public JChannel(String props) throws ChannelException {}
      

It creates an instance of JChannel . The props argument points to an XML file containing the configuration of the protocol stack to be used. This can be a String, but there are also other constructors which take for example a DOM element or a URL (more on this later).

If the props argument is null, the default properties will be used. An exception will be thrown if the channel cannot be created. Possible causes include protocols that were specified in the property argument, but were not found, or wrong parameters to protocols.

For example, the Draw demo can be launched as follows:

          java org.jgroups.demos.Draw -props file:/home/bela/udp.xml
        

or

          java org.jgroups.demos.Draw -props http://www.jgroups.org/udp.xml
        

In the latter case, an application downloads its protocol stack specification from a server, which allows for central administration of application properties.

A sample XML configuration looks like this (edited from udp.xml):

              <config>
                  <UDP
                       mcast_addr="${jgroups.udp.mcast_addr:228.10.10.10}"
                       mcast_port="${jgroups.udp.mcast_port:45588}"
                       discard_incompatible_packets="true"
                       max_bundle_size="60000"
                       max_bundle_timeout="30"
                       ip_ttl="${jgroups.udp.ip_ttl:2}"
                       enable_bundling="true"
                       thread_pool.enabled="true"
                       thread_pool.min_threads="1"
                       thread_pool.max_threads="25"
                       thread_pool.keep_alive_time="5000"
                       thread_pool.queue_enabled="false"
                       thread_pool.queue_max_size="100"
                       thread_pool.rejection_policy="Run"
                       oob_thread_pool.enabled="true"
                       oob_thread_pool.min_threads="1"
                       oob_thread_pool.max_threads="8"
                       oob_thread_pool.keep_alive_time="5000"
                       oob_thread_pool.queue_enabled="false"
                       oob_thread_pool.queue_max_size="100"
                       oob_thread_pool.rejection_policy="Run"/>
                  <PING timeout="2000"
                          num_initial_members="3"/>
                  <MERGE2 max_interval="30000"
                          min_interval="10000"/>
                  <FD_SOCK/>
                  <FD timeout="10000" max_tries="5" />
                  <VERIFY_SUSPECT timeout="1500"  />
                  <BARRIER />
                  <pbcast.NAKACK
                                 use_mcast_xmit="false" gc_lag="0"
                                 retransmit_timeout="300,600,1200,2400,4800"
                                 discard_delivered_msgs="true"/>
                  <UNICAST timeout="300,600,1200,2400,3600"/>
                  <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                                 max_bytes="400000"/>
                  <VIEW_SYNC avg_send_interval="60000"   />
                  <pbcast.GMS print_local_addr="true" join_timeout="3000"
                              view_bundling="true"/>
                  <FC max_credits="20000000"
                                  min_threshold="0.10"/>
                  <FRAG2 frag_size="60000"  />
                  <pbcast.STATE_TRANSFER  />
              </config>
          

A stack is wrapped by <config> and </config> elements and lists all protocols from bottom (UDP) to top (STATE_TRANSFER). Each element defines one protocol.

Each protocol is implemented as a Java class. When a protocol stack is created based on the above XML configuration, the first element ("UDP") becomes the bottom-most layer, the second one will be placed on the first, etc: the stack is created from the bottom to the top.

Each element has to be the name of a Java class that resides in the org.jgroups.protocols package. Note that only the base name has to be given, not the fully specified class name (UDP instead of org.jgroups.protocols.UDP). If the protocol class is not found, JGroups assumes that the name given is a fully qualified classname and will therefore try to instantiate that class. If this does not work, an exception is thrown. This allows for protocol classes to reside in different packages altogether, e.g. a valid protocol name could be com.sun.eng.protocols.reliable.UCAST.

Each layer may have zero or more arguments, which are specified as name/value pairs in the form of XML attributes of the protocol element. In the example above, UDP is configured with some options, one of them being the IP multicast address (mcast_addr), which is set to 228.10.10.10, or to the value of the system property jgroups.udp.mcast_addr, if set.
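
The ${name:default} placeholders used in the sample configuration can be read as "the value of property name if set, else default". The following sketch shows one way such a substitution could work; it is an illustration only and not the JGroups parser. The class PropertySubstitutionSketch is hypothetical, and leaving a placeholder untouched when it has neither a value nor a default is an assumption of this sketch.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical resolver for ${name:default} placeholders; not the JGroups parser.
final class PropertySubstitutionSketch {
    // group(1) is the property name, group(2) the optional default value.
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    // Replaces each placeholder with the property value if it is set,
    // otherwise with the default given after the colon.
    static String resolve(String value, Properties props) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            // Assumption: without a default, the placeholder text is kept as-is.
            String fallback = m.group(2) != null ? m.group(2) : m.group(0);
            String replacement = props.getProperty(m.group(1), fallback);
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Uses the JVM's system properties, e.g. -Djgroups.udp.mcast_addr=...
        String attr = "${jgroups.udp.mcast_addr:228.10.10.10}";
        System.out.println(resolve(attr, System.getProperties()));
    }
}
```

Started with -Djgroups.udp.mcast_addr=239.1.2.3, the main method would print that address instead of the default from the configuration.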

Note that all members in a group have to have the same protocol stack.

3.6.1.1. Programmatic creation

Usually, channels are created by passing the name of an XML configuration file to the JChannel() constructor. On top of this declarative configuration, JGroups provides an API to create a channel programmatically. The way to do this is to first create a JChannel, then an instance of ProtocolStack, then add all desired protocols to the stack and finally call init() on the stack to set it up. The rest, e.g. calling JChannel.connect(), is the same as with the declarative creation.

An example of how to programmatically create a channel is shown below (copied from ProgrammaticChat):

     JChannel ch=new JChannel(false);         // 1
     ProtocolStack stack=new ProtocolStack(); // 2
     ch.setProtocolStack(stack);              // 3
     stack.addProtocol(new UDP().setValue("bind_addr", InetAddress.getByName("192.168.1.5")))
             .addProtocol(new PING())
             .addProtocol(new MERGE2())
             .addProtocol(new FD_SOCK())
             .addProtocol(new FD_ALL().setValue("timeout", 12000).setValue("interval", 3000))
             .addProtocol(new VERIFY_SUSPECT())
             .addProtocol(new BARRIER())
             .addProtocol(new NAKACK())
             .addProtocol(new UNICAST2())
             .addProtocol(new STABLE())
             .addProtocol(new GMS())
             .addProtocol(new UFC())
             .addProtocol(new MFC())
             .addProtocol(new FRAG2());       // 4
     stack.init();                            // 5

     ch.setReceiver(new ReceiverAdapter() {
         public void viewAccepted(View new_view) {
             System.out.println("view: " + new_view);
         }

         public void receive(Message msg) {
             System.out.println(msg.getObject() + " [" + msg.getSrc() + "]");
         }
     });

     ch.connect("ChatCluster");

     for(;;) {
         String line=Util.readStringFromStdin(": ");
         ch.send(null, null, line);
     }
                

First a JChannel is created (1). The 'false' argument tells the channel not to create a ProtocolStack. This is needed because we will create one ourselves later (2) and set it in the channel (3).

Next, all protocols are added to the stack. Note that the order is from bottom (transport protocol) to top. So UDP as transport is added first, then PING and so on, until FRAG2, which is the top protocol. Every protocol can be configured via setters, but there is also a generic setValue(String attr_name, Object value), which can be used to configure protocols as well, as shown in the example.

Once the stack is configured, we call ProtocolStack.init() to link all protocols correctly and to call init() in every protocol instance. After this, the channel is ready to be used and all subsequent actions (e.g. connect()) can be executed. When the init() method returns, we have essentially the equivalent of new JChannel(config_file).

3.6.2. Setting options

A number of options can be set in a channel. To do so, the following method is used:

              public void setOpt(int option, Object value);
              

The arguments are the option's number and a value. The following options are currently recognized:

Channel.BLOCK

The argument is a boolean object. If true, block messages will be received.

Channel.LOCAL

Local delivery. The argument is a boolean value. If set to true, a member will receive all messages it sent to itself. Otherwise, all messages sent by itself will be discarded. This option allows to send messages to the group, without receiving a copy. Default is true (members will receive their own copy of messages multicast to the group).

Channel.AUTO_RECONNECT

When set to true, a shunned channel will leave the group and then try to automatically re-join. Default is false. Note that in 2.8, shunning has been removed, therefore this option has been deprecated.

Channel.AUTO_GETSTATE

When set to true a shunned channel, after reconnection, will attempt to fetch the state from the coordinator. This requires AUTO_RECONNECT to be true as well. Default is false. Note that in 2.8, shunning has been removed, therefore this option has been deprecated.

The equivalent method to get options is getOpt():

          public Object getOpt(int option);
      

Given an option, the current value of the option is returned.

Deprecating options in 3.0

Most of the options (except LOCAL) have been deprecated in 2.6.x and will be removed in 3.0.

3.6.3. Giving the channel a logical name

A channel can be given a logical name which is then used instead of the channel's address. A logical name might show the function of a channel, e.g. "HostA-HTTP-Cluster", which is more legible than a UUID 3c7e52ea-4087-1859-e0a9-77a0d2f69f29.

For example, when we have 3 channels, using logical names we might see a view "{A,B,C}", which is nicer than "{56f3f99e-2fc0-8282-9eb0-866f542ae437, ee0be4af-0b45-8ed6-3f6e-92548bfa5cde, 9241a071-10ce-a931-f675-ff2e3240e1ad}"!

If no logical name is set, JGroups generates one, using the hostname and a random number, e.g. linux-3442. If this is not desired and the UUIDs should be shown, use system property -Djgroups.print_uuids=true.

The logical name can be set using:

               public void setName(String logical_name);
            

This should be done before connecting a channel. Note that the logical name stays with a channel until the channel is destroyed, whereas a UUID is created on each connection.

When JGroups starts, it prints the logical name and the associated physical address(es):

            -------------------------------------------------------------------
            GMS: address=mac-53465, cluster=DrawGroupDemo, physical address=192.168.1.3:49932
            -------------------------------------------------------------------
            ** View=[mac-53465|0] [mac-53465]
            

The logical name is mac-53465 and the physical address is 192.168.1.3:49932. The UUID is not shown here.

3.6.4. Generating custom addresses

Since 2.12 address generation is pluggable. This means that an application can determine what kind of addresses it uses. The default address type is UUID, and since some protocols use UUID, it is recommended to provide custom classes as subclasses of UUID.

This can be used, for example, to pass additional data around with an address, such as information about the location of the node to which the address is assigned. Note that methods equals(), hashCode() and compareTo() of the UUID super class should not be changed.

To use custom addresses, the following things have to be done:

  • Write an implementation of org.jgroups.stack.AddressGenerator
  • For any class CustomAddress, it will need to get registered with the ClassConfigurator in order to marshal it correctly:
            class CustomAddress extends UUID {
                static {
                    ClassConfigurator.add((short)8900, CustomAddress.class);
                }
            }
                        
    Note that the ID should be chosen such that it doesn't collide with any IDs defined in jg-magic-map.xml.
  • Set the address generator in JChannel: setAddressGenerator(AddressGenerator). This has to be done before the channel is connected

An example of a subclass is org.jgroups.util.PayloadUUID.

3.6.5. Connecting to a channel

When a client wants to join a group, it connects to a channel giving the name of the group to be joined:

           public void connect(String clustername) throws ChannelClosed;
      

The cluster name is a string, naming the cluster to be joined. All channels that are connected to the same name form a cluster. Messages multicast on any channel in the cluster will be received by all members (including the one who sent it [3]).

The method returns as soon as the group has been joined successfully. If the channel is in the closed state (see Figure 3.2, “Channel states”), an exception will be thrown. If there are no other members, i.e. no other member has connected to a group with this name, then a new group is created and the member joins it. The first member of a group becomes its coordinator. A coordinator is in charge of multicasting new views whenever the membership changes [4].

3.6.6. Connecting to a channel and getting the state in one operation

Clients can also join a cluster and fetch the cluster state in one operation. The best way to think of this combined connect-and-fetch-state method is as an invocation of the regular connect and getState methods executed in succession. However, it has several advantages over calling the regular connect followed by a state fetch. First, the underlying message exchange is heavily optimized, especially if the FLUSH protocol is used in the stack. More importantly, from the client's perspective, connecting and fetching the state become one atomic operation.

              public void connect(String cluster_name, Address target,
                                  String state_id, long timeout)
                          throws ChannelException;
		      

Just as in the regular connect method, the cluster name names the cluster to be joined. The address parameter indicates the cluster member to fetch the state from. A null address indicates that the state should be fetched from the cluster coordinator. If the state should be fetched from a particular member other than the coordinator, clients can provide the address of that member. The state ID is used for partial state transfer, while the timeout bounds the entire join and fetch operation.

3.6.7. Getting the local address and the group name

Method getLocalAddress() returns the local address of the channel[5]. In the case of JChannel , the local address is generated by the bottom-most layer of the protocol stack when the stack is connected to. That means that -- depending on the channel implementation -- the local address may or may not be available when a channel is in the unconnected state.

         public Address getLocalAddress(); // use getAddress() with 2.8.0+
      

Method getClusterName() returns the name of the cluster in which the channel is a member:

          public String getClusterName();
      

Again, the result is undefined if the channel is in the unconnected or closed state.

3.6.8. Getting the current view

The following method can be used to get the current view of a channel:

         public View getView();
      

This method does not retrieve a new view (message) from the channel, but only returns the current view of the channel. The current view is updated every time a view message is received: when method receive() is called, and the return value is a view, before the view is returned, it will be installed in the channel, i.e. it will become the current view.

Calling this method on an unconnected or closed channel is implementation defined. A channel may return null, or it may return the last view it knew of.

3.6.9. Sending a message

Once the channel is connected, messages can be sent using the send() methods:

         public void send(Message msg) throws ChannelNotConnected, ChannelClosed;
         public void send(Address dst, Address src, Object obj)
                          throws ChannelNotConnected, ChannelClosed;
      

The first send() method has only one argument, which is the message to be sent. The message's destination should either be the address of the receiver (unicast) or null (multicast). When it is null, the message will be sent to all members of the group (including itself). The source address may be null; if it is, it will be set to the channel's address (so that recipients may generate a response and send it back to the sender).

The second send() method is a helper method and uses the former method internally. It requires the address of receiver and sender and an object (which has to be serializable), constructs a Message and sends it.

If the channel is not connected, or was closed, an exception will be thrown upon attempting to send a message.

Here's an example of sending a (multicast) message to all members of a group:

          Map data=new HashMap(); // any serializable data
          try {
              channel.send(null, null, data);
          }
          catch(Exception ex) {
              // handle errors
          }
      

The null value as destination address means that the message will be sent to all members in the group. The sender's address will be filled in by the bottom-most protocol. The payload is a hashmap, which will be serialized into the message's buffer and unserialized at the receiver's end. Alternatively, any other means of generating a byte buffer and setting the message's buffer to it (e.g. using Message.setBuffer()) would also work.
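
The serialize-then-unserialize round trip described above can be sketched with plain Java serialization. This is only an illustration under stated assumptions, not the JGroups implementation: the class PayloadRoundTrip and its methods toBytes() and fromBytes() are hypothetical stand-ins for what Util.objectToByteBuffer() and Util.objectFromByteBuffer() do conceptually, and no JGroups classes are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in, not part of JGroups.
class PayloadRoundTrip {

    // Sender side: turn a serializable object into a byte buffer.
    static byte[] toBytes(Object obj) throws IOException {
        ByteArrayOutputStream bytes=new ByteArrayOutputStream();
        try(ObjectOutputStream out=new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Receiver side: rebuild the object from the byte buffer.
    static Object fromBytes(byte[] buf) throws IOException, ClassNotFoundException {
        try(ObjectInputStream in=new ObjectInputStream(new ByteArrayInputStream(buf))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String,Integer> data=new HashMap<>();
        data.put("answer", 42);
        byte[] buf=toBytes(data);    // what would travel in the message's buffer
        Object copy=fromBytes(buf);  // what the receiver would reconstruct
        System.out.println(data.equals(copy));
    }
}
```

The receiver gets an equal but distinct object, which is why the payload has to be serializable in the first place.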

Here's an example of sending a (unicast) message to the first member (coordinator) of a group:

          Map data=new HashMap(); // any serializable data
          try {
              Address receiver=channel.getView().getMembers().firstElement();
              channel.send(receiver, null, data);
          }
          catch(Exception ex) {
              // handle errors
          }
      

It creates a Message with a specific address for the receiver (the first member of the group). Again, the sender's address can be left null as it will be filled in by the bottom-most protocol.

3.6.10. Receiving a message

Method receive() is used to receive messages, views, suspicions and blocks:

         public Object receive(long timeout) throws ChannelNotConnectedException,
                                                  ChannelClosedException, TimeoutException;
      

A channel receives messages asynchronously from the network and stores them in a queue. When receive() is called, the next available message is removed from the head of that queue and returned. When there are no messages in the queue, the method will block. If timeout is greater than 0, it will wait the specified number of milliseconds for a message to be received, and throw a TimeoutException if none was received during that time. If the timeout is 0 or negative, the method will wait indefinitely for the next available message.
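
The timeout rules above can be modelled with a standard blocking queue. This is a minimal sketch, not JGroups code: ReceiveQueueModel and deliver() are hypothetical names, and the sketch throws java.util.concurrent.TimeoutException rather than the JGroups exception of the same simple name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of a pull-style receive queue; not a JGroups class.
class ReceiveQueueModel {
    private final BlockingQueue<Object> queue=new LinkedBlockingQueue<>();

    // Called by the (imaginary) network thread when something arrives.
    void deliver(Object obj) {
        queue.add(obj);
    }

    // timeout > 0: wait up to that many milliseconds, then give up.
    // timeout <= 0: wait until something is available.
    Object receive(long timeout) throws InterruptedException, TimeoutException {
        if(timeout <= 0)
            return queue.take();
        Object obj=queue.poll(timeout, TimeUnit.MILLISECONDS);
        if(obj == null)
            throw new TimeoutException("nothing received within " + timeout + " ms");
        return obj;
    }

    public static void main(String[] args) throws Exception {
        ReceiveQueueModel ch=new ReceiveQueueModel();
        ch.deliver("first");
        ch.deliver("second");
        System.out.println(ch.receive(0));   // the oldest element comes out first
        System.out.println(ch.receive(100));
        try {
            ch.receive(100);                 // the queue is empty now
        }
        catch(TimeoutException expected) {
            System.out.println("timed out");
        }
    }
}
```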

Depending on the channel options (see Section 3.6.2, “Setting options” ), the following types of objects may be received:

Message

A regular message. To send a response to the sender, a new message can be created. Its destination address would be the received message's source address. Method Message.makeReply() is a helper method to create a response.

View

A view change, signalling that a member has joined, left or crashed. The application may or may not perform some action upon receiving a view change (e.g. updating a GUI object of the membership, or redistributing a load-balanced collaborative task to all members). Note that a longer action, or any action that blocks, should be performed in a separate thread. A MergeView will be received when 2 or more subgroups have merged into one (see Section 3.5.2, “MergeView” for details). Here, a possible state merge by the application needs to be done in a separate thread.

SuspectEvent

Notification of a member that is suspected. Method SuspectEvent.getMember() retrieves the address of the suspected member. Usually this message will be followed by a view change.

BlockEvent

The application has to stop sending messages. When the application has stopped sending messages, it needs to acknowledge this message by calling Channel.blockOk().

The BlockEvent reception can be used to complete pending tasks, e.g. send pending messages, but once Channel.blockOk() has been called, all threads that send messages (calling Channel.send() or Channel.down()) will be blocked until FLUSH unblocks them.

UnblockEvent

The application can resume sending messages. Any messages previously blocked by FLUSH will be unblocked; when the UnblockEvent is received, the channel has already been unblocked.

GetStateEvent

Received when the application's current state should be saved (for a later state transfer). A copy of the current state should be made (possibly wrapped in a synchronized statement) and returned by calling Channel.returnState(). If state transfer events are not enabled on the channel (default), then this event will never be received. This message will only be received with the Virtual Synchrony suite of protocols (see the Programmer's Guide).

StreamingGetStateEvent

Received when the application's current state should be provided to a group member that requested it. If state transfer events are not enabled on the channel (default), or if the channel is not configured with pbcast.STREAMING_STATE_TRANSFER, then this event will never be received.

SetStateEvent

Received as response to a getState(s) method call. The argument contains the state of a single member ( byte[] ) or of all members ( Vector ). Since the state of a single member could also be a vector, the interpretation of the argument is left to the application.

StreamingSetStateEvent

Received at the state-requesting member when the state InputStream becomes ready for reading. If state transfer events are not enabled on the channel (default), or if the channel is not configured with pbcast.STREAMING_STATE_TRANSFER, then this event will never be received.

The caller has to check the type of the object returned. This can be done using the instanceof operator, as follows:

          Object obj=channel.receive(0); // wait forever
          if(obj instanceof Message) {
              Message msg=(Message)obj;
              // handle the message
          }
          else if(obj instanceof View) {
              View v=(View)obj;
              // handle the view change
          }
          else {
              // don't handle suspicions or blocks
          }
      

If for example views, suspicions and blocks are disabled, then the caller is guaranteed to only receive return values of type Message . In this case, the return value can be cast to a Message directly, without using the instanceof operator.

If the channel is not connected, or was closed, a corresponding exception will be thrown.

The example below shows how to retrieve the "Hello world" string from a message:

          Message msg; // received above
          try {
              String s=(String)msg.getObject(); // error if obj not Serializable
              // alternative: s=new String(msg.getBuffer());
          }
          catch(Exception ex) {
              // handle errors (e.g. the casting error above)
          }
      

The Message.getObject() method retrieves the message's byte buffer, converts it into a (serializable) object and returns the object.

3.6.11. Using a Receiver to receive messages

Instead of pulling messages from a channel in an application thread, a Receiver can be registered with a channel. This is the preferred and recommended way of receiving messages. In 3.0, the receive() method will be removed from JChannel. All received messages, view changes and state transfer requests will invoke callbacks on the registered Receiver:

          JChannel ch=new JChannel();
          ch.setReceiver(new ExtendedReceiverAdapter() {
              public void receive(Message msg) {
                  System.out.println("received message " + msg);
              }
              public void viewAccepted(View new_view) {
                  System.out.println("received view " + new_view);
              }
          });
          ch.connect("bla");
      

The ExtendedReceiverAdapter class implements all callbacks of ExtendedReceiver with no-ops; in the example above, we override receive() and viewAccepted().

The advantage of using a Receiver is that the application doesn't have to waste 1 thread for pulling messages out of a channel. In addition, the channel doesn't have to maintain an (unbounded) queue of messages/views, which can quickly get large if the receiver cannot process messages fast enough, and the sender keeps sending messages.
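
The difference from the pull model can be sketched in a few lines of plain Java. This is an illustrative model only: PushDeliveryModel, Listener and deliver() are hypothetical names, not JGroups classes or methods.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of push-style delivery; not a JGroups class.
class PushDeliveryModel {
    interface Listener {
        void receive(String msg);
    }

    private Listener listener;

    void setListener(Listener listener) {
        this.listener=listener;
    }

    // Called by the (imaginary) network thread: the message goes straight
    // to the callback, so nothing accumulates in a queue.
    void deliver(String msg) {
        if(listener != null)
            listener.receive(msg);
    }

    public static void main(String[] args) {
        final List<String> seen=new ArrayList<>();
        PushDeliveryModel ch=new PushDeliveryModel();
        ch.setListener(new Listener() {
            public void receive(String msg) {
                seen.add(msg);
            }
        });
        ch.deliver("hello");
        ch.deliver("world");
        System.out.println(seen);
    }
}
```

In this model the application never dedicates a thread to polling; the callback runs on whichever thread delivers the message, so it should return quickly.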

Note

Note that the Channel.receive() method has been deprecated, and will be removed in 3.0. Use the Receiver interface instead and register as a Receiver with Channel.setReceiver(Receiver r).

3.6.12. Peeking at a message

Instead of removing the next available message from the channel, peek() just returns a reference to the next message, but does not remove it. This is useful when one has to check the type of the next message, e.g. whether it is a regular message, or a view change. The signature of this method is not shown here; it is the same as for receive().

Note

The peek() method has also been deprecated, and will be removed in 3.0.

3.6.13. Getting the group's state

A newly joined member may wish to retrieve the state of the group before starting work. This is done with getState(). This method returns the state of one member (in most cases, of the oldest member, the coordinator). It returns true or false, depending on whether a valid state could be retrieved. For example, if a member is a singleton, then calling this method would always return false [6] .

The actual state is returned as the return value of one of the subsequent receive() calls, in the form of a SetStateEvent object. If getState() returned true, then a valid state (non-null) will be returned, otherwise a null state will be returned. Alternatively, if an application uses a MessageListener (see Section 3.2.1, “MessageListener”) instead of pulling messages from a channel, the listener's getState() method will be invoked and a copy of the current state should be returned. By the same token, the state is set by JGroups calling the setState() method of the state fetcher.

The reason for not directly returning the state as a result of getState() is that the state has to be returned in the correct position relative to other messages. Returning it directly would violate the FIFO properties of a channel, and state transfer would not be correct.
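
A small model makes the ordering argument concrete. This is a sketch under simplifying assumptions: the state is a single integer, messages are increments, and StateOrderingModel, StateSnapshot and replay() are hypothetical names rather than JGroups classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model; the classes here are not JGroups classes.
class StateOrderingModel {
    // Marker for "the state as of this point in the message stream".
    static final class StateSnapshot {
        final int value;
        StateSnapshot(int value) { this.value=value; }
    }

    // Processes queue entries in order: Integer entries are increments,
    // a StateSnapshot replaces the local state.
    static int replay(Queue<Object> queue) {
        int state=0;
        for(Object entry: queue) {
            if(entry instanceof StateSnapshot)
                state=((StateSnapshot)entry).value;
            else
                state+=(Integer)entry;
        }
        return state;
    }

    public static void main(String[] args) {
        // The provider's state was 3 when the snapshot was taken;
        // an increment of 4 was sent afterwards.
        Queue<Object> inOrder=new ArrayDeque<>();
        inOrder.add(new StateSnapshot(3));
        inOrder.add(4);
        System.out.println(replay(inOrder));     // 7: snapshot, then the later increment

        // Same snapshot handed over after the later increment was processed.
        Queue<Object> outOfOrder=new ArrayDeque<>();
        outOfOrder.add(4);
        outOfOrder.add(new StateSnapshot(3));
        System.out.println(replay(outOfOrder));  // 3: the later increment is lost
    }
}
```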

The following code fragment shows how a group member participates in state transfers:

          channel=new JChannel();
          channel.connect("TestChannel");
          boolean rc=channel.getState(null, 5000);

          ...

          Object state, copy;
          Object ret=channel.receive(0);
          if(ret instanceof Message)
              ;
          else if(ret instanceof GetStateEvent) {
              // make a copy so that other msgs don't change the state
              copy=copyState(state);
              channel.returnState(Util.objectToByteBuffer(copy));
          }
          else if(ret instanceof SetStateEvent) {
              SetStateEvent e=(SetStateEvent)ret;
              state=e.getArg();
          }
      

A JChannel has to be created whose stack includes the STATE_TRANSFER or pbcast.STATE_TRANSFER protocols (see Chapter 5, Advanced Concepts ). Method getState() subsequently asks the channel to return the current state. If there is a current state (there may not be any other members in the group!), then true is returned. In this case, one of the subsequent receive() method invocations on the channel will return a SetStateEvent object which contains the current state. In this case, the caller sets its state to the one received from the channel.

Method receive() might return a GetStateEvent object, requesting the state of the member to be returned. In this case, a copy of the current state should be made and returned using JChannel.returnState(). It is important to a) synchronize access to the state when returning it, since other accesses may modify it while it is being returned, and b) make a copy of the state, since other accesses after returning the state may still be able to modify it! This is possible because the state is not immediately returned, but travels down the stack (in the same address space), and a reference to it could still alter it.
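
Points a) and b) can be sketched in plain Java as follows. The class CopiedState and its methods are hypothetical and stand in for whatever object holds the application state; no JGroups classes are involved.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical state holder; not a JGroups class.
class CopiedState {
    private final Map<String,String> state=new HashMap<>();

    void put(String key, String value) {
        synchronized(state) {
            state.put(key, value);
        }
    }

    // a) lock so that no writer changes the map while it is being copied
    // b) hand out a copy so that later writes cannot reach the returned object
    Map<String,String> snapshot() {
        synchronized(state) {
            return new HashMap<>(state);
        }
    }

    public static void main(String[] args) {
        CopiedState s=new CopiedState();
        s.put("a", "1");
        Map<String,String> copy=s.snapshot();
        s.put("b", "2");            // happens after the snapshot was taken
        System.out.println(copy);   // still only contains the entry for "a"
    }
}
```

Note that this is a shallow copy: it protects the map structure, but mutable values stored in the map would need to be copied as well.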

3.6.14. Getting the state with a Receiver

As an alternative to handling the GetStateEvent and SetStateEvent events, and calling Channel.returnState(), a Receiver could be used. The example above would look like this:

          class MyReceiver extends ReceiverAdapter {
              final Map m=new HashMap();
              public byte[] getState() {
                  // so nobody else can modify the map while we serialize it
                  synchronized(m) {
                      byte[] state=Util.objectToByteBuffer(m);
                      return state;
                  }
              }

              public void setState(byte[] state) {
                  synchronized(m) {
                      Map new_m=(Map)Util.objectFromByteBuffer(state);
                      m.clear();
                      m.putAll(new_m);
                  }
              }
          }
          // use default props (has to include STATE_TRANSFER)
          channel=new JChannel();
          channel.setReceiver(new MyReceiver());
          channel.connect("TestChannel");
          boolean rc=channel.getState(null, 5000);
      

In a group consisting of A,B and C, with D joining the group and calling Channel.getState(), the following sequence of callbacks happens:

  • D calls Channel.getState(). The state will be retrieved from the oldest member, A
  • A.MyReceiver.getState() is called. A returns a copy of its hashmap
  • D: getState() returns true
  • D.MyReceiver.setState() is called with the serialized state. D unserializes the state and sets it

3.6.15. Partial state transfer

Partial state transfer means that instead of transferring the entire state, we may want to transfer only a substate. For example, with HTTP session replication, a new node in a cluster may want to transfer only the state of a specific session, not all HTTP sessions. This can be done with either the pull or push model. The method to call would be Channel.getState(), including the ID of the substate (a string). In the pull model, GetStateEvent and SetStateEvent have an additional member, state_id, and in the push model, there are 2 additional getState() and setState() callbacks. The example below shows partial state transfer for the push model:

          class MyReceiver extends ExtendedReceiverAdapter {
              final Map m=new HashMap();

              public byte[] getState() {
                  return getState(null);
              }

              public byte[] getState(String substate_id) {
                  // so nobody can modify the map while we serialize it
                  synchronized(m) {
                      byte[] state=null;
                      if(substate_id == null) {
                          state=Util.objectToByteBuffer(m);
                      }
                      else {
                          Object value=m.get(substate_id);
                          if(value != null) {
                              return Util.objectToByteBuffer(value);
                          }
                      }
                      return state;
                  }
              }

              public void setState(byte[] state) {
                  setState(null, state);
              }

              public void setState(String substate_id, byte[] state) {
                  synchronized(m) {
                      if(substate_id != null) {
                          Object value=Util.objectFromByteBuffer(state);
                          m.put(substate_id, value);
                      }
                      else {
                          Map new_m=(Map)Util.objectFromByteBuffer(state);
                          m.clear();
                          m.putAll(new_m);
                      }
                  }
              }
          }

          // use default props (has to include pbcast.STATE_TRANSFER)
          channel=new JChannel();
          channel.setReceiver(new MyReceiver());
          channel.connect("TestChannel");
          boolean rc=channel.getState(null, "MyID", 5000);
      

The example shows that the Channel.getState() method specifies the ID of the substate, in this case "MyID". The getState(String substate_id) method checks whether the substate ID is not null, and returns the substate pertaining to the ID, or the entire state if the substate_id is null. The same goes for setting the substate: if setState(String substate_id, byte[] state) has a non-null substate_id, only that part of the current state will be overwritten, otherwise (if null) the entire state will be overwritten.

3.6.16. Streaming state transfer

Streaming state transfer allows the transfer of (partial) application state without having to load the entire state into memory before sending it to a joining member. It is especially useful if the state is very large (>1 GB), where regular state transfer would likely result in an OutOfMemoryError. Streaming state transfer was introduced in JGroups 2.4. A JGroups channel has to be configured with either regular or streaming state transfer. The JChannel API that invokes state transfer (i.e. JChannel.getState(Address target, long timeout)) remains the same.

Streaming state transfer, just like regular byte-based state transfer, can be used in both pull and push mode. Similar to the existing getState() and setState() methods of org.jgroups.MessageListener, an application interested in streaming state transfer in push mode implements the streaming getState() method(s) by writing its state to a provided OutputStream, and the setState() method(s) by reading the state from a provided InputStream. To support streaming state transfer in push mode, the existing ExtendedMessageListener has been extended with four additional methods:

         public interface ExtendedMessageListener {

             /* non-streaming callback methods omitted for clarity */
 
             void getState(OutputStream ostream);
             void getState(String state_id, OutputStream ostream);
             void setState(InputStream istream);
             void setState(String state_id, InputStream istream);
          }
      

For pull mode (when the application uses channel.receive() to fetch events), two event classes were introduced:

  • StreamingGetStateEvent

  • StreamingSetStateEvent

These two events/classes are very similar to the existing GetStateEvent and SetStateEvent but introduce a new field: StreamingGetStateEvent has an OutputStream and StreamingSetStateEvent has an InputStream.

The following code snippet demonstrates how to pull events from a channel, processing StreamingGetStateEvent and sending hypothetical state through a provided OutputStream reference. Handling of StreamingSetStateEvent is analogous to this example:

          ...
          Object obj=channel.receive(0);
          if(obj instanceof StreamingGetStateEvent) {
              StreamingGetStateEvent evt=(StreamingGetStateEvent)obj;
              ObjectOutputStream oos=null;
              try {
                  oos=new ObjectOutputStream(evt.getArg());
                  oos.writeObject(state);
                  oos.flush();
              }
              catch(Exception e) {
                  System.err.println(e); // don't swallow write failures silently
              }
              finally {
                  try {
                      if(oos != null)
                          oos.close();
                  }
                  catch(IOException e) {
                      System.err.println(e);
                  }
              }
          }                 
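
Both directions of a streamed transfer can be sketched with in-memory streams. This is a self-contained illustration, not JGroups code: StreamedState, writeState() and readState() are hypothetical names, and a ByteArrayOutputStream stands in for the stream that the channel would provide.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not a JGroups class.
class StreamedState {

    // Provider side: write the entries one by one instead of building one big byte[].
    static void writeState(List<String> entries, OutputStream ostream) throws IOException {
        ObjectOutputStream out=new ObjectOutputStream(ostream);
        out.writeInt(entries.size());
        for(String entry: entries)
            out.writeObject(entry);
        out.flush();
    }

    // Requester side: read the entries back in the same order.
    static List<String> readState(InputStream istream) throws IOException, ClassNotFoundException {
        ObjectInputStream in=new ObjectInputStream(istream);
        int size=in.readInt();
        List<String> entries=new ArrayList<>(size);
        for(int i=0; i < size; i++)
            entries.add((String)in.readObject());
        return entries;
    }

    public static void main(String[] args) throws Exception {
        List<String> state=new ArrayList<>();
        state.add("one");
        state.add("two");
        ByteArrayOutputStream wire=new ByteArrayOutputStream(); // stands in for the network
        writeState(state, wire);
        List<String> received=readState(new ByteArrayInputStream(wire.toByteArray()));
        System.out.println(received);
    }
}
```

Writing a count followed by the individual entries is one possible framing; the reader only has to consume the stream in the same order the writer produced it.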
          

JGroups offers great flexibility in state transfer by allowing application developers to implement both byte-based and streaming state transfer. An application can, for example, implement both the streaming and the byte-based state transfer callbacks and then switch the state transfer protocol in the channel configuration to use either one. However, a channel cannot be configured with both state transfers at the same time, with the type to use chosen at runtime.

3.6.17. Disconnecting from a channel

Disconnecting from a channel is done using the following method:

          public void disconnect();
          

It will have no effect if the channel is already in the disconnected or closed state. If connected, it will remove itself from the group membership. This is done (transparently for a channel user) by sending a leave request to the current coordinator. The latter will subsequently remove the channel's address from its local view and send the new view to all remaining members.

After a successful disconnect, the channel will be in the unconnected state, and may subsequently be re-connected to.

3.6.18. Closing a channel

To destroy a channel instance (destroy the associated protocol stack, and release all resources), method close() is used:

         public void close();
        

It moves the channel to the closed state, in which no further operations are allowed (most throw an exception when invoked on a closed channel). In this state, a channel instance is not considered used any longer by an application and -- when the reference to the instance is reset -- the channel essentially only lingers around until it is garbage collected by the Java runtime system.
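
The transitions described in this section and the previous one can be summarized as a small state machine. This sketch models only those transitions: ChannelLifecycleModel is a hypothetical class, and IllegalStateException is used as a stand-in for the exception a real channel would throw.

```java
// Hypothetical model of the channel life cycle; not a JGroups class.
class ChannelLifecycleModel {
    enum State { UNCONNECTED, CONNECTED, CLOSED }

    private State state=State.UNCONNECTED;

    State state() {
        return state;
    }

    // Rejected once the channel has been closed.
    void connect() {
        if(state == State.CLOSED)
            throw new IllegalStateException("channel is closed");
        state=State.CONNECTED;
    }

    // No effect unless the channel is currently connected.
    void disconnect() {
        if(state == State.CONNECTED)
            state=State.UNCONNECTED;
    }

    // Moves the channel to the closed state from any other state.
    void close() {
        state=State.CLOSED;
    }

    public static void main(String[] args) {
        ChannelLifecycleModel ch=new ChannelLifecycleModel();
        ch.connect();
        ch.disconnect();
        ch.connect();      // reconnecting after a disconnect is allowed
        ch.close();
        ch.disconnect();   // ignored, the channel stays closed
        System.out.println(ch.state());
    }
}
```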



[1] It could be that the member is suspected falsely, in which case the next view would still contain the suspected member (there is currently no unsuspect() method).

[3] Local delivery can be turned on/off using setOpt() .

[4] This is managed internally however, and an application programmer does not need to be concerned about it.

[5] Use getAddress() with 2.8.0 and later.

[6] A member will never retrieve the state from itself!