绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
kafka可插拔增强如何实现?
2020-05-25 15:09:20

导弹拦截,精准防御。

背景

拦截器:在不修改应用程序业务逻辑的情况下,一组基于事件的可插拔的逻辑处理链; 类比springMVC的拦截器:

这些都是通过配置拦截器,插入到应用程序中,实现可插拔的修改业务逻辑;

kafka在0.10.0.0版本中开始引入拦截器。分为生产者拦截器和消费者拦截器,类似责任链的方式编排多个拦截器为一个大拦截器。

配置方法:配置参数

Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
……

注意: 配置拦截器需要制定拦截器的全限定名,并且保证生产者或者消费者客户端能够正确加载到配置的拦截器;

通过拦截器实现,强制让所有的生产者,消费者配置该拦截器,实现消息审计的功能; |

生产者拦截器

拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor

消费者拦截器

org.apache.kafka.clients.consumer.ConsumerInterceptor

实操

实现端到端的性能监控:

处理过程:

生产者代码:

public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {


    private Jedis jedis; // 省略Jedis初始化


    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        jedis.incr("totalSentMessage");
        return record;
    }


    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }


    @Override
    public void close() {
    }


    @Override
    public void configure(Map<java.lang.String, ?> configs) {
    }

消费者代码:

public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {


    private Jedis jedis; //省略Jedis初始化


    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long lantency = 0L;
        for (ConsumerRecord<String, String> record : records) {
            lantency += (System.currentTimeMillis() - record.timestamp());
        }
        jedis.incrBy("totalLatency", lantency);
        long totalLatency = Long.parseLong(jedis.get("totalLatency"));
        long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
        jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
        return records;
    }


    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }


    @Override
    public void close() {
    }


    @Override
    public void configure(Map<String, ?> configs) 

配置到拦截器到对应的生产者和消费者对象,即简单的实现了平均消息延时的端到端性能统计。

小结

类比AOP是Spring提供的核心功能,即面向切面编程,可以把跟业务逻辑无关的安全,审计,性能相关功能放到切面增强中实现。 对Kafka进行一些可插拔的功能增强可以通过拦截器实现。

本篇介绍了kafka的拦截器的使用方法,以及通过实例展示了具体的用法,希望对团队使用的kafka做一些增强功能的时候可以利用这个点去扩展。

我会持续分享Java软件编程知识和程序员发展职业之路! 原创不易,关注诚可贵,转发价更高!转载请注明出处,让我们互通有无,共同进步,欢迎沟通交流。

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Kafka
创建时间:2020-05-22 09:55:12
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

栈主、嘉宾

查看更多
  • ?
    栈主

小栈成员

查看更多
  • wangdabin1216
  • 小雨滴
  • chenglinjava0501
  • 时间不说话
戳我,来吐槽~