logo资料库

Flume1.6.0入门:安装、部署、及flume的案例.docx

第1页 / 共35页
第2页 / 共35页
第3页 / 共35页
第4页 / 共35页
第5页 / 共35页
第6页 / 共35页
第7页 / 共35页
第8页 / 共35页
资料共35页,剩余部分请下载后查看
问题导读 1.什么是 flume 2.flume 的官方网站在哪里? 3.flume 有哪些术语? 4.如何配置 flume 数据源码? 一、什么是 Flume? flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,Flume OG 代码工程臃肿、核心组件设计不合理、核心配置不标准等 缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的 现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,cloudera 完成了 Flume-728, 对 Flume 进行了里程碑式的改动:重构核心组件、核心配置以及代码架构,重构后的版本 统称为 Flume NG(next generation);改动的另一原因是将 Flume 纳入 apache 旗下, cloudera Flume 改名为 Apache Flume。 flume 的特点: flume 是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日 志系统中定制各类数据发送方,用于收集数据;同时,Flume 提供对数据进行简单处理,并 写到各种数据接受方(比如文本、HDFS、Hbase 等)的能力 。 flume 的数据流由事件(Event)贯穿始终。事件是 Flume 的基本数据单位,它携带日志 数据(字节数组形式)并且携带有头信息,这些 Event 由 Agent 外部的 Source 生成,当 Source 捕获事件后会进行特定的格式化,然后 Source 会把事件推入(单个或多个)Channel 中。你 可以把 Channel 看作是一个缓冲区,它将保存事件直到 Sink 处理完该事件。Sink 负责持久 化日志或者把事件推向另一个 Source。 flume 的可靠性 当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume 提供了三种级别 的可靠性保障,从强到弱依次分别为:end-to-end(收到数据 agent 首先将 event 写到磁盘 上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。),Store on failure (这也是 scribe 采用的策略,当数据接收方 crash 时,将数据写到本地,待恢复后,继续发 送),Besteffort(数据发送到接收方后,不会进行确认)。 flume 的可恢复性: 还是靠 Channel。推荐使用 FileChannel,事件持久化在本地文件系统里(性能较差)。
flume 的一些核心概念: 1. Agent 使用 JVM 运行 Flume。每台机器运行一个 agent,但是可以在一个 agent 中包含多个 sources 和 sinks。 2. Client 3. Source 4. Sink 5. Channel 6. Events 生产数据,运行在一个独立的线程。 从 Client 收集数据,传递给 Channel。 从 Channel 收集数据,运行在一个独立线程。 连接 sources 和 sinks ,这个有点像一个队列。 可以是日志记录、 avro 对象等。 Flume 以 agent 为最小的独立运行单位。一个 agent 就是一个 JVM。单 agent 由 Source、 Sink 和 Channel 三大组件构成,如下图: 值得注意的是,Flume 提供了大量内置的 Source、Channel 和 Sink 类型。不同类型的 Source,Channel 和 Sink 可以自由组合。组合方式基于用户设置的配置文件,非常灵活。比 如:Channel 可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink 可以把日志写入 HDFS, HBase,甚至是另外一个 Source 等等。Flume 支持用户建立多级流,也就是说,多 个 agent 可以协同工作,并且支持 Fan-in、Fan-out、Contextual Routing、Backup Routes, 这也正是 NB 之处。如下图所示:
二、flume 的官方网站在哪里? http://flume.apache.org/ 三、在哪里下载? http://www.apache.org/dyn/closer.lua/flume/1.6.0/apache-flume-1.6.0-b in.tar.gz 四、如何安装? 1)将下载的 flume 包,解压到/home/hadoop 目录中,你就已经完成了 50%:)简 单吧 2)修改 flume-env.sh 配置文件,主要是 JAVA_HOME 变量设置 1. #/etc/profile 增加环境变量 export FLUME_HOME=/home/hadoop/apache-flume-1.6.0-bin export FLUME_CONF_DIR=$FLUME_HOME/conf export PATH=.:$PATH::$FLUME_HOME/bin
3)验证是否安装成功 1. root@m1:/home/hadoop# flume-ng version 2. Flume 1.6.0 3. Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git 4. Revision: 8633220df808c4cd0c13d1cf0320454a94f1ea97 5. Compiled by hshreedharan on Wed May 6. From source with checksum a01fe726e4380ba0c9f7a7d222db961f 7. root@m1:/home/hadoop# 7 14:49:18 PDT 2014 复制代码 出现上面的信息,表示安装成功了 五、flume 的案例 1)案例 1:Avro Avro 可以发送一个给定的文件给 Flume,Avro 源使用 AVRO RPC 机制。 a)创建 agent 配置文件 1. root@m1:/home/hadoop#vi /home/hadoop/flume-1.6.0-bin/conf/avro.conf 2. 3. a1.sources = r1 4. a1.sinks = k1 5. a1.channels = c1 6. 7. # Describe/configure the source 8. a1.sources.r1.type = avro 9. a1.sources.r1.channels = c1 10. a1.sources.r1.bind = 0.0.0.0 11. a1.sources.r1.port = 4141 12. 13. # Describe the sink 14. a1.sinks.k1.type = logger 15. 16. # Use a channel which buffers events in memory 17. a1.channels.c1.type = memory 18. a1.channels.c1.capacity = 1000 19. a1.channels.c1.transactionCapacity = 100 20. 21. # Bind the source and sink to the channel 22. a1.sources.r1.channels = c1 23. a1.sinks.k1.channel = c1
复制代码 b)启动 flume agent a1 1. root@m1:flume-1.6.0-bin # flume-ng agent -c conf -f conf/avro.conf -n a1 -Dflume.root.logger=INFO,console 复制代码 #命令参数说明 -c conf 指定配置目录为 conf -f conf/avro.conf 指定配置文件为 conf/avro.conf -n a1 指定 agent 名字为 a1,需要与 avro.conf 中的一致 -Dflume.root.logger=INFO,console 指定 DEBUF 模式在 console 输出 INFO 信息 (注意路径,当前路径与配置所指的路径) c)创建指定文件 1. root@m1:/home/hadoop# echo "hello world" > /home/hadoop/flume-1.6.0-bin/log.00 复制代码 d)使用 avro-client 发送文件 1. root@m1:/home/hadoop# flume-ng avro-client -c . -H m1 -p 4141 -F /home/hadoop/flume-1.5.0-bin/log.00 复制代码 注意参数中 m1 e)在 m1 的控制台,可以看到以下信息,注意最后一行: 1. root@m1:/home/hadoop/flume-1.5.0-bin/conf# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console 2. Info: Sourcing environment configuration script /home/hadoop/flume-1.5.0-bin/conf/flume-env.sh
3. Info: Including Hadoop libraries found via (/home/hadoop/hadoop-2.2.0/bin/hadoop) for HDFS access 4. Info: Excluding /home/hadoop/hadoop-2.2.0/share/hadoop/common/lib/slf4j-api-1.7.5.jar from classpath 5. Info: Excluding /home/hadoop/hadoop-2.2.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.j ar from classpath 6. ... 7. 2014-08-10 10:43:25,112 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(N ettyServer.java:171)] [id: 0x92464c4f, /192.168.1.50:59850 :> /192.168.1.50:4141] UNBOUND 8. 2014-08-10 10:43:25,112 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(N ettyServer.java:171)] [id: 0x92464c4f, /192.168.1.50:59850 :> /192.168.1.50:4141] CLOSED 9. 2014-08-10 10:43:25,112 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(Ne ttyServer.java:209)] Connection to /192.168.1.50:59850 disconnected. 10. 2014-08-10 10:43:26,718 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 world } hello 复制代码 2)案例 2:Spool Spool 监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点: 1) 拷贝到 spool 目录下的文件不可以再打开编辑。 2) spool 目录下不可包含相应的子目录 a)创建 agent 配置文件 1. root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/spool.conf 2. 3. a1.sources = r1 4. a1.sinks = k1 5. a1.channels = c1 6. 7. # Describe/configure the source 8. a1.sources.r1.type = spooldir
9. a1.sources.r1.channels = c1 10. a1.sources.r1.spoolDir = /home/hadoop/flume-1.5.0-bin/logs 11. a1.sources.r1.fileHeader = true 12. 13. # Describe the sink 14. a1.sinks.k1.type = logger 15. 16. # Use a channel which buffers events in memory 17. a1.channels.c1.type = memory 18. a1.channels.c1.capacity = 1000 19. a1.channels.c1.transactionCapacity = 100 20. 21. # Bind the source and sink to the channel 22. a1.sources.r1.channels = c1 23. a1.sinks.k1.channel = c1 复制代码 复制代码 复制代码 b)启动 flume agent a1 1. root@m1:apache-flume-1.6.0-bin# flume-ng agent -c conf -f conf/spool.conf -n a1 -Dflume.root.logger=INFO,console c)追加文件到/home/hadoop/flume-1.5.0-bin/logs 目录 1. root@m1:/home/hadoop# echo "spool test1" > /home/hadoop/flume-1.5.0-bin/logs/spool_text.log d)在 m1 的控制台,可以看到以下相关信息: 1. 14/08/10 11:37:13 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown. 2. 14/08/10 11:37:13 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown. 3. 14/08/10 11:37:14 INFO avro.ReliableSpoolingFileEventReader: Preparing to move file /home/hadoop/flume-1.5.0-bin/logs/spool_text.log to /home/hadoop/flume-1.5.0-bin/logs/spool_text.log.COMPLETED 4. 14/08/10 11:37:14 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown. 5. 14/08/10 11:37:14 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown.
6. 14/08/10 11:37:14 INFO sink.LoggerSink: Event: { headers:{file=/home/hadoop/flume-1.5.0-bin/logs/spool_text.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31 spool test1 } 7. 14/08/10 11:37:15 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown. 8. 14/08/10 11:37:15 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown. 9. 14/08/10 11:37:16 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown. 10. 14/08/10 11:37:16 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown. 11. 14/08/10 11:37:17 INFO source.SpoolDirectorySource: Spooling Directory Source runner has shutdown. 复制代码 3)案例 3:Exec EXEC 执行一个给定的命令获得输出的源,如果要使用 tail 命令,必选使得 file 足够 大才能看到输出内容 a)创建 agent 配置文件 1. root@m1:/home/hadoop# vi /home/hadoop/flume-1.5.0-bin/conf/exec_tail.conf 2. 3. a1.sources = r1 4. a1.sinks = k1 5. a1.channels = c1 6. 7. # Describe/configure the source 8. a1.sources.r1.type = exec 9. a1.sources.r1.channels = c1 10. a1.sources.r1.command = tail -F /home/hadoop/flume-1.5.0-bin/log_exec_tail 11. 12. # Describe the sink 13. a1.sinks.k1.type = logger 14. 15. # Use a channel which buffers events in memory 16. a1.channels.c1.type = memory 17. a1.channels.c1.capacity = 1000 18. a1.channels.c1.transactionCapacity = 100
分享到:
收藏