Riak VClock
关于向量时钟的概念,在这里就不多讲了,大家可以参照Dynamo的论文了解一下。向量时钟在分布式系统中主要用于解决一致性问题,可以和CRDTs
一起看。
以下的源码是參照riak
中的,就是把它翻译为elixir
格式而已。基本不变。
时钟主要出现的情况有网络分区和并行更新。
剪枝只会丢掉一些向量时钟的信息,即数据更新过程的信息,但是不会丢掉实实在在的数据。只有一种情况会有问题:一个client保持了一个非常久之前的向量时钟,然后基于这个向量时钟提交了一个数据,此时就会有冲突。由于服务器这边可能已经没有这个非常久之前的向量时钟信息了(已经被剪枝掉了),所以client提交的此次数据,在服务端无法找到一个祖先,此时就会创建一个sibling。所以这个剪枝的策略是一个权衡(tradeoff):一方面是无限增长的向量时钟的空间,另一方面是偶尔会出现"false merge"。但可以肯定的是,不会悄无声息地丢数据。综上,为了防止向量时钟空间的无限增长,剪枝还是比用server标识向量时钟工作得更好。
- 结构:
每个条目主要有3个元素{node, {opCount, TS}}
,分别为节点(协调器)、操作数和操作时间。
- 基本的方法:
merge(合并):
合并的规则是,优先级opCount>TS
:当节点相同时,谁的opCount大,谁赢;如果opCount一样,谁的时间大谁赢。
@doc """
Combines all vclocks in the input list into their least possible common
descendant.

Each vclock is a list of `{node, {counter, timestamp}}` entries, e.g.
`[{:a, {1, 1234}}, {:b, {3, 124}}]`. For a node present in several clocks
the entry with the highest counter wins; when the counters are equal the
larger timestamp is kept.

An empty input yields `[]`; a single vclock is returned untouched (it is not
sorted). Otherwise the result is sorted by node.
"""
@spec merge(list) :: list
def merge([]), do: []
def merge([single_vclock]), do: single_vclock

# `first` is one vclock, e.g. [{:a, {1, 1234}}];
# `rest` is a list of vclocks, e.g. [[{:a, {1, 233}}, {:b, {3, 124}}]].
def merge([first | rest]), do: merge(rest, :lists.keysort(1, first))

@doc """
Folds every vclock in the first argument into `nclock`.

`nclock` is the accumulated result; `merge/1` passes it in sorted by node,
which is what the pairwise `merge/3` walk relies on.
"""
@spec merge(list, list) :: list
def merge([], nclock), do: nclock

def merge([aclock | vclocks], nclock) do
  merge(vclocks, merge(:lists.keysort(1, aclock), nclock, []))
end

@doc """
Merges two node-sorted vclocks, accumulating the result in reverse order in
`accclock`.
"""
@spec merge(list, list, list) :: list
def merge([], [], accclock), do: :lists.reverse(accclock)
def merge([], left, accclock), do: :lists.reverse(accclock, left)
def merge(left, [], accclock), do: :lists.reverse(accclock, left)

# The node only exists (so far) in the first clock: keep its entry.
def merge([{node1, {_, _}} = entry1 | vclock], [{node2, {_, _}} | _] = n, accclock)
    when node1 < node2 do
  merge(vclock, n, [entry1 | accclock])
end

# The node only exists (so far) in the second clock: keep its entry.
def merge([{node1, {_, _}} | _] = v, [{node2, {_, _}} = entry2 | nclock], accclock)
    when node1 > node2 do
  merge(v, nclock, [entry2 | accclock])
end

# Same node in both clocks: highest counter wins, ties keep the later timestamp.
def merge([{node1, {ctr1, ts1} = ct1} | vclock], [{_node2, {ctr2, ts2} = ct2} | nclock], accclock) do
  ct =
    cond do
      ctr1 > ctr2 -> ct1
      ctr1 < ctr2 -> ct2
      true -> {ctr1, :erlang.max(ts1, ts2)}
    end

  merge(vclock, nclock, [{node1, ct} | accclock])
end
prune(裁剪):
裁剪的法则主要是空间
和时间
双方面.
![](../pic/riak_4.png)
最终的裁剪函数prune_vclock1(v, now, bprops, headtime)
.
@doc """
Possibly shrinks a vclock, depending on its current age and size.

`v` is a list of `{node, {counter, timestamp}}` entries, `now` is the current
time in the same unit as the entry timestamps, and `bucketprops` is a list of
`{key, value}` pairs read through `get_property/2` (`:small_vclock`,
`:young_vclock`, `:big_vclock`, `:old_vclock`).
"""
@spec prune(v :: list, now :: integer, bucketprops :: any) :: list
def prune(v, now, bucketprops) do
  # Oldest entry first. The sort must be deterministic to avoid spurious merge
  # conflicts later, so the node id acts as the secondary key.
  oldest_first = fn {n1, {_, t1}}, {n2, {_, t2}} -> {t1, n1} < {t2, n2} end
  prune_vclock1(:lists.sort(oldest_first, v), now, bucketprops)
end

# Keeps `v` as is when it is small enough or its oldest entry is still young;
# otherwise hands over to prune_vclock1/4.
def prune_vclock1(v, now, bprops) do
  if get_property(:small_vclock, bprops) >= :erlang.length(v) do
    v
  else
    {_, {_, headtime}} = hd(v)

    if now - headtime < get_property(:young_vclock, bprops) do
      v
    else
      prune_vclock1(v, now, bprops, headtime)
    end
  end
end

# Precondition: `v` is longer than "small" and its head is older than "young".
# Drops the head entry while the clock is too big or the head is too old.
def prune_vclock1(v, now, bprops, headtime) do
  if :erlang.length(v) > get_property(:big_vclock, bprops) or
       now - headtime > get_property(:old_vclock, bprops) do
    prune_vclock1(tl(v), now, bprops)
  else
    v
  end
end

# Looks `key` up in a list of `{key, value}` pairs; `:undefined` when absent.
def get_property(key, pairlist) do
  case :lists.keyfind(key, 1, pairlist) do
    false -> :undefined
    {_key, value} -> value
  end
end
- source
defmodule VClock do
  @moduledoc """
  A simple vector clock, ported from Riak's Erlang `vclock` module.

  A vclock is a list of `{node, {counter, timestamp}}` entries: `node`
  identifies the actor that incremented the clock, `counter` is how many times
  it did so, and `timestamp` is the time of its latest increment.
  """
  @vsn 0.1

  # Retained for backward compatibility with code that may reference it.
  @type va :: list()

  @days_from_gregorian_base_to_epoch (1970 * 365 + 478)
  # == :calendar.datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}})
  @seconds_from_gregorian_base_to_epoch (@days_from_gregorian_base_to_epoch * 24 * 60 * 60)

  @doc """
  Creates a brand new, empty vclock.
  """
  @spec fresh() :: []
  def fresh, do: []

  @doc """
  Returns `true` if `va` is a direct descendant of `vb`, else `false`.

  Remember, a vclock is its own descendant, and every vclock descends from the
  empty vclock.
  """
  @spec descends(va :: list, vb :: list) :: boolean
  def descends(_va, []), do: true

  def descends(va, [{node_b, {ctr_b, _ts_b}} | rest_b]) do
    case :lists.keyfind(node_b, 1, va) do
      false -> false
      {_node, {ctr_a, _ts_a}} -> ctr_a >= ctr_b and descends(va, rest_b)
    end
  end

  @doc """
  Combines all vclocks in the input list into their least possible common
  descendant.

  For a node present in several clocks the entry with the highest counter
  wins; when the counters are equal the larger timestamp is kept. An empty
  input yields `[]`; a single vclock is returned untouched (it is not sorted).
  Otherwise the result is sorted by node.
  """
  @spec merge(list) :: list
  def merge([]), do: []
  def merge([single_vclock]), do: single_vclock

  # `first` is one vclock, e.g. [{:a, {1, 1234}}];
  # `rest` is a list of vclocks, e.g. [[{:a, {1, 233}}, {:b, {3, 124}}]].
  def merge([first | rest]), do: merge(rest, :lists.keysort(1, first))

  @doc """
  Folds every vclock in the first argument into `nclock`.

  `nclock` is the accumulated result; `merge/1` passes it in sorted by node,
  which is what the pairwise `merge/3` walk relies on.
  """
  @spec merge(list, list) :: list
  def merge([], nclock), do: nclock

  def merge([aclock | vclocks], nclock) do
    merge(vclocks, merge(:lists.keysort(1, aclock), nclock, []))
  end

  @doc """
  Merges two node-sorted vclocks, accumulating the result in reverse order in
  `accclock`.
  """
  @spec merge(list, list, list) :: list
  def merge([], [], accclock), do: :lists.reverse(accclock)
  def merge([], left, accclock), do: :lists.reverse(accclock, left)
  def merge(left, [], accclock), do: :lists.reverse(accclock, left)

  # The node only exists (so far) in the first clock: keep its entry.
  def merge([{node1, {_, _}} = entry1 | vclock], [{node2, {_, _}} | _] = n, accclock)
      when node1 < node2 do
    merge(vclock, n, [entry1 | accclock])
  end

  # The node only exists (so far) in the second clock: keep its entry.
  def merge([{node1, {_, _}} | _] = v, [{node2, {_, _}} = entry2 | nclock], accclock)
      when node1 > node2 do
    merge(v, nclock, [entry2 | accclock])
  end

  # Same node in both clocks: highest counter wins, ties keep the later timestamp.
  def merge(
        [{node1, {ctr1, ts1} = ct1} | vclock],
        [{_node2, {ctr2, ts2} = ct2} | nclock],
        accclock
      ) do
    ct =
      cond do
        ctr1 > ctr2 -> ct1
        ctr1 < ctr2 -> ct2
        true -> {ctr1, :erlang.max(ts1, ts2)}
      end

    merge(vclock, nclock, [{node1, ct} | accclock])
  end

  @doc """
  Gets the counter value recorded for `node` in `vclock`, or `:undefined` when
  the node never incremented it.
  """
  @spec get_counter(node :: atom, vclock :: list) :: integer | :undefined
  def get_counter(node, vclock) do
    # keyfind (not keytake): we only need the entry itself, as a plain tuple.
    case :lists.keyfind(node, 1, vclock) do
      {_node, {counter, _ts}} -> counter
      false -> :undefined
    end
  end

  @doc """
  Gets the timestamp value recorded for `node` in `vclock`, or `:undefined`
  when the node never incremented it.
  """
  @spec get_timestamp(node :: atom, vclock :: list) :: integer | :undefined
  def get_timestamp(node, vclock) do
    case :lists.keyfind(node, 1, vclock) do
      {_node, {_counter, ts}} -> ts
      false -> :undefined
    end
  end

  @doc """
  Increments `vclock` at `node`, stamping the entry with the current
  `timestamp/0`.
  """
  @spec increment(atom, list) :: list
  def increment(node, vclock), do: increment(node, timestamp(), vclock)

  @doc """
  Increments `vclock` at `node`, stamping the entry with `incts`.

  The node's entry is created with counter `1` when missing; the updated entry
  is placed at the head of the returned vclock.
  """
  @spec increment(atom, integer, list) :: list
  def increment(node, incts, vclock) do
    {counter, others} =
      case :lists.keytake(node, 1, vclock) do
        false -> {1, vclock}
        {:value, {_node, {c, _ts}}, rest} -> {c + 1, rest}
      end

    [{node, {counter, incts}} | others]
  end

  @doc """
  Returns the list of all nodes that have ever incremented `vclock`.
  """
  @spec all_nodes(vclock :: list) :: list
  def all_nodes(vclock) do
    Enum.map(vclock, fn {node, {_, _}} -> node end)
  end

  @doc """
  Returns the current OS time as Gregorian seconds (whole seconds since year 0),
  i.e. the same scale as `:calendar.datetime_to_gregorian_seconds/1`.
  """
  @spec timestamp() :: integer
  def timestamp do
    {megaseconds, seconds, _micro} = :os.timestamp()
    @seconds_from_gregorian_base_to_epoch + megaseconds * 1_000_000 + seconds
  end

  @doc """
  Compares two vclocks for equality, ignoring the order of their entries.
  """
  @spec equal(va :: list, vb :: list) :: boolean
  def equal(va, vb) do
    Enum.sort(va) === Enum.sort(vb)
  end

  @doc """
  Possibly shrinks a vclock, depending on its current age and size.

  `bucketprops` is a list of `{key, value}` pairs read through
  `get_property/2` (`:small_vclock`, `:young_vclock`, `:big_vclock`,
  `:old_vclock`); `now` uses the same unit as the entry timestamps.
  """
  @spec prune(v :: list, now :: integer, bucketprops :: any) :: list
  def prune(v, now, bucketprops) do
    # Oldest entry first. The sort must be deterministic to avoid spurious
    # merge conflicts later, so the node id acts as the secondary key.
    oldest_first = fn {n1, {_, t1}}, {n2, {_, t2}} -> {t1, n1} < {t2, n2} end
    prune_vclock1(:lists.sort(oldest_first, v), now, bucketprops)
  end

  @doc false
  # Keeps `v` as is when it is small enough or its oldest entry is still
  # young; otherwise hands over to prune_vclock1/4.
  def prune_vclock1(v, now, bprops) do
    if get_property(:small_vclock, bprops) >= :erlang.length(v) do
      v
    else
      {_, {_, headtime}} = hd(v)

      if now - headtime < get_property(:young_vclock, bprops) do
        v
      else
        prune_vclock1(v, now, bprops, headtime)
      end
    end
  end

  @doc false
  # Precondition: `v` is longer than "small" and its head is older than
  # "young". Drops the head while the clock is too big or the head is too old.
  def prune_vclock1(v, now, bprops, headtime) do
    if :erlang.length(v) > get_property(:big_vclock, bprops) or
         now - headtime > get_property(:old_vclock, bprops) do
      prune_vclock1(tl(v), now, bprops)
    else
      v
    end
  end

  @doc """
  Looks `key` up in a list of `{key, value}` pairs; returns `:undefined` when
  the key is absent.
  """
  @spec get_property(key :: any, pairlist :: list) :: any
  def get_property(key, pairlist) do
    case :lists.keyfind(key, 1, pairlist) do
      {_key, value} -> value
      false -> :undefined
    end
  end
end
本文转自mfrbuaa博客园博客,原文链接:http://www.cnblogs.com/mfrbuaa/p/5229840.html,如需转载请自行联系原作者