|
作者:ninetyhe,腾讯CDG后台开发工程师本文详细描述如何实现:目前手上可用的资源仅剩一个16核剩余4-8G内存的机器,单点完成在1个小时内千万级别feed流数据flush操作(主要包括:读数据,计算综合得分,淘汰低分数据,并更新最新得分,回写缓存和数据库)。背景目前工作负责的一款产品增加了综合得分序的Feed流排序方式:需要每天把(将近1000W数据量)的feed流信息进行算分计算更新后回写到数据层。手上的批跑物理机器是16核(因为混部,无法独享CPU),同时剩下可用内存仅4-8G。显而易见的是:我们可以申请机器,多机部署,分片计算或者通过现有的大数据平台Hadoop进行运算都看似可以解决问题。但是由于更新feed流的操作需要依赖下游服务(这里暂且叫A,后续文中提到下游服务均可称A服务),而下游的服务A-Server本身是个DB强绑定的关系,也就说明了下游的服务瓶颈在于DB的QPS,这也导致了即便我本身的服务多机部署,分片处理,下游服务的短板导致不可行。而针对方案二通过大数据平台完成的话,也就是需要推荐大数据的部门协助处理,显然这个是需要排期处理,而时间上也是不可预估。既然如此,那就借用,朱光潜老先生的一篇文章《朝抵抗力最大的路径走》。我本人相信通过合理的资源调度以及更低的成本可以克服眼前的困难,实现最终的需求效果。当然优化过程中并不是一帆风顺,当然经过两周左右的优化迭代,也终于实现了。业务主要流程流程整个flush的业务流程大致如下:读取DB获取目前所有的feed类别(约2-3w的数据);通过类别读取Cache每一个类别下的feed流元素的索引(约1000-10w的数据);通过每一个信息的索引查询feed流所对应的基础数据信息(需要查约3-4张表);计算每一个feed元数据的得分信息(1000w的数据量),过程中需要淘汰一部分,调用服务A-Server删除当前的索引;根据权重计算每一个feed的元素的信息,调用下游服务A-Server,update索引分值。主要业务流程图具体如下:针对上述的业务逻辑,设计出了最初方案查询DB或者本地缓存获取索引feed流中的现有全集类别;foreach类别集合Collection,查询目前所以的类别下的feed数据流集合并存储到Map中,其中key是类别,value是类别对应的数组集合(key:category,value:colletion);foreach上述获取的Map并发起goruntine查询每一条信息流元素对应的基本信息,并通过粗排来淘汰需要淘汰的元素(考虑到下游的并发和DB的负载问题,每查询一批,sleep一段时间),把最终符合要求的元素存储到map等待后续更新得分,并刷入缓存和DB;foreach上述粗排后的Map,最终并发起goruntine调用下游A-server,更新feed流的索引得分。方案图如下:最初方案缺陷将近1000W的数据虽然在处理过程中,在使用后的集合或者Map都会及时清空:Map=nil []string=nil // 清空已使用的内容runtime.GC() // 发出GC的请求,希望发起GC但是问题还是出现了:内存跑满(由于机器总内存18G,所以基本是内存直接跑满了)Cpu也基本瞬间跑满堆栈中的异常compress@v1.12.2/zstd/blockdec.go:215 +0x149created by github.com/klauspost/compress/zstd.newBlockDec /root/go/pkg/mod/github.com/klauspost/compress@v1.12.2/zstd/blockdec.go:118 +0x166goroutine 61 [chan receive, 438 minutes]:github.com/klauspost/compress/zstd.(*blockDec).startDecoder(0xc006c1d6c0) /root/go/pkg/mod/github.com/klauspost/compress@v1.12.2/zstd/blockdec.go:215 +0x149created by github.com/klauspost/compress/zstd.newBlockDec /root/go/pkg/mod/github.com/klauspost/compress@v1.12.2/zstd/blockdec.go:118 +0x166goroutine 62 [chan receive, 438 minutes]:github.com/klauspost/compress/zstd.(*blockDec).startDecoder(0xc006c1d790) /root/go/pkg/mod/github.com/klauspost/compress@v1.12.2/zstd/blockdec.go:215 +0x149created by github.com/klauspost/compress/zstd.newBlockDec /root/go/pkg/mod/github.com/klauspost/compress@v1.12.2/zstd/blockdec.go:118 +0x166goroutine 63 [chan receive, 438 minutes]:github.com/klauspost/compress/zstd.(*blockDec).startDecoder(0xc006c1d860) /root/go/pkg/mod/github.com/klauspost/compress@v1.12.2/zstd/blockdec.go:215 +0x149created by github.com/klauspost/compress/zstd.newBlockDec /root/go/pkg/mod/github.com/klauspost/compress@v1.12.2/zstd/blockdec.go:118 +0x166因为堆栈给的信息不多,但是从机器上看基本是goruntine开启的太多,并发量太大,同时大量的数据同时加载到内存,导致了机器的内存和Cpu的负载过高。针对上述的问题,设计出了第二套方案:自己实现一套协程池预分配一个内存块,维持一个对象池对象池具体改进点如下:协程池实现比较简单,这里就直接上代码:// 协程池对象type oolBuilder struct { workerNum int // Worker 线程数量 DelJobChan *chan string // 缓冲队列}// 创建一个协程池func (pool *PoolBuilder) listenAdd(num int) { for i := 0; i { req.response() .putHeader("content-type", "text/plain") .end("Hello from Vert.x!"); }).listen(8080); }}对异步代码有兴趣的小伙伴一定要看看:https://vertx.io/优化改造开始借鉴了上述优秀的思想,我对自己的服务做了以下改进:1、我构造了4个协程池,分别是查询类别category、查询DB基本信息、根据算法计算综合得分、和数据更新回写;2、从主协程开始,不做任何阻塞,查询类别的协程协程池,每查询一个类别,结果直接丢到channelA(不阻塞然后继续擦下类别);3、查询DB的协程,监听channelA,当发现有数据的时候,查询DB信息,并将结果丢到channelB(同上不做任何阻塞,继续查询下一条数据的结果集合);4、帖子得分协程池读取channelB的数据,然后根据算法计算处理帖子的得分,并将结果集合丢到channelC(同样不做任何阻塞,继续计算下一次的得分数据);5、而数据回写的协程负责调用下游服务A-Server,处理后完,打log,标记处理的偏移量(由于没有阻塞,需要跟着最终所以数据是否处理完成)。业务架构设计如下:优化效果1、协程数6w->100!,这里协程数从6w降到了100个协程就Cover住了整个项目;2、内存使用情况,从基本跑满到仅仅使用1-2G的正常内存。3、CPU的使用460%的使用率直接降到65%:4、计算数据量1000w的时间6个小时并发算不完到1小时46分钟计算完成。总结:没想到自己的坚持看到了效果,自选股的业务中也因此可以接入综合得分序列的feed流,我相信这个是一个好的开始,在这个基础上,我们可以根据个人画像做更多的智能推荐,期间大伙的建议更多是借用大数据平台计算,而实际的推进和排期让我更愿意用自己的方式以最低的成本最优的结构去优化完成,当然这次很幸运,自己的努力实现了。
|
|