绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
分布式学习(5) ---- 初识消息系统kafka
2020-05-25 16:59:36
消息系统在分布式应用中有着不可或缺的地位,像是成产消费数据解耦,缓存未处理的消息等等。

那为什么不学习用Java写的ActiveMQ或RabbitMQ呢?

因为我看过卡夫卡写的变形记。

简单原理图


分布式消息系统就是生产者集群和消费者集群分离,通过中间的一个消息系统进行通信。

生产者异步生产东西,不用管消费者的反馈,消费者也不用死等着生产者生产,等有东西了来拿就好。就像是母鸡下蛋,母鸡(生产者)直接把蛋(消息)下在筐里,人(消费者)不用在一边等着,只用隔段时间来拿就行了。

概念介绍

producer

Kafka系统中的生产者,用于产生数据并发送给broker进行存储。由于需要与broker中的分区保持socket连接,因此需要在zk中维护生产者与分区broker的对应关系。同一个topic下的数据,会以某种负载均衡的方式发送到不同的分区中。

broker

Broker可以当做Kafka中的存储节点,数据按照topic组织,按照某种负载均衡方式分配到不同的分区中。一个Topic由多个分区组成,每个分区可以设置备份数量。分区由一个leader+多个followers组成,生产者直接与leader进行沟通,leader接收消息后,其他的followers会同步这个消息。所有的follwers同步消息后,该消息才会成为可消费的状态。

Broker中Topic与分区,分区与生产者,分区之间的选举备份等等信息都需要zookeeper进行协调。

consumer

Consumer是Kafka中的消费者,通常以组的形式存在,一个Group会包含多个Consumer。每个组对应一个Topic,该Topic内的分区只能对应一个消费者,也就是如果消费者很多的情况下,会出现有的消费者消费不到数据;如果消费者很少的情况下,会有消费者同时消费多个分区的数据。

Kafka仅仅会保证一个分区的消息的消费是有序的,多个分区并不保证有序性。

为了保证数据消费的可靠性,Kakka提供了几种消费的机制:

  • 1 at most once,即消费数据后,保存offset,就再也取不到这个数据了。
  • 2 at least once,即消费数据后,保存offset,如果保存出错,下次可能还会取到该数据

在Kafka中offset是由consumer维护的(实际可以由zookeeper来完成)。这种机制有两个好处,

  • 一个是可以依据consumer的能力来消费数据,避免产生消费数据的压力;
  • 另一个就是可以自定义fetch消费的数据数目,可以一次读取1条,也可以1次读取100条。

topic

Kafka中的数据的主题,所有的操作(如消息的存储和读取\消费)都是依据topic完成。

partition

每个Topic由多个分区组成,每个分区内部的数据保证了有序性,即是按照时间序列,append到分区的尾部。分区是有固定大小的,容量不够时,会创建新的分区。Kafka在一定时间内会定期清理过期的文件。

这种连续性的文件存储,一方面有效的利用磁盘的线性存取;另一方面减轻了内存的压力。

zookeeper

在Kafka中很多节点的调度以及资源的分配,都要依赖于zookeeper来完成。

  • 1 Broker的注册,保存Broker的IP以及端口;
  • 2 Topic注册,管理broker中Topic的分区以及分布情况
  • 3 Broker的负载均衡,讲Topic动态的分配到broker中,通过topic的分布以及broker的负载判断
  • 4 消费者,每个分区的消息仅发送给一个消费者
  • 5 消费者与分区的对应关系,存储在zk中
  • 6 消费者负载均衡,一旦消费者增加或者减少,都会触发消费者的负载均衡
  • 7 消费者的offset,High level中由zk维护offset的信息;Low Level中由自己维护offset

Demo实现

由于我租的乞丐版服务器,开伪集群有一些困难,所以以下demo均在单机上完成。

首先来看一下命令行实现简易demo:

  • 启动kafka服务

bin/kafka-server-start.sh config/server.properties &
  • 创建一个Topic,一个分区,一个备份

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  • 启动生产者控制台

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
  • 另开一个窗口,启动消费者控制台

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
  • 在生产者控制台中生产数据

  • 这时换到消费者控制台,显示如下,表示数据已经被消费

用kafka的Java-api实现

  • maven
<!-- kafka -->
<dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-clients</artifactId>
     <version>0.10.0.1</version>
</dependency>
  • 生产者
public class Produce {

    public static void main(String[] args) {
        System.out.println("begin produce");
        connectionKafka();
        System.out.println("finish produce");
    }

    public static void connectionKafka() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "your url:9092");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("test2", Integer.toString(i), Integer.toString(i)));
            System.out.println("send成功");
            try {
                Thread.currentThread().sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }
}
  • 消费者
public class Consumer{

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "your url:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test2"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

先运行消费者,再运行生产者。

可以看到生产者2秒生成一条数据,随即发送给消息队列,消费者轮询,有消息来就消费。这种属于逐条发送,略微影响效率。也可以把生产者消息先放到缓存队列中,到达一定的数量一起发送,通过设置配置文件中的linger.ms参数大于0来实现。

控制台打印如下

参考:

Kafka主要参数详解

kafka生产者客户端(0.10.1.1API) - OrcHome

Kafka入门初探+伪集群部署 - xingoo - 博客园

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Kafka
创建时间:2020-05-22 09:55:12
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

栈主、嘉宾

查看更多
  • ?
    栈主

小栈成员

查看更多
  • wangdabin1216
  • 小雨滴
  • chenglinjava0501
  • 时间不说话
戳我,来吐槽~