Copyright Red Hat 2014 - 2023

This document is licensed under the "Creative Commons Attribution-ShareAlike (CC-BY-SA) 3.0" license.

This is the jgroups-raft manual. It provides information about

  • Design and architecture

  • Configuration and use

of jgroups-raft.

Bela Ban, Kreuzlingen Switzerland 2017

1. Overview

The jgroups-raft project is an implementation of Raft in JGroups.

It provides a consensus-based system in which leaders are elected and changes are committed by consensus (majority agreement). A fixed number of nodes form a cluster and each node is a state machine. A leader is elected by consensus, and all changes happen through the leader, which replicates them to all nodes; each node appends them to its persistent log.

Because Raft guarantees that there’s only ever one leader at any time, and changes are identified uniquely, all state machines receive the same ordered stream of updates and thus have the exact same state.

Raft favors consistency over availability; in terms of the CAP theorem, jgroups-raft is a CP system. This means jgroups-raft is highly consistent, and the data replicated to nodes will never diverge, even in the face of network partitions (split brains) or restarts. In terms of the extended PACELC theorem, jgroups-raft provides the means to build PC/EC systems.

In case of a network partition, in a cluster of N nodes, at least N/2+1 nodes have to be running for the system to be available.

If for example, in a 5 node cluster, 2 nodes go down, then the system can still commit changes and elect leaders as 3 is still the majority. However, if another node goes down, the system becomes unavailable and client requests will be rejected. (Depending on configuration, there may still be some limited form of read-only availability.)

By implementing jgroups-raft in JGroups, the following benefits can be had:

  • Transports already available: UDP, TCP

    • Contains thread pools, priority delivery (OOB), batching etc.

  • Variety of discovery protocols

  • Encryption, authentication, compression

  • Fragmentation, reliability over UDP

  • Multicasting for larger clusters

  • Failure detection

  • Sync/async cluster RPCs

The amount of code required for a full Raft implementation is therefore smaller than if it had been written outside of JGroups.

The feature set of jgroups-raft includes

  • Leader election and append entries functionality by consensus

  • Persistent log (using LevelDB)

  • Dynamic addition and removal of cluster nodes

  • Cluster wide atomic counters

  • Replicated hash maps (replicated state machines)

1.1. Architecture

The architecture of jgroups-raft is shown below.

Diagram
Figure 1. The architecture of jgroups-raft.

The components that make up jgroups-raft are

  • A JGroups protocol stack with jgroups-raft specific protocols added:

    • NO_DUPES: makes sure that a jgroups-raft node does not appear in a view more than once

    • ELECTION: handles leader election

    • RAFT: implements the Raft algorithm, i.e. appending entries to the persistent log, committing them, syncing new members etc.

    • REDIRECT: redirects requests to the leader

    • CLIENT: accepts client requests over a socket, executes them and sends the results back to the clients

  • Channel: this is a regular JGroups JChannel or ForkChannel

  • RaftHandle: the main class for users of jgroups-raft to interact with

  • StateMachine: an implementation of StateMachine. This is typically a replicated state machine. jgroups-raft ships with a number of building blocks implementing StateMachine such as CounterService or ReplicatedStateMachine.

The figure above shows one node in a cluster; the other nodes have the same setup, except that every node is required to have a different raft_id (defined in RAFT). This is a string which identifies one cluster member; all members need to have different raft_ids (more on this later).

2. Using jgroups-raft

Since jgroups-raft builds on JGroups, it has similar requirements. Currently, jgroups-raft requires JDK 11. jgroups-raft can be included in a Maven POM:

<dependency>
    <groupId>org.jgroups</groupId>
    <artifactId>jgroups-raft</artifactId>
    <version>X.Y.Z.Final</version>
</dependency>

Where X=major, Y=minor, and Z=patch. The tags are available on GitHub releases.

2.1. Cluster members and identity

Each cluster member has an address (a UUID) assigned by JGroups and a raft_id which needs to be assigned by the user. The latter is a string (e.g. "A") which needs to be unique in the entire cluster. In other words, the raft_id is the identity of a member for the sake of jgroups-raft.

A Raft cluster has a fixed size, so that a majority can be computed for leader election and the appending of entries. The members allowed into the cluster are defined in RAFT.members, e.g.

<raft.RAFT members="A,B,C" raft_id="${raft_id:undefined}"/>

This defines a cluster of 3 members: "A", "B" and "C" (whose majority is 2).

These are the raft_id attributes of the 3 members, so attribute raft_id in the example above needs to be one of them. If we don’t start this member with the system property -Draft_id=X (where X needs to be "A", "B" or "C"), then the member will fail to start, as "undefined" is not a member of {"A", "B", "C"}.

Note that while RAFT ensures that non-members cannot join a cluster, the NO_DUPES protocol makes sure that no duplicate member can join. Example: if we have RAFT.members="A,B,C" and actual members "A" and "B" joined, then a join attempt by a member with duplicate name "B" will be rejected and that member won’t be able to join.

Attribute raft_id is also used to define the location of the persistent log; unless log_name is defined in RAFT, the location is computed as <temp_dir>/<raft_id>.log, e.g. /tmp/A.log.

Note that members can be added and removed dynamically (without taking the entire cluster down, changing the configuration and restarting it), see Adding and removing members dynamically.

2.2. RaftHandle

As shown in Figure 1, RaftHandle is the main class users will be dealing with. It provides methods to change state (append entries) in the replicated state machines, and a state machine can be registered with it. The state machine will be initialized at startup and updated by jgroups-raft whenever there is a change.

A successful change is committed to the persistent logs of all cluster members and applied to their state machines, so all state machines have exactly the same state.

2.2.1. Creation

An instance of RaftHandle is associated with exactly one JGroups channel, and can be created as follows:

JChannel ch=new JChannel("/home/bela/raft.xml"); (1)
RaftHandle handle=new RaftHandle(ch, this);      (2)
ch.connect("raft-cluster");                      (3)
1 A new JGroups channel is created (see the JGroups manual for details on the JGroups API)
2 A RaftHandle instance is created over the channel (which must be non-null). The second argument is an implementation of StateMachine. If null, no changes will be applied. The state machine can be set with stateMachine(StateMachine sm).
3 The channel is connected which causes the member to join the cluster
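
If no state machine was passed to the constructor, it can be set later via the setter mentioned above (a sketch; MyStateMachine is a hypothetical StateMachine implementation):

handle.stateMachine(new MyStateMachine()); // register the state machine with the handle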

2.2.2. Making changes

The setX() methods can be used to make changes:

byte[] set(byte[] buf, int offset, int length) throws Exception; (1)
byte[] set(byte[] buf, int offset, int length, long timeout, TimeUnit unit) throws Exception; (2)
CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length); (3)
1 Synchronous change; the caller is blocked until the change has been forwarded to the leader, which sends it to all cluster members; these append it to their persistent logs and ack the change back to the leader. Once the leader gets a majority of acks, it commits the change to its own log, applies it to its state machine and returns the response to the caller. The state machines thus only contain committed changes.
2 Same as above, except that this call is bounded with a timeout. If it elapses before a majority of acks have been received, a TimeoutException will be thrown.
3 Asynchronous change; this method returns immediately with a CompletableFuture which can be used to retrieve the result later, or to provide some code that’s executed as soon as the result is available (e.g. whenComplete()).
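
For example, an asynchronous change could be submitted as follows (a minimal sketch; the request serialization and response handling are application-specific):

byte[] buf=serializeRequest(); // hypothetical application-specific serialization
handle.setAsync(buf, 0, buf.length)
      .whenComplete((rsp, ex) -> {
          if(ex != null)
              System.err.println("change failed: " + ex);
          else
              System.out.println("change committed; response: " + (rsp != null? rsp.length : 0) + " bytes");
      });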

The contents of the request and response buffers are application-specific.

For example, if we implemented a replicated hash map, then a request could be a put(key,value). The put() would have to be serialized into the buffer, as well as the key and the value.

When committing the change, every state machine needs to de-serialize the buffer into a put(key,value) and apply it to its state (see Implementing a StateMachine). If there is a return value to the put() call, e.g. the previous value associated with key, then it will be serialized into a buffer and returned as the result of one of the setX() calls.
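
A sketch of building such a request with plain java.io streams (the opcode-based format is made up for illustration):

// Serialize a put(key,value) request into a byte[] (hypothetical format)
ByteArrayOutputStream out=new ByteArrayOutputStream();
DataOutputStream data=new DataOutputStream(out);
data.writeByte(1);      // 1 = PUT (made-up opcode)
data.writeUTF("name");  // key
data.writeUTF("Bela");  // value
byte[] buf=out.toByteArray();
byte[] prev=handle.set(buf, 0, buf.length); // serialized previous value, or null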

2.2.3. Implementing a StateMachine

StateMachine is an interface and is defined as follows:

public interface StateMachine {
    byte[] apply(byte[] data, int offset, int length) throws Exception;  (1)
    void   readContentFrom(DataInput in) throws Exception;               (2)
    void   writeContentTo(DataOutput out) throws Exception;              (3)
}
1 This method is called whenever a log entry is committed. The buffer’s contents are application-specific (e.g. this could be a serialized put(key,value), as discussed above). If applying the change to the state machine yields a return value, it needs to be serialized so that it can be returned to the caller (e.g. a client).
2 This method is called when RAFT needs to initialize a state machine from a snapshot (a dump of a state machine’s contents to an external stream, e.g. a file). The writeContentTo() method below wrote the contents to a file before, in an application-specific format, and this method now needs to read the contents back into the state machine.
3 This method is the opposite of readContentFrom() and writes the contents of this state machine to a stream (e.g. a file).
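
Below is a minimal sketch of a replicated hash map implementing StateMachine. It assumes the hypothetical put(key,value) format from the serialization sketch above (a production-grade implementation ships with jgroups-raft as ReplicatedStateMachine); synchronization and error handling are omitted:

public class ReplicatedMap implements StateMachine {
    protected final Map<String,String> map=new HashMap<>();

    public byte[] apply(byte[] data, int offset, int length) throws Exception {
        DataInputStream in=new DataInputStream(new ByteArrayInputStream(data, offset, length));
        in.readByte();                               // opcode; this sketch only handles PUT
        String key=in.readUTF(), value=in.readUTF();
        String prev=map.put(key, value);             // apply the committed change
        return prev != null? prev.getBytes() : null; // serialized return value for the caller
    }

    public void readContentFrom(DataInput in) throws Exception {
        int size=in.readInt();                       // re-initialize from a snapshot
        for(int i=0; i < size; i++)
            map.put(in.readUTF(), in.readUTF());
    }

    public void writeContentTo(DataOutput out) throws Exception {
        out.writeInt(map.size());                    // dump the state to a snapshot
        for(Map.Entry<String,String> e: map.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }
}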

2.2.4. Snapshotting and state transfer

All cluster members maintain a persistent log and append all changes as log entries to the end of the log. To prevent logs from growing indefinitely, a snapshot of the state machine can be made and the log truncated. This is done programmatically with method snapshot(), or declaratively (see below).

This method calls StateMachine.writeContentTo() to dump the state of the state machine into a snapshot file and then truncates the log. New members who don’t have a log yet are initialized by sending them the snapshot first. After that, they will catch up via the regular Raft mechanism.

Logs can be snapshotted automatically by setting RAFT.max_log_size to the maximum number of bytes a log is allowed to grow to before a snapshot is taken.
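
Programmatically, a snapshot is a single call (a sketch, assuming a connected RaftHandle named handle):

handle.snapshot(); // calls StateMachine.writeContentTo(), then truncates the log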

2.2.5. Miscellaneous methods

Other methods in RaftHandle include:

addServer(String server)

Asynchronously adds a new member to the cluster with id server and returns a CompletableFuture<byte[]>. See more about membership change in Adding and removing members dynamically.

removeServer(String server)

Asynchronously removes a member from the cluster with id server and returns a CompletableFuture<byte[]>. See more about membership change in Adding and removing members dynamically.

leader()

Returns the address of the current Raft leader, or null if there is no leader (e.g. in case there was no majority to elect a leader)

isLeader()

Whether or not the current member is the leader

addRoleListener(RAFT.RoleChange listener)

Registers a RoleChange listener which is notified when the current member changes its role (Leader, Follower, Candidate)

currentTerm()

Returns the current term (see Raft for details)

lastApplied()

Returns the index of the last log entry that was appended to the log

commitIndex()

Returns the index of the last log entry that was committed

raft()

Returns a reference to the RAFT protocol in the current member’s stack. Provided for experts who need to access RAFT directly.

raftId(String id)

Used to set the raft_id programmatically (note that this can also be done by setting raft_id in RAFT in the XML configuration). For example, the following code sets raft_id from the command line:

protected void start(String raft_id) throws Exception {
    JChannel ch=new JChannel("raft.xml").name(raft_id);   (2)
    RaftHandle handle=new RaftHandle(ch, this).raftId(raft_id); (3)
    ch.connect("raft-cluster");  (4)
}

public static void main(String[] args) throws Exception {
    new bla().start(args[0]);  (1)
}
1 The raft_id can for example be passed to the program as an argument
2 The channel is created and its logical name set to be the same as raft_id. This is not necessary, but convenient.
3 Now raft_id can be set via RaftHandle.raftId(String id).
4 The channel is connected, which causes the member to join the cluster.

2.3. Configuration

The configuration of a member is either done declaratively via an XML config file or programmatically. Refer to the JGroups documentation for details.

A sample XML configuration file is shown below (edited for brevity):

<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
    <UDP
         mcast_addr="228.5.5.5"
         mcast_port="${jgroups.udp.mcast_port:45588}"/>
    <PING />
    <MERGE3 />
    <FD_SOCK/>
    <FD_ALL/>
    <VERIFY_SUSPECT timeout="1500"  />
    <pbcast.NAKACK2 xmit_interval="500"/>
    <UNICAST3 xmit_interval="500"/>
    <pbcast.STABLE desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <raft.NO_DUPES/>                                                         (1)
    <pbcast.GMS print_local_addr="true" join_timeout="2000"/>
    <UFC max_credits="2M" min_threshold="0.4"/>
    <MFC max_credits="2M" min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <raft.ELECTION election_min_interval="100" election_max_interval="500"/> (2)
    <raft.RAFT members="A,B,C,D" raft_id="${raft_id:undefined}"/>            (3)
    <raft.REDIRECT/>                                                         (4)
    <raft.CLIENT bind_addr="0.0.0.0" />                                      (5)
</config>
1 NO_DUPES: checks that a joining member doesn’t introduce a duplicate raft_id into the membership, and rejects the JOIN if it would. Must be placed somewhere below GMS
2 ELECTION: this protocol implements leader election, as defined in Raft. It is independent from RAFT and could (and may, in the future) be replaced with a different election protocol. Attributes election_min_interval and election_max_interval define the range from which jgroups-raft picks a random election timeout.
3 RAFT: the main protocol implementing log appending and committing, handling state machine updates, snapshotting etc. Attribute members defines the (fixed) membership (may still be redefined by addServer/removeServer log entries when initializing a member from the persistent log). Attribute raft_id defines the ID of the current member (needs to be an element of members, as discussed earlier).
4 REDIRECT is used to redirect requests to the current Raft leader, or to throw an exception if no member is leader
5 CLIENT listens on a socket (port 1965 by default) for client requests, executes them and sends the results back to the clients. Currently, addServer and removeServer have been implemented.

This is a regular JGroups XML configuration, except that jgroups-raft added a few additional protocols.

2.4. Adding and removing members dynamically

The RAFT protocol provides methods addServer(String raft_id) and removeServer(String raft_id) to add and remove servers from the static membership (defined by RAFT.members). Only one server at a time can be added and removed, and adding or removing a server needs a majority ack to be committed.

Both methods are exposed via JMX, so jconsole could be used. However, jgroups-raft also provides a script (client.sh) to do this in a more convenient way. The script uses Client to connect to a member’s CLIENT protocol running at localhost:1965 (can be changed). The request is then forwarded to the current leader.

The steps to add a member are as follows (say we have RAFT.members="A,B,C" and want to add "D"):

  • Call bin/client.sh -add D

    • If needed, -port PORT or -bind_addr ADDR can be given, e.g. if we need to reach a member running on a different host

  • Once A (the leader) has processed addServer("D"), every member’s RAFT.members is "A","B","C","D"

  • At this point, the XML configuration files should be updated so that RAFT.members="A,B,C,D"

  • If not, members will read the correct membership when getting initialized by their logs

  • A new member D can now be started (its XML config needs to have the correct members attribute!)

Note that membership changes survive restarts: to add or remove a node, the corresponding operation must be submitted; merely restarting members does not change the membership.
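
Instead of the script, a membership change can also be submitted programmatically (a sketch, assuming a connected RaftHandle named handle; error handling kept minimal):

handle.addServer("D")                      // asynchronous; commits once a majority acks
      .whenComplete((rsp, ex) -> {
          if(ex != null)
              System.err.println("adding D failed: " + ex);
      });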

3. Building blocks

Similar to JGroups' building blocks, jgroups-raft also has building blocks, which provide additional functionality on top of a RaftHandle. They are typically given a JChannel, create a RaftHandle and register themselves as StateMachine with the handle. Building blocks offer a different interface to the users, e.g. a replicated hashmap with puts and gets, or a distributed counter or lock.

3.1. ReplicatedStateMachine

ReplicatedStateMachine is a key-value store replicating its contents to all cluster members. Contrary to the JGroups equivalent (ReplicatedHashMap), changes are replicated by consensus and logged to a persistent log.

While the JGroups version is allowed to make progress during network partitions, and users need to merge possibly diverging state from different partitions after a partition heals, ReplicatedStateMachine will allow progress only in the majority partition, so no state merging needs to be done after a partition heals.

Not having to merge state is certainly simpler, but comes at the expense of availability: if N/2+1 members leave or split into different partitions, ReplicatedStateMachine will be unavailable (all requests will time out).

However, the advantage is that the members' states will never diverge.

ReplicatedStateMachine requires a JChannel in its constructor and has put(), get() and remove() methods. The code below shows how to create an instance of ReplicatedStateMachine and add an element to it:

protected void start(String raft_id) throws Exception {
    JChannel ch=new JChannel("raft.xml").name(raft_id);
    ReplicatedStateMachine<String,String> rsm=new ReplicatedStateMachine<>(ch)
        .raftId(raft_id);
    ch.connect("rsm-cluster");
    rsm.put("name", "Bela");
}

There’s a demo ReplicatedStateMachineDemo which can be used to interactively use ReplicatedStateMachine.

3.1.1. Reads and consensus

ReplicatedStateMachine has configurable behavior for reads, using quorum reads by default. This means that every read request is sent through RAFT and requires a majority to complete. This provides linearizable reads, but takes longer to complete.

If an application does not require linearizable reads, it can change the behavior and only read the value locally, possibly stale, but having a faster response since the request does not go through RAFT.

The behavior is changed by calling ReplicatedStateMachine.allowDirtyReads(boolean).
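
For example (a sketch, continuing the code above):

rsm.allowDirtyReads(true);   // reads are now local and may return stale values
String name=rsm.get("name"); // served by the local state machine, bypassing RAFT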

A quorum read creates an entry in the persistent log. See Issue 18 for details.

3.2. CounterService

CounterService provides a replicated counter which can be used in a synchronous (SyncCounter) and asynchronous (AsyncCounter) mode. The base interface is Counter:

public interface Counter {
    String                getName();
    SyncCounter           sync();
    AsyncCounter          async();
    <T extends Counter> T withOptions(Options opts);
}

The sync() and async() methods can be used to switch between the modes. The withOptions() method can be used to make a counter ignore return values (see below).
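
For example, the same counter can be used through both interfaces (a sketch, assuming an existing CounterService named counter_service):

Counter counter=counter_service.getOrCreateCounter("ctr", 0);
SyncCounter sync=counter.sync();    // blocking calls, return values directly
AsyncCounter async=counter.async(); // non-blocking calls, return a CompletionStage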

3.2.1. Synchronous counters

Synchronous counters block until an operation has completed, and return a value. The interface SyncCounter is:

public interface SyncCounter extends Counter {
    long            get();
    long            getLocal();
    void            set(long new_value);
    default boolean compareAndSet(long exp, long upd) {return compareAndSwap(exp, upd) == exp;}
    long            compareAndSwap(long expect, long update);
    default long    incrementAndGet() {return addAndGet(1L);}
    default long    decrementAndGet() {return addAndGet(-1L);}
    long            addAndGet(long delta);
}

The methods are:

Name Description

get

Returns the value of the counter. If the CounterService allows for dirty reads (see below), getLocal() is called. Otherwise, a remote get request is sent to the leader.

getLocal

Returns the value of the counter. This call is purely local and may return a stale value.

set

Sets the value of the counter

compareAndSet

Atomically updates the counter using a CAS operation

compareAndSwap

Atomically updates the counter using a compare-and-swap operation

incrementAndGet

Atomically increments the counter and returns the new value

decrementAndGet

Atomically decrements the counter and returns the new value

addAndGet

Atomically adds the given value to the current value and returns the updated value

The synchronous methods block until consensus has been reached. This means that a method (e.g. addAndGet()) may block indefinitely, e.g. when fewer than a majority of members have acked the change.
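
If indefinite blocking is a concern, the asynchronous API (below) can be combined with a timeout (a sketch, assuming a counter obtained from CounterService; orTimeout() requires Java 9+):

long value=counter.async().incrementAndGet()
        .toCompletableFuture()
        .orTimeout(5, TimeUnit.SECONDS) // fail if a majority doesn't ack in time
        .join();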

3.2.2. Asynchronous counters

Asynchronous counters are defined in AsyncCounter:

public interface AsyncCounter extends Counter {
    default CompletionStage<Long>    get() {return addAndGet(0);}
    CompletionStage<Long>            getLocal();
    CompletionStage<Void>            set(long new_value);

    default CompletionStage<Boolean> compareAndSet(long expect, long update) {
        return compareAndSwap(expect, update).thenApply(value -> value == expect);
    }
    CompletionStage<Long>            compareAndSwap(long expect, long update);
    default CompletionStage<Long> incrementAndGet() {
        return addAndGet(1);
    }
    default CompletionStage<Long>    decrementAndGet() {
        return addAndGet(-1);
    }
    CompletionStage<Long>            addAndGet(long delta);
}

The semantics of the methods correspond to their synchronous counterparts, but they return a CompletionStage instead of the value. An example is shown below:

public void testCompareAndSwapChained() throws Exception {
    AsyncCounter counter=counter_service.getOrCreateCounter("ctr", 0).async();
    final long initialValue = 100;
    final long finalValue = 10;

    counter.set(initialValue).toCompletableFuture().join();

    AtomicLong rv = new AtomicLong();
    boolean result = counter.compareAndSwap(1, finalValue)         // fails: the value is 100, not 1
            .thenCompose(rValue -> {
                rv.set(rValue);                                    // rValue is the current value (100)
                return counter.compareAndSwap(rValue, finalValue); // succeeds, returns the previous value (100)
            })
            .thenApply(value -> value == initialValue)
            .toCompletableFuture()
            .join();

    assert result;
    assert initialValue == rv.longValue();
}

A Counter implementation is created via the CounterService building block:

public class CounterService implements StateMachine {
    public CounterService(JChannel ch);
    public long           replTimeout();
    public CounterService replTimeout(long timeout);
    public boolean        allowDirtyReads();
    public CounterService allowDirtyReads(boolean flag);
    public CounterService raftId(String id);

    /**
     * Returns an existing counter, or creates a new one if none exists
     * @param name Name of the counter, different counters have to have different names
     * @param initial_value The initial value of a new counter if there is no existing counter.
     * Ignored if the counter already exists
     * @return The counter implementation
     */
    public Counter getOrCreateCounter(String name, long initial_value) throws Exception;


    /**
     * Deletes a counter instance (on the coordinator)
     * @param name The name of the counter. No-op if the counter doesn't exist
     */
    public void deleteCounter(String name) throws Exception;
}

CounterService is mainly used to get an existing or create a new Counter implementation (getOrCreateCounter()), or to delete an existing counter (deleteCounter()).

To create an instance of CounterService, a JChannel has to be passed to the constructor. The sample code below shows how to use this:

protected void start(String raft_id) throws Exception {
    JChannel ch=new JChannel("raft.xml").name(raft_id);
    CounterService cs=new CounterService(ch);                   (1)
    ch.connect("counter-cluster");
    SyncCounter counter=cs.getOrCreateCounter("mycounter", 1);  (2)
    counter.incrementAndGet();                                  (3)
    counter.compareAndSet(2, 5);                                (4)
    long current_value=counter.get();                           (5)
}
1 First a CounterService is created and given a reference to a channel
2 Once the member has joined the cluster, we create a counter named "mycounter" with an initial value of 1
3 The counter is then incremented to 2
4 Now a compare-and-set operation sets the counter to 5 if it was 2
5 The last operation fetches the current value of "mycounter"

Any member in the cluster can change the same counter and all operations are ordered by the Raft leader, which causes the replicated counters to have exactly the same value in all members.

Comparing this to the JGroups equivalent, a jgroups-raft counter never diverges in different members, again at the expense of availability. In the JGroups version, counters are always available, but may diverge, e.g. in a split brain scenario, and have to be reconciled by the application after the split brain is resolved.

There’s a demo CounterServiceDemo which can be used to interactively manipulate replicated counters.

3.2.3. Reads and consensus

Currently (as of jgroups-raft version 0.4), reading a counter is by default dirty, meaning that a read may return a stale value.

This can be changed by calling counter_service.allowDirtyReads(false).

However, this inserts a dummy read log entry which returns the value of the counter when committed. Since this dummy entry is ordered correctly with respect to writes in the log, it will always return correct values.

The cost is that reads take up space in the persistent logs, and that consensus (a majority) is required for reads. In the next release of jgroups-raft, the mechanism for client reads suggested in the Raft paper will be implemented. See Issue 18 for details.

3.2.4. Ignoring return values

Sometimes, a caller is not interested in the result of an operation. For example, a stress test may want to update a counter many times, possibly from many threads, and only fetch the final counter value at the end. In this case, an option can be used with a counter:

public void testIgnoreReturnValue() {
    SyncCounter counter=counter_service.getOrCreateCounter("ctr", 0);
    long ret=counter.incrementAndGet(); (1)

    counter=counter.withOptions(Options.create(true)); (2)
    ret=counter.incrementAndGet(); (3)
    assert ret == 0;
    ret=counter.getLocal(); (4)
}

In (1), a counter is incremented and the new value returned. This returns 1.

In (2), a counter handle is created with an Option which declares that return values are to be ignored. Consequently, when we increment the counter in (3), the return value is 0, although the counter was indeed incremented, as shown when fetching the value in (4).

Returning 0 may not be the most intuitive behavior, but it is the result of autoboxing a null Long value into a long. The idea is that the result of an operation with this option set should not be assigned to a variable.

When the ignore-return-value option is set, REDIRECT doesn’t need to serialize and send the result from the leader to the follower, and RAFT doesn’t need to serialize the result into a byte[] array either. The cost reduction can be significant, depending on the (serialized) size of the result values and the frequency of operations.

3.3. Cluster singleton service

A singleton service is a service which is supposed to run only once in an entire cluster. Typically, in JGroups, a singleton service is started on the first member of a cluster. For example, if we have {A,B,C,D,E}, the singleton service (or services) would be running on A.

If we have a partition, such that the cluster falls apart into {A,B,C} and {D,E}, then an additional singleton would be started on D, as D becomes coordinator in its partition and cannot tell whether {A,B,C} left or were partitioned away.

When the partition heals, if D is no longer the coordinator, it stops its singleton services.

If multiple singletons (as provided by JGroups, e.g. during a network split) cannot be tolerated by the application, and the application has a requirement that at most one singleton service can be running (better none than two), jgroups-raft can be used.

The mechanism to implement singleton services in jgroups-raft is leader election: it is guaranteed that at most one leader exists in a given cluster at the same time. This is exactly what we need for singletons. The code below shows how to do this:

JChannel ch=new JChannel("raft.xml");       // any valid jgroups-raft configuration
RaftHandle handle=new RaftHandle(ch, this); (1)
handle.addRoleListener(role -> {            (2)
    if(role == Role.Leader) {               (3)
        // start singleton services
    }
    else {
        // stop singleton services
    }
});
1 A RaftHandle is created over a channel
2 A RAFT.RoleChange callback is registered with the handle. Alternatively, addRoleListener() could be called directly on an instance of RAFT retrieved from the protocol stack associated with the given channel
3 When we become the Raft leader, the singleton services can be started; when we lose leadership, they should be stopped (if running)

In jgroups-raft, the use of JGroups views and a modified leader election algorithm strengthens the singleton guarantee beyond the algorithm described in the Raft paper. In a typical Raft implementation, members compete to become leader based on timeouts. This causes disruptions in the cluster, and it allows multiple leaders to exist at the same time, albeit in different terms.

More details about the jgroups-raft custom election algorithm are available in the design documents. In short, the implementation is more robust and less prone to disruption by competing members. These design choices result in a more stable singleton service.

Singleton is not a distributed lock.

Using the singleton service as a distributed lock to guard critical sections or resources is dangerous during network partitions. The current leader may end up in a minority partition, in which case the majority elects another leader. If this happens while a node is in a critical section, two nodes could end up accessing the resource concurrently.

4. List of protocols

This chapter describes the most frequently used protocols, and their configuration.

We recommend that users copy one of the predefined configurations shipped with jgroups-raft (e.g. raft.xml) and make only minimal changes to it.

4.1. NO_DUPES

This protocol prevents duplicate members from joining the cluster. The protocol needs to be located somewhere below GMS.

NO_DUPES catches JOIN requests from a joiner to the JGroups coordinator and checks if the joiner’s raft_id is already contained in the current membership, and rejects the JOIN if this is the case.

For example, if we have current members {A,B} and another member with raft_id "B" joins, then the joiner would get the following exception when trying to join the cluster:

-------------------------------------------------------------------
GMS: address=B, cluster=cntrs, physical address=127.0.0.1:64733
-------------------------------------------------------------------
Exception in thread "main" java.lang.Exception: connecting to channel "cntrs" failed
	at org.jgroups.JChannel._connect(JChannel.java:570)
	at org.jgroups.JChannel.connect(JChannel.java:294)
	at org.jgroups.JChannel.connect(JChannel.java:279)
	at org.jgroups.raft.demos.CounterServiceDemo.start(CounterServiceDemo.java:32)
	at org.jgroups.raft.demos.CounterServiceDemo.main(CounterServiceDemo.java:163)
Caused by: java.lang.SecurityException: join of B rejected as it would create a view with duplicate members (current view: [B|1] (2) [B, A])
	at org.jgroups.protocols.pbcast.ClientGmsImpl.isJoinResponseValid(ClientGmsImpl.java:187)
	at org.jgroups.protocols.pbcast.ClientGmsImpl.installViewIfValidJoinRsp(ClientGmsImpl.java:153)
	at org.jgroups.protocols.pbcast.ClientGmsImpl.joinInternal(ClientGmsImpl.java:111)
	at org.jgroups.protocols.pbcast.ClientGmsImpl.join(ClientGmsImpl.java:41)
	at org.jgroups.protocols.pbcast.GMS.down(GMS.java:1087)
	at org.jgroups.protocols.FlowControl.down(FlowControl.java:353)
	at org.jgroups.protocols.FlowControl.down(FlowControl.java:353)
	at org.jgroups.protocols.FRAG2.down(FRAG2.java:136)
	at org.jgroups.protocols.RSVP.down(RSVP.java:153)
	at org.jgroups.protocols.pbcast.STATE_TRANSFER.down(STATE_TRANSFER.java:202)
	at org.jgroups.protocols.raft.ELECTION.down(ELECTION.java:112)
	at org.jgroups.protocols.raft.RAFT.down(RAFT.java:442)
	at org.jgroups.protocols.raft.REDIRECT.down(REDIRECT.java:103)
	at org.jgroups.stack.ProtocolStack.down(ProtocolStack.java:1038)
	at org.jgroups.JChannel.down(JChannel.java:791)
	at org.jgroups.JChannel._connect(JChannel.java:564)
	... 4 more
[mac] /Users/bela/jgroups-raft$

The error message is SecurityException: join of B rejected as it would create a view with duplicate members (current view: [B|1] (2) [B, A]), which shows that view {B,A} already contains a member with raft_id B, and so the JOIN request of the new member is rejected.

4.2. ELECTION

ELECTION is the protocol which performs leader election, as defined by Raft. Its attributes define the election timeout and the heartbeat interval (see Raft for details).

Table 1. ELECTION
Name Description

vote_timeout

Max time (ms) to wait for vote responses

4.3. ELECTION2

ELECTION2 is an alternative election algorithm. It builds on top of ELECTION to include a pre-vote mechanism. The pre-vote runs before delegating to the algorithm of ELECTION.

By design, ELECTION uses view changes to start election rounds and should be stable without interruptions. ELECTION2 is an alternative for networks with recurrent partitions, which could otherwise lead to more disruptions through unnecessary election rounds. More information about how it works is available in the design documents.

Table 2. ELECTION2
Name Description

vote_timeout

Max time (ms) to wait for vote responses

4.4. RAFT

RAFT is the main protocol in jgroups-raft; it implements log appending and committing, snapshotting and log compaction, syncing of new members and so on.

Table 3. RAFT
Name Description

dynamic_view_changes

If true, we can change members at runtime

log_args

Arguments to the log impl, e.g. k1=v1,k2=v2. These will be passed to init()

log_class

The fully qualified name of the class implementing Log

log_dir

The directory in which the log and snapshots are stored. Defaults to the temp dir

log_prefix

The prefix of the log and snapshot. If null, the logical name of the channel is used as prefix

log_use_fsync

n/a

max_log_cache_size

n/a

max_log_size

Max number of bytes a log can have until a snapshot is created

members

List of members (logical names); majority is computed from it

processing_queue_max_size

Max size in items the processing queue can have

raft_id

The identifier of this node. Needs to be unique and an element of members. Must not be null

resend_interval

Interval (ms) at which AppendEntries messages are resent to members with missing log entries

send_commits_immediately

Send commit message to followers immediately after the leader commits (majority has consensus). Caution: it may generate more traffic than expected

4.5. REDIRECT

The REDIRECT protocol needs to be somewhere above RAFT. It keeps track of the current Raft leader and redirects requests to the right leader. If there is no leader, e.g. because there’s no majority to elect one, an exception will be thrown.

4.6. CLIENT

CLIENT listens on a socket for client requests. When a request is received, it is sent down the stack, where it will be forwarded (by REDIRECT) to the current leader, which executes the request. The response is then sent back to the client.

Table 4. CLIENT
Name Description

bind_addr

The bind address which should be used by the server socket. The following special values are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL, NON_LOOPBACK, match-interface, match-host, match-address

idle_time

Number of ms a thread can be idle before being removed from the thread pool

max_threads

Max number of threads in the thread pool

min_threads

Min number of threads in the thread pool

port

Port on which to listen for client requests

recv_buf_size

Number of bytes of the server socket’s receive buffer