绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
Riak VClock
2022-04-26 11:17:47

Riak VClock

关于向量时钟的概念。在这里就多讲了,大家能够參照一下Dynamo的论文了解一下,向量时钟在分布式主要用于解决一致性性问题。能够和CRDTs一起看。

以下的源码是參照riak中的,就是把它翻译为elixir格式而已。基本不变。

时钟主要出现的情况有网络分区和并行更新。

这样仅仅会丢掉一些向量时钟的信息,即数据更新过程的信息,可是不会丢掉实实在在的数据。仅仅有当一种情况会有问题,就是一个client保持了一个非常久之前的向量时钟,然后继承于这个向量时钟提交了一个数据,此时就会有冲突。由于服务器这边已经没有这个非常久之前的向量时钟信息了,已经被剪枝掉了可能,所以client提交的此次数据,在服务端无法找到一个祖先。此时就会创建一个sibling。

所以这个剪枝的策略是一个权衡tradeoff,一方面是无限增长的向量时钟的空间。还有一方面是偶尔的会有"false merge"。对,但肯定的是,不会悄无声息的丢数据。综上。为了防止向量时钟空间的无限增长,剪枝还是比用server标识向量时钟工作的更好。

  • 结构:

主要有3个元祖{node, {opCount, TS}},分布为节点(协调器)。操作数和操作时间。

  • 基本的方法:

merge(合并):

合并的规则是,opCount>TS:当节同样时,谁的opCount大,谁赢;假设opCount一样时,谁的时间大谁赢。

  1. @doc """
  2. Combine all VClock in the input list into their least possible common descendant
  3. """
  4. @spec merge(list, list) :: list
  5. def merge([]), do: []
  6. def merge([singevclock]), do: singevclock
  7. ## first is a list, eg [:a, {1, 1234}]
  8. # rest is list of list, eg [[{:a, {1, 233}}, {:b, {3, 124}}]]
  9. def merge([first|rest]) do
  10. merge(rest, :lists.keysort(1, first))
  11. end
  12. def merge([], nclock), do: nclock
  13. def merge([aclock|vclocks], nclock) do
  14. merge(vclocks, merge(:lists.keysort(1, aclock), nclock, []))
  15. end
  16. def merge([], [], accclock), do: :lists.reverse(accclock)
  17. def merge([], left, accclock), do: :lists.reverse(accclock, left)
  18. def merge(left, [], accclock), do: :lists.reverse(accclock, left)
  19. def merge(v = [{node1, {ctr1, ts1} = ct1} = nct1 | vclock],
  20. n = [{node2, {ctr2, ts2} = ct2} = nct2 | nclock], accclock) do
  21. cond do
  22. node1 < node2 ->
  23. merge(vclock, n, [nct1|accclock]);
  24. node1 > node2 ->
  25. merge(v, nclock, [nct2|accclock]);
  26. true ->
  27. ({_ctr, _ts} = ct) = cond do
  28. ctr1 > ctr2 ->
  29. ct1;
  30. ctr1 < ctr2 ->
  31. ct2;
  32. true ->
  33. {ctr1, :erlang.max(ts1, ts2)}
  34. end
  35. merge(vclock, nclock, [{node1, ct}|accclock])
  36. end
  37. end



prune(裁剪):

裁剪的法则主要是空间时间双方面.
!()[../pic/riak_4.png]

终于的裁剪函数prune_vclock1(v, now, bprops, headtime).

  1. @doc """
  2. Possibly shrink the size of a vclock, depending on current age and size
  3. """
  4. @spec prune(v :: list, now :: integer, bucketprops :: any) :: list
  5. def prune(v, now, bucketprops) do
  6. ## This sort need to be deterministic, to avoid spurious merge conflicts later,
  7. # We achieve this by using the node ID as secondary key
  8. sortv = :lists.sort(fn({n1, {_, t1}}, {n2, {_, t2}}) -> {t1, n1} < {t2, n2} end, v)
  9. prune_vclock1(sortv, now, bucketprops)
  10. end
  11. def prune_vclock1(v, now, bprops) do
  12. case get_property(:small_vclock, bprops) >= :erlang.length(v) do
  13. true -> v;
  14. false ->
  15. {_, {_, headtime}} = hd(v)
  16. case (now - headtime) < get_property(:young_vclock, bprops) do
  17. true -> v;
  18. false -> prune_vclock1(v, now, bprops, headtime)
  19. end
  20. end
  21. end
  22. def prune_vclock1(v, now, bprops, headtime) do
  23. # has a precondition that v is longer than small and older than young
  24. case (:erlang.length(v) > get_property(:big_vclock, bprops)) or ((now - headtime) > get_property(:old_vclock, bprops)) do
  25. true -> prune_vclock1(tl(v), now, bprops);
  26. false -> v
  27. end
  28. end
  29. def get_property(key, pairlist) do
  30. case :lists.keyfind(key, 1, pairlist) do
  31. {_key, value} ->
  32. value;
  33. false ->
  34. :undefined
  35. end
  36. end



  • source
  1. defmodule VClock do
  2. @moduledoc """
  3. this is !!!!!!!!
  4. """
  5. @vsn 0.1
  6. @spec fresh() :: []
  7. def fresh do
  8. []
  9. end
  10. # return true if va is a direct descendant of vb, else false -- remember, a vclock is its own descendant!
  11. @spec descends(any, []) :: (true|false)
  12. def descends(_, []) do
  13. true
  14. end
  15. @type va :: list()
  16. @spec descends(any, any) :: (false|true)
  17. def descends(va, vb) do
  18. [{nodeb, {ctrb, _}} | resetb] = vb
  19. case :lists.keyfind(nodeb, 1, va) do
  20. false ->
  21. false;
  22. {_, {ctra, _tsa}} ->
  23. (ctra >= ctrb) && descends(va, resetb)
  24. end
  25. end
  26. @doc """
  27. Combine all VClock in the input list into their least possible common descendant
  28. """
  29. @spec merge(list, list) :: list
  30. def merge([]), do: []
  31. def merge([singevclock]), do: singevclock
  32. ## first is a list, eg [:a, {1, 1234}]
  33. # rest is list of list, eg [[{:a, {1, 233}}, {:b, {3, 124}}]]
  34. def merge([first|rest]) do
  35. merge(rest, :lists.keysort(1, first))
  36. end
  37. def merge([], nclock), do: nclock
  38. def merge([aclock|vclocks], nclock) do
  39. merge(vclocks, merge(:lists.keysort(1, aclock), nclock, []))
  40. end
  41. def merge([], [], accclock), do: :lists.reverse(accclock)
  42. def merge([], left, accclock), do: :lists.reverse(accclock, left)
  43. def merge(left, [], accclock), do: :lists.reverse(accclock, left)
  44. def merge(v = [{node1, {ctr1, ts1} = ct1} = nct1 | vclock],
  45. n = [{node2, {ctr2, ts2} = ct2} = nct2 | nclock], accclock) do
  46. cond do
  47. node1 < node2 ->
  48. merge(vclock, n, [nct1|accclock]);
  49. node1 > node2 ->
  50. merge(v, nclock, [nct2|accclock]);
  51. true ->
  52. ({_ctr, _ts} = ct) = cond do
  53. ctr1 > ctr2 ->
  54. ct1;
  55. ctr1 < ctr2 ->
  56. ct2;
  57. true ->
  58. {ctr1, :erlang.max(ts1, ts2)}
  59. end
  60. merge(vclock, nclock, [{node1, ct}|accclock])
  61. end
  62. end
  63. @doc """
  64. get the counter value in vclock set from node
  65. """
  66. @spec get_counter(node :: atom, vclock::list) :: (integer|:undefined)
  67. def get_counter(node, vclock) do
  68. case :lists.keytake(node, 1, vclock) do
  69. {_, {c, _}} -> c;
  70. false -> :undefined
  71. end
  72. end
  73. @doc """
  74. Get the timestamp value in a VClock set from node
  75. """
  76. @spec get_timestamp(node :: atom, vclock :: list) :: (integer | :undefined)
  77. def get_timestamp(node, vclock) do
  78. case :lists.keytake(node, 1, vclock) do
  79. {_, {_, ts}} -> ts;
  80. false -> :undefined
  81. end
  82. end
  83. @doc """
  84. increment VClock at node
  85. """
  86. @spec increment(atom, list) :: integer
  87. def increment(node, vclock) do
  88. increment(node, timestamp(), vclock)
  89. end
  90. @spec increment(atom, integer, list) :: list
  91. def increment(node, incts, vclock) do
  92. IO.puts "#{inspect node}, #{inspect incts}, #{inspect vclock}"
  93. {{_ctr, _ts} = c1, newv} = case :lists.keytake(node, 1, vclock) do
  94. false ->
  95. {{1, incts}, vclock};
  96. {:value, {_n, {c, _t}}, modv} ->
  97. {{c + 1, incts}, modv}
  98. end
  99. [{node, c1} | newv]
  100. end
  101. # retrun the list of all nodes that have ever incremented VClock
  102. @spec all_nodes(vclock :: list) :: (list)
  103. def all_nodes(vclock) do
  104. vclock |> Enum.map(fn({x, {_, _}}) -> x end)
  105. end
  106. @days_from_gergorian_base_to_epoch (1978 * 365 + 478)
  107. @seconds_from_gergorian_base_to_epoch (@days_from_gergorian_base_to_epoch * 24 * 60 * 60)
  108. @spec timestamp() :: integer
  109. def timestamp do
  110. {megaseconds, seconds, _} = :os.timestamp()
  111. @days_from_gergorian_base_to_epoch + megaseconds * 1000000 + seconds
  112. end
  113. @doc """
  114. Compares two VClock for equality
  115. """
  116. @spec equal(va :: list, vb :: list) :: (true | false)
  117. def equal(va, vb) do
  118. Enum.sort(va) === Enum.sort(vb)
  119. end
  120. @doc """
  121. Possibly shrink the size of a vclock, depending on current age and size
  122. """
  123. @spec prune(v :: list, now :: integer, bucketprops :: any) :: list
  124. def prune(v, now, bucketprops) do
  125. ## This sort need to be deterministic, to avoid spurious merge conflicts later,
  126. # We achieve this by using the node ID as secondary key
  127. sortv = :lists.sort(fn({n1, {_, t1}}, {n2, {_, t2}}) -> {t1, n1} < {t2, n2} end, v)
  128. prune_vclock1(sortv, now, bucketprops)
  129. end
  130. def prune_vclock1(v, now, bprops) do
  131. case get_property(:small_vclock, bprops) >= :erlang.length(v) do
  132. true -> v;
  133. false ->
  134. {_, {_, headtime}} = hd(v)
  135. case (now - headtime) < get_property(:young_vclock, bprops) do
  136. true -> v;
  137. false -> prune_vclock1(v, now, bprops, headtime)
  138. end
  139. end
  140. end
  141. def prune_vclock1(v, now, bprops, headtime) do
  142. # has a precondition that v is longer than small and older than young
  143. case (:erlang.length(v) > get_property(:big_vclock, bprops)) or ((now - headtime) > get_property(:old_vclock, bprops)) do
  144. true -> prune_vclock1(tl(v), now, bprops);
  145. false -> v
  146. end
  147. end
  148. def get_property(key, pairlist) do
  149. case :lists.keyfind(key, 1, pairlist) do
  150. {_key, value} ->
  151. value;
  152. false ->
  153. :undefined
  154. end
  155. end
  156. end





本文转自mfrbuaa博客园博客,原文链接:http://www.cnblogs.com/mfrbuaa/p/5229840.html,如需转载请自行联系原作者 
分享好友

分享这个小栈给你的朋友们,一起进步吧。

Riak TS
创建时间:2022-04-26 11:11:26
Riak TS
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • gaokeke123
    专家
戳我,来吐槽~