Tuesday, September 29, 2015

SWIM Distributed Group Membership Protocol

Overview


Many distributed applications require a reliable and scalable implementation of a membership protocol. A membership protocol provides each process (“member”) of the group with a locally-maintained list of the other non-faulty processes in the group. The protocol ensures that the membership list is updated with changes resulting from new members joining the group, or members dropping out either voluntarily or through a failure. SWIM (Scalable Weakly-consistent Infection-style Process Group Membership Protocol) is motivated by the unscalability of traditional heart-beating protocols, which either impose network loads that grow quadratically with group size, or compromise response times or the false positive frequency of failure detection. SWIM aims to provide a membership protocol with stable failure detection time, a stable rate of false positives and low message load per group member, thus allowing distributed applications that use it to scale well.

Applications which rely on a reliable and scalable distributed membership maintenance protocol include:
  • Reliable multicast
  • Epidemic-style information dissemination (gossip)
  • Distributed databases
  • Publish-subscribe systems
  • Large scale peer-to-peer systems
  • Large scale cooperative gaming
  • Other collaborative distributed applications

Requirements for the protocol:
  • Low probability of false positive failure detection
  • No reliance on a central server
  • Low network and computational load
  • Low time to detect a failure
  • Weakly consistent membership lists are acceptable
Heartbeat-based protocols do not fit these requirements well, since they suffer from scalability limitations. The unscalability of the popular class of all-to-all heartbeating protocols arises from the implicit decision to combine two principal functions of the membership problem:
  • Membership update dissemination
  • Failure detection

SWIM addresses the non-scalability problems of traditional membership maintenance protocols by:
  • Designing the failure detection and membership update dissemination components separately
  • Using a non-heartbeat-based strategy for failure detection, namely a random-probing-based failure detector protocol

Properties of SWIM protocol:
  1. Imposes a constant message load per group member;
  2. Detects a process failure in an expected constant time at some non-faulty process in the group;
  3. Provides a deterministic bound (as a function of group size) on the local time that a non-faulty process takes to detect failure of another process;
  4. Propagates membership updates, including information about failures, in gossip-style; the dissemination latency in the group grows slowly (logarithmically) with the number of members;
  5. Provides a mechanism to reduce the rate of false positives by “suspecting” a process before “declaring” it as failed within the group.
While properties 1-2 come from the failure detection protocol used, properties 3-5 are introduced by the membership protocol itself.

SWIM Protocol Design


The SWIM protocol consists of two main components:
  • Failure Detector Component
  • Dissemination Component

Failure Detector Component

Quality of service properties of failure detection:
  • Strong completeness
  • Speed of failure detection
  • Accuracy
  • Network message load
It has been proven that strong completeness and strong accuracy are impossible to achieve simultaneously over an asynchronous unreliable network. However, since a typical distributed application relies on strong completeness (in order to maintain up-to-date information in dynamic groups), most failure detectors, including heartbeating-based solutions, guarantee this property while attempting to maintain a low rate of false positives. SWIM takes the same approach.

SWIM uses the failure detection protocol described in [2] (see Fig. 1).


Fig. 1 SWIM failure detection: Example protocol period at Mi. This shows all the possible messages that a protocol period may initiate. 

Dissemination Component

The basic implementation of the membership dissemination component consists of:
  • On detecting a failed member via the failure detector, a process multicasts this information to the group; other members then delete the failed member from their lists.
  • A member that joins or voluntarily leaves the group is announced by multicast in the same way, except that a joining member first needs to know at least one contact member of the group.
In order for a new member to join the group it can use one of the following approaches:
  • If the group is associated with a well known server or IP multicast address, all joins could be directed to the associated address.
  • In the absence of such infrastructure, join messages could be broadcast. Group members reply to such a broadcast request with some probability.

The basic implementation described above conveys the general idea of the protocol, but in order to make the protocol robust and fault tolerant the following improvements may be applied:
  • Do not use multicast to disseminate membership updates; instead, spread membership information by piggybacking it on top of the failure detection protocol messages (ping, ping-req and ack). This approach results in an infection-style dissemination of membership updates, with the associated benefits of robustness to packet losses and low latency.
  • Use a suspicion mechanism. A process that is unresponsive to pings is marked as “suspected”. A suspected member is treated as a normal member for ping target selection, and if it responds, an “alive” update is spread. Only after a predefined timeout is a suspected member declared faulty. This timeout effectively trades off an increase in failure detection time for a reduction in the frequency of false failure detections. The suspicion mechanism is applicable to any system with distinct failure detector and membership dissemination components.
  • The basic implementation of the SWIM protocol guarantees only eventual detection of a failure. Round-robin probe target selection guarantees the Time Bounded Completeness property: the time interval between the occurrence of a failure and its detection at member Mj is no more than two times the group size in number of protocol periods. This is achieved by the following modification to the protocol. The failure detection protocol at member Mi selects ping targets not randomly from the list of members, but in a round-robin fashion. A newly joining member is inserted in the membership list at a position chosen uniformly at random. On completing a traversal of the entire list, Mi rearranges the membership list into a random reordering (a small selection sketch follows this list).
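A minimal Java sketch of this round-robin target selection, with random insertion of new members and a reshuffle after each full traversal, might look as follows; the class and method names are illustrative, not part of the SWIM specification:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class ProbeTargetSelector {

  private final List<String> members = new ArrayList<>();
  private int nextIndex = 0;

  /** A newly joined member is inserted at a uniformly random position. */
  public synchronized void addMember(String member) {
    int position = ThreadLocalRandom.current().nextInt(members.size() + 1);
    members.add(position, member);
  }

  public synchronized void removeMember(String member) {
    int position = members.indexOf(member);
    if (position >= 0) {
      members.remove(position);
      if (position < nextIndex) {
        nextIndex--; // keep the traversal position stable
      }
    }
  }

  /** Returns the next ping target; reshuffles once the whole list has been traversed. */
  public synchronized String nextTarget() {
    if (members.isEmpty()) {
      return null;
    }
    if (nextIndex >= members.size()) {
      Collections.shuffle(members);
      nextIndex = 0;
    }
    return members.get(nextIndex++);
  }
}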

In order to implement gossip-style membership update dissemination, each group member Mi maintains a buffer of recent membership updates, together with a counter for each buffered element of how many times it has been piggybacked (sent) by Mi so far; this counter is used to choose which element to piggyback next. Each membership update is piggybacked at most a*log(n) times [e.g. 3 * ⌈log(n + 1)⌉] in order to ensure reliable arrival of the gossip at each member of the group.
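Below is a minimal Java sketch of such a piggyback buffer, assuming updates are represented as plain strings and using 3 * ⌈log(n + 1)⌉ as the retransmission limit; the class and method names are illustrative only:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PiggybackBuffer {

  /** membership update (e.g. "alive Mk", "suspect Mk") -> times it was piggybacked so far */
  private final Map<String, Integer> piggybackCounts = new ConcurrentHashMap<>();

  /** Buffer a fresh membership update. */
  public void add(String membershipUpdate) {
    piggybackCounts.putIfAbsent(membershipUpdate, 0);
  }

  /** Select up to maxUpdates least-disseminated updates to piggyback on the next
   *  ping/ping-req/ack message; retire updates already sent about 3 * log(n + 1) times. */
  public synchronized List<String> selectForPiggyback(int maxUpdates, int groupSize) {
    // Math.log is the natural logarithm here; the exact base only changes the constant.
    int limit = (int) Math.ceil(3 * Math.log(groupSize + 1));
    List<String> selected = new ArrayList<>();
    piggybackCounts.entrySet().stream()
        .sorted(Map.Entry.comparingByValue())   // fewest transmissions first
        .limit(maxUpdates)
        .forEach(entry -> selected.add(entry.getKey()));
    for (String update : selected) {
      piggybackCounts.computeIfPresent(update, (u, count) -> count + 1 >= limit ? null : count + 1);
    }
    return selected;
  }
}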

SWIM can be extended to a wide-area network (WAN) or a virtual private network (VPN) by weighting ping target choices with topological information, thus reducing bandwidth usage on core network elements inside the network.

SWIM Implementations


Consul

Consul is a distributed, highly available, and datacenter-aware service discovery and configuration management system. It uses SWIM as the basis for its gossip protocol and for managing membership information.

ScaleCube

ScaleCube (disclaimer: I am one of the authors) is an open-source microservices framework for rapid development of distributed, resilient, reactive applications that scale. It implements a variant of the SWIM protocol in Java and uses it as the basis for managing a cluster of connected microservices. It applies the suspicion mechanism on top of the failure detector events and also has a separate membership update dissemination component. However, it introduces a separate gossip protocol component instead of piggybacking membership updates on top of failure detector messages. This is done in order to reuse the gossip component for other platform events and to have more fine-grained control over the time intervals used for gossiping and for failure detection pings. New members join the cluster via seed member addresses provided in the configuration (an analogue of the well-known servers described above). It also extends the SWIM protocol with periodic SYNC messages in order to improve recovery from network partitioning and message losses.

Reference:
  1. SWIM: Scalable Weakly-consistent Infection-style Process Group Membership Protocol (2002)
  2. Scalable and Efficient Distributed Failure Detectors
  3. Quality of Service of Failure Detectors
  4. Consul
  5. ScaleCube

Thursday, September 10, 2015

Time, Clocks and Ordering in a Distributed System

A distributed system consists of a collection of distinct processes that are spatially separated and communicate via messages. This post considers the concept of event ordering in distributed systems. First, it discusses the partial ordering defined by the "happened before" relation, and then gives a distributed algorithm for extending it to a consistent total ordering of all events.

Partial Ordering


The relation "happened before" (->) on the set of events of a system is the smallest relation satisfying the following three conditions:
  • If a and b are events in the same process, and a comes before b, then a -> b.
  • If a is the sending of a message by one process and b is the receipt of the same message by another process, then a -> b.
  • If a -> b and b -> c then a -> c.
Two distinct events a and b are said to be concurrent if a !-> b and b !-> a.

The "happened before" relation defines a partial ordering of the events in the system.

A logical clock can be characterized by the following condition (it does not necessarily have to be a physical clock):

      Clock Condition. For any events a, b: if a -> b then C(a) < C(b). 

Note that the converse condition does not hold: if C(a) < C(b) we cannot conclude that a -> b, since two cases are possible: either a -> b, or a and b are concurrent events. This means that from the clock values alone you cannot distinguish concurrent events.

From the definition of "happened before", the Clock Condition is satisfied if the following two conditions hold:
  • C1. If a and b are events in process Pi, and a comes before b, then Ci(a) < Ci(b).
  • C2. If a is the sending of a message by process Pi and b is the receipt of that message by process Pj, then Ci(a) < Cj(b).

Implementation rules:
  • IR1. Each process Pi increments Ci between any two successive events.
  • IR2. (a) If event a is the sending of a message m by process Pi, then the message m contains a timestamp Tm = Ci(a). (b) Upon receiving a message m, process Pj sets Cj greater than or equal to its present value and greater than Tm.

A Java implementation of the Clock Condition, as an illustration for this post, can be found in the Logical Clocks repository. The base classes are LogicalTimestamp, which represents a specific moment in time, and LogicalClock, which provides thread-safe methods to store and update the time according to the implementation rules and conditions described above, as events occur at each process of the distributed system.
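As a minimal illustration of rules IR1 and IR2 (not necessarily the exact API of the repository mentioned above), a Lamport-style logical clock can be sketched in Java as follows:

import java.util.concurrent.atomic.AtomicLong;

public final class LamportClock {

  private final AtomicLong time = new AtomicLong();

  /** IR1: increment the clock between any two successive local events. */
  public long tick() {
    return time.incrementAndGet();
  }

  /** IR2(a): the timestamp Tm to attach to an outgoing message (the send is itself an event). */
  public long timestampForSend() {
    return tick();
  }

  /** IR2(b): on receipt of a message with timestamp tm, advance past max(local, tm). */
  public long onReceive(long tm) {
    return time.accumulateAndGet(tm, (local, remote) -> Math.max(local, remote) + 1);
  }
}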

Total Ordering


We can use a system of clocks satisfying the Clock Condition to define a total ordering (=>); a small comparator sketch follows the definition below.
If a is an event in process Pi and b is an event in process Pj, then a => b if and only if either:
  • (i) Ci(a) < Cj(b) or 
  • (ii) Ci(a) = Cj(b) and Pi < Pj. 
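As an illustration, this total ordering can be expressed as a Java comparator; the Event record below is illustrative and not part of the original paper:

import java.util.Comparator;

/** An event stamped by the logical clock of the process where it occurred. */
public record Event(long timestamp, int processId) {

  /** Total ordering (=>): compare by logical timestamp, break ties by process id. */
  public static final Comparator<Event> TOTAL_ORDER =
      Comparator.comparingLong(Event::timestamp)      // (i)  Ci(a) < Cj(b)
                .thenComparingInt(Event::processId);  // (ii) tie-break by Pi < Pj
}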

We can use the concept of total ordering to solve a simple synchronization problem. We wish to find an algorithm for granting access to a shared resource that satisfies the following conditions:
  • (I) A process which has been granted the resource must release it before it can be granted to another process;
  • (II) Different requests for the resource must be granted in the order in which they are made;
  • (III) If every process which is granted the resource eventually releases it, then every request is eventually granted.

An algorithm which satisfies conditions I-III and uses the concept of total ordering can be described with the following 5 rules:
  1. To request the resource, process Pi sends the request resource message Tm:Pi to every other process, and puts that message on its request queue, where Tm is the timestamp of the message. 
  2. When process Pj receives the request resource message Tm:Pi, it places it on its request queue and sends a (timestamped) acknowledgment message to Pi. 
  3. To release the resource, process Pi removes any Tm:Pi request resource message from its request queue and sends a (timestamped) Pi release resource message to every other process.
  4. When process Pj receives a Pi release resource message, it removes any Tm:Pi request resource message from its request queue.
  5. Process Pi is granted the resource when the following two conditions are satisfied: (i) There is a Tm:Pi request resource message in its request queue which is ordered before any other request in its queue by the total ordering relation (=>); (ii) Pi has received a message from every other process timestamped later than Tm.
This is a distributed algorithm: each process independently follows these rules to perform an emergent function (in this case, granting synchronized access to the shared resource), and there is no central coordinating process or central storage. A sketch of the granting check is given below.
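Below is a minimal Java sketch of the state kept at process Pi and the granting check of rule 5; the handling of the actual request, acknowledgment and release messages (rules 1-4) is assumed to call these methods, and all names are illustrative:

import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;

public class LamportMutexState {

  /** A queued request Tm:Pj. */
  public record Request(long timestamp, int processId) {}

  /** Request queue kept in the total ordering (=>): timestamp first, then process id. */
  private final ConcurrentSkipListSet<Request> requestQueue = new ConcurrentSkipListSet<>(
      Comparator.comparingLong(Request::timestamp).thenComparingInt(Request::processId));

  /** For each other process Pj: timestamp of the latest message received from it. */
  private final Map<Integer, Long> latestTimestampFrom = new ConcurrentHashMap<>();

  private final int selfId;

  public LamportMutexState(int selfId) {
    this.selfId = selfId;
  }

  public void addRequest(Request request) {            // rules 1 and 2
    requestQueue.add(request);
  }

  public void removeRequests(int processId) {          // rules 3 and 4
    requestQueue.removeIf(r -> r.processId() == processId);
  }

  public void recordMessageFrom(int processId, long timestamp) {
    latestTimestampFrom.merge(processId, timestamp, Math::max);
  }

  /** Rule 5: our own request is first in the total order (condition i) and every
   *  other process has been heard from with a later timestamp (condition ii). */
  public boolean isGranted() {
    Request head = requestQueue.isEmpty() ? null : requestQueue.first();
    return head != null
        && head.processId() == selfId
        && latestTimestampFrom.values().stream().allMatch(t -> t > head.timestamp());
  }
}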

This approach and these principles can be generalized to implement any desired form of multiprocess synchronization in a distributed system. However, the algorithm depends on the correct operation of all processes, and the failure of a single process will halt the others. The problem of failure is not considered here.

Anomalous Behavior


Anomalous behaviour can occur if events satisfying a -> b happen outside the system: the system may then assign resource access in the wrong order, since from its point of view a !-> b, which can contradict the behaviour expected by the users. This can be prevented by the use of properly synchronized physical clocks, and it can be formally shown how closely such clocks can be synchronized.

References:
  1. Time, Clocks and the Ordering of Events in a Distributed System (1978)
  2. Logical Clocks github repository

Monday, September 7, 2015

Monolithic vs. Microservices Architecture

Monolithic Architecture

When developing a server-side application you can start it with a modular hexagonal or layered architecture which consists of different types of components:
  • Presentation - responsible for handling HTTP requests and responding with either HTML or JSON/XML (for web services APIs).
  • Business logic - the application’s business logic.
  • Database access - data access objects responsible for accessing the database.
  • Application integration - integration with other services (e.g. via messaging or REST API).
Despite having a logically modular architecture, the application is packaged and deployed as a monolith.

Benefits of Monolithic Architecture
  • Simple to develop.
  • Simple to test. For example you can implement end-to-end testing by simply launching the application and testing the UI with Selenium.
  • Simple to deploy. You just have to copy the packaged application to a server.
  • Simple to scale horizontally by running multiple copies behind a load balancer.
In the early stages of a project this works well, and most of the big and successful applications that exist today were started as a monolith.

Drawbacks of Monolithic Architecture
  • This simple approach has a limitation in size and complexity.
  • The application becomes too large and complex to fully understand, which makes it hard to make changes quickly and correctly.
  • The size of the application can slow down start-up time.
  • You must redeploy the entire application on each update.
  • The impact of a change is usually not well understood, which leads to extensive manual testing.
  • Continuous deployment is difficult.
  • Monolithic applications can also be difficult to scale when different modules have conflicting resource requirements.
  • Another problem with monolithic applications is reliability. A bug in any module (e.g. a memory leak) can potentially bring down the entire process. Moreover, since all instances of the application are identical, that bug will impact the availability of the entire application.
  • Monolithic applications present a barrier to adopting new technologies. Since changes in frameworks or languages affect the entire application, such changes are extremely expensive in both time and cost.

Microservices Architecture

The idea is to split your application into a set of smaller, interconnected services instead of building a single monolithic application. Each microservice is a small application that has its own hexagonal architecture consisting of business logic along with various adapters. Some microservices would expose a REST, RPC or message-based API and most services consume APIs provided by other services. Other microservices might implement a web UI.

The Microservice architecture pattern significantly impacts the relationship between the application and the database. Instead of sharing a single database schema with other services, each service has its own database schema. On the one hand, this approach is at odds with the idea of an enterprise-wide data model, and it often results in duplication of some data. However, having a database schema per service is essential if you want to benefit from microservices, because it ensures loose coupling. Moreover, a service can use the type of database that is best suited to its needs, the so-called polyglot persistence architecture.

Some APIs are also exposed to mobile, desktop, and web apps. The apps don't, however, have direct access to the back-end services. Instead, communication is mediated by an intermediary known as an API Gateway. The API Gateway is responsible for tasks such as load balancing, caching, access control, API metering, and monitoring.



The Microservice architecture pattern corresponds to the Y-axis scaling of the Scale Cube model of scalability.

Benefits of Microservices Architecture
  • It tackles the problem of complexity by decomposing the application into a set of manageable services which are much faster to develop and much easier to understand and maintain.
  • It enables each service to be developed independently by a team that is focused on that service.
  • It reduces the barrier to adopting new technologies, since developers are free to choose whatever technologies make sense for their service and are not bound to the choices made at the start of the project.
  • The Microservice architecture enables each microservice to be deployed independently. As a result, it makes continuous deployment possible for complex applications.
  • The Microservice architecture enables each service to be scaled independently.

Drawbacks of Microservices Architecture
  • The microservices architecture adds complexity to the project simply by the fact that a microservices application is a distributed system. You need to choose and implement an inter-process communication mechanism based on either messaging or RPC, write code to handle partial failure, and take into account the other fallacies of distributed computing.
  • Microservices imply a partitioned database architecture. Business transactions that update multiple business entities in a microservices-based application need to update multiple databases owned by different services. Using distributed transactions is usually not an option, so you end up having to use an eventual-consistency-based approach, which is more challenging for developers.
  • Testing a microservices application is also much more complex than in the case of a monolithic web application. For a similar test of a service you would need to launch that service and any services that it depends upon (or at least configure stubs for those services).
  • It is more difficult to implement changes that span multiple services. In a monolithic application you could simply change the corresponding modules, integrate the changes, and deploy them in one go. In a Microservice architecture you need to carefully plan and coordinate the rollout of changes to each of the services.
  • Deploying a microservices-based application is also more complex. A monolithic application is simply deployed on a set of identical servers behind a load balancer. In contrast, a microservice application typically consists of a large number of services, each with multiple runtime instances. Each instance needs to be configured, deployed, scaled, and monitored. In addition, you will also need to implement a service discovery mechanism. Manual approaches to operations cannot scale to this level of complexity, and successful deployment of a microservices application requires a high level of automation.

Summary

Building complex applications is inherently difficult. A Monolithic architecture better suits simple, lightweight applications. There are opinions suggesting to start with a monolith first, and others recommending not to start with a monolith when your goal is a microservices architecture. In any case, it is important to understand the Monolithic architecture, since it is the basis for the microservices architecture, where each service by itself is implemented according to a monolithic architecture. The Microservices architecture pattern is the better choice for complex, evolving applications. The microservices approach is all about handling a complex system, but in order to do so it introduces its own set of complexities and implementation challenges.

References:
  1. Introduction to Microservices
  2. The Scale Cube
  3. Microservices Architecture Pattern
  4. Monolithic Architecture Pattern
  5. Microservices Guide
  6. Monolith First
  7. Don't start with a monolith
  8. The Evolving Panorama of Data
  9. API Gateway Pattern
  10. Testing Strategies in a Microservice Architecture
  11. Microservice Premium 
  12. Distributed vs. Non-Distributed Computing
  13. Fallacies of Distributed Computing

Friday, August 21, 2015

Netty Best Practices Distilled

  • Pipeline optimizations. write() does not make syscalls; flush() makes a syscall to flush all previous writes out to the socket. Limit flushes as much as possible, but also limit writes, since every write needs to traverse the pipeline.
  • GC pressure. Use VoidChannelPromise to reduce object creation if you are not interested in the future result and no ChannelOutboundHandler needs to add a listener.
  • Write correctly with respect to slow receivers. Make use of Channel.isWritable() to prevent out-of-memory errors (see the sketch after this list).
  • Configure low and high write watermarks.
  • Pass custom events through the pipeline. A good fit for handshake notifications and more.
  • Prefer ByteToMessageDecoder over ReplayingDecoder. ReplayingDecoder is slower because of more overhead in its methods and the need to handle replay errors. Use ByteToMessageDecoder if it is possible without making things complicated.
  • Use pooled direct buffers.
  • Write direct buffers… always.
  • Use ByteBufProcessor when you need to find a pattern in a ByteBuf. It is faster because it can eliminate range checks, can be created and shared, and is easier for the JIT to inline.
  • Prefer alloc() over Unpooled.
  • Prefer slice() and duplicate() over copy(), since they do not create an extra copy of the buffer.
  • Prefer bulk operations over loops, because otherwise range checks are needed on each get.
  • Use DefaultByteBufHolder for messages with a payload. You get reference counting and resource release for free.
  • File transfer. Use zero-memory-copy for efficient transfer of raw file content with DefaultFileRegion.
  • Never block or perform computationally intensive operations on the EventLoop.
  • EventLoop extends ScheduledExecutorService, so use it! Schedule and execute tasks in the EventLoop.
  • Reuse an EventLoopGroup if you can. Sharing the same EventLoopGroup allows you to keep resource usage (like thread usage) to a minimum.
  • Share the EventLoop for proxy-like applications to reduce context switching.
  • Combine operations when calling from outside the EventLoop, to reduce the overhead of wakeups and object creation.
  • Operations from inside a ChannelHandler: use the shortest path through the pipeline if possible.
  • Share ChannelHandlers if they are stateless.
  • Remove a ChannelHandler once it is not needed anymore. This keeps the pipeline as short as possible and eliminates as much traversal overhead as possible.
  • Use the proper buffer type in MessageToByteEncoder. This saves extra byte copies.
  • Use the auto-read flag to control flow. This can also be quite useful when writing proxy-like applications.
  • Don't use the JDK's SSLEngine if performance matters. Use Twitter's OpenSSL-based SSLEngine; Netty will ship its own.
  • Prefer static inner classes over anonymous classes for channel future listeners. You never know when a ChannelFuture will be notified, so anonymous classes can prevent objects from being garbage collected.
  • Use the native transport (epoll) for less GC and lower latency. It only works on Linux, as only epoll is supported at the moment.
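As an illustration of a few of the points above (limiting flushes, respecting Channel.isWritable() and scheduling work on the EventLoop), here is a hypothetical handler sketch; it is not taken from the referenced slides and assumes a suitable encoder (e.g. a StringEncoder) is already in the pipeline:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.util.concurrent.TimeUnit;

public class BackPressureAwareHandler extends ChannelInboundHandlerAdapter {

  @Override
  public void channelActive(ChannelHandlerContext ctx) {
    // The EventLoop is a ScheduledExecutorService: schedule periodic work on it directly
    // (a real handler would cancel this task in channelInactive).
    ctx.executor().scheduleAtFixedRate(
        () -> ctx.writeAndFlush("keep-alive\n"), 5, 5, TimeUnit.SECONDS);
    ctx.fireChannelActive();
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // Only write while the outbound buffer is below the high write watermark;
    // otherwise drop (or queue) to avoid running out of memory with slow receivers.
    if (ctx.channel().isWritable()) {
      ctx.write(msg); // no syscall yet
    } else {
      ReferenceCountUtil.release(msg);
    }
  }

  @Override
  public void channelReadComplete(ChannelHandlerContext ctx) {
    // Flush once per read batch instead of once per message.
    ctx.flush();
  }
}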

References:

  1. Netty Best Practices a.k.a. Faster == Better (slides) 
  2. Netty Best Practices with Norman Mauer (video)

Wednesday, August 19, 2015

Scalable and Efficient Distributed Failure Detectors

Failure detectors are a central component in fault-tolerant distributed systems running over unreliable asynchronous networks, e.g. group membership protocols, supercomputers, computer clusters, etc. The ability of the failure detector to detect process failures completely and efficiently, in the presence of unreliable messaging as well as arbitrary process crashes and recoveries, can have a major impact on the performance of these systems.

Properties of failure detector:
  • {Strong/Weak} Completeness: the guarantee that the failure of a group member will eventually be detected by {all/some} non-faulty members.
  • Strong Accuracy: no non-faulty group member is ever declared as failed by any other non-faulty member.
It has been proven that achieving both strong completeness and strong accuracy is impossible on fault-prone networks. So if strong completeness is required, one has to accept weak accuracy (some possibility of false positive failure detection) while trying to reduce it to a minimum.

Requirements for the Failure Detector:
  1. Completeness: satisfy eventual Strong Completeness.
  2. Efficiency:
    • Speed: quick (within some given time T) detection of a member failure by some (not all) non-faulty member.
    • Accuracy: low probability (below a given PM(T), which is much below the probability of message loss Pml) of failure detection mistakes.
  3. Scalability: equal expected worst-case network load per member.

Theorem. Any distributed failure detector algorithm for a group of size n (>> 1) that deterministically satisfies the Completeness, Speed and Accuracy requirements above, for given values of T and PM(T) (<< Pml), imposes a minimal worst-case network load (messages per time unit, as defined above) of:

L* = n * log(PM(T)) / (log(Pml) * T)

Furthermore, there is a failure detector that achieves this minimal worst-case bound while satisfying the Completeness, Speed and Accuracy requirements. L* is thus the optimal worst-case network load required to satisfy the Completeness, Speed and Accuracy requirements.

Heartbeat-based approaches provide completeness, but have shortcomings:
  • Centralized - creates hot-spots and prevents scaling
  • Distributed - inherently not very efficient or scalable

Randomized Distributed Failure Detector Algorithm:
It assumes that the list of members is the same and already known at all nodes. A member that recovers from a failure does so as a distinguishable new incarnation. Each message also contains the current incarnation number of the sender.

At each member Mi:

Integer pr; /* local period number */

Every T' time units at Mi:
0. pr := pr + 1
1. Select a random member Mj from the view.
     Send a ping(Mi, Mj, pr) message to Mj.
     Wait for the worst-case message round-trip time for an ack(Mi, Mj, pr) message.
2. If no ack has been received yet:
     Select k members randomly from the view.
     Send each of them a ping-req(Mi, Mj, pr) message.
     Wait for an ack(Mi, Mj, pr) until the end of period pr.
3. If no ack(Mi, Mj, pr) message has been received yet, declare Mj as failed.

Anytime at Mi:
4. On receipt of a ping-req(Mm, Mj, pr) message (Mj != Mi):
     Send a ping(Mi, Mj, Mm, pr) message to Mj.
     On receipt of an ack(Mi, Mj, Mm, pr) message from Mj:
       Send an ack(Mm, Mj, pr) message to Mm.

Anytime at Mi:
5. On receipt of a ping(Mm, Mi, Ml, pr) message from member Mm:
     Reply with an ack(Mm, Mi, Ml, pr) message to Mm.

Anytime at Mi:
6. On receipt of a ping(Mm, Mi, pr) message from member Mm:
     Reply with an ack(Mm, Mi, pr) message to Mm.

Fig. 1 Example of failure detection protocol period at Mi. This shows all the possible messages that a protocol period may initiate.
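To make the steps above concrete, here is a minimal Java sketch of a single protocol period (steps 1-3) as run at Mi; the Transport interface is hypothetical and incarnation numbers are omitted for brevity:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class RandomizedFailureDetector {

  /** Hypothetical transport: the returned future completes when the corresponding ack arrives. */
  public interface Transport {
    CompletableFuture<Void> ping(String target);
    CompletableFuture<Void> pingRequest(String relay, String target);
  }

  private final Transport transport;
  private final long roundTripTimeoutMs; // worst-case message round-trip time
  private final long periodMs;           // protocol period T' (assumed > roundTripTimeoutMs)
  private final int k;                   // number of ping-req relays

  public RandomizedFailureDetector(Transport transport, long roundTripTimeoutMs, long periodMs, int k) {
    this.transport = transport;
    this.roundTripTimeoutMs = roundTripTimeoutMs;
    this.periodMs = periodMs;
    this.k = k;
  }

  /** Runs steps 1-3 for one protocol period; returns true if the chosen target is declared failed. */
  public boolean runProtocolPeriod(List<String> view) {
    // Step 1: ping a randomly selected member and wait for the ack.
    String target = view.get(ThreadLocalRandom.current().nextInt(view.size()));
    try {
      transport.ping(target).get(roundTripTimeoutMs, TimeUnit.MILLISECONDS);
      return false; // direct ack received
    } catch (Exception timeoutOrError) {
      // Step 2: ask k randomly chosen members to ping the target on our behalf.
      List<String> relays = new ArrayList<>(view);
      relays.remove(target);
      Collections.shuffle(relays);
      CompletableFuture<?>[] indirect = relays.stream()
          .limit(k)
          .map(relay -> transport.pingRequest(relay, target))
          .toArray(CompletableFuture[]::new);
      try {
        CompletableFuture.anyOf(indirect).get(periodMs - roundTripTimeoutMs, TimeUnit.MILLISECONDS);
        return false; // some indirect ack arrived before the end of the period
      } catch (Exception noAck) {
        return true;  // Step 3: declare the target as failed
      }
    }
  }
}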

Parameters T' and k of the algorithm can be formally derived from the required quality of service parameters: the average speed of failure detection T and the accuracy PM(T).

This algorithm imposes a uniform expected network load on all members. The worst-case network load occurs when, every T' time units, each member initiates steps (1-6) of the algorithm. Steps (1, 6) involve at most 2 messages, while steps (2-5) involve at most 4 messages per ping-req target member. Therefore, the worst-case network load imposed by this protocol (in messages/time unit) is:

L = n * [2 + 4 * k] * 1/T'

This means a linear overall network load of O(n) when the algorithm runs on all nodes, and a constant network load at each particular member, independent of the group size n.

References:
  1. On Scalable and Efficient Distributed Failure Detectors (2001)

Monday, August 10, 2015

A Gossip-Style Failure Detector

A failure detector is a valuable component for system management, replication, load balancing, group communication services, various consensus algorithms and many other distributed services that rely on reasonably accurate detection of failed members.

Algorithm:
  1. Each member maintains a list with every member's address and heartbeat counter. Every Tgossip time units it increments its own heartbeat counter and sends its list to one randomly chosen member. The communication delay (time for sending a message) is typically much smaller than the interval between gossiping rounds (Tgossip).
  2. On receipt of such a gossip message, a member merges the received list with its own list, keeping the maximum heartbeat counter for each member (see the sketch after this list).
  3. Each member occasionally broadcasts its list in order to be located initially and also to recover from network partitions. The decision to broadcast is made probabilistically, with increasing probability the longer it has been since the last received broadcast. Instead of broadcasts, a few gossip servers with well-known addresses can be used.
  4. Each member also maintains, for every other member in the list, the last time that the corresponding heartbeat counter increased.
  5. If the heartbeat counter has not increased for time Tfail, the member is considered failed. However, failed members are not removed from the list immediately.
  6. A failed member is removed from the membership list only after a further time period Tcleanup.
It is possible to analyse the probabilities of gossiping and to calculate the parameters Tgossip and Tfail for a given quality of service requirement using epidemic theory. The proposed algorithm may also be optimized to exploit IP address structure in order to reduce the amount of traffic between subnetworks.
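A minimal Java sketch of the membership list covering steps 2, 4, 5 and 6 might look as follows; class, field and parameter names are illustrative:

import java.util.HashMap;
import java.util.Map;

public class GossipMembershipList {

  /** member address -> last known heartbeat counter */
  private final Map<String, Long> heartbeats = new HashMap<>();
  /** member address -> local time when its heartbeat counter last increased */
  private final Map<String, Long> lastUpdated = new HashMap<>();

  /** Step 2: merge a received list, keeping the maximum heartbeat counter per member. */
  public synchronized void merge(Map<String, Long> received, long now) {
    received.forEach((member, counter) -> {
      Long known = heartbeats.get(member);
      if (known == null || counter > known) {
        heartbeats.put(member, counter);
        lastUpdated.put(member, now); // step 4: remember when the counter last grew
      }
    });
  }

  /** Step 5: a member whose counter has not grown for tFail is considered failed. */
  public synchronized boolean isFailed(String member, long now, long tFail) {
    Long updated = lastUpdated.get(member);
    return updated != null && now - updated > tFail;
  }

  /** Step 6: drop failed members only after the further tCleanup period. */
  public synchronized void cleanup(long now, long tFail, long tCleanup) {
    lastUpdated.entrySet().removeIf(e -> now - e.getValue() > tFail + tCleanup);
    heartbeats.keySet().retainAll(lastUpdated.keySet());
  }
}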

References:

Wednesday, July 8, 2015

Quality of Service of Failure Detectors

Basic Quality of Service (QoS) properties of Failure Detector are:
  • How fast it detects failure
  • How well it avoids false detection
It is assumed that a process crash is permanent; in other words, recovered processes take on new identities.

Primary QoS metrics of Failure Detector:
  • Detection time (Td). How long it takes to detect failure.
  • Mistake recurrence time (Tmr). Time between two consecutive mistakes.
  • Mistake duration (Tm). Time it takes for failure detector to correct mistake.
Derived metrics (can be computed from the primary metrics):
  • Average mistake rate.
  • Query accuracy probability: the probability that the failure detector's output is correct at a random moment of time.
  • Good period duration.
  • Forward good period duration.
The defined metrics do not depend on implementation-specific features of the failure detection algorithm and can be used to compare failure detectors.

QoS requirements for the failure detector can be expressed via the primary metrics:
  • Upper bound on the detection time (Tud)
  • Lower bound on the average mistake recurrence time (Tlmr)
  • Upper bound on the average mistake duration (Tum)
Together with the probabilistic parameters of the network:
  • Message loss probability [Ploss]
  • Average message delay [E(D)]
  • Variance of message delay [V(D)]. An exponential distribution of message delays is very common.
The paper [1] also proposes a modified heartbeating algorithm for failure detection with input parameters:
  • n - the delay between consecutive heartbeats
  • ro - the time shift after which an expected heartbeat is considered missed and the process is declared as failed
The goal is to compute, based on (1) the QoS requirements (Tud, Tlmr and Tum) and (2) the probabilistic parameters of the network (Ploss, E(D) and V(D)), the optimal failure detector parameters n and ro for the proposed algorithm.
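As an illustration only (this is a simplified sketch, not the exact algorithm proposed in [1]), a heartbeat failure detector parametrized by n and ro could look as follows in Java:

import java.util.concurrent.atomic.AtomicLong;

public class HeartbeatFailureDetector {

  private final long n;     // delay between consecutive heartbeats (ms)
  private final long ro;    // time shift added to each freshness point (ms)
  private final long start; // local time when heartbeat 0 was expected to be sent
  private final AtomicLong lastSeqReceived = new AtomicLong(-1);

  public HeartbeatFailureDetector(long n, long ro, long start) {
    this.n = n;
    this.ro = ro;
    this.start = start;
  }

  /** Called when the heartbeat with sequence number seq arrives from the monitored process. */
  public void onHeartbeat(long seq) {
    lastSeqReceived.accumulateAndGet(seq, Math::max);
  }

  /** The process is suspected if no heartbeat covering the current freshness window has arrived. */
  public boolean isSuspected(long now) {
    long expectedSeq = (now - start - ro) / n; // heartbeat that should already be visible
    return lastSeqReceived.get() < expectedSeq;
  }
}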

It is possible to estimate link quality parameters by analyzing the heartbeat messages themselves, so the failure detector parameters can be adjusted dynamically using a link quality estimator.
References:
  1. On the Quality of Service of Failure Detectors (2002)

Monday, June 22, 2015

The Twelve-Factor App

In the modern era, software is commonly delivered as a service: called web apps, or software-as-a-service. The twelve-factor app is a methodology for building software-as-a-service apps, proposed by the Heroku PaaS.

1. Codebase. One codebase tracked in revision control, many deploys.

One-to-one correlation between the codebase and the app:
  • If there are multiple codebases, it's not an app but a distributed system, and each component in the distributed system is an app.
  • Multiple apps sharing the same code is a violation of this principle. The solution is to factor the shared code into libraries and include them via dependency management.
A deploy is a running instance of the app.

2. Dependencies. Explicitly declare and isolate dependencies.

Declare all dependencies completely and exactly via a dependency declaration manifest, and use a dependency isolation tool during execution.

3. Config. Store config in the environment.

An app's config is everything that is likely to vary between deploys:
  • Resource handles to the database, Memcached, and other backing services
  • Credentials to external services such as Amazon S3 or Twitter
  • Per-deploy values such as the canonical hostname for the deploy
A strict separation of config from code is needed: config varies substantially across deploys, code does not. The twelve-factor app stores config in environment variables, for example as shown below.
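As a minimal illustration (the variable names are just examples, not mandated by the methodology), reading such config from environment variables in Java looks like this:

public class AppConfig {
  public static void main(String[] args) {
    // Deploy-specific values come from the environment, not from the codebase.
    String databaseUrl = System.getenv("DATABASE_URL");
    String s3Bucket = System.getenv("S3_BUCKET");
    System.out.println("Connecting to " + databaseUrl + ", using bucket " + s3Bucket);
  }
}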

4. Backing Services. Treat backing services as attached resources.

A backing service is any service the app consumes over the network as part of its normal operation. The code of a twelve-factor app makes no distinction between local (i.e. deployed to localhost) and third-party services.

5. Build, release, run. Strictly separate build and run stages.

For example, it is impossible to make changes to the code at runtime. A release cannot be mutated once it is created; any change must create a new release.

6. Processes. Execute the app as one or more stateless processes.

Twelve-factor processes are stateless and share-nothing. Any data that needs to persist must be stored in a stateful backing service, typically a database. Sticky sessions are a violation of twelve-factor and should never be used or relied upon. Session state data is a good candidate for a datastore that offers time-expiration, such as Memcached or Redis.

7. Port binding. Export services via port binding.

The twelve-factor app is completely self-contained and does not rely on runtime injection of a webserver into the execution environment. The web app exports HTTP as a service by binding to a port, and listening to requests coming in on that port.

8. Concurrency. Scale out via the process model.

In the twelve-factor app, processes are a first-class citizen. Developers can architect their app to handle diverse workloads by assigning each type of work to a process type. This does not exclude individual processes from handling their own internal multiplexing, via threads inside the runtime VM, or the async/evented model. But an individual VM can only grow so large (vertical scale), so the application must also be able to span multiple processes running on multiple physical machines.

The share-nothing, horizontally partitionable nature of twelve-factor app processes means that adding more concurrency is a simple and reliable operation.

9. Disposability. Maximize robustness with fast startup and graceful shutdown.

The twelve-factor app's processes are disposable, meaning they can be started or stopped at a moment's notice. Processes should strive to minimize startup time.

10. Dev/prod parity. Keep development, staging, and production as similar as possible.

Minimize gaps between development and production environments:
  • Make the time gap small. A developer may write code and have it deployed hours or even just minutes later.
  • Make the personnel gap small: developers are involved in deploying the app and watching its behavior in production.
  • Make the tools gap small: keep development and production as similar as possible.
The twelve-factor developer resists the urge to use different backing services between development and production.

11. Logs. Treat logs as event streams.

Logs are the stream of aggregated, time-ordered events collected from the output streams of all running processes and backing services.

A twelve-factor app never concerns itself with routing or storage of its output stream. Most significantly, the stream can be sent to a log indexing and analysis system, or a general-purpose data warehousing system such as Hadoop/Hive.

12. Admin Processes. Run admin/management tasks as one-off processes.

In a local deploy, developers invoke one-off admin processes by a direct shell command inside the app's checkout directory. In a production deploy, developers can use ssh or another remote command execution mechanism provided by that deploy's execution environment to run such a process.

References:
  1. The Twelve-Factor App

Friday, June 19, 2015

Criteria for Decomposing Systems into Modules

Benefits of decomposing a system into modules:
  • Managerial - independent development
  • Flexibility - it is easier to change the product
  • Comprehensibility - it is easier to understand the product
Criteria for splitting a system into modules:
  • BAD: Each module corresponds to a separate subroutine
  • GOOD: Each module hides ("chunks") a piece of information
What a module should hide:
  • Important design decisions
  • Design decisions which are likely to change
What else:
  • Data structures
  • The sequence of instructions used to call a given routine, and the routine itself
  • The sequence in which items are processed
etc.

Hierarchical structure of dependencies and "clean" decomposition into modules are two desirable but independent properties of the system.

It is almost always incorrect to decompose a system based on a flow chart (with each block in the flow chart becoming a separate module). Instead, begin with a list of difficult design decisions, or design decisions which are likely to change. Each module is then designed to hide such a decision from the others. Subroutines and programs (the blocks in the flow chart) are then assembled collections of code drawn from various modules.

References:
  1. On the Criteria To Be Used in Decomposing Systems into Modules (1972)

Thursday, June 18, 2015

Fallacies of Distributed Computing

Below are the assumptions which architects and designers of distributed systems are likely to make and which prove to be wrong in the long run:

  • The network is reliable. The network is unreliable and we need to address possible failure scenarios.
  • Latency is zero. Make as few network calls as possible, since latency is not zero and is huge compared to in-memory calls.
  • Bandwidth is infinite. Try to simulate the real production environment.
  • The network is secure. You need to build security into your solution from day 1. You need to be aware of security concerns and their implications, even though the architect of the system does not have to be a security expert.
  • Topology doesn't change. In a real environment the topology can change, and you need to account for it (e.g. use “Next Hop” routing or specify addresses by DNS name).
  • There is one administrator. Administrators can constrain your options, and you need to help them manage your application.
  • Transport cost is zero. There are costs associated with both computational resources and the money spent on network maintenance.
  • The network is homogeneous. Interoperability will be needed sooner or later. Do not rely on proprietary protocols; use standard technologies that are widely accepted.

References:
  1. Fallacies of Distributed Computing Explained

Tuesday, June 16, 2015

Distributed vs. Non-Distributed Computing

Distributed and non-distributed computing have conceptual differences:
  • Latency
  • Memory access
  • Partial failure
  • Concurrency
The major problems in distributed computing correspond to these differences:
  • Ensuring adequate performance
  • Dealing with differences in memory models between local and distributed entities
  • Dealing with partial failures and the lack of a central resource manager
  • Dealing with problems of concurrency
A distributed application's interface should reflect its distributed nature. Merging these two models leads to one of the following problems:
  • Making local computing look like distributed computing makes local computing unnecessarily difficult.
  • Making distributed computing look like local computing leads to an unreliable system.

References:
  1. A Note on Distributed Computing (1994)