您正在查看: 分布式 分类下的文章

IntelliJ IDEA中创建Spark项目

Overview

之前项目的人遗留的数据被接管后进行了改造,现在处理数据都用Spark来做了。这里记录一下如何在Mac本地的IntelliJ IDEA中搭建一个简单的Spark项目。这里不涉及HDFS这些相关的内容,只记录Spark
在此之前,我们默认已经安装好了最新版的IntelliJ IDEA以及配置好了JDK

1. IDEA中安装Scala插件

在IDEA的启动页面,点击Configure,在下拉列表中选择Plugins。

1572421037325.jpg

在左边输入框内搜索Scala,点击安装即可。这样我们就安装好了Scala。
接着回到刚刚的启动页面,选择Create New Project,选择Scala,右边的4个选项需要选IDEA,点击Next,项目名字我们就叫做hellospark吧,选好JDK后,选择Scala SDK,点击Create,我喜欢新的,所以选择了最新版本的scala-sdk-2.12.7(这一步导致了后面的异常),点击Finish。

1572421838866.jpg

2. 安装Spark2.4.4

登录到Spark官网,下载最新的稳定版本Spark2.4.4的压缩包。

1572420350442.jpg

在本地解压缩下载的压缩包,记住解压地址。
点击File -> Project Structure,弹出的窗口点击Libraries,点击Java

1573040365622.jpg

然后选择从解压的Spark压缩包里面找jars文件夹,

1573040567239.jpg

点击OK之后,会弹出来让选择模块的对话框,选择hellospark即可。

3.编写Scala程序

下面来试试是不是我们的环境是不是可行。
src文件夹上右击,新建一个Scala Class,选择Object,名字叫HelloWorldHelloWorld.scala内容如下:

import org.apache.spark.{SparkConf, SparkContext}
object HelloWorld {
    def main(args: Array[String]): Unit={
        val conf = new SparkConf()
            .setAppName("first spark")
            .setMaster("local[1]");

        new SparkContext(conf)
            .parallelize(List(1,2,3,4,5,6,7,8,9,10)) //传入可迭代对象
            .map(x => x*x) // 平方运算
            .filter(_%9 == 0) //只要可以被9整除的
            .collect()
            .foreach(println);
    }
}

在文件内右击选择Run 'HelloWorld',抛出如下异常:

Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
    at org.apache.spark.util.Utils$.stringToSeq(Utils.scala:2664)
    at org.apache.spark.internal.config.ConfigHelpers$.stringToSeq(ConfigBuilder.scala:49)
    at org.apache.spark.internal.config.TypedConfigBuilder$$anonfun$toSequence$1.apply(ConfigBuilder.scala:125)
    at org.apache.spark.internal.config.TypedConfigBuilder$$anonfun$toSequence$1.apply(ConfigBuilder.scala:125)
    at org.apache.spark.internal.config.TypedConfigBuilder.createWithDefault(ConfigBuilder.scala:143)
    at org.apache.spark.internal.config.package$.<init>(package.scala:172)
    at org.apache.spark.internal.config.package$.<clinit>(package.scala)
    at org.apache.spark.SparkConf$.<init>(SparkConf.scala:716)
    at org.apache.spark.SparkConf$.<clinit>(SparkConf.scala)
    at org.apache.spark.SparkConf.set(SparkConf.scala:95)
    at org.apache.spark.SparkConf.set(SparkConf.scala:84)
    at org.apache.spark.SparkConf.setAppName(SparkConf.scala:121)
    at HelloWorld$.main(HelloWorld.scala:5)
    at HelloWorld.main(HelloWorld.scala)

Scala SDK版本太新了,我们需要更换版本。

点击File -> Project Structure,弹出的窗口点击Global Libraries,点击Scala SDK,点击-号,把2.12.7版本删掉;接着点击+号,点击点击Scala SDK,选择2.11.12版本。配置好以后,右击hellospark项目,选择Rebuild Module 'hellospark'。再运行程序,虽然也会出来很多红字(因为完整的大数据环境并没有搭建,只安装了Spark),但是程序还是正确产生结果9,36,81了。

Diffuser单一节点所需软件的安装

Overview

Diffuser是一个基于分布式的生物序列特征生成网站,因此需要在每个单一节点安装所需的软件:ViennaRNA-2.4.8VSL2psipredspineX。本文以deepamp节点为例,分别安装和配置以上4个软件。

1. 安装ViennaRNA-2.4.8

该软件用于预测RNA序列结构,安装步骤如下:

wget https://www.tbi.univie.ac.at/RNA/download/sourcecode/2_4_x/ViennaRNA-2.4.8.tar.gz
tar -zxvf ViennaRNA-2.4.8.tar.gz
cd ViennaRNA-2.4.8
./configure
make
sudo make install

2. 安装VSL2

该软件用于生成预测的蛋白质序列的disorder information,只需下载后解压,无需安装:

wget http://www.dabi.temple.edu/disprot/download/VSL2.tar.gz
tar -xvf VSL2.tar.gz

3. 安装psipred

该软件用于预测Protein序列的二级结构,安装步骤如下:

wget http://bioinfadmin.cs.ucl.ac.uk/downloads/psipred/psipred.4.02.tar.gz
tar -xvf psipred.4.02.tar.gz

在解压成功后,还需进行简单的配置。首先,由于该软件部分代码需要tcsh支持,因此我们需要先进行安装:

sudo apt-get install tcsh

其次,该软件当中调用了blast,因此我们需要指定blast的安装路径和数据库路径,这里blast的我们用的是2.2.26版本而数据库我们使用的是uniref50。配置如下:

cd psipred
vim runpsipred

将下面的代码:

# The name of the BLAST data bank
set dbname = uniref90

# Where the NCBI programs have been installed
set ncbidir = /usr/local/bin

# Where the PSIPRED V4 programs have been installed
set execdir = ./bin

# Where the PSIPRED V4 data files have been installed
set datadir = ./data

改为:

# The name of the BLAST data bank
set dbname = /feagen/uniref/uniref50/uniref50

# Where the NCBI programs have been installed
set ncbidir = /var/www/cgi-bin/blast-2.2.26/bin

# Where the PSIPRED V4 programs have been installed
set execdir = /var/www/cgi-bin/psipred/bin

# Where the PSIPRED V4 data files have been installed
set datadir = /var/www/cgi-bin/psipred/data

4. 安装spineX

spineX的压缩包可以到http://sparks.informatics.iupui.edu/上下载,下载完成后,解压并进行如下配置:

tar -zxvf spineXpublic.tgz
cd spineXpublic
vim spX.pl

到指定行修改脚本文件,89行,指定blast安装路径:

$blastdir = '/var/www/cgi-bin-diffuser/blast-2.2.26';

108行,修改数据库路径:

system("$blastdir/bin/blastpgp -d /feagen/uniref/uniref50/uniref50 -j 3 -i $infl -Q $tmprf -a 4 > $workdir/_tmp2.$irnd")==0 or die " Aborting(spineX): Can't do psiblast\n";

128行,删除缓存文件:

system("rm -r $workdir")==0 or print "Couldn't remove temp. dir. $workdir\n";

FastDFS分布式文件系统在服务器集群上的安装部署

Overview

到目前为止,我们手里已经有了10台服务器了。之前一直说要把这些服务器搭建一个分布式文件系统,现在条件终于成熟了。这些服务器预装的系统大多数是Ubuntu16.04LTS发行版的Linux,少部分是Ubuntu14.04LTS。这次我们选择5台服务器,1feagen(118.138.241.39)服务器作为tracker4台服务器(Bastion3(118.138.240.146),Bastion4(118.138.233.74),Bastion6(118.138.233.26),POSSUM(118.138.233.27))作为storage。这里会记录下FastDFS分布式文件系统在Ubuntu16.04上的安装部署过程的细节。

5台服务器都要安装,只是配置文件稍有不同。

1.必备软件

  • FastDFS
    这个去GitHub上面下载源码:FastDFS
  • libfastcommon
    这个仍然要去GitHub上面下载源码:libfastcommon

这两个Zip包用wget或者在本地下载上传至服务器均可。

2.libfastcommon的安装

首先解压libfastcommon-master.zip包:

unzip libfastcommon-master.zip

进入这个文件夹:

cd libfastcommon-master/

依次输入如下命令:

sudo ./make.sh
sudo ./make.sh install

输入第二条命令之后,会有如下显示:

mkdir -p /usr/lib64
mkdir -p /usr/lib
install -m 755 libfastcommon.so /usr/lib64
install -m 755 libfastcommon.so /usr/lib
mkdir -p /usr/include/fastcommon
install -m 644 common_define.h hash.h chain.h logger.h base64.h shared_func.h pthread_func.h ini_file_reader.h _os_define.h sockopt.h sched_thread.h http_func.h md5.h local_ip_func.h avl_tree.h ioevent.h ioevent_loop.h fast_task_queue.h fast_timer.h process_ctrl.h fast_mblock.h connection_pool.h fast_mpool.h fast_allocator.h fast_buffer.h skiplist.h multi_skiplist.h flat_skiplist.h skiplist_common.h system_info.h fast_blocked_queue.h php7_ext_wrapper.h id_generator.h char_converter.h char_convert_loader.h /usr/include/fastcommon

可以看到libfastcommon.so安装到了/usr/lib64.
下面,我们需要为两个文件创建软链接,指向FastDFS主程序的lib目录:

sudo ln -s /usr/lib64/libfastcommon.so /usr/local/lib/libfastcommon.so
sudo ln -s /usr/lib64/libfastcommon.so /usr/lib/libfastcommon.so
sudo ln -s /usr/lib64/libfdfsclient.so /usr/local/lib/libfdfsclient.so
sudo ln -s /usr/lib64/libfdfsclient.so /usr/lib/libfdfsclient.so

3.FastDFS的安装

解压fastdfs-master.zip文件:

unzip fastdfs-master.zip

进入FastDFS源码根目录:

cd fastdfs-master

依次输入下面两条命令:

sudo ./make.sh
sudo ./make.sh install

安装完成之后,所有的可执行文件都被放在了/usr/bin/目录下面,可以用下面的命令查看:

ls /usr/bin/fdfs*

显示共有14个文件:

/usr/bin/fdfs_appender_test   
/usr/bin/fdfs_download_file  
/usr/bin/fdfs_test1
/usr/bin/fdfs_appender_test1  
/usr/bin/fdfs_file_info      
/usr/bin/fdfs_trackerd
/usr/bin/fdfs_append_file     
/usr/bin/fdfs_monitor        
/usr/bin/fdfs_upload_appender
/usr/bin/fdfs_crc32           
/usr/bin/fdfs_storaged       
/usr/bin/fdfs_upload_file
/usr/bin/fdfs_delete_file     
/usr/bin/fdfs_test

4. 配置FastDFS

配置文件在目录/etc/fdfs下,查看一下:

ls /etc/fdfs

显示如下:

client.conf.sample  storage.conf.sample  storage_ids.conf.sample  tracker.conf.sample

4.1 创建跟踪器和存储节点的配置文件

cd /etc/fdfs
sudo cp tracker.conf.sample tracker.conf
sudo cp storage.conf.sample storage.conf

4.2 修改tracker配置文件

feagen(118.138.241.39)服务器中修改tracker配置文件如下:

sudo vim /etc/fdfs/tracker.conf

可以看到如下显示:

# the base path to store data and log files
base_path=/home/yuqing/fastdfs

这行改成如下内容:

base_path=/feagen/fastdfs/tracker

这种配置文件,注意等号两边不能有空格。且上面的目录必须是真实存在的。
启动tracker

sudo fdfs_trackerd /etc/fdfs/tracker.conf start

查看监听端口:

sudo netstat -unltp|grep fdfs

如果显示如下,则证明tracker启动成功:

tcp        0      0 0.0.0.0:22122           0.0.0.0:*               LISTEN      6951/fdfs_trackerd

4.3 修改storage配置文件

Bastion3(118.138.240.146)服务器中,修改storage配置文件如下:

sudo vim /etc/fdfs/storage.conf

可以看到如下显示:

group_name=group1
base_path=/home/yuqing/fastdfs
store_path0=/home/yuqing/fastdfs
tracker_server=192.168.209.121:22122

修改为:

group_name=group1
base_path=/bastion3_cache/fastdfs/storage
store_path0=/bastion3_cache/fastdfs/storage
tracker_server=118.138.241.39:22122

Bastion4(118.138.233.74)服务器中,修改storage配置文件如下:

sudo vim /etc/fdfs/storage.conf

可以看到如下显示:

group_name=group1
base_path=/home/yuqing/fastdfs
store_path0=/home/yuqing/fastdfs
tracker_server=192.168.209.121:22122

修改为:

group_name=group1
base_path=/bastion4_cache/fastdfs/storage
store_path0=/bastion4_cache/fastdfs/storage
tracker_server=118.138.241.39:22122

Bastion6(118.138.233.26)服务器中,修改storage配置文件如下:

sudo vim /etc/fdfs/storage.conf

可以看到如下显示:

group_name=group1
base_path=/home/yuqing/fastdfs
store_path0=/home/yuqing/fastdfs
tracker_server=192.168.209.121:22122

修改为:

group_name=group1
base_path=/bastion6_cache/fastdfs/storage
store_path0=/bastion6_cache/fastdfs/storage
tracker_server=118.138.241.39:22122

POSSUM(118.138.233.27)服务器中,修改storage配置文件如下:

sudo vim /etc/fdfs/storage.conf

可以看到如下显示:

group_name=group1
base_path=/home/yuqing/fastdfs
store_path0=/home/yuqing/fastdfs
tracker_server=192.168.209.121:22122

修改为:

group_name=group1
base_path=/possum/fastdfs/storage
store_path0=/possum/fastdfs/storage
tracker_server=118.138.241.39:22122

启动storage服务:

sudo fdfs_storaged /etc/fdfs/storage.conf start

显示如下,即代表成功启动:

process fdfs_storaged already running, pid: 28250

查看监听端口:

sudo netstat -unltp|grep fdfs

如果显示如下,则证明storage启动成功:

tcp        0      0 0.0.0.0:23000           0.0.0.0:*               LISTEN      28250/fdfs_storaged

如果没有显示,则有可能是storage启动失败。

那么我们以Bastion6服务器为例,查看启动日志:

tail /bastion6_cache/fastdfs/storage/logs/storaged.log

如果显示如下,代表启动失败:

[2018-05-13 19:56:31] ERROR - file: storage_ip_changed_dealer.c, line: 186, connect to tracker server 118.138.2
40.146:22122 fail, errno: 110, error info: Connection timed out

连接超时;
如果显示如下,代表启动成功:

[2018-05-14 12:59:40] INFO - file: tracker_client_thread.c, line: 310, successfully connect to tracker server 118.138.240.146:22122, as a tracker client, my ip is 118.138.233.26
[2018-05-14 13:00:10] INFO - file: tracker_client_thread.c, line: 1263, tracker server 118.138.240.146:22122, set tracker leader: 118.138.240.146:22122
[2018-05-14 13:03:06] INFO - file: storage_sync.c, line: 2733, successfully connect to storage server 118.138.233.74:23000, continuous fail count: 16
[2018-05-14 13:03:41] INFO - file: storage_sync.c, line: 2733, successfully connect to storage server 118.138.233.74:23000

连接成功。

超时解读:
之前跟Chris讨论的时候,刚开始以为这几天服务器都是Monash云服务器中心的,属于内网,所以相互之间的端口是不用开的,实验之后才知道不开端口是不行的。后来联系Jerico开开3个端口之后,才连接成功。这3个端口分别是22122888823000

  • 22122:代表tracker服务端口;
  • 8888:代表HTTP协议端口,网页上传下载文件需要这个端口;
  • 23000:代表storage服务端口。

至此,storage存储节点安装成功。

4.4 查看所有存储节点信息

所有存储节点都启动之后,在任意一台storage上面用下面命令,查看集群状态信息:

sudo /usr/bin/fdfs_monitor /etc/fdfs/storage.conf

显示如下:

[2018-05-24 01:27:18] DEBUG - base_path=/bastion6_cache/fastdfs/storage, connect_timeout=10, network_timeout=60, tracker_server_count=1, anti_steal_token=0, anti_steal_secret_key length=0, use_connection_pool=0, g_connection_pool_max_idle_time=3600s, use_storage_id=0, storage server id count: 0

server_count=1, server_index=0

tracker server is 118.138.241.39:22122

group count: 1

Group 1:
group name = group1
disk total space = 100665 MB
disk free space = 72339 MB
trunk free space = 0 MB
storage server count = 4
active server count = 4
storage server port = 23000
storage HTTP port = 8888
store path count = 1
subdir count per path = 256
current write server index = 0
current trunk file id = 0

    Storage 1:
        id = 118.138.233.26
        ip_addr = 118.138.233.26 (vm-118-138-233-26.erc.monash.edu.au)  ACTIVE
        http domain = 
        version = 5.12
        join time = 2018-05-13 19:55:46
        up time = 2018-05-20 00:51:17
        total storage = 483679 MB
        free storage = 452554 MB
        upload priority = 10
        store_path_count = 1
        subdir_count_per_path = 256
        storage_port = 23000
        storage_http_port = 8888
        current_write_path = 0
        source storage id = 
        if_trunk_server = 0
        connection.alloc_count = 256
        connection.current_count = 3
        connection.max_count = 3
        total_upload_count = 1
        success_upload_count = 1
        total_append_count = 0
        success_append_count = 0
        total_modify_count = 0
        success_modify_count = 0
        total_truncate_count = 0
        success_truncate_count = 0
        total_set_meta_count = 0
        success_set_meta_count = 0
        total_delete_count = 0
        success_delete_count = 0
        total_download_count = 0
        success_download_count = 0
        total_get_meta_count = 0
        success_get_meta_count = 0
        total_create_link_count = 0
        success_create_link_count = 0
        total_delete_link_count = 0
        success_delete_link_count = 0
        total_upload_bytes = 87026
        success_upload_bytes = 87026
        total_append_bytes = 0
        success_append_bytes = 0
        total_modify_bytes = 0
        success_modify_bytes = 0
        stotal_download_bytes = 0
        success_download_bytes = 0
        total_sync_in_bytes = 11288
        success_sync_in_bytes = 11288
        total_sync_out_bytes = 0
        success_sync_out_bytes = 0
        total_file_open_count = 5
        success_file_open_count = 5
        total_file_read_count = 0
        success_file_read_count = 0
        total_file_write_count = 5
        success_file_write_count = 5
        last_heart_beat_time = 2018-05-24 01:26:54
        last_source_update = 2018-05-17 01:27:29
        last_sync_update = 2018-05-17 01:06:46
        last_synced_timestamp = 1970-01-01 10:00:00 (never synced)
    Storage 2:
        id = 118.138.233.27
        ip_addr = 118.138.233.27 (vm-118-138-233-27.erc.monash.edu.au)  ACTIVE
        http domain = 
        version = 5.12
        join time = 2018-05-17 00:39:17
        up time = 2018-05-20 00:52:30
        total storage = 2015737 MB
        free storage = 1689979 MB
        upload priority = 10
        store_path_count = 1
        subdir_count_per_path = 256
        storage_port = 23000
        storage_http_port = 8888
        current_write_path = 0
        source storage id = 
        if_trunk_server = 0
        connection.alloc_count = 256
        connection.current_count = 3
        connection.max_count = 3
        total_upload_count = 0
        success_upload_count = 0
        total_append_count = 0
        success_append_count = 0
        total_modify_count = 0
        success_modify_count = 0
        total_truncate_count = 0
        success_truncate_count = 0
        total_set_meta_count = 0
        success_set_meta_count = 0
        total_delete_count = 0
        success_delete_count = 0
        total_download_count = 0
        success_download_count = 0
        total_get_meta_count = 0
        success_get_meta_count = 0
        total_create_link_count = 0
        success_create_link_count = 0
        total_delete_link_count = 0
        success_delete_link_count = 0
        total_upload_bytes = 0
        success_upload_bytes = 0
        total_append_bytes = 0
        success_append_bytes = 0
        total_modify_bytes = 0
        success_modify_bytes = 0
        stotal_download_bytes = 0
        success_download_bytes = 0
        total_sync_in_bytes = 98314
        success_sync_in_bytes = 98314
        total_sync_out_bytes = 0
        success_sync_out_bytes = 0
        total_file_open_count = 5
        success_file_open_count = 5
        total_file_read_count = 0
        success_file_read_count = 0
        total_file_write_count = 5
        success_file_write_count = 5
        last_heart_beat_time = 2018-05-24 01:27:08
        last_source_update = 1970-01-01 10:00:00
        last_sync_update = 2018-05-21 16:32:36
        last_synced_timestamp = 2018-05-17 01:06:38 (20m:51s delay)
    Storage 3:
        id = 118.138.233.74
        ip_addr = 118.138.233.74 (vm-118-138-233-74.erc.monash.edu.au)  ACTIVE
        http domain = 
        version = 5.12
        join time = 2018-05-13 20:03:06
        up time = 2018-05-20 00:53:40
        total storage = 100665 MB
        free storage = 72339 MB
        upload priority = 10
        store_path_count = 1
        subdir_count_per_path = 256
        storage_port = 23000
        storage_http_port = 8888
        current_write_path = 0
        source storage id = 
        if_trunk_server = 0
        connection.alloc_count = 256
        connection.current_count = 3
        connection.max_count = 3
        total_upload_count = 2
        success_upload_count = 2
        total_append_count = 0
        success_append_count = 0
        total_modify_count = 0
        success_modify_count = 0
        total_truncate_count = 0
        success_truncate_count = 0
        total_set_meta_count = 2
        success_set_meta_count = 2
        total_delete_count = 0
        success_delete_count = 0
        total_download_count = 0
        success_download_count = 0
        total_get_meta_count = 0
        success_get_meta_count = 0
        total_create_link_count = 0
        success_create_link_count = 0
        total_delete_link_count = 0
        success_delete_link_count = 0
        total_upload_bytes = 11190
        success_upload_bytes = 11190
        total_append_bytes = 0
        success_append_bytes = 0
        total_modify_bytes = 0
        success_modify_bytes = 0
        stotal_download_bytes = 0
        success_download_bytes = 0
        total_sync_in_bytes = 174142
        success_sync_in_bytes = 87026
        total_sync_out_bytes = 0
        success_sync_out_bytes = 0
        total_file_open_count = 3
        success_file_open_count = 3
        total_file_read_count = 0
        success_file_read_count = 0
        total_file_write_count = 3
        success_file_write_count = 3
        last_heart_beat_time = 2018-05-24 01:27:13
        last_source_update = 2018-05-17 01:06:38
        last_sync_update = 2018-05-21 16:32:54
        last_synced_timestamp = 2018-05-17 01:27:30 (-1s delay)
    Storage 4:
        id = 118.138.240.146
        ip_addr = 118.138.240.146 (vm-118-138-240-146.erc.monash.edu.au)  ACTIVE
        http domain = 
        version = 5.12
        join time = 2018-05-20 00:53:13
        up time = 2018-05-20 00:53:13
        total storage = 483679 MB
        free storage = 449872 MB
        upload priority = 10
        store_path_count = 1
        subdir_count_per_path = 256
        storage_port = 23000
        storage_http_port = 8888
        current_write_path = 0
        source storage id = 118.138.233.26
        if_trunk_server = 0
        connection.alloc_count = 256
        connection.current_count = 3
        connection.max_count = 3
        total_upload_count = 0
        success_upload_count = 0
        total_append_count = 0
        success_append_count = 0
        total_modify_count = 0
        success_modify_count = 0
        total_truncate_count = 0
        success_truncate_count = 0
        total_set_meta_count = 0
        success_set_meta_count = 0
        total_delete_count = 0
        success_delete_count = 0
        total_download_count = 0
        success_download_count = 0
        total_get_meta_count = 0
        success_get_meta_count = 0
        total_create_link_count = 0
        success_create_link_count = 0
        total_delete_link_count = 0
        success_delete_link_count = 0
        total_upload_bytes = 0
        success_upload_bytes = 0
        total_append_bytes = 0
        success_append_bytes = 0
        total_modify_bytes = 0
        success_modify_bytes = 0
        stotal_download_bytes = 0
        success_download_bytes = 0
        total_sync_in_bytes = 98314
        success_sync_in_bytes = 98314
        total_sync_out_bytes = 0
        success_sync_out_bytes = 0
        total_file_open_count = 5
        success_file_open_count = 5
        total_file_read_count = 0
        success_file_read_count = 0
        total_file_write_count = 5
        success_file_write_count = 5
        last_heart_beat_time = 2018-05-24 01:27:03
        last_source_update = 1970-01-01 10:00:00
        last_sync_update = 2018-05-21 16:32:54
        last_synced_timestamp = 2018-05-17 01:27:30 (-1s delay)

可以看到,4storage状态都是ACTIVE,表示都启动成功了。

5. 测试上传文件

需要注意一点,我们之后的特征提取程序会在feagen服务器上,所以,我们暂时将client安装在这台服务器上。测试一下在本台服务器上上传文件,能否在所有storage服务器上面查看得到此文件的备份。

5.1 修改tracker服务器上的client配置文件

先后输入以下命令:

cd /etc/fdfs
sudo cp client.conf.sample client.conf
sudo vim client.conf

base_pathtracker_server两处配置修改为:

base_path=/feagen/fastdfs/client
tracker_server=118.138.241.39:22122

shell当中输入下面的命令:

sudo /usr/bin/fdfs_upload_file /etc/fdfs/client.conf fastdfs-master.zip

返回如下,即可表明上传成功:

group1/M00/00/00/dorpG1td37yAFSYPAAZ-Sk23ivY904.zip

这个返回值,在perl当中,以如下方式获取:

my $file_name = qx(sudo /usr/bin/fdfs_upload_file /etc/fdfs/client.conf fastdfs-master.zip);

到任意一个storage服务器当中查看,以Bastion3为例,至此处:

/bastion3_cache/fastdfs/storage/data/00/00

发现存在如下文件:

dorpG1td37yAFSYPAAZ-Sk23ivY904.zip

即表明上传成功。

6. 测试下载文件

当前目录下,运行下面的程序:

sudo /usr/bin/fdfs_download_file /etc/fdfs/client.conf group1/M00/00/00/dorpG1td37yAFSYPAAZ-Sk23ivY904.zip

即可在当前目录下面找到dorpG1td37yAFSYPAAZ-Sk23ivY904.zip,表明下载成功。

至此我们的分布式文件服务器算是搭建完成了。

我们参考了这些文章,表示感谢!
FastDFS分布式文件系统集群安装与配置
FastDFS--原理篇
分布式文件系统FastDFS原理介绍
Ubuntu下安装并配置FastDFS

ubuntu14.04安装gearman及perl扩展包

Overview

Bastion4这个项目经过我们实验验证和安全考虑,决定舍弃kafka而转用gearman这个消息队列框架,具体分析将在后续文章中给出,这里只记录gearman相关的安装。

1.下载安装gearman

最新版的gearmangearmand-1.1.12。我们执行下面几步,先将其下载到本地主文件夹,并解压缩。

sudo apt-get update
wget https://launchpad.net/gearmand/1.2/1.1.12/+download/gearmand-1.1.12.tar.gz
tar zxvf gearmand-1.1.12.tar.gz
cd gearmand-1.1.12/

进入gearmand-1.1.12文件夹后,如果直接运行

./configure

就会报缺少如下几个依赖包错误:

configure: error: could not find boost
configure: error: Could not find a version of the library
configure: error: could not find gperf
configure: error: Unable to find libevent
configure: error: Unable to find libuuid

所以,我们先将这些依赖都安装好:

sudo apt-get install libboost-dev
sudo apt-get install libboost-all-dev
sudo apt-get install gperf
sudo apt-get install libevent-dev
sudo apt-get install uuid-dev

安装好之后,如果没有错误,仍在gearmand-1.1.12文件夹下运行下面两条命令,编译时间比较长:

sudo make
sudo make install

这个过程中如果出现了错误,就运行下面的命令清除一下之前编译产生的可执行文件以及object文件(即扩展名为o的文件):

sudo make clean

继续重新安装编译:

./configure
sudo make
sudo make install

没有错误的话,就安装gearmanjob server

sudo apt-get install gearman-job-server

安装好以后,运行一下gearman:

gearman

会提示错误:

gearman: error while loading shared libraries: libgearman.so.8: cannot open shared object file: No such file or directory

这表示找不到libgearman.so.8所在的目录。这时我们打开/etc/ld.so.conf文件:

sudo vim /etc/ld.so.conf

添加一句话:

include /usr/local/lib

保存退出,并执行下面这句:

sudo /sbin/ldconfig

这样就不会出错了。
启动一下job server

gearmand -d

报错如下:

gearmand: Could not open log file "/usr/local/var/log/gearmand.log", from "/home/young/gearmand-1.1.12", switching to stderr. (No such file or directory)

我们这样解决:在/usr/local/下面新建var子目录,进去,新建log子目录,再进去,新建文件gearmand.log。这样就没有问题了。
sudo权限运行下面的命令:

sudo gearmand -d -L 127.0.0.1 -p 4730

-d表示daemon,在后台运行;
-L表示监听的ip,默认是localhost
-p表示监听的端口号port,默认是4730

这样gearmanubuntu14.04上面就安装成功了。

2.安装perl扩展包

perl端需要3个扩展包:

Gearman::Server
Gearman::Client
Gearman::Worker

可以用之前Chris的博客BioPerl(一):安装BioPerl中介绍的方法,先安装好CPAN
之后,用sudo权限打开CPAN

sudo cpan

然后依次安装3个扩展包:

install Gearman::Server
install Gearman::Client
install Gearman::Worker

都安装后之后,可以重启一下机器。
启动之后,用如下命令观察4730端口,查看gearmanjob server是否已经启动(gearmand -d这条命令是否生效):

sudo lsof -i:4730

如果不加sudo是看不到的,所以linux命令建议都要加上sudo

可以看到:

COMMAND  PID    USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
gearmand 576 gearman    9u  IPv4  12861      0t0  TCP *:4730 (LISTEN)
gearmand 576 gearman   10u  IPv6  12862      0t0  TCP *:4730 (LISTEN)

服务器端可能会只显示

COMMAND  PID    USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
gearmand 576 gearman    9u  IPv4  12861      0t0  TCP *:4730 (LISTEN)

可见这个server是开机启动的。

3.运行测试perl脚本

client.pl将消息发送到server,相当于生产者;
worker.pl来处理这些消息,并把结果返回给client.pl
相当于消费者。我们观察输出即可。
client.pl代码如下:

#!/usr/bin/perl
use strict;
use warnings;
use Gearman::Client;
use Storable; 
use Storable qw(freeze);
use Storable qw(thaw);
use IO::All;

# fork this process
my $pid = fork();
if ($pid == 0)
{
    # do this in the child
    print "start new client \n";
    my $client = Gearman::Client->new;
    print "finish new client \n";
    print "start job_servers \n";
    $client->job_servers('127.0.0.1',4730);
    print "finish job_servers \n";
    # 设置异步任务
    print "start new_task_set \n";
    my $tasks = $client->new_task_set; 
    print "finish new_task_set \n";
    print "start add_task \n";
    #handle database
    my @rows=('hello','byebye');
    $tasks->add_task(
        # 开始任务,多个参数
        showMessage => freeze(\@rows), 
        # 注册回调函数 
        { on_complete => \&complete },  
    );  
    print "finish add_task \n";
    print "start wait \n";
    # 等待任务结束
    $tasks->wait;
    print "finish wait \n";
    exit;
}

print "The background task will be finished shortly.\n";
 
sub complete{   
    my $ret = ${ $_[0] };
    #io("complete.txt")->print($ret);
    print $ret, "\n";
} 
 

worker.pl代码如下:

#!/usr/bin/perl
use strict;
use warnings;
use Gearman::Worker;
use Storable qw(thaw);
use Storable qw(freeze);

 print "start new worker \n";
my $worker = Gearman::Worker->new;
print "finish new worker \n";
print "start job_servers \n";
$worker->job_servers('127.0.0.1',4730);
print "finish job_servers \n";
# Worker 注册可以使用的功能
print "start register_function \n";
$worker->register_function( showMessage => \&showMessage );  
 print "finish register_function \n";
# 等待连接的任务
print "start work \n";

$worker->work while 1;  
print "finish work \n";
sub showMessage{
    my @row=@{ thaw($_[0]->arg) };

    my $job = \@row;
    print "\n";
    print "$row[0] \n";
    print "$row[1] \n";   
    print "start sleep \n";
    my $date = &getTime();  
    print  $date->{date}," ",$date->{hour},":",$date->{minute},":",$date->{second};
    print "\n";
    sleep(10);
    print "finish sleep \n";
    $date = &getTime();  
    print  $date->{date}," ",$date->{hour},":",$date->{minute},":",$date->{second};
    print "\n";

    my $ret = "hello world";
    return $ret;
}

sub getTime
{
    my $time = shift || time();
    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($time);

    $year += 1900;
    $mon ++;

    $min  = '0'.$min  if length($min)  < 2;
    $sec  = '0'.$sec  if length($sec)  < 2;
    $mon  = '0'.$mon  if length($mon)  < 2;
    $mday = '0'.$mday if length($mday) < 2;
    $hour = '0'.$hour if length($hour) < 2;
    
    my $weekday = ('Sun','Mon','Tue','Wed','Thu','Fri','Sat')[$wday];

    return { 'second' => $sec,
             'minute' => $min,
             'hour'   => $hour,
             'day'    => $mday,
             'month'  => $mon,
             'year'   => $year,
             'weekNo' => $wday,
             'wday'   => $weekday,
             'yday'   => $yday,
             'date'   => "$year-$mon-$mday"
          };
}

我们先运行worker.pl

sudo perl worker.pl

再查看4730端口:

sudo lsof -i:4730

发现多了两个占用:

COMMAND   PID    USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
gearmand  576 gearman    9u  IPv4  12861      0t0  TCP *:4730 (LISTEN)
gearmand  576 gearman   10u  IPv6  12862      0t0  TCP *:4730 (LISTEN)
gearmand  576 gearman   33u  IPv4 191760      0t0  TCP localhost:4730->localhost:37151 (ESTABLISHED)
perl     4508    root    3u  IPv4 192605      0t0  TCP localhost:37151->localhost:4730 (ESTABLISHED)

再运行多个client.pl

sudo perl client.pl

每启动一个client.pl就会多两个端口占用。

COMMAND   PID    USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
gearmand  576 gearman    9u  IPv4  12861      0t0  TCP *:4730 (LISTEN)
gearmand  576 gearman   10u  IPv6  12862      0t0  TCP *:4730 (LISTEN)
gearmand  576 gearman   33u  IPv4 191760      0t0  TCP localhost:4730->localhost:37151 (ESTABLISHED)
gearmand  576 gearman   34u  IPv4 215493      0t0  TCP localhost:4730->localhost:37189 (ESTABLISHED)
perl     4508    root    3u  IPv4 192605      0t0  TCP localhost:37151->localhost:4730 (ESTABLISHED)
perl     4709    root    3u  IPv4 214688      0t0  TCP localhost:37189->localhost:4730 (ESTABLISHED)

每运行一次worker.pl表示启动一个新的消费者。如果只启动一个消费者,而启动多个生产者,就可以很好地观察到“排队”效果了:一定是一个任务执行完成之后才会开始处理另一个任务。
这样gearmanperl扩展包的安装使用就结束了。

这篇文章主要参考了如下几篇文章:

Gearman Job Server
Gearman
使用 Gearman 实现分布式处理
Gearman 安装使用 以及 问题处理
Ubuntu下Gearman安装搭建

kafka在java中简单应用

Overview

之前的这篇博客ubuntu14.04单机安装配置zookeeper和kafka,介绍了zookeeperkafka的安装配置,并在命令行下验证了生产者消费者可以跑通。但是实际项目中,需要和java交互,不可能接触到命令行和后台的。本文旨在记录一下javakafka的简单交互,web中道理相同,只不过程序入口换成了action

1.新建项目配置环境

打开eclipse,依次点击Window→Preferences→Java→Build Path→User Libraries,然后在右边选择New
,添加一个自己常用的Library,我命名为kafka。选中kafka,右边选择Add External JARS,然后到之前安装好的kafka的目录,找到libs这个文件夹,如果按照上次配置好的情况,这里应该是15jar文件,见下图:

2016-07-12 17:16:06屏幕截图.png

全部选中,点击确定。这样,我们以后就可以复用了。

然后我们在eclipse中构建一个普通的java项目testKafka。右击项目,依次点击Build Path→Add Libraries→User Library,选择kafka这个library,点击Finish。这样环境就搭建好了。

2.生产者消费者程序

下面编码测试程序,即消息生产者和消息消费者。

2.1 生产者

package testKafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MsgProducer {
    private static Producer<String,String> producer;
    private final Properties props=new Properties();
    public MsgProducer(){
        //定义连接的broker list
        props.put("metadata.broker.list", "127.0.0.1:9092");
        //定义序列化类,Java中对象传输之前要序列化
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }
    public static void main(String[] args) {
        MsgProducer mProducer=new MsgProducer();
        //定义topic
        String topic="testkafka";
        
        //定义要发送给topic的消息
        String mString = "Hello kafka!";
                
        //构建消息对象
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, mString);
 
        //推送消息到broker
        producer.send(data);
        producer.close();
    }
}

这里需要注意,生产者这里,最少需要两个配置项:metadata.broker.list127.0.0.1:9092serializer.class设置为kafka.serializer.StringEncoder。打开上次配置的producer.properties文件,看到这两项配置分别为metadata.broker.list=localhost:9092serializer.class=kafka.serializer.DefaultEncoderbroker list要一致,否则会报错。
这些项,最好写在配置文件里,方便以后添加服务器时候更改。

2.2 消费者

package testKafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MsgConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public MsgConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        //定义连接zookeeper信息
        props.put("zookeeper.connect", zookeeper);
        //定义Consumer所有的groupID
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        //定义订阅topic数量
        topicCount.put(topic, new Integer(1));
        //返回的是所有topic的Map
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        //取出我们要需要的topic中的消息流
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
            while (consumerIte.hasNext()) {
                System.out.println(new String(consumerIte.next().message()));
            }
        }
        if (consumer != null) {
            consumer.shutdown();
        }
            
    }

    public static void main(String[] args) {
        String topic = "testkafka";
        MsgConsumer mConsumer = new MsgConsumer("127.0.0.1:2181", "test-consumer-group", topic);
        mConsumer.testConsumer();
    }

}

这里需要注意,消费者的配置信息,应该和生产者对应。最关键的配置是两项:zookeeper.connectgroup.id。这两项打开consumer.properties就可以看到。

3. 测试

首先,要在命令行中启动zookeeperkafka

在消费者程序里面,运行一下,Console框显示如下:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.

这个不是错误信息,不用理睬。
接着在生产者那里,运行一下,Console框显示如下:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
Hello kafka!

这样,我们的程序就跑通了。
这里主要参考了kafka官方例子:生产者消费者