您正在查看: 分布式 分类下的文章

FastDFS分布式文件系统在服务器集群上的安装部署

Overview

到目前为止,我们手里已经有了10台服务器了。之前一直说要把这些服务器搭建一个分布式文件系统,现在条件终于成熟了。这些服务器预装的系统大多数是Ubuntu16.04LTS发行版的Linux,少部分是Ubuntu14.04LTS。这次我们选择5台服务器,1feagen(118.138.241.39)服务器作为tracker4台服务器(Bastion3(118.138.240.146),Bastion4(118.138.233.74),Bastion6(118.138.233.26),POSSUM(118.138.233.27))作为storage。这里会记录下FastDFS分布式文件系统在Ubuntu16.04上的安装部署过程的细节。

5台服务器都要安装,只是配置文件稍有不同。

1.必备软件

  • FastDFS
    这个去GitHub上面下载源码:FastDFS
  • libfastcommon
    这个仍然要去GitHub上面下载源码:libfastcommon

这两个Zip包用wget或者在本地下载上传至服务器均可。

2.libfastcommon的安装

首先解压libfastcommon-master.zip包:

unzip libfastcommon-master.zip

进入这个文件夹:

cd libfastcommon-master/

依次输入如下命令:

sudo ./make.sh
sudo ./make.sh install

输入第二条命令之后,会有如下显示:

mkdir -p /usr/lib64
mkdir -p /usr/lib
install -m 755 libfastcommon.so /usr/lib64
install -m 755 libfastcommon.so /usr/lib
mkdir -p /usr/include/fastcommon
install -m 644 common_define.h hash.h chain.h logger.h base64.h shared_func.h pthread_func.h ini_file_reader.h _os_define.h sockopt.h sched_thread.h http_func.h md5.h local_ip_func.h avl_tree.h ioevent.h ioevent_loop.h fast_task_queue.h fast_timer.h process_ctrl.h fast_mblock.h connection_pool.h fast_mpool.h fast_allocator.h fast_buffer.h skiplist.h multi_skiplist.h flat_skiplist.h skiplist_common.h system_info.h fast_blocked_queue.h php7_ext_wrapper.h id_generator.h char_converter.h char_convert_loader.h /usr/include/fastcommon

可以看到libfastcommon.so安装到了/usr/lib64.
下面,我们需要为两个文件创建软链接,指向FastDFS主程序的lib目录:

sudo ln -s /usr/lib64/libfastcommon.so /usr/local/lib/libfastcommon.so
sudo ln -s /usr/lib64/libfastcommon.so /usr/lib/libfastcommon.so
sudo ln -s /usr/lib64/libfdfsclient.so /usr/local/lib/libfdfsclient.so
sudo ln -s /usr/lib64/libfdfsclient.so /usr/lib/libfdfsclient.so

3.FastDFS的安装

解压fastdfs-master.zip文件:

unzip fastdfs-master.zip

进入FastDFS源码根目录:

cd fastdfs-master

依次输入下面两条命令:

sudo ./make.sh
sudo ./make.sh install

安装完成之后,所有的可执行文件都被放在了/usr/bin/目录下面,可以用下面的命令查看:

ls /usr/bin/fdfs*

显示共有14个文件:

/usr/bin/fdfs_appender_test   
/usr/bin/fdfs_download_file  
/usr/bin/fdfs_test1
/usr/bin/fdfs_appender_test1  
/usr/bin/fdfs_file_info      
/usr/bin/fdfs_trackerd
/usr/bin/fdfs_append_file     
/usr/bin/fdfs_monitor        
/usr/bin/fdfs_upload_appender
/usr/bin/fdfs_crc32           
/usr/bin/fdfs_storaged       
/usr/bin/fdfs_upload_file
/usr/bin/fdfs_delete_file     
/usr/bin/fdfs_test

4. 配置FastDFS

配置文件在目录/etc/fdfs下,查看一下:

ls /etc/fdfs

显示如下:

client.conf.sample  storage.conf.sample  storage_ids.conf.sample  tracker.conf.sample

4.1 创建跟踪器和存储节点的配置文件

cd /etc/fdfs
sudo cp tracker.conf.sample tracker.conf
sudo cp storage.conf.sample storage.conf

4.2 修改tracker配置文件

feagen(118.138.241.39)服务器中修改tracker配置文件如下:

sudo vim /etc/fdfs/tracker.conf

可以看到如下显示:

# the base path to store data and log files
base_path=/home/yuqing/fastdfs

这行改成如下内容:

base_path=/feagen/fastdfs/tracker

这种配置文件,注意等号两边不能有空格。且上面的目录必须是真实存在的。
启动tracker

sudo fdfs_trackerd /etc/fdfs/tracker.conf start

查看监听端口:

sudo netstat -unltp|grep fdfs

如果显示如下,则证明tracker启动成功:

tcp        0      0 0.0.0.0:22122           0.0.0.0:*               LISTEN      6951/fdfs_trackerd

4.3 修改storage配置文件

Bastion3(118.138.240.146)服务器中,修改storage配置文件如下:

sudo vim /etc/fdfs/storage.conf

可以看到如下显示:

group_name=group1
base_path=/home/yuqing/fastdfs
store_path0=/home/yuqing/fastdfs
tracker_server=192.168.209.121:22122

修改为:

group_name=group1
base_path=/bastion3_cache/fastdfs/storage
store_path0=/bastion3_cache/fastdfs/storage
tracker_server=118.138.241.39:22122

Bastion4(118.138.233.74)服务器中,修改storage配置文件如下:

sudo vim /etc/fdfs/storage.conf

可以看到如下显示:

group_name=group1
base_path=/home/yuqing/fastdfs
store_path0=/home/yuqing/fastdfs
tracker_server=192.168.209.121:22122

修改为:

group_name=group1
base_path=/bastion4_cache/fastdfs/storage
store_path0=/bastion4_cache/fastdfs/storage
tracker_server=118.138.241.39:22122

Bastion6(118.138.233.26)服务器中,修改storage配置文件如下:

sudo vim /etc/fdfs/storage.conf

可以看到如下显示:

group_name=group1
base_path=/home/yuqing/fastdfs
store_path0=/home/yuqing/fastdfs
tracker_server=192.168.209.121:22122

修改为:

group_name=group1
base_path=/bastion6_cache/fastdfs/storage
store_path0=/bastion6_cache/fastdfs/storage
tracker_server=118.138.241.39:22122

POSSUM(118.138.233.27)服务器中,修改storage配置文件如下:

sudo vim /etc/fdfs/storage.conf

可以看到如下显示:

group_name=group1
base_path=/home/yuqing/fastdfs
store_path0=/home/yuqing/fastdfs
tracker_server=192.168.209.121:22122

修改为:

group_name=group1
base_path=/possum/fastdfs/storage
store_path0=/possum/fastdfs/storage
tracker_server=118.138.241.39:22122

启动storage服务:

sudo fdfs_storaged /etc/fdfs/storage.conf start

显示如下,即代表成功启动:

process fdfs_storaged already running, pid: 28250

查看监听端口:

sudo netstat -unltp|grep fdfs

如果显示如下,则证明storage启动成功:

tcp        0      0 0.0.0.0:23000           0.0.0.0:*               LISTEN      28250/fdfs_storaged

如果没有显示,则有可能是storage启动失败。

那么我们以Bastion6服务器为例,查看启动日志:

tail /bastion6_cache/fastdfs/storage/logs/storaged.log

如果显示如下,代表启动失败:

[2018-05-13 19:56:31] ERROR - file: storage_ip_changed_dealer.c, line: 186, connect to tracker server 118.138.2
40.146:22122 fail, errno: 110, error info: Connection timed out

连接超时;
如果显示如下,代表启动成功:

[2018-05-14 12:59:40] INFO - file: tracker_client_thread.c, line: 310, successfully connect to tracker server 118.138.240.146:22122, as a tracker client, my ip is 118.138.233.26
[2018-05-14 13:00:10] INFO - file: tracker_client_thread.c, line: 1263, tracker server 118.138.240.146:22122, set tracker leader: 118.138.240.146:22122
[2018-05-14 13:03:06] INFO - file: storage_sync.c, line: 2733, successfully connect to storage server 118.138.233.74:23000, continuous fail count: 16
[2018-05-14 13:03:41] INFO - file: storage_sync.c, line: 2733, successfully connect to storage server 118.138.233.74:23000

连接成功。

超时解读:
之前跟Chris讨论的时候,刚开始以为这几天服务器都是Monash云服务器中心的,属于内网,所以相互之间的端口是不用开的,实验之后才知道不开端口是不行的。后来联系Jerico开开3个端口之后,才连接成功。这3个端口分别是22122888823000

  • 22122:代表tracker服务端口;
  • 8888:代表HTTP协议端口,网页上传下载文件需要这个端口;
  • 23000:代表storage服务端口。

至此,storage存储节点安装成功。

4.4 查看所有存储节点信息

所有存储节点都启动之后,在任意一台storage上面用下面命令,查看集群状态信息:

sudo /usr/bin/fdfs_monitor /etc/fdfs/storage.conf

显示如下:

[2018-05-24 01:27:18] DEBUG - base_path=/bastion6_cache/fastdfs/storage, connect_timeout=10, network_timeout=60, tracker_server_count=1, anti_steal_token=0, anti_steal_secret_key length=0, use_connection_pool=0, g_connection_pool_max_idle_time=3600s, use_storage_id=0, storage server id count: 0

server_count=1, server_index=0

tracker server is 118.138.241.39:22122

group count: 1

Group 1:
group name = group1
disk total space = 100665 MB
disk free space = 72339 MB
trunk free space = 0 MB
storage server count = 4
active server count = 4
storage server port = 23000
storage HTTP port = 8888
store path count = 1
subdir count per path = 256
current write server index = 0
current trunk file id = 0

    Storage 1:
        id = 118.138.233.26
        ip_addr = 118.138.233.26 (vm-118-138-233-26.erc.monash.edu.au)  ACTIVE
        http domain = 
        version = 5.12
        join time = 2018-05-13 19:55:46
        up time = 2018-05-20 00:51:17
        total storage = 483679 MB
        free storage = 452554 MB
        upload priority = 10
        store_path_count = 1
        subdir_count_per_path = 256
        storage_port = 23000
        storage_http_port = 8888
        current_write_path = 0
        source storage id = 
        if_trunk_server = 0
        connection.alloc_count = 256
        connection.current_count = 3
        connection.max_count = 3
        total_upload_count = 1
        success_upload_count = 1
        total_append_count = 0
        success_append_count = 0
        total_modify_count = 0
        success_modify_count = 0
        total_truncate_count = 0
        success_truncate_count = 0
        total_set_meta_count = 0
        success_set_meta_count = 0
        total_delete_count = 0
        success_delete_count = 0
        total_download_count = 0
        success_download_count = 0
        total_get_meta_count = 0
        success_get_meta_count = 0
        total_create_link_count = 0
        success_create_link_count = 0
        total_delete_link_count = 0
        success_delete_link_count = 0
        total_upload_bytes = 87026
        success_upload_bytes = 87026
        total_append_bytes = 0
        success_append_bytes = 0
        total_modify_bytes = 0
        success_modify_bytes = 0
        stotal_download_bytes = 0
        success_download_bytes = 0
        total_sync_in_bytes = 11288
        success_sync_in_bytes = 11288
        total_sync_out_bytes = 0
        success_sync_out_bytes = 0
        total_file_open_count = 5
        success_file_open_count = 5
        total_file_read_count = 0
        success_file_read_count = 0
        total_file_write_count = 5
        success_file_write_count = 5
        last_heart_beat_time = 2018-05-24 01:26:54
        last_source_update = 2018-05-17 01:27:29
        last_sync_update = 2018-05-17 01:06:46
        last_synced_timestamp = 1970-01-01 10:00:00 (never synced)
    Storage 2:
        id = 118.138.233.27
        ip_addr = 118.138.233.27 (vm-118-138-233-27.erc.monash.edu.au)  ACTIVE
        http domain = 
        version = 5.12
        join time = 2018-05-17 00:39:17
        up time = 2018-05-20 00:52:30
        total storage = 2015737 MB
        free storage = 1689979 MB
        upload priority = 10
        store_path_count = 1
        subdir_count_per_path = 256
        storage_port = 23000
        storage_http_port = 8888
        current_write_path = 0
        source storage id = 
        if_trunk_server = 0
        connection.alloc_count = 256
        connection.current_count = 3
        connection.max_count = 3
        total_upload_count = 0
        success_upload_count = 0
        total_append_count = 0
        success_append_count = 0
        total_modify_count = 0
        success_modify_count = 0
        total_truncate_count = 0
        success_truncate_count = 0
        total_set_meta_count = 0
        success_set_meta_count = 0
        total_delete_count = 0
        success_delete_count = 0
        total_download_count = 0
        success_download_count = 0
        total_get_meta_count = 0
        success_get_meta_count = 0
        total_create_link_count = 0
        success_create_link_count = 0
        total_delete_link_count = 0
        success_delete_link_count = 0
        total_upload_bytes = 0
        success_upload_bytes = 0
        total_append_bytes = 0
        success_append_bytes = 0
        total_modify_bytes = 0
        success_modify_bytes = 0
        stotal_download_bytes = 0
        success_download_bytes = 0
        total_sync_in_bytes = 98314
        success_sync_in_bytes = 98314
        total_sync_out_bytes = 0
        success_sync_out_bytes = 0
        total_file_open_count = 5
        success_file_open_count = 5
        total_file_read_count = 0
        success_file_read_count = 0
        total_file_write_count = 5
        success_file_write_count = 5
        last_heart_beat_time = 2018-05-24 01:27:08
        last_source_update = 1970-01-01 10:00:00
        last_sync_update = 2018-05-21 16:32:36
        last_synced_timestamp = 2018-05-17 01:06:38 (20m:51s delay)
    Storage 3:
        id = 118.138.233.74
        ip_addr = 118.138.233.74 (vm-118-138-233-74.erc.monash.edu.au)  ACTIVE
        http domain = 
        version = 5.12
        join time = 2018-05-13 20:03:06
        up time = 2018-05-20 00:53:40
        total storage = 100665 MB
        free storage = 72339 MB
        upload priority = 10
        store_path_count = 1
        subdir_count_per_path = 256
        storage_port = 23000
        storage_http_port = 8888
        current_write_path = 0
        source storage id = 
        if_trunk_server = 0
        connection.alloc_count = 256
        connection.current_count = 3
        connection.max_count = 3
        total_upload_count = 2
        success_upload_count = 2
        total_append_count = 0
        success_append_count = 0
        total_modify_count = 0
        success_modify_count = 0
        total_truncate_count = 0
        success_truncate_count = 0
        total_set_meta_count = 2
        success_set_meta_count = 2
        total_delete_count = 0
        success_delete_count = 0
        total_download_count = 0
        success_download_count = 0
        total_get_meta_count = 0
        success_get_meta_count = 0
        total_create_link_count = 0
        success_create_link_count = 0
        total_delete_link_count = 0
        success_delete_link_count = 0
        total_upload_bytes = 11190
        success_upload_bytes = 11190
        total_append_bytes = 0
        success_append_bytes = 0
        total_modify_bytes = 0
        success_modify_bytes = 0
        stotal_download_bytes = 0
        success_download_bytes = 0
        total_sync_in_bytes = 174142
        success_sync_in_bytes = 87026
        total_sync_out_bytes = 0
        success_sync_out_bytes = 0
        total_file_open_count = 3
        success_file_open_count = 3
        total_file_read_count = 0
        success_file_read_count = 0
        total_file_write_count = 3
        success_file_write_count = 3
        last_heart_beat_time = 2018-05-24 01:27:13
        last_source_update = 2018-05-17 01:06:38
        last_sync_update = 2018-05-21 16:32:54
        last_synced_timestamp = 2018-05-17 01:27:30 (-1s delay)
    Storage 4:
        id = 118.138.240.146
        ip_addr = 118.138.240.146 (vm-118-138-240-146.erc.monash.edu.au)  ACTIVE
        http domain = 
        version = 5.12
        join time = 2018-05-20 00:53:13
        up time = 2018-05-20 00:53:13
        total storage = 483679 MB
        free storage = 449872 MB
        upload priority = 10
        store_path_count = 1
        subdir_count_per_path = 256
        storage_port = 23000
        storage_http_port = 8888
        current_write_path = 0
        source storage id = 118.138.233.26
        if_trunk_server = 0
        connection.alloc_count = 256
        connection.current_count = 3
        connection.max_count = 3
        total_upload_count = 0
        success_upload_count = 0
        total_append_count = 0
        success_append_count = 0
        total_modify_count = 0
        success_modify_count = 0
        total_truncate_count = 0
        success_truncate_count = 0
        total_set_meta_count = 0
        success_set_meta_count = 0
        total_delete_count = 0
        success_delete_count = 0
        total_download_count = 0
        success_download_count = 0
        total_get_meta_count = 0
        success_get_meta_count = 0
        total_create_link_count = 0
        success_create_link_count = 0
        total_delete_link_count = 0
        success_delete_link_count = 0
        total_upload_bytes = 0
        success_upload_bytes = 0
        total_append_bytes = 0
        success_append_bytes = 0
        total_modify_bytes = 0
        success_modify_bytes = 0
        stotal_download_bytes = 0
        success_download_bytes = 0
        total_sync_in_bytes = 98314
        success_sync_in_bytes = 98314
        total_sync_out_bytes = 0
        success_sync_out_bytes = 0
        total_file_open_count = 5
        success_file_open_count = 5
        total_file_read_count = 0
        success_file_read_count = 0
        total_file_write_count = 5
        success_file_write_count = 5
        last_heart_beat_time = 2018-05-24 01:27:03
        last_source_update = 1970-01-01 10:00:00
        last_sync_update = 2018-05-21 16:32:54
        last_synced_timestamp = 2018-05-17 01:27:30 (-1s delay)

可以看到,4storage状态都是ACTIVE,表示都启动成功了。

5. 测试上传文件

需要注意一点,我们之后的特征提取程序会在feagen服务器上,所以,我们暂时将client安装在这台服务器上。测试一下在本台服务器上上传文件,能否在所有storage服务器上面查看得到此文件的备份。

5.1 修改tracker服务器上的client配置文件

先后输入以下命令:

cd /etc/fdfs
sudo cp client.conf.sample client.conf
sudo vim client.conf

base_pathtracker_server两处配置修改为:

base_path=/feagen/fastdfs/client
tracker_server=118.138.241.39:22122

shell当中输入下面的命令:

sudo /usr/bin/fdfs_upload_file /etc/fdfs/client.conf fastdfs-master.zip

返回如下,即可表明上传成功:

group1/M00/00/00/dorpG1td37yAFSYPAAZ-Sk23ivY904.zip

这个返回值,在perl当中,以如下方式获取:

my $file_name = qx(sudo /usr/bin/fdfs_upload_file /etc/fdfs/client.conf fastdfs-master.zip);

到任意一个storage服务器当中查看,以Bastion3为例,至此处:

/bastion3_cache/fastdfs/storage/data/00/00

发现存在如下文件:

dorpG1td37yAFSYPAAZ-Sk23ivY904.zip

即表明上传成功。

6. 测试下载文件

当前目录下,运行下面的程序:

sudo /usr/bin/fdfs_download_file /etc/fdfs/client.conf group1/M00/00/00/dorpG1td37yAFSYPAAZ-Sk23ivY904.zip

即可在当前目录下面找到dorpG1td37yAFSYPAAZ-Sk23ivY904.zip,表明下载成功。

至此我们的分布式文件服务器算是搭建完成了。

我们参考了这些文章,表示感谢!
FastDFS分布式文件系统集群安装与配置
FastDFS--原理篇
分布式文件系统FastDFS原理介绍
Ubuntu下安装并配置FastDFS

ubuntu14.04安装gearman及perl扩展包

Overview

Bastion4这个项目经过我们实验验证和安全考虑,决定舍弃kafka而转用gearman这个消息队列框架,具体分析将在后续文章中给出,这里只记录gearman相关的安装。

1.下载安装gearman

最新版的gearmangearmand-1.1.12。我们执行下面几步,先将其下载到本地主文件夹,并解压缩。

sudo apt-get update
wget https://launchpad.net/gearmand/1.2/1.1.12/+download/gearmand-1.1.12.tar.gz
tar zxvf gearmand-1.1.12.tar.gz
cd gearmand-1.1.12/

进入gearmand-1.1.12文件夹后,如果直接运行

./configure

就会报缺少如下几个依赖包错误:

configure: error: could not find boost
configure: error: Could not find a version of the library
configure: error: could not find gperf
configure: error: Unable to find libevent
configure: error: Unable to find libuuid

所以,我们先将这些依赖都安装好:

sudo apt-get install libboost-dev
sudo apt-get install libboost-all-dev
sudo apt-get install gperf
sudo apt-get install libevent-dev
sudo apt-get install uuid-dev

安装好之后,如果没有错误,仍在gearmand-1.1.12文件夹下运行下面两条命令,编译时间比较长:

sudo make
sudo make install

这个过程中如果出现了错误,就运行下面的命令清除一下之前编译产生的可执行文件以及object文件(即扩展名为o的文件):

sudo make clean

继续重新安装编译:

./configure
sudo make
sudo make install

没有错误的话,就安装gearmanjob server

sudo apt-get install gearman-job-server

安装好以后,运行一下gearman:

gearman

会提示错误:

gearman: error while loading shared libraries: libgearman.so.8: cannot open shared object file: No such file or directory

这表示找不到libgearman.so.8所在的目录。这时我们打开/etc/ld.so.conf文件:

sudo vim /etc/ld.so.conf

添加一句话:

include /usr/local/lib

保存退出,并执行下面这句:

sudo /sbin/ldconfig

这样就不会出错了。
启动一下job server

gearmand -d

报错如下:

gearmand: Could not open log file "/usr/local/var/log/gearmand.log", from "/home/young/gearmand-1.1.12", switching to stderr. (No such file or directory)

我们这样解决:在/usr/local/下面新建var子目录,进去,新建log子目录,再进去,新建文件gearmand.log。这样就没有问题了。
sudo权限运行下面的命令:

sudo gearmand -d -L 127.0.0.1 -p 4730

-d表示daemon,在后台运行;
-L表示监听的ip,默认是localhost
-p表示监听的端口号port,默认是4730

这样gearmanubuntu14.04上面就安装成功了。

2.安装perl扩展包

perl端需要3个扩展包:

Gearman::Server
Gearman::Client
Gearman::Worker

可以用之前Chris的博客BioPerl(一):安装BioPerl中介绍的方法,先安装好CPAN
之后,用sudo权限打开CPAN

sudo cpan

然后依次安装3个扩展包:

install Gearman::Server
install Gearman::Client
install Gearman::Worker

都安装后之后,可以重启一下机器。
启动之后,用如下命令观察4730端口,查看gearmanjob server是否已经启动(gearmand -d这条命令是否生效):

sudo lsof -i:4730

如果不加sudo是看不到的,所以linux命令建议都要加上sudo

可以看到:

COMMAND  PID    USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
gearmand 576 gearman    9u  IPv4  12861      0t0  TCP *:4730 (LISTEN)
gearmand 576 gearman   10u  IPv6  12862      0t0  TCP *:4730 (LISTEN)

服务器端可能会只显示

COMMAND  PID    USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
gearmand 576 gearman    9u  IPv4  12861      0t0  TCP *:4730 (LISTEN)

可见这个server是开机启动的。

3.运行测试perl脚本

client.pl将消息发送到server,相当于生产者;
worker.pl来处理这些消息,并把结果返回给client.pl
相当于消费者。我们观察输出即可。
client.pl代码如下:

#!/usr/bin/perl
use strict;
use warnings;
use Gearman::Client;
use Storable; 
use Storable qw(freeze);
use Storable qw(thaw);
use IO::All;

# fork this process
my $pid = fork();
if ($pid == 0)
{
    # do this in the child
    print "start new client \n";
    my $client = Gearman::Client->new;
    print "finish new client \n";
    print "start job_servers \n";
    $client->job_servers('127.0.0.1',4730);
    print "finish job_servers \n";
    # 设置异步任务
    print "start new_task_set \n";
    my $tasks = $client->new_task_set; 
    print "finish new_task_set \n";
    print "start add_task \n";
    #handle database
    my @rows=('hello','byebye');
    $tasks->add_task(
        # 开始任务,多个参数
        showMessage => freeze(\@rows), 
        # 注册回调函数 
        { on_complete => \&complete },  
    );  
    print "finish add_task \n";
    print "start wait \n";
    # 等待任务结束
    $tasks->wait;
    print "finish wait \n";
    exit;
}

print "The background task will be finished shortly.\n";
 
sub complete{   
    my $ret = ${ $_[0] };
    #io("complete.txt")->print($ret);
    print $ret, "\n";
} 
 

worker.pl代码如下:

#!/usr/bin/perl
use strict;
use warnings;
use Gearman::Worker;
use Storable qw(thaw);
use Storable qw(freeze);

 print "start new worker \n";
my $worker = Gearman::Worker->new;
print "finish new worker \n";
print "start job_servers \n";
$worker->job_servers('127.0.0.1',4730);
print "finish job_servers \n";
# Worker 注册可以使用的功能
print "start register_function \n";
$worker->register_function( showMessage => \&showMessage );  
 print "finish register_function \n";
# 等待连接的任务
print "start work \n";

$worker->work while 1;  
print "finish work \n";
sub showMessage{
    my @row=@{ thaw($_[0]->arg) };

    my $job = \@row;
    print "\n";
    print "$row[0] \n";
    print "$row[1] \n";   
    print "start sleep \n";
    my $date = &getTime();  
    print  $date->{date}," ",$date->{hour},":",$date->{minute},":",$date->{second};
    print "\n";
    sleep(10);
    print "finish sleep \n";
    $date = &getTime();  
    print  $date->{date}," ",$date->{hour},":",$date->{minute},":",$date->{second};
    print "\n";

    my $ret = "hello world";
    return $ret;
}

sub getTime
{
    my $time = shift || time();
    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($time);

    $year += 1900;
    $mon ++;

    $min  = '0'.$min  if length($min)  < 2;
    $sec  = '0'.$sec  if length($sec)  < 2;
    $mon  = '0'.$mon  if length($mon)  < 2;
    $mday = '0'.$mday if length($mday) < 2;
    $hour = '0'.$hour if length($hour) < 2;
    
    my $weekday = ('Sun','Mon','Tue','Wed','Thu','Fri','Sat')[$wday];

    return { 'second' => $sec,
             'minute' => $min,
             'hour'   => $hour,
             'day'    => $mday,
             'month'  => $mon,
             'year'   => $year,
             'weekNo' => $wday,
             'wday'   => $weekday,
             'yday'   => $yday,
             'date'   => "$year-$mon-$mday"
          };
}

我们先运行worker.pl

sudo perl worker.pl

再查看4730端口:

sudo lsof -i:4730

发现多了两个占用:

COMMAND   PID    USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
gearmand  576 gearman    9u  IPv4  12861      0t0  TCP *:4730 (LISTEN)
gearmand  576 gearman   10u  IPv6  12862      0t0  TCP *:4730 (LISTEN)
gearmand  576 gearman   33u  IPv4 191760      0t0  TCP localhost:4730->localhost:37151 (ESTABLISHED)
perl     4508    root    3u  IPv4 192605      0t0  TCP localhost:37151->localhost:4730 (ESTABLISHED)

再运行多个client.pl

sudo perl client.pl

每启动一个client.pl就会多两个端口占用。

COMMAND   PID    USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
gearmand  576 gearman    9u  IPv4  12861      0t0  TCP *:4730 (LISTEN)
gearmand  576 gearman   10u  IPv6  12862      0t0  TCP *:4730 (LISTEN)
gearmand  576 gearman   33u  IPv4 191760      0t0  TCP localhost:4730->localhost:37151 (ESTABLISHED)
gearmand  576 gearman   34u  IPv4 215493      0t0  TCP localhost:4730->localhost:37189 (ESTABLISHED)
perl     4508    root    3u  IPv4 192605      0t0  TCP localhost:37151->localhost:4730 (ESTABLISHED)
perl     4709    root    3u  IPv4 214688      0t0  TCP localhost:37189->localhost:4730 (ESTABLISHED)

每运行一次worker.pl表示启动一个新的消费者。如果只启动一个消费者,而启动多个生产者,就可以很好地观察到“排队”效果了:一定是一个任务执行完成之后才会开始处理另一个任务。
这样gearmanperl扩展包的安装使用就结束了。

这篇文章主要参考了如下几篇文章:

Gearman Job Server
Gearman
使用 Gearman 实现分布式处理
Gearman 安装使用 以及 问题处理
Ubuntu下Gearman安装搭建

kafka在java中简单应用

Overview

之前的这篇博客ubuntu14.04单机安装配置zookeeper和kafka,介绍了zookeeperkafka的安装配置,并在命令行下验证了生产者消费者可以跑通。但是实际项目中,需要和java交互,不可能接触到命令行和后台的。本文旨在记录一下javakafka的简单交互,web中道理相同,只不过程序入口换成了action

1.新建项目配置环境

打开eclipse,依次点击Window→Preferences→Java→Build Path→User Libraries,然后在右边选择New
,添加一个自己常用的Library,我命名为kafka。选中kafka,右边选择Add External JARS,然后到之前安装好的kafka的目录,找到libs这个文件夹,如果按照上次配置好的情况,这里应该是15jar文件,见下图:

2016-07-12 17:16:06屏幕截图.png

全部选中,点击确定。这样,我们以后就可以复用了。

然后我们在eclipse中构建一个普通的java项目testKafka。右击项目,依次点击Build Path→Add Libraries→User Library,选择kafka这个library,点击Finish。这样环境就搭建好了。

2.生产者消费者程序

下面编码测试程序,即消息生产者和消息消费者。

2.1 生产者

package testKafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MsgProducer {
    private static Producer<String,String> producer;
    private final Properties props=new Properties();
    public MsgProducer(){
        //定义连接的broker list
        props.put("metadata.broker.list", "127.0.0.1:9092");
        //定义序列化类,Java中对象传输之前要序列化
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }
    public static void main(String[] args) {
        MsgProducer mProducer=new MsgProducer();
        //定义topic
        String topic="testkafka";
        
        //定义要发送给topic的消息
        String mString = "Hello kafka!";
                
        //构建消息对象
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, mString);
 
        //推送消息到broker
        producer.send(data);
        producer.close();
    }
}

这里需要注意,生产者这里,最少需要两个配置项:metadata.broker.list127.0.0.1:9092serializer.class设置为kafka.serializer.StringEncoder。打开上次配置的producer.properties文件,看到这两项配置分别为metadata.broker.list=localhost:9092serializer.class=kafka.serializer.DefaultEncoderbroker list要一致,否则会报错。
这些项,最好写在配置文件里,方便以后添加服务器时候更改。

2.2 消费者

package testKafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MsgConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public MsgConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        //定义连接zookeeper信息
        props.put("zookeeper.connect", zookeeper);
        //定义Consumer所有的groupID
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        //定义订阅topic数量
        topicCount.put(topic, new Integer(1));
        //返回的是所有topic的Map
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        //取出我们要需要的topic中的消息流
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
            while (consumerIte.hasNext()) {
                System.out.println(new String(consumerIte.next().message()));
            }
        }
        if (consumer != null) {
            consumer.shutdown();
        }
            
    }

    public static void main(String[] args) {
        String topic = "testkafka";
        MsgConsumer mConsumer = new MsgConsumer("127.0.0.1:2181", "test-consumer-group", topic);
        mConsumer.testConsumer();
    }

}

这里需要注意,消费者的配置信息,应该和生产者对应。最关键的配置是两项:zookeeper.connectgroup.id。这两项打开consumer.properties就可以看到。

3. 测试

首先,要在命令行中启动zookeeperkafka

在消费者程序里面,运行一下,Console框显示如下:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.

这个不是错误信息,不用理睬。
接着在生产者那里,运行一下,Console框显示如下:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
Hello kafka!

这样,我们的程序就跑通了。
这里主要参考了kafka官方例子:生产者消费者

KafkaOffsetMonitor监控消息消费状态

Overview

这次做服务器,计划加入消息队列,并在web页面显示当前提交的序列处理状态和已处理序列的信息。我们知道,在后台命令行中可以看到kafka的消息者处理消息的状态,但是,对于访问者来说,查看命令行是不现实的,于是我们便采用了KafkaOffsetMonitor这一开源软件。Github的下载地址如下:Kafka Offset Monitor

1. 安装jdk,zookeeper,kafka

这部分可以参考上一篇文章:ubuntu14.04单机安装配置zookeeper和kafka

2. 安装配置KafkaOffsetMonitor

新建一个文件夹,比如我在kafka文件夹下建立子文件夹kafkaMonitor下载好以后,把这个KafkaOffsetMonitor-assembly-0.2.0.jar文件放入kafkaMonitor文件夹。在当前位置新建一个kafkaMonitor.sh文件,文件内容如下:

#! /bin/bash
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk localhost:2181 \
--port 8089 \
--refresh 10.seconds \
--retain 7.days

上面是最关键的几条配置,剩余配置不写也可以运行。逐一分析上面每一项配置:

  • --zk 这里写的地址和端口,是zookeeper集群的各个地址和端口。应和kafka/bin文件夹中的zookeeper.properties中的host.nameclientPort一致。本机是host.name=localhost,clientPort=2181
  • --port 这个是本软件KafkaOffsetMonitor的端口。注意不要使用那些著名的端口号,例如80,8080等。我采用了8089.
  • --refresh 这个是软件刷新间隔时间,不要太短也不要太长。
  • --retain 这个是数据在数据库中保存的时间。

3. 启动KafkaOffsetMonitor

  • 首先,启动zookeeper

    切换到/home/young/zookeeper/bin目录下,运行下面命令:

    ./zkServer.sh start
    
  • 然后,启动kafka

    切换到/home/young/kafka目录下,执行下面命令:

    bin/kafka-server-start.sh config/server.properties
    
  • 最后,启动KafkaOffsetMonitor

    切换到/home/young/kafka/kafkaMonitor目录下面,执行下面命令:

    ./kafkaMonitor.sh 
    

    如果显示如下,就证明成功了:

    serving resources from: jar:file:/home/young/kafka/kafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    2016-06-10 12:05:13.724:INFO:oejs.Server:jetty-7.x.y-SNAPSHOT
    log4j:WARN No appenders could be found for logger (org.I0Itec.zkclient.ZkConnection).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    2016-06-10 12:05:13.781:INFO:oejsh.ContextHandler:started o.e.j.s.ServletContextHandler{/,jar:file:/home/young/kafka/kafkaMonitor/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp}
    2016-06-10 12:05:13.802:INFO:oejs.AbstractConnector:Started SocketConnector@0.0.0.0:8089
    

4.运行KafkaOffsetMonitor

这里就不新建话题topic了,沿用上一篇文章中的testkafka
切换到/home/young/kafka,打开两个命令行终端,分别打开生产者和消费者:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testkafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testkafka --from-beginning

在生产者那里输入一条内容,如果在消费者那里能够收到,就证明成功了:

Hello kafkaOffsetMonitor!

这时候,我们打开浏览器,输入如下地址(对应你前面设置的地址和端口):

http://127.0.0.1:8089/

显示如下:

1.png

我们选择点击下面那个(因为上面那个已经死掉了):

2.png

其中,logSize,Offset,Lag分别代表总消息数,已处理消息数,未处理消息数
我们点击最上面的Topic List,就会显示所有Topic

3.png

我们点击最下面那个Topictestkafka,一路点下去,显示的是处理进度的过程图:

5.png

这里消息不多,如果多的话,将会非常壮观。
我们点击标题栏最后一项Visualization,就可以查看其他消费者或者整个集群的情况了(我们这里只有一个消费者,一个服务器):

6.png

这样,我们的安装就结束了,实际工程中使用,根据需要更改配置。
这篇文章主要参考了官方文档,和以下文章:
Kafka实战-KafkaOffsetMonitor
Apache Kafka监控之KafkaOffsetMonitor
apache kafka监控系列-KafkaOffsetMonitor

ubuntu14.04单机安装配置zookeeper和kafka

Overview

T4这个项目实验部分已经结束,下面需要将服务器做出来。为了方便以后扩展分布式的需要,这次将会运用Apache Kafka这个分布式消息发布订阅系统,以满足多人同时提交计算T4序列,解决并发排队问题。Apache kafka的详细介绍详见官网:Apache kafka
运行Apache Kafka,需要先安装好jdkzookeeperjdk安装过程就不赘述了。

1.安装配置zookeeper单机模式

这里,我们选择的是zookeeper-3.4.5这个版本,官网下载链接zookeeper-3.4.5。下载之后将zookeeper-3.4.5.tar.gz移动到主文件夹并重命名:

mv Downloads/zookeeper-3.4.5.tar.gz zookeeper.tar.gz

然后解压缩为zookeeper文件夹:

tar -zxvf zookeeper.tar.gz

切换到zookeeper/conf目录:

cd /home/young/zookeeper/conf

复制zoo_simple.cfgzoo.cfg:

cp zoo_simple.cfg zoo.cfg

打开zoo.cfg并修改内容如下:

initLimit=10
syncLimit=5
dataDir=/home/young/zookeeper/data
clientPort=2181

配置好后,手动创建dataDir目录:

mkdir /home/young/zookeeper/data

zookeeper配置环境变量,打开/etc/profile,在结尾添加如下两句,并保存:

export ZOOKEEPER_HOME=/home/young/zookeeper
export PATH=.:$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$PATH

切换到zookeeper/bin目录:

cd /home/young/zookeeper/bin

启动zookeeperserver

./zkServer.sh start

显示效果如下,则证明zookeeper配置成功:

JMX enabled by default
Using config: /home/young/zookeeper/bin/zookeeper/conf/zoo.cfg
Starting zookeeper ... STARTED

需要结束服务时,仍在本目录下,执行如下命令即可:

./zkServer.sh stop

2.安装配置kafka单机模式

我们选择的是kafka_2.10-0.8.1.1.tgz,下载链接在这里:Apache kafka。下载之后放到主文件夹,并改名为kafka.tgz,然后解压缩到当前文件夹:

tar -zxvf kafka.tgz

切换到kafka/config目录:

cd /home/young/kafka/config

这里,我们需要修改4个配置文件:server.propertieszookeeper.propertiesproducer.propertiesconsumer.properties

2.1 配置server.properties

下面几项是必须修改的,其他项目为默认配置:

#broker.id需改成正整数,单机为1就好
broker.id=1
#指定端口号
port=9092
#localhost这一项还有其他要修改,详细见下面说明
host.name=localhost
#指定kafka的日志目录
log.dirs=/home/young/kafka/kafka-logs
#连接zookeeper配置项,这里指定的是单机,所以只需要配置localhost,若是实际生产环境,需要在这里添加其他ip地址和端口号
zookeeper.connect=localhost:2181

然后手动创建log.dirs空目录:

mkdir /home/young/kafka/kafka-logs

2.2 配置zookeeper.properties

很简单,这个文件暂时只需要修改3项:

#数据目录
dataDir=/home/young/kafka/zookeeper/data
#客户端端口
clientPort=2181
host.name=localhost

然后手动创建dataDir空目录:

mkdir /home/young/kafka/zookeeper/data

2.3 配置producer.properties

只需要指定一项:

metadata.broker.list=localhost:9092

2.4 配置consumer.properties

只需要指定一项:

zookeeper.connect=localhost:2181

2.5 补充说明

对于这个版本的kafka,在配置好以上4项之后,仍然需要做2件事:

  • 配置localhost,修改/etc/hosts文件
    我的hosts文件里面,localhost部分是如下配置:
    127.0.0.1   localhost
    #下面这句,你的计算机名是什么就填什么,我的是young
    127.0.0.1   young
    255.255.255.255 broadcasthost
    ::1 localhost
    fe80::1%lo0 localhost
    

这里如果不修改不添加,就会产生异常java.net.UnknownHostException

  • 添加slf4j-simple-1.7.2.jar
    这里是个bug/home/young/kafka/libs这个目录缺少slf4j-simple-1.7.2.jar这个文件,只有slf4j-api-1.7.2.jar这个文件是不够的,必须两个都有。或者可以下载其他版本的两个slf4j文件,放入本目录。slf4j-simple-1.7.2.jar可以去官网下载slf4j-1.7.2.tar.gz,下载后解压缩,就可以看到这两个文件了。
    如果这两个文件不全,就会有错误:
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    

环境安装配置部分到此结束,下面介绍简单使用。

3.kafka的使用

3.1 启动zookeeper服务

切换至/home/young/kafka目录,执行以下命令,以启动zookeeper服务:

bin/zookeeper-server-start.sh config/zookeeper.properties

3.2 启动kafka服务

仍在/home/young/kafka目录下,执行以下命令,以启动kafka服务:

bin/kafka-server-start.sh config/server.properties

对于这种启动之后就可以忽略的服务,可以在最前面加上nohup,让其在后台自己运行。

3.3 创建话题topic

新开一个命令行窗口,创建一个叫做testkafkatopic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testkafka

然后可以根据地址和端口号将话题topic 展示出来:

bin/kafka-topics.sh --list --zookeeper localhost:2181

显示如下:

testkafka

3.4 启动生产者producer

再开一个producer命令行窗口,执行以下命令:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testkafka

然后可以之间在本窗口输入消息,每遇到换行符就认为是一条消息输入完成。

Hello kafka !

3.5 启动消费者consumer

再新开一个consumer命令行窗口,执行以下命令:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testkafka --from-beginning

显示如下:

Hello kafka !

这时候,启动kafka服务的命令行就会有显示:

[2016-06-07 14:19:12,683] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)

然后,每在producer那里输入一条,consumer这里就会显示一条,然后kafka服务那里也会产生日志记录。
在以后启动时,只需要依次启动kafka serverproducerconsumer就可以了。

查阅了很多资料,但是都不是很详细,但是仍要感谢这些参考文章:

Kafka快速入门
Zookeeper & Kafka Install
ZooKeeper安装配置
Kafka单机环境部署
SLF4J warning or error messages and their meanings
stackoverflow
stackoverflow