• Welcome to the world's largest Chinese hacker forum

    Welcome to the world's largest Chinese hacker forum, our forum registration is open! You can now register for technical communication with us, this is a free and open to the world of the BBS, we founded the purpose for the study of network security, please don't release business of black/grey, or on the BBS posts, to seek help hacker if violations, we will permanently frozen your IP and account, thank you for your cooperation. Hacker attack and defense cracking or network Security

    business please click here: Creation Security  From CNHACKTEAM

Recommended Posts

1 Kafka简介

Kafka最初由Linkedin开发,是由zookeeper协调的分布式、分区、多副本的分布式日志系统。常用于web/nginx日志、访问日志、消息服务等。Linkedin在2010年向Apache Foundation捐款,成为顶级开源项目。主要应用场景是:日志收集系统和消息系统.

Kafka是一个分布式消息队列。它具有高性能、持久性、多副本备份和横向扩展能力。将消息写入生产者队列,消费者从队列中取出消息用于业务逻辑。卡夫卡是一种发布-订阅模式。将消息保存在磁盘中,顺序访问磁盘,避免随机读写带来的性能瓶颈。

消息(Message)

指应用程序之间传输的数据。消息可以非常简单,比如只包含文本字符串,也可以非常复杂,可能包含嵌入的对象。

消息队列(Message Queue)

应用程序之间的一种通信方式,在这种方式下,消息发送后可以立即返回,并通过消息系统保证信息的可靠传输。消息发布者只将消息发布到MQ而不管谁取,消息用户只从MQ取消息而不管谁发布,这样发布者和用户都不需要知道对方的存在。

2 Kafka特性

高吞吐量、低延迟

卡夫卡最大的特点就是收发消息非常快。kafka每秒可以处理几十万条消息,其最小延迟只有几毫秒。

高灵活性

每个主题包含多个分区,一个主题中的分区可以分布在不同的代理中。

持久性和可靠性

Kafka可以允许数据的持久存储,将消息持久存储到磁盘,并支持数据备份以防止数据丢失。

容错

允许集群中的节点出现故障,一个节点宕机,Kafka集群才能正常工作。

高并发性

支持数千个客户端同时读写。

3 Kafka集群架构

bm00uoc34jd5220.png

生产者

消息生产者,向Kafka集群的终端或服务发布消息。

经纪人

kafka集群中包含的服务器,borker表示Kafka集群中的一个节点。

主题

发布到Kafka cluster的每一条消息都属于该类别,即Kafka是面向主题的。

更通俗地说,主题就像一个消息队列,生产者可以编写消息,消费者可以读取消息。一个主题支持多个生产者或消费者同时订阅,因此具有良好的可扩展性。

划分

每个主题包含一个或多个分区。卡夫卡指定的单位是分区。

复制品

分区的副本,保证了分区的高可用性。

消费者

从Kafka集群中消费消息的终端或服务。

consumer group

每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。

leader

每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。 producer 和 consumer 只跟 leader 交互。

follower

Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。

controller

知道大家有没有思考过一个问题,就是Kafka集群中某个broker宕机之后,是谁负责感知到他的宕机,以及负责进行Leader Partition的选举?
如果你在Kafka集群里新加入了一些机器,此时谁来负责把集群里的数据进行负载均衡的迁移?
包括你的Kafka集群的各种元数据,比如说每台机器上有哪些partition,谁是leader,谁是follower,是谁来管理的?
如果你要删除一个topic,那么背后的各种partition如何删除,是谁来控制?
还有就是比如Kafka集群扩容加入一个新的broker,是谁负责监听这个broker的加入?
如果某个broker崩溃了,是谁负责监听这个broker崩溃?
这里就需要一个Kafka集群的总控组件,Controller。他负责管理整个Kafka集群范围内的各种东西。

zookeeper

(1)Kafka 通过 zookeeper 来存储集群的meta元数据信息。
(2)一旦controller所在broker宕机了,此时临时节点消失,集群里其他broker会一直监听这个临时节点,发现临时节点消失了,就争抢再次创建临时节点,

保证有一台新的broker会成为controller角色。


* offset
* 偏移量
MARKDOWN 复制 全屏

消费者在对应分区上已经消费的消息数(位置),offset保存的地方跟kafka版本有一定的关系。
kafka0.8 版本之前offset保存在zookeeper上。
kafka0.8 版本之后offset保存在kafka集群上。
它是把消费者消费topic的位置通过kafka集群内部有一个默认的topic,名称叫 __consumer_offsets,它默认有50个分区。

Kafka集群安装部署

 

1 集群安装部署

  • 1、下载安装包(http://kafka.apache.org)

    • https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
    kafka_2.11-1.1.0.tgz
    
  • 2、规划安装目录

    /bigdata/install
    
  • 3、上传安装包到服务器中

    通过FTP工具上传安装包到node01服务器上
    
  • 4、解压安装包到指定规划目录

    tar -zxvf kafka_2.11-1.1.0.tgz -C /bigdata/install
    
  • 5、重命名解压目录

    mv kafka_2.11-1.1.0 kafka
    
  • 6、在hadoop01上修改配置文件

    • 进入到kafka安装目录下有一个config目录,修改配置信息 vi server.properties

      #指定kafka对应的broker id ,唯一
      broker.id=0
      #指定数据存放的目录
      log.dirs=/bigdata/install/kafka/kafka-logs
      #指定zk地址
      zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181
      #指定是否可以删除topic ,默认是false 表示不可以删除
      delete.topic.enable=true
      #指定broker主机名
      host.name=hadoop01
      
    • 配置kafka环境变量 sudo vi /etc/profile

      export KAFKA_HOME=/bigdata/install/kafka
      export PATH=$PATH:$KAFKA_HOME/bin
      
  • 6、分发kafka安装目录到其他节点

    scp -r kafka hadoop02:/bigdata/install
    scp -r kafka hadoop03:/bigdata/install
    ## hadoop02/hadoop03上都加上环境变量
    export KAFKA_HOME=/bigdata/install/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    
  • 7、修改hadoop02和hadoop03上的配置

    • hadoop02 上修改配置文件 vi server.properties

      #指定kafka对应的broker id ,唯一
      broker.id=1
      #指定数据存放的目录
      log.dirs=/bigdata/install/kafka/kafka-logs
      #指定zk地址
      zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181
      #指定是否可以删除topic ,默认是false 表示不可以删除
      delete.topic.enable=true
      #指定broker主机名
      host.name=hadoop02
      
    • hadoop03 上修改配置文件 vi server.properties

      #指定kafka对应的broker id ,唯一
      broker.id=2
      #指定数据存放的目录
      log.dirs=/bigdata/install/kafka/kafka-logs
      #指定zk地址
      zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181
      #指定是否可以删除topic ,默认是false 表示不可以删除
      delete.topic.enable=true
      #指定broker主机名
      host.name=hadoop03
      
  • 8、让每台节点的kafka环境变量生效

    • 在每台服务器执行命令

      source /etc/profile
      

2 kafka集群启动和停止

  • 1、启动kafka集群

    • 先启动zookeeper集群,然后在所有节点如下执行脚本

      nohup kafka-server-start.sh /bigdata/install/kafka/config/server.properties >/dev/null 2>&1 &
      
      SHELL 复制 全屏
  • 2、停止kafka集群

    • 所有节点执行关闭kafka脚本

      kafka-server-stop.sh

kafka命令行的管理使用

 
  • 1、创建topic

    • 使用 kafka-topics.sh脚本

      kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
      
  • 2、查询所有的topic

    • 使用 kafka-topics.sh脚本

      kafka-topics.sh --list --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
      
  • 3、查看topic的描述信息

    • 使用 kafka-topics.sh脚本

      kafka-topics.sh --describe --topic test --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
      
  • 4、删除topic

    • 使用 kafka-topics.sh脚本

      kafka-topics.sh --delete --topic test --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
      
  • 5、模拟生产者写入数据到topic中

    • 使用 kafka-console-producer.sh 脚本

      kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic test 
      
  • 6、模拟消费者拉取topic中的数据

    • 使用 kafka-console-consumer.sh 脚本

      kafka-console-consumer.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic test --from-beginning
      或者
      kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic test --from-beginning

kafka集群起停脚本

 

脚本位置

cd /home/hadoop/bin

脚本内容

vi kafka.sh
#输入以下内容
#!/bin/bash
case $1 in
"start" ){
  for(( i = 1;i <= 3;i = $i +1));do
    echo ============ hadoop0$i kafka $1 ===================
    ssh hadoop$i "source /etc/profile;nohup kafka-server-start.sh /bigdata/install/kafka/config/server.properties >/dev/null 2>&1 &"
  done
};;
"stop" ){
  for(( i = 1;i <= 3;i = $i +1));do
    echo ============ hadoop0$i kafka $1 ===================
    ssh hadoop$i "source /etc/profile;kafka-server-stop.sh"
  done
};;
esac

修改文件权限

chmod 777 kafka.sh

执行脚本,验证脚本

# 先确保已经启动了zookeeper
kafka.sh start
xcall jps
# 输出以下内容
============= hadoop1 jps =============
9616 Jps
9267 Kafka
9191 QuorumPeerMain
============= hadoop2 jps =============
8291 Kafka
8213 QuorumPeerMain
8632 Jps
============= hadoop3 jps =============
7129 Kafka
7053 QuorumPeerMain
7470 Jps
kafka.sh stop
xcall jps
# 输出以下内容
============= hadoop1 jps =============
9191 QuorumPeerMain
9740 Jps
============= hadoop2 jps =============
8213 QuorumPeerMain
8748 Jps
============= hadoop3 jps =============
7585 Jps
7053 QuorumPeerMain

 

Link to comment
Share on other sites