绑定完请刷新页面
取消
刷新

分享好友

×
取消 复制
架构师技能树之——kafka
2020-05-28 16:23:04

每天更新各种好玩的好用的java知识及面试资讯,欢迎小伙伴们关注哈,点关注,不迷路

Java架构杂货铺zhuanlan.zhihu.com图标

什么是Kafka

Apache Kafka® is a distributed streaming platform

A streaming platform has three key capabilities:

Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
Store streams of records in a fault-tolerant durable way.
Process streams of records as they occur.
Kafka is generally used for two broad classes of applications:

Building real-time streaming data pipelines that reliably get data between systems or applications
Building real-time streaming applications that transform or react to the streams of data
To understand how Kafka does these things, let's dive in and explore Kafka's capabilities from the bottom up.

First a few concepts:

Kafka is run as a cluster on one or more servers that can span multiple datacenters.
The Kafka cluster stores streams of records in categories called topics.
Each record consists of a key, a value, and a timestamp.

Kafka是一款分布式消息发布和订阅系统,它的特点是高性能、高吞吐量。

早设计的目的是作为LinkedIn的活动流和运营数据的处理管道。这些数据主要是用来对用户做用户画像分析以及服务器性能数据的一些监控所以kafka一开始设计的目标就是作为一个分布式、高吞吐量的消息系统,所以适合运用在大数据传输场景。如的开源分布式处理系统如cloudera 、Storm 、Spark、 Flink 等都支持与 Kafka 集成

Kafka 起初是由 Linkedin 公司采用 Scala 语言开发的 个多分区、多副本且基于 ZooKeeper 协调的分布式消息系统
目前 Kafka 已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
  • 消息系统: kafka 和传统的消息系统(也称作消息中间件〉都具备系统解稿、冗余存储、流量削峰、缓冲、异步通信、扩展性、 可恢复性等功能。与此同时, Kafka供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能
  • 存储系统: Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险 也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“”或启用主题的日志压缩功能即可
  • 流式处理平台: Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操

Kafka的企业应用场景

由于kafka具有更好的吞吐量、内置分区、冗余及容错性的优点(kafka每秒可以处理几十万消息),让kafka成为了一个很好的大规模消息处理应用的解决方案。所以在企业级应用长,主要会应用于如下几个方面

行为跟踪:kafka可以用于跟踪用户浏览页面、搜索及其他行为。通过发布-订阅模式实时记录到对应的topic中,通过后端大数据平台接入处理分析,并做更进一步的实时处理和监控

日志收集:日志收集方面,有很多比较的产品,比如Apache Flume,很多公司使用kafka代理日志聚合。日志聚合表示从服务器上收集日志文件,然后放到一个集中的平台(文件服务器)进行处理。在实际应用开发中,我们应用程序的log都会输出到本地的磁盘上,排查问题的话通过linux命令来搞定,如果应用程序组成了负载均衡集群,并且集群的机器有几十台以上,那么想通过日志快速定位到问题,就是很麻烦的事情了。所以一般都会做一个日志统一收集平台管理log日志用来快速查询重要应用的问题。所以很多公司的套路都是把应用日志集中到kafka上,然后分别导入到es和hdfs上,用来做实时检索分析和离线统计数据备份等。而另一方面,kafka本身又提供了很好的api来集成日志并且做日志收集

kafka架构

一个典型的kafka集群包含若干Producer(可以是应用节点产生的消息,也可以是通过Flume收集日志产生的事件),若干个Broker(kafka支持水平扩展)、若干个Consumer Group,以及一个zookeeper集群。kafka通过zookeeper管理集群配置及服务协同。Producer使用push模式将消息发布到broker,consumer通过监听使用pull模式从broker订阅并消费消息。

多个broker协同工作,producer和consumer部署在各个业务逻辑中。三者通过zookeeper管理协调请求和转发。这样就组成了一个高性能的分布式消息发布和订阅系统。

图上有一个细节是和其他mq中间件不同的点,producer 发送消息到broker的过程是push,而consumer从broker消费消息的过程是pull,主动去拉数据。而不是broker把数据主动发送给consumer

名词解释

  • 1)Broker 服务代理节点

Kafka集群包含一个或多个服务器,这种服务器被称为broker。broker端不维护数据的消费状态,提升了性能。直接使用磁盘进行存储,线性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。

  • 2)Producer

负责发布消息到Kafka broker

  • 3)Consumer

消息消费者,向Kafka broker读取消息的客户端,consumer从broker拉取(pull)数据并进行处理。

  • 4)Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

  • 5)Partition

Parition是物理上的概念,每个Topic包含一个或多个Partition.

  • 6)Consumer Group

每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

  • 7)Topic & Partition

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把 这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹(本文所用集群共8个节点,此处topic1和topic2 replication-factor均为1)。

Kafka安装部署

安装kakfa之前需要安装jdk和zookeeper
版本:
jdk8
zookeeper 3.4.12
kafka 2.11-2.0.0.tgz

1 jdk安装

kafka是scale语言编写,JVM系语言,所以需要安装jdk
# 下载jdk-8u181-linux-x64.tar.gz
# 解压到/usr/java
tar -zxvf jdk-8u181-linux-x64.tar.gz
# 环境变量 vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.8.0_181
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=./://$JAVA_HOME/bin:$JRE_HOME/lib
# 配置生效
source /etc/profile
# 验证
java -version

2 zookeper安装

Zoo Keeper 是安装 Kafka 集群的必要组件, afka 通过 ZooKeeper 来实施对元数据信息,包括集群 broker 、主题、 分区等内容。
ZooKeeper 是一个开源的分布式协调服务,是 google Chubby 个开源实现。分布式应用程序可基ZooKeeper 实现诸如数据发布/订阅 、负载均衡、 命名服务、分布式协调/通知,集群管理、 Master 举、配置维护等功能。在 ZooKeeper 中共有3个角色: leader follower和observer,同一时刻 ZooKeeper 集群中只会有一个 leader ,其他的都 follower和observer。observer不参与投票,默认情况下 ZooKeeper 中只有 leader和follewer两个角色
# 下载 zookeeper-3.4.12.tar.gz  /opt
# 解压
tar -zxvf zookeeper-3.4.12.tar.gz
# 进入zookeepr目录
cd zookeeper-3.4.12
# 配置环境变量 /etc/profile
export ZOOKEEPER_HOME=/opt/zookeeper-3.4.12
export PATH=$PATH:$ZOOKEEPER_HOME/bin
# 使其生效
source /etc/profile

# 配置文件 进入$ZOOKEEPER_HOME/conf
cd conf
cp zoo_sample.cfg zoo.cfg
# 配置zoo.cfg
#####################zoo.cfg#####################
# Zookeeper服务器心跳时间,ms
tickTime=2000
# 投票选举新leader的初始化时间
initLimit=10
# leader于follower心跳检测大容忍时间响应超过syncLimit*ticeTimeleader认为
# follower 死掉",从服务器列表中删除follower
syncLimit=5
# 数据目录
dataDir=/tmp/zookeeper/data
# 日志目录
dataLogDir=/tmp/zookeeper/log
# Zookeeper对外服务端口
clientPort=2181
#################################################
# 创建数据目录和日志目录
mkdir -p /tmp/zookeeper/data
mkdir -p /tmp/zookeeper/log
# 在${dataDir}目录(/tmp/zookeeper/data)下创建一个myid文件并写入
# 一个数值比如0myid文件里存放的是服务器的编号
# 启动服务
zkServer.sh start
# 查看服务状态 
zkServer.sh status
# zk集群 生成环境都是使用集群
# /etc/hosts 
192.168..111 node1
192.168..112 node2
192.168..113 node3
# 在3台机器的zoo.cfg中添加以下配置
server.=192.168..111:2888:3888
server.1=192.168..112:2888:3888
server.2=192.168..112:2888:3888

3 kafka安装

# 下载 kafka_2.11-2.0.0.tgz
# 解压到/opt
tar -zxvf kafka_2.11-2.0.0.tgz
cd kafka_2.11-2.0.0
# Kafka的根目录$KAFKA_HOME,即/opt/kafka_2.11-2.0.0 
# 环境配置 /etc/profile
export KAFKA_HOME=/opt/kafka_2.11-2.0.0
export PATH=$PATH:$KAFKA_HOME/bin
# 使其生效
source /etc/profile
# 修改配置文件 $KAFKA_HOME/conf/server.properties
###################conf/server.properties################
# broker的编号,如果有多个broker,则每个broker的编号需要设置不同
broker.id=0
# broker对外提供的服务入口地址
listeners=PLAINTEXT://localhost:9092
# 存放消息日志文件的地址
log.dirs=/tmp/kafka-logs
# kafka所需要的Zooper集群地址,单机为了实验kafka和zk安装本机上
zookeeper.connect=localhost:2181/kafka
#########################################################
# 启动kafka,在$KAFKA_HOME目录
bin/kafka-server-start.sh config/server.properties
# 后台启动
bin/kafka-server-start.sh -daemon config/server.properties 
# 或
bin/kafka-server-start.sh config/server.properties & 
# 停止kafka
bin/kafka-server-stop.sh -daemon config/server.properties 
########################集群##########################
192.168.0.111
192.168.0.112
192.168.0.113
# 三台环境都如上面下操作kafka
# 分别修改三台机器的server.properties,在同一个集群中每台机器id必须
broker.id=0
broker.id=1
broker.id=2
# 修改zooper.connect 验证zk安装在111机器上
zookeeper.connect=192.168.0.111:2181
# 修改listeners配置
# 如果配置了listeners,那么消息生产者和消费者都会使用listeners的配置来进行消息的收发,否则
# 会使用localhost
# PLAINTEXT表示协议,默认明文,可以选择其他加密协议
listeners=PLAINTEXT://192.168.0.111:9092
# 分别启动三台服务器
sh kafka-server-start.sh -daemon ../config/server.properties

kafka的基本操作

创建topic

sh kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 -- partitions 1 --topic test

Replication-factor 表示该topic需要在不同的broker中保存几份,这里设置成1,表示在两个broker中保存两份

Partitions 分区数

查看topic

sh kafka-topics.sh --describe --zookeeper localhost:2181/kafka --topic first_topic

消费消息

sh kafka-console-consumer.sh --bootstrap-server 192.168.13.106:9092 --topic test --from-beginning

发送消息

sh kafka-console-producer.sh --broker-list 192.168.244.128:9092 --topic first_topic

————————————————

作者:「向着风奔跑」

原文链接:blog.csdn.net/liulong10

分享好友

分享这个小栈给你的朋友们,一起进步吧。

Kafka
创建时间:2020-05-22 09:55:12
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
展开
订阅须知

• 所有用户可根据关注领域订阅专区或所有专区

• 付费订阅:虚拟交易,一经交易不退款;若特殊情况,可3日内客服咨询

• 专区发布评论属默认订阅所评论专区(除付费小栈外)

栈主、嘉宾

查看更多
  • ?
    栈主

小栈成员

查看更多
  • wangdabin1216
  • 小雨滴
  • chenglinjava0501
  • 时间不说话
戳我,来吐槽~