$Revision: 1.10 $
Copyright © 1998-2008 Bela Ban
Copyright © 2006-2008 Red Hat Inc
This document is copyrighted. Copies are allowed for personal use. Redistribution only with written permission of the author(s).
$Date: 2008/01/23 14:36:56 $
This is the Programmer's and User's Guide for JGroups. It provides information about the following areas:
Installation and configuration.
Using JGroups.
Architecture and implementation of JGroups. Focus on the protocol stack and protocols.
Most of the Installation and User's Guide has been copied from what is freely available on the JGroups web site. However, the focus of this document is to introduce programmers who want to learn more about JGroups to the architecture and implementation of JGroups. I will for example go into the details of the protocol stack, how a message traverses the stack, and how protocols can process it. I will also explain the various design decisions I had to make when designing JGroups, which hopefully leads to a better understanding of why things are the way they are.
Here are a couple of points I want to abide by throughout this book:
I like brevity. I will strive to describe concepts as clearly as possible (for a non-native English speaker) and will refrain from saying more than I need to make a point.
I like simplicity. Keep It Simple and Stupid. This is one of the biggest goals I have both in writing this book and in writing JGroups. It is easy to explain simple concepts in complex terms, but it is hard to explain a complex system in simple terms. I'll try to do the latter.
I spent 1998-1999 at the Computer Science Department at Cornell University for a post-doc, in Ken Birman's group. Ken is credited with inventing the group communication paradigm, especially the Virtual Synchrony model. At the time they were working on their third generation group communication prototype, called Ensemble. Ensemble followed Horus (written in C by Robbert VanRenesse), which followed ISIS (written by Ken Birman, also in C). Ensemble was written in OCaml, a functional language related to ML that was developed at INRIA. I never liked the OCaml language, which in my opinion has a hideous syntax. Therefore I never warmed to Ensemble either.
However, Ensemble had a Java interface (implemented by a student in a semester project) which allowed me to program in Java and use Ensemble underneath. The Java part would require that an Ensemble process was running somewhere on the same machine, or within the same network, and would connect to it via a bidirectional pipe. The student had developed a simple protocol for talking to the Ensemble engine, and extended the engine as well to talk back to Java.
However, I still needed to compile and install the Ensemble runtime for each different platform, which is exactly the kind of work Java was developed to avoid in the first place: its point is portability.
Therefore I started writing a simple framework (now JChannel), which would allow me to treat Ensemble as just another group communication transport, which could be replaced at any time by a pure Java solution. And soon I found myself working on a pure Java implementation of the group communication transport (now: ProtocolStack). I figured that a pure Java implementation would have a much bigger impact than something written in Ensemble. In the end I didn't spend much time writing scientific papers that nobody would read anyway (I guess I'm not a good scientist, at least not a theoretical one), but rather code for JGroups, which could have a much bigger impact. For me, knowing that real-life projects/products are using JGroups is much more satisfactory than having a paper accepted at a conference/journal.
That's why, after my time was up, I left Cornell and academia altogether, and started a job in the industry: with Fujitsu Network Communications in Silicon Valley.
At around that time (May 2000), SourceForge had just opened its site, and I decided to use it for hosting JGroups. I guess this was a major boost for JGroups because now other developers could work on the code. From then on, the page hit and download numbers for JGroups have steadily risen.
In the fall of 2002, Sacha Labourey contacted me, letting me know that JGroups was being used by JBoss for their clustering implementation. I joined JBoss in 2003 and have been working on JGroups and JBossCache. My goal is to make JGroups the most widely used clustering software in the Java space...
Bela Ban, San Jose, Aug 2002, Kreuzlingen Switzerland 2006
I want to thank all contributors to JGroups, present and past, for their work. Without you, this project would never have gotten off the ground.
I also want to thank Ken Birman and Robbert VanRenesse for many fruitful discussions of all aspects of group communication in particular and distributed systems in general.
I want to dedicate this book to Jeannette and Michelle.
Group communication uses the terms group and member. Members are part of a group. In the more common terminology, a member is a node and a group is a cluster. We use these words interchangeably.
A node is a process, residing on some host. A cluster can have one or more nodes belonging to it. There can be multiple nodes on the same host, and all may or may not be part of the same cluster.
JGroups is a toolkit for reliable group communication. Processes can join a group, send messages to all members or single members and receive messages from members in the group. The system keeps track of the members in every group, and notifies group members when a new member joins, or an existing member leaves or crashes. A group is identified by its name. Groups do not have to be created explicitly; when a process joins a non-existing group, that group will be created automatically. Member processes of a group can be located on the same host, within the same LAN, or across a WAN. A member can be part of multiple groups.
The architecture of JGroups is shown in Figure 1.1, “The architecture of JGroups”.
It consists of 3 parts: (1) the Channel API used by application programmers to build reliable group communication applications, (2) the building blocks, which are layered on top of the channel and provide a higher abstraction level and (3) the protocol stack, which implements the properties specified for a given channel.
This document describes how to install and use JGroups, i.e. the Channel API and the building blocks. The targeted audience is application programmers who want to use JGroups to build reliable distributed programs that need group communication. Programmers who want to implement their own protocols to be used with JGroups should consult the Programmer's Guide for more details about the architecture and implementation of JGroups.
A channel is connected to a protocol stack. Whenever the application sends a message, the channel passes it on to the protocol stack, which passes it to the topmost protocol. The protocol processes the message and then passes it on to the protocol below it. Thus the message is handed from protocol to protocol until the bottom protocol puts it on the network. The same happens in the reverse direction: the bottom (transport) protocol listens for messages on the network. When a message is received it will be handed up the protocol stack until it reaches the channel. The channel stores the message in a queue until the application consumes it.
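The traversal described above can be illustrated with a toy model. The following is a minimal sketch in plain Java, not the JGroups implementation: the classes ToyProtocol and ToyStack and the trace list are hypothetical names introduced only for this illustration. Each protocol records that it saw the message and hands it to its neighbour.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of a protocol stack; these are not JGroups classes.
class ToyProtocol {
    final String name;
    ToyProtocol above, below;      // neighbours in the bidirectional list
    final List<String> trace;      // shared log of which protocol handled the message

    ToyProtocol(String name, List<String> trace) {
        this.name = name;
        this.trace = trace;
    }

    // A message travelling towards the network.
    void down(String msg) {
        trace.add("down:" + name);
        if (below != null) below.down(msg);
        else trace.add("network<-" + msg);   // the bottom protocol puts it on the network
    }

    // A message travelling towards the channel.
    void up(String msg) {
        trace.add("up:" + name);
        if (above != null) above.up(msg);
        else trace.add("channel<-" + msg);   // the top protocol hands it to the channel
    }
}

class ToyStack {
    final List<String> trace = new ArrayList<>();
    final ToyProtocol top, bottom;

    // Protocol names are listed top-first; at least one name is expected.
    ToyStack(String... namesTopFirst) {
        ToyProtocol first = null, prev = null;
        for (String n : namesTopFirst) {
            ToyProtocol p = new ToyProtocol(n, trace);
            if (prev == null) first = p;
            else { prev.below = p; p.above = prev; }
            prev = p;
        }
        top = first;
        bottom = prev;
    }

    void send(String msg)    { top.down(msg); }    // what the channel does on send
    void receive(String msg) { bottom.up(msg); }   // what the transport does on reception
}
```

Sending through a three-layer ToyStack records the layers top to bottom, and receiving records them bottom to top, which mirrors the two directions described in the text.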
When an application connects to the channel, the protocol stack will be started, and when it disconnects the stack will be stopped. When the channel is closed, the stack will be destroyed, releasing its resources.
The following three sections give an overview of channels, building blocks and the protocol stack.
To join a group and send messages, a process has to create a channel and connect to it using the group name (all channels with the same name form a group). The channel is the handle to the group. While connected, a member may send and receive messages to/from all other group members. The client leaves a group by disconnecting from the channel. A channel can be reused: clients can connect to it again after having disconnected. However, a channel allows only 1 client to be connected at a time. If multiple groups are to be joined, multiple channels can be created and connected to. A client signals that it no longer wants to use a channel by closing it. After this operation, the channel cannot be used any longer.
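The channel life cycle described above (connect, disconnect, reconnect, close) can be sketched as a small state machine. This is an illustrative model only: ToyChannel and its State enum are hypothetical names, and rejecting a second connect() with an exception is an assumption made for this sketch, not a statement about how JChannel reacts.

```java
// Hypothetical life-cycle model; not the JGroups Channel class.
class ToyChannel {
    enum State { OPEN, CONNECTED, CLOSED }

    private State state = State.OPEN;
    private String group;          // name of the group currently joined, if any

    // Joins the named group. Only one connection at a time is allowed.
    void connect(String groupName) {
        if (state == State.CLOSED)
            throw new IllegalStateException("channel is closed");
        if (state == State.CONNECTED)   // assumption of this sketch: reject a second connect
            throw new IllegalStateException("already connected to " + group);
        group = groupName;
        state = State.CONNECTED;
    }

    // Leaves the group; the channel can be connected again afterwards.
    void disconnect() {
        if (state == State.CONNECTED) {
            group = null;
            state = State.OPEN;
        }
    }

    // Closes the channel for good; it cannot be used any longer.
    void close() {
        disconnect();
        state = State.CLOSED;
    }

    State state() { return state; }
}
```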
Each channel has a unique address. Channels always know who the other members are in the same group: a list of member addresses can be retrieved from any channel. This list is called a view. A process can select an address from this list and send a unicast message to it (also to itself), or it may send a multicast message to all members of the current view. Whenever a process joins or leaves a group, or when a crashed process has been detected, a new view is sent to all remaining group members. When a member process is suspected of having crashed, a suspicion message is received by all non-faulty members. Thus, channels receive regular messages, view messages and suspicion messages. A client may choose to turn reception of views and suspicions on/off on a channel basis.
Channels are similar to BSD sockets: messages are stored in a channel until a client removes the next one (pull-principle). When no message is currently available, a client is blocked until the next available message has been received.
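The pull principle can be sketched with a blocking queue from the Java standard library. This is a minimal illustration, assuming a hypothetical ToyMailbox class that stands in for the channel's internal message queue; it is not how JChannel is written.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a channel's receive queue; not a JGroups class.
class ToyMailbox {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Called by the protocol stack side when a message arrives.
    void deliver(String msg) {
        queue.add(msg);
    }

    // Called by the client: blocks until a message is available or the
    // timeout expires. Returns null on timeout or if the thread is interrupted.
    String receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt status
            return null;
        }
    }
}
```

Messages come out in the order in which they were delivered, and a client that calls receive() on an empty mailbox waits until something arrives or the timeout elapses.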
There is currently only one implementation of Channel: JChannel.
The properties of a channel are typically defined in an XML file, but JGroups also allows for configuration through simple strings, URIs, DOM trees or even programming.
The Channel API and its related classes are described in Chapter 3, API.
Channels are simple and primitive. They offer the bare functionality of group communication, and have on purpose been designed after the simple model of BSD sockets, which are widely used and well understood. The reason is that an application can make use of just this small subset of JGroups, without having to include a whole set of sophisticated classes that it may not even need. Also, a somewhat minimalistic interface is simple to understand: a client needs to know about 12 methods to be able to create and use a channel (and oftentimes will only use 3-4 methods frequently).
Channels provide asynchronous message sending/reception, somewhat similar to UDP. A message sent is essentially put on the network and the send() method will return immediately. Conceptual requests, or responses to previous requests, are received in undefined order, and the application has to take care of matching responses with requests.
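Matching responses with requests is typically done by tagging each request with an id and looking that id up when a response arrives. The following is a minimal sketch of that idea, assuming a hypothetical ToyCorrelator class; it uses CompletableFuture from newer Java versions and is not part of JGroups.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper for request-response correlation; not a JGroups class.
class ToyCorrelator {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers a pending request and returns the id to send along with it.
    long register(CompletableFuture<String> result) {
        long id = nextId.incrementAndGet();
        pending.put(id, result);
        return id;
    }

    // Called for every incoming response, in whatever order responses arrive.
    // Returns false if no request is waiting for this id.
    boolean onResponse(long requestId, String payload) {
        CompletableFuture<String> waiting = pending.remove(requestId);
        if (waiting == null) return false;
        waiting.complete(payload);
        return true;
    }
}
```

Because the lookup is by id, it does not matter that the response to the second request may arrive before the response to the first.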
Also, an application has to actively retrieve messages from a channel (pull-style); it is not notified when a message has been received. Note that pull-style message reception often needs another thread of execution, or some form of event-loop, in which a channel is periodically polled for messages.
JGroups offers building blocks that provide more sophisticated APIs on top of a Channel. Building blocks either create and use channels internally, or require an existing channel to be specified when creating a building block. Applications communicate directly with the building block, rather than the channel. Building blocks are intended to save the application programmer from having to write tedious and recurring code, e.g. request-response correlation.
Building blocks are described in Chapter 4, Building Blocks.
The protocol stack contains a number of protocol layers in a bidirectional list. All messages sent and received over the channel have to pass through the protocol stack. Every layer may modify, reorder, pass or drop a message, or add a header to a message. A fragmentation layer might break up a message into several smaller messages, adding a header with an id to each fragment, and re-assemble the fragments on the receiver's side.
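The fragmentation example can be sketched as follows. This is an illustrative model, not the JGroups fragmentation protocol: ToyFragment and ToyFragmenter are hypothetical names, and the three fields msgId, index and total play the role of the header that is added to each fragment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical fragment: msgId, index and total act as the fragment header.
class ToyFragment {
    final long msgId;
    final int index, total;
    final byte[] data;

    ToyFragment(long msgId, int index, int total, byte[] data) {
        this.msgId = msgId;
        this.index = index;
        this.total = total;
        this.data = data;
    }
}

class ToyFragmenter {
    // Splits the payload into fragments of at most fragSize bytes.
    static List<ToyFragment> fragment(long msgId, byte[] payload, int fragSize) {
        if (fragSize <= 0) throw new IllegalArgumentException("fragSize must be positive");
        int total = Math.max(1, (payload.length + fragSize - 1) / fragSize);
        List<ToyFragment> out = new ArrayList<>();
        for (int i = 0; i < total; i++) {
            int from = i * fragSize;
            int to = Math.min(payload.length, from + fragSize);
            out.add(new ToyFragment(msgId, i, total, Arrays.copyOfRange(payload, from, to)));
        }
        return out;
    }

    // Reassembles the fragments of one message, which may arrive in any order.
    static byte[] reassemble(List<ToyFragment> frags) {
        ToyFragment[] slots = new ToyFragment[frags.get(0).total];
        for (ToyFragment f : frags) slots[f.index] = f;   // the header tells us the position
        int len = 0;
        for (ToyFragment f : slots) {
            if (f == null) throw new IllegalArgumentException("fragment missing");
            len += f.data.length;
        }
        byte[] out = new byte[len];
        int pos = 0;
        for (ToyFragment f : slots) {
            System.arraycopy(f.data, 0, out, pos, f.data.length);
            pos += f.data.length;
        }
        return out;
    }
}
```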
The composition of the protocol stack, i.e. its layers, is determined by the creator of the channel: an XML file defines the layers to be used (and the parameters for each layer). This configuration might be interpreted differently by each channel implementation; in JChannel it is used to create the stack, depending on the protocol names given in the configuration.
Knowledge about the protocol stack is not necessary when only using channels in an application. However, when an application wishes to ignore the default properties for a protocol stack, and configure its own stack, then knowledge about what the individual layers are supposed to do is needed. Although it is syntactically possible to stack any layer on top of each other (they all have the same interface), this wouldn't make sense semantically in most cases.
A header is a custom bit of information that can be added to each message. JGroups uses headers extensively, for example to add sequence numbers to each message (NAKACK and UNICAST), so that those messages can be delivered in the order in which they were sent.
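To illustrate how a sequence number in a header makes ordered delivery possible, here is a minimal receiver-side sketch. It shows the general idea only and is not how NAKACK or UNICAST are implemented; ToyOrderedReceiver is a hypothetical name, and starting the numbering at 1 is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical receiver-side buffer; not a JGroups class.
class ToyOrderedReceiver {
    private long nextToDeliver = 1;                          // next expected sequence number
    private final Map<Long, String> buffered = new HashMap<>();

    // Accepts a message together with the sequence number from its header and
    // returns the messages that have become deliverable, in send order.
    List<String> onMessage(long seqno, String payload) {
        List<String> deliverable = new ArrayList<>();
        if (seqno < nextToDeliver) return deliverable;       // already delivered: drop duplicate
        buffered.put(seqno, payload);
        String next;
        while ((next = buffered.remove(nextToDeliver)) != null) {
            deliverable.add(next);                           // gap closed: deliver in order
            nextToDeliver++;
        }
        return deliverable;
    }
}
```

Messages that arrive early are held back until every message with a lower sequence number has been delivered.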
The installation refers to version 2.5 of JGroups. Refer to the installation instructions that are shipped with JGroups for details.
Note that these instructions are also available in the JGroups distribution (INSTALL.html).
JGroups comes in a binary and a source version: the binary version is JGroups-2.x.x.bin.zip, the source version is JGroups-2.x.x.src.zip. The binary version contains the JGroups JAR file, plus a number of JARs needed by JGroups. The source version contains all source files, plus several JAR files needed by JGroups, e.g. ANT to build JGroups from source.
JGroups 2.5 requires JDK 5 or higher.
There is no JNI code present so it should run on all platforms.
If you want to generate HTML-based test reports from the unit tests, then xalan.jar needs to be in the CLASSPATH (also available in the lib directory).
The binary version contains
jgroups-all.jar: the JGroups library including the demos
CREDITS: list of contributors
INSTALL.html: this file
commons-logging.jar
log4j.jar. This JAR is optional; for example, it is not needed if JDK logging is used.
Place the JAR files somewhere in your CLASSPATH, and you're ready to start using JGroups. If you want to use the JGroups JMS protocol (org.jgroups.protocols.JMS), then you will also need to place jms.jar somewhere in your CLASSPATH.
The source version consists of the following directories and files:
src: the sources
test: unit and stress tests
conf: configuration files needed by JGroups, plus default protocol stack definitions
doc: documentation
lib: various JARs needed to build and run JGroups:
Ant JARs: used to build JGroups. If you already have Ant installed, you won't need these files
jms.jar: JMS library. Needed only if you intend to run the org.jgroups.protocols.JMS protocol
junit.jar: to run the JUnit test cases
xalan.jar: to format the output of the JUnit tests using an XSLT converter to HTML
commons-logging.jar
log4j.jar
etc
Unzip the source distribution, e.g. unzip JGroups-2.x.x.src.zip. This will create the JGroups-2.x.x directory (root directory) under the current directory.
cd to the root directory
Modify build.properties if you want to use a Java compiler other than javac (e.g. jikes), or if you want to change the interface JGroups uses for sending and receiving messages
On UNIX systems use build.sh, on Windows build.bat: $> ./build.sh compile
This will compile all Java files (into the classes directory).
To generate the JARs: $> ./build.sh jar
This will generate the following JAR files in the dist directory:
jgroups-core.jar - the core JGroups libraries
jgroups-all.jar - the complete JGroups libraries including demos and unit tests
The CLASSPATH now has to be set accordingly: the following directories and/or JARs have to be included:
<JGroups rootdir>/classes
<JGroups rootdir>/conf
All needed JAR files in <JGroups rootdir>/lib. To build from sources, the two Ant JARs are required. To run unit tests, the JUnit (and possibly Xalan) JARs are needed.
To generate JavaDocs simply run $> ./build.sh javadoc and the Javadoc documentation will be generated in the dist/javadoc directory.
Note that - if you already have Ant installed on your system - you do not need to use build.sh or build.bat, simply invoke ant on the build.xml file. To be able to invoke ant from any directory below the root directory, place ANT_ARGS="-find build.xml -emacs" into the .antrc file in your home directory.
For more details on Ant see http://jakarta.apache.org/ant/.
To see whether your system can find the JGroups classes, execute the following command:
java org.jgroups.Version
or (from JGroups 2.2.8 on)
java -jar jgroups-all.jar
You should see the following output (more or less) if the class is found:
bela@dell /cygdrive/c/JGroups/dist
$ java -jar jgroups-all.jar
Version: 2.6.0 pre-alpha
CVS: $Id: installation.xml,v 1.5 2007/07/24 16:30:46 belaban Exp $
To test whether JGroups works okay on your machine, run the following command twice:
java org.jgroups.demos.Draw
2 whiteboard windows should appear as shown in Figure 2.1, “Screenshot of 2 Draw instances”.
Both windows should show 2 in their title bars. This means that the two instances found each other and formed a group.
When drawing in one window, the second instance should also be updated. As the default group transport uses IP multicast, make sure that - if you want to start the 2 instances in different subnets - IP multicast is enabled. If this is not the case, the 2 instances won't find each other and the sample won't work.
You can change the properties of the demo, for example to use a different transport if multicast doesn't work (it should always work on the same machine). Please consult the documentation to see how to do this.
Sometimes there isn't a network connection (e.g. DSL modem is down), or we want to multicast only on the local machine. For this the loopback interface (typically lo) can be configured, e.g.
route add -net 224.0.0.0 netmask 240.0.0.0 dev lo
This means that all traffic directed to the 224.0.0.0 network will be sent to the loopback interface, which means it doesn't need any network to be running. Note that the 224.0.0.0 network is a placeholder for all multicast addresses in most UNIX implementations: it will catch all multicast traffic. This is an undocumented feature of /sbin/route and may not work across all UNIX flavors. The above instructions may also work for Windows systems, but this hasn't been tested. Note that not all systems allow multicast traffic to use the loopback interface.
Typical home networks have a gateway/firewall with 2 NICs: the first (eth0) is connected to the outside world (Internet Service Provider), the second (eth1) to the internal network, with the gateway firewalling/masquerading traffic between the internal and external networks. If no route for multicast traffic is added, the default will be to use the default gateway, which will typically direct the multicast traffic towards the ISP. To prevent this (e.g. ISP drops multicast traffic, or latency is too high), we recommend adding a route for multicast traffic which goes to the internal network (e.g. eth1).
Make sure your machine is set up correctly for IP multicast. There are 2 test programs that can be used to detect this: McastReceiverTest and McastSenderTest. Start McastReceiverTest, e.g.
java org.jgroups.tests.McastReceiverTest -mcast_addr 224.10.10.10 -port 5555
Then start McastSenderTest:
java org.jgroups.tests.McastSenderTest -mcast_addr 224.10.10.10 -port 5555
If you want to bind to a specific network interface card (NIC), use -bind_addr 192.168.0.2, where 192.168.0.2 is the IP address of the NIC to which you want to bind. Use this parameter in both sender and receiver.
You should be able to type in the McastSenderTest window and see the output in the McastReceiverTest. If not, try to use -ttl 32 in the sender. If this still fails, consult a system administrator to help you set up IP multicast correctly. If you are the system administrator, look for another job :-)
Other means of getting help: there is a public forum on JIRA for questions. Also consider subscribing to the javagroups-users mailing list to discuss such and other problems.
If none of this helps, we have to use a sledgehammer (running only under JDK 1.4 and higher): we can enable the above sender and receiver test to use all available interfaces for sending and receiving. One of them will certainly be the right one... Start the receiver as follows:
java org.jgroups.tests.McastReceiverTest1_4 -mcast_addr 228.8.8.8 -use_all_interfaces
The multicast receiver uses the 1.4 functionality to list all available network interfaces and bind to all of them (including the loopback interface). This means that whichever interface a packet comes in on, we will receive it. Now start the sender:
java org.jgroups.tests.McastSenderTest1_4 -mcast_addr 228.8.8.8 -use_all_interfaces
The sender will also determine the available network interfaces and send each packet over all interfaces.
This test can be used to find out which network interface to bind to when previously no packets were received. E.g. when you see the following output in the receiver:
bash-2.03$ java org.jgroups.tests.McastReceiverTest1_4 -mcast_addr 228.8.8.8 -bind_addr 192.168.168.4
Socket=0.0.0.0/0.0.0.0:5555, bind interface=/192.168.168.4
dd [sender=192.168.168.4:5555]
dd [sender=192.168.168.1:5555]
dd [sender=192.168.168.2:5555]
you know that you can bind to any of the 192.168.168.{1,2,4} interfaces to receive your multicast packets. In this case you would need to modify your protocol spec to include bind_addr=192.168.168.2 in UDP, e.g. "UDP(mcast_addr=228.8.8.8;bind_addr=192.168.168.2):...".
Another source of problems might be the use of IPv6, and/or misconfiguration of /etc/hosts. If you communicate between an IPv4 and an IPv6 host, and they are not able to find each other, try the java.net.preferIPv4Stack=true property, e.g.
java -Djava.net.preferIPv4Stack=true org.jgroups.demos.Draw -props file:/home/bela/udp.xml
JDK 1.4.1 uses IPv6 by default, although it has a dual stack, that is, it also supports IPv4.
There is a wiki which lists FAQs and their solutions at http://www.jboss.org/wiki/Wiki.jsp?page=JGroups. It is frequently updated and a useful companion to this user's guide.
If you think that you discovered a bug, submit a bug report on JIRA or send email to javagroups-developers if you're unsure about it. Please include the following information:
Version of JGroups (java org.jgroups.Version)
Platform (e.g. Solaris 8)
Version of JDK (e.g. JDK 1.4.2_07)
Stack trace. Use kill -3 PID on UNIX systems or CTRL-BREAK on Windows machines
Small program that reproduces the bug
This chapter explains the classes available in JGroups that will be used by applications to build reliable group communication applications. The focus is on creating and using channels.
Information in this document may not be up-to-date, but the nature of the classes in the JGroups toolkit described here is the same. For the most up-to-date information refer to the Javadoc-generated documentation in the doc/javadoc directory.
All of the classes discussed below reside in the org.jgroups package unless otherwise mentioned.
The org.jgroups.util.Util class contains a collection of useful functionality which cannot be assigned to any particular other package.
The first method takes an object as argument and serializes it into a byte buffer (the object has to be serializable or externalizable). The byte array is then returned. This method is often used to serialize objects into the byte buffer of a message. The second method returns a reconstructed object from a buffer. Both methods throw an exception if the object cannot be serialized or unserialized.
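The round trip between an object and a byte buffer can be sketched with plain Java serialization. This is not the Util class itself: ToySerializer and its method names are hypothetical, and unlike the methods described above this sketch reports failures as unchecked exceptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical helper built on standard Java serialization; not org.jgroups.util.Util.
class ToySerializer {
    // Serializes a Serializable (or Externalizable) object into a byte array.
    static byte[] toBytes(Object obj) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
            oos.flush();                       // push buffered data into bos before reading it
            return bos.toByteArray();
        } catch (IOException e) {              // includes NotSerializableException
            throw new UncheckedIOException(e);
        }
    }

    // Reconstructs an object from a byte array produced by toBytes().
    static Object fromBytes(byte[] buf) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(buf))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```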
Returns a string containing a pretty-printed list of currently running threads.
These interfaces are used with some of the APIs presented below, therefore they are listed first.
Interface Transport looks as follows:
public interface Transport {
public void send(Message msg) throws Exception;
public Object receive(long timeout) throws Exception;
}
It defines a very small subset of the functionality of a channel, essentially only the methods for sending and receiving messages. There are a number of classes that implement Transport, among others Channel. Many building blocks (see Chapter 4, Building Blocks) require nothing else than a bare-bone facility to send and receive messages; therefore the Transport interface was created. It increases the genericness and portability of building blocks: being so simple, the Transport interface can easily be ported to a different toolkit, without requiring any modifications to building blocks.
Contrary to the pull-style of channels, some building blocks (e.g. PullPushAdapter) provide an event-like push-style message delivery model. In this case, the entity to be notified of message reception needs to provide a callback to be invoked whenever a message has been received. The MessageListener interface below provides a method to do so:
public interface MessageListener {
public void receive(Message msg);
byte[] getState();
void setState(byte[] state);
}
Method receive() will be called when a message is received. The getState() and setState() methods are used to fetch and set the group state (e.g. when joining). Refer to Section 3.7.11, “Getting the group's state” for a discussion of state transfer.
JGroups release 2.3 introduces ExtendedMessageListener enabling partial state transfer (refer to Section 3.7.13, “Partial state transfer”) while release 2.4 further expands ExtendedMessageListener with streaming state transfer callbacks:
public interface ExtendedMessageListener extends MessageListener {
byte[] getState(String state_id);
void setState(String state_id, byte[] state);
/*** since JGroups 2.4 *****/
void getState(OutputStream ostream);
void getState(String state_id, OutputStream ostream);
void setState(InputStream istream);
void setState(String state_id, InputStream istream);
}
The MembershipListener interface is similar to the MessageListener interface above: every time a new view, a suspicion message, or a block event is received, the corresponding method of the class implementing MembershipListener will be called.
public interface MembershipListener {
public void viewAccepted(View new_view);
public void suspect(Object suspected_mbr);
public void block();
}
Oftentimes the only method containing any functionality will be viewAccepted() which notifies the receiver that a new member has joined the group or that an existing member has left or crashed. The suspect() callback is invoked by JGroups whenever a member is suspected of having crashed, but not yet excluded [1].
The block() method is called to notify the member that it will soon be blocked sending messages. This is done by the FLUSH protocol, for example to ensure that nobody is sending messages while a state transfer is in progress. When block() returns, any thread sending messages will be blocked, until FLUSH unblocks the thread again, e.g. after the state has been transferred successfully.
Therefore, block() can be used to send pending messages or complete some other work. However, sending of messages should be done on a different thread: for example, the current thread starts a different thread to do the work and then blocks on a mutex, which the other thread notifies once the work has been done.
Note that block() should take a small amount of time to complete, otherwise the entire FLUSH protocol is blocked.
The ExtendedMembershipListener interface extends MembershipListener:
public interface ExtendedMembershipListener extends MembershipListener {
public void unblock();
}
The unblock() method is called to notify the member that the flush protocol has completed and the member can resume sending messages. If the member did not stop sending messages on block(), FLUSH simply blocked them and will resume, so no action is required from a member. Implementation of the unblock() callback is optional.
public interface ChannelListener {
void channelConnected(Channel channel);
void channelDisconnected(Channel channel);
void channelClosed(Channel channel);
void channelShunned();
void channelReconnected(Address addr);
}
A class implementing ChannelListener can use the Channel.setChannelListener() method to register with a channel to obtain information about state changes in a channel. Whenever a channel is closed, disconnected or opened a callback will be invoked.
public interface Receiver extends MessageListener, MembershipListener {
}
A Receiver can be used to receive all relevant messages and view changes in push-style; rather than having to pull these events from a channel, they will be dispatched to the receiver as soon as they have been received. This saves one thread (the application thread pulling messages from a channel, or the PullPushAdapter thread).
public interface ExtendedReceiver extends ExtendedMessageListener, MembershipListener {
}
This is a receiver that is able to handle partial state transfer.
The Extended- interfaces (ExtendedMessageListener, ExtendedReceiver) will be merged with their parents in the 3.0 release of JGroups. The merge is deferred until then because it will break API backwards compatibility, which we didn't want to do in the 2.x series.
Each member of a group has an address, which uniquely identifies the member. The interface for such an address is Address, which requires concrete implementations to provide methods for comparison and sorting of addresses, and for determination whether the address is a multicast address. JGroups addresses have to implement the following interface:
public interface Address extends Externalizable, Comparable, Cloneable {
boolean isMulticastAddress();
int compareTo(Object o) throws ClassCastException;
boolean equals(Object obj);
int hashCode();
String toString();
}
Actual implementations of addresses are often generated by the bottommost protocol layer (e.g. UDP or TCP). This allows for all possible sorts of addresses to be used with JGroups, e.g. ATM.
In JChannel, it is the IP address of the host on which the stack is running and the port on which the stack is receiving incoming messages; it is represented by the concrete class org.jgroups.stack.IpAddress. Instances of this class are only used within the JChannel protocol stack; users of a channel see addresses (of any kind) only as Addresses. Since an address uniquely identifies a channel, and therefore a group member, it can be used to send messages to that group member, e.g. in Messages (see next section).
Data is sent between members in the form of messages (Message). A message can be sent by a member to a single member, or to all members of the group of which the channel is an endpoint. The structure of a message is shown in Figure 3.1, “Structure of a message”.
A message contains 5 fields:
Destination address: The address of the receiver. If null, the message will be sent to all current group members.
Source address: The address of the sender. Can be left null, and will be filled in by the transport protocol (e.g. UDP) before the message is put on the network.
Flags: One byte used for flags. The currently recognized flags are OOB, LOW_PRIO and HIGH_PRIO. See the discussion on the concurrent stack for OOB.
Payload: The actual data (as a byte buffer). The Message class contains convenience methods to set a serializable object and to retrieve it again, using serialization to convert the object to/from a byte buffer.
Headers: A list of headers that can be attached to a message. Anything that should not be in the payload can be attached to a message as a header. Methods putHeader(), getHeader() and removeHeader() of Message can be used to manipulate headers.
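Packing several flags into one byte is usually done with one bit per flag. The sketch below illustrates the technique with a hypothetical ToyFlags class; the bit values chosen for OOB, LOW_PRIO and HIGH_PRIO are assumptions made for this illustration and should be checked against the Message class before relying on them.

```java
// Hypothetical flag holder; the bit values are assumed for illustration only.
class ToyFlags {
    static final byte OOB       = 1;   // assumed value: bit 0
    static final byte LOW_PRIO  = 2;   // assumed value: bit 1
    static final byte HIGH_PRIO = 4;   // assumed value: bit 2

    private byte flags;                // all flags share this single byte

    void set(byte flag)      { flags |= flag; }                  // turn the bit on
    void clear(byte flag)    { flags &= ~flag; }                 // turn the bit off
    boolean isSet(byte flag) { return (flags & flag) == flag; }  // test the bit
    byte raw()               { return flags; }
}
```

Because each flag occupies its own bit, flags can be combined and tested independently of each other.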
A message is similar to an IP packet and consists of the payload (a byte buffer) and the addresses of the sender and receiver (as Addresses). Any message put on the network can be routed to its destination (receiver address), and replies can be returned to the sender's address.
A sender usually does not need to fill in the sender's address when sending a message; this is done automatically by the protocol stack before the message is put on the network. However, there may be cases in which the sender of a message wants to give an address different from its own, so that, for example, a response is returned to some other member.
The destination address (receiver) can be an Address, denoting the address of a member, determined e.g. from a message received previously, or it can be null , which means that the message will be sent to all members of the group. A typical multicast message, sending string "Hello" to all members would look like this:
Message msg=new Message(null, null, "Hello".getBytes());
channel.send(msg);
A View ( View ) is a list of the current members of a group. It consists of a ViewId , which uniquely identifies the view (see below), and a list of members. Views are set in a channel automatically by the underlying protocol stack whenever a new member joins or an existing one leaves (or crashes). All members of a group see the same sequence of views.
Note that there is a comparison function which orders all the members of a group in the same way. Usually, the first member of the list is the coordinator (the one who emits new views). Thus, whenever the membership changes, every member can determine the coordinator easily and without having to contact other members.
The code below shows how to send a (unicast) message to the first member of a view (error checking code omitted):
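View myview=channel.getView();
// getMembers() returns a Vector, so the first member is obtained with firstElement()
Address first=(Address)myview.getMembers().firstElement();
Message msg=new Message(first, null, "Hello world");
channel.send(msg);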
Whenever an application is notified that a new view has been installed (e.g. by MembershipListener.viewAccepted() or Channel.receive()), the view is already set in the channel. For example, calling Channel.getView() in a viewAccepted() callback would return the same view (or possibly the next one, in case a newer view has already been installed).
The ViewId is used to uniquely number views. It consists of the address of the view creator and a sequence number. ViewIds can be compared for equality and put in a hashtable as they implement equals() and hashCode() methods.
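To make the equality and hashing behaviour concrete, here is a minimal sketch of a value class shaped like the description above: a creator address plus a sequence number. It is not the JGroups ViewId class. The name SketchViewId is hypothetical, a plain String stands in for an Address, and the ordering used in compareTo() (sequence number first, then creator) is an assumption of this sketch.

```java
import java.util.Objects;

/** Hypothetical stand-in for a view identifier: creator address plus sequence number. */
final class SketchViewId implements Comparable<SketchViewId> {
    private final String creator; // a plain string stands in for an Address
    private final long seq;       // sequence number chosen by the view creator

    SketchViewId(String creator, long seq) {
        this.creator = Objects.requireNonNull(creator);
        this.seq = seq;
    }

    /** Two ids are equal if both the creator and the sequence number match. */
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchViewId)) return false;
        SketchViewId other = (SketchViewId) o;
        return seq == other.seq && creator.equals(other.creator);
    }

    /** Consistent with equals(), so ids can be used as hashtable keys. */
    @Override
    public int hashCode() {
        return Objects.hash(creator, seq);
    }

    /** Assumption of this sketch: order by sequence number, then by creator. */
    @Override
    public int compareTo(SketchViewId other) {
        int c = Long.compare(seq, other.seq);
        return c != 0 ? c : creator.compareTo(other.creator);
    }

    @Override
    public String toString() {
        return "[" + creator + "|" + seq + "]";
    }
}
```

Because equals() and hashCode() agree, an id received from the network can be used to look up an entry stored under an equal id created locally.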
Whenever a group splits into subgroups, e.g. due to a network partition, and the subgroups later merge back together, the application receives a MergeView instead of a View. The MergeView class is a subclass of View and contains, as an additional instance variable, the list of views that were merged. As an example, if the group denoted by view V1:(p,q,r,s,t) split into subgroups V2:(p,q,r) and V2:(s,t), the merged view might be V3:(p,q,r,s,t). In this case the MergeView would contain a list of 2 views: V2:(p,q,r) and V2:(s,t).
The Membership class can be used for keeping track of members instead of a Vector. It adds several functions, such as duplicate elimination, merging with other Membership instances and sorting.
In order to join a group and send messages, a process has to create a channel. A channel is like a socket. When a client connects to a channel, it gives the name of the group it would like to join. Thus, a channel is (in its connected state) always associated with a particular group. The protocol stack takes care that channels with the same group name find each other: whenever a client connects to a channel with group name G, the channel tries to find existing channels with the same name and joins them, resulting in a new view being installed (which contains the new member). If no members exist, a new group will be created.
A state transition diagram for the major states a channel can assume is shown in Figure 3.2, “Channel states”.
When a channel is first created, it is in the unconnected state. An attempt to perform certain operations which are only valid in the connected state (e.g. send/receive messages) will result in an exception. After a successful connection by a client, it moves to the connected state. Now the channel will receive messages, views and suspicions from other members and may send messages to other members or to the group. Getting the local address of a channel is guaranteed to be a valid operation in this state (see below). When the channel is disconnected, it moves back to the unconnected state. Both a connected and an unconnected channel may be closed, which makes the channel unusable for further operations; any attempt to use a closed channel will result in an exception. When a channel is closed directly from the connected state, it will first be disconnected, and then closed.
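The state transitions described above can be summarized in a small model. This is only a sketch of the three states and the rules stated in the text, not JGroups code: the class SketchChannel, its State enum and the use of IllegalStateException are all hypothetical.

```java
/** Minimal model of the three channel states described above (not JGroups code). */
final class SketchChannel {
    enum State { UNCONNECTED, CONNECTED, CLOSED }

    private State state = State.UNCONNECTED; // a new channel starts unconnected

    State state() {
        return state;
    }

    void connect() {
        if (state == State.CLOSED)
            throw new IllegalStateException("channel is closed");
        state = State.CONNECTED;
    }

    void disconnect() {
        // only a connected channel moves back to the unconnected state
        if (state == State.CONNECTED)
            state = State.UNCONNECTED;
    }

    void close() {
        disconnect();         // a connected channel is disconnected first
        state = State.CLOSED; // no further operations are allowed afterwards
    }

    void send(String payload) {
        if (state != State.CONNECTED)
            throw new IllegalStateException("send() requires the connected state, was " + state);
        // a real channel would pass the message down its protocol stack here
    }
}
```

In this model a closed channel can never be reconnected, which mirrors the statement that closing makes a channel unusable for further operations.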
The methods available for creating and manipulating channels are discussed now.
A channel can be created in two ways: an instance of a subclass of Channel is created directly using its public constructor (e.g. new JChannel()), or a channel factory is created, which -- upon request -- creates instances of channels. We will only look at the first way of creating a channel: direct instantiation. Note that instantiation may differ between the various channel implementations. As an example we will look at JChannel.
The public constructor of JChannel looks as follows:
public JChannel(Object properties) throws ChannelException {}
It creates an instance of JChannel . The properties argument defines the composition of the protocol stack (number and type of layers, parameters for each layer, and their order). For JChannel, this has to be a String. An example of a channel creation is:
String props="UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32):" +
"PING(timeout=3000;num_initial_members=6):" +
"FD(timeout=5000):" +
"VERIFY_SUSPECT(timeout=1500):" +
"pbcast.STABLE(desired_avg_gossip=10000):" +
"pbcast.NAKACK(gc_lag=10;retransmit_timeout=3000):" +
"UNICAST(timeout=5000;min_wait_time=2000):" +
"FRAG:" +
"pbcast.GMS(initial_mbrs_timeout=4000;join_timeout=5000;" +
"shun=false;print_local_addr=false)";
JChannel channel;
try {
channel=new JChannel(props);
}
catch(Exception ex) {
// channel creation failed
}
The argument is a colon-delimited string of protocols, specified from bottom to top (left to right). The example properties argument will be used to create a protocol stack that uses IP Multicast (UDP) as bottom protocol, the PING protocol to locate the initial members, FD for failure detection, VERIFY_SUSPECT for double-checking of suspected members, STABLE for garbage collection of messages received by all members, NAKACK for lossless delivery of multicast messages, UNICAST for lossless delivery of unicast messages, FRAG for fragmentation of large messages and GMS for group membership (handling of join or leave requests).
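As a rough illustration of the string format, the following sketch splits a properties string into protocol names, bottom protocol first. It is not the parser JGroups uses. The class StackSpecSketch and its method protocolNames() are hypothetical, and the sketch assumes that a colon only separates protocols when it appears outside a parenthesized parameter list.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists the protocol names of a stack string, bottom first. */
final class StackSpecSketch {

    /** Splits on ':' outside parentheses and strips each parameter list. */
    static List<String> protocolNames(String props) {
        List<String> names = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int depth = 0; // > 0 while inside "(...)"
        for (char c : props.toCharArray()) {
            if (c == '(') depth++;
            else if (c == ')') depth--;
            if (c == ':' && depth == 0) {
                names.add(nameOf(current.toString()));
                current.setLength(0);
            }
            else {
                current.append(c);
            }
        }
        if (current.length() > 0)
            names.add(nameOf(current.toString()));
        return names;
    }

    /** Returns the part of a protocol spec before its optional "(...)" parameter list. */
    private static String nameOf(String spec) {
        int paren = spec.indexOf('(');
        return (paren < 0 ? spec : spec.substring(0, paren)).trim();
    }
}
```

Applied to a shortened version of the example string, the first element of the result is the bottom protocol (UDP) and the last one is the top protocol (pbcast.GMS).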
If the properties argument is null, the default properties will be used. An exception will be thrown if the channel cannot be created. Possible causes include protocols that were specified in the property argument, but were not found, or wrong parameters to protocols.
In version 2.0 of JGroups an XML-based scheme to define protocol stacks was introduced. Instead of specifying a string containing the protocol spec, a URL pointing to a valid protocol stack definition can be given. For example, the Draw demo can be launched as follows:
java org.jgroups.demos.Draw -props file:/home/bela/vsync.xml
or
java org.jgroups.demos.Draw -props http://www.jgroups.org/udp.xml
In the latter case, an application downloads its protocol stack specification from a server, which allows for central administration of application properties.
A sample XML configuration looks like this (edited from udp.xml):
<config>
<UDP
mcast_addr="${jgroups.udp.mcast_addr:228.10.10.10}"
mcast_port="${jgroups.udp.mcast_port:45588}"
discard_incompatible_packets="true"
max_bundle_size="60000"
max_bundle_timeout="30"
ip_ttl="${jgroups.udp.ip_ttl:2}"
enable_bundling="true"
use_concurrent_stack="true"
thread_pool.enabled="true"
thread_pool.min_threads="1"
thread_pool.max_threads="25"
thread_pool.keep_alive_time="5000"
thread_pool.queue_enabled="false"
thread_pool.queue_max_size="100"
thread_pool.rejection_policy="Run"
oob_thread_pool.enabled="true"
oob_thread_pool.min_threads="1"
oob_thread_pool.max_threads="8"
oob_thread_pool.keep_alive_time="5000"
oob_thread_pool.queue_enabled="false"
oob_thread_pool.queue_max_size="100"
oob_thread_pool.rejection_policy="Run"/>
<PING timeout="2000"
num_initial_members="3"/>
<MERGE2 max_interval="30000"
min_interval="10000"/>
<FD_SOCK/>
<FD timeout="10000" max_tries="5" shun="true"/>
<VERIFY_SUSPECT timeout="1500" />
<BARRIER />
<pbcast.NAKACK
use_mcast_xmit="false" gc_lag="0"
retransmit_timeout="300,600,1200,2400,4800"
discard_delivered_msgs="true"/>
<UNICAST timeout="300,600,1200,2400,3600"/>
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
max_bytes="400000"/>
<VIEW_SYNC avg_send_interval="60000" />
<pbcast.GMS print_local_addr="true" join_timeout="3000"
shun="false"
view_bundling="true"/>
<FC max_credits="20000000"
min_threshold="0.10"/>
<FRAG2 frag_size="60000" />
<pbcast.STATE_TRANSFER />
</config>
A stack is wrapped by <config> and </config> elements and lists all protocols from bottom (UDP) to top (STATE_TRANSFER). Each element defines one protocol.
Each protocol is implemented as a Java class. When a protocol stack is created based on the above XML configuration, the first element ("UDP") becomes the bottom-most layer, the second one will be placed on the first, etc: the stack is created from the bottom to the top.
Each element has to be the name of a Java class that resides in the org.jgroups.protocols package. Note that only the base name has to be given, not the fully specified class name (UDP instead of org.jgroups.protocols.UDP). If the protocol class is not found, JGroups assumes that the name given is a fully qualified classname and will therefore try to instantiate that class. If this does not work, an exception is thrown. This allows for protocol classes to reside in different packages altogether, e.g. a valid protocol name could be com.sun.eng.protocols.reliable.UCAST.
Each layer may have zero or more arguments. In the plain string format they are specified as a list of name/value pairs in parentheses directly after the protocol name; in the XML format they are given as attributes of the protocol's element. In the example above, UDP is configured with some options, one of them being the IP multicast address (mcast_addr), which is set to the value of the system property jgroups.udp.mcast_addr if that property is set, and to 228.10.10.10 otherwise.
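The fallback behaviour of a value such as ${jgroups.udp.mcast_addr:228.10.10.10} can be sketched as follows. This is not the substitution code used by JGroups. The class PropertySubstitutionSketch and its method resolve() are hypothetical, and the sketch only handles a value that consists of exactly one ${name:default} expression.

```java
/** Hypothetical resolver for attribute values of the form ${name:default}. */
final class PropertySubstitutionSketch {

    /**
     * Returns the system property "name" if it is set, otherwise the default.
     * Values that are not of the form ${...} are returned unchanged.
     */
    static String resolve(String value) {
        if (!value.startsWith("${") || !value.endsWith("}"))
            return value;
        String body = value.substring(2, value.length() - 1);
        int colon = body.indexOf(':');
        String name = colon < 0 ? body : body.substring(0, colon);
        String fallback = colon < 0 ? null : body.substring(colon + 1);
        return System.getProperty(name, fallback);
    }
}
```

With this model, starting the JVM with -Djgroups.udp.mcast_addr=239.1.1.1 would make resolve() return 239.1.1.1 for the mcast_addr attribute, while without the flag it returns 228.10.10.10.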
Note that all members in a group have to have the same protocol stack.
A number of options can be set in a channel. To do so, the following method is used:
public void setOpt(int option, Object value);
Arguments are the option's number and a value. The following options are currently recognized:
Channel.BLOCK: The argument is a boolean object. If true, block messages will be received. If this option is set to true, views will also be set to true. Default is false.
Channel.LOCAL: Local delivery. The argument is a boolean value. If set to true, a member will also receive the messages it multicasts to the group. Otherwise, all messages sent by the member itself will be discarded. This option makes it possible to send messages to the group without receiving a copy. Default is true (members will receive their own copy of messages multicast to the group).
Channel.AUTO_RECONNECT: When set to true, a shunned channel will leave the group and then try to automatically re-join. Default is false.
Channel.AUTO_GETSTATE: When set to true, a shunned channel will, after reconnection, attempt to fetch the state from the coordinator. This requires AUTO_RECONNECT to be true as well. Default is false.
The equivalent method to get options is getOpt() :
public Object getOpt(int option);
Given an option, the current value of the option is returned.
When a client wants to join a group, it connects to a channel giving the name of the group to be joined:
public void connect(String clustername) throws ChannelClosed;
The cluster name is a string, naming the cluster to be joined. All channels that are connected to the same name form a cluster. Messages multicast on any channel in the cluster will be received by all members (including the one who sent it [2] ).
The method returns as soon as the group has been joined successfully. If the channel is in the closed state (see Figure 3.2, “Channel states” ), an exception will be thrown. If there are no other members, i.e. no other client has connected to a group with this name, then a new group is created and the member joined. The first member of a group becomes its coordinator . A coordinator is in charge of multicasting new views whenever the membership changes [3] .
Clients can also join a cluster and fetch the cluster state in one operation. The best way to think of this connect-and-fetch-state method is as an invocation of the regular connect() and getState() methods executed in succession. However, it has several advantages over the regular connect(). First of all, the underlying message exchange is heavily optimized, especially if the flush protocol is used in the stack. More importantly, from the client's perspective, the connect and state-fetch operations become one atomic operation.
public void connect(String cluster_name, Address target, String state_id, long timeout) throws ChannelException;
Just as in the regular connect() method, the cluster name identifies the cluster to be joined. The address parameter indicates the cluster member to fetch the state from. A null address indicates that the state should be fetched from the cluster coordinator. If the state should be fetched from a particular member other than the coordinator, clients can provide the address of that member. The state id is used for partial state transfer, while the timeout bounds the entire join and fetch operation.
Method getLocalAddress() returns the local address of the channel. In the case of JChannel , the local address is generated by the bottom-most layer of the protocol stack when the stack is connected to. That means that -- depending on the channel implementation -- the local address may or may not be available when a channel is in the unconnected state.
public Address getLocalAddress();
Method getClusterName() returns the name of the cluster in which the channel is a member:
public String getClusterName();
Again, the result is undefined if the channel is in the unconnected or closed state.
The following method can be used to get the current view of a channel:
public View getView();
This method does not retrieve a new view (message) from the channel, but only returns the current view of the channel. The current view is updated every time a view message is received: when method receive() is called, and the return value is a view, before the view is returned, it will be installed in the channel, i.e. it will become the current view.
Calling this method on an unconnected or closed channel is implementation defined. A channel may return null, or it may return the last view it knew of.
Once the channel is connected, messages can be sent using the send() methods:
public void send(Message msg) throws ChannelNotConnectedException, ChannelClosedException;
public void send(Address dst, Address src, Object obj) throws ChannelNotConnectedException, ChannelClosedException;
The first send() method has only one argument, which is the message to be sent. The message's destination should either be the address of the receiver (unicast) or null (multicast). When it is null, the message will be sent to all members of the group (including itself). The source address may be null; if it is, it will be set to the channel's address (so that recipients may generate a response and send it back to the sender).
The second send() method is a helper method and uses the former method internally. It requires the address of receiver and sender and an object (which has to be serializable), constructs a Message and sends it.
If the channel is not connected, or was closed, an exception will be thrown upon attempting to send a message.
Here's an example of sending a (multicast) message to all members of a group:
Hashtable data; // any serializable data
try {
channel.send(null, null, data);
}
catch(Exception ex) {
// handle errors
}
The null value as destination address means that the message will be sent to all members in the group. The sender's address will be filled in by the bottom-most protocol. The payload is a hashtable, which will be serialized into the message's buffer and unserialized at the receiver's end. Alternatively, any other means of generating a byte buffer and setting the message's buffer to it (e.g. using Message.setBuffer()) would also work.
Here's an example of sending a (unicast) message to the first member (coordinator) of a group:
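Address receiver;
Hashtable data; // any serializable data
try {
    // getMembers() returns a Vector, so the first member is obtained with firstElement()
    receiver=(Address)channel.getView().getMembers().firstElement();
    channel.send(receiver, null, data);
}
catch(Exception ex) {
    // handle errors
}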
It creates a Message with a specific address for the receiver (the first member of the group). Again, the sender's address can be left null as it will be filled in by the bottom-most protocol.
Method receive() is used to receive messages, views, suspicions and blocks:
public Object receive(long timeout) throws ChannelNotConnectedException, ChannelClosedException, TimeoutException;
A channel receives messages asynchronously from the network and stores them in a queue. When receive() is called, the next available message at the head of that queue is removed and returned. When there are no messages in the queue, the method will block. If timeout is greater than 0, it will wait the specified number of milliseconds for a message to be received, and throw a TimeoutException if none was received during that time. If the timeout is 0 or negative, the method will wait indefinitely for the next available message.
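The timeout rules can be modelled with a standard blocking queue. The sketch below is not how JChannel is implemented. ReceiveQueueSketch and deliver() are hypothetical names, and java.util.concurrent.TimeoutException stands in for the JGroups exception of the same simple name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model of a channel's receive queue and its timeout semantics. */
final class ReceiveQueueSketch {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    /** Called by the (simulated) network side when something arrives. */
    void deliver(Object obj) {
        queue.add(obj);
    }

    /**
     * timeout > 0: wait at most that many milliseconds, then throw TimeoutException.
     * timeout <= 0: block until the next element is available.
     */
    Object receive(long timeout) throws InterruptedException, TimeoutException {
        if (timeout <= 0)
            return queue.take();
        Object obj = queue.poll(timeout, TimeUnit.MILLISECONDS);
        if (obj == null)
            throw new TimeoutException("nothing received within " + timeout + " ms");
        return obj;
    }
}
```

Elements are returned in the order in which they were delivered, which is the property the pull-style API relies on.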
Depending on the channel options (see Section 3.7.2, “Setting options” ), the following types of objects may be received:
Message: A regular message. To send a response to the sender, a new message can be created. Its destination address would be the received message's source address. Method Message.makeReply() is a helper method to create a response.
View: A view change, signalling that a member has joined, left or crashed. The application may or may not perform some action upon receiving a view change (e.g. updating a GUI object of the membership, or redistributing a load-balanced collaborative task to all members). Note that a longer action, or any action that blocks, should be performed in a separate thread. A MergeView will be received when 2 or more subgroups merged into one (see Section 3.5.2, “MergeView” for details). Here, a possible state merge by the application needs to be done in a separate thread.
SuspectEvent: Notification of a member that is suspected. Method SuspectEvent.getMember() retrieves the address of the suspected member. Usually this message will be followed by a view change [1].
BlockEvent: The application has to stop sending messages. When the application has stopped sending messages, it needs to acknowledge this message by calling Channel.blockOk().
The BlockEvent reception can be used to complete pending tasks, e.g. send pending messages, but once Channel.blockOk() has been called, all threads that send messages (calling Channel.send() or Channel.down()) will be blocked until FLUSH unblocks them.
UnblockEvent: The application can resume sending messages. Any messages previously blocked by FLUSH will be unblocked; when the UnblockEvent is received, the channel has already been unblocked.
GetStateEvent: Received when the application's current state should be saved (for a later state transfer). A copy of the current state should be made (possibly wrapped in a synchronized statement) and returned by calling Channel.returnState(). If state transfer events are not enabled on the channel (default), then this event will never be received. This message will only be received with the Virtual Synchrony suite of protocols (see the Programmer's Guide).
StreamingGetStateEvent: Received when the application's current state should be provided to a state requesting group member. If state transfer events are not enabled on the channel (default), or if the channel is not configured with pbcast.STREAMING_STATE_TRANSFER, then this event will never be received.
SetStateEvent: Received as response to a getState(s) method call. The argument contains the state of a single member (byte[]) or of all members (Vector). Since the state of a single member could also be a vector, the interpretation of the argument is left to the application.
StreamingSetStateEvent: Received at the state requesting member when the state InputStream becomes ready for reading. If state transfer events are not enabled on the channel (default), or if the channel is not configured with pbcast.STREAMING_STATE_TRANSFER, then this event will never be received.
The caller has to check the type of the object returned. This can be done using the instanceof operator, as follows:
Object obj;
Message msg;
View v;
obj=channel.receive(0); // wait forever
if(obj instanceof Message)
msg=(Message)obj;
else if(obj instanceof View)
v=(View)obj;
else
; // don't handle suspicions or blocks
If for example views, suspicions and blocks are disabled, then the caller is guaranteed to only receive return values of type Message . In this case, the return value can be cast to a Message directly, without using the instanceof operator.
If the channel is not connected, or was closed, a corresponding exception will be thrown.
The example below shows how to retrieve the "Hello world" string from a message:
Message msg; // received above
String s;
try {
s=(String)msg.getObject(); // error if object not Serializable
// alternative: s=new String(msg.getBuffer());
}
catch(Exception ex) {
// handle errors, e.g. the casting error above
}
The Message.getObject() method retrieves the message's byte buffer, converts it into a (serializable) object and returns the object.
Instead of pulling messages from a channel in an application thread, a Receiver can be registered with a channel; all received messages, view changes and state transfer requests will invoke callbacks on the registered Receiver:
JChannel ch=new JChannel();
ch.setReceiver(new ExtendedReceiverAdapter() {
public void receive(Message msg) {
System.out.println("received message " + msg);
}
public void viewAccepted(View new_view) {
System.out.println("received view " + new_view);
}
});
ch.connect("bla");
The ExtendedReceiverAdapter class implements all callbacks of ExtendedReceiver with no-ops; in the example above we override receive() and viewAccepted().
The advantage of using a Receiver is that the application doesn't have to dedicate a thread to pulling messages out of a channel. In addition, the channel doesn't have to maintain an (unbounded) queue of messages/views, which can quickly get large if the receiver cannot process messages fast enough and the sender keeps sending messages.
Instead of removing the next available message from the channel, peek() just returns a reference to the next message, but does not remove it. This is useful when one has to check the type of the next message, e.g. whether it is a regular message, or a view change. The signature of this method is not shown here, it is the same as for receive() .
A newly joined member may wish to retrieve the state of the group before starting work. This is done with getState(). This method returns the state of one member (in most cases, of the oldest member, the coordinator). It returns true or false, depending on whether a valid state could be retrieved. For example, if a member is a singleton, then calling this method would always return false [4] .
The actual state is returned as the return value of one of the subsequent receive() calls, in the form of a SetStateEvent object. If getState() returned true, then a valid state (non-null) will be returned, otherwise a null state will be returned. Alternatively if an application uses MembershipListener (see Section 3.2.4, “MembershipListener” ) instead of pulling messages from a channel, the getState() method will be invoked and a copy of the current state should be returned. By the same token, setting a state would be accomplished by JGroups calling the setState() method of the state fetcher.
The reason for not directly returning the state as a result of getState() is that the state has to be returned in the correct position relative to other messages. Returning it directly would violate the FIFO properties of a channel, and state transfer would not be correct.
The following code fragment shows how a group member participates in state transfers:
channel=new JChannel();
channel.connect("TestChannel");
boolean rc=channel.getState(null, 5000);
...
Object state, copy;
Object ret=channel.receive(0);
if(ret instanceof Message)
;
else if(ret instanceof GetStateEvent) {
copy=copyState(state); // make a copy so that other msgs don't change the state
channel.returnState(Util.objectToByteBuffer(copy));
}
else if(ret instanceof SetStateEvent) {
SetStateEvent e=(SetStateEvent)ret;
state=e.getArg();
}
A JChannel has to be created whose stack includes the STATE_TRANSFER or pbcast.STATE_TRANSFER protocols (see Chapter 5, Advanced Concepts). Method getState() subsequently asks the channel to return the current state. If there is a current state (there may not be any other members in the group!), then true is returned. In this case, one of the subsequent receive() method invocations on the channel will return a SetStateEvent object which contains the current state, and the caller sets its state to the one received from the channel.
Method receive() might return a GetStateEvent object, requesting the state of the member to be returned. In this case, a copy of the current state should be made and returned using JChannel.returnState(). It is important to a) synchronize access to the state when returning it, since other accesses may modify it while it is being returned, and b) make a copy of the state, since other accesses after returning the state may still be able to modify it. This is possible because the state is not immediately returned, but travels down the stack (in the same address space), and a reference to it could still alter it.
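Both requirements, synchronizing and copying, can be met by serializing the state while holding its lock: the resulting byte array is a snapshot that later modifications cannot touch. The sketch below uses only standard Java serialization. StateHolderSketch, snapshot() and restore() are hypothetical names and not part of JGroups.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder of application state that hands out independent snapshots. */
final class StateHolderSketch {
    private final Map<String, String> state = new HashMap<>();

    void put(String key, String value) {
        synchronized (state) {
            state.put(key, value);
        }
    }

    /** Serializes under the lock; the returned bytes are an independent copy. */
    byte[] snapshot() throws IOException {
        synchronized (state) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(state);
            }
            return bytes.toByteArray();
        }
    }

    /** Rebuilds a map from a snapshot, as a state receiver would. */
    @SuppressWarnings("unchecked")
    static Map<String, String> restore(byte[] buf) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buf))) {
            return (Map<String, String>) in.readObject();
        }
    }
}
```

A snapshot taken before a later put() still contains the old value, which is the behaviour wanted when the state travels down the stack after the call has returned.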
As an alternative to handling the GetStateEvent and SetStateEvent events, and calling Channel.returnState(), a Receiver could be used. The example above would look like this:
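class MyReceiver extends ReceiverAdapter {
    final Map m=new HashMap();
    public byte[] getState() {
        synchronized(m) { // so nobody else can modify the map while we serialize it
            try {
                return Util.objectToByteBuffer(m);
            }
            catch(Exception ex) {
                return null; // serialization failed, no state is returned
            }
        }
    }
    public void setState(byte[] state) {
        synchronized(m) {
            try {
                Map new_m=(Map)Util.objectFromByteBuffer(state);
                m.clear();
                m.putAll(new_m); // Map offers putAll(), not addAll()
            }
            catch(Exception ex) {
                // handle errors, e.g. a state that cannot be unserialized
            }
        }
    }
}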
channel=new JChannel(); // use default properties (has to include pbcast.STATE_TRANSFER protocol)
channel.setReceiver(new MyReceiver());
channel.connect("TestChannel");
boolean rc=channel.getState(null, 5000);
In a group consisting of A,B and C, with D joining the group and calling Channel.getState(), the following sequence of callbacks happens:
Partial state transfer means that instead of transferring the entire state, we may want to transfer only a substate. For example, with HTTP session replication, a new node in a cluster may want to transfer only the state of a specific session, not all HTTP sessions. This can be done with either the pull or push model. The method to call would be Channel.getState(), including the ID of the substate (a string). In the pull model, GetStateEvent and SetStateEvent have an additional member, state_id, and in the push model, there are 2 additional getState() and setState() callbacks. The example below shows partial state transfer for the push model:
class MyReceiver extends ExtendedReceiverAdapter {
    final Map m=new HashMap();
    public byte[] getState() {
        return getState(null);
    }
    public byte[] getState(String substate_id) {
        synchronized(m) { // so nobody else can modify the map while we serialize it
            try {
                if(substate_id == null)
                    return Util.objectToByteBuffer(m); // entire state
                Object value=m.get(substate_id);
                return value != null? Util.objectToByteBuffer(value) : null; // substate only
            }
            catch(Exception ex) {
                return null; // serialization failed, no state is returned
            }
        }
    }
    public void setState(byte[] state) {
        setState(null, state);
    }
    public void setState(String substate_id, byte[] state) {
        synchronized(m) {
            try {
                if(substate_id != null) {
                    Object value=Util.objectFromByteBuffer(state);
                    m.put(substate_id, value); // overwrite only this substate
                }
                else {
                    Map new_m=(Map)Util.objectFromByteBuffer(state);
                    m.clear();
                    m.putAll(new_m); // overwrite the entire state
                }
            }
            catch(Exception ex) {
                // handle errors, e.g. a state that cannot be unserialized
            }
        }
    }
}
channel=new JChannel(); // use default properties (has to include pbcast.STATE_TRANSFER protocol)
channel.setReceiver(new MyReceiver());
channel.connect("TestChannel");
boolean rc=channel.getState(null, "MyID", 5000);
The example shows that the Channel.getState() method specifies the ID of the substate, in this case "MyID". The getState(String substate_id) method checks whether the substate ID is not null, and returns the substate pertaining to the ID, or the entire state if the substate_id is null. The same goes for setting the substate: if setState(String substate_id, byte[] state) has a non-null substate_id, only that part of the current state will be overwritten, otherwise (if null) the entire state will be overwritten.
Streaming state transfer allows the transfer of (partial) application state without having to load the entire state into memory prior to sending it to a joining member. It is especially useful if the state is very large (>1Gb), where regular state transfer would likely result in an OutOfMemoryError. Streaming state transfer was introduced in JGroups 2.4. A JGroups channel has to be configured with either regular or streaming state transfer. The JChannel API that invokes state transfer (i.e. JChannel.getState(Address target, long timeout)) remains the same.
Streaming state transfer, just like regular byte-based state transfer, can be used in both pull and push mode. Similarly to the current getState and setState methods of org.jgroups.MessageListener, an application interested in streaming state transfer in push mode implements the streaming getState method(s) by writing its state to a provided OutputStream reference, and the setState method(s) by reading the state from a provided InputStream reference. In order to use streaming state transfer in push mode, the existing ExtendedMessageListener has been expanded to include four additional methods:
public interface ExtendedMessageListener {
    /* non-streaming callback methods omitted for clarity */
    void getState(OutputStream ostream);
    void getState(String state_id, OutputStream ostream);
    void setState(InputStream istream);
    void setState(String state_id, InputStream istream);
}
For pull mode (when the application uses channel.receive() to fetch events), two new event classes are provided:
StreamingGetStateEvent
StreamingSetStateEvent
These two events/classes are very similar to existing GetStateEvent and SetStateEvent but introduce a new field; StreamingGetStateEvent has an OutputStream and StreamingSetStateEvent has an InputStream.
The following code snippet demonstrates how to pull events from a channel, processing StreamingGetStateEvent and sending hypothetical state through a provided OutputStream reference. Handling of StreamingSetStateEvent is analogous to this example:
...
Object obj=channel.receive(0);
if(obj instanceof StreamingGetStateEvent) {
    StreamingGetStateEvent evt=(StreamingGetStateEvent)obj;
    ObjectOutputStream oos=null; // declared as ObjectOutputStream so that writeObject() is available
    try {
        oos=new ObjectOutputStream(evt.getArg());
        oos.writeObject(state);
        oos.flush();
    }
    catch(Exception e) {
        System.err.println(e);
    }
    finally {
        try {
            if(oos != null) // the constructor may have failed
                oos.close();
        }
        catch(IOException e) {
            System.err.println(e);
        }
    }
}
...
JGroups offers great flexibility in state transfer by allowing application developers to implement both byte-based and streaming-based state transfers. An application can, for example, implement both streaming and byte-based state transfer callbacks and then swap the state transfer protocol in the channel configuration to use either one. However, one cannot configure a channel with both state transfers at the same time and then choose at runtime which one to use.
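The receiving side of a streaming transfer, mentioned above as analogous to the sending side, can be sketched with standard Java streams alone. The class StreamingStateSketch and its methods writeState() and readState() are hypothetical. In an application the streams would be the ones provided by the streaming callbacks or events, and closing them is left to the caller in this sketch.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;

/** Hypothetical helpers showing both directions of a stream-based state transfer. */
final class StreamingStateSketch {

    /** State provider side: writes the state to the provided output stream. */
    static void writeState(Serializable state, OutputStream ostream) throws IOException {
        ObjectOutputStream oos = new ObjectOutputStream(ostream);
        oos.writeObject(state);
        oos.flush(); // push buffered bytes to the underlying stream
    }

    /** State requester side: reads the state back from the provided input stream. */
    static Object readState(InputStream istream) throws IOException, ClassNotFoundException {
        ObjectInputStream ois = new ObjectInputStream(istream);
        return ois.readObject();
    }
}
```

Because both sides work on streams, a large state could be written and read piece by piece (for example one object per entry) instead of as a single object, which is the point of the streaming variant.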
Disconnecting from a channel is done using the following method:
public void disconnect();
It will have no effect if the channel is already in the disconnected or closed state. If connected, it will remove itself from the group membership. This is done (transparently for a channel user) by sending a leave request to the current coordinator. The latter will subsequently remove the channel's address from its local view and send the new view to all remaining members.
After a successful disconnect, the channel will be in the unconnected state, and may subsequently be re-connected to.
To destroy a channel instance (destroy the associated protocol stack, and release all resources), method close() is used:
public void close();
It moves the channel to the closed state, in which no further operations are allowed (most throw an exception when invoked on a closed channel). In this state, a channel instance is not considered used any longer by an application and -- when the reference to the instance is reset -- the channel essentially only lingers around until it is garbage collected by the Java runtime system.
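The state transitions described for disconnect() and close() can be summarised in a toy model. This is only a sketch of the rules stated above, not the JGroups implementation: ChannelLifecycleSketch, its State enum and the use of IllegalStateException are assumptions made for illustration, and no group communication takes place.

```java
// Toy model of the channel lifecycle (hypothetical, for illustration only).
class ChannelLifecycleSketch {
    enum State { UNCONNECTED, CONNECTED, CLOSED }

    private State state = State.UNCONNECTED;

    State state() { return state; }

    void connect(String groupName) {
        // No further operations are allowed on a closed channel
        if (state == State.CLOSED)
            throw new IllegalStateException("channel is closed");
        state = State.CONNECTED;     // a real channel would join the group here
    }

    void disconnect() {
        // No effect if the channel is already disconnected or closed
        if (state != State.CONNECTED)
            return;
        state = State.UNCONNECTED;   // a real channel would send a leave request first
    }

    void close() {
        state = State.CLOSED;        // a real channel would release its protocol stack
    }
}
```

After disconnect() the model can be connected again, whereas after close() every connect() attempt fails.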
[1] It could be that the member is suspected falsely, in which case the next view would still contain the suspected member (there is currently no unsuspect() method).
[2] Local delivery can be turned on/off using setOpt().
[3] This is managed internally however, and an application programmer does not need to be concerned about it.
[4] A member will never retrieve the state from itself!
Building blocks are layered on top of channels. Most of them do not even need a channel; all they need is a class that implements interface Transport (which channels do). This enables them to work on any type of group transport that obeys this interface. Building blocks can be used instead of channels whenever a higher-level interface is required. Whereas channels are simple socket-like constructs, building blocks may offer a far more sophisticated interface. In some cases, building blocks offer access to the underlying channel, so that the channel can be accessed directly if the building block at hand does not offer a certain functionality. Building blocks are located in the org.jgroups.blocks package. Only the ones that are relevant for application programmers are discussed below.
Note that this building block has been deprecated and should not be used anymore!
This class is a converter (or adapter, as used in [Gamma:1995]) between the pull style of actively receiving messages from the channel and the push style, where clients register a callback which is invoked whenever a message has been received. Clients of a channel thus do not have to allocate a separate thread for message reception.
A PullPushAdapter is always created on top of a class that implements interface Transport (e.g. a channel). Clients interested in being called when a message is received can register with the PullPushAdapter using method setListener(). They have to implement interface MessageListener, whose receive() method will be called when a message arrives. A client that is also interested in views, suspicions and blocks must additionally register as a MembershipListener using method setMembershipListener(). Whenever a view, suspicion or block is received, the corresponding method will be called.
Upon creation, an instance of PullPushAdapter creates a thread which constantly calls the receive() method of the underlying Transport instance, blocking until a message is available. When a message is received, if there is a registered message listener, its receive() method will be called.
As this class does not implement interface Transport, but merely uses it for receiving messages, an underlying object has to be used to send messages (e.g. the channel on top of which an object of this class resides). This is shown in Figure 4.1, “Class PullPushAdapter”.
As is shown, the thread constantly pulls messages from the channel and forwards them to the registered listeners. An application thus does not have to actively pull for messages, but the PullPushAdapter does this for it. Note however, that the application has to directly access the channel if it wants to send a message.
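The pull-to-push conversion described above can be sketched in a few lines. This is not the PullPushAdapter source: PullToPushSketch and its Listener interface are hypothetical, and a java.util.concurrent.BlockingQueue stands in for the Transport so that the example runs without JGroups. As in the description, a thread started on creation blocks on the source and hands each message to the listener, if one is registered.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the adapter (for illustration only).
class PullToPushSketch {
    // Plays the role of MessageListener.receive()
    interface Listener { void receive(String msg); }

    private final BlockingQueue<String> source;   // stand-in for the Transport
    private final Thread receiver;
    private volatile Listener listener;

    PullToPushSketch(BlockingQueue<String> source) {
        this.source = source;
        // The receiver thread is created and started upon construction
        this.receiver = new Thread(this::pullLoop, "pull-to-push-sketch");
        this.receiver.setDaemon(true);
        this.receiver.start();
    }

    void setListener(Listener l) { listener = l; }

    void stop() { receiver.interrupt(); }

    private void pullLoop() {
        try {
            while (true) {
                String msg = source.take();   // blocks until a message is available
                Listener l = listener;
                if (l != null)
                    l.receive(msg);           // push the message to the listener
            }
        } catch (InterruptedException e) {
            // stop() was called: leave the loop and let the thread end
        }
    }
}
```

Note that in this sketch a message that arrives while no listener is registered is simply dropped, and sending still has to go through the underlying source directly.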
This section shows sample code for using a PullPushAdapter. The example has been shortened for readability (error handling has been removed).
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.MessageListener;
import org.jgroups.blocks.PullPushAdapter;

public class PullPushTest implements MessageListener {
    Channel channel;
    PullPushAdapter adapter;
    byte[] data="Hello world".getBytes();
    String props; // fetch properties

    // getState() and setState() of MessageListener omitted for brevity

    public void receive(Message msg) {
        System.out.println("Received msg: " + msg);
    }

    public void start() throws Exception {
        channel=new JChannel(props);
        channel.connect("PullPushTest");
        adapter=new PullPushAdapter(channel);
        adapter.setListener(this);
        for(int i=0; i < 10; i++) {
            System.out.println("Sending msg #" + i);
            channel.send(new Message(null, null, data));
            Thread.sleep(1000); // sleep() is static, no need for currentThread()
        }
        adapter.stop();
        channel.close();
    }

    public static void main(String args[]) {
        try {
            new PullPushTest().start();
        }
        catch(Exception e) { /* error */ }
    }
}
First, a channel is created and connected. Then an instance of PullPushAdapter is created with the channel as argument. The constructor of PullPushAdapter starts its own thread, which continually reads from the channel. Next, the MessageListener is set, which causes all messages received on the channel to be passed to receive(). A number of messages are then sent via the channel to the entire group. As group messages are also received by the sender, the receive() method will be called every time a message is received. Finally, the PullPushAdapter is stopped and the channel closed. Note that explicitly stopping the PullPushAdapter is not actually necessary, as closing the channel would cause the PullPushAdapter to terminate anyway.
Note that, compared to the pull-style example, push-style message reception is considerably easier (no separate thread management) and requires less code to program.
Channels are simple patterns to asynchronously send and receive messages. However, a significant number of communication patterns in group communication require synchronous communication. For example, a sender would like to send a message to the group and wait for all responses. Or another application would like to send a message to the group and wait only until the majority of the receivers have sent a response, or until a timeout has occurred.
MessageDispatcher offers a combination of the above pattern with other patterns. It provides synchronous (as well as asynchronous) message sending with request-response correlation, i.e. matching responses with the original request. It also offers push-style message reception (by internally using the PullPushAdapter).
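To illustrate what request-response correlation and "wait for N responses or a timeout" mean, here is a minimal, self-contained sketch. It does not show how MessageDispatcher is implemented; CorrelationSketch, its methods and the use of numeric request ids are all assumptions made for illustration. Each request gets an id, responses carrying that id are collected, and the caller blocks until the expected number of responses has arrived or the timeout has expired.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical correlator (for illustration only).
class CorrelationSketch {
    // One pending request: collected responses plus a latch that opens
    // once the expected number of responses has arrived.
    private static final class Pending {
        final List<Object> responses = new ArrayList<Object>();
        final CountDownLatch done;
        Pending(int expected) { done = new CountDownLatch(expected); }
    }

    private final Map<Long, Pending> pending = new ConcurrentHashMap<Long, Pending>();
    private final AtomicLong nextId = new AtomicLong();

    // Registers a request; the returned id would travel with the request message.
    long register(int expectedResponses) {
        long id = nextId.incrementAndGet();
        pending.put(id, new Pending(expectedResponses));
        return id;
    }

    // Called by the receiver thread for each response that carries a request id.
    void responseReceived(long id, Object value) {
        Pending p = pending.get(id);
        if (p == null)
            return;                       // unknown or already completed request
        synchronized (p.responses) { p.responses.add(value); }
        p.done.countDown();
    }

    // Blocks until enough responses have arrived or the timeout has expired,
    // then returns whatever has been collected so far.
    List<Object> await(long id, long timeoutMs) {
        Pending p = pending.get(id);
        if (p == null)
            return new ArrayList<Object>();
        try {
            p.done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt status
        } finally {
            pending.remove(id);
        }
        synchronized (p.responses) { return new ArrayList<Object>(p.responses); }
    }
}
```

Under these assumptions, waiting for all members means registering with the group size, and waiting for a majority means registering with groupSize / 2 + 1. A short timeout returns the partial list of responses.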
An instance of MessageDispatcher is created with a channel as argument. It can then be used in both the client and the server role: a client sends requests and receives responses, and a server receives requests and sends responses. MessageDispatcher allows an application to be both at the same time. To be able to serve requests, the RequestHandler.handle() method has to be implemented: