您正在查看: 后台开发 分类下的文章

Java后台获取客户端ip地址的注意事项

Overview

前面讲到,消息队列需要用到客户端的ip地址和时间戳,获取客户端的ip地址就需要下面几行简单的代码。

import javax.servlet.http.HttpServletRequest;

HttpServletRequest request = ServletActionContext.getRequest();
String ipAddress = request.getHeader("X-FORWARDED-FOR");  
if (ipAddress == null) {  
    ipAddress = request.getRemoteAddr();  
}
System.out.println(ipAddress);

我将上面几行代码,放入之前的PhosphoPredict项目的action中做实验,结果显示如下:

0:0:0:0:0:0:0:1

这和我们平时的认知不同,下面就是解决办法。

1.修改hosts文件

我在ubuntu14.04系统下,打开/etc/hosts文件,看到如下信息:

# Localhost
127.0.0.1       localhost
127.0.0.1       young
255.255.255.255     broadcasthost
::1     localhost
fe80::1%lo0     localhost

我们将

::1     localhost

这句注释掉,再提交,

127.0.0.1

正确答案就出现了。
其他系统,请自行查找文件目录并修改。

2.设置eclipse

我们在eclipse运行本程序的时候,可以配置一下运行参数:
依次点击Run→Run Configurations…→Apache Tomcat→Tomcat v7.0 Server at localhost,这时候点击右边的Arguments,在下面的VM arguments框中加上一句:

-Djava.net.preferIPv4Stack=true

最后点击Apply。然后就可以了。

总结:这是因为,只有在本机提交的时候,才会出现,客户端提交就会显示真实ip地址了。而系统会优先显示ipv6格式的本机ip。我们两种方法,第一种是屏蔽掉ipv6的表示方法,第二种则是指定优先ipv4表示。
两种方法更推荐第二种。

这篇文章主要参考了stackoverflow.

kafka在java中简单应用

Overview

之前的这篇博客ubuntu14.04单机安装配置zookeeper和kafka,介绍了zookeeperkafka的安装配置,并在命令行下验证了生产者消费者可以跑通。但是实际项目中,需要和java交互,不可能接触到命令行和后台的。本文旨在记录一下javakafka的简单交互,web中道理相同,只不过程序入口换成了action

1.新建项目配置环境

打开eclipse,依次点击Window→Preferences→Java→Build Path→User Libraries,然后在右边选择New
,添加一个自己常用的Library,我命名为kafka。选中kafka,右边选择Add External JARS,然后到之前安装好的kafka的目录,找到libs这个文件夹,如果按照上次配置好的情况,这里应该是15jar文件,见下图:

2016-07-12 17:16:06屏幕截图.png

全部选中,点击确定。这样,我们以后就可以复用了。

然后我们在eclipse中构建一个普通的java项目testKafka。右击项目,依次点击Build Path→Add Libraries→User Library,选择kafka这个library,点击Finish。这样环境就搭建好了。

2.生产者消费者程序

下面编码测试程序,即消息生产者和消息消费者。

2.1 生产者

package testKafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MsgProducer {
    private static Producer<String,String> producer;
    private final Properties props=new Properties();
    public MsgProducer(){
        //定义连接的broker list
        props.put("metadata.broker.list", "127.0.0.1:9092");
        //定义序列化类,Java中对象传输之前要序列化
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }
    public static void main(String[] args) {
        MsgProducer mProducer=new MsgProducer();
        //定义topic
        String topic="testkafka";
        
        //定义要发送给topic的消息
        String mString = "Hello kafka!";
                
        //构建消息对象
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, mString);
 
        //推送消息到broker
        producer.send(data);
        producer.close();
    }
}

这里需要注意,生产者这里,最少需要两个配置项:metadata.broker.list127.0.0.1:9092serializer.class设置为kafka.serializer.StringEncoder。打开上次配置的producer.properties文件,看到这两项配置分别为metadata.broker.list=localhost:9092serializer.class=kafka.serializer.DefaultEncoderbroker list要一致,否则会报错。
这些项,最好写在配置文件里,方便以后添加服务器时候更改。

2.2 消费者

package testKafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MsgConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public MsgConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        //定义连接zookeeper信息
        props.put("zookeeper.connect", zookeeper);
        //定义Consumer所有的groupID
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        //定义订阅topic数量
        topicCount.put(topic, new Integer(1));
        //返回的是所有topic的Map
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        //取出我们要需要的topic中的消息流
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
            while (consumerIte.hasNext()) {
                System.out.println(new String(consumerIte.next().message()));
            }
        }
        if (consumer != null) {
            consumer.shutdown();
        }
            
    }

    public static void main(String[] args) {
        String topic = "testkafka";
        MsgConsumer mConsumer = new MsgConsumer("127.0.0.1:2181", "test-consumer-group", topic);
        mConsumer.testConsumer();
    }

}

这里需要注意,消费者的配置信息,应该和生产者对应。最关键的配置是两项:zookeeper.connectgroup.id。这两项打开consumer.properties就可以看到。

3. 测试

首先,要在命令行中启动zookeeperkafka

在消费者程序里面,运行一下,Console框显示如下:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.

这个不是错误信息,不用理睬。
接着在生产者那里,运行一下,Console框显示如下:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
Hello kafka!

这样,我们的程序就跑通了。
这里主要参考了kafka官方例子:生产者消费者

SQLite数据库在java中应用

Overview

这次的服务器,我们准备使用消息队列和SQLite这两个新东西,SQLite主要负责将用户提交的序列信息存储,并在结果页面查询显示。存储的信息包括用户的ip+timeStamp,序列内容,序列的处理状态。

1.安装SQLite

SQLite的安装特别简单。
ubuntu14.04这个版本自带SQLite,在命令行输入sqlite3,显示如下:

~sqlite3
SQLite version 3.8.2 2013-12-06 14:53:30
Enter ".help" for instructions
Enter SQL statements terminated with a ";"
sqlite>

如果没有出现上述信息,可以使用如下命令进行安装:

sudo apt-get install sqlite3

每个版本的ubuntu的软件源略有新旧不同,但不影响使用。

2.SQLitejava交互

SQLitejava中的基本使用方法和其他诸如mysql等数据库大同小异,下面我们从创建并连接数据库,建表,增,删,改,查这6个方面简单介绍下。

2.1 创建并连接数据库

package testSQLite;

import java.sql.Connection;
import java.sql.DriverManager;

public class SQLiteJDBC {
    public static void main( String args[] ) {
        Connection c;
        try {
            Class.forName("org.sqlite.JDBC");
            c = DriverManager.getConnection("jdbc:sqlite:testSQLite.db");
        } catch ( Exception e ) {
            e.printStackTrace();
        }
        System.out.println("Connected database successfully");
    }
}

2.2 建表

package testSQLite;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class createTable {
    public static void main( String args[] ) {
        Connection c;
        Statement stmt;
        try {
            Class.forName("org.sqlite.JDBC");
            c = DriverManager.getConnection("jdbc:sqlite:testSQLite.db");
            System.out.println("Connected database successfully");

            stmt = c.createStatement();
            String sql = "CREATE TABLE TEAM " +
                         "(NUMBER INT PRIMARY KEY     NOT NULL, " +
                         " NAME           TEXT    NOT NULL, " + 
                         " AGE            INT     NOT NULL, " + 
                         " COUNTRY        CHAR(50), " + 
                         " POSITION       CHAR(50))"; 
            stmt.executeUpdate(sql);
            stmt.close();
            c.close();
        } catch ( Exception e ) {
            e.printStackTrace();
        }
        System.out.println("Table created successfully");
    }

}

2.3 增加(insert)

package testSQLite;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;


public class insert {
    public static void main(String args[]) {
        Connection c;
        Statement stmt;
        try {
            Class.forName("org.sqlite.JDBC");
            c = DriverManager.getConnection("jdbc:sqlite:testSQLite.db");
            c.setAutoCommit(false);
            System.out.println("Connected database successfully");

            stmt = c.createStatement();
            String sql = "INSERT INTO TEAM (NUMBER,NAME,AGE,COUNTRY,POSITION) " +
                       "VALUES (1, 'Navas', 29, 'Costa Rica', 'goalkeeper' );"; 
            stmt.executeUpdate(sql);

            sql = "INSERT INTO TEAM (NUMBER,NAME,AGE,COUNTRY,POSITION) " +
                "VALUES (2, 'Varane', 23, 'France', 'center back' );"; 
            stmt.executeUpdate(sql);

            sql = "INSERT INTO TEAM (NUMBER,NAME,AGE,COUNTRY,POSITION) " +
                "VALUES (3, 'Pepe', 33, 'Portugal', 'center back' );"; 
            stmt.executeUpdate(sql);

            sql = "INSERT INTO TEAM (NUMBER,NAME,AGE,COUNTRY,POSITION) " +
                "VALUES (4, 'Ramos', 30, 'Spain', 'center back' );";            
            stmt.executeUpdate(sql);
            
            sql = "INSERT INTO TEAM (NUMBER,NAME,AGE,COUNTRY,POSITION) " +
                    "VALUES (7, 'Ronaldo', 31, 'Portugal', 'stricker' );";          
            stmt.executeUpdate(sql);

            stmt.close();
            c.commit();
            c.close();
        } catch ( Exception e ) {
            e.printStackTrace();
        }
        System.out.println("Insert successfully");      
    }

}

2.4 查找(select)

package testSQLite;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class select {

    public static void main(String[] args) {
        Connection c;
        Statement stmt;
        try {
            Class.forName("org.sqlite.JDBC");
            c = DriverManager.getConnection("jdbc:sqlite:testSQLite.db");
            c.setAutoCommit(false);
            System.out.println("Connected database successfully");

            stmt = c.createStatement();
            ResultSet rs = stmt.executeQuery( "SELECT * FROM TEAM;" );
            while ( rs.next() ) {
                int number = rs.getInt("number");
                String  name = rs.getString("name");
                int age  = rs.getInt("age");
                String  country = rs.getString("country");
                String position = rs.getString("position");
                System.out.println( "NUMBER = " + number );
                System.out.println( "NAME = " + name );
                System.out.println( "AGE = " + age );
                System.out.println( "COUNTRY = " + country );
                System.out.println( "POSITION = " + position );
                System.out.println();
            }
            rs.close();
            stmt.close();
            c.close();
        } catch ( Exception e ) {
            e.printStackTrace();
        }
        System.out.println("Select successfully");

    }

}


2.5 更改(update)

package testSQLite;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class update {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Connection c = null;
        Statement stmt = null;
        try {
            Class.forName("org.sqlite.JDBC");
            c = DriverManager.getConnection("jdbc:sqlite:testSQLite.db");
            c.setAutoCommit(false);
            System.out.println("Connected database successfully");

            stmt = c.createStatement();
            String sql = "UPDATE TEAM set POSITION = 'fullback' where NUMBER=4;";
            stmt.executeUpdate(sql);
            c.commit();

            ResultSet rs = stmt.executeQuery( "SELECT * FROM TEAM;" );
            while ( rs.next() ) {
                int number = rs.getInt("number");
                String  name = rs.getString("name");
                int age  = rs.getInt("age");
                String  country = rs.getString("country");
                String position = rs.getString("position");
                System.out.println( "NUMBER = " + number );
                System.out.println( "NAME = " + name );
                System.out.println( "AGE = " + age );
                System.out.println( "COUNTRY = " + country );
                System.out.println( "POSTION = " + position );
                System.out.println();
            }
            rs.close();
            stmt.close();
            c.close();
        } catch ( Exception e ) {
            e.printStackTrace();
        }
        System.out.println("Update successfully");

    }

}

2.6 删除(delete)

package testSQLite;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class delete {

    public static void main(String[] args) {
        Connection c = null;
        Statement stmt = null;
        try {
            Class.forName("org.sqlite.JDBC");
            c = DriverManager.getConnection("jdbc:sqlite:testSQLite.db");
            c.setAutoCommit(false);
            System.out.println("Connected database successfully");

            stmt = c.createStatement();
            String sql = "DELETE from TEAM where NUMBER=3;";
            stmt.executeUpdate(sql);
            c.commit();

            ResultSet rs = stmt.executeQuery( "SELECT * FROM TEAM;" );
            while ( rs.next() ) {
                int number = rs.getInt("number");
                String  name = rs.getString("name");
                int age  = rs.getInt("age");
                String  country = rs.getString("country");
                String position = rs.getString("position");
                System.out.println( "NUMBER = " + number );
                System.out.println( "NAME = " + name );
                System.out.println( "AGE = " + age );
                System.out.println( "COUNTRY = " + country );
                System.out.println( "POSTION = " + position );
                System.out.println();
            }
            rs.close();
            stmt.close();
            c.close();
        } catch ( Exception e ) {
            e.printStackTrace();
        }
        System.out.println("Delete successfully");

    }

}

这篇文章主要参考了这里:SQLite - Java

KafkaOffsetMonitor监控消息消费状态

Overview

这次做服务器,计划加入消息队列,并在web页面显示当前提交的序列处理状态和已处理序列的信息。我们知道,在后台命令行中可以看到kafka的消息者处理消息的状态,但是,对于访问者来说,查看命令行是不现实的,于是我们便采用了KafkaOffsetMonitor这一开源软件。Github的下载地址如下:Kafka Offset Monitor

1. 安装jdk,zookeeper,kafka

这部分可以参考上一篇文章:ubuntu14.04单机安装配置zookeeper和kafka

2. 安装配置KafkaOffsetMonitor

新建一个文件夹,比如我在kafka文件夹下建立子文件夹kafkaMonitor下载好以后,把这个KafkaOffsetMonitor-assembly-0.2.0.jar文件放入kafkaMonitor文件夹。在当前位置新建一个kafkaMonitor.sh文件,文件内容如下:

#! /bin/bash
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk localhost:2181 \
--port 8089 \
--refresh 10.seconds \
--retain 7.days

上面是最关键的几条配置,剩余配置不写也可以运行。逐一分析上面每一项配置:

  • --zk 这里写的地址和端口,是zookeeper集群的各个地址和端口。应和kafka/bin文件夹中的zookeeper.properties中的host.nameclientPort一致。本机是host.name=localhost,clientPort=2181
  • --port 这个是本软件KafkaOffsetMonitor的端口。注意不要使用那些著名的端口号,例如80,8080等。我采用了8089.
  • --refresh 这个是软件刷新间隔时间,不要太短也不要太长。
  • --retain 这个是数据在数据库中保存的时间。

3. 启动KafkaOffsetMonitor

  • 首先,启动zookeeper

    切换到/home/young/zookeeper/bin目录下,运行下面命令:

    ./zkServer.sh start
    
  • 然后,启动kafka

    切换到/home/young/kafka目录下,执行下面命令:

    bin/kafka-server-start.sh config/server.properties
    
  • 最后,启动KafkaOffsetMonitor

    切换到/home/young/kafka/kafkaMonitor目录下面,执行下面命令:

    ./kafkaMonitor.sh 
    

    如果显示如下,就证明成功了:

    serving resources from: jar:file:/home/young/kafka/kafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    2016-06-10 12:05:13.724:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT
    log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    2016-06-10 12:05:13.781:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/home/young/kafka/kafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp}
    2016-06-10 12:05:13.802:INFO:oejs.AbstractConnector:Started SocketConnector@0.0.0.0:8089
    

4.运行KafkaOffsetMonitor

这里就不新建话题topic了,沿用上一篇文章中的testkafka
切换到/home/young/kafka,打开两个命令行终端,分别打开生产者和消费者:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testkafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testkafka --from-beginning

在生产者那里输入一条内容,如果在消费者那里能够收到,就证明成功了:

Hello kafkaOffsetMonitor!

这时候,我们打开浏览器,输入如下地址(对应你前面设置的地址和端口):

http://127.0.0.1:8089/

显示如下:

1.png

我们选择点击下面那个(因为上面那个已经死掉了):

2.png

其中,logSize,Offset,Lag分别代表总消息数,已处理消息数,未处理消息数
我们点击最上面的Topic List,就会显示所有Topic

3.png

我们点击最下面那个Topictestkafka,一路点下去,显示的是处理进度的过程图:

5.png

这里消息不多,如果多的话,将会非常壮观。
我们点击标题栏最后一项Visualization,就可以查看其他消费者或者整个集群的情况了(我们这里只有一个消费者,一个服务器):

6.png

这样,我们的安装就结束了,实际工程中使用,根据需要更改配置。
这篇文章主要参考了官方文档,和以下文章:
Kafka实战-KafkaOffsetMonitor
Apache Kafka监控之KafkaOffsetMonitor
apache kafka监控系列-KafkaOffsetMonitor

ubuntu14.04单机安装配置zookeeper和kafka

Overview

T4这个项目实验部分已经结束,下面需要将服务器做出来。为了方便以后扩展分布式的需要,这次将会运用Apache Kafka这个分布式消息发布订阅系统,以满足多人同时提交计算T4序列,解决并发排队问题。Apache kafka的详细介绍详见官网:Apache kafka
运行Apache Kafka,需要先安装好jdkzookeeperjdk安装过程就不赘述了。

1.安装配置zookeeper单机模式

这里,我们选择的是zookeeper-3.4.5这个版本,官网下载链接zookeeper-3.4.5。下载之后将zookeeper-3.4.5.tar.gz移动到主文件夹并重命名:

mv Downloads/zookeeper-3.4.5.tar.gz zookeeper.tar.gz

然后解压缩为zookeeper文件夹:

tar -zxvf zookeeper.tar.gz

切换到zookeeper/conf目录:

cd /home/young/zookeeper/conf

复制zoo_simple.cfgzoo.cfg:

cp zoo_simple.cfg zoo.cfg

打开zoo.cfg并修改内容如下:

initLimit=10
syncLimit=5
dataDir=/home/young/zookeeper/data
clientPort=2181

配置好后,手动创建dataDir目录:

mkdir /home/young/zookeeper/data

zookeeper配置环境变量,打开/etc/profile,在结尾添加如下两句,并保存:

export ZOOKEEPER_HOME=/home/young/zookeeper
export PATH=.:$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$PATH

切换到zookeeper/bin目录:

cd /home/young/zookeeper/bin

启动zookeeperserver

./zkServer.sh start

显示效果如下,则证明zookeeper配置成功:

JMX enabled by default
Using config: /home/young/zookeeper/bin/zookeeper/conf/zoo.cfg
Starting zookeeper ... STARTED

需要结束服务时,仍在本目录下,执行如下命令即可:

./zkServer.sh stop

2.安装配置kafka单机模式

我们选择的是kafka_2.10-0.8.1.1.tgz,下载链接在这里:Apache kafka。下载之后放到主文件夹,并改名为kafka.tgz,然后解压缩到当前文件夹:

tar -zxvf kafka.tgz

切换到kafka/config目录:

cd /home/young/kafka/config

这里,我们需要修改4个配置文件:server.propertieszookeeper.propertiesproducer.propertiesconsumer.properties

2.1 配置server.properties

下面几项是必须修改的,其他项目为默认配置:

#broker.id需改成正整数,单机为1就好
broker.id=1
#指定端口号
port=9092
#localhost这一项还有其他要修改,详细见下面说明
host.name=localhost
#指定kafka的日志目录
log.dirs=/home/young/kafka/kafka-logs
#连接zookeeper配置项,这里指定的是单机,所以只需要配置localhost,若是实际生产环境,需要在这里添加其他ip地址和端口号
zookeeper.connect=localhost:2181

然后手动创建log.dirs空目录:

mkdir /home/young/kafka/kafka-logs

2.2 配置zookeeper.properties

很简单,这个文件暂时只需要修改3项:

#数据目录
dataDir=/home/young/kafka/zookeeper/data
#客户端端口
clientPort=2181
host.name=localhost

然后手动创建dataDir空目录:

mkdir /home/young/kafka/zookeeper/data

2.3 配置producer.properties

只需要指定一项:

metadata.broker.list=localhost:9092

2.4 配置consumer.properties

只需要指定一项:

zookeeper.connect=localhost:2181

2.5 补充说明

对于这个版本的kafka,在配置好以上4项之后,仍然需要做2件事:

  • 配置localhost,修改/etc/hosts文件
    我的hosts文件里面,localhost部分是如下配置:
    127.0.0.1   localhost
    #下面这句,你的计算机名是什么就填什么,我的是young
    127.0.0.1   young
    255.255.255.255 broadcasthost
    ::1 localhost
    fe80::1%lo0 localhost
    

这里如果不修改不添加,就会产生异常java.net.UnknownHostException

  • 添加slf4j-simple-1.7.2.jar
    这里是个bug/home/young/kafka/libs这个目录缺少slf4j-simple-1.7.2.jar这个文件,只有slf4j-api-1.7.2.jar这个文件是不够的,必须两个都有。或者可以下载其他版本的两个slf4j文件,放入本目录。slf4j-simple-1.7.2.jar可以去官网下载slf4j-1.7.2.tar.gz,下载后解压缩,就可以看到这两个文件了。
    如果这两个文件不全,就会有错误:
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    

环境安装配置部分到此结束,下面介绍简单使用。

3.kafka的使用

3.1 启动zookeeper服务

切换至/home/young/kafka目录,执行以下命令,以启动zookeeper服务:

bin/zookeeper-server-start.sh config/zookeeper.properties

3.2 启动kafka服务

仍在/home/young/kafka目录下,执行以下命令,以启动kafka服务:

bin/kafka-server-start.sh config/server.properties

对于这种启动之后就可以忽略的服务,可以在最前面加上nohup,让其在后台自己运行。

3.3 创建话题topic

新开一个命令行窗口,创建一个叫做testkafkatopic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testkafka

然后可以根据地址和端口号将话题topic 展示出来:

bin/kafka-topics.sh --list --zookeeper localhost:2181

显示如下:

testkafka

3.4 启动生产者producer

再开一个producer命令行窗口,执行以下命令:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testkafka

然后可以之间在本窗口输入消息,每遇到换行符就认为是一条消息输入完成。

Hello kafka !

3.5 启动消费者consumer

再新开一个consumer命令行窗口,执行以下命令:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testkafka --from-beginning

显示如下:

Hello kafka !

这时候,启动kafka服务的命令行就会有显示:

[2016-06-07 14:19:12,683] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

然后,每在producer那里输入一条,consumer这里就会显示一条,然后kafka服务那里也会产生日志记录。
在以后启动时,只需要依次启动kafka serverproducerconsumer就可以了。

查阅了很多资料,但是都不是很详细,但是仍要感谢这些参考文章:

Kafka快速入门
Zookeeper & Kafka Install
ZooKeeper安装配置
Kafka单机环境部署
SLF4J warning or error messages and their meanings
stackoverflow
stackoverflow