logo资料库

Flume采集数据到Kafka,然后从kafka取数据存储到HDFS的方法思路和完整步骤.docx

第1页 / 共20页
第2页 / 共20页
第3页 / 共20页
第4页 / 共20页
第5页 / 共20页
第6页 / 共20页
第7页 / 共20页
第8页 / 共20页
资料共20页,剩余部分请下载后查看
1.安装及配置环境
1.1 flume的安装
1.1.1下载安装包并解压
1.1.2配置flume环境变量
1.1.3在多台服务间使用scp命令
1.1.4检验是否安装成功
1.2 Kafka的安装
1.2.1下载安装包
1.2.2配置环境变量:
1.2.3修改配置文件
1.2.4开启服务
1.2.5安装好后创建Topic
1.2.6生产者发送消息:
1.2.7消费者消费数据
1.2.8关闭kafka
1.3 Spark的安装
1.3.1 scala的安装
1.3.2 spark的安装
2.文件通过flume直接存储到hdfs
2.1通过监控端口采集信息
2.2 通过监控目录采集信息
2.3 出现问题及解决办法
3.文件通过flume采集kafka消费存储到hdfs
3.1写flume发送消息到kafka的配置
3.2 写kafka到hdfs上的配置文件
3.3 执行发送消息命令
3.3.1 kafka上要执行的命令
3.3.2 flume上执行的命令
3.3.3 发送文件到监控目录上
4.文件通过flume采集kafka消费sparkstreaming实时处理存到hdfs
4.1 flume文件配置
4.2 spark程序获取kafka上消费的消息
4.3 kafka上创建topic
4.4测试过程
4.5 出现问题及解决办法
4.5.1将数据发送到spark上进行处理的问题
4.5.2 安装scala的版本问题
4.5.3 填写参数问题
5.需要考虑的问题
5.1 flume方面问题
5.1.1 flume发送文件是文件流的形式发送,还是按行发送,可以配置哪种方式?
5.1.2 如果flume读取日志文件的时候不需要按行读取时,如何解决?
5.2 kafka方面问题
5.2.1 数据在内存中是不是备份的,备份多少?
5.2.2能够利用的内存资源怎么配置?
5.2.3 如果内存满了,flume还继续往kafka发送数据会怎么样?
5.2.4 每个topic都应该是一个消息队列,每次去取数据,可以有哪些方式(比如队列取空,只取一条
5.2.5 Kafka的消息的有效期
5.2.6 Kafka可以实现批量处理
Kafka 接收 Flume 数据并存储至 HDFS 1.安装及配置环境 1.1 flume 的安装 1.1.1 下载安装包并解压 安装包可以在本地下载,通过 redHat 中的 ftp 传输到 linux 下文件中即可, 解压 tar -zxvf +压缩文件名称 1.1.2 配置 flume 环境变量 export FLUME_HOME=/usr/hdp/2.3.4.0-3485/flume/apache-flume-1.7.0-bin export PATH=$PATH:$FLUME_HOME/bin 配置好环境变量要 source: Source /etc/profile 使环境变量生效。 1.1.3 在多台服务间使用 scp 命令 其他两台使用 scp 命令: Scp /usr/hdp/2.3.4.0-3485/flume root@node-231:/usr/hdp/2.3.4.0-3485/ Scp /usr/hdp/2.3.4.0-3485/flume root@node-232:/usr/hdp/2.3.4.0-3485/ 同时: scp /etc/profile root@node-231:/etc/ scp /etc/profile root@node-232:/etc/ 复制过后,两个都需要 source 使环境变量生效。 1.1.4 检验是否安装成功 判断是否安装成功,在 flume 安装包下的 bin 目录中输入: flume/apache-flume-1.7.0-bin/bin Flume-ng version 出现我们安装的 flume 的版本,即安装成功。 1.2 Kafka 的安装 1.2.1 下载安装包 解压:tar -zxvf 进入:cd kafka_2.11-0.10.1.0 kafka_2.11-0.10.1.0.tgz
1.2.2 配置环境变量: KAFKA_HOME=/usr/tools/kafka_2.11-0.10.1.0 PATH=$PATH:$KAFKA_HOME/bin vi /etc/profile export export 配置后: source /etc/profile cd /kafka_2.11-0.10.1.0/config 将 server.properties 再复制一个 cp server.properties server1.properties 1.2.3 修改配置文件 vi server.properties 如图:
再次修改刚刚复制的 server1.properties vi server1.properties
/etc/profile /etc/profile root@node-231:/etc/ root@node-233:/etc/ -r -r -r -r /usr/tools/kafka_2.11-0.10.1.0 root@node-233:/usr/tools/ /usr/tools/kafka_2.11-0.10.1.0 root@node-231:/usr/tools 将配置好的复制到其他的节点上: scp scp scp scp 将环境变量复制到其他的节点上后,其他的两个节点要分别 source: source /etc/profile 其他的两个节点的 server.properfile 配置文件要进行修改。分别进入到每个配 置文件中,进行 id 的修改,一般修改为与文件相对应的 id 的即可。同时将文件 名称修改为不一样的,比如节点 1 上的文件是: server1.properties,server.properties 那么节点 2 上的文件被复制后应该也 是这个两个文件:现在修改为 server2.properties,server3.properties 同时 将 id 修改为一般跟文件名对应的就可以,其他的就不用修改。 1.2.4 开启服务 bin/zookeeper-server-start.sh config/zookeeper.properties(先开启 zookeeper ) bin/kafka-server-start.sh config/server.properties 进入每个节点分别开启 kafka: kafka-server-start.sh config/server1.properties & kafka-server-start.sh config/server2.properties & kafka-server-start.sh config/server3.properties & kafka-server-start.sh config/server4.properties & kafka-server-start.sh config/server5.properties &
查看每个节点的两个 kafka 的是否都已经开启 jps: 1.2.5 安装好后创建 Topic 输入命令: kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic meassage 查询创建的 Topic: kafka-topics.sh --list --zookeeper localhost:2181 1.2.6 生产者发送消息: kafka-console-producer.sh --broker-list localhost:9092 --topic message 输入:This is a messageThis is another message 1.2.7 消费者消费数据 Kafka-console-consumer.sh --zookeeper localhost:2181 --topic message 自动生成:This is a messageThis is another message 1.2.8 关闭 kafka jps 查询出启动了两个 Kafka,要是不启动它们,就直接 kill 掉 kafka 前面的代码:Kill -9 xxxx
1.3 Spark 的安装 1.3.1 scala 的安装 1、解压 scala tar –zxvf scala-2.11.8.tgz 2、 配置环境变量 vi /etc/profile export SCALA_HOME=/usr/tools/scala-2.11.8 export PATH=$PATH:$SCALA_HOME/bin 3、使环境变量生效 source /etc/profile 4、检查安装成功: scala –version Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL 5、拷贝到子节点上: scp -r scala-2.11.8 root@jokeros2:/usr/tools scp -r scala-2.11.8 root@jokeros2:/usr/tools 6、分别配置子节点的环境变量并使其生效. 1.3.2 spark 的安装 1、解压 spark tar –zxvf spark-1.6.1-bin-hadoop2.6.tgz 2、配置环境变量 export SPARK_HOME=/usr/tools/spark-1.6.1-bin-hadoop2.6 export PATH=$PATH:$SPARK_HOME/bin 3、使环境变量生效 source /etc/profile 4、spark 的配置 进入 spark 的 conf 目录: cd /usr/tools/spark-1.6.1-bin-hadoop2.6/conf scp spark-env.sh.template spark-env.sh scp log4j.properties.template log4j.properties scp slaves.template slaves 编辑 spark-env.sh export SCALA_HOME=/usr/tools/scala-2.11.8 export JAVA_HOME=/usr/tools/jdk1.7.0_67 export SPARK_WORKER_MEMORY=1G export HADOOP_CONF_DIR=/usr/tools/hadoop-2.6.4/etc/hadoop
编辑 slaves(虚拟机的名称) jokeros1 5、将 spark 拷贝到子节点上然后配置环境变量并使其生效。 jokeros2 jokeros3 source /etc/profile scp -r spark-1.6.1-bin-hadoop2.6 root@jokeros2:/usr/tools scp -r spark-1.6.1-bin-hadoop2.6 root@jokeros3:/usr/tools 6、进入主节点的 sbin 目录 运行 start-all.sh 2.文件通过 flume 直接存储到 hdfs 2.1 通过监控端口采集信息 这个只需要写个配置文件如下: Agent 配置如下: a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 a1.sources.r1.deserializer.outputCharset = UTF-8 # Describe the sink a1.sinks.k1.type = hdfs #a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = hdfs://node-231:8020/flume/%Y%m%d%H a1.sinks.k1.hdfs.fileSuffix = .txt a1.sinks.k1.hdfs.userLocalTimeStamp = true a1.sinks.k1.hdfs.filePrefix = events- #a1.sinks.k1.hdfs.round = true #a1.sinks.k1.hdfs.roundValue = 10 #a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.rollInterval=10 a1.sinks.k1.hdfs.rollSize=0 a1.sinks.k1.hdfs.rollCount=0 a1.sinks.k1.hdfs.idleTimeout=0
分享到:
收藏