Kafka 接收 Flume 数据并存储至 HDFS
1.安装及配置环境
1.1 flume 的安装
1.1.1 下载安装包并解压
安装包可以在本地下载,通过 redHat 中的 ftp 传输到 linux 下文件中即可,
解压 tar -zxvf +压缩文件名称
1.1.2 配置 flume 环境变量
export FLUME_HOME=/usr/hdp/2.3.4.0-3485/flume/apache-flume-1.7.0-bin
export PATH=$PATH:$FLUME_HOME/bin
配置好环境变量要 source:
Source /etc/profile
使环境变量生效。
1.1.3 在多台服务间使用 scp 命令
其他两台使用 scp 命令:
Scp /usr/hdp/2.3.4.0-3485/flume root@node-231:/usr/hdp/2.3.4.0-3485/
Scp /usr/hdp/2.3.4.0-3485/flume root@node-232:/usr/hdp/2.3.4.0-3485/
同时:
scp /etc/profile root@node-231:/etc/
scp /etc/profile root@node-232:/etc/
复制过后,两个都需要 source 使环境变量生效。
1.1.4 检验是否安装成功
判断是否安装成功,在 flume 安装包下的 bin 目录中输入:
flume/apache-flume-1.7.0-bin/bin
Flume-ng version
出现我们安装的 flume 的版本,即安装成功。
1.2 Kafka 的安装
1.2.1 下载安装包
解压:tar -zxvf
进入:cd kafka_2.11-0.10.1.0
kafka_2.11-0.10.1.0.tgz
1.2.2 配置环境变量:
KAFKA_HOME=/usr/tools/kafka_2.11-0.10.1.0
PATH=$PATH:$KAFKA_HOME/bin
vi /etc/profile
export
export
配置后:
source /etc/profile
cd /kafka_2.11-0.10.1.0/config
将 server.properties 再复制一个
cp server.properties server1.properties
1.2.3 修改配置文件
vi server.properties
如图:
再次修改刚刚复制的 server1.properties
vi server1.properties
/etc/profile
/etc/profile
root@node-231:/etc/
root@node-233:/etc/
-r
-r
-r
-r
/usr/tools/kafka_2.11-0.10.1.0 root@node-233:/usr/tools/
/usr/tools/kafka_2.11-0.10.1.0
root@node-231:/usr/tools
将配置好的复制到其他的节点上:
scp
scp
scp
scp
将环境变量复制到其他的节点上后,其他的两个节点要分别 source:
source /etc/profile
其他的两个节点的 server.properfile 配置文件要进行修改。分别进入到每个配
置文件中,进行 id 的修改,一般修改为与文件相对应的 id 的即可。同时将文件
名称修改为不一样的,比如节点 1 上的文件是:
server1.properties,server.properties 那么节点 2 上的文件被复制后应该也
是这个两个文件:现在修改为 server2.properties,server3.properties 同时
将 id 修改为一般跟文件名对应的就可以,其他的就不用修改。
1.2.4 开启服务
bin/zookeeper-server-start.sh config/zookeeper.properties(先开启
zookeeper )
bin/kafka-server-start.sh config/server.properties
进入每个节点分别开启 kafka:
kafka-server-start.sh config/server1.properties &
kafka-server-start.sh config/server2.properties &
kafka-server-start.sh config/server3.properties &
kafka-server-start.sh config/server4.properties &
kafka-server-start.sh config/server5.properties &
查看每个节点的两个 kafka 的是否都已经开启 jps:
1.2.5 安装好后创建 Topic
输入命令:
kafka-topics.sh --create --zookeeper localhost:2181
--replication-factor 1 --partitions 1 --topic meassage
查询创建的 Topic:
kafka-topics.sh --list --zookeeper localhost:2181
1.2.6 生产者发送消息:
kafka-console-producer.sh --broker-list localhost:9092 --topic message
输入:This is a messageThis is another message
1.2.7 消费者消费数据
Kafka-console-consumer.sh --zookeeper localhost:2181 --topic message
自动生成:This is a messageThis is another message
1.2.8 关闭 kafka
jps 查询出启动了两个 Kafka,要是不启动它们,就直接 kill 掉
kafka 前面的代码:Kill -9 xxxx
1.3 Spark 的安装
1.3.1 scala 的安装
1、解压 scala
tar –zxvf scala-2.11.8.tgz
2、 配置环境变量
vi /etc/profile
export SCALA_HOME=/usr/tools/scala-2.11.8
export PATH=$PATH:$SCALA_HOME/bin
3、使环境变量生效
source /etc/profile
4、检查安装成功:
scala –version
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL
5、拷贝到子节点上:
scp -r scala-2.11.8 root@jokeros2:/usr/tools
scp -r scala-2.11.8 root@jokeros2:/usr/tools
6、分别配置子节点的环境变量并使其生效.
1.3.2 spark 的安装
1、解压 spark
tar –zxvf spark-1.6.1-bin-hadoop2.6.tgz
2、配置环境变量
export SPARK_HOME=/usr/tools/spark-1.6.1-bin-hadoop2.6
export PATH=$PATH:$SPARK_HOME/bin
3、使环境变量生效
source /etc/profile
4、spark 的配置
进入 spark 的 conf 目录:
cd /usr/tools/spark-1.6.1-bin-hadoop2.6/conf
scp spark-env.sh.template spark-env.sh
scp log4j.properties.template log4j.properties
scp slaves.template slaves
编辑 spark-env.sh
export SCALA_HOME=/usr/tools/scala-2.11.8
export JAVA_HOME=/usr/tools/jdk1.7.0_67
export SPARK_WORKER_MEMORY=1G
export HADOOP_CONF_DIR=/usr/tools/hadoop-2.6.4/etc/hadoop
编辑 slaves(虚拟机的名称)
jokeros1
5、将 spark 拷贝到子节点上然后配置环境变量并使其生效。
jokeros2
jokeros3
source /etc/profile
scp -r spark-1.6.1-bin-hadoop2.6 root@jokeros2:/usr/tools
scp -r spark-1.6.1-bin-hadoop2.6 root@jokeros3:/usr/tools
6、进入主节点的 sbin 目录
运行 start-all.sh
2.文件通过 flume 直接存储到 hdfs
2.1 通过监控端口采集信息
这个只需要写个配置文件如下:
Agent 配置如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.deserializer.outputCharset = UTF-8
# Describe the sink
a1.sinks.k1.type = hdfs
#a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://node-231:8020/flume/%Y%m%d%H
a1.sinks.k1.hdfs.fileSuffix = .txt
a1.sinks.k1.hdfs.userLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = events-
#a1.sinks.k1.hdfs.round = true
#a1.sinks.k1.hdfs.roundValue = 10
#a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0