logo资料库

kafka+flume 实时采集oracle数据到hive中.docx

第1页 / 共13页
第2页 / 共13页
第3页 / 共13页
第4页 / 共13页
第5页 / 共13页
第6页 / 共13页
第7页 / 共13页
第8页 / 共13页
资料共13页,剩余部分请下载后查看
一、kafka 获取 oracle 日志实时数据 NO.1 下载 zookeeper 和 kafka 步骤一、安装 zookeeper 我这里安装的版本:apache-zookeeper-3.5.8-bin.tar.gz ZooKeeper 官网:http://zookeeper.apache.org/releases.html 笔记文档:Zookeeper 在 Linux 下载安装及部署 步骤二、安装 kafka 我这里安装的版本:kafka_2.12-2.4.0.tgz Kafka 官网下载地址:http://kafka.apache.org/downloads 笔记文档:Kafka 在 Linux 下载安装及部署 NO.1 ORCLA 注意事项 数据库必须处于 archivelog 模式,并且必须启用补充日志记录。 在数据库服务器上:
注:alter database add supplemental log data (all) columns 是指启用补 充日志记录。 为了成功执行连接器,必须以 dba Oracle 用户启动连接器。 NO.2 简单的创建一个 Oracle 表 SQL> create table test_kafka(id VARCHAR(50) NOT NULL,username varchar2(100)); NO.3 添加相关组件 1)从 https://github.com/erdemcer/kafka-connect-oracle 下载整个项目,把 整个项目 mvn clean package 成 kafa-connect-oracle-1.0.jar。 2)下载一个 oracle 的 jdbc 驱动 ojdbc7.jar 3)将 kafa-connect-oracle-1.0.jar 和 ojdbc7.jar 放在 kafka 的安装包下的 lib 目录下, 4)将 github 项目里面的 config/OracleSourceConnector.properties 文件拷贝 到 kafak/config No.4 修改配置文件 [root@localhost config]# vim connect-standalone.properties
[root@localhost config]# vim OracleSourceConnector.properties
NO.5 启动 kafka [root@localhost bin]# ./kafka-server-start.sh …/config/server.properties 1.创建 Topic kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-Oracle 2.启动 connector [root@localhost bin]# ./connect-standalone.sh …/config/connect-standalone.properties …/config/ OracleSourceConnector.properties -----启动成功会有以下信息:
3.启动 consumer [root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic huang-Oracle 启动成功会等待 topic 数据,然后消费到控制台 NO.6 在数据库中插入一条数据 SQL> INSERT INTO test_kafka values(‘1’,‘huang’);
一定注意 commit 提交,否则 kafka 不会收到数据 可以看到在消费的控制台上出现: 插入的值以 key:value 的形式打印到控制台上。 同样删除和修改也会将信息打印到控制台上,会在 key 为 data 以及 before 上体 现不同。 这样也就实现了 oracle 中的数据在 Insert 时与 kafka 同步。 二、flume 写入到 hdfs 1.概述 对于数据的转发,Kafka 是一个不错的选择。Kafka 能够装载数据到消息队列, 然后等待其他业务场景去消费这些数据,Kafka 的应用接口 API 非常的丰富,支 持各种存储介质,例如 HDFS、HBase 等。如果不想使用 Kafka API 编写代码去消 费 Kafka Topic,也是有组件可以去集成消费的。下面笔者将为大家介绍如何使 用 Flume 快速消费 Kafka Topic 数据,然后将消费后的数据转发到 HDFS 上。 2.内容 在实现这套方案之间,可以先来看看整个数据的流向,如下图所示:
业务数据实时存储到 Kafka 集群,然后通过 Flume Source 组件实时去消费 Kafka 业务 Topic 获取数据,将消费后的数据通过 Flume Sink 组件发送到 HDFS 进行存储。 2.1 准备基础环境 按照上图所示数据流向方案,需要准备好 Kafka、Flume、Hadoop(HDFS 可用)等组件。 2.1.1 启动 Kafka 集群并创建 Topic Kafka 目前来说,并没有一个批量的管理脚本,不过我们可以对 kafka-server-start.sh 脚本和 kafka-server-stop.sh 脚本进行二次封装。代码如下所示: #! /bin/bash # Kafka 代理节点地址, 如果节点较多可以用一个文件来存储 hosts=(dn1 dn2 dn3) # 打印启动分布式脚本信息 mill=`date "+%N"` tdate=`date "+%Y-%m-%d %H:%M:%S,${mill:0:3}"` echo [$tdate] INFO [Kafka Cluster] begins to execute the $1 operation. # 执行分布式开启命令 function start() { for i in ${hosts[@]} do smill=`date "+%N"` stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"` ssh hadoop@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the startup operation.;kafka-server-start.sh $KAFKA_HOME/config/server.properties>/dev/null" & sleep 1 done } # 执行分布式关闭命令 function stop() { for i in ${hosts[@]} do smill=`date "+%N"` stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"` ssh hadoop@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the shutdown operation.;kafka-server-stop.sh>/dev/null;" & sleep 1 done }
# 查看 Kafka 代理节点状态 function status() { for i in ${hosts[@]} do smill=`date "+%N"` stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"` ssh hadoop@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] status message is :;jps | grep Kafka;" & sleep 1 done } # 判断输入的 Kafka 命令参数是否有效 case "$1" in start) start ;; stop) stop ;; status) status ;; *) echo "Usage: $0 {start|stop|status}" RETVAL=1 esac 启动 Kafka 集群后,在 Kafka 集群可用的情况下,创建一个业务 Topic,执行命令如下: # 创建一个 flume_collector_data 主题 kafka-topics.sh --create --zookeeper dn1:2181,dn2:2181,dn3:2181 --replication-factor 3 --partitions 6 --topic flume_collector_data 2.2 配置 Flume Agent 然后,开始配置 Flume Agent 信息,让 Flume 从 Kafka 集群的 flume_collector_data 主题中读取数据,并将读取到的数据发送到 HDFS 中进行存储。配置内容如下: # ------------------- define data source ---------------------- # source alias agent.sources = source_from_kafka # channels alias agent.channels = mem_channel # sink alias agent.sinks = hdfs_sink # define kafka source
分享到:
收藏