找回密码
 会员注册
查看: 18|回复: 0

Oak索引功能在apachedruid的应用实践

[复制链接]

7

主题

0

回帖

22

积分

新手上路

积分
22
发表于 2024-10-9 17:08:11 | 显示全部楼层 |阅读模式
Oak索引功能在apache druid的应用实践 Oak索引功能在apache druid的应用实践 王啸 贝壳产品技术 贝壳产品技术 “贝壳产品技术公众号”作为贝壳官方产品技术号,致力打造贝壳产品、技术干货分享平台,面向互联网/O2O开发/产品从业者,每周推送优质产品技术文章、技术沙龙活动及招聘信息等。欢迎大家关注我们。 242篇内容 2021年07月01日 17:32 一、什么是OakOak(Off-heap Allocated Keys)是用于数据实时分析的可伸缩、并发的键值映射kv-map。它保持所有键和值都在堆外,与使用标准JVM堆管理相比,使用相同的内存占用下允许存储更多的数据(最多3倍的数据)。OakMap实现了行业标准Java8 ConcurrentNavigableMap API。它为读、写、读-修改-写操作以及(非原子)范围查询(扫描)操作(向前和向后)提供强(原子)语义。OakMap针对大键和值进行了优化,特别是针对对象的增量维护。它比流行的Java ConcurrentNavigableMap ConcurrentSkipListMap更快,并且在额外的CPU内核下可以更好地扩展。Oak的设计要点是包含一个堆上索引到堆外键和值。OakMap的索引结构为一个连续内存块的列表,由于访问局部性和缓存友好性,这加快了索引的搜索速度。OakMap的键和值被复制并存储在自管理的离堆字节数组中,数据结构如下图所示:图1.1 Oak map的数据结构OakMap需要为构建器定义多个参数,如下所述。在构造离堆OakMap时,需要指定内存容量(每个OakMap实例)。OakMap在构建时以请求的容量分配堆外内存,然后管理这些内存。OakMap给出了键K和值V,它们被要求与序列化器、反序列化器和大小计算器一起提供,主要创建过程如下面代码所示:OakMapBuilderbuilder=newOakMapBuilder().setKeySerializer(newMyAppKeySerializer()).setValueSerializer(newMyAppValueSerializer()).setMinKey(...).setKeysComparator(newMyAppKeyComparator()).setMemoryCapacity(...);OakMapOak=builder.build();publicOakMapbuild(){...returnnewOakMap(minKey,keySerializer,valueSerializer,comparator,chunkMaxItems,valuesMemoryManager,keysMemoryManager);}总结一下,Oak相比传统的ConcurrentSkipListMap的优势:OakMap提供了出色的性能:它采用细粒度同步,因此可以很好地扩展线程数,它还通过避免内存碎片来实现缓存友好OakMap将键和数据放在堆外,因此允许在没有JVM GC开销的情况下处理巨大的堆(RAM)——甚至超过50G。为了支持堆外操作,OakMap采用了嵌入式的、高效的内存管理,基本上消除了JVM GC的开销。OakMap为原子访问数据提供了丰富的API。降序扫描:OakMap加速降序扫描,而不增加额外的开销由于Oak存在上面所述的优势,开源界也一直讨论将Oak的增量索引作为当前现有apache Druid的增量索引构建方案,希望可以在实际应用中显示出更好的性能结果。二、Oak在Druid的应用研究Oak索引功能在Druid的上应用研究主要从两方面展开,一个是在基于hadoop-index离线摄入任务的应用测试,另一个是基于kafka-index实时摄入任务的应用测试。2.1 离线摄入Druid离线场景下使用的数据源主要以Hive表为主,整个作业过程如下图所示:图2.1 hadoop index离线作业导入过程如上图所示,Oak索引功能的应用主要体现在在hadoop indexing job的index Generator阶段,使用Oak map替代ConcurrentSkipListMap,来实现IncrementalIndex增量索引结构,它实现了Iterable接口,并且支持通过add(InputRow row)方法插入新数据,新数据的Metric通过Aggragator进行聚合。本轮测试我们使用的两个数据源,一个行数较小的数据源,小于2000万行,另外一个行数较多,数据源行数上亿,详细数据源信息如下表所示:表2.1 数据源信息表测试硬件环境:hadoop版本cdh3.0.0,专用队列资源Druid单机部署,硬件条件:128G内存、48个逻辑核、4.8T固态盘2.1.1 数据源A针对数据源A,我们在IndexGenerator阶段针对不同资源配比下统计10轮次再计算平均作业时长,CPU占用时间和内存分配大小,统计数据如下列组图所示:图2.2 数据源A的Index Generator阶段作业时长统计从上图看,我们可以看出当map和reduce设置过小的时候,采用onheap索引功能因为OutOfMemoryError错误无法运行,而Oak索引功能虽然可以在较低的内存配置下运行,但还是会在reduce过程中某几个container因为shuffle的数据过大内存不够用而重试若干次,增加了任务整体运行时长,这样的运行时长参考性不大,所以后面的对比我们直接取后三列的资源配比参数。图2.3 数据源A的Index Generator阶段CPU时间统计图2.4 数据源A的Index Generator阶段内存分配2.1.2 数据源B针对数据源A测试,我们发现统计粒度还不够细化,没有剔除掉map阶段的干扰,因为Oak增量索引正式运行是在reduce阶段。在数据源B的测试中,我们确保队列资源充足,container资源一致,保证map/reduce阶段没有出现一次failed或者killed情况的任务运行速度。因为如果出现failed或者killed情况,map和reduce会进行重试,这样统计出来的运行时间无法真实反映索引能力。在数据源B的测试中我们也补充了IndexGeneratorJob中reduce过程的GC时间和CPU时间统计,统计结果如下图所示:图2.5 数据源B的Index Generator阶段作业时长统计图2.6 数据源B的Index Generator阶段内存分配图2.7 数据源B的Index Generator阶段reduce过程的GC时间图2.8 数据源B的Index Generator阶段reduce过程的CPU时间主要结论:从离线导入数据指标对比来看,针对大数据量/大时间跨度的数据源在同等资源配比下采用Oak索引的导入平均时长只有onheap的80%-85%,这对于加速离线数据导入是很有帮助的。2.2 实时摄入Druid实时数据摄取,我们采用kafka index service服务获得,主要操作过程如下图所示:图2.9 kafka index job主要作业过程在用数据摄取阶段,采用哪种增量索引方式主要根据构建任务时候指定appendableIndexSpec参数决定采取不同类型增量索引构建。实时任务使用数据源,采用的是离线测试的数据源A ,文件格式为json格式,数据量大概在4G左右,通过kafka-topics.sh创建kafka topic,生成测试实时数据集。使用Druid kafka index job任务一次性摄取全部数据,kafka index job资源调整到Druid.indexer.runner.javaOpts=-server -Xms4g -Xmx4g -XX:MaxDirectMemorySize=4g资源配置,这里我们预设最大内存封顶配额是48G。在48G的内存限定,CPU资源未打满的情况下:onheap索引:可启动1到10个并行kafka index作业任务Oak索引:可启动1到12个并行kafka index作业任务详细测试统计信息如下图所示:图2.10 onheap/oak索引实时任务在不同并发数下的CPU利用率统计图2.11 onheap/oak索引实时任务在不同并发数下的内存占用统计图2.12 onheap/oak索引在同资源占比下摄取数据吞吐量对比主要结论:通过测试数据,我们可以看出基于Oak的索引任务的内存利用效果和CPU效率显著提高,达到同样的性能,Oak相比onheap只占用原来60%内存和CPU资源,在内存和CPU资源稀缺的情况下,使用同等预算系统吞入量接近提升一倍。三、主要改动点结合社区的分享经验,我们是在Druid-0.18.1版本上进行改进,OakIncrementalIndex的实现主要借鉴了OnHeap Incremental Index,显著区别有:它将keys和values都存储在堆外它是基于OakMap而不是Java的ConcurrentSkipList(CSL)它不需要保持从行索引到实际行的映射它始终是有序的为了获得实现的最佳性能,对IncrementalIndexRow和IncrementalIndex的接口进行一些修改,改造类和主要方法如下图所示:图3.1 改造涉及的类和方法主要改造点有:IncrementalIndex的修改:修改了getMetric*Value方法以接受IncrementalIndexRow而不是int型的rowOffSet,因为OakIncrementalIndex没有保持从行索引到实际行的映射,这不会影响其他实现的性能,因为在所有使用这些方法的时候,调用者都已经有了IncrementalIndexRow对象。IncrementalIndexRow的修改:允许对堆外键进行延迟计算,添加/修改以下方法:修改public Object[] getDim()为public Object getDim(int index)添加public int getdimlength()添加public boolean isDimNull(int index)添加public IndexedInts getStringDim(final int dimIndex)OakIncremtalIndex的addFacts方法是实现数据预聚合的主要方法,实现时候注意rollup是否开启,如果rollup打开使用Oak的key序列化器分配行索引,序列化器只在插入时调用,如果键已经存在,它不会增加索引,主要实现代码如下:finalOakInputRowContextctx=newOakInputRowContext(rowContainer,row);if(rollup){//Inrollupmode,weletthekey-serializerassigntherowindex.//Uponlookup,thecomparatorignoresthisspecialindexvalueandonlycomparesaccordingtothekeyitself.//Theserializerisonlycalledoninsertion,soitwillnotincrementtheindexifthekeyalreadyexits.key.setRowIndex(OakKey.Serializer.ASSIGN_ROW_INDEX_IF_ABSENT);}else{//Inplainmode,weforceanewrowindex.//Uponlookup,sincethereisnokeywiththisindex,anewkeywillbeinsertedeverytime.key.setRowIndex(rowIndexGenerator.getAndIncrement());}如何使用Oak索引功能:在创建离线作业和实时作业时,在tuningConfig指定相关参数:"tuningConfig":{..."appendableIndexSpec":{"type":"oak"//onheap},...} 预览时标签不可点 大数据4大数据 · 目录#大数据上一篇Druid在贝壳的应用实践下一篇Flink运维体系在贝壳的实践关闭更多小程序广告搜索「undefined」网络结果
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2025-1-4 05:37 , Processed in 0.465184 second(s), 25 queries .

Powered by Discuz! X3.5

© 2001-2025 Discuz! Team.

快速回复 返回顶部 返回列表