论文笔记:[SOSP 2007] Dynamo: Amazon's Highly Available Key-value Store

如题所述,该论文讲述了一种构建高可用 Key-Value 存储的方案,高可用主要是针对于写请求,存储的环境是可信环境,存储的对象的大小一般不超过 1MB。实现方法类似于 Chord + MVRs(multi-valued registers),但是另有不少针对性能的优化。Dynamo 提供 observable causal consistency。根据 论文笔记:[PODC 2015] Limitations of Highly-Available Eventually-Consistent Data Stores,这已经是这一类系统所能提供的最高一致性了。[3]

Dynamo 的主要需求和限制如下:

  1. need an “always writable” data store where no updates are rejected due to failures or concurrent writes
  2. all nodes are assumed to be trusted
  3. do not require support for hierarchical namespaces or complex relational schema
  4. 99.9% of read and write operations to be performed within a few hundred milliseconds
  5. zero-hop routing for DHT

此外还有个隐性需求藏在论文中:Periodical archiving of the dataset is a mandatory requirement for most of Amazon storage services.

为了满足写请求的高可用性,Dynamo 不在写时处理冲突,而是通过 vector versioning 同时保存多个结果,在读的时候如果能够通过 vector version 确定序关系则进行合并,并将合并后的结果写回源,否则只能选择简单的 LWW (last write wins) 或者是由客户自行处理。

与一般 Key-Value 存储不同的是,Dynamo 的 get/put 请求需要额外携带一个 context,这个 context 对用户不透明。从论文来看,这个 context 至少包括两个内容:vector version (when a client wishes to update an object, it must specify which version it is updating. This is done by passing the context it obtained from an earlier read operation, which contains the vector clock information) 以及这次请求中各个节点的响应速度 (the coordinator for a write is chosen to be the node that replied fastest to the previous read operation which is stored in the context information of the request)。

Dynamo 的 replication 策略混合了 proactive 和 passive 两种策略 [3]。其 proactive 策略是一种 “sloppy quorum” 策略,需要预先配置好 N/R/W 值,并满足 R + W > N(一般取 N = 3, R = W = 2)。首先按照 chord 的方法定位要 get/put 的位置,从此位置向后数 N 个健康的物理节点。get 或 put 时,需要向所有 N 个节点发起请求,但是只需有 R 或 W 个节点成功,即可向客户端返回结果。其 passive 策略是这样的,每个节点将自己所负责的区间 build 出一棵 Merkle 树,并通过 gossip 协议进行传播。按照 proactive replica 策略,会有 N 个节点负责同一个区间,这些节点通过 Merkle 树可以快速的比较出差异部分并进行同步。

当有节点失效时,按照上面的 proactive replication 策略会将本属于该节点 A 的值写入其他节点 B。此时 B 会将此值(持久化的)存储到一个特殊列表中,并记录 hinted handoff 标记,以了解该值应该被传输给节点 A。这一纪录会在 B 的后台任务检测到 A 恢复后,将该值传递给 A。

可以看出,这种读写请求在系统内部处理起来还是比较复杂的。Dynamo 对于每个请求都在 coordinator 节点上创建一个状态机。The state machine contains all the logic for identifying the nodes responsible for a key, sending the requests, waiting for responses, potentially doing retries, processing the replies and packaging the response to the client.Each state machine instance handles exactly one client request. For instance, a read operation implements the following state machine:

  1. send read requests to the nodes
  2. wait for minimum number of required responses
  3. if too few replies were received within a given time bound, fail the request
  4. otherwise gather all the data versions and determine the ones to be returned and
  5. if versioning is enabled, perform syntactic reconciliation and generate an opaque write context that contains the vector clock that subsumes all the remaining versions.

For the sake of brevity the failure handling and retry states are left out.

系统优化

Dynamo 在 Chord + MVRs 的基础上做了大量的优化:

  1. zero-hop routing
  2. virtual node on DHT ring
  3. coordinator choosing optimization
  4. client-drive coordinating
  5. resolve conflicts on read
  6. async write to disk
  7. vector versioning 的大小控制
  8. vector versioning merge result write-back (read repair)
  9. feedback controlled background task

下面分别讲解上面提到的优化措施。

zero-hop routing

zero-hop routing 是为了减少路由造成的延迟。Chord routing 的时间复杂度是 $O(\log n)$,这对于一个在线系统而言是比较难接受的。单机从磁盘中顺序读取 1MB 数据也只需要 10ms,而一次 RPC 查询可能就需要 1ms。Dynamo 选择通过 gossip 协议,使得每个节点都(弱一致性)知道全局所有节点的信息。这样,zero-hop routing 就成为可能。对于一般情况,用户请求通过 SLB 均匀的抵达某一个节点,然后该节点立刻知道该将这个请求转发给哪个节点。另一种情况,即上面提到的 client-drive coordinating,client 周期性的从任意节点 pull 整个系统的节点信息,从而在接下来的通信中,直接知道该将请求发送到哪个节点。这一全局信息的大小可用以下数据进行估计:

  1. IP 地址:4 bytes
  2. 端口号:2 bytes
  3. 一系列 virtual node id:Q / S * 16 bytes
    • 每个 virtual node id 的大小取决于 hash 空间的大小,论文中使用 MD5,因此每个 id 的大小为 16 bytes
    • 根据中提到的策略 3,virtual node per physical node = 区间总量/系统中的节点数量

一共有 S 个节点,每个节点需要 (6 + Q / S * 16) bytes,即整体需要 (6S + 16Q) bytes,其中 Q 为 hash 空间中预先划分好的片段数量,并且 Q 远大于 S。假设 S = 1k,则 Q = 100k,这样需要维护约 1.6MB 的路由信息。

但是考虑到 Q 个区间是预先划分的,属于每个节点都预先知道的公共信息,因此可以不用传输 virtual node id,而是传输 virtual node id index 即可。此时每个节点的信息需要 (6 + Q / S * 4) bytes,即整体需要 (6S + 4Q) bytes。假设 S = 1k, Q = 100k,则需要维护约 406KB 的路由信息。这个量级应该是可以接受的。

论文中还提到可以用 [4] 中提到的算法进行平均 $O(1)$ 的路由,然而其最坏情况下仍然是 $O(\log n)$ 的。而且在确定 preference list 的时候,需要知道 physical node 和 virtual node 的对应关系,否则选择 N 个节点的时候就会选到少于 N 个物理节点(每个节点只需维护其之后的 N 个物理节点信息即可)。个人怀疑并不能使用这种方法。

virtual node on DHT ring

一个物理节点可以具有多个虚拟节点,在 DHT 环中,每个虚拟节点被视为 DHT 环上的一个节点。

Using virtual nodes has the following advantages:

  • If a node becomes unavailable, the load handled by this node is evenly dispersed across the remaining available nodes.
  • When a node becomes available again, or a new node is added to the system, the newly available node accepts a roughly equivalent amount of load from each of the other available nodes.
  • The number of virtual nodes that a node is responsible can decided based on its capacity, accounting for heterogeneity in the physical infrastructure.

Chord DHT ring 中,每个区间的范围是不固定的,并且节点的加入或离去都可能会影响这个区间范围。对于 Dynamo 系统而言会带来以下问题:

  1. the nodes handing the key ranges off to the new node have to scan their local persistence store to retrieve the appropriate set of data items.
  2. the Merkle trees for the new ranges need to be recalculated
  3. there was no easy way to take a snapshot of the entire key space due to the randomness in key ranges.

这些问题对于采用 RocksDB 作为单机引擎而言可能不是个大问题,但是当时还没有这种系统。Dynamo 选择的是 Berkeley Database for 10KB objects, MySQL for larger size。

Dynamo 系统采用以下策略来确定 virtual node 和划分 DHT 空间。In this strategy, the hash space is divided into Q equally sized partitions/ranges and each node is assigned Q/S tokens(virtual node id) where S is the number of nodes in the system, Q is usually set such that Q >> N. Each node needs to maintain the information regarding the partitions assigned to each node.

coordinator choosing optimization

any of the top N nodes in the preference list is allowed to coordinate the writes. In particular, since each write usually follows a read operation, the coordinator for a write is chosen to be the node that replied fastest to the previous read operation which is stored in the context information of the request. This optimization enables us to pick the node that has the data that was read by the preceding read operation thereby increasing the chances of getting “read-your-writes” consistency

client-drive coordinating

上面讲 zero-hop routing 的时候大略讲过这一点。具体而言,client 的 get/put 请求首先要路由到一个合适的 coordinator 节点上,这个节点可以是在 preference list 中任选一个节点,或者是干脆就由 client 来负责。这个 coordinator 需要建立状态机来维护整套工作流程。当 client 和 Dynamo 在同一个 data center 内时,这是可行的,并且能显著减少延迟。论文中 Table 2 显示,99.9% latency 从 60ms+ 减少到了 30ms+,平均 latency 从 ~4ms 减少到了 1-2ms。

A client periodically picks a random Dynamo node and downloads its current view of Dynamo membership state. Using this information the client can determine which set of nodes form the preference list for any given key. (Currently clients poll a random Dynamo node every 10 seconds for membership updates.)

resolve conflicts on read

这一优化是针对写请求的高可用性,而非针对性能的优化。具体的实现细节参照论文中的 4.4 节以及 论文笔记:Time, clocks, and the ordering of events in a distributed system。可以理解的,对于一个最终一致性系统,一定有可能发生两个不同的客户端同时写两个不同的节点这种事情。对于这种情况,如果要在写的时候控制一致性,要不然就拒绝掉其中的一个写请求,要不一个写请求会被另一个写请求所覆盖。采用前者所述策略,我们会丧失高可用性。采用后者所述策略,相当于 LWW (last write wins),但是我们丧失了一些信息。LWW 对于有些业务场景来说可能是不合适的,毕竟有时业务方可以根据内容进行无损合并,见 论文笔记:[Inria RR-7506] A comprehensive study of Convergent and Commutative Replicated Data Types

async write to disk

Dynamo provides the ability to trade-off durability guarantees for performance. In the optimization each storage node maintains an object buffer in its main memory. Each write operation is stored in the buffer and gets periodically written to storage by a writer thread. In this scheme, read operations first check if the requested key is present in the buffer. If so, the object is read from the buffer instead of the storage engine.

This optimization has resulted in lowering the 99.9th percentile latency by a factor of 5 during peak traffic even for a very small buffer of a thousand objects. Also, as seen in the figure, write buffering smooths out higher percentile latencies. Obviously, this scheme trades durability for performance. In this scheme, a server crash can result in missing writes that were queued up in the buffer. To reduce the durability risk, the write operation is refined to have the coordinator choose one out of the N replicas to perform a “durable write”. Since the coordinator waits only for W responses, the performance of the write operation is not affected by the performance of the durable write operation performed by a single replica.

vector versioning 的大小控制

A possible issue with vector clocks is that the size of vector clocks may grow if many servers coordinate the writes to an object. In practice, this is not likely because the writes are usually handled by one of the top N nodes in the preference list. In case of network partitions or multiple server failures, write requests may be handled by nodes that are not in the top N nodes in the preference list causing the size of vector clock to grow. In these scenarios, it is desirable to limit the size of vector clock. To this end, Dynamo employs the following clock truncation scheme: Along with each (node, counter) pair, Dynamo stores a timestamp that indicates the last time the node updated the data item. When the number of (node, counter) pairs in the vector clock reaches a threshold (say 10), the oldest pair is removed from the clock. Clearly, this truncation scheme can lead to inefficiencies in reconciliation as the descendant relationships cannot be derived accurately. However, this problem has not surfaced in production and therefore this issue has not been thoroughly investigated.

vector versioning merge result write-back (read repair)

After the read response has been returned to the caller the state machine waits for a small period of time to receive any outstanding responses. If stale versions were returned in any of the responses, the coordinator updates those nodes with the latest version. This process is called read repair because it repairs replicas that have missed a recent update at an opportunistic time and relieves the anti-entropy protocol from having to do it.

feedback controlled background task

Each of the background tasks uses this controller to reserve runtime slices of the resource shared across all background tasks. A feedback mechanism based on the monitored performance of the foreground tasks is employed to change the number of slices that are available to the background tasks. For example, the background controller checks to see how close the 99th percentile database read latency (over the last 60 seconds) is to a preset threshold (say 50ms).

其他问题

在阅读论文的过程中,我遇到一些问题,其中有些问题在论文中找到了答案,但是另一些问题论文中并没有给出具体的方案。

论文 4.6 节提到:In essence, the preference list of a key is constructed such that the storage nodes are spread across multiple data centers. This scheme of replicating across multiple datacenters allows us to handle entire data center failures without a data outage. 这个做法感觉并不容易做到。直觉上,我们需要总共 S / N 个位于其他 DC (datacenter) 的节点,这些节点均匀的分布在整个 DHT 空间上。但是这一方案难免会有 N 个节点都在同一个 DC 的情况。

论文中提到 coordinator 需要对每个请求创建一个状态机来处理整个读写流程。若此时 coordinator 失效了会怎么样?个人认为是不要紧的,首先用户没有得到回复,相当于请求超时,此时用户应认为这次请求有两种可能的状态:失败或成功。如果一个节点都没写成功,那么这个写请求就相当于丢失了,即请求失败。如果至少有一个节点写成功了,那么之后通过 Merkle tree 同步的时候,会将这一记录同步到所有需要存储的节点。如果这一记录所在的节点在同步之前就失效了,则这个记录就没了,相当于请求失败了。如果在失效之前被用户读到了,那么根据 read repair 优化,这个记录会被扩散到所有需要存储的节点上。节点失效但是之后又回来了,产生了写冲突,也会被 vector versioning 所处理。

接下来的是论文没有提到,但是我想到的几个问题。

节点的加入和退出会导致 virtual node tokens 的重新分配,这一过程是如何进行的呢?如果新加入的节点主动从其他节点中偷取一个 token,则有可能同时新加入的节点选择了相同的 token。如果系统中的节点发现了新加入的节点后,主动给出一个 token,也许是比较不错的一种选择,但是可能需要记录自己把 token 给出过给谁,或者以某一个概率给出。比如说当发现自己的 token 数大于 Q / S 时,把自己的 token 给当前 token 数量最少的节点,并且将对应的记录全都标记上 hand-off hint。token 可以在记录传输得差不多了的时候再给出去,此时新记录的信息标记 hand-off hint,读取的时候不差这一份数据。当节点退出系统时,其所拥有的 tokens 该怎么处理?显然不能由这一节点主动给别人,因为节点失效退出时没有机会执行动作。看来只能等系统中的节点发现自己的 token 数小于 Q / S 时,看看有没有漏的 token 捡一个。那么问题来了,如果两个节点取到了同一个 token 怎么办?怎么保证我取的 token 和现有的 tokens 组成的 tokens 是均匀分布在整个 DHT 空间中的?后者可能不好办,我们姑且认为等概率的就行。前者的话,如果不借助共识算法的话估计很难搞定。可能最简单的方法还是在系统外安排一组协调者,采用共识算法选主保证没有单点,然后发现并且均匀分配这些 tokens。

Reference

[1] Giuseppe DeCandia, Deniz Hastorun, Madan Jampani, Gunavardhan Kakulapati, Avinash Lakshman, Alex Pilchin, Swaminathan Sivasubramanian, Peter Vosshall, and Werner Vogels. 2007. Dynamo: Amazon’s Highly Available Key-value Store. Proc. Symp. Oper. Syst. Princ. (2007), 205–220. DOI:https://doi.org/10.1145/1323293.1294281

[2] Hagit Attiya, Faith Ellen, and Adam Morrison. 2017. Limitations of Highly-Available Eventually-Consistent Data Stores. IEEE Trans. Parallel Distrib. Syst. 28, 1 (2017), 141–155. DOI:https://doi.org/10.1109/TPDS.2016.2556669

[3] Stephanos Androutsellis-Theotokis and Diomidis Spinellis. 2004. A survey of peer-to-peer content distribution technologies. ACM Comput. Surv. 36, 4 (December 2004), 335–371. DOI:https://doi.org/10.1145/1041680.1041681

[4] Venugopalan Ramasubramanian and Emin Gun Sirer. 2004. Beehive: O(1)lookup performance for power-law query distributions in peer-to-peer overlays. System 1, 1 (2004), 8. Retrieved from http://portal.acm.org/citation.cfm?id=1251175.1251183

[5] Leslie Lamport. 1978. Time, clocks, and the ordering of events in a distributed system. Commun. ACM 21, 7 (July 1978), 558–565. DOI:https://doi.org/10.1145/359545.359563

[6] Marc Shapiro, Nuno Preguiça, Carlos Baquero, and Marek Zawirski. 2011. A comprehensive study of Convergent and Commutative Replicated Data Types. (2011).