logo资料库

CDH添加第三方服务的方法.docx

第1页 / 共51页
第2页 / 共51页
第3页 / 共51页
第4页 / 共51页
第5页 / 共51页
第6页 / 共51页
第7页 / 共51页
第8页 / 共51页
资料共51页,剩余部分请下载后查看
1前瞻导读
2制作相关介绍
2.1名词介绍
2.2相关下载
3制作CDH组件
3.1整理预发布组件
3.2制作flink组件包
3.2.1下载flink包
3.2.2制作parcel
3.2.3制作standlaone csd jar
3.2.4制作yarn csd jar
3.3制作activemq组件包
3.3.1下载activemq包
3.3.2制作parcel
3.3.3制作csd jar
4发布到CDH上
4.1激活parcel包
4.2添加服务
CDH 添加第三方服务的方法 1 前瞻导读 CDH 可以很方便的添加一些大数据相关服务,但这仅限于 cloudera 公司提供。若 想将第三方服务(如公司自己开发的组件)添加到 CDH 集群(托管在 CDH 上),需 要按照一定的规则流程制作相关程序包,最后发布到 CDH 上。 本文就是指导大家如何打包自己的服务,发布到 CDH 上,并且由 CDH 控制服务的运 行、监控服务的基本运行状态。 2 制作相关介绍 2.1 名词介绍 parcel:以“.parcel”结尾的 gz 格式的压缩文件。它必须为一个固定规则的文件 名。 命名规则必须如下: 文件名称格式为三段,第一段是包名,第二段是版本号,第三段是运行平台。 例如:FLINK-1.6.0-hadoop_2.6-scala_2.11-el7.parcel 包名:FLINK 版本号:1.6.0-hadoop_2.6-scala_2.11 运行环境:el7 el6 是代表 centos6 系统,centos7 则用 el7 表示 parcel 包内包含了你的服务组件,同时包含一个重要的描述性文件 parcel.json: 这个文件记录了你的服务信息,如版本、所属用户、适用的 CDH 平台版本等 parcel 必须包置于/opt/cloudera/parcel-repo/目录下才可以被 CDH 发布程序时 识别到。 csd:csd 文件是一个 jar 包,它记录了你的服务在 CDH 上的管理规则 如你的服务在 CDH 页面上显示的图标、依赖的服务、暴露的端口、启动规则等。 csd 的 jar 包必须置于/opt/cloudera/csd/目录才可以在添加集群服务时被识别到。 2.2 相关下载 https://github.com/cloudera/cm_csds https://github.com/cloudera/cm_ext
3 制作 CDH 组件 3.1 整理预发布组件 将你通过测试的服务整理到一个目录内,目录内的子目录结构就是你的工程项目结 构,不需要作任何变化。依赖的相关库文件可以由系统环境提供,也可以直接放置 在该工程目录下。 任何语言编写的服务都可以托管到 CDH。 3.2 制作 flink 组件包 3.2.1 下载 flink 包 https://archive.apache.org/dist/flink/flink-1.6.0/flink-1.6.0-bin-hadoo p26-scala_2.11.tgz 3.2.2 制作 parcel parcel 包的根目录结构如下: parcel 包目录结构由你的服务目录(lib/flink)和一个 meta 目录组成。 meta 目录组成文件如下: flink_env.sh 文件可以声明你的服务运行时的 bash 环境下的一些变量环境,根据 你的服务需要可以自行添加设置。 创建 flink_env.sh 文件: vi FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/flink_env.sh FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/flink_env.sh 内容:
#!/bin/bash FLINK_DIRNAME=${PARCEL_DIRNAME:-"FLINK-1.6.0-hadoop_2.6-scala_2.11"} export FLINK_HOME=$PARCELS_ROOT/$FLINK_DIRNAME/lib/flink parcel.json 文件需要填写好相关的 parcel 包名、兼容的 CDH 平台版本信息。 创建 parcel.json 文件(parcel 包描述): vi FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/parcel.json FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/parcel.json 内容: { "schema_version": 1, "name": "FLINK", "version": "1.6.0-hadoop_2.6-scala_2.11", "depends": "CDH (>= 5.2), CDH (<< 6.0)", "setActiveSymlink": true, "replaces": "FLINK", "scripts": { "defines": "flink_env.sh" }, "packages": [{ "name": "flink-master", "version": "1.6.0+flink1.6.0" }, { "name": "flink-worker", "version": "1.6.0+flink1.6.0" }], "components": [{ "name": "flink", "version": "1.6.0-flink1.6.0", "pkg_version": "1.6.0+flink1.6.0", "pkg_release": "hadoop_2.6-scala_2.11" }], "provides": ["flink"], "users": { "flink": { "longname": "Flink", "home": "/var/lib/flink", "shell": "/bin/bash", "extra_groups": [] } }, "groups": ["flink"] } 注意:务必注意文件内容的大小写,否则可能造成 parcel 包无法发布的情况。
创建 flink-master.sh 文件: vi FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-master.sh FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-master.sh 内容: #!/bin/bash # Flink Master. USAGE="Usage: flink-master.sh (start|stop)" bin=`dirname "$0"` bin=`cd "$bin"; pwd` . "$bin"/config.sh if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`" else flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP}) FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes}) fi if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}." exit 1 fi if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m" fi # Add JobManager-specific JVM options export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}" # Startup parameters ARGS=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster" "--host" "${FLINK_MASTER_HOST}" "--webui-port" "${FLINK_WEB_UI_PORT}") echo "FLINK_MASTER_HOST: $FLINK_MASTER_HOST" echo "FLINK_WEB_UI_PORT: $FLINK_WEB_UI_PORT" echo "FLINK_LOG_DIR: ${FLINK_LOG_DIR}"
echo "MASTER_ARGS: ${ARGS[@]}" CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClust erEntrypoint FLINK_TM_CLASSPATH=`constructFlinkClassPath` FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-master" log="${FLINK_LOG_PREFIX}.log" out="${FLINK_LOG_PREFIX}.out" log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml") JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q') # Only set JVM 8 arguments if we have correctly extracted the version if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then if [ "$JAVA_VERSION" -lt 18 ]; then JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m" fi fi MY_PID=$(ps -ef | grep "$CLASS_TO_RUN" | grep -v grep | awk '{print $2}') if [ "${MY_PID}" = "" ];then # Rotate log files rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX" # Evaluate user options for local variable expansion FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS}) CLASS_PATH=`manglePathList "$FLINK_TM_CLASSPATH:$(hadoop classpath)"` CLASS_PATH=$(echo "${CLASS_PATH}" | sed "s#"$FLINK_HOME"/lib/slf4j-log4j12-1.7.7.jar:##g") echo "Starting $DAEMON daemon (pid: $!) on host $HOSTNAME." exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "${CLASS_PATH}" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 else echo "$DAEMON daemon (pid: $MY_PID) is running on host $HOSTNAME." fi flink-master.sh 文件用于启动 flink 的 master 管理节点。 注意:flink-master.sh 脚本中的 exec 命令是必须的。 创建 flink-worker.sh 文件:
vi FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink- worker.sh FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink- worker.sh 内容: #!/bin/bash #Flink Worker. USAGE="Usage: flink-worker.sh (start|stop)" OPERATION=$1 bin=`dirname "$0"` bin=`cd "$bin"; pwd` . "$bin"/config.sh # if memory allocation mode is lazy and no other JVM options are set, # set the 'Concurrent Mark Sweep GC' if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC" fi if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, pelase replace with key \`${KEY_TASKM_MEM_SIZE}\`" else flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP}) FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes}) fi if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}." exit 1 fi if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB) # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used TM_MAX_OFFHEAP_SIZE="8388607T"
export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}" fi # Add TaskManager-specific JVM options export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}" # Startup parameters ARGS=("--configDir" "${FLINK_CONF_DIR}") echo "FLINK_LOG_DIR: ${FLINK_LOG_DIR}" echo "MASTER_ARGS: ${ARGS[@]}" CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner FLINK_TM_CLASSPATH=`constructFlinkClassPath` FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-worker" log="${FLINK_LOG_PREFIX}.log" out="${FLINK_LOG_PREFIX}.out" log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml") JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q') # Only set JVM 8 arguments if we have correctly extracted the version if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then if [ "$JAVA_VERSION" -lt 18 ]; then JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m" fi fi MY_PID=$(ps -ef | grep "$CLASS_TO_RUN" | grep -v grep | awk '{print $2}') if [ "${MY_PID}" = "" ];then # Rotate log files rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX" # Evaluate user options for local variable expansion FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS}) CLASS_PATH=`manglePathList "$FLINK_TM_CLASSPATH:$(hadoop classpath)"`
CLASS_PATH=$(echo "${CLASS_PATH}" | sed "s#"$FLINK_HOME"/lib/slf4j-log4j12-1.7.7.jar:##g") echo "Starting $DAEMON daemon (pid: $!) on host $HOSTNAME." exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "${CLASS_PATH}" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1 else echo "$DAEMON daemon (pid: $MY_PID) is running on host $HOSTNAME." fi flink-worker.sh 文件用于启动 flink 的 worker 任务节点。 注意:flink-worker.sh 脚本中的 exec 命令是必须的。 创建 flink-yarn.sh 文件: vi FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-yarn.sh FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-yarn.sh 内容: #!/bin/bash bin=`dirname "$0"` bin=`cd "$bin"; pwd` # get Flink config . "$bin"/config.sh JVM_ARGS="$JVM_ARGS -Xmx512m" CLASS_TO_RUN=org.apache.flink.yarn.cli.FlinkYarnSessionCli log=$FLINK_LOG_DIR/flink-yarn.log out=$FLINK_LOG_DIR/flink-yarn.out log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.propert ies -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml" # Rotate log files rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX" CLASS_PATH=`manglePathList $(constructFlinkClassPath):$(hadoop classpath)` CLASS_PATH=$(echo "${CLASS_PATH}" | sed "s#"$FLINK_HOME"/lib/slf4j-log4j12-1.7.7.jar:##g") exec $JAVA_RUN $JVM_ARGS -classpath "$CLASS_PATH" $log_setting ${CLASS_TO_RUN} -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@" > "$out" 2>&1 flink-yarn.sh 文件用于在 yarn 中启动 flink。 注意:flink-yarn.sh 脚本中的 exec 命令是必须的。 创建 permissions.json 文件:
分享到:
收藏