找回密码
 会员注册
查看: 30|回复: 0

基于Pulsar的海量DB数据采集和分拣

[复制链接]

2

主题

0

回帖

7

积分

新手上路

积分
7
发表于 2024-9-20 23:09:08 | 显示全部楼层 |阅读模式
导语ApachePulsar是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制、快速扩容、灵活容错等特性。本文是Pulsar技术系列中的一篇,主要介绍Pulsar在海量DBBinlog增量数据采集、分拣场景下的应用。前言Pulsar作为下一代消息中间件的典型代表,在大数据领域、广告、计费等场景已经得到了广泛的应用。本文主要分享Pulsar在大数据领域,DBBinlog增量数据采集、分拣案例中的应用,以及在使用过程中对PulsarJavaSDK的使用调优,供大家参考。背景介绍本文分享的对MySQLBinlog做增量数据采集和分拣的场景,是 ApacheInLong 系统的一个子能力。需要使用到ApacheInLong中的DBAgent(采集Binlog的组件)、Sort(分拣入库的组件)及US(调度系统)等组件。图1InlongDbAgent数据采集处理流程如图1所示,InLongDBAgent(采集Binlog)组件使用Java语言实现,完成Binlog同步、Binlog数据解析、Binlog数据过滤、Binlog数据转换及将符合过滤条件的数据及指标发送到Pulsar集群的功能。InLongSort(分拣入库)采用Java语言实现,完成数据从Pulsar集群的订阅、数据的解析-转换及最终数据的入库操作(Thive)。USRunner(调度任务)采用Java语言实现,这里依赖US调度平台,是通过Pulsar消息方式触发,在拉起业务方挂载的任务Runner之前,完成保障数据完整性的校验,即对前置依赖的数据采集状态进行校验、完成指标数据对账、完成端到端对账及端到端补数据等。功能架构图2DB数据采集、分拣流程总览如图2所示,ApacheInLong系统内,基于MySQLBinlog做的增量数据采集、分拣流程主要有如下几个部分组成:InLongManger:负责DB采集、分拣配置的接入和下发。InLongDBAgent:负责具体DB采集任务的执行,节点无状态,高可用,支持异构机型部署,支持DB采集任务在多个InLongDBAgent之间做HA调度,发送数据和指标分别到对应的Pulsar集群。Pulsar:分为数据集群和指标集群,使用时可以配置为相同集群地址。InLongSort:负责订阅分拣数据,处理数据的转换和入库逻辑。支持ExactlyOnce语意,支持多种入库Sink,如Thive/Hive、Iceberg、Hbase、Clickhouse等。USRunner:US是调度平台,这里的Runner是指在其上运行的任务,当前支持指标对账和端到端对账,只有对账通过时,下游任务才会运行,确保数据在一定质量保障的前提下被用户使用。基于Pulsar的采集端采集端架构设计InLongDBAgent作为数据的采集端,将采集的数据发送到Pulsar集群。InLongDBAgent为无状态节点,具备断点续采、单机多DB任务采集、DB采集任务HA调度等能力,同时支持单机多部署、异构机型部署等能力。图3DbAgent架构设计如图3所示,InLongDBAgent同步的Job元数据信息通过InLongManager进行管理,用户通过InLongManager进行Job的元数据的配置。多个InLongDBAgent执行节点组成一个InLongDBAgent集群。每个InLongDBAgent集群,会通过Zookeeper选主,产生一个Coordinator角色的节点,负责这个集群下DB采集Job的分配。生产数据与指标图4InlongDbAgent内单Job数据/指标流扭转流程及各部分耗时InLongDBAgent同时处理多个Job的采集,如图4所示,为InlongDBAgent内部单个Job的处理流程,而不同Job之间,是逻辑隔离的(历史版本很长时间没有做到完全的隔离,后面章节会介绍这里存在的一些问题),即不同Job使用完全独立的逻辑资源,如DB连接、数据PulsarClient、数据PulsarProducer、指标PulsarClient、指标PulsarProducer及中间数据扭转过程中用于汇聚的Cache、分发的线程和Queue等,避免Job之间相互影响,同时也方便Job在不同InLongDBAgent节点间做HA调度。当然,这种设计方式也存在一定的风险,需要在部署和运营过程中做合理的规划,后面章节会有详细的说明。为了保证数据的完成性,整个采集、分拣流程支持指标对账流程,这里的指标对账保证的是每个时间分区内,InLongDBAgent采集发送到Pulsar成功的数据条数与InLongSort入库写入到Thive且去重复后的数据总量的比较。InLongDBAgent通过两点设计保障数据的完整性和指标数据的准确性。首先,设计Binlog位点的确认机制。通过这个机制保证采集拉取过程的连续性,避免采集跳点问题。InLongDBAgent中的每个Job拉取到数据、解析,处理完向后分发逻辑(包括,没有实际向后分发数据的场景,如需要跳过的位点,心跳时间产生的位逻辑位点等,也需要做加入和Ack的操作,移除时会更新当前的最小位置信息)之后,将位置信息保存到 ConcurrentSkipListSet 类型的集合中,当数据发送到Pulsar成功后,会走内部的位点Ack流程,从 ConcurrentSkipListSet 中移除位点的同时,将当前的集合中最小的位置,通过比较逻辑,更新到采集位点缓存,这个缓存信息作为当前采集完成的位置。后台通过周期线程,将当前的采集完的缓存位置信息同步到ZK和上报给InLongManager。当InLongDBAgent进程重启或Job被调度到新的InLongDBAgent节点上执行时,Job需要首先使用ZK中保存的位点信息进行初始化,进而保证从上次采集完成的位置继续开始拉取数据。需要注意的一点是,位点是通过异步方式进行更新保存的。因此,在重启或HA调度后,Job的续采可能产生少量的重复数据。其次,设计指标与数据一对一的保障机制。指标数据是在消息数据异步发送Pulsar消息后,回调处理的成功逻辑中生成的,通过汇聚计算,周期的发送到指标服务器。InLongDBAgent的进程停止和Job停止过程处理的相对闭环和复杂,需要保证发送给Pulsar的消息成功后的对账指标全部发送成功且最新的位点更新到ZK后再停止应用或Job。而在Kill-9这种非正常的操作情况下,会产生重复数据和导致指标丢失。这种情况下,所在分区的对账流程,需要人工介入处理。现网的环境是复杂的,业务的使用和运维场景也是多种多样的,位点确认保证机制,不能完全的避免跳点和丢数据。比如,采集过程中,因当前连接的DB发生故障,采集触发了连接切换,从新的DB节点上面拉取数据,如果这个节点上的Binlog文件数据存储在断层,即新的节点上Binlog不全或者采集位置所在的Binlog已经被清理了。还比如,采集过程因数据量较大或采集机器出现了资源瓶颈,出现采集延迟,采集进度赶不上服务器端Binlog的清理速度等。这些都是在运营过程中出现过的场景,这种情况就需要通过监控指标,及时的发现,及时的进行人工干预处理。基于Pulsar的分拣端分拣端架构设计InLongSort作为数据的分拣端,负责从Pulsar集群订阅数据,做反序列化、转换和入库。InLongSort是基于Flink框架实现的,实现过程中涉及很多Flink相关的机制、概念,本文不做过多的描述,有兴趣的同学可以到Flink社区官网查看相关解释。InLongSort的整体架构如图5所示,采集的数据目前主要被分拣入库到Thive中。图5InlongSort整体架构消费数据InLongSort订阅消费Pulsar集群中的数据,按照数据的处理流程,大体分为如图6所示的4个部分,这里未标出指标相关的算子。当然,不同的入库类型,会存在些些许差异。图6InlongSort的数据处理流程InLongSort是单任务(Oceanus任务),多Dataflow分拣的应用。因此,每个算子都需要处理多Dataflow的场景,Dataflow之间的数据流处理过程,在逻辑上是隔离的。Source算子,处理Dataflow中Souceinfo部分的解析和加载,处理Pulsar消息的订阅和向后分发。Deserialization算子,处理MQ消息数据的解析,按照配置拆分成不同字段内容,组织在Record中,向后分发。Sink算子,处理数据的入库逻辑。Commiter算子,处理入库数据的提交逻辑,以Thive为例,Commiter部分处理分区的创建,USPulsar消息的生产等。Commiter算子并不是所有入库类型都需要的,程序中会根据接入的库类型做区分处理。InLongSort的整体处理流程和设计是比较清晰的,但是实现相对比较复杂,中间算子的实现也在不断的在迭代演进,本文不做过多的描述,有兴趣的同学可以关注相关的分享或后续相关主题文章进行了解。基于调度平台的对账Runner是US调度系统中执行的实例概念,InLongSort分拣数据之后,通过Pulsar消息触发US平台执行相应的Runner。这里主要有‘触发’和‘对账’两个相关类型的任务。其中‘触发’任务是一个空任务,US的Pulsar消息的消费者收到对应的MQ消息后,通过‘触发’任务间接的拉起‘对账’任务。Pulsar应用在整个的数据采集、分拣的过程中,Pulsar作为数据和指标的中转站,分别接收InLongDBAgent上报的数据和发送成功的数据指标,接受InLongSort任务订阅数据,接收DBAgent-Audit订阅指标数据。下面分两个小节,分别介绍采集生产Pulsar消息和分拣消费Pulsar数据的使用场景、存在的问题和处理的经验。Pulsar生产生产场景通过第一节对InLongDBAgent的架构设计的介绍可知,每个InLongDBAgent的进程中,需要跑1-N个采集Job,每个Job负责采集一个DB实例上面的Binlog数据,每个Job对应一个Pulsar集群配置,将采集到的数据生产到这个Pulsar集群上,每个Job下包含多个Task,而每个Task对应一个PulsarTopic,这个Topic汇集一组符合过滤条件的库、表数据。转换到Pulsar部分对应关系如下图7所示:图7单Job内数据流对应的PulsarSDK对象由此可见,InLongDBAgent使用PulsarSDK的场景是,我们需要在单个的Java进程内创建、维护1-M个PulsarClient对象。并且,需要使用每个PulsarClient对象创建、维护1-N个Topic的Producer对象。问题与调优针对上一小节,说明的应用场景,需要考虑和处理如下几个问题:问题1: 是否全局维护PulsarClient对象,多个Job间如果配置相同,共用一个PulsarClient对象?我们在老版本,的确是这么实现的,这样不但能减少PulsarClient对象的个数,也能减少采集节点(每个InLongDBAgent部署节点看作一个采集节点)与Pulsar集群的连接数。但是,在实际的运营过程中我们遇到了如下两个问题。首先,Job之间的(Job之内的Task之间)数据量具有不均衡性,有的数据量可能会非常大,如流水数据表、指标数据表等,有的数据量可能非常小,如海外的部分业务订单等,有些库表具备周期性特点,如每天凌晨批量更新跑批的数据表等。这些,如果共用一个PulsarClient,创建Producer对象进行生产,Job之间采集的数据进度,存在因数量级的不同,产生的相互影响,最终导致大量的采集延迟。其次,为了保证数据采集的高可用性,系统需要具备根据机器负载,在集群内多个InLongDBAgent节点之间调度Job的能力(也就是说Job1上一时刻可能在InLongDBAgent-1上面执行,后面某一个时刻可能就被调度走,在InLongDBAgent-2上面执行了)。多个Job之间共用PulsarClient,需要根据共用信息的变化,动态的维护PulsarClient及Producer,这样不仅增加了开发、维护的难度,实现不好会导致Client及Producer的对象泄漏,为程序留下隐患。同时,在关闭ProducerClient的时候也可能对其他的处在中间状态的Job产生影响甚至丢数据。经过一段时间的论证和考量之后,版本迭代过程中,做了Job之间完全隔离的策略,即每个Job维护自己的PulsarClient对象,并在此对象的基础上创建这个Job里需要的1-N个Topic的Producer。这样从逻辑上完全避免了Job之间的相互影响。有的读者可能会问,Job内多个Task之间就不存在相互影响吗?是不会的或者影响基本上是可以忽略的。这是因为,每个Job采集的是同一个DB实例内的Binlog数据,数据只会按照顺序进行拉取,数据天然的具备先后顺序,不同Topic间基本上不会引发任何问题。此外,Job间完整的隔离,也方便Job在InLongDBAgent节点间做HA调度,降低了代码的开发和维护难度。这里有另一个问题,不得不提一下---即连接数问题(占用FD资源)。每个InLongDBAgent上面,会根据当前的机器配置(也就是所谓的异构机型),配置当前InLongDBAgent所能同时运行的Job的最大个数。当前节点与Pulsar集群的最大连接数,需要按如下公式进行估算(假设每个Job内的1-N个Topic的分区能覆盖分布到所有的Broker节点上):最大连接数 =(MaxJobsNum)*(PreBrokerConnectNum)*(PulsarBrokerNum)*Min(MaxPartitionNum,PulsarBrokerNum)例如: MaxJobsNum = 60 、PreBrokerConnectNum = 2、PulsarBrokerNum = 90 最大连接数 =(MaxJobsNum)*(PreBrokerConnectNum)*(PulsarBrokerNum)*Min(MaxPartitionNum,PulsarBrokerNum)=97200这个数值在一般的现网机器上面,算占比,也是非常大的值了,而且会随着Broker节点个数的增加、单个InLongDBAgent节点内Job个数的增加而增加,在现网部署、运维过程中一定要进行相应值的估算和部署规划,避面前期没有问题,运营过程中偶发大面积进程崩溃。问题2: 在使用PulsarProducer生产消息时,为了提高效率,是否能采用多线程生产?答案是肯定的,我们可以通过多线程分发生产消息。但是,如下实现方式(伪代码),可能会严重的降低生产效率:publicSenderextendsThread{Producerprodcuer;QueuemsgQueue;publicSender(Producerprodcuer,QueuemsgQueue){this.prodcuer=prodcuer;this.msgQueue=msgQueue;}publicvoidrun(){while(true){Messagemsg=msgQueue.poll();producer.asynSend(msg);}}}.....PulsarProducerprodcuer=newPulsarProducer();QueuemsgQueue=newQueue();Sendersender1=newSender(prodcuer,msgQueue).start();Sendersender2=newSender(prodcuer,msgQueue).start();如伪代码所示,多线程之间同时从msgQueue中poll数据,通过相同的Producer按照异步(或者同步,同步的效果会更明显)的方式,生产Pulsar消息,PulsarSDK在生产过程中,会在多个分区间轮训,需要做并发和锁的控制(有兴趣的同学可以看下PulsarSDK中对Producer部分的具体实现),这种共用Producer的方式,并不能体现到多线程并行发送的优势,反而会增加生产耗时,降低生产效率。如果需要多线程进行并发生产,需要每个线程内使用自己的Producer对象进行生产。改进方式,如下图所示:publicSenderextendsThread{QueuemsgQueue;publicSender(Stringtopic,QueuemsgQueue){this.prodcuer=newProdcuer(topic);this.msgQueue=msgQueue;}publicvoidrun(){while(true){Messagemsg=msgQueue.poll();producer.asynSend(msg);}}}上面,是我在采集端,开发、测试、运维过程中,发现的生产Pulsar消息,比较具有代表性的两个问题,大家可以根据自己的业务特点进行参考借鉴。Pulsar消费消费场景由第一节的背景介绍可知,InLongSort是基于Flink框架实现的,采用的是单任务(这里指的是Oceanus任务)多数据流(多Dataflow)的方式,即每个Oceanus任务下,处理1-N个Dataflow的数据分拣入库。每个Dataflow对应一个Topic的消费配置,且单个Dataflow支持订阅多个Pulsar集群的数据。由此可见,InLongSort订阅处理过程,与InLongDBAgent的生产消息场景有些类似,一个进程中需要根据1-N个Dataflow配置维护多个PulsarClient,处理对应的1-N个Topic的订阅。问题与调优InLongSort的消息订阅消费部分,先后演进了两个版本,下面分别说明一下第一个版本的处理方式和存在的问题,以及第二个版本的改进方式。在开始说明消息订阅部分之前,简单的描述一下InLongSort分拣DB数据的一些信息。DB数据目前主要是入Thive。其中MQ消费进度的位点、数据的分区状态、入库文件的可见性等状态信息是通过Flink的State机制进行维护,依赖Flink的Checkpoint机制周期保存到持久化存储。同时,依赖Checkpoint机制,完成文件的使用方可见性的控制。MQ消费位点的维护和分区内文件的可见性控制,这两点直接影响数据的完整性。例如,如果消费位点已经更新保存,但是这之前的消息还不能保证已经落库完毕,发生重启(预期或非预期的重启)就会导致数据丢失。与之相对应的,如果每次重启都从已经处理过且文件已经可见的消息位置开始消费,会导致数据被重复消费,数据重复入库,导致重复。因此,这两点是我们分拣处理过程中的重中之重。下面具体说明一下,第一个版本的消费处理过程和存在的问题。第一个版本,与PulsarFlinkConnector的处理方式类似,采用PulsarReader的方式实现。PulsarReader设计的初衷是,每个reader订阅一个Topic的一个Partition,即初始化时需要分PartitionTopic做配置,同时Reader订阅消费过程中会使用一个随机的、非持久化的消费组。随机的订阅组,对运维过程中的监控很不友好,每次重启,不得不重新获取、配置监控的消费组信息。为了便于运维,第一个版本,利用了当时PulsarBroker版本的一个漏洞(或者说是与设计相悖的能力,这点很难保证后续版本会持续存在),即为每个Reader指定了一个持久化的订阅组,并利用这个持久化的订阅组在Broker的统计数据,进行进度监控。另外,在分拣的运维过程中,经常会根据消息量,调整Flink任务的内存、并行度等配置,而部分配置调整后会影响State的恢复,即部分配置变更后,需要选择不从Checkpoint状态恢复启动。此外,运营过程中,经常会出现因预期和非预期的原因,需要重新入库一份数据的需求。从源头补充数据,显得略有些重,需要业务方做配置。而比较便利的方式,是从Pulsar的历史位置再重新消费一次数据。说明到这里,总结一下,我们需要分拣过程中具备的能力:便于运维监控消费进度;不从Checkpoint恢复时,不能丢数据;能够根据需求,动态的重置消费位点。  通过上面的描述可知,Reader方式的实现,显的有些鸡肋。首先,是消费组名称的问题,上面已经描述清楚,主要是不能保证后续版本的可用性。其次,不从Checkpoint恢复时,可能会导致丢失消息。不从Checkpoint恢复时,只能选择从最开始,还是最后(新)的位置开始消费,前者一定会导致数据重复,后者很大可能会导致丢数据。再次,是不能做停止后的调整位点操作,只能在运行过程中调整。为了解决Reader方式的潜在风险和问题,InLongSort消费部分的第二个版本,改为PuslarConsumer实现。首先,Consumer方式,支持使用持久化订阅消费组,便于运维监控消费进度,这个机制符合Pulsar的设计预期,不涉及到兼容性问题。其次,Consumer方式支持运行过程中及程序停止后的重置位点操作,应用场景更丰富。再次,是Consumer方式支持多种订阅模式,即Shared、Exclusive、Failover等,而分拣消费这种场景非常适合使用Exclusive方式。与Reader方式类似,在InLongSort中采用Exclusive模式创建Consumer时,也需要采用指定ParititonTopic的方式处理。特别说明一下,InLongSort这里为什么不选用Shared模式创建Consumer?最主要的,还是为了保证数据的完整性。对Pulsar的设计和实现机制有所了解的同学都会知道,Pulsar的消费模型与RockerMQ、Kafka等上一代MQ的设计区别很大,有兴趣的同学可以参考Pulsar社区的相关文档。如果在InLongSort这里采用Shared方式处理,会有哪些问题呢?InLongSort是一个Flink任务,有算子和并行度的概念,如果Source(订阅PulsarTopic消息的消费者所在的算子)采用Shared方式创建消费者,针对目标Topic创建的消费者都会消费这个Topic的消息,那如何保存消费位点呢?如果,重启时使用Broker端记录的位置开始消费,这样显然是有问题的,因为不能保证重启(正常或非预期)时,这个位置之前的消息已经入库成功了。如果,重启时从Checkpoint恢复,采用对应的State信息中记录的位点,那这里的State信息要如何保存呢?因为,所有的Consumer都会消费每个PartitionTopic的数据,也就是说,每个并行度内的Consumer都会有一份Ack的消费位点信息。那么重启后要从哪个位置开始呢?为了不丢失数据,我们不得不汇聚所有的State信息,针对每个PartitionTopic选一个最小的位置重置消费,这样不可避免的会导致数据重复。不但提高了程序的复杂度,增加了Checkpiont的大小,而且不得不选用UnionState类型做保存,当者类数据过大时,在重启时对任务非常的不友好,甚至可能会导致任务启动失败。 上面,是我在数据分拣的过程中,使用Pulsar时的分析、处理的一些经验,大家可以参考下。总结本文分享了ApacheInLong增量DB数据采集案例。首先,分别对InLongDBAgent、InLongSort、US对账Runner等部分的总体架构和部分能力进行了介绍。之后,着重分享了采集、分拣过程中使用Pulsar的一些经验,供大家做一定的参考。ApacheInLong各个组件的详细设计和实现细节可以围观ApacheInLong社区或相关主题的文档、课程分享。往期推荐《腾讯云微服务产品10月产品动态,TSE治理中心(北极星)实例支持跨地域节点》《腾讯云消息队列产品10月产品动态,RocketMQ支持无感迁移能力》《ApachePulsar在腾讯云上的最佳实践》《KMS在腾讯云的微服务实践助力其降本50%》《ApachePulsar技术系列-PulsarClient实现解析》《小鹅通基于TSE云原生API网关的落地实践》扫描下方二维码关注本公众号,了解更多微服务、消息队列的相关信息!解锁超多鹅厂周边!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2024-12-27 14:54 , Processed in 0.547419 second(s), 26 queries .

Powered by Discuz! X3.5

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表