绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
[Hazelcast系列 四] 分布式map
2022-04-18 14:13:28

Hazelcast的IMap 扩展了java.util.concurrent.ConcurrentMap 和java.util.Map·两个接口,是Java map的分布式实现。

1. 获取Map并存储数据


调用Hazelcast实例的getMap 方法可以获取一个分布式map,并可以通过put 方法存储数据。Hazelcast堆map数据和备份数据进行分区,并将数据平均分配给集群所有的节点。每个节点存储的数量大约为Map.size()*2/n,其中n为集群节点数量。

HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<Integer, String> fruits = instance.getMap("fruits");
fruits.put(1, "apple");
fruits.put(2, "banana");
fruits.put(3, "pear");

Hazelcast默认有271个分区,但是fruits 这个map只有三条数据,因此大多数的分区中没有保存数据。

2. 创建一个节点备份Map数据


当集群中节点数量大于1时,map中的数据会被自动分配给集群所有节点,每个节点存储的数据量约为data count/n + backups。

3. Map备份


Hazelcast会把map中的数据分布在集群的多个节点上,每个节点只存储一部分数据。分布式map的默认备份数为一。如果有一个节点故障,可以使用集群中的备份数据进行恢复。Hazelcast中的map有两种类型的备份:同步和异步。

同步备份

通过设置备份数,Hazelcast提供了更高的数据安全性,集群一个节点的数据会被拷贝到其他节点。创建同步备份只需要设置backup-count :

<hazelcast>
    ...
    <map name="fruits">
        <backup-count>1</backup-count>
    </map>
    ...
</hazelcast>

如果backup-count 的值为1,map的备份数据会全部存储在另外一个节点上。如果backup-count 的值为2,集群中的两个节点会存储map的备份数据。如果不希望对数据进行备份,可以设置backup-count 的值为0。如果性能的重要性超过数据的可靠性,backup-count 可设置的大值为6。

Hazelcast同时支持同步和异步备份,默认为同步备份,并使用backup-count 参数配置。在同步备份场景下,备份操作会阻塞其他操作直到备份数据已经同步到集群节点并收到确认,因此备份会在put 操作完成前更新,从而提供了集群数据更高的可靠性。同步备份操作可能带来额外的阻塞成本并产生时延问题。

异步备份

与同步备份呢不同,异步备份不会阻塞对map的操作,异步备份不需要确认,而且备份动作在特定的时间点执行。可以通过参数asyn-backup-count 设置异步备份,下面是一个简单声明式配置:

<hazelcast>
    ...
    <map name="default">
        <backup-count>0</backup-count>
        <async-backup-count>1</async-backup-count>
    </map>
    ...
</hazelcast>
  • 数据备份会增加内存的使用。
  • map可以同时拥有同步备份和异步备份。
备份可读

Hazelcast默认只有一个同步备份。如果backup-count 的值大于1,那么集群中的节点需要保存属于本节点的数据和备份数据。因此,在某个节点上调用map.get(key) 方法时,该节点可能拥有该key对应的备份数据,默认map.get(key) 方法会从真正拥有该数据的节点读取数据(这点和kafka中从partition的leader读取数据类似)。

通过设置read-backup-data 为true,可以允许备份数据可读,Hazelcast从数据一致性考虑触发,将该参数的默认值为false。备份数据可读可以提高读操作性能,但是可能产生脏读问题。

设置备份可读的简单声明式配置如下:

<hazelcast>
    ...
    <map name="default">
        <backup-count>0</backup-count>
        <async-backup-count>1</async-backup-count>
        <read-backup-data>true</read-backup-data>
    </map>
    ...
</hazelcast>

只有在至少有一个同步或异步备份的条件下备份数据可读这个功能才可用。如果从备份读取数据,需要考虑key命中问题,key 在备份数据中命中,可能在真正的数据成员节点不会命中。这会影响IMap统计中的大空闲时间和过期时间。因此在备份中命中的key,在真正拥有数据的成员可能已经过期。

4. 驱逐map数据

Hazelcast使用了新的基于数据采样的驱逐机制。

map中的数据会一直存在除非手动删除或使用驱逐策略驱逐。IMap支持基于策略的数据驱逐,目前支持两种驱逐策略:LRU和LFU。

理解map驱逐

Hazelcast实现了基于分区的map驱逐。例如,如果PER_NODE的值为max-size,Hazelcast使用下面的公式计算每个分区的大数据量:

partition-maximum-size = max-size * member-count / partition-count

如果计算出partition-maximum-size 的值小于1,partition-maximum-size 会被设置为1.

当向map中插入数据时,根据上面公式计算出的分区大数量启动驱逐。如果该分区存储的数据量超过大值,Hazelcast会在该分区启动驱逐。

假设map的配置信息如下:

  • 分区数: 200
  • 每个分区数据量: 100
  • max-size (PER_NODE): 20000

map总的存储数据量为20000,也就是说驱逐的数据量阈值为20000.当向map插入数据时,驱逐的过程如下:

  1. 计算插入的数据需要存储的分区位置
  2. 检查分区是否达到驱逐阈值
  3. 驱逐一条数据

上述驱逐过程的结果是map的大小变为19999,下一次操作不会触发新的驱逐操作直到map存储的数量再次到达驱逐阈值。

Map驱逐配置

下面是一个简单的map驱逐配置:

<hazelcast>
    ...
    <map name="default">
        <time-to-live-seconds>0</time-to-live-seconds>
        <max-idle-seconds>0</max-idle-seconds>
        <eviction eviction-policy="LRU" max-size-policy="PER_NODE" size="5000"/>
    </map>
    ...
</hazelcast>

配置参数的含义如下:

  • time-to-live-seconds:该参数设置每条数据在map中的保存时间(TTL)。该参数限制了一条数据自后一次被操作的存活时间,如果该参数的值不为0,数据的存活时间超过参数值时会被自动驱逐。每条数据可以设置自己的time-to-live-seconds参数,如果不设置则使用map的参数值。该参数值的取值范围为[0,Integer.MAX_VALUE]。默认值为0,表示数据永不过期。如果参数的值不为0,数据驱逐将不会受eviction-policy的影响。

  • max-idle-seconds: 该参数设置数据在map中的大空闲时间(自后一次操作以来的时间)数据的空闲时间超过该值时会自动被驱逐。参数取值范围[0,Integer.MAX_VALUE],默认值为0,表示无穷大,数据一直不被访问也可以保存在map中。

  • eviction-policy: 数据量超过设置的大值时采用的驱逐策略:

    • NONE: 默认策略,如果使用该策略,当数据量超过大值时不会驱逐数据。可以和 time-to-live-seconds 、 max-idle-seconds 配合使用。
    • LRU: 近少使用策略.
    • LFU: 少使用策略.
  • size: map存储数据的大值。当map存储的数据量超过该参数的值时,map会基于设置的驱逐策略对数据驱逐。参数的取值范围为[0,Integer.MAX_VALUE],默认值为0,表示可以存储无穷多数据。如果希望该参数可用, eviction-policy 只能设置为LRU或LFU。

    • PER_NODE: 集群节点存储的大数据量,该策略为默认策略。

      <eviction max-size-policy="PER_NODE" size="5000"/>

    • PER_PARTITION: 每个分区存储的大数据量。

      <eviction max-size-policy="PER_PARTITION" size="27100" />

    • USED_HEAP_SIZE: 每个Hazelcast实例使用的大堆大小(MB)

      <eviction max-size-policy="USED_HEAP_SIZE" size="4096" />

    • USED_HEAP_PERCENTAGE: 每个Hazelcast实例使用的堆内存大小比例。

      <eviction max-size-policy="USED_HEAP_PERCENTAGE" size="10" />

    • FREE_HEAP_SIZE: 小空闲堆内存(MB)

      <eviction max-size-policy="FREE_HEAP_SIZE" size="512" />

    • FREE_HEAP_PERCENTAGE: 小空闲内存比例。

      <eviction max-size-policy="FREE_HEAP_PERCENTAGE" size="10" />

    • USED_NATIVE_MEMORY_SIZE: 每个Hazelcast使用的大直接内存。

      <eviction max-size-policy="USED_NATIVE_MEMORY_SIZE" size="1024" />

    • USED_NATIVE_MEMORY_PERCENTAGE: 每个实例使用的大直接内存比例。

      <eviction max-size-policy="USED_NATIVE_MEMORY_PERCENTAGE" size="65" />

    • FREE_NATIVE_MEMORY_SIZE:每个实例的小空闲直接内存。

      <eviction max-size-policy="FREE_NATIVE_MEMORY_SIZE" size="256" />

    • FREE_NATIVE_MEMORY_PERCENTAGE:每个实例的直接内存小空闲比例。

      <eviction max-size-policy="FREE_NATIVE_MEMORY_PERCENTAGE" size="5" />

数据过期后,数据不能再从map中获取,在某个时间点该数据可能会被清理以释放内存。基于过期时间的驱逐策略,可以通过 time-to-live-seconds 和 max-idle-seconds 两个参数设置。

驱逐配置
<hazelcast>
    ...
    <map name="documents">
        <eviction eviction-policy="LRU" max-size-policy="PER_NODE" size="10000"/>
        <max-idle-seconds>60</max-idle-seconds>
    </map>
    ...
</hazelcast>

在上面的配置中。名为documents map将在集群成员存储的数据量超过10000时启动驱逐,近少被使用的数据将会被驱逐(LRU),此外,60s内没有被使用的数据也会被驱逐。

下面是一个关于直接内存使用比例的配置:

<hazelcast>
    ...
    <map name="nativeMap*">
        <in-memory-format>NATIVE</in-memory-format>
        <eviction max-size-policy="USED_NATIVE_MEMORY_PERCENTAGE" eviction-policy="LFU" size="99"/>
    </map>
    ...
</hazelcast>
驱逐特定数据

上面介绍的驱逐策略和配置适用于map中的所有数据,map中满足驱逐条件的所有数据都将会被驱逐。如果想驱逐map中一些特定的数据,可以通过V put(K var1, V var2, long var3, TimeUnit var5) 和V put(K var1, V var2, long var3, TimeUnit var5, long var6, TimeUnit var8) 两个方法设置ttl 和大空闲时间:

HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<Integer, String> fruits = instance.getMap("fruits");
fruits.put(1, "apple", 60, TimeUnit.SECONDS);
fruits.put(2, "banana", 60, TimeUnit.SECONDS, 30, TimeUnit.SECONDS);
驱逐所有数据

使用evictAll() 方法可以驱逐map中所有没被锁的key 。 如果map定义存储的MapStore,evictAll() 不会调用deleteAll() 方法,如果想调用deleteAll() 方法,可以使用clear() 方法。

下面是一个简单的样例:

HazelcastInstance node1 = Hazelcast.newHazelcastInstance();
HazelcastInstance node2 = Hazelcast.newHazelcastInstance();
IMap<Integer, Integer> map1 = node1.getMap("data");
for (int i = 0; i < 100; i++) {
    map1.put(i, i);
}
for (int i = 0; i < 4; i++) {
    map1.lock(i);
}
IMap<Integer, Integer> map2 = node2.getMap("data");
System.out.println("map size before evict all = " + map2.size());
map2.evictAll();
System.out.println("map size after evict all = " + map1.size());

5. 设置内存存储格式

IMap 和一些其他的数据结构,比如ICache 都有一个in-memory-format 参数。Hazelcast默认在内存中以二进制的格式存储数据。有时候以对象的形式存储数据可以加速本地处理,尤其对于查询操作。

Hazelcast支持以下三中数据格式:

  • BINARY (default): 数据包括key和value都是序列化的二进制格式存储在内存中。如果操作都是一些map的常规操作put 和 get 可以使用这种格式存储数据。
  • OBJECT: 数据以对象存储在内存中。以OBJECT格式存储数据可以减少反序列化的开销,适用于数据复杂和需要处理大量数据的场景。尽管value可以以OBJECT的格式存储,但是key依然以二进制的格式存储。
  • NATIVE: (Hazelcast 企业版特性) 这种格式和BINARY格式类型,但是存储在直接内存中。

像get这样的常规操作依赖于对象实例。当使用OBJECT 存储格式时,调用get方法,map不会返回存储的实例,而是返回存储实例的一个克隆。一次get操作需要序列化和反序列,但是使用BINARY 格式存储只需要一次反序列化,因此BINARY 更快。类似的,对于put 操作使用BINARY 存储格式也更快。

6. 元数据策略

IMap可以在更新时自动预处理多种数据类型,以加速对数据的查询,当前只有HazelcastJsonValue这种类型支持。启用创建元数据创建后,IMap会创建有关受支持类型对象的元数据,并在查询时使用此元数据。这不影响除支持的类型外,操作其他任何类型的对象的时延和吞吐量。

Hazelcast已默认开启该功能,可以通过metadata-policy 关闭该功能,该参数的可选值为:OFF,CREATE_ON_UPDATE。

关闭元数据的声明式配置:

<hazelcast>
    ...
    <map name="map-a">
        <meta>>OFF</meta>>
    </map>
    ...
</hazelcast>

代码配置:

MapConfig mapConfig = new MapConfig();
mapConfig.setMetadataPolicy(MetadataPolicy.OFF);

7. 锁map

IMap实现是线程安全的,可以满足对线程安全的基本需求。不过随着需求不断增长或者你想更多的对并发进行控制,可以考虑下面Hazelcast提供的解决方案。

考虑下面一个修改map值的代码:

public class RacyUpdateMember {
    public static void main( String[] args ) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IMap<String, Value> map = hz.getMap( "map" );
        String key = "1";
        map.put( key, new Value() );
        System.out.println( "Starting" );
        for ( int k = 0; k < 1000; k++ ) {
            if ( k % 100 == 0 ) System.out.println( "At: " + k );
            Value value = map.get( key );
            Thread.sleep( 10 );
            value.amount++;
            map.put( key, value );
        }
        System.out.println( "Finished! Result = " + map.get(key).amount );
    }

    static class Value implements Serializable {
        public int amount;
    }
}

当多个实例同时运行上述代码时,就有可能产生“竞态”,可以使用Hazelcast提供的悲观锁和乐观锁来解决这个"竞态"问题。

悲观锁

解决竞态问题的一种方法是使用悲观锁:锁住要操作map的entry直到操作完成。要使用悲观锁可以调用IMap提供的map.lock 和 map.unlock 方法。下面是一个简单的样例:

public class PessimisticUpdateMember {
    public static void main( String[] args ) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IMap<String, Value> map = hz.getMap( "map" );
        String key = "1";
        map.put( key, new Value() );
        System.out.println( "Starting" );
        for ( int k = 0; k < 1000; k++ ) {
            map.lock( key );
            try {
                Value value = map.get( key );
                Thread.sleep( 10 );
                value.amount++;
                map.put( key, value );
            } finally {
                map.unlock( key );
            }
        }
        System.out.println( "Finished! Result = " + map.get( key ).amount );
    }

    static class Value implements Serializable {
        public int amount;
    }
}

IMap 的锁在已经释放而且没有其他线程等待时,可以被垃圾收集器自动回收。IMap 的悲观锁是可重入的但是不是公平锁。

乐观锁

Hazelcast中,IMap的replace 方法使用乐观锁。replace 根据数据在内存的存储格式比较值,如果值相等,则使用新的值替换旧的值(和CSA类似)。如果想使用自定义的equals 方法进行相等性比较,数据在内存中的存储格式必须是OBJECT ,否Hazelcast首先将数据序列化然后进行比较。

下面是乐观锁的一个简单样例:

public class OptimisticMember {
    public static void main( String[] args ) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IMap<String, Value> map = hz.getMap( "map" );
        String key = "1";
        map.put( key, new Value() );
        System.out.println( "Starting" );
        for ( int k = 0; k < 1000; k++ ) {
            if ( k % 10 == 0 ) System.out.println( "At: " + k );
            for (; ; ) {
                Value oldValue = map.get( key );
                Value newValue = new Value( oldValue );
                Thread.sleep( 10 );
                newValue.amount++;
                if ( map.replace( key, oldValue, newValue ) )
                    break;
            }
        }
        System.out.println( "Finished! Result = " + map.get( key ).amount );
    }

    static class Value implements Serializable {
        public int amount;

        public Value() {
        }

        public Value( Value that ) {
            this.amount = that.amount;
        }

        public boolean equals( Object o ) {
            if ( o == this ) return true;
            if ( !( o instanceof Value ) ) return false;
            Value that = ( Value ) o;
            return that.amount == this.amount;
        }
    }
}
悲观锁 vs. 乐观锁

悲观锁和乐观锁没有的优劣,需要根据业务场景选择不同的锁策略。对于大多数只读系统,乐观锁更加合适,和悲观锁相比乐观锁有更高的性能。对于同一个key存在大量更新的场景使用悲观锁更好,从数据一致性来看悲观锁比乐观锁更加可靠。对于任务,使用IExecutorService 比使用悲观锁或乐观锁技术更加合适,IExecutorService 有更少的网络跃点和输出传输,任务会在更加靠近数据的地方被执行。

解决 ABA 问题

什么是ABA问题可以参考Wikipedia中的定义 什么是ABA问题

在多个线程更新共享资源的场景就会引发ABA问题。即使一个线程在连续读取一个特定key的值时看到的值是相同的,但是这并意味着在读之间数据没有发生变化。另一个线程可能会更改该值,执行业务逻辑后并将其更改回原来的值,而个线程认为没有任何更改。为了解决这类问题,可以给每个数据增加一个版本,在操作之前检查版本以确保数据没有被更改。尽管数据的其他部分全部相同,但是版本不同也认为是数据是不同的。这其实就是乐观锁的机制,这种机制在对特定key更新不频繁的场景使用更加合适。

给数据增加版本是常用的解决问题的方法。

使用悲观锁避免锁脑裂

可以配置在锁之前先检查集群成员数,如果检查失败,锁操作抛出SplitBrainProtectionException 并失败。悲观锁内部也使用了锁操作,因此也可以配置对锁脑裂的保护。这意味着您可以使用相同名称或与映射名称匹配的模式配置锁裂脑保护。 请注意,针对IMap锁定操作的裂脑保护可能不同于针对其他IMap方法的裂脑保护。

下面的操作支持在使用前进行脑裂检查:

  • IMap.lock(K) 、IMap.lock(K, long, java.util.concurrent.TimeUnit)
  • IMap.isLocked()
  • IMap.tryLock(K), IMap.tryLock(K, long, java.util.concurrent.TimeUnit) and IMap.tryLock(K, long, java.util.concurrent.TimeUnit, long, java.util.concurrent.TimeUnit)
  • IMap.unlock()
  • IMap.forceUnlock()
  • MultiMap.lock(K) and MultiMap.lock(K, long, java.util.concurrent.TimeUnit)
  • MultiMap.isLocked()
  • MultiMap.tryLock(K), MultiMap.tryLock(K, long, java.util.concurrent.TimeUnit) and MultiMap.tryLock(K, long, java.util.concurrent.TimeUnit, long, java.util.concurrent.TimeUnit)
  • MultiMap.unlock()
  • MultiMap.forceUnlock()

一个简单的声明式配置如下:

<hazelcast>
    ...
    <map name="myMap">
        <split-brain-protection-ref>map-actions-split-brain-protection</split-brain-protection-ref>
    </map>
    <lock name="myMap">
        <split-brain-protection-ref>map-lock-actions-split-brain-protection</split-brain-protection-ref>
    </lock>
    ...
</hazelcast>

map-lock-actions-split-brain-protection 配置用于map锁定,map-actions-split-brain-protection用于其他map操作。

9. 获取map统计信息

可以使用 getLocalMapStats() 方法获取map的统计信息,比如entry的主备数量,后更新时间以及被锁的entry数量。如果需要集群范围内的map统计信息,需要获取每个集群成员的map统计信息并将信息合并,或者从Hazelcast管理中心获取。

为了获取map的统计信息需要配置statistics-enabled 的值为true :

<hazelcast>
    ...
    <map name="myMap">
        <statistics-enabled>true</statistics-enabled>
    </map>
    ...
</hazelcast>

如果statistics-enabled 设置为false,Hazelcast将不会收集map的统计信息,统计信息也无法从Hazelcast管理中心获取,方法 getLocalMapStats() 也无法获取(统计数据都没有从何获取,哈哈)。

HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<String, Order> map = instance.getMap("data");
map.put("first order", new Order("car"));
map.get("first order");
LocalMapStats stats = map.getLocalMapStats();
System.out.println ( "size in memory  : " + stats.getHeapCost() );
System.out.println ( "creationTime    : " + stats.getCreationTime() );
System.out.println ( "number of hits  : " + stats.getHits() );
System.out.println ( "lastAccessedTime: " + stats.getLastAccessTime() );
System.out.println ( "lastUpdateTime  : " + stats.getLastUpdateTime() );

Hazelcast还保存了map中entry的统计信息,包括创建时间,后更新时间,后访问时间,命中次数和版本等。可以使用 IMap.getEntryView(key) 方法获取map中entry的统计信息:

HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<String, Order> map = instance.getMap("data");
map.put("first order", new Order("car"));
map.get("first order");
EntryView entry = map.getEntryView("first order");
System.out.println ( "size in memory  : " + entry.getCost() );
System.out.println ( "creationTime    : " + entry.getCreationTime() );
System.out.println ( "expirationTime  : " + entry.getExpirationTime() );
System.out.println ( "number of hits  : " + entry.getHits() );
System.out.println ( "lastAccessedTime: " + entry.getLastAccessTime() );
System.out.println ( "lastUpdateTime  : " + entry.getLastUpdateTime() );
System.out.println ( "version         : " + entry.getVersion() );
System.out.println ( "key             : " + entry.getKey() );
System.out.println ( "value           : " + entry.getValue() );

10. 使用谓词监听map数据

可以监听map中对特定数据的操作,可以认为是使用谓词的监听(监听满足所有谓词的数据)。从3.7版本开始,Hazelcast提供了hazelcast.map.entry.filtering.natural.event.* 属性,下表展示配置参数的值为true 和不配置参数或值为false时Hazelcast行为区别:

旧值满足谓词,新值不满足谓词 无事件发送 发送REMOVED 新旧值均满足谓词 发送UPDATED 事件 发送UPDATED 事件 新旧值均不满足谓词 无事件发送 无事件发送 旧值不满足谓词,新值满足谓词 发送UPDATED 事件 发送ADDED 事件
Default True

作为一个例子,我们监听订单Order 的变化,Order 类的定义如下:

public class Order implements Serializable {
    private String name;

    public Order(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

创建一个监听Order添加、更新和删除的监听器:CustomizeEntryListener

public class CustomizeEntryListener implements EntryAddedListener<String, Order>,
        EntryUpdatedListener<String, Order>,
        EntryRemovedListener<String, Order> {
    @Override
    public void entryAdded(EntryEvent<String, Order> event) {
        System.out.println(event.getValue().getName() + " order added");
    }

    @Override
    public void entryUpdated(EntryEvent<String, Order> event) {
        System.out.println(event.getValue().getName() + " order updated");
    }

    @Override
    public void entryRemoved(EntryEvent<String, Order> event) {
        System.out.println("order removed");
    }
}

创建一个检查订单名为car 的谓词,并和CustomizeEntryListener 配合使用实现监听:

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.setProperty("hazelcast.map.entry.filtering.natural.event.*", "true");
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();
        IMap<String, Order> map = instance.getMap("data");
        map.addEntryListener(new CustomizeEntryListener(), (Predicate<String, Order>) entry -> "car".equals(entry.getValue().getName()), true);
        map.put("first order", new Order("car"));
        map.put("first order", new Order("car"));
        map.remove("first order");

    }

上述代码的输出如下:

car order added
car order updated
order removed

11. 使用谓词批量删除

Hazelcast提供了removeAll() 方法以实现根据谓词删除所有数据,方法的定义如下:

void removeAll(Predicate<K, V> predicate);

一般来说,要获取所有满足谓词的数据需要扫描map全部数据。如果map中的数据添加了索引,Hazelcast可以使用索引来查询所有满足谓词的数据,使用索引会加速查询(有木有感觉和数据库一样?)。

调用removeAll()方法会同时移除Near Cache中的数据。

下面的代码样例向map中添加了8条数据,调用removeAll 删除所有key以hazelcast 开始的数据:

HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<String, Integer> map = instance.getMap("data");
for (int i = 0; i < 4; i++) {
    map.put("hazelcast" + i, i);
}
for (int i = 0; i < 4; i++) {
    map.put("map" + i, i);
}
System.out.println("map size = " + map.size());
map.removeAll((Predicate<String, Integer>) entry -> entry.getKey().startsWith("hazelcast"));
System.out.println("map size = " + map.size());

代码输出如下:

map size = 8
map size = 4

12. 添加拦截器

你可以添加拦截操作,并执行自定义的业务逻辑以同步阻塞操作。可以修改get 方法的返回值,改变put 方法的值或者通过抛出一个异常来取消操作。拦截器不同于监听器,使用监听器可以在操作完成以后执行一些操作,拦截器是同步的可以修改操作的行为,改变操作的值甚至取消操作。

map的拦截器组成一个链,因此多次添加一个拦截器会导致相同的拦截器被执行多次。在成员初始化的时候添加拦截器会轻易的导致这种场景,因为多个成员会添加相同的拦截器。当以这种方式添加拦截器时确保拦截器实现hashCode() 方法以保证每个成员的拦截器都可以返回相同的值。虽然实现equals() 方法不是必须的,但是这是一个更好的实践,可以确保map可以安全的删除拦截器。

IMap 提供了两个方法用于添加和删除拦截器:addInterceptor 和 removeInterceptor 。下面是一个使用拦截器的简单样例:

HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<String, Integer> map = instance.getMap("data");
map.addInterceptor(new MapInterceptor() {
    @Override
    public Object interceptGet(Object o) {
        System.out.println("begin get " + o);
        return o;
    }

    @Override
    public void afterGet(Object o) {
        System.out.println("get finished " + o);
    }

    @Override
    public Object interceptPut(Object oldVal, Object newVal) {
        System.out.println("old value = " + oldVal + ",new value = " + newVal);
        return newVal;
    }

    @Override
    public void afterPut(Object o) {
        System.out.println("after put value = " + o);
    }

    @Override
    public Object interceptRemove(Object o) {
        System.out.println("begin remove " + o);
        return null;
    }

    @Override
    public void afterRemove(Object o) {
        System.out.println("remove " + o + " finished");
    }
});
map.put("hazelcast", 2);
map.get("hazelcast");
map.remove("hazelcast");

13. 防止内存溢出

使用map的查询方法很容易触发内存溢出异常,尤其在集群规模较大或堆很大的条件下。例如,一个集群有5个节点,每个节点堆大值为25GB,每个成员保存10GB的数据,调用IMap.entrySet() 方法将会拉取50GB的数据,进而导致实例内存溢出而故障。对于单个节点来说IMap.values() 返回了太多的数据,一个真正的查询或谓词选择错误的查询也肯能导致这种情况,尤其在应用选择参数的时候(应用自定义查询等场景)。

为了阻止这种异常的发生,可以配置每个查询返回的大数据量。这和SQL中的SELECT * FROM map LIMIT 100 不同,在SQL中你可以使用分页查询获得全部数据。基于查询的操作的大结果限制旨在作为后一道防线,以防止检索超出其处理能力的数据。Hazelcast 中的 QueryResultSizeLimiter组件负责计算这个大小限制。

设置查询结果大小限制

如果 QueryResultSizeLimiter 组件被激活,它将计算每个分区的结果大小限制。每个 QueryResultSizeLimiter组件运行在集群成员所有的分区之上,因此只要集群成员没有超过限制组件就会一直收集信息。如果超过限制会返回客户端一个QueryResultSizeExceededException异常。该功能依赖数据在集群成员之间的均等分布,依赖计算每个成员的大小限制,因此在QueryResultSizeLimiter.MINIMUM_MAX_RESULT_LIMIT中定义了一个小值。设置低于小的值将会被加到小值之上,比如小值为5,设置的值为3,则小值变为8.

本地预检查

除去 QueryOperations中分布式的结果大小检查,还可以在被调用实例上执行本地预检查。如果客户端调用一个方法,本地预检查会在调用QueryOperations的成员上执行。由于本地预检查会增加QueryOperation的延迟,因此可以配置本地多少个分区执行本地预检查或者完全关闭该功能。

结果大小限制范围

除了指定的查询操作外,在内部还有一些使用谓词的其他操作。这些操作也会抛出 QueryResultSizeExceededException 异常,下面的表格展示了那些操作受查询结果大小的限制:

Methods Covered by Query Result Size Limit
通过系统属性配置结果大小限制

可以通过下面的两个系统属性配置查询结果的大小限制:

  • hazelcast.query.result.size.limit: map查询返回结果的大值。该值定义了单次查询返回的大数据量,如果单次查询返回的数据量超过了该值则会抛出一个QueryResultSizeExceededException 异常。
  • hazelcast.query.max.local.partition.limit.for.precheck: 本地分区大值。
作者:大哥你先走链接:https://www.jianshu.com/p/a015ffb2dd8f来源:简书著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
分享好友

分享这个小栈给你的朋友们,一起进步吧。

Hazelcast
创建时间:2022-04-18 14:01:59
Hazelcast
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

技术专家

查看更多
  • 飘絮絮絮丶
    专家
戳我,来吐槽~