Overview
之前的这篇博客ubuntu14.04单机安装配置zookeeper和kafka,介绍了zookeeper
和kafka
的安装配置,并在命令行下验证了生产者消费者可以跑通。但是实际项目中,需要和java
交互,不可能接触到命令行和后台的。本文旨在记录一下java
和kafka
的简单交互,web
中道理相同,只不过程序入口换成了action
。
1.新建项目配置环境
打开eclipse
,依次点击Window→Preferences→Java→Build Path→User Libraries
,然后在右边选择New
,添加一个自己常用的Library
,我命名为kafka
。选中kafka
,右边选择Add External JARS
,然后到之前安装好的kafka
的目录,找到libs
这个文件夹,如果按照上次配置好的情况,这里应该是15
个jar
文件,见下图:
全部选中,点击确定。这样,我们以后就可以复用了。
然后我们在eclipse
中构建一个普通的java
项目testKafka
。右击项目,依次点击Build Path→Add Libraries→User Library
,选择kafka
这个library
,点击Finish
。这样环境就搭建好了。
2.生产者消费者程序
下面编码测试程序,即消息生产者和消息消费者。
2.1 生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | package testKafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class MsgProducer { private static Producer<String,String> producer; private final Properties props= new Properties(); public MsgProducer(){ //定义连接的broker list props.put( "metadata.broker.list" , "127.0.0.1:9092" ); //定义序列化类,Java中对象传输之前要序列化 props.put( "serializer.class" , "kafka.serializer.StringEncoder" ); producer = new Producer<String, String>( new ProducerConfig(props)); } public static void main(String[] args) { MsgProducer mProducer= new MsgProducer(); //定义topic String topic= "testkafka" ; //定义要发送给topic的消息 String mString = "Hello kafka!" ; //构建消息对象 KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, mString); //推送消息到broker producer.send(data); producer.close(); } } |
这里需要注意,生产者这里,最少需要两个配置项:metadata.broker.list
为127.0.0.1:9092
和serializer.class
设置为kafka.serializer.StringEncoder
。打开上次配置的producer.properties
文件,看到这两项配置分别为metadata.broker.list=localhost:9092
,serializer.class=kafka.serializer.DefaultEncoder
。broker list
要一致,否则会报错。
这些项,最好写在配置文件里,方便以后添加服务器时候更改。
2.2 消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | package testKafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class MsgConsumer { private final ConsumerConnector consumer; private final String topic; public MsgConsumer(String zookeeper, String groupId, String topic) { Properties props = new Properties(); //定义连接zookeeper信息 props.put( "zookeeper.connect" , zookeeper); //定义Consumer所有的groupID props.put( "group.id" , groupId); props.put( "zookeeper.session.timeout.ms" , "500" ); props.put( "zookeeper.sync.time.ms" , "250" ); props.put( "auto.commit.interval.ms" , "1000" ); consumer = Consumer.createJavaConsumerConnector( new ConsumerConfig(props)); this .topic = topic; } public void testConsumer() { Map<String, Integer> topicCount = new HashMap<String, Integer>(); //定义订阅topic数量 topicCount.put(topic, new Integer( 1 )); //返回的是所有topic的Map Map<String, List<KafkaStream< byte [], byte []>>> consumerStreams = consumer.createMessageStreams(topicCount); //取出我们要需要的topic中的消息流 List<KafkaStream< byte [], byte []>> streams = consumerStreams.get(topic); for ( final KafkaStream stream : streams) { ConsumerIterator< byte [], byte []> consumerIte = stream.iterator(); while (consumerIte.hasNext()) { System.out.println( new String(consumerIte.next().message())); } } if (consumer != null ) { consumer.shutdown(); } } public static void main(String[] args) { String topic = "testkafka" ; MsgConsumer mConsumer = new MsgConsumer( "127.0.0.1:2181" , "test-consumer-group" , topic); mConsumer.testConsumer(); } } |
这里需要注意,消费者的配置信息,应该和生产者对应。最关键的配置是两项:zookeeper.connect
和group.id
。这两项打开consumer.properties
就可以看到。
3. 测试
首先,要在命令行中启动zookeeper
和kafka
。
在消费者程序里面,运行一下,Console
框显示如下:
1 2 | log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). log4j:WARN Please initialize the log4j system properly. |
这个不是错误信息,不用理睬。
接着在生产者那里,运行一下,Console
框显示如下:
1 2 3 | log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties). log4j:WARN Please initialize the log4j system properly. Hello kafka! |