一、kafka 获取 oracle 日志实时数据
NO.1 下载 zookeeper 和 kafka
步骤一、安装 zookeeper
我这里安装的版本:apache-zookeeper-3.5.8-bin.tar.gz
ZooKeeper 官网:http://zookeeper.apache.org/releases.html
笔记文档:Zookeeper 在 Linux 下载安装及部署
步骤二、安装 kafka
我这里安装的版本:kafka_2.12-2.4.0.tgz
Kafka 官网下载地址:http://kafka.apache.org/downloads
笔记文档:Kafka 在 Linux 下载安装及部署
NO.1 ORCLA 注意事项
数据库必须处于 archivelog 模式,并且必须启用补充日志记录。
在数据库服务器上:
注:alter database add supplemental log data (all) columns 是指启用补
充日志记录。
为了成功执行连接器,必须以 dba Oracle 用户启动连接器。
NO.2 简单的创建一个 Oracle 表
SQL> create table test_kafka(id VARCHAR(50) NOT NULL,username varchar2(100));
NO.3 添加相关组件
1)从 https://github.com/erdemcer/kafka-connect-oracle 下载整个项目,把
整个项目 mvn clean package 成 kafa-connect-oracle-1.0.jar。
2)下载一个 oracle 的 jdbc 驱动 ojdbc7.jar
3)将 kafa-connect-oracle-1.0.jar 和 ojdbc7.jar 放在 kafka 的安装包下的
lib 目录下,
4)将 github 项目里面的 config/OracleSourceConnector.properties 文件拷贝
到 kafak/config
No.4 修改配置文件
[root@localhost config]# vim connect-standalone.properties
[root@localhost config]# vim OracleSourceConnector.properties
NO.5 启动 kafka
[root@localhost bin]# ./kafka-server-start.sh …/config/server.properties
1.创建 Topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
--partitions 1 --topic test-Oracle
2.启动 connector
[root@localhost
bin]# ./connect-standalone.sh …/config/connect-standalone.properties …/config/
OracleSourceConnector.properties
-----启动成功会有以下信息:
3.启动 consumer
[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server
localhost:9092 --from-beginning --topic huang-Oracle
启动成功会等待 topic 数据,然后消费到控制台
NO.6 在数据库中插入一条数据
SQL> INSERT INTO test_kafka values(‘1’,‘huang’);
一定注意 commit 提交,否则 kafka 不会收到数据
可以看到在消费的控制台上出现:
插入的值以 key:value 的形式打印到控制台上。
同样删除和修改也会将信息打印到控制台上,会在 key 为 data 以及 before 上体
现不同。
这样也就实现了 oracle 中的数据在 Insert 时与 kafka 同步。
二、flume 写入到 hdfs
1.概述
对于数据的转发,Kafka 是一个不错的选择。Kafka 能够装载数据到消息队列,
然后等待其他业务场景去消费这些数据,Kafka 的应用接口 API 非常的丰富,支
持各种存储介质,例如 HDFS、HBase 等。如果不想使用 Kafka API 编写代码去消
费 Kafka Topic,也是有组件可以去集成消费的。下面笔者将为大家介绍如何使
用 Flume 快速消费 Kafka Topic 数据,然后将消费后的数据转发到 HDFS 上。
2.内容
在实现这套方案之间,可以先来看看整个数据的流向,如下图所示:
业务数据实时存储到 Kafka 集群,然后通过 Flume Source 组件实时去消费 Kafka 业务
Topic 获取数据,将消费后的数据通过 Flume Sink 组件发送到 HDFS 进行存储。
2.1 准备基础环境
按照上图所示数据流向方案,需要准备好 Kafka、Flume、Hadoop(HDFS 可用)等组件。
2.1.1 启动 Kafka 集群并创建 Topic
Kafka 目前来说,并没有一个批量的管理脚本,不过我们可以对 kafka-server-start.sh
脚本和 kafka-server-stop.sh 脚本进行二次封装。代码如下所示:
#! /bin/bash
# Kafka 代理节点地址, 如果节点较多可以用一个文件来存储
hosts=(dn1 dn2 dn3)
# 打印启动分布式脚本信息
mill=`date "+%N"`
tdate=`date "+%Y-%m-%d %H:%M:%S,${mill:0:3}"`
echo [$tdate] INFO [Kafka Cluster] begins to execute the $1 operation.
# 执行分布式开启命令
function start()
{
for i in ${hosts[@]}
do
smill=`date "+%N"`
stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
ssh hadoop@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i]
begins to execute the startup operation.;kafka-server-start.sh
$KAFKA_HOME/config/server.properties>/dev/null" &
sleep 1
done
}
# 执行分布式关闭命令
function stop()
{
for i in ${hosts[@]}
do
smill=`date "+%N"`
stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
ssh hadoop@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i]
begins to execute the shutdown operation.;kafka-server-stop.sh>/dev/null;" &
sleep 1
done
}
# 查看 Kafka 代理节点状态
function status()
{ for i in ${hosts[@]}
do
smill=`date "+%N"`
stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"`
ssh hadoop@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i]
status message is :;jps | grep Kafka;" &
sleep 1
done
}
# 判断输入的 Kafka 命令参数是否有效
case "$1" in
start)
start
;;
stop)
stop
;;
status)
status
;;
*)
echo "Usage: $0 {start|stop|status}"
RETVAL=1
esac
启动 Kafka 集群后,在 Kafka 集群可用的情况下,创建一个业务 Topic,执行命令如下:
# 创建一个 flume_collector_data 主题
kafka-topics.sh --create --zookeeper dn1:2181,dn2:2181,dn3:2181
--replication-factor 3 --partitions 6 --topic flume_collector_data
2.2 配置 Flume Agent
然后,开始配置 Flume Agent 信息,让 Flume 从 Kafka 集群的 flume_collector_data
主题中读取数据,并将读取到的数据发送到 HDFS 中进行存储。配置内容如下:
# ------------------- define data source ----------------------
# source alias
agent.sources = source_from_kafka
# channels alias
agent.channels = mem_channel
# sink alias
agent.sinks = hdfs_sink
# define kafka source