Overview
之前的这篇博客ubuntu14.04单机安装配置zookeeper和kafka,介绍了zookeeper
和kafka
的安装配置,并在命令行下验证了生产者消费者可以跑通。但是实际项目中,需要和java
交互,不可能接触到命令行和后台的。本文旨在记录一下java
和kafka
的简单交互,web
中道理相同,只不过程序入口换成了action
。
1.新建项目配置环境
打开eclipse
,依次点击Window→Preferences→Java→Build Path→User Libraries
,然后在右边选择New
,添加一个自己常用的Library
,我命名为kafka
。选中kafka
,右边选择Add External JARS
,然后到之前安装好的kafka
的目录,找到libs
这个文件夹,如果按照上次配置好的情况,这里应该是15
个jar
文件,见下图:
全部选中,点击确定。这样,我们以后就可以复用了。
然后我们在eclipse
中构建一个普通的java
项目testKafka
。右击项目,依次点击Build Path→Add Libraries→User Library
,选择kafka
这个library
,点击Finish
。这样环境就搭建好了。
2.生产者消费者程序
下面编码测试程序,即消息生产者和消息消费者。
2.1 生产者
package testKafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class MsgProducer {
private static Producer<String,String> producer;
private final Properties props=new Properties();
public MsgProducer(){
//定义连接的broker list
props.put("metadata.broker.list", "127.0.0.1:9092");
//定义序列化类,Java中对象传输之前要序列化
props.put("serializer.class", "kafka.serializer.StringEncoder");
producer = new Producer<String, String>(new ProducerConfig(props));
}
public static void main(String[] args) {
MsgProducer mProducer=new MsgProducer();
//定义topic
String topic="testkafka";
//定义要发送给topic的消息
String mString = "Hello kafka!";
//构建消息对象
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, mString);
//推送消息到broker
producer.send(data);
producer.close();
}
}
这里需要注意,生产者这里,最少需要两个配置项:metadata.broker.list
为127.0.0.1:9092
和serializer.class
设置为kafka.serializer.StringEncoder
。打开上次配置的producer.properties
文件,看到这两项配置分别为metadata.broker.list=localhost:9092
,serializer.class=kafka.serializer.DefaultEncoder
。broker list
要一致,否则会报错。
这些项,最好写在配置文件里,方便以后添加服务器时候更改。
2.2 消费者
package testKafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class MsgConsumer {
private final ConsumerConnector consumer;
private final String topic;
public MsgConsumer(String zookeeper, String groupId, String topic) {
Properties props = new Properties();
//定义连接zookeeper信息
props.put("zookeeper.connect", zookeeper);
//定义Consumer所有的groupID
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "500");
props.put("zookeeper.sync.time.ms", "250");
props.put("auto.commit.interval.ms", "1000");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
this.topic = topic;
}
public void testConsumer() {
Map<String, Integer> topicCount = new HashMap<String, Integer>();
//定义订阅topic数量
topicCount.put(topic, new Integer(1));
//返回的是所有topic的Map
Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
//取出我们要需要的topic中的消息流
List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
for (final KafkaStream stream : streams) {
ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
while (consumerIte.hasNext()) {
System.out.println(new String(consumerIte.next().message()));
}
}
if (consumer != null) {
consumer.shutdown();
}
}
public static void main(String[] args) {
String topic = "testkafka";
MsgConsumer mConsumer = new MsgConsumer("127.0.0.1:2181", "test-consumer-group", topic);
mConsumer.testConsumer();
}
}
这里需要注意,消费者的配置信息,应该和生产者对应。最关键的配置是两项:zookeeper.connect
和group.id
。这两项打开consumer.properties
就可以看到。
3. 测试
首先,要在命令行中启动zookeeper
和kafka
。
在消费者程序里面,运行一下,Console
框显示如下:
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
这个不是错误信息,不用理睬。
接着在生产者那里,运行一下,Console
框显示如下:
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
Hello kafka!