Overview

这次做服务器,计划加入消息队列,并在web页面显示当前提交的序列处理状态和已处理序列的信息。我们知道,在后台命令行中可以看到kafka的消息者处理消息的状态,但是,对于访问者来说,查看命令行是不现实的,于是我们便采用了KafkaOffsetMonitor这一开源软件。Github的下载地址如下:Kafka Offset Monitor

1. 安装jdk,zookeeper,kafka

这部分可以参考上一篇文章:ubuntu14.04单机安装配置zookeeper和kafka

2. 安装配置KafkaOffsetMonitor

新建一个文件夹,比如我在kafka文件夹下建立子文件夹kafkaMonitor下载好以后,把这个KafkaOffsetMonitor-assembly-0.2.0.jar文件放入kafkaMonitor文件夹。在当前位置新建一个kafkaMonitor.sh文件,文件内容如下:

#! /bin/bash
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk localhost:2181 \
--port 8089 \
--refresh 10.seconds \
--retain 7.days

上面是最关键的几条配置,剩余配置不写也可以运行。逐一分析上面每一项配置:

  • --zk 这里写的地址和端口,是zookeeper集群的各个地址和端口。应和kafka/bin文件夹中的zookeeper.properties中的host.nameclientPort一致。本机是host.name=localhost,clientPort=2181
  • --port 这个是本软件KafkaOffsetMonitor的端口。注意不要使用那些著名的端口号,例如80,8080等。我采用了8089.
  • --refresh 这个是软件刷新间隔时间,不要太短也不要太长。
  • --retain 这个是数据在数据库中保存的时间。

3. 启动KafkaOffsetMonitor

  • 首先,启动zookeeper

    切换到/home/young/zookeeper/bin目录下,运行下面命令:

    ./zkServer.sh start
    
  • 然后,启动kafka

    切换到/home/young/kafka目录下,执行下面命令:

    bin/kafka-server-start.sh config/server.properties
    
  • 最后,启动KafkaOffsetMonitor

    切换到/home/young/kafka/kafkaMonitor目录下面,执行下面命令:

    ./kafkaMonitor.sh 
    

    如果显示如下,就证明成功了:

    serving resources from: jar:file:/home/young/kafka/kafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    2016-06-10 12:05:13.724:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT
    log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    2016-06-10 12:05:13.781:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/home/young/kafka/kafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp}
    2016-06-10 12:05:13.802:INFO:oejs.AbstractConnector:Started SocketConnector@0.0.0.0:8089
    

4.运行KafkaOffsetMonitor

这里就不新建话题topic了,沿用上一篇文章中的testkafka
切换到/home/young/kafka,打开两个命令行终端,分别打开生产者和消费者:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testkafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testkafka --from-beginning

在生产者那里输入一条内容,如果在消费者那里能够收到,就证明成功了:

Hello kafkaOffsetMonitor!

这时候,我们打开浏览器,输入如下地址(对应你前面设置的地址和端口):

http://127.0.0.1:8089/

显示如下:

1.png

我们选择点击下面那个(因为上面那个已经死掉了):

2.png

其中,logSize,Offset,Lag分别代表总消息数,已处理消息数,未处理消息数
我们点击最上面的Topic List,就会显示所有Topic

3.png

我们点击最下面那个Topictestkafka,一路点下去,显示的是处理进度的过程图:

5.png

这里消息不多,如果多的话,将会非常壮观。
我们点击标题栏最后一项Visualization,就可以查看其他消费者或者整个集群的情况了(我们这里只有一个消费者,一个服务器):

6.png

这样,我们的安装就结束了,实际工程中使用,根据需要更改配置。
这篇文章主要参考了官方文档,和以下文章:
Kafka实战-KafkaOffsetMonitor
Apache Kafka监控之KafkaOffsetMonitor
apache kafka监控系列-KafkaOffsetMonitor