OUTPUT

The blog of Maxime Kjaer

CS-451 Distributed Algorithms

Introduction

In terms of abstraction layers, distributed algorithms are sandwiched between the application layer (processes) and the network layer (channels). We have a few commonly used abstractions in this course:

  • Processes abstract computers
  • Channels (or communication links) abstract networks
  • Failure detectors abstract time

We consider that a distributed system is composed of processes making up a static set (i.e. it doesn’t change over time). These processes communicate by sending messages over the network channel. The distributed algorithm consists of a set of distributed automata, one for each process. All processes implement the same automaton.

When defining a problem, there are two important properties that we care about:

  • Safety states that nothing bad should happen
  • Liveness states that something good should happen eventually

Safety is trivially implemented by doing nothing, so we also need liveness to make sure that the correct things actually happen.

Two nodes can communicate through a link by passing messages. However, this message passing can be faulty: it can drop messages or repeat them. How can we ensure correct and reliable message passing under such conditions?

A link has two basic types of events:

  • Send: we place a message on the link
  • Deliver: the link gives us a message

A fair-loss link is a link that may lose or repeat some packets. This is the weakest type of link we can assume. In practice, it corresponds to UDP.

Deliver can be thought of as a reception event on the receiver end. The terminology used here (“deliver”) implies that the link delivers to the client, but this can equally be thought of as the client receiving from the link.

For a link to be considered a fair-loss link, we must respect the following three properties:

  • FLL1. Fair loss: If a correct process p infinitely often sends a message m to a correct process q, then q delivers m an infinite number of times.
  • FLL2. Finite duplication: If a correct process p sends a message m a finite number of times to process q, then m cannot be delivered an infinite number of times by q.
  • FLL3. No creation: If some process q delivers a message m with sender p, then m was previously sent to q by process p.

Let’s try to get some intuition for what these properties mean:

  • FLL1 does not guarantee that all messages get through, but at least ensures that some messages get through.
  • FLL2 means that a message can only be repeated by the link a finite number of times.
  • FLL3 means that every delivery must be the result of a send; no message must be created out of the blue.

There’s no real algorithm to implement here; we have only placed assumptions on the link itself. Still, let’s take a look at the interface.

Module:
    Name: FairLossLinks (flp2p)

Events:
    Request: <flp2pSend, dest, m>: requests to send message m to process dest
    Indication: <flp2pDeliver, src, m>: delivers messages m sent by src

Properties:
    FLL1, FLL2, FLL3
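
To get a feel for these properties, here is a toy Python model of a fair-loss link; the names (fair_loss_send, p_drop, max_copies) are mine, not the course’s. Each send may be dropped or duplicated a bounded number of times, but deliveries only ever come from real sends:

import random

def fair_loss_send(deliver, m, p_drop=0.2, max_copies=2):
    """Toy model of a single flp2pSend over a fair-loss link (illustrative
    only): the message may be dropped, or delivered between 1 and max_copies
    times, but it is never invented out of thin air (FLL3)."""
    if random.random() < p_drop:
        return                                  # this particular send is lost
    for _ in range(random.randint(1, max_copies)):
        deliver(m)                              # finite duplication (FLL2)

# If we keep sending, some copies get through (the intuition behind FLL1):
for i in range(10):
    fair_loss_send(lambda m: print("flp2pDeliver:", m), f"msg-{i}")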

A stubborn link is one that stubbornly delivers messages; that is, it ensures that the message is received. Here, we’ll disregard performance, and just keep sending the message.

The properties that we look for in a stubborn link are:

  • SL1. Stubborn delivery: If a correct process p sends a message m once to a correct process q, then q delivers m an infinite number of times.
  • SL2. No creation: If some process q delivers a message m with sender p, then m was previously sent to q by p.

A stubborn link can be implemented on top of a fair-loss link with the following algorithm, which we could call “retransmit forever”. We could probably make it more efficient with the use of timeouts, but since we’re mainly concerned with correctness for now, we’ll just keep it simple.

def send(m):
    """ 
    Keep sending the same message
    over and over again on the FLL 
    """
    while True:
        FLL.send(m)

# When the underlying FLL delivers, deliver to the layer above
FLL.on_delivery(lambda m: deliver(m))

The above is written in Python, but the syntax we’ll use in this course is as follows:

Implements: 
    StubbornLinks (sp2p)
Uses:
    FairLossLinks (flp2p)
Events:
    Request: <sp2pSend, dest, m>: requests to send message m to dest
    Indication: <sp2pDeliver, src, m>: delivers message m sent by src
Properties:
    SL1, SL2

upon event <sp2pSend, dest, m> do:
    while (true) do:
        trigger <flp2pSend, dest, m>;

upon event <flp2pDeliver, src, m> do:
    trigger <sp2pDeliver, src, m>;

Note that this piece of code is meant to sit between two abstraction levels; it is between the channel and the application. As such, it receives sends from the application and forwards them to the link, and receives delivers from the link and forwards them to the application. It must respect the interface of the underlying FLL, and as such, only specifies send and receive hooks.

Note that a stubborn link will deliver the same message infinitely many times, according to SL1. Wanting to only deliver once will lead us to perfect links.

Here again, we respect the send/deliver interface. The properties are:

  • PL1. Reliable delivery: If a correct process p sends a message m to a correct process q, then q eventually delivers m
  • PL2. No duplication: No message is delivered by a process more than once
  • PL3. No creation: If some process q delivers a message m with sender p, then m was previously sent to q by p.

This is the type of link that we usually use: TCP is a perfect link, although it also has more guarantees (notably on message ordering, which this definition of a perfect link does not have). TCP keeps stubbornly retransmitting a message until it gets an acknowledgment, at which point it can stop retransmitting. Acknowledgments aren’t strictly needed in theory; the algorithm would still be correct without them, but we would completely flood the network. Acknowledgments are therefore a practical consideration for performance; just note that the theorists don’t care about them.

Compared to the stubborn link, the perfect link algorithm could be called “eliminate duplicates”. In addition to what the stubborn links do, it keeps track of the messages that have already been delivered, so that each message is delivered to the layer above at most once.

Implements: 
    PerfectLinks (pp2p)
Uses:
    StubbornLinks (sp2p)
Events:
    Request: <pp2pSend, dest, m>: requests to send message m to process dest
    Indication: <pp2pDeliver, src, m>: delivers message m sent by src
Properties:
    PL1, PL2, PL3

upon event <pp2p, Init> do:
    delivered := ∅;

upon event <pp2pSend, dest, m> do:
    trigger <sp2pSend, dest, m>;

upon event <sp2pDeliver, src, m> do:
    if m ∉ delivered:
        delivered := delivered ∪ {m};
        trigger <pp2pDeliver, src, m>;

Throughout the course, we’ll use perfect links as the underlying link (unless otherwise specified).
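
As an aside, here is a hedged Python sketch of the acknowledgment idea mentioned above: a sender retransmits over a simulated lossy channel until it sees an acknowledgment, while the receiver acknowledges every copy and deduplicates. All names here (lossy, Sender, Receiver) are made up for illustration; this is not the course’s algorithm, just the intuition of “retransmit forever” plus “eliminate duplicates” with a practical stopping rule:

import random

random.seed(1)

def lossy(deliver, p_loss=0.5):
    """A toy fair-loss channel: forwards a message to `deliver`, or drops it."""
    def send(msg):
        if random.random() >= p_loss:
            deliver(msg)
    return send

class Receiver:
    def __init__(self):
        self.delivered = set()
        self.to_sender = None                   # wired up below

    def on_message(self, msg):
        _, seq, payload = msg
        self.to_sender(("ACK", seq, None))      # always (re-)acknowledge
        if seq not in self.delivered:           # eliminate duplicates (PL2)
            self.delivered.add(seq)
            print(f"deliver {seq}: {payload}")

class Sender:
    def __init__(self):
        self.acked = set()
        self.to_receiver = None                 # wired up below

    def on_message(self, msg):
        _, seq, _ = msg
        self.acked.add(seq)

    def send(self, seq, payload):
        attempts = 0
        while seq not in self.acked:            # retransmit until acknowledged
            self.to_receiver(("DATA", seq, payload))
            attempts += 1
        print(f"message {seq} acknowledged after {attempts} attempt(s)")

sender, receiver = Sender(), Receiver()
sender.to_receiver = lossy(receiver.on_message)
receiver.to_sender = lossy(sender.on_message)
sender.send(1, "hello")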

Impossibility of consensus

Suppose we’d like to compute prime numbers on a distributed system. Let P be the producer of prime numbers. Whenever it finds one, it notifies two servers, S1 and S2 about it. A client C may request the full list of known prime numbers from either server.

As in any distributed system, we want the servers to behave as a single (abstract) machine.

Solvable atomicity problem

P finds 1013 as a new prime number, and sends it to S1, which receives it immediately, and S2, which receives it after a long delay. In the meantime, before both servers have received the update, we have an atomicity problem: one server has a different list from the other. In this time window, C will get different results from S1 (which has numbers up to 1013) and S2 (which only has numbers up to 1009, which is the previous prime).

A simple way to solve this is to have C send the new number (1013) to the other servers; if it requested from S1 it’ll send the update to S2 as a kind of write back, to make sure that S2 also has it for the next request. We haven’t strictly defined the problem or its requirements, but this may need to assume a link that guarantees delivery and order (i.e. TCP, not UDP).

Unsolvable atomicity problem

Now assume that we have two prime number producers P1 and P2. This introduces a new atomicity problem: the updates may not reach all servers atomically in order, and the servers cannot agree on the order.

This is impossible to solve; we won’t prove it here, but universality (in the Turing sense) is lost unless we make very strong assumptions. This is known as the impossibility of consensus.

Timing assumptions

An important element for describing distributed algorithms is how the system behaves with respect to the passage of time. Often, we need to be able to make assumptions about time bounds.

Measuring time in absolute terms with a physical clock (measuring seconds, minutes and hours) is a bit of a dead end for discussing algorithms. Instead, we’ll use the concept of logical time, which is defined with respect to communications. This clock is just an abstraction we use to reason about algorithms; it isn’t accessible to the processes or algorithms.

The three time-related models are:

  • Synchronous: assuming a synchronous system comes down to assuming the following properties:
    • Synchronous computation: receiving a message can imply a local computation, and this computation can result in sending back a message. This assumption simply states that all the time it takes to do this is bounded and known.
    • Synchronous communication: there is a known upper bound on message transmission delays; the time between sending and delivering a message on the other end of the link is smaller than the bound
    • Synchronous clocks: the drift between a local clock and the global, real-time clock is bounded and known
  • Eventually synchronous: the above assumptions hold eventually
  • Asynchronous: no assumptions

We can easily see how a distributed system would be synchronous: placing bounds on computation and message transmission delays should be possible most of the time. But network overload and message loss may lead the system to become partially synchronous, which is why we have the concept of eventually synchronous.

To abstract these timing assumptions, we will introduce failure detectors in the following section.

Failure detection

A failure detector is a distributed oracle that provides processes with suspicions about crashed processes. There are two kinds of failure detectors, with the following properties:

  • Perfect failure detector
    • PFD1. Strong completeness: eventually, every process that crashes is permanently suspected by every correct process
    • PFD2. Strong accuracy: if a process p is detected (suspected) by any process, then p has crashed
  • Eventually perfect failure detector
    • EPFD1. Strong completeness = PFD1
    • EPFD2. Eventual strong accuracy: eventually, no correct process is ever suspected by any correct process

A perfect failure detector tells us when a process p has crashed by emitting a <Crash, p> event. It never makes mistakes, never changes its mind; decisions are permanent and accurate.

An eventually perfect detector may make mistakes, falsely suspecting a correct process to be crashed. If it does so, it will eventually change its mind and tell us the truth. When it suspects a process p, it emits a <Suspect, p> event; if it changes its mind, it emits a <Restore, p> event. In aggregate, eventually perfect failure detectors are accurate.

A failure detector can be implemented by the following algorithm:

  1. Processes periodically send heartbeat messages
  2. A process sets a timeout based on the worst-case round trip of a message exchange
  3. A process suspects another process if the timeout for that process expires
  4. A process that delivers a message from a suspected process revises its suspicion and doubles the timeout
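
Here is a rough Python sketch of this heartbeat scheme (the class and method names are my own, not the course’s). It behaves like an eventually perfect failure detector: call heartbeat(p) whenever a message from p is delivered, and check() periodically.

import time

class HeartbeatFailureDetector:
    def __init__(self, processes, initial_timeout=1.0):
        self.timeout = {p: initial_timeout for p in processes}
        self.last_seen = {p: time.monotonic() for p in processes}
        self.suspected = set()

    def heartbeat(self, p):
        """Record that we just heard from p."""
        self.last_seen[p] = time.monotonic()
        if p in self.suspected:
            # We were wrong about p: restore it and double its timeout.
            self.suspected.discard(p)
            self.timeout[p] *= 2
            print(f"<Restore, {p}>")

    def check(self):
        """Suspect every process whose timeout has expired."""
        now = time.monotonic()
        for p, seen in self.last_seen.items():
            if p not in self.suspected and now - seen > self.timeout[p]:
                self.suspected.add(p)
                print(f"<Suspect, {p}>")

fd = HeartbeatFailureDetector(["p1", "p2", "p3"], initial_timeout=0.5)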

Reliable broadcast

Broadcast is useful for applications with pubsub-like mechanisms, where some processes subscribe to events published by others (e.g. stock prices).

The subscribers might need some reliability guarantees from the publisher (these guarantees are called “quality of service”, or QoS). These quality guarantees are typically not offered by the underlying network, so we’ll see different broadcasting algorithms with different guarantees.

A broadcast operation is an operation in which a process sends a message to all processes in a system, including itself. We consider broadcasting to be a single operation, but it of course may take time to send all the messages over the network.

Best-effort broadcast (BEB)

In best-effort broadcast (BEB), the sender is the one ensuring the reliability; the receivers do not have to be concerned with enforcing the reliability. On the other hand, if the sender fails, all guarantees go out the window.

Properties

The guarantees of BEB are as follows:

  • BEB1. Validity: if p and q are correct, then every message broadcast by p is eventually delivered by q
  • BEB2. No duplication: no message is delivered more than once
  • BEB3. No creation: if a process delivers a message m with sender p, then m was previously broadcast by p

BEB1 is a liveness property, while BEB2 and BEB3 are safety properties.

As we said above, the broadcasting machine may still crash in the middle of a broadcast, where it hasn’t broadcast the message to everyone yet, and it’s important to note that BEB offers no guarantee against that.

Algorithm

The algorithm for BEB is fairly straightforward: it just sends the message to all processes in the network using perfect links (remember that perfect links use stubborn links, which keep sending the same message continuously). Perfect links already guarantee no duplication (PL2), so we can just forward delivered messages to the application layer above BEB.

Implements: 
    BestEffortBroadcast (beb)
Uses: 
    PerfectLinks (pp2p)
Events:
    Request: <bebBroadcast, m>: broadcasts a message m to all processes
    Indication: <bebDeliver, src, m>: delivers a message m sent by src

upon event <bebBroadcast, m> do:
    forall q ∈ Π do:
        trigger <pp2pSend, q, m>;

upon event <pp2pDeliver, src, m> do:
    trigger <bebDeliver, src, m>;

Correctness

This is not the most efficient algorithm, but we’re not concerned about that. We just care about whether it’s correct, which we’ll sketch out a proof for:

  • Validity: By PL1 (the reliable delivery property of perfect links), and the very facts that:
    • the sender pp2pSends the message to all processes in Π
    • every correct process that pp2pDelivers a message bebDelivers it too
  • No duplication: by PL2 (the no duplication property of perfect links)
  • No creation: by PL3 (the no creation property of perfect links)

Reliable broadcast (RB)

As we said above, BEB offers no guarantees if the sender crashes while sending. If it does fail while sending, we may end up in a situation where some processes deliver the messages, and others don’t. In other words, not all processes agree on the delivery.

As it turns out, it’s even more subtle than that: the sender may already have done the bebBroadcast and the corresponding pp2pSends, placing all the messages on the wire, and then crash. Because the underlying perfect links do not guarantee delivery when the sender crashes, we have no guarantee that the messages have been delivered.

Properties

To address this, we want an additional property compared to BEB, agreement:

  • RB1. Validity = BEB1
  • RB2. No duplication = BEB2
  • RB3. No creation = BEB3
  • RB4. Agreement: for any message m, if a correct process delivers m, then every correct process delivers m

RB4 tells us that even if the broadcaster crashes in the middle of a broadcast and is unable to send to other processes, we’ll honor the agreement property. This is done by distinguishing receiving and delivering; the broadcaster may not have managed to send to everyone, but in that case, reliable broadcast makes sure that either every correct process delivers the message, or none of them do.

Algorithm

For the first time, we’ll use a perfect failure detector in our implementation of RB. Since we’re aiming to do the same as BEB but with the added agreement property, we’ll use BEB as the underlying link.

Implements:
    ReliableBroadcast (rb)
Uses:
    BestEffortBroadcast (beb)
    PerfectFailureDetector (P)
Events:
    Request: <rbBroadcast, m>: broadcasts a message m to all processes
    Indication: <rbDeliver, src, m>: delivers a message m sent by src
Properties:
    RB1, RB2, RB3, RB4

upon event <rb, Init> do:
    delivered := ∅;
    correct := Π;
    from := [];
    forall p ∈ Π do:
        from[p] := ∅;

upon event <rbBroadcast, m> do:
    delivered := delivered ∪ {m};
    trigger <rbDeliver, self, m>; # deliver to self
    trigger <bebBroadcast, [Data, self, m]>; # broadcast to others using beb

# Here, it's important to distinguish the sender (at the other 
# side of the link) from the src (original broadcaster):
upon event <bebDeliver, sender, [Data, src, m]> do:
    if m ∉ delivered:
        # deliver m from src:
        delivered := delivered ∪ {m};
        trigger <rbDeliver, src, m>;
        # echo to others if src no longer correct:
        if src ∉ correct:
            trigger <bebBroadcast, [Data, src, m]>;
        else:
            from[sender] := from[sender] ∪ {[src, m]};

upon event <crash, p> do:
    correct := correct \ {p};
    # echo all previous messages from crashed p:
    forall [src, m] ∈ from[p] do:
        trigger <bebBroadcast, [Data, src, m]>;

The idea is to echo all messages from a process that has crashed. From the moment we get the crash event from the perfect failure detector P, we forward all subsequent messages from the crashed sender to all nodes. But we may also have received messages from the sender before knowing that it was crashed. To solve this, we keep track of all broadcasts, and rebroadcast all old messages when we find out the sender crashed.

Correctness

We’ll sketch a proof for the properties:

  • Validity: as for BEB
  • No duplication: as for BEB
  • No creation: as for BEB
  • Agreement: Assume some correct process p rbDelivers a message m that was rbBroadcast by some process s.
    • If s is correct, then by BEB1 (BEB validity), all correct processes will bebDeliver m, and according to the algorithm (ATTA), deliver it through rbDeliver.
    • If s crashes, then by PFD1 (strong completeness of P), p detects the crash and, ATTA, echoes m with bebBroadcast to all. Since p is correct, then by BEB1 (BEB validity), all correct processes bebDeliver m and then, ATTA, rbDeliver m.

Note that the proof only uses the completeness property of the failure detector (PFD1), not the accuracy property (PFD2). Therefore, the failure detector can either be perfect (P) or eventually perfect (◇P).

Uniform reliable broadcast (URB)

In RB, we only required that correct processes should agree on the set of messages to deliver. We made no requirements on what messages we allow faulty processes to deliver.

For instance, a scenario possible under RB is that a process p rbBroadcasts a message m. It could rbDeliver m to itself, and then crash before it had time to bebBroadcast it to others (see the rbDeliver and bebBroadcast triggers in the rbBroadcast handler of the RB algorithm). In this scenario, all correct nodes still agree not to deliver the message (after all, none of them have received it), but p has already delivered it.

Properties

Uniform reliable broadcast solves this problem, by ensuring that all processes agree. Its properties are:

  • URB1. Validity = BEB1
  • URB2. No duplication = BEB2
  • URB3. No creation = BEB3
  • URB4. Uniform agreement: for any message m, if a process delivers m, then every correct process delivers m

We’ve removed the word “correct” in agreement, and this changes things quite a bit. Uniform agreement is a stronger assertion, which ensures that the set of messages delivered by faulty processes is a subset of those delivered by correct processes.

Algorithm

The algorithm is given by:

Implements: 
    uniformBroadcast (urb)
Uses:
    BestEffortBroadcast (beb)
    PerfectFailureDetector (P)
Events:
    Request: <urbBroadcast, m>: broadcasts a message m to all processes
    Indication: <urbDeliver, src, m>: delivers a message m sent by src
Properties:
    URB1, URB2, URB3, URB4

upon event <urb, Init> do:
    correct := Π;
    delivered := ∅;
    pending := ∅; # set of [src, msg] that we have sent (broadcast or echoed)
    ack[m] := ∅; # for every message m: set of nodes that have acknowledged m

upon event <crash, p> do:
    correct := correct \ {p};

# before broadcasting, save message in pending
upon event <urbBroadcast, m> do:
    pending := pending ∪ {[self, m]};
    trigger <bebBroadcast, [Data, self, m]>;

# If I haven't sent the message, echo it
# If I've already sent it, don't do it again
upon event <bebDeliver, sender, [Data, src, m]> do:
    ack[m] := ack[m] ∪ {sender};
    if [src, m] ∉ pending:
        pending := pending ∪ {[src, m]};
        trigger <bebBroadcast, [Data, src, m]>;

# Deliver the message when we know that all correct processes
# have delivered (and if we haven't delivered already)
upon event (exists [src, m] ∈ pending)
  such that (can_deliver(m) and m ∉ delivered) do:
    delivered := delivered ∪ {m};
    trigger <urbDeliver, src, m>;

# We can deliver if all correct nodes have acknowledged m:
def can_deliver(m):
    return correct ⊆ ack[m];

When a process sees a message (that is, bebDelivers it), it relays it once; this relay serves as an acknowledgment, but also as a way to forward the message to other nodes. All processes keep track of who they have received messages from (either acks or the original message). Once all correct nodes have sent it the message (again, either an ack or the original), it can urbDeliver.

Because all nodes echo each message to all other nodes once, the number of messages sent per broadcast is N².

Because the algorithm waits for confirmation from all correct nodes, it only urbDelivers messages that it knows that all correct nodes have seen.

Correctness

To prove the correctness, we first need a simple lemma: if a correct process p bebDelivers a message m, then p eventually urbDelivers m.

This can be proven as follows: ATTA, any process that bebDelivers m bebBroadcasts m. By PFD1 (completeness of P) and BEB1 (validity of BEB), there is a time at which p bebDelivers m from every correct process and hence, ATTA, urbDelivers it.

The proof is then:

  • Validity: If a correct process p urbBroadcasts a message m, then p eventually bebBroadcasts and bebDelivers m. Then, by our lemma, it urbDelivers it.
  • No duplication: as for BEB
  • No creation: as for BEB
  • Uniform agreement: Assume some process urbDelivers a message m. ATTA, and by PFD1 and PFD2 (completeness and accuracy of P), every correct process bebDelivers m. By our lemma, every correct process will therefore urbDeliver m.

Unlike previous algorithms, this relies on perfect failure detection. But under the assumption that a majority of processes stay correct, we can do with an eventually perfect failure detector ◇P. To do so, we remove the crash event handler above, and replace the can_deliver method with the following:

def can_deliver(m):
    return len(ack[m]) > N/2

Causal order broadcast (CB)

So far, we didn’t consider ordering among messages. In particular, we considered messages to be independent.

Two messages from the same process might not be delivered in the order they were broadcast. This could be problematic: imagine a message board implemented with (uniform) reliable broadcast. A broadcaster could send out a message m1, immediately change its mind and send out a rectification m2. But due to network delays, messages may come out of order, and m2 may be delivered before m1 by a receiving node q. This is problematic, because the rectification m2 won’t make sense to q as long as it hasn’t delivered m1.

A nice property to have in these cases is causal order, where we don’t necessarily impose a total ordering constraint, but do want certain groups of messages to be ordered in a way that makes sense for the applications.

Causal order

We say that m1 causally precedes m2, denoted as m1 → m2, if any of the properties below hold:

  • FIFO Order: Some process p broadcasts m1 before broadcasting m2
  • Causal Order: Some process p delivers m1 and then broadcasts m2
  • Transitivity: There is a message m3 such that m1 → m3 and m3 → m2.

Note that m1 → m2 doesn’t mean that m1 caused m2; it only means that it may potentially have caused it. But without any input from the application layer about which messages are logically dependent on each other, this potential causality is what we can still enforce.

Properties

The causal order property (CO) guarantees that messages are delivered in a way that respects all causality relations. It is respected when we can guarantee that any process delivering a message m2 has already delivered every message m1 such that m1 → m2.

So all in all, the properties we want from CB are:

  • CB1. Validity = RB1 = BEB1
  • CB2. No duplication = RB2 = BEB2
  • CB3. No creation = RB3 = BEB3
  • CB4. Agreement = RB4
  • CB5. Causal order: if m1 → m2, then any process delivering m2 has already delivered m1.

No-waiting Algorithm

The following uses reliable broadcast, but we could also use uniform reliable broadcast to obtain uniform causal broadcast.

Implements: 
    ReliableCausalOrderBroadcast (rcb)
Uses: 
    ReliableBroadcast (rb)
Events:
    Request: <rcbBroadcast, m>: broadcasts a message m to all processes
    Indication: <rcbDeliver, sender, m>: delivers a message m sent by sender
Properties:
    RB1, RB2, RB3, RB4, CO

upon event <rcb, Init> do:
    delivered := ∅;
    past := ∅; # contains all past [src, m] pairs

upon event <rcbBroadcast, m> do:
    trigger <rbBroadcast, [Data, past, m]>
    past := past ∪ {[self, m]};

upon event <rbDeliver, src_m, [Data, past_m, m]> do:
    if m ∉ delivered:
        # Deliver all undelivered, past messages that caused m:
        forall [src_n, n] ∈ past_m do: # in list order
            if n ∉ delivered:
                trigger <rcbDeliver, src_n, n>;
                delivered := delivered ∪ {n};
                past := past ∪ {[src_n, n]};
        # Then deliver m:
        trigger <rcbDeliver, src_m, m>;
        delivered := delivered ∪ {m};
        past := past ∪ {[src_m, m]};

This algorithm ensures causal reliable broadcast. The idea is to re-broadcast all past messages every time, making sure we don’t deliver twice. This is obviously not efficient, but it works in theory.

An important point to note here is that this algorithm doesn’t wait. At no point is an rcbDeliver delayed in order to respect causal order.

Garbage collection

A problem with this algorithm is that the size of the past grows linearly with the number of messages broadcast. A simple optimization is to add a kind of distributed garbage collection to clean the past.

The idea is that we can delete the past when all other processes have delivered. To do this, whenever a process rcbDelivers, we also need to send an acknowledgment to all other processes. When we have received an acknowledgment from all correct processes, then we can purge the corresponding message from the past.

This implies using a perfect failure detector, as the implementation below shows.

Implements:
    GarbageCollection, ReliableCausalOrderBroadcast (rcb)
Uses:
    ReliableBroadcast (rb)
    PerfectFailureDetector (P)
Events:
    Request: <rcbBroadcast, m>: broadcasts a message m to all processes
    Indication: <rcbDeliver, sender, m>: delivers a message m sent by sender
Properties:
    RB1, RB2, RB3, RB4, CO

upon event <rcb, Init> do:
    delivered := ∅;
    past := ∅;
    correct := Π;
    ack[m] := ∅; # for all possible messages m

upon event <crash, p> do:
    correct := correct \ {p};

# Broadcast as before:
upon event <rcbBroadcast, m> do:
    trigger <rbBroadcast, [Data, past, m]>
    past := past ∪ {[self, m]};

# Deliver messages as before:
upon event <rbDeliver, src_m, [Data, past_m, m]> do:
    if m ∉ delivered:
        # Deliver all undelivered, past messages that caused m:
        forall [src_n, n] ∈ past_m do: # (in list order)
            if n ∉ delivered:
                trigger <rcbDeliver, src_n, n>;
                delivered := delivered ∪ {n};
                past := past ∪ {[src_n, n]};

        # Then deliver m:
        trigger <rcbDeliver, src_m, m>;
        delivered := delivered ∪ {m};
        past := past ∪ {[src_m, m]};

# Ack delivered messages that haven't been acked yet:
upon event (exists m ∈ delivered) such that (self ∉ ack[m]) do:
    ack[m] := ack[m] ∪ {self};
    trigger <rbBroadcast, [ACK, m]>;

# Register delivered acks:
upon event <rbDeliver, sender, [ACK, m]> do:
    ack[m] := ack[m] ∪ {sender};

# Delete past once everybody has acked:
upon event correct ⊆ ack[m] do:
    forall [src_n, n] ∈ past such that n = m:
        past := past \ {[src_n, m]};

We need the perfect failure detector’s strong accuracy property to prove the causal order property.

However, we don’t need the failure detector’s completeness property; if we don’t know that a process is crashed, it has no impact on correctness, only on performance, since it just means that we won’t delete the past.

Waiting Algorithm

Another algorithm is given below. It uses a “vector clock” (VC) as an alternative, more efficient encoding of the past.

A VC is simply a vector with one entry for each process in Π. Each entry is a sequence number (also called a logical clock) for the corresponding process. Each process p maintains its own VC. Its own entry VC[p] counts the number of times p has rcbBroadcast. The entry for another process q counts the number of times p has rcbDelivered a message from q.

A VC is updated under the following rules:

  • Initially all clocks are at zero
  • Each time a process sends a message, it increments its own logical clock in the vector by one, and sends a copy of its own vector.
  • Each time a process receives a message, it increments its own logical clock in the vector by one and updates each element in its vector by taking the maximum of the value in its own vector clock and the value in the vector in the received message (for every element).

Implements:
    ReliableCausalOrderBroadcast (rcb)
Uses:
    ReliableBroadcast (rb)

upon event <rcb, Init> do:
    pending := ∅;
    for all p ∈ Π:
        VC[p] := 0;

upon event <rcbBroadcast, m> do:
    trigger <rcbDeliver, self, m>;
    trigger <rbBroadcast, [Data, VC, m]>; 
    VC[self] := VC[self] + 1;

upon event <rbDeliver, src, [Data, VC_m, m]> do:
    if src != self:
        pending := pending ∪ {(src, VC_m, m)};
        # Deliver pending:
        while exists (src_n, VC_n, n) ∈ pending
          such that VC_n <= VC do:
            pending := pending \ {(src_n, VC_n, n)};
            trigger <rcbDeliver, src_n, n>; # deliver on behalf of the original broadcaster
            VC[src_n] := VC[src_n] + 1;

The comparison operation on vector clocks is component-wise: VC_1 ≤ VC_2 iff VC_1[p] ≤ VC_2[p] for every process p, and VC_1 < VC_2 iff additionally at least one entry is strictly smaller.
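
To make this concrete, here is a small Python sketch of a vector clock and the comparison just described (the naming is mine, not the course’s):

class VectorClock:
    def __init__(self, processes):
        self.clock = {p: 0 for p in processes}

    def increment(self, p):
        """Bump the entry of p (on broadcast by p, or on delivery from p)."""
        self.clock[p] += 1

    def __le__(self, other):
        # VC_1 <= VC_2 iff every entry is less than or equal
        return all(self.clock[p] <= other.clock[p] for p in self.clock)

    def __lt__(self, other):
        # strictly smaller in at least one entry
        return self <= other and self.clock != other.clock

# q's clock dominates p's, so a message stamped with p's clock is deliverable:
procs = ["p", "q", "r"]
vc_p, vc_q = VectorClock(procs), VectorClock(procs)
vc_p.increment("p")
vc_q.increment("p"); vc_q.increment("q")
assert vc_p <= vc_q and vc_p < vc_q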

Total order broadcast (TOB)

In reliable broadcast, the processes are free to deliver in any order they wish. In causal broadcast, we restricted this a little: the processes must deliver in causal order. But causal order is only partial: some messages are causally unrelated, and may therefore be delivered in a different order by the processes.

In total order broadcast (TOB), the processes must deliver all messages according to the same order. Note that this is orthogonal to causality, or even FIFO ordering. It can be made to respect causal or FIFO ordering, but at its core, it is only concerned with all processes delivering in the same order, no matter the actual ordering of messages.

TOB is also sometimes called atomic broadcast, as the delivery occurs as if the broadcast were an indivisible, atomic action.

An application using TOB would be Bitcoin; for the blockchain, we want to make sure that everybody gets messages in the same order, for consistency. More generally though, total ordering is useful for any replicated state machine where replicas need to treat requests in the same order to preserve consistency.

Properties

The properties are the same as those of (uniform) reliable broadcast, but with an added total order property.

  • TOB1. Validity = RB1 = BEB1
  • TOB2. No duplication = RB2 = BEB2
  • TOB3. No creation = RB3 = BEB3
  • (U)TOB4. (Uniform) Agreement = (U)RB4
  • (U)TOB5. (Uniform) Total Order: Let m1 and m2 be any two messages. Let p be any (correct) process that delivers m1 without having delivered m2 before. Then no (correct) process delivers m2 before m1.

Consensus-based Algorithm

The algorithm can be implemented with consensus, which is described in the next section.

The intuition of the algorithm is that we first disseminate messages using RB. This imposes no particular order, so the processes simply store the messages in an unordered set. At this point, we have no guarantees of dissemination or ordering; it’s even possible that no two processes have the same set.

To solve this, we use consensus to decide on a single set; we order the messages in that set, and then deliver.

There are multiple rounds of this consensus, which we count in the round variable. The consensus helps us decide on a set of messages to deliver in that round. We use the wait variable to make sure that we only hold one instance of consensus at once.

Note that while one consensus round is ongoing, we may amass multiple messages in unordered. This means that consensus may lead us to decide on multiple messages to deliver at once.

Implements:
    TotalOrderBroadcast (tob)
Uses:
    ReliableBroadcast (rb)
    Consensus (cons)
Events:
    Request: <tobBroadcast, m>: broadcasts a message m to all processes
    Indication: <tobDeliver, src, m>: delivers a message m broadcast by src
Properties:
    TOB1, TOB2, TOB3, TOB4, TOB5

upon event <tob, Init>:
    unordered := ∅;
    delivered := ∅;
    wait := false;
    round := 1;

upon event <tobBroadcast, m> do:
    trigger <rbBroadcast, m>

# Save received broadcasts for later:
upon event <rbDeliver, src_m, m> and (m ∉ delivered) do:
    unordered := unordered ∪ {(src_m, m)};

# When no consensus is ongoing and we have 
# unordered messages to propose:
upon (unordered != ∅) and (not wait) do:
    wait := true;
    initialize instance of consensus;
    trigger <propose, unordered>;

# When consensus is done:
upon event <decide, decided> do:
    unordered := unordered \ decided;
    ordered := sort(decided);
    for (src_m, m) in ordered:
        trigger <tobDeliver, src_m, m>;
        delivered := delivered ∪ {m};
    round := round + 1;
    wait := false;

We assume that the sort function is deterministic and that all processes run the exact same sorting routine. We run this function to be sure that all processes traverse and deliver the decided set in the same order (usually, sets do not offer any ordering guarantees, though this is somewhat of an implementation detail).
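
For instance, a hypothetical deterministic sort over (src, m) pairs could look like the following; any total order works, as long as every process uses the same one:

def sort(decided):
    # order first by source id, then by message payload
    return sorted(decided, key=lambda pair: (pair[0], pair[1]))

print(sort({(2, "b"), (1, "z"), (1, "a")}))
# [(1, 'a'), (1, 'z'), (2, 'b')]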

Our total order broadcast is based on consensus, which we describe below.

Consensus (CONS)

In the (uniform) consensus problem, the processes all propose values, and need to agree on one of these propositions. This gives rise to two basic events: a proposition (<propose, v>), and a decision (<decide, v>). Solving consensus is key to solving many problems in distributed computing (total order broadcast, atomic commit, …).

Blockchain is based on consensus. Bitcoin mining is actually about solving consensus: a leader is chosen to decide on the broadcast order, and this leader gains 50 bitcoin. Seeing that this is a lot of money, many people want to be the leader; but we only want a single leader. Nakamoto’s solution is to choose the leader by handing out a hard problem, which can only be solved by brute force; there are no smart tricks. So people put enormous amounts of energy towards solving it. Usually, only a single miner wins a given block; the probability of two simultaneous winners is small, but the original Bitcoin paper specifies that we should wait a little before rewarding the winner, in case there are two.

Properties

The properties that we would like to see are:

  • C1. Validity: if a value is decided, it has been proposed
  • (U)C2. (Uniform) Agreement: no two correct (any) processes decide differently
  • C3. Termination: every correct process eventually decides
  • C4. Integrity: every process decides at most once

Termination and integrity together imply that every correct process decides exactly once. Validity ensures that the consensus may not invent a value by itself. Agreement is the main feature of consensus, that every two correct processes decide on the same value.

When we have uniform agreement (UC2), we want no processes to decide differently, no matter if they are faulty or correct. In this case, we talk about uniform consensus.

We can build consensus using total order broadcast, which is described above. But total order broadcast can also be built with consensus. It turns out that consensus and total order broadcast are equivalent problems in a system with reliable channels.

Algorithm 1: Fail-Stop Consensus

Suppose that there are N processes in Π, with IDs 1, …, N. At the beginning, every process proposes a value; to decide, the processes go through rounds incrementally. At each round, the process with the ID corresponding to the round number is the leader of the round.

The leader decides its current proposal and broadcasts it to all. A process that is not the leader waits. This means that in a given round , only the leader process is broadcasting. Additionally, a process only decides when it is the leader.

The non-leader processes can either deliver the proposal of the leader to adopt it, or detect that the leader has crashed. In any case, we can move on to the next round at that moment.

Now that we understand the properties of the algorithm, let’s take a look at an example run.

Process 1 is the first to be the leader. Once it gets a proposal from the application layer, it decides on it, and broadcasts it to the others. However, let’s suppose it crashes before getting to broadcast (BEB fails in this case). Then, the other processes will detect the crash with P, and go to the next round.

Process 2 is now the leader. Since it doesn’t have a proposal from process 1, it will have to get one from the application layer. Once it has it, it can broadcast it to the others. Let’s assume this goes smoothly, and all processes receive it. They can all go to the next round; from now on, whatever the application layer proposes, they have to obey the decision from the previous leader.

This should make it clear why the algorithm is also known as “hierarchical consensus”: every process must obey the decisions of the process above it (with a smaller index), as long as they don’t crash. We can think of this as a sort of line to the throne: we use the proposal of whoever is on the throne. If they die, number 2 in line decides, and so on.

Note that the rounds are not global time; we may make them so in examples for the sake of simplicity, but rounds are simply a local thing, which are somewhat synchronized by message passing from the leader.

Implements:
    Consensus (cons)
Uses:
    BestEffortBroadcast (beb)
    PerfectFailureDetector (P)
Events:
    Request: <propose, v>: proposes value v for consensus
    Indication: <decide, v>: outputs a decided value v of consensus
Properties:
    C1, C2, C3, C4

upon event <cons, Init> do:
    suspected := ∅;     # list of suspected processes
    round := 1;         # current round number
    proposal := nil;    # current proposal
    broadcast := false; # whether we've already broadcast
    delivered := [];    # whether we've received a proposal from a process

upon event <crash, p> do:
    suspected := suspected ∪ {p};

# If we don't already have a decision from the leader,
# take our own proposal
upon event <propose, v> do:
    if proposal = nil:
        proposal := v;

# When we receive a decision from the leader, use that:
upon event <bebDeliver, leader, [Decided, v]> do:
    proposal := v;
    delivered[leader] := true;

# If we've received a proposal from the leader, 
# or if the leader has crashed, go to the next round:
upon event delivered[round] = true or round ∈ suspected do:
    round := round + 1;

# When we are the leader and we have a value to propose
# (which may be ours, or one that we have from the previous leader),
# we broadcast it, and deliver the decision to the application layer
upon event round = self and broadcast = false and proposal != nil do:
    trigger <decide, proposal>;
    trigger <bebBroadcast, [Decided, proposal]>;
    broadcast := true;

Since this algorithm doesn’t aim for uniform consensus (but only regular consensus), it can tolerate up to N − 1 failures: as long as at least one process remains correct, a value will be decided.

Let’s formulate a short correctness argument for the algorithm:

  • Validity follows from the algorithm and BEB1 (validity)
  • Agreement can be proven as follows. Let pI (with ID I) be the correct process with the smallest ID in a run. Suppose it decides on some value v.
    • If I = N, then pI is the only correct process
    • Otherwise, in round I, ATTA, all correct processes pJ with J > I receive v and will not decide differently from pI
  • Termination follows from PFD1 (strong completeness) and BEB1 (validity): no process will remain indefinitely blocked in a round; every correct process pi will eventually reach round i (the round it leads) and decide in that round
  • Integrity follows from the algorithm and BEB1 (validity)

Algorithm 2: Fail-Stop Uniform Consensus

The previous algorithm does not guarantee uniform agreement. The problem is that some of the processes decide too early, without making sure that their decision has been seen by enough processes (remember that if the broadcaster fails in BEB, then we have no guarantee that all processes receive the broadcast). A process could therefore decide on a value, and then crash before anybody receives it, which would violate uniform agreement (UC2). The other processes might then have no choice but to decide on a different value.

To fix this, the idea is to do the same thing as before, but instead of deciding at round i, we wait until the last round N. The resulting algorithm is simply called “hierarchical uniform consensus”.

Implements:
    UniformConsensus (ucons)
Uses:
    BestEffortBroadcast (beb)
    PerfectFailureDetector (P)
Events:
    Request: <propose, v>: proposes value v for consensus
    Indication: <decide, v>: outputs a decided value v of consensus
Properties:
    C1, UC2, C3, C4

upon event <ucons, Init> do:
    suspected := ∅;     # list of suspected processes
    round := 1;         # current round number
    proposal := nil;    # current proposal
    broadcast := false; # whether we've already broadcast
    decided := false;   # whether we've already decided
    delivered := [];    # whether we've received a proposal from a process

upon event <crash, p> do:
    suspected := suspected ∪ {p};

# If we don't already have a decision from the leader,
# take our own proposal
upon event <propose, v> do:
    if proposal = nil:
        proposal := v;

# When we receive a decision from the leader, use that:
upon event <bebDeliver, leader, [Decided, v]> do:
    proposal := v;
    delivered[leader] := true;

# If we've received a proposal from the leader,  
# or if the leader has crashed, go to the next round.
# If it's the last round, we can deliver the decision 
# to the application layer.
upon event delivered[round] = true or round ∈ suspected do:
    if round = N and not decided:
        trigger <decide, proposal>;
        decided := true;
    else:
        round := round + 1;

# When we are the leader and we have a value to propose
# (which may be ours, or one that we have from the previous leader),
# we broadcast it
upon event round = self and broadcast = false and proposal != nil do:
    trigger <bebBroadcast, [Decided, proposal]>;
    broadcast := true;

For the correctness argument, we’ll need to introduce a short lemma: if a process pJ completes round I without receiving any message from pI, and I < J, then pI crashes by the end of round J.

Proof of the lemma

We’ll do a proof by contradiction: suppose pJ completes round I without receiving a message from pI (with I < J), and pI completes round J.

Since pJ completed round I without hearing from pI, ATTA, it must be because pJ suspects pI in round I. We’re using a perfect failure detector P, so by strong accuracy pI must have crashed before pJ completes round I. For pI to nevertheless complete round J, in round J we either have:

  • pI suspects pJ, which is impossible because, by strong accuracy, pJ would have to have crashed before being suspected, yet pJ later completes round I
  • pI receives the round J message from pJ, which is also impossible because pI crashed before pJ completes round I, and pJ only sends its round J message after that

Both cases lead to a contradiction, which proves the lemma.

  • Validity follows from the algorithm and BEB1 (validity)
  • Termination follows from PFD1 (strong completeness) and BEB1 (validity): no process will remain indefinitely blocked in a round; every correct process will eventually reach round N and decide in that round
  • Uniform Agreement can be proven as follows.

    Let pI (with ID I) be the process with the smallest ID which decides on some value v. This implies that it completes round N.

    By the above lemma, in round I, every pJ with J > I receives and adopts the proposal of pI. Thus, every process which sends a message after round I, or which decides, has the same proposal at the end of round I.

  • Integrity follows from the algorithm and BEB1 (validity)

Algorithm 3: Uniform Consensus with Eventually Perfect Failure Detector

The two previous algorithms relied on perfect failure detectors. What happens if we use an eventually perfect failure detector instead?

The problem is that ◇P only has eventual strong accuracy (EPFD2). This means that correct processes may be falsely suspected a finite number of times, which breaks the two previous algorithms: if a process is falsely suspected by everyone, and it falsely suspects everyone, then all the others could do consensus without it, and decide differently from it.

This algorithm relies on a majority of processes being correct (i.e. it can handle up to f < N/2 failures). The solution is a little involved, so we won’t give pseudo-code for it. Instead, we’ll just try to get an overarching idea of what goes on.

The algorithm is also round-based: processes still move incrementally from one round to the next. Process pi is the leader of round r whenever i = r mod N, so the leader role rotates among the processes. In such a round, pi tries to decide:

  • It succeeds if it is not suspected.
  • It fails if it is suspected. Processes that suspect pi inform it (with a negative acknowledgment, NACK, message), and everybody moves on to the next round (including pi).

If it succeeds, it uses RB to send the decision to all. It’s important to use RB at this step (not BEB) to preclude the case where pi crashes while broadcasting. This would allow for a situation where some nodes have delivered, and others haven’t.

Within a round r, the leader pi decides on a value in three steps:

  1. It collects propositions from the other processes, and chooses a value proposed by the majority.
  2. It broadcasts the chosen value back, and processes change their proposal to the value broadcast by pi. The processes send an acknowledgment.
  3. If everyone acks, it decides, and broadcasts the decision. When others receive the decision, they decide on the given value.

If a process suspects pi at any point, it sends a NACK and everyone moves on. But the decided value may still have been disseminated among the processes, since pi broadcasts the value before deciding. Thus, we have progress (because we went with the majority value, so we’re advancing towards consensus, or at worst, not moving).
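
To make the round structure slightly more concrete, here is a tiny sketch of two ingredients assumed in the description above: the round-robin leader rule and the majority check (the names and the exact indexing are hypothetical):

N = 5                                   # hypothetical number of processes 0..N-1

def leader(round_id):
    """Process i leads round r whenever i = r mod N."""
    return round_id % N

def has_majority(acks):
    """The leader only proceeds once a strict majority has answered,
    which is why fewer than N/2 crashes can be tolerated."""
    return len(acks) > N / 2

assert leader(7) == 2
assert has_majority({0, 1, 2}) and not has_majority({0, 1})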

Let’s take a look at a correctness argument:

  • Validity is trivial
  • Uniform agreement: Let r be the first round in which some leader process pi decides on a value v. This means that, in round r, a majority of processes have adopted v. ATTA, no value other than v will be proposed, and therefore decided, henceforth.
  • Termination states that every correct process eventually decides. If a correct process decides, it uses RB to send the decision to all, so every correct process decides.
  • Integrity is trivial

Atomic commit

The unit of data processing in a distributed system is the transaction. A transaction describes the actions to be taken, and can be terminated either by committing or aborting.

Non-Blocking Atomic Commit (NBAC)

The non-blocking atomic commit (NBAC) abstraction is used to decide the outcome of a transaction (commit or abort) in a reliable way. As in consensus, every process proposes an initial value of 0 or 1 (no or yes), and must decide on a final value 0 or 1 (abort or commit). Unlike consensus, the processes here seek to decide 1, but every process has a veto right.

The properties of NBAC are:

  • NBAC1. Uniform Agreement: no two processes decide differently
  • NBAC2. Termination: every correct process eventually decides
  • NBAC3. Commit-validity: 1 can only be decided if all processes propose 1
  • NBAC4. Abort-validity: 0 can only be decided if some process crashes or votes 0

Note that here, NBAC may decide to abort if some process crashes, even though all processes have proposed 1 (commit).

We can implement NBAC using three underlying abstractions:

  • A perfect failure detector
  • Uniform consensus
  • Best-effort broadcast BEB

It works as follows: every process p broadcasts its initial vote (0 or 1, abort or commit) to all other processes using BEB. It then waits to hear something from every process q in the system; this is either done through the beb-delivery of q’s vote, or by detecting the crash of q. At this point, two situations are possible:

  • If p gets 0 (abort) from any other process, or if it detects a crash, it invokes consensus with a proposal to abort (0).
  • Otherwise, if it receives the vote to commit (1) from all processes, then it invokes consensus with a proposal to commit (1).

Once the consensus is over, every process NBAC-decides according to the outcome of the consensus.

We can write this more formally:

Implements:
    nonBlockingAtomicCommit (nbac)
Uses:
    BestEffortBroadcast (beb)
    PerfectFailureDetector (P)
    UniformConsensus (ucons)
Events:
    Request: <nbacPropose, v>
    Indication: <nbacDecide, v>
Properties:
    NBAC1, NBAC2, NBAC3, NBAC4


upon event <nbac, Init> do:
    prop := 1;
    delivered := ∅;
    correct := Π;

upon event <crash, p> do:
    correct := correct \ {p};

# Broadcast proposals to others:
upon event <nbacPropose, v> do:
    trigger <bebBroadcast, v>;

# Register proposal broadcasts from others:
upon event <bebDeliver, src, v> do:
    delivered := delivered ∪ {src};
    prop := prop * v;

# When all correct processes have delivered,
# initialize consensus by proposing prop
upon event correct ⊆ delivered do:
    if correct != Π:
        prop := 0;
    trigger <uconsPropose, prop>;

upon event <uconsDecide, decision> do:
    trigger <nbacDecide, decision>;

We use multiplication to factor in the votes we get from other processes: if we get a single 0, the final proposition will be 0 too; if we only get ones, it will stay 1. Other than that, this is a fairly straightforward implementation of the description we gave.

We need a perfect failure detector P; an eventually perfect failure detector ◇P is not enough. With ◇P, we might falsely suspect a process: this leads us to run uniform consensus with a proposal to abort, and consequently to decide to abort. After this whole ordeal, we may find out that the suspected process wasn’t crashed after all (and had voted 1), so we have decided 0 even though no process crashed or voted 0, which violates abort-validity (NBAC4).

2-Phase Commit (2PC)

This is a blocking algorithm, meaning that a crash can result in the algorithm being stuck. Unlike NBAC, this algorithm does not use consensus. It operates under a relaxed set of constraints; the termination property has been replaced with weak termination, which just says that if the coordinator process doesn’t crash, then all correct processes eventually decide.

In 2PC, we have a leading coordinator process p which takes the decision. It asks everyone to vote, makes a decision, and notifies everyone of the decision.

As the name indicates, there are two phases in this algorithm:

  1. Voting phase: As before, proposals are sent with best-effort broadcast. The coordinator p collects all these proposals.
  2. Commit phase: Again, just as before, p decides to abort if it receives any abort proposal, or if it detects any crash with its perfect failure detector. Otherwise, if it receives proposals to commit from everyone, it decides to commit. It then sends this decision to all processes with BEB.

If the coordinator p crashes, all processes are blocked, waiting for its response.
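
To illustrate, here is a hedged Python sketch of the coordinator’s decision logic in 2PC; the function and parameter names (two_phase_commit, votes, crashed, broadcast) are made up for illustration, not taken from the course:

COMMIT, ABORT = 1, 0

def two_phase_commit(votes, crashed, broadcast):
    """votes: dict mapping each process to its 0/1 proposal (voting phase).
    crashed: set of processes reported by the perfect failure detector.
    broadcast: callback used to send the decision to every process."""
    # Commit phase: abort on any 0 vote or on any detected crash.
    if crashed or any(v == ABORT for v in votes.values()):
        decision = ABORT
    else:
        decision = COMMIT
    broadcast(decision)
    return decision

# toy run: everybody votes commit and nobody crashed, so we commit
decision = two_phase_commit({"p1": 1, "p2": 1, "p3": 1}, set(),
                            lambda d: print("decision:", d))
assert decision == COMMIT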

Terminating reliable broadcast (TRB)

Like reliable broadcast, terminating reliable broadcast (TRB) is a communication primitive used to disseminate a message among a set of processes in a reliable way. However, TRB is stricter than URB.

In TRB, there is a specific broadcaster process p_src, known by all processes. It is supposed to broadcast a message m. We’ll also define a distinct message ϕ. The other processes need to deliver m if p_src is correct, but may deliver ϕ if p_src crashes.

The idea is that if p_src crashes, the other processes may detect that it’s crashed, without having ever received m. But this doesn’t mean that m wasn’t sent; p_src may have crashed while it was in the process of sending m, so some processes may have delivered it while others might never do so.

For a process p, the following cases cannot be distinguished:

  • Some other process has delivered m; this means that p should keep waiting for it
  • No process will ever deliver m; this means that p should not keep waiting for it

TRB solves this by adding the missing piece of information to (uniform) reliable broadcast. It ensures that every process either delivers the message m or the failure indicator ϕ.

Properties

The properties of TRB are:

  • TRB1. Integrity: If a process delivers a message m, then either m = ϕ, or m was broadcast by p_src
  • TRB2. Validity: If the sender p_src is correct and broadcasts a message m, then p_src eventually delivers m
  • (U)TRB3. (Uniform) Agreement: For any message m, if a correct process (any process) delivers m, then every correct process delivers m
  • TRB4. Termination: Every correct process eventually delivers exactly one message

Unlike reliable broadcast, every correct process delivers a message, even if the broadcaster crashes. Indeed, with (uniform) reliable broadcast, when the broadcaster crashes, the other processes may deliver nothing.

Algorithm

The following algorithm implements consensus-based uniform terminating reliable broadcast. To implement regular (non-uniform) TRB, we can just use regular (non-uniform) consensus.

All processes wait until they receive a message from the source p_src, or until they detect that it has crashed. By the validity property of BEB, and the properties of a perfect failure detector, no process is ever left waiting forever.

They then invoke uniform consensus to decide whether to deliver m or ϕ.

Implements:
    trbBroadcast (trb)
Uses:
    BestEffortBroadcast (beb)
    PerfectFailureDetector (P)
    UniformConsensus (ucons)
Events:
    Request: <trbBroadcast, m>:  broadcasts a message m to all processes
    Indication: <trbDeliver, m>: delivers a message m, or the failure ϕ
Properties:
    TRB1, TRB2, TRB3, TRB4

upon event <trb, Init>:
    proposal := nil;
    correct := Π;

# When application broadcasts:
upon event <trbBroadcast, m> do:
    trigger <bebBroadcast, m>;

# When the perfect failure detector detects that
# the broadcaster p_src has crashed:
upon event <crash, p> and (proposal = nil) do:
    if p = p_src:
        proposal := ϕ;

# Otherwise, if we successfully receive from p_src:
upon event <bebDeliver, p, m> and (proposal = nil) do:
    if p = p_src:
        proposal := m;

# Start consensus as soon as we have a proposal:
upon event (proposal != nil) do:
    trigger <propose, proposal>;

# Deliver results of consensus:
upon event <decide, decision> do:
    trigger <trbDeliver, p_src, decision>;

Let’s take a look at the scenario where p_src broadcasts m to three other processes. It broadcasts using BEB, so it can crash while broadcasting, and some processes won’t receive the message. Suppose two of the processes got the message, but the third did not; instead, it detects that p_src has crashed. In the consensus round, that process will propose ϕ, while the two others propose m. Since they are in the majority, the result of the consensus will be a decision to deliver m (remember algorithm 3 for uniform consensus, which picks the majority value). But in this scenario, ϕ would also have been a valid result.
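
To make the consensus step concrete, here is a tiny Python sketch of that majority pick (the helper name decide_majority and the PHI stand-in are ours, not part of the course’s pseudocode):

from collections import Counter

PHI = "ϕ"  # stand-in for the failure indication ϕ

def decide_majority(proposals):
    """Return the proposal held by the largest number of processes."""
    value, _count = Counter(proposals).most_common(1)[0]
    return value

# Two processes received m before p_src crashed, one only detected the crash:
print(decide_majority(["m", "m", PHI]))  # -> "m", so m gets trbDelivered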

Failure detector

The TRB algorithm uses the perfect failure detector P, which means that P is sufficient to implement TRB. Is it also necessary? We’ll argue that it is, because we can implement P with TRB:

Assume that every process acts as the broadcaster p_src of its own TRB instances, and can use an infinite number of such instances. The algorithm is as follows:

  1. Every process keeps broadcasting messages with TRB
  2. If a process delivers ϕ from an instance whose broadcaster is p, it suspects p

This algorithm uses non-uniform TRB, i.e. just respecting agreement (not uniform agreement).
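
As a rough illustration of why this reduction works, here is a minimal Python sketch (the class and method names are ours): every trbDeliver event is fed into the detector, and a ϕ delivery for a broadcaster p is interpreted as p having crashed.

PHI = "ϕ"  # failure indication delivered by TRB when the broadcaster crashed

class FailureDetectorFromTRB:
    """Emulates the output of a perfect failure detector using TRB deliveries."""

    def __init__(self, processes):
        self.processes = set(processes)
        self.suspected = set()

    def on_trb_deliver(self, broadcaster, message):
        # With TRB, every correct process delivers either the broadcaster's
        # message or ϕ; ϕ is only delivered when the broadcaster has crashed.
        if message == PHI:
            self.suspected.add(broadcaster)

fd = FailureDetectorFromTRB({"p", "q", "r"})
fd.on_trb_deliver("q", PHI)
print(fd.suspected)  # -> {'q'}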

Group membership (GM)

Many of the algorithms we’ve seen so far require some knowledge of the state of the other processes in the network. In other words, we need to know which processes are participating in the computation and which are not. So far, we’ve used failure detectors to get this information.

The problem with failure detectors is that they are not coordinated, even when the failure detector is perfect. The outputs of failure detectors in different processes are not always the same: we may get notifications about crashes in different orders and at different times (because of delays in the network), and thus obtain different perspectives of the system’s evolution.

The group membership abstraction solves this problem, giving us consistent, accurate and better coordinated information about the state of processes.

In this course, we’ll only use group membership to give coordinated information about crashes, but it’s useful to know that it can also be used to coordinate processes joining or leaving the set explicitly (i.e. without crashing, but instead leaving voluntarily). This enables dynamic changes in the set of processes Π. So far, we’ve assumed that Π is a static set of processes, but group membership allows us to handle dynamic sets.

Properties

A group is the set of processes participating in the computation. The current membership is called a view. A view is a pair V = (id, M), where id is the numbering of the view, and M is a set of processes.

The views are numbered by the number of changes the set of processes has gone through previously. As such, the first view has id = 0 and M = Π (so the initial view is (0, Π)).

When the view changes, we get an indication event <membView, V>; we say that processes install this new view.

The properties for the group membership abstraction in this course are:

  • Memb1. Local Monotonicity: If a process installs view (id, M) after installing (id', M'), then id > id' and M ⊆ M' (the only reason to change a view is to remove a process from the set when it crashes).
  • Memb2. Uniform Agreement: No two processes install views (id, M) and (id, M') such that M ≠ M'.
  • Memb3. Completeness: If a process p crashes, then there is an integer id such that every correct process eventually installs a view (id, M) in which p ∉ M.
  • Memb4. Accuracy: If some process installs a view (id, M) and q ∉ M, then q has crashed.

Algorithm

The implementation uses uniform consensus and a perfect failure detector.

Implements:
    GroupMembership (memb)
Uses:
    UniformConsensus (ucons)
    PerfectFailureDetector (P)
Events:
    Indication: <membView, V>
Properties:
    Memb1, Memb2, Memb3, Memb4

upon event <memb, Init> do:
    view := (0, Π);
    correct := Π;
    wait := true;

upon event <crash, p> do:
    correct := correct \ {p};

# When we've detected a crash and we aren't waiting for
# consensus, trigger new consensus for view.
upon event (correct ⊊ view.memb) and (not wait) do:
    wait := true;
    trigger <uconsPropose, (view.id + 1, correct)>;

# When consensus is done, install the new view:
upon event <uconsDecide, (id, memb)> do:
    view := (id, memb);
    wait := false;
    trigger <membView, view>;

We use a wait variable: this prevents a process from triggering a new view installation before the previous one has completed.

View-Synchronous broadcast (VS)

View-synchronous broadcast is the abstraction resulting from the combination of group membership and reliable broadcast. It ensures that the delivery of messages is coordinated by the installation of views.

Properties

We aim to ensure all the properties of group membership (Memb1, Memb2, Memb3, Memb4) and of reliable broadcast (RB1, RB2, RB3, RB4). On top of this, we also aim to ensure the following property:

  • VS1. View inclusion: A message is vsDelivered in the view where it is vsBroadcast.

Unfortunately, this property doesn’t come for free. Combining VS and GM introduces a subtle problem that we’ll have to solve, justifying the introduction of a solution as a new abstraction. Indeed, if a message is broadcast right as we’re installing a view, we’re breaking things.

Consider that a group of processes are exchanging messages, and a process q crashes. This failure is detected, and the other processes install a new view whose membership no longer contains q. After that, suppose that a process p delivers a message m that was originally broadcast by q (this can happen because of delays in the network). But it doesn’t make sense to deliver messages from processes that aren’t in the view to the application layer.

At this point, the solution may seem straightforward: allow p to discard messages from q. Unfortunately, it’s possible that a third process delivered m before the new view was installed. Process p must then essentially choose between two conflicting goals: either deliver m to ensure agreement (RB4), or discard it and guarantee view inclusion (VS1).

To solve this, we must introduce some notion of phases in which messages can or cannot be sent.

Algorithm 1: TRB-based VS

VS broadcast extends both RB and GM, so its interface must have events of both primitives. In addition to that, we need to add two more events for blocking communications when we’re about to install a view.

Note that these events for blocking communications aren’t between processes: they’re a contract between the VS algorithm and the layer above (i.e. the application layer). If the application layer keeps broadcasting messages, installing a view may be postponed indefinitely. Therefore, when we need to install a view, we ask the application layer to stop broadcasting in the current view by indicating a <vsBlock> event. When the higher-level module agrees, it replies with a <vsBlockOk> request.

We assume that the application layer is indeed well-behaved, and does not broadcast any further messages in the current view after the <vsBlockOk>. It can start broadcasting again once a new view is installed (<vsView, V>).
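
To make this contract concrete, here is a minimal Python sketch of a well-behaved application layer (the vs handle and its method names are made up for this illustration):

class Application:
    """Sketch of a well-behaved application layer on top of view-synchronous broadcast."""

    def __init__(self, vs):
        self.vs = vs          # hypothetical handle to the VS module
        self.blocked = False

    def on_vs_block(self):
        # <vsBlock>: the VS layer asks us to stop broadcasting in this view.
        self.blocked = True
        self.vs.block_ok()    # reply with <vsBlockOk>

    def on_vs_view(self, view):
        # <vsView, V>: a new view is installed, broadcasting may resume.
        self.blocked = False

    def broadcast(self, m):
        # Only <vsBroadcast> while not blocked, as promised to the VS layer.
        if not self.blocked:
            self.vs.broadcast(m)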

The key element of this algorithm is a flush procedure, which the processes execute when the GM changes the view. This procedure uses uniform TRB to rebroadcast messages that it has vsDelivered in the current view.

For normal data transfer within a view, we attach the view id to each message and use BEB to broadcast it. On the receiving side, when a message is bebDelivered with a view id matching the current view, it can immediately be vsDelivered. It’s also important that the receiver saves the message in delivered, so that it can replay it during the flush procedure.

We start the flush procedure when GM installs a new view. We first ask the application to stop broadcasting; when we receive the OK, we stop vsDelivering and discard all BEB messages. We can then resend all messages we vsDelivered previously (which are saved in delivered), using one instance of TRB per process, each process being the broadcaster of its own flush.

We then receive all flush messages from the other processes. When we have received all flushes, we can move on to the next view.

Implements:
    ViewSynchrony (vs)
Uses:
    GroupMembership (memb)
    UniformTerminatingReliableBroadcast (utrb)
    BestEffortBroadcast (beb)
Events:
    Request: <vsBroadcast, m>: broadcasts m to all processes
    Indication: <vsDeliver, src, m>: delivers message m broadcast by src
    Indication: <vsView, V>: Installs a view V = (id, M)
    Indication: <vsBlock>: requests that no new messages are 
                           broadcast temporarily, until next view is installed
    Request: <vsBlockOk>: confirms that no new messages will be
                          broadcast until next view is installed
Properties:
    RB1, RB2, RB3, RB4
    Memb1, Memb2, Memb3, Memb4
    VS1

upon event <vs, Init> do:
    view := (0, Π);    # currently installed view
    nextView := nil;   # next view to install after flushing
    pending := [];     # FIFO queue of pending views
    delivered := ∅;    # set of delivered messages in current view
    trbDone := ∅;      # set of processes done flushing with uTRB
    flushing := false; # whether we're currently flushing
                       # messages in order to install a view
    blocked := false;  # whether the application layer is blocked

#############################
# Part 1: Data transmission #
#############################

# Attach view ID to all messages we will broadcast:
upon event <vsBroadcast, m> and (not blocked) do:
    delivered := delivered ∪ {m};
    trigger <vsDeliver, self, m>;
    trigger <bebBroadcast, [Data, view.id, m]>;

# Deliver new messages from same view:
upon event <bebDeliver, src, [Data, view_id, m]> do:
    if (view.id = view_id) and (m ∉ delivered) and (not blocked):
        delivered := delivered ∪ {m};
        trigger <vsDeliver, src, m>;

#######################
# Part 2: View change #
#######################

# Append new view to pending:
upon event <membView, V> do:
    pending.append(V);

# When we need to switch view, initiate flushing by
# requesting vsBlock from application layer:
upon event (pending ≠ []) and (not flushing) do:
    nextView := pending.pop(); # get head of queue
    flushing := true;
    trigger <vsBlock>;

# When application layer replies OK, block and flush:
upon event <vsBlockOk> do:
    blocked := true;
    trbDone := ∅;
    trigger <trbBroadcast, self, (view.id, delivered)>;

# Get flushes and deliver missing messages:
upon event <trbDeliver, src, (view_id, view_delivered)> do:
    trbDone := trbDone ∪ {src};
    forall m ∈ view_delivered such that m ∉ delivered do:
        delivered := delivered ∪ {m};
        trigger <vsDeliver, src, m>;

# If we get ϕ, we can consider the process to be done flushing:
upon event <trbDeliver, src, ϕ> do:
    trbDone := trbDone ∪ {src};

# Once we have all flushes, we can go to the next view:
upon event (trbDone = view.memb) and (blocked = true) do:
    view := nextView;
    flushing := false;
    blocked := false;
    delivered := ∅;
    trigger <vsView, view>;

Algorithm 2: Consensus-based VS

The previous algorithm is uniform in the sense that no two processes install different views. But it isn’t uniform in terms of message delivery, as one process may vsDeliver a message and crash, while no other processes deliver that message.

So we need to revise the previous algorithm to get uniform VS. Instead of launching parallel instances of TRB, plus a group membership, we can use a consensus instance and parallel broadcasts for every view change.

The idea is that when the perfect failure detector P detects a failure, the processes exchange the messages they have delivered, and use consensus to agree on the membership and the message set of the new view.

The data transmission works as previously. However, for the view change, we use consensus to agree on the message set (stored in dset).

Implements:
    ViewSynchrony (vs)
Uses:
    UniformConsensus (ucons)
    BestEffortBroadcast (beb)
    PerfectFailureDetector (P)
Events:
    Request: <vsBroadcast, m>: broadcasts m to all processes
    Indication: <vsDeliver, src, m>: delivers message m broadcast by src
    Indication: <vsView, V>: Installs a view V = (id, M)
    Indication: <vsBlock>: requests that no new messages are 
                           broadcast temporarily, until next view is installed
    Request: <vsBlockOk>: confirms that no new messages will be
                          broadcast until next view is installed
Properties:
    RB1, RB2, RB3, RB4
    Memb1, Memb2, Memb3, Memb4
    VS1

upon event <vs, Init> do:
    view := (0, Π);
    correct := Π;
    flushing := false;
    blocked := false;
    delivered := ∅;
    dset := ∅;

#############################
# Part 1: Data transmission #
#############################

# Same as before

upon event <vsBroadcast, m> and (not blocked) do:
    delivered := delivered ∪ {m};
    trigger <vsDeliver, self, m>;
    trigger <bebBroadcast, [Data, view.id, m]>;

upon event <bebDeliver, src, [Data, view_id, m]> do:
    if (view.id = view_id) and (m ∉ delivered) and (not blocked):
        delivered := delivered ∪ {m};
        trigger <vsDeliver, src, m>;

#######################
# Part 2: View change #
#######################

upon event <crash, p> do:
    correct := correct \ {p};
    if not flushing:
        flushing := true;
        trigger <vsBlock>;

upon event <vsBlockOk> do:
    blocked := true;
    trigger <bebBroadcast, [DSET, view.id, delivered]>;

upon event <bebDeliver, src, [DSET, view_id, m_set]> do:
    dset := dset ∪ {(src, m_set)};
    if forall p ∈ correct, (p, _) ∈ dset:
        trigger <uconsPropose, view.id + 1, correct, dset>;

upon event <uconsDecide, view_id, view_members, view_dset> do:
    forall (p, mset) ∈ view_dset such that p ∈ view_members do:
        forall (src, m) ∈ mset such that m ∉ delivered do:
            delivered := delivered ∪ {m};
            trigger <vsDeliver, src, m>;
    view := (view_id, view_members);
    flushing := false;
    blocked := false;
    dset := ∅;
    delivered := ∅;
    trigger <vsView, view>;

Algorithm 3: Consensus-based Uniform VS

Simply using URB instead of BEB in the previous algorithm would not be enough to ensure uniformity, so a few more substantial changes are necessary.

As in algorithms 1 and 2, to vsBroadcast we simply bebBroadcast the message, attaching the view ID in a Data message. But now, when receiving such a Data message, we mark its sender as having acknowledged the message, and we acknowledge it ourselves by re-broadcasting it. We also add the message to pending, the set of all messages that have been received in the current view. The set of processes that have acknowledged a message m is stored in ack[m].

We also maintain a variable delivered containing the messages vsDelivered in the current view. We can vsDeliver a message m and add it to delivered once all processes of the current view are contained in ack[m] (this is similar to what we did for URB).

When P detects a crash, we initiate a flush. Each process first bebBroadcasts the contents of pending (which contains all messages received in the current view). It’s possible that not all messages in this set have been vsDelivered, so as soon as a process has collected the pending sets of all other uncrashed processes, it can initiate a consensus on the new view and on the union of all the pending sets it has received.

When consensus decides, we vsDeliver all the pending messages in the consensus decision, and install the new view.

Implements:
    UniformViewSynchrony (uvs)
Uses:
    UniformConsensus (ucons)
    BestEffortBroadcast (beb)
    PerfectFailureDetector (P)
Events:
    Request: <uvsBroadcast, m>: broadcasts m to all processes
    Indication: <uvsDeliver, src, m>: delivers message m broadcast by src
    Indication: <uvsView, V>: Installs a view V = (id, M)
    Indication: <uvsBlock>: requests that no new messages are 
                            broadcast temporarily, until next view is installed
    Request: <uvsBlockOk>: confirms that no new messages will be
                           broadcast until next view is installed
Properties:
    URB1, URB2, URB3, URB4
    Memb1, Memb2, Memb3, Memb4
    VS1

upon event <uvs, Init> do:
    view := (0, Π);
    correct := Π;
    flushing := false;
    blocked := false;
    pending := ∅;
    delivered := ∅;
    dset := ∅;
    ack[m] := ∅;  # set of processes having ack'ed m

#############################
# Part 1: Data transmission #
#############################

upon event <uvsBroadcast, m> and (not blocked) do:
    pending := pending ∪ {(self, m)};
    # do not vsDeliver to self yet!
    trigger <bebBroadcast, [Data, view.id, self, m]>;

upon event <bebDeliver, sender, [Data, view_id, src, m]> and (not blocked) do:
    if view.id = view_id:
        ack[m] := ack[m] ∪ {sender};
        if (src, m) ∉ pending:
            pending := pending ∪ {(src, m)};
            trigger <bebBroadcast, [Data, view.id, src, m]>; # ack!

# When all processes have acked a pending, undelivered message:
upon exists (src, m) ∈ pending
  such that (view.memb ⊆ ack[m]) and (m ∉ delivered) do:
    delivered := delivered ∪ {m};
    trigger <uvsDeliver, src, m>;

#######################
# Part 2: View change #
#######################

upon event <crash, p> do:
    correct := correct \ {p};
    if not flushing:
        flushing := true;
        trigger <uvsBlock>;

upon event <uvsBlockOk> do:
    blocked := true;
    trigger <bebBroadcast, [DSET, view.id, pending]>;

upon event <bebDeliver, src, [DSET, view_id, m_set]> do:
    dset := dset ∪ {(src, m_set)};
    if forall p ∈ correct, (p, _) ∈ dset:
        trigger <uconsPropose, view.id + 1, correct, dset>;

upon event <uconsDecide, view_id, view_members, view_dset> do:
    forall (p, mset) ∈ view_dset such that p ∈ view_members do:
        forall (src, m) ∈ mset such that m ∉ delivered do:
            delivered := delivered ∪ {m};
            trigger <uvsDeliver, src, m>;
    view := (view_id, view_members);
    flushing := false;
    blocked := false;
    dset := ∅;
    pending := ∅;
    delivered := ∅;
    trigger <uvsView, view>;

Shared Memory (SM)

In this section, we’ll take a look at shared memory through a series of distributed algorithms that enable distributed data storage through read and write operations. These shared memory abstractions are called registers, since they resemble the registers found in hardware.

The variations we’ll look at vary in the number of processes that can read or write. Specifically, we’ll look at:

  • (1, N) regular register
  • (1, N) atomic register
  • (N, N) atomic register

The tuple notation above represents the supported number of writers and readers, respectively: (1, N) means that one process can write, and that N processes can read. As we’ll see, the difference between regular and atomic registers lies in the concurrency guarantees that they offer.

(1, N) Regular register

This register assumes only one writer, but an arbitrary number of readers. This means that one specific process p can write, but any process (including p itself) can read.

Properties

A (1, N) regular register provides the following properties:

  • ONRR1 Termination: if a correct process invokes an operation, then the operation eventually completes
  • ONRR2 Validity:
    • Any read not concurrent with a write returns the last value written
    • Reads concurrent with a write return the last value written or the value concurrently being written

A note about ONRR2: if the writer crashes during a write, the failed write is considered to be concurrent with all ongoing and future reads.

Therefore, a read after a failed write can return either the value that was supposed to be written, or the last value written before that. For example, if the writer crashes while writing 7 after a completed write of 5, any later read may return either 5 or 7.

In any case, reads always return values that have been, are being, or have been attempted to be written. In other words, read values can’t be created out of thin air, they must come from somewhere.

Algorithm 1: fail-stop with perfect failure detection

Let’s take a look at how we can implement this with a message passing model. The following fail-stop algorithm is fairly simple. It uses a perfect failure detector (eventually perfect would not be enough).

To read, the algorithm simply returns the locally stored value. To write, the writer bebBroadcasts a Write message with the new value. Upon bebDelivering it, every process (including the writer itself, since BEB also delivers the message to its sender) stores the new value and acknowledges it over a perfect link. Once every process not detected as crashed has acked, we can complete the write operation by triggering <onrrWriteReturn>.

Implements:
    (1, N)-RegularRegister (onrr)
Uses:
    BestEffortBroadcast (beb)
    PerfectLinks (pp2p)
    PerfectFailureDetector (P)
Events:
    Request: <onrrRead>: invokes a read on the register
    Request: <onrrWrite, v>: invokes a write with value v on the register
    Indication: <onrrReadReturn, v>: completes a read, returns v
    Indication: <onrrWriteReturn>: completes a write on the register
Properties:
    ONRR1, ONRR2

upon event <onrr, Init> do:
    val := nil;    # register value
    correct := Π;  # set of correct processes 
    acked := ∅;    # set of processes that have ACK'ed the write

upon event <crash, p> do:
    correct := correct \ {p};

upon event <onrrRead> do:
    trigger <onrrReadReturn, val>;

upon event <onrrWrite, v> do:
    trigger <bebBroadcast, [Write, v]>;

upon event <bebDeliver, src, [Write, v]> do:
    val := v;
    trigger <plSend, src, ACK>;

upon event <plDeliver, src, ACK> do:
    acked := acked ∪ {src};

upon event (correct ⊆ acked) do:
    acked := ∅;
    trigger <onrrWriteReturn>;

The above algorithm is correct, as:

  • ONRR1 Termination
    • ATTA, all reads are local and eventually return, so termination for reads is trivial.
    • ATTA, writes eventually return: every correct process eventually sends back an ack, any process that does not send back an ack has crashed, and any crashed process is eventually detected and removed from correct. Both cases are handled, so the write eventually returns. This is proven by:
      • PFD1, the strong completeness property of P
      • PL1, the reliable delivery of the perfect link channels
  • ONRR2 Validity:
    • In the absence of concurrent or failed operation, a read returns the last value written. To prove this, assume that a write(x) terminates, and no other write is invoked.

    By PFD2 (strong accuracy of P), once the write has completed, the value of the register at every process that didn’t crash is x. Any subsequent read() at a process q returns the value stored at q, which is the last written value.

    • A read returns the value concurrently written or last value written. This should be fairly clear ATTA, but the book has a more detailed proof.

Since we used PFD2 of P in the above proof, we need a perfect failure detector. Without it, we may violate the ONRR2 validity property of the register.

Algorithm 2: Fail-silent without failure detectors

Under the assumption that a majority of the processes are correct, we can actually implement (1, N) regular registers without any failure detector. The majority assumption is needed by this algorithm; it would still be needed even if we added an eventually perfect failure detector.

The key idea is that the writer and the readers should each use a set of witnesses that keep track of the most recent value of the register. The sets of witnesses must overlap: they form quorums (defined as a collection of sets such that any two of them intersect). In our case, we consider a very simple form of quorum, namely a majority.
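
The reason a majority works as a quorum is that any two majorities over the same N processes necessarily intersect, so a reader always contacts at least one witness that saw the latest completed write. A quick Python check of this fact (the helper names are ours):

from itertools import combinations

def majorities(processes):
    """Yield every subset containing a strict majority of the processes."""
    n = len(processes)
    for size in range(n // 2 + 1, n + 1):
        yield from combinations(processes, size)

processes = ["p1", "p2", "p3", "p4", "p5"]
assert all(set(a) & set(b)
           for a in majorities(processes)
           for b in majorities(processes))
print("any two majorities of", len(processes), "processes intersect")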

Like the previous algorithm, we store the register value in val; in addition to it, we also store a timestamp ts, counting the number of write operations.

When the writer process p writes, it increments the timestamp and bebBroadcasts a Write message to all processes. A process adopts the value by storing it locally if its timestamp is larger than the currently stored one, and acknowledges it over a perfect link. Once p has such an acknowledgment from a majority of processes, it completes the write.

To read a value, the reader bebBroadcasts a Read message to all processes. Every process replies through a PL with its current value and timestamp. Once the reader has replies from a majority of processes, it selects the one with the highest timestamp, which ensures that the last value written is returned.

Implements:
    (1, N)-RegularRegister (onrr)
Uses:
    BestEffortBroadcast (beb)
    PerfectLinks (pp2p)
Events:
    Request: <onrrRead>: invokes a read on the register
    Request: <onrrWrite, v>: invokes a write with value v on the register
    Indication: <onrrReadReturn, v>: completes a read, returns v
    Indication: <onrrWriteReturn>: completes a write on the register
Properties:
    ONRR1, ONRR2

upon event <onrr, Init> do:
    val := nil;    # register value
    ts := 0;       # register timestamp (counts number of writes)
    write_ts := 0; # timestamp of the pending write
    acks := 0;     # number of processes having ack'ed pending write
    read_id := 0;  # id of the currently pending read operation
    readlist := [nil] * N; # replies from the Read message

##################
# Part 1: Writes #
##################

upon event <onrrWrite, v> do:
    write_ts := write_ts + 1;
    acks := 0;
    trigger <bebBroadcast, [Write, write_ts, v]>;

upon event <bebDeliver, src, [Write, value_ts, value]> do:
    if value_ts > ts:
        ts := value_ts;
        val := value;
    trigger <pp2pSend, src, [ACK, value_ts]>;

upon event <pp2pDeliver, q, [ACK, value_ts]> such that value_ts = write_ts do:
    acks := acks + 1;
    if acks > N/2:
        acks := 0;
        trigger <onrrWriteReturn>

#################
# Part 2: Reads #
#################

upon event <onrrRead> do:
    read_id := read_id + 1;
    readlist := [nil] * N;
    trigger <bebBroadcast, [Read, read_id]>;

upon event <bebDeliver, src, [Read, read_id]> do:
    trigger <pp2pSend, src, [Value, read_id, ts, val]>;

upon event <pp2pDeliver, q, [Value, rid, read_ts, read_val]> such that rid = read_id do:
    readlist[q] := (read_ts, read_val);
    if size(readlist) > N/2:   # size counts the non-nil replies
        v := highest_timestamp_val(readlist);
        readlist := [nil] * N;
        trigger <onrrReadReturn, v>;
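
The pseudocode above leaves the helper highest_timestamp_val unspecified. Assuming readlist holds (timestamp, value) pairs for the processes that replied and nil for the others, a minimal Python sketch of that selection could be:

def highest_timestamp_val(readlist):
    """Return the value carried by the reply with the largest timestamp."""
    replies = [entry for entry in readlist if entry is not None]
    timestamp, value = max(replies, key=lambda pair: pair[0])
    return value

# Example: three replies out of five, the freshest one wins.
print(highest_timestamp_val([(2, "b"), None, (1, "a"), (3, "c"), None]))  # -> "c"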

(1, N) Atomic register

Properties

With regular registers, the guarantees we gave about reads that are concurrent with writes are a little weak. For instance, suppose that a writer invokes write(v). The specification allows concurrent reads to return nil, then v, then nil again, and so on, until the write completes; if the writer crashes while writing, this can go on forever. An atomic register prevents such behavior.

An atomic register provides an additional guarantee compared to a regular register: ordering. The guarantee is that even when there is concurrency and failures, the execution is linearizable, i.e. it is equivalent to a sequential and failure-free execution. This means that we can now think of the write happening at a single atomic point in time, sometime during the execution of the write.

This means that both of the following are true:

  • Every failed write operation appears to be either complete or not to have been invoked at all
  • Every complete operation appears to have been executed at some instant between its invocation and the reply event.

Roughly speaking, atomic registers prevent a process from reading an “old” value once a newer value has been read by any process. The properties are:

  • ONAR1. Termination = ONRR1
  • ONAR2. Validity = ONRR2
  • ONAR3. Ordering: If a read returns a value v and a subsequent read returns a value w, then the write of w does not precede the write of v

From (1, N) regular to (1, 1) atomic

First, let’s convert (1, N) regular into (1, 1) atomic. As before, we’ll keep a timestamp for writes, which we increment every time we write. But this time, we’ll also introduce a timestamp for reads, which contains the highest write-timestamp it has read so far. This allows the reader to avoid returning old values once it has read a new one.

Implements:
    (1, 1)-AtomicRegister (ooar)
Uses:
    (1, N)-RegularRegister (onrr)

upon event <ooar, Init> do:
    val := nil;      # register value
    ts := 0;         # reader timestamp
    write_ts := 0;   # writer timestamp

upon event <ooarWrite, v> do:
    write_ts := write_ts + 1;
    trigger <onrrWrite, (write_ts, v)>;

upon event <onrrWriteReturn> do:
    trigger <ooarWriteReturn>;

upon event <ooarRead> do:
    trigger <onrrRead>;

upon event <onrrReadReturn, (value_ts, value)> do:
    if value_ts > ts:
        val := value;
        ts := value_ts;
    trigger <ooarReadReturn, val>;

Algorithm 1: From (1, 1) atomic to (1, N) atomic

We can construct a (1, N) atomic register from underlying (1, 1) atomic registers. These are organized in an N × N matrix, with instances ooar[q][r] for q, r ∈ Π.

Register instance ooar[q][r] is used to inform process q about the last value read (or, in the writer’s case, written) by process r. This establishes a sort of one-way communication channel, where r writes to the register and q reads from it.

The writer places written values into the registers ooar[q][writer] for every q, so that all readers can read them. This means that every write, and also every read (which writes back the value it read), requires N of these registers to be updated.

Implements:
    (1, N)-AtomicRegister (onar)
Uses:
    (1, 1)-AtomicRegister (multiple instances ooar[][])
Events:
    Request: <onarRead>: invokes a read on the register
    Request: <onarWrite, v>: invokes a write with value v on the register
    Indication: <onarReadReturn, v>: completes a read, returns v
    Indication: <onarWriteReturn>: completes a write on the register
Properties:
    ONAR1, ONAR2, ONAR3

upon event <onar, Init> do:
    ts := 0;          # timestamp
    acks := 0;        # number of updated registers in write and read
    writing := false; # whether we are currently reading or writing
    readval := nil;   # last read value
    readlist := [nil] * N; # reads from all other processes
    forall q ∈ Π, r ∈ Π:
        ooar[q][r] := new ooar with writer r, reader q;

# To write, write to all q processes through (1, 1)-atomic registers
upon event <onarWrite, v> do:
    ts := ts + 1;
    writing := true;
    forall q ∈ Π:
        trigger <ooar[q][self]Write, (ts, v)>;

# When all (1, 1)-atomic writes are done, WriteReturn or ReadReturn
upon event <ooar[q][self]WriteReturn> do:
    acks := acks + 1;
    if acks = N:
        acks := 0;
        if writing:
            trigger <onarWriteReturn>;
            writing := false;
        else:
            trigger <onarReadReturn, readval>;

# To read, read from all r processes through (1, 1)-atomic registers
upon event <onarRead> do:
    forall r ∈ Π do:
        trigger <ooar[self][r]Read>;

# When all reads are done, select the highest timestamp value,
# and write to all q processes
upon event <ooar[self][r]ReadReturn, (value_ts, value)> do:
    readlist[r] := (value_ts, value);
    if size(readlist) = N:
        (maxts, readval) := highest_timestamp_pair(readlist);
        readlist := [nil] * N;
        forall q ∈ Π do:
            trigger <ooar[q][self]Write, (maxts, readval)>;

Algorithm 2: Read-impose Write-all (1, N) atomic

We assume a fail-stop model, where any number of processes can crash, channels are reliable, and failure detection is perfect. We won’t go into the algorithm in detail, but its signature is:

Implements:
    (1, N)-AtomicRegister (onar)
Uses:
    BestEffortBroadcast (beb)
    PerfectLinks (pp2p)
    PerfectFailureDetector (P)
Events:
    Request: <onarRead>: invokes a read on the register
    Request: <onarWrite, v>: invokes a write with value v on the register
    Indication: <onarReadReturn, v>: completes a read, returns v
    Indication: <onarWriteReturn>: completes a write on the register
Properties:
    ONAR1, ONAR2, ONAR3

Guest Lectures

Guest Lecture 1: Mathematically robust distributed systems

Some bugs in distributed systems can be very difficult to catch (finding them can involve long and costly simulations, since it takes time to explore all possible cases), and can be very costly when they do happen.

The only way to be sure that there are no bugs is to prove it formally and mathematically.

Definition of the distributed system graph

Let G = (V, E) be a graph, where V is the set of process nodes, and E is the set of channel edges connecting the processes.

Two nodes u and v are neighbors if and only if there is an edge (u, v) ∈ E.

Let C ⊆ V be the set of crashed nodes. The other nodes are correct nodes.

We’ll define a path as a sequence of nodes (v_1, …, v_k) such that v_i and v_{i+1} are neighbors for every i < k.

Two nodes u and v are connected if there is a path (v_1, …, v_k) such that v_1 = u and v_k = v.

They are n-connected if there are n disjoint paths connecting them; two paths A and B are disjoint if A ∩ B = {u, v} (i.e. u and v are the only two nodes that the paths have in common).

The graph is k-connected if, for every pair of nodes u and v, there are k disjoint paths between u and v.

Example on a simple algorithm

Each node p holds a message m(p) and a set p.R of received messages. The goal is for two nodes p and q to have (q, m(q)) ∈ p.R and (p, m(p)) ∈ q.R; that is, they want to exchange messages, i.e. to communicate reliably. The algorithm is as follows:

for each node p:
  initially:
    send (p, m(p)) to all neighbors

  upon reception of (v, m):
    add (v, m) to p.R
    send (v, m) to all neighbors

Reliable communication

Now, let’s prove that if two nodes p and q are connected, then they communicate reliably. Let the path connecting them be p = v_0, v_1, …, v_k = q. We’ll do this by induction; formally, we’d like to prove that the proposition P_i, defined as “v_i receives (p, m(p))”, is true for every i from 1 to k.

  • Base case

    According to the algorithm, p initially sends (p, m(p)) to all its neighbors, in particular to v_1. So v_1 receives (p, m(p)) from p, and P_1 is true.

  • Induction step

    Suppose that the induction hypothesis P_i is true for some i < k.

    Then, according to the algorithm, v_i sends (p, m(p)) to all its neighbors, in particular to v_{i+1}, meaning that v_{i+1} receives (p, m(p)) from v_i, which means that P_{i+1} is true.

Thus P_k is true: q receives (p, m(p)). The symmetric argument shows that p receives (q, m(q)), so p and q communicate reliably.

Robustness property

If at most k nodes are crashed, and the graph is (k+1)-connected, then all correct nodes communicate reliably.

We prove this by contradiction. We want to prove the implication above, so let’s suppose that its opposite is true: the graph is (k+1)-connected and at most k nodes are crashed, but there are two correct nodes u and v that do not communicate reliably. Hopefully, doing so will lead us to a paradoxical conclusion that allows us to assert the original property.

As the graph is (k+1)-connected, there exist k+1 disjoint paths connecting u and v. Saying that u and v do not communicate reliably means that every one of these paths is “cut” by at least one crashed node. As the paths are disjoint, this requires at least k+1 crashed nodes to cut them all.

This is a contradiction: we were working under the assumption that at most k nodes were crashed, and concluded that at least k+1 nodes were crashed. This disproves the negation and proves the property.
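
In this model, two correct nodes communicate reliably exactly when some path of correct nodes connects them, which can be checked mechanically. A small Python sketch (the graph representation and names are ours):

from collections import deque

def communicate_reliably(adjacency, crashed, u, v):
    """BFS over correct nodes only: True iff u can reach v without
    traversing any crashed node."""
    if u in crashed or v in crashed:
        return False
    seen, queue = {u}, deque([u])
    while queue:
        node = queue.popleft()
        if node == v:
            return True
        for neighbor in adjacency[node]:
            if neighbor not in crashed and neighbor not in seen:
                seen.add(neighbor)
                queue.append(neighbor)
    return False

# Two disjoint length-2 paths between u and v: a single crash cannot cut both.
adjacency = {"u": {"a", "b"}, "a": {"u", "v"}, "b": {"u", "v"}, "v": {"a", "b"}}
print(communicate_reliably(adjacency, {"a"}, "u", "v"))  # -> True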

Random failures

Let’s assume that u and v are connected by a single path of length 1, i.e. they are separated by a single intermediary node. If each node has a probability f of crashing, then the probability of u and v communicating reliably is 1 - f.

Now, suppose that the path has length n; the probability of communicating reliably is the probability that none of the n intermediary nodes crash. Individually, that is 1 - f, so for the whole chain, the probability is (1 - f)^n.

However, if we have k paths of length 1 (that is, instead of setting the intermediary nodes up serially like previously, we set them up in parallel), the probability of not communicating reliably is that of all k intermediary nodes crashing, which is f^k; thus, the probability of actually communicating reliably is 1 - f^k.

If our nodes are connected by k paths of length n, the probability of not communicating reliably is that of all lines being cut. The probability of a single line being cut is 1 - (1 - f)^n, so the probability of all k lines being cut is (1 - (1 - f)^n)^k, and the final probability of communicating reliably is 1 - (1 - (1 - f)^n)^k.
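
A quick numerical sanity check of these formulas, assuming every node crashes independently with probability f (the helper names are ours):

def p_serial(f, n):
    """Reliability of a single path with n intermediary nodes."""
    return (1 - f) ** n

def p_parallel(f, k, n):
    """Reliability of k disjoint paths, each with n intermediary nodes."""
    return 1 - (1 - (1 - f) ** n) ** k

f = 0.1
print(p_serial(f, 10))        # a long chain degrades quickly (≈ 0.35)
print(p_parallel(f, 10, 10))  # parallel redundancy recovers most of it (≈ 0.99)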

Example proof

Assume an infinite 2D grid of nodes. Nodes u and v are connected, with the length of the shortest path between them being d. What is the probability of them communicating reliably when this distance d tends to infinity?

First, let’s define a sequence of grids G_i. G_0 is a single node, and G_{i+1} is built from 9 grids G_i arranged in a 3 × 3 square.

G_{i+1} is correct if at least 8 of its 9 subgrids G_i are correct.

We’ll introduce the concept of a “meta-correct” node; this is not really anything official, just something we’re making up for the purpose of this proof. Consider a grid G_n. A node is “meta-correct” if:

  • It is in a correct grid G_1, and
  • It is in a correct grid G_2, and
  • It is in a correct grid G_3, and so on, up to the correct grid G_n.

For the sake of this proof, let’s just admit that all meta-correct nodes are connected: if you take two nodes u and v that are both meta-correct, there will be a path of correct nodes connecting them.

Step 1

If p_i is the probability that G_i is correct, what is the probability p_{i+1} that G_{i+1} is correct?

G_{i+1} is built up of 9 subgrids G_i, each of which fails with probability 1 - p_i. The probability of G_{i+1} being correct is the probability of at most one subgrid being incorrect, namely p_{i+1} = p_i^9 + 9 p_i^8 (1 - p_i).
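
This recurrence can be iterated numerically; a short Python sketch (under the assumption that an individual node is correct with probability 0.99) shows larger and larger grids being correct with probability approaching 1:

def f(p):
    """Probability that a grid is correct, given that each of its 9 subgrids
    is independently correct with probability p (at most one may fail)."""
    return p**9 + 9 * p**8 * (1 - p)

p = 0.99  # probability that a single node (a grid G_0) is correct
for i in range(5):
    p = f(p)
    print("G_%d correct with probability %.6f" % (i + 1, p))
# The printed probabilities increase towards 1.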

Step 2

Let , and .

We will admit the following: if then .

Let be the result of applying (as defined in step 1) to , times: . We will prove that , by induction:

  • Base case: and , so .
  • Induction step:

    Let’s suppose that . We want to prove this for , namely .

This proves the result that .

Step 3

Todo.

Guest lecture 2: Byzantine failures

So far, we’ve only considered situations in which nodes crash. In this section, we’ll consider a new kind of failure: nodes that go “evil”, a situation we call Byzantine failures.

Suppose that our nodes are arranged in a grid, and that a node p sends a message m to q by broadcasting it. With a simple broadcast algorithm, the message is just forwarded through a neighbor, which may be a Byzantine node that alters the message before rebroadcasting it. Because a Byzantine node can simply do that, we see that this simple algorithm is not enough to deal with Byzantine failures.

To deal with this problem, we’ll consider some other algorithms.

First, consider the case where there are n intermediary nodes between p and q (this is not a daisy chain of n nodes, but rather n disjoint paths of length 2 between p and q). We assume that p and q are both correct (non-Byzantine) nodes, but the intermediary nodes may be Byzantine.

For this algorithm, we define T = n/2 + 1 if n is even, and T = (n + 1)/2 if it is odd. The idea is for T to be the smallest number of nodes forming a majority among the n intermediary nodes. Let’s also assume that q has a set R that acts as its memory, and a variable y, initially set to ⊥. Our goal is to have y = m.

p simply sends out the message m to its neighbors. The intermediary nodes forward the messages they receive to q. Finally, when q receives a message m' from an intermediary node v, it adds (v, m') to the set R. When T nodes in the set report the same message, q can set y to that message (essentially, delivering it).
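
A minimal Python sketch of q's side of this relay scheme (the names are ours): q records which intermediary forwarded which value, and accepts a value once T distinct intermediaries have forwarded it.

def majority_threshold(n):
    """Smallest number of intermediaries forming a majority of n
    (n/2 + 1 if n is even, (n + 1)/2 if n is odd)."""
    return n // 2 + 1

class Receiver:
    def __init__(self, n):
        self.threshold = majority_threshold(n)
        self.reports = {}   # value -> set of intermediaries that forwarded it
        self.y = None       # accepted value, initially ⊥

    def on_forward(self, intermediary, value):
        self.reports.setdefault(value, set()).add(intermediary)
        if self.y is None and len(self.reports[value]) >= self.threshold:
            self.y = value  # a majority of intermediaries vouch for this value

# n = 4 intermediaries, at most one Byzantine: the correct majority prevails.
q = Receiver(4)
q.on_forward("i1", "m")
q.on_forward("i2", "m'")   # Byzantine intermediary forwarding a fake value
q.on_forward("i3", "m")
q.on_forward("i4", "m")
print(q.y)  # -> "m"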

We’ll prove properties on this. The main point to note is that these proofs make no assumption on the potentially Byzantine nodes.

  • Safety: if the number of Byzantine nodes is at most T − 1, then y = m or y = ⊥.

    The proof is by contradiction. Let’s suppose that the opposite is true, i.e. that y = m' where m' ≠ m. Then, according to the algorithm, there must be T nodes v such that (v, m') ∈ R. But according to the algorithm, there are only two reasons for such an entry to be in the set: either v operates in good faith, having received m' from p, or it operates in bad faith, being a Byzantine node. The first case is impossible, as p is correct and sent m. The second case can only happen if there are at least T Byzantine nodes, which is also impossible (since by assumption there are at most T − 1). This contradiction proves the safety property.

  • Liveness: if the number of Byzantine nodes is at most n − T, we eventually have y = m.

    To prove this, we first consider the set of correct (non-Byzantine) intermediary nodes; by assumption, it contains at least T nodes. These nodes all receive m from p and send it to q, which places the corresponding entries in R. Eventually, there are T entries for m in the set, and then y = m.

    By the liveness and safety properties, we know that initially y = ⊥, eventually y = m, and we never have y = m' with m' ≠ m.

  • Optimality: if there are T or more Byzantine nodes, it is impossible to ensure the safety property.

    Assume we have T Byzantine nodes, all sending the same fake message m' ≠ m to q. According to the algorithm, q ends up with y = m', so there is no safety.

    We can conclude that we can tolerate at most T − 1 Byzantine nodes.

But here we only considered the specific case of length-2 paths. Let’s now consider the general case of an arbitrary connected graph. In this case, each node p needs to broadcast a message m(p). Every node has a set of messages to send, and a set of received messages.

The algorithm is as follows. Initially, the nodes send to their neighbors. When a node receives from a neighbor , with and , the node sends to its neighbors, and add that to . When there exists a node , a message and sets such that , and we have message in , we can add to .

We’ll prove the following properties under the hypotheses that we have at most Byzantine nodes (a minority), and that the graph is connected (otherwise we couldn’t broadcast messages between the nodes)

  • Safety: If p and q are two correct nodes, then p never accepts a fake message attributed to q (i.e. a message m' ≠ m(q) supposedly coming from q). In other words, no fake messages are accepted.

    The proof is by contradiction, in which we use induction to arrive at a contradictory conclusion. We’ll try to prove the opposite of our claim, namely that there are two correct nodes p and q such that p accepts a fake message m' ≠ m(q) attributed to q.

    According to our algorithm, we have disjoint sets whose intersection is , and elements .

    To prove this, we’ll need to prove a sub-property: that each of these sets contains at least one Byzantine node. We prove this by contradiction: we suppose the opposite, namely that one of the sets contains no Byzantine node (i.e. that all of its nodes are correct). I won’t write down the rest of the proof here, but it’s in the lecture notes if needed (it’s by induction).

  1. Consensus and TOB are very interdependent, so it can be a good idea to read both twice. 
