logo资料库

Kafka C++客户端库librdkafka笔记.pdf

第1页 / 共15页
第2页 / 共15页
第3页 / 共15页
第4页 / 共15页
第5页 / 共15页
第6页 / 共15页
第7页 / 共15页
第8页 / 共15页
资料共15页,剩余部分请下载后查看
KafkaC++客户端库librdkafka笔记一见2018/4/26目录目录.....................................................................................................................................................11.前言................................................................................................................................................22.缩略语............................................................................................................................................23.配置和主题....................................................................................................................................33.1.配置和主题结构................................................................................................................33.1.1.Conf...........................................................................................................................33.1.2.ConfImpl...................................................................................................................33.1.3.Topic..........................................................................................................................33.1.4.TopicImpl..................................................................................................................34.线程................................................................................................................................................45.消费者............................................................................................................................................55.1.消费者结构........................................................................................................................55.1.1.Handle.......................................................................................................................55.1.2.HandleImpl................................................................................................................55.1.3.ConsumeCb...............................................................................................................65.1.4.EventCb.....................................................................................................................65.1.5.Consumer..................................................................................................................75.1.6.KafkaConsumer.........................................................................................................75.1.7.KafkaConsumerImpl.................................................................................................75.1.8.rd_kafka_message_t..................................................................................................75.1.9.rd_kafka_msg_s........................................................................................................75.1.10.rd_kafka_msgq_t.....................................................................................................85.1.11.rd_kafka_toppar_t...................................................................................................86.生产者..........................................................................................................................................106.1.生产者结构......................................................................................................................106.1.1.DeliveryReportCb...................................................................................................116.1.2.PartitionerCb...........................................................................................................116.1.3.Producer..................................................................................................................116.1.4.ProduceImpl............................................................................................................116.2.生产者启动过程1...........................................................................................................116.3.生产者启动过程2...........................................................................................................126.4.生产者生产过程..............................................................................................................147.poll过程........................................................................................................................................15
1.前言librdkafka提供的异步的生产接口,异步的消费接口和同步的消息接口,没有同步的生产接口。2.缩略语缩略语缩略语全称示例或说明rdRapidDevelopmentrd.hrkRdKafkatopparTopicPartitionstructrd_kafka_toppar_t{};repReply,structrd_kafka_t{rd_kafka_q_t*rk_rep};msgqMessageQueuestructrd_kafka_msgq_t{};rkbRdKafkaBrokerKafka代理rkoRdKafkaOperationKafka操作rkmRdKafkaMessageKafka消息payload存在Kafka上的消息(或叫Log)
3.配置和主题3.1.配置和主题结构ConfType<>+CONF_GLOBAL+CONF_TOPICConfResult<>+CONF_UNKNOWN+CONF_INVALID+CONF_OKConf<>+create(ConfType)+set(name, value, errstr)ConfImplTopic<>+create(Handle*, topic_str, Conf, errstr)+name()+partition_available(partition)+offset_store(partition, offset)TopicImpl<>+create(Handle&, topic_str, Conf)3.1.1.Conf配置接口,配置分两种:全局的和主题的。3.1.2.ConfImpl配置的实现。3.1.3.Topic主题接口。3.1.4.TopicImpl主题的实现。
4.线程RdKafka编程涉及到三类线程:1)应用线程,业务代码的实现2)KafkaBroker线程rd_kafka_broker_thread_main,负责与Broker通讯,多个3)KafkaHandler线程rd_kafka_thread_main,每创建一个consumer或producer即会创建一个Handler线程。rd_kafka_broker_thread_main<>rd_kafka_thread_main<>Application Thread<<应用线程>>rd_kafka_msgq_t<<消息队列>>push/writepushpollBrokersendmsgrecvmsgread/popCallback:1) EventCb2) DeliveryReportCb
5.消费者5.1.消费者结构Handle+poll(timeout_ms)+pause()+resume()+yield()+clusterid()KafkaConsumer<>+create(conf, errstr)+subscribe(topics)+unsubscribe()+assign(partitions)+unassign()+consume(timeout_ms)+commitSync()+commitAsync()+commitSync()+commitAsync(Message)+commitSync(offsets)+commitAsync(offsets)+committed(partitions)+position(partitions)+close()+seek(partition, timeout_ms)+offsets_store(offsets)Consumer<>+create(Conf, errstr)+start(Topic, partition, offset)+stop(Topic, partition)+seek(Topic, partition, offset, timeout_ms)+consume(Topic, partition, timeout_ms)+consume_callback(topic, partition, timeout_ms, ConsumeCb, opaque)ConsumerImplHandleImpl+poll(timeout_ms)int poll (int timeout_ms){ return rd_kafka_poll(rk_, timeout_ms);}int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms){ // ops: operations // // Pop all available ops from a queue and call the provided // callback for each op. // Returns the number of ops served. // Locality: any thread. return rd_kafka_q_serve(rk->rk_rep, timeout_ms, 0, RD_KAFKA_Q_CB_CALLBACK, rd_kafka_poll_cb, NULL);}KafkaConsumerImpl<>+create(Conf, errstr)Consumer:A simple non-balanced, non-group-aware, consumer.KafkaConsumer:High-level KafkaConsumerrd_kafka_s<>+rd_kafka_q_t *rk_rep+rd_kafka_q_t *rk_opsrd_kafka_q_t<>+rkq_qlen+int64_t rkq_qsize+int rkq_refcnt+int rkq_flags*rk_rep: kafka -> application reply queuerk_ops: any -> rdkafka main thread opsrkq_qlen: Number of entries in queuerkq_qsize: Size of all entries in queuerkq_refcnt: Reference countingrkq_flags:1) 0x1 Allocated: rd_free on destroy2) 0x2 Queue is ready to be used.3) 0x4 Queue is being forwarded by a call to rd_kafka_queue_forwardrd_kafka_msgq_t<>+rd_kafka_msg_s* tqh_first+rd_kafka_msg_s* tqh_last+rkmq_msgs+rkmq_msg_cnt+rkmq_msg_bytesrd_kafka_msg_s<>+rd_kafka_message_t+int rkm_flags+int64_t rkm_timestamp2MessageMessageQueuerd_kafka_message_t<>+int32_t partition+void* payload+size_t len+void* key+size_t key_len+int64_t offset+void* _private5.1.1.Handle定义了poll等接口,它的实现者为HandleImpl。5.1.2.HandleImpl实现了消费者和生产者均使用的poll等,其中poll的作用为:1)为生产者回调消息发送结果;2)为生产者和消费者回调事件。classHandle{/***@briefPollstheprovidedkafkahandleforevents.
**Eventswilltriggerapplicationprovidedcallbackstobecalled.**The\ptimeout_msargumentspecifiesthemaximumamountoftime*(inmilliseconds)thatthecallwillblockwaitingforevents.*Fornon-blockingcalls,provide0as\ptimeout_ms.*Towaitindefinatelyforevents,provide-1.**Events:*-deliveryreportcallbacks(ifanRdKafka::DeliveryCbisconfigured)[producer]*-eventcallbacks(ifanRdKafka::EventCbisconfigured)[producer&consumer]**@remarkAnapplicationshouldmakesuretocallpoll()atregular*intervalstoserveanyqueuedcallbackswaitingtobecalled.**@warningThismethodMUSTNOTbeusedwiththeRdKafka::KafkaConsumer,*useitsRdKafka::KafkaConsumer::consume()instead.**@returnsthenumberofeventsserved.*/virtualintpoll(inttimeout_ms)=0;};5.1.3.ConsumeCb只针对消费者的Callback。5.1.4.RebalanceCb只针对消费者的Callback。5.1.5.EventCb消费者和生产者均可设置EventCb,如:_global_conf->set("event_cb",&_event_cb,errmsg);。/***@briefEventcallbackclass**Eventsareagenericinterfaceforpropagatingerrors,statistics,logs,etc*fromlibrdkafkatotheapplication.**@saRdKafka::Event
*/classRD_EXPORTEventCb{public:/***@briefEventcallback**@saRdKafka::Event*/virtualvoidevent_cb(Event&event)=0;virtual~EventCb(){}};/***@briefEventobjectclassaspassedtotheEventCbcallback.*/classRD_EXPORTEvent{public:/**@briefEventtype*/enumType{EVENT_ERROR,/**
5.1.9.rd_kafka_message_t消息结构。5.1.10.rd_kafka_msg_s消息结构,但消息数据实际存储在rd_kafka_message_t,结构大致如下:structrd_kafka_msg_s{rd_kafka_message_trkm_rkmessage;struct{rd_kafka_msg_s*tqe_next;rd_kafka_msg_s**tqe_prev;int64_trkm_timestamp;rd_kafka_timestamp_type_trkm_tstype;}rkm_link;};5.1.11.rd_kafka_msgq_t存储消息的消息队列,生产者生产的消息并不直接socket发送到brokers,而是放入了这个队列,结构大致如下:structrd_kafka_msgq_t{struct{rd_kafka_msg_s*tqh_first;//队首rd_kafka_msg_s*tqh_last;//队尾};//消息个数rd_atomic32_trkmq_msg_cnt;//所有消息加起来的字节数rd_atomic64_trkmq_msg_bytes;};5.1.12.rd_kafka_toppar_tTopic-Partition队列,很复杂的一个结构,部分内容如下://Topic+Partitioncombination
分享到:
收藏