CDH 添加第三方服务的方法
1 前瞻导读
CDH 可以很方便的添加一些大数据相关服务,但这仅限于 cloudera 公司提供。若
想将第三方服务(如公司自己开发的组件)添加到 CDH 集群(托管在 CDH 上),需
要按照一定的规则流程制作相关程序包,最后发布到 CDH 上。
本文就是指导大家如何打包自己的服务,发布到 CDH 上,并且由 CDH 控制服务的运
行、监控服务的基本运行状态。
2 制作相关介绍
2.1 名词介绍
parcel:以“.parcel”结尾的 gz 格式的压缩文件。它必须为一个固定规则的文件
名。
命名规则必须如下:
文件名称格式为三段,第一段是包名,第二段是版本号,第三段是运行平台。
例如:FLINK-1.6.0-hadoop_2.6-scala_2.11-el7.parcel
包名:FLINK
版本号:1.6.0-hadoop_2.6-scala_2.11
运行环境:el7
el6 是代表 centos6 系统,centos7 则用 el7 表示
parcel 包内包含了你的服务组件,同时包含一个重要的描述性文件 parcel.json:
这个文件记录了你的服务信息,如版本、所属用户、适用的 CDH 平台版本等
parcel 必须包置于/opt/cloudera/parcel-repo/目录下才可以被 CDH 发布程序时
识别到。
csd:csd 文件是一个 jar 包,它记录了你的服务在 CDH 上的管理规则
如你的服务在 CDH 页面上显示的图标、依赖的服务、暴露的端口、启动规则等。
csd 的 jar 包必须置于/opt/cloudera/csd/目录才可以在添加集群服务时被识别到。
2.2 相关下载
https://github.com/cloudera/cm_csds
https://github.com/cloudera/cm_ext
3 制作 CDH 组件
3.1 整理预发布组件
将你通过测试的服务整理到一个目录内,目录内的子目录结构就是你的工程项目结
构,不需要作任何变化。依赖的相关库文件可以由系统环境提供,也可以直接放置
在该工程目录下。
任何语言编写的服务都可以托管到 CDH。
3.2 制作 flink 组件包
3.2.1 下载 flink 包
https://archive.apache.org/dist/flink/flink-1.6.0/flink-1.6.0-bin-hadoo
p26-scala_2.11.tgz
3.2.2 制作 parcel
parcel 包的根目录结构如下:
parcel 包目录结构由你的服务目录(lib/flink)和一个 meta 目录组成。
meta 目录组成文件如下:
flink_env.sh 文件可以声明你的服务运行时的 bash 环境下的一些变量环境,根据
你的服务需要可以自行添加设置。
创建 flink_env.sh 文件:
vi FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/flink_env.sh
FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/flink_env.sh 内容:
#!/bin/bash
FLINK_DIRNAME=${PARCEL_DIRNAME:-"FLINK-1.6.0-hadoop_2.6-scala_2.11"}
export FLINK_HOME=$PARCELS_ROOT/$FLINK_DIRNAME/lib/flink
parcel.json 文件需要填写好相关的 parcel 包名、兼容的 CDH 平台版本信息。
创建 parcel.json 文件(parcel 包描述):
vi FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/parcel.json
FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/parcel.json 内容:
{
"schema_version": 1,
"name": "FLINK",
"version": "1.6.0-hadoop_2.6-scala_2.11",
"depends": "CDH (>= 5.2), CDH (<< 6.0)",
"setActiveSymlink": true,
"replaces": "FLINK",
"scripts": {
"defines": "flink_env.sh"
},
"packages": [{
"name": "flink-master",
"version": "1.6.0+flink1.6.0"
},
{
"name": "flink-worker",
"version": "1.6.0+flink1.6.0"
}],
"components": [{
"name": "flink",
"version": "1.6.0-flink1.6.0",
"pkg_version": "1.6.0+flink1.6.0",
"pkg_release": "hadoop_2.6-scala_2.11"
}],
"provides": ["flink"],
"users": {
"flink": {
"longname": "Flink",
"home": "/var/lib/flink",
"shell": "/bin/bash",
"extra_groups": []
}
},
"groups": ["flink"]
}
注意:务必注意文件内容的大小写,否则可能造成 parcel 包无法发布的情况。
创建 flink-master.sh 文件:
vi FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-master.sh
FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-master.sh 内容:
#!/bin/bash
# Flink Master.
USAGE="Usage: flink-master.sh (start|stop)"
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with
key \`${KEY_JOBM_MEM_SIZE}\`"
else
flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
fi
if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}"
-lt "0" ]]; then
echo "[ERROR] Configured JobManager memory size is not a valid value.
Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
exit 1
fi
if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m
-Xmx"$FLINK_JM_HEAP_MB"m"
fi
# Add JobManager-specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS}
${FLINK_ENV_JAVA_OPTS_JM}"
# Startup parameters
ARGS=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster"
"--host" "${FLINK_MASTER_HOST}" "--webui-port" "${FLINK_WEB_UI_PORT}")
echo "FLINK_MASTER_HOST: $FLINK_MASTER_HOST"
echo "FLINK_WEB_UI_PORT: $FLINK_WEB_UI_PORT"
echo "FLINK_LOG_DIR: ${FLINK_LOG_DIR}"
echo "MASTER_ARGS: ${ARGS[@]}"
CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClust
erEntrypoint
FLINK_TM_CLASSPATH=`constructFlinkClassPath`
FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-master"
log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"
log_setting=("-Dlog.file=${log}"
"-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties"
"-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")
JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version
"\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
if [ "$JAVA_VERSION" -lt 18 ]; then
JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
fi
fi
MY_PID=$(ps -ef | grep "$CLASS_TO_RUN" | grep -v grep | awk '{print $2}')
if [ "${MY_PID}" = "" ];then
# Rotate log files
rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"
# Evaluate user options for local variable expansion
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
CLASS_PATH=`manglePathList "$FLINK_TM_CLASSPATH:$(hadoop
classpath)"`
CLASS_PATH=$(echo "${CLASS_PATH}" | sed
"s#"$FLINK_HOME"/lib/slf4j-log4j12-1.7.7.jar:##g")
echo "Starting $DAEMON daemon (pid: $!) on host $HOSTNAME."
exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}"
-classpath "${CLASS_PATH}" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1
else
echo "$DAEMON daemon (pid: $MY_PID) is running on host $HOSTNAME."
fi
flink-master.sh 文件用于启动 flink 的 master 管理节点。
注意:flink-master.sh 脚本中的 exec 命令是必须的。
创建 flink-worker.sh 文件:
vi FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink- worker.sh
FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink- worker.sh 内容:
#!/bin/bash
#Flink Worker.
USAGE="Usage: flink-worker.sh (start|stop)"
OPERATION=$1
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
# if memory allocation mode is lazy and no other JVM options are set,
# set the 'Concurrent Mark Sweep GC'
if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z
"${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
fi
if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, pelase replace with
key \`${KEY_TASKM_MEM_SIZE}\`"
else
flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
fi
if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}"
-lt "0" ]]; then
echo "[ERROR] Configured TaskManager JVM heap size is not a number.
Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
exit 1
fi
if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then
TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
# Long.MAX_VALUE in TB: This is an upper bound, much less direct memory
will be used
TM_MAX_OFFHEAP_SIZE="8388607T"
export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M
-Xmx${TM_HEAP_SIZE}M
-XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"
fi
# Add TaskManager-specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS}
${FLINK_ENV_JAVA_OPTS_TM}"
# Startup parameters
ARGS=("--configDir" "${FLINK_CONF_DIR}")
echo "FLINK_LOG_DIR: ${FLINK_LOG_DIR}"
echo "MASTER_ARGS: ${ARGS[@]}"
CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
FLINK_TM_CLASSPATH=`constructFlinkClassPath`
FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-worker"
log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"
log_setting=("-Dlog.file=${log}"
"-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties"
"-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")
JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version
"\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
if [ "$JAVA_VERSION" -lt 18 ]; then
JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
fi
fi
MY_PID=$(ps -ef | grep "$CLASS_TO_RUN" | grep -v grep | awk '{print $2}')
if [ "${MY_PID}" = "" ];then
# Rotate log files
rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"
# Evaluate user options for local variable expansion
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
CLASS_PATH=`manglePathList "$FLINK_TM_CLASSPATH:$(hadoop
classpath)"`
CLASS_PATH=$(echo "${CLASS_PATH}" | sed
"s#"$FLINK_HOME"/lib/slf4j-log4j12-1.7.7.jar:##g")
echo "Starting $DAEMON daemon (pid: $!) on host $HOSTNAME."
exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}"
-classpath "${CLASS_PATH}" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1
else
echo "$DAEMON daemon (pid: $MY_PID) is running on host $HOSTNAME."
fi
flink-worker.sh 文件用于启动 flink 的 worker 任务节点。
注意:flink-worker.sh 脚本中的 exec 命令是必须的。
创建 flink-yarn.sh 文件:
vi FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-yarn.sh
FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-yarn.sh 内容:
#!/bin/bash
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
# get Flink config
. "$bin"/config.sh
JVM_ARGS="$JVM_ARGS -Xmx512m"
CLASS_TO_RUN=org.apache.flink.yarn.cli.FlinkYarnSessionCli
log=$FLINK_LOG_DIR/flink-yarn.log
out=$FLINK_LOG_DIR/flink-yarn.out
log_setting="-Dlog.file="$log"
-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.propert
ies
-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"
# Rotate log files
rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"
CLASS_PATH=`manglePathList $(constructFlinkClassPath):$(hadoop
classpath)`
CLASS_PATH=$(echo "${CLASS_PATH}" | sed
"s#"$FLINK_HOME"/lib/slf4j-log4j12-1.7.7.jar:##g")
exec $JAVA_RUN $JVM_ARGS -classpath "$CLASS_PATH" $log_setting
${CLASS_TO_RUN} -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@" > "$out" 2>&1
flink-yarn.sh 文件用于在 yarn 中启动 flink。
注意:flink-yarn.sh 脚本中的 exec 命令是必须的。
创建 permissions.json 文件: