找回密码
 会员注册
查看: 20|回复: 0

买药秒送JADE动态线程池实践及原理浅析

[复制链接]

6

主题

0

回帖

19

积分

新手上路

积分
19
发表于 2024-10-9 21:16:02 | 显示全部楼层 |阅读模式
买药秒送 JADE动态线程池实践及原理浅析 京东健康 荣涛 京东技术 京东技术 北京京东尚科信息技术有限公司 京东官方技术分享平台。你想知道的京东前沿技术、创新思考、开源方案...这里应有尽有! 266篇内容 2024年09月09日 18:01 北京 01背景及JADE介绍理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将买药秒送是京东健康即时零售业务新的核心流量场域,面对京东首页高流量曝光,我们对频道页整个技术架构方案进行升级,保障接口高性能、系统高可用。动态线程池是买药频道应用的技术之一,我们通过3轮高保真压测最终初步确定了线程池的核心参数。但我们仍面临一些保障系统稳定性问题:如何监控线程池运行状态?以及因流量飙升出现任务堆积和拒绝时能否实时报警,线程池核心参数能否做到不重启应用,动态调整即时生效?经调研,业界成熟的动态线程池开源项目有 dynamic-tp(https://github.com/dromara/dynamic-tp) 和 hippo4j(https://github.com/opengoofy/hippo4j),在京东内部应用比较广泛的方案是 JADE ,几种方案实现思路大致相同,感兴趣可自行了解。JADE 是由京东零售中台-研发架构组维护的项目,动态线程池是JADE的组件之一,其稳定性已得到广泛验证(集团应用 300+),与JADE相辅相成的还有万象平台:是可视化的JADE管理端,集成配置、监控、审批等能力的JADE可视化平台,可以更高效的使用JADE组件,进一步提高工作效率。实现效果接入JADE和万象后,买药秒送线程池秒级监控效果如下:实时监控线程池运行状态以及阈值报警。下面我们从实践到原理一探究竟。02JADE动态线程池+万象可视化平台接入实践理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将JADE动态线程池和万象整体流程图如下:应用中需要引入 JADE、DUCC和 PFinder SDK,通过JADE创建线程池,线程池核心参数通过万象平台配置,集成 DUCC 实现动态调参,即时生效。线程池运行状态监控通过 PFinder 实现秒级监控。1、引入JADE POM依赖,jade从1.2.4版本开始支持万象 com.jd.jade jade 1.2.4 com.jd.pfinder pfinder-profiler-sdk 1.1.5-FINAL com.thoughtworks.xstream xstream 1.4.19 com.jd.purchase.config dbconfig-client-api 1.0.82、创建 jade.properties配置文件,并通过 Spring加载该配置文件。注意名字不能修改,JADE初始化会从该命名文件中加载配置属性# 万象平台环境配置jade.wx.env=pre# 以下为调试设置,线上环境无需配置jade.log.level=debugjade.meter.debug-enabled=trueSpring加载 JADE配置文件 classpath:jade.properties UTF-8 3、配置JADE启动类,负责 JADE 自定义初始化。如果不集成万象平台,则可以使用配置的DUCC空间配置和修改线程池参数。【推荐】如果使用万象,万象会为JDOS应用默认创建一个DUCC空间,使用万象的DUCC进行配置和更新。/** * @description:JADE配置类 * @author: rongtao7 * @date: 2024/4/5 1:09 下午 */@Configurationpublic class JadeConfig { @Value("ucc://${ducc.application}{ducc.token}@${ducc.hostPort}/v1/namespace/${ducc.namespace}/config/${ducc.config}/profiles/${ducc.profile}longPolling=15000") private String duccUrl; @Value("${jade.wx.env}") private String wxEnv; @Bean public InitializeBean jadeInitBean() { InitializeBean initializeBean = new InitializeBean(); // 注意这里,如果 uri 中 config 不是命名为 jade,则 name 属性需要设置为 jade ConfiguratorManager instance = new ConfiguratorManager(); instance.addResource("jade", duccUrl); initializeBean.setConfigServiceProvider(instance); // 万象环境 initializeBean.setWxEnv(wxEnv); return initializeBean; }}4、使用JADE创建线程池,并通过PFinder包装增强以支持trace的传递prestart() 用于预热核心线程/** * 线程池配置类,集成JADE和万象平台 */@Configurationpublic class TaskExecutePoolConfig { /** * 买药秒送频道线程池 */ @Bean public ExecutorService msChannelPagePool(){ //JADE组件创建线程池 ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorBuilder.newBuilder() .name(ThreadPoolName.MS_CHANNEL_PAGE_POOL.name()) // 线程池名称 .core(200) // 核心线程数 .max(200) // 最大线程数 .queue(100) // 设置队列长度,此队列支持动态调整 .callerRuns() // 拒绝策略,内置监控、日志 .keepAliveTime(60L, TimeUnit.SECONDS) //线程存活时间 .prestart() // 预初始化所有核心线程数 .build(); // Pfinder增强 return PfinderContext.executorServiceWrapper(threadPoolExecutor); }}5、万象平台接入1)创建万象环境:第一次接入需要创建预发和生产环境。2)创建万象线程池组件6、验证效果线程池参数动态变更 - 万象,更新后可观测到如下日志,说明修改成功update executor 'MS_CHANNEL_PAGE_POOL' corePoolSize from 500 to 50update executor 'MS_CHANNEL_PAGE_POOL' maxPoolSize from 500 to 200update executor 'MS_CHANNEL_PAGE_POOL' keepAliveTime from 60 to 120 in secondsupdate executor 'MS_CHANNEL_PAGE_POOL' queueCapacity from 100 to 90 线程池监控 - PFinder,key格式为:executor.线程池名称.线程池状态(活跃/核心/最大线程数、队列大小、拒绝任务数)注:应用需开启pfinder监控并且PFinder SDK 要和 agent版本兼容线程池任务RT监控 & 线程池状态监控:线程池队列参数配置异常报警:以上几步操作,就完成了JADE和万象的动态线程池接入。下面从源码角度浅析一下原理。03原理源码浅析理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将动态线程池的核心本质是对JDK的ThreadPoolExecutor包装增强,集成UMP、PFinder、Ducc、万象平台,以实现线程池的可视化管理、动态调参、监控报警能力。线程池参数如何实现变更呢?线程池有4个关键参数,即:核心线程数、最大线程数、队列大小、存活时间4个。核心、最大线程数、存活时间3个参数通过JDK ThreadPoolExecutor 提供了 setCorePoolSize 、setMaximumPoolSize 和 setKeepAliveTime 支持更新参数。但队列长度 capacity 是不支持修改的,其使用private final 修饰。JADE是通过 ResizeableLinkedBlockingQueue 实现队列长度可变,实现方式是继承LinkedBlockingQueue,通过反射修改队列长度。下面是JADE动态线程池简易原理图:从万象平台更新参数开始,万象会将配置数据保存到MySQL数据库中,并通过发布操作将更新的配置推送到JADE的DUCC集成模块 DuccConfigService ,Linstener 监听到配置变更后调用 ThreadPoolExecutorUpdater 更新线程池参数,更新参数是通过继承JDK 的ThreadPoolExecutor 实现更新,以及通过ResizeableLinkedBlockingQueue 修改队列长度。JADE线程池监控能力通过Meter监控点 及 MeterRegistry监控工厂集成PFinder和UMP实现。了解基础原理后,从JADE配置类初始化过程及线程池创建过程,分别看一下源码实现。> JADE配置类初始化过程 - 源码探究 JADE InitBeanBase 注入了Spring容器,并利用Spring InitializingBean afterPropertiesSet() 执行自定义初始化逻辑。JADE 自定义初始化逻辑总共有8个初始化步骤,我们只需要关注其中几个即可。public abstract class InitBeanBase implements InitializingBean, ApplicationContextAware, ApplicationListener { @Override public void afterPropertiesSet() throws Exception { log.info("jade init begin"); //1.读取配置文件,设置@Val属性值 initProperties(); //2.初始化日志级别 initLogLevel(); //3.初始化零售DBConfig initDbConfig(); //4.初始化DUCC initConfig(); //5.初始化万象配置 initWX(); //6.初始化 jvm ump key initUmps(); //7.初始化PFinderMeterRegistry监控工厂 initMeter(); //8.初始化JSF监听注册 JSF POOL initJsf(); UST.record(getClass()); log.info("jade init end"); }}1、initProperties()用于读取jade.properties配置文件,设置@Val属性值从根目录读取jade.properties配置文件,名字不可变,否则获取不到。public final class JadeConfigs { //从根目录读取 jade.properties private static synchronized Config initConfig() { //略... Object cfg = Thread.currentThread().getContextClassLoader().getResourceAsStream("jade.properties"); }}为Bean的@Val注解标注的属性设置值,如果jade.properties配置了则使用配置的,否则使用默认值。public abstract class InitBeanBase implements InitializingBean, ApplicationContextAware, ApplicationListener { //为@Val注解标注的属性设置值 private void parseSpringValue(Config cfg) { //Spring PropertyPlaceholderHelper:解析和替换占位符的工具 PropertyPlaceholderHelper helper = new PropertyPlaceholderHelper("${", "}", ":", true); //反射获取所有字段 for (Field f : FieldUtils.getAllFields(getClass())) { f.setAccessible(true); if (f.get(this) != null) { // may set explicitly continue; } //获取 @Val 注解 Val valAnno = f.getAnnotation(Val.class); if (valAnno != null & StringUtils.isNotEmpty(valAnno.value())) { try { //从Config(jade.properties) 配置文件读取属性值,没有则为默认值。 String actualVal = helper.replacePlaceholders(valAnno.value(), k -> { String v = cfg.getString(k); if (v == null) { v = applicationContext.getEnvironment().getProperty(k); } return v; }); if (actualVal != null) { Function parser = TYPE_PARSERS.get(f.getType()); if (parser != null) { Object parsedVal = parser.apply(actualVal); f.set(this, parsedVal); } } } catch (Exception e) { log.error("parse field {} error", f.getName()); throw e; } } } }}2、initConfig()初始化配置类中的jade配置的ducc,如果不集成万象,则使用这个ducc配置。使用万象,则使用万象平台配置的ducc。代码与万象初始化逻辑相同,参考下面的即可。3、initWX()初始化万象平台配置。万象初始化流程主要有3步骤:1.拼接使用万象默认配置的Ducc空间;2.启动监听;3.拉取配置更新JADE组件万象的默认Ducc空间格式为:通过应用名和环境Env的拼接:{ns:wxbizapps} {appName:diansong} {env:pre}class WXInit { //万象初始化 private void init0() { //1.万象默认的DUCC配置 String duccHost = DuccResource.getDefautHost(); Config config = JadeConfigs.getConfig(); String app = config.getString("jade.wx.app", "jdos_wxbizapps"); String token = config.getString("jade.wx.token", getDefaultDuccToken(duccHost)); String ns = config.getString("jade.wx.ns", "wxbizapps"); String cfg = config.getString("jade.wx.cfg", Env.getAppName()); if (failOrLog(cfg, "jade.wx.cfg")) { return; } String env = initBean.getWxEnv(); if (StringUtils.isEmpty(env)) { env = config.getString("jade.wx.env"); } if (failOrLog(env, "jade.wx.env")) { return; } String currentApp = Env.getDeployAppName(); if (failOrLog(currentApp, "current app name")) { return; } //DUCC URL拼接 String url = String.format(DuccResource.URL_FORMAT, app, token, duccHost, ns, cfg, env, 1000 * 60, isRequired()); log.info("connect to wanxiang via {}", url); // TODO: mark token //Resource Name jade-wx String resxName = "jade-wx"; ConfiguratorManager cm = new ConfiguratorManager(); cm.setApplication(currentApp); cm.addResource(resxName, url); cm.start(); //2.启动监听Ducc jade-wx ConfigService configService = new DuccConfigService(cm); //3.从万象平台拉配置更新JADE组件 configService.getConfig(resxName, JadeConfig.class); // TODO: not found, throws UST.record(getClass()); }}启动监听DUCC 调用 DuccConfigService init()初始化方法public class DuccConfigService implements ConfigService { //构造方法,注入DUCC ConfiguratorManager public DuccConfigService(@NonNull ConfiguratorManager configuratorManager) { if (configuratorManager == null) { throw new NullPointerException("configuratorManager is marked non-null but is null"); } else { //初始化 this.init(configuratorManager); } }}init()初始化方法中会启动万象DUCC的线程,并添加监听事件,监听Resource name 为 jade-wx的变化,变化后的回调函数通过 DuccConfigService.this.updateConfig(configuration)用来更新JADE组件//初始化方法private void init(ConfiguratorManager configuratorManager) { try { this.configuratorManager = configuratorManager; //1.启动Ducc线程 if (!configuratorManager.isStarted()) { if (StringUtils.isNotEmpty(Env.getDeployAppName())) { System.setProperty("application.name", Env.getDeployAppName()); } configuratorManager.start(); } List resources = DuccUtil.getResources(configuratorManager); Iterator var3 = resources.iterator(); while(var3.hasNext()) { final Resource resource = (Resource)var3.next(); //2.Ducc添加监听事件,Name是:jade-wx configuratorManager.addListener(new ConfigurationListener() { public String getName() { return resource.getName(); } //回调函数更新JADE组件 public void onUpdate(Configuration configuration) { DuccConfigService.this.updateConfig(configuration); } }); } UST.record(this.getClass()); } catch (Throwable var5) { throw var5; }}DuccConfigService更新方法调用 JadeConfig 的init()方法,根据万象平台配置更新JADE各个组件,包括动态线程池。public class JadeConfig implements JadeConfigSupport, InitializingObject { public static void init(JadeConfigSupport cfg) { //JADE-日志组件 更新 //JADE-动态线程池组件 更新 ThreadPoolExecutorUpdater.update(cfg.getExecutorConfig()); //JADE-本地缓存组件 更新 //.... }}5、ThreadPoolExecutorUpdater更新线程池参数核心类核心、最大线程数、存活时间是通过继承JDK ThreadPoolExecutor 实现更新的。在核心类中,当调大核心线程数后,会调用prestartAllCoreThreads() 对核心线程进行预热,所以不必担心调大核心线程数后发生的“抖动”问题(实际是创建线程的开销)。注意 core和max是一起更新的,否则可能会导致更改不生效的问题。ThreadPoolExecutorUpdater 更新线程池主要有以下5个步骤。updatePoolSize更新核心、最大线程数,注意需要一起同步更新,否则可能导致更新失败问题setKeepAliveTime更新KeepAliveTime存活时间setCapacity 反射修改队列容量prestartAllCoreThreads() 预热核心线程数updateRejectSetting() 更新拒绝策略private static void update0(ExecutorConfigSupport.ExecutorSetting executorSetting, ThreadPoolExecutor executor) { //1.更新核心、最大线程数,注意需要一起同步更新,否则可能导致更新失败问题 updatePoolSize(executorSetting, executor); //2.更新KeepAliveTime存活时间 if (executorSetting.getKeepAliveSeconds() != null & executorSetting.getKeepAliveSeconds() != executor.getKeepAliveTime(TimeUnit.SECONDS)) { executor.setKeepAliveTime(executorSetting.getKeepAliveSeconds(), TimeUnit.SECONDS); } //3.更新队列 if (executorSetting.getQueueCapacity() != null) { if (executor.getQueue() instanceof LinkedBlockingQueue) { LinkedBlockingQueue currentQueue = (LinkedBlockingQueue) executor.getQueue(); int currentQueueCapacity = ResizableLinkedBlockingQueue.getCapacity(currentQueue); if (executorSetting.getQueueCapacity() > 0 & executorSetting.getQueueCapacity() != currentQueueCapacity) { //反射修改队列数量,signalNotFull ResizableLinkedBlockingQueue.setCapacity(currentQueue, executorSetting.getQueueCapacity()); } else if (executorSetting.getQueueCapacity() == 0) { //调整队列数量为0,注意丢任务风险。 if (BooleanUtils.isTrue(executorSetting.getForceResizeQueue())) { setWorkQueue(executor, new SynchronousQueue()); } else { // log } } } //else 省略 } //4.预热核心线程数 if (BooleanUtils.toBoolean(executorSetting.getPrestartAllCoreThreads()) & executor.getPoolSize() extends LinkedBlockingQueue { //反射设置队列大小 static void setCapacity(LinkedBlockingQueue queue, int capacity) { int oldCapacity = getCapacity(queue); FieldUtils.writeField(queue, FN_CAPACITY, capacity, true); int size = queue.size(); //如果队列中的任务已经达到老队列容量限制,并且新的容量大于队列任务数 if (size >= oldCapacity & capacity > size) { // thanks to https://www.cnblogs.com/thisiswhy/p/15457810.html MethodUtils.invokeMethod(queue, true, "signalNotFull"); } }}这里有一个细节,如果队列容量满了,当调整完队列数后,手动调用signalNotFull发出队列非满通知,唤醒阻塞线程,可以继续向队列插入任务了。> 创建JADE线程池build()- 源码探究以下是我们通过 JADE ThreadPoolExecutorBuilder 创建线程池的 Bean,核心逻辑在 build() 封装。/** * 秒送频道页线程池 */@Beanpublic ExecutorService msChannelPagePool(){ ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutorBuilder.newBuilder() .name(ThreadPoolName.MS_CHANNEL_PAGE_POOL.name()) // 线程池名称 .core(200) // 核心线程数 .max(200) // 最大线程数 .queue(1024) // 设置队列长度,此队列支持动态调整 .callerRuns() // 快捷设置拒绝策略为丢弃,内置监控、日志 .keepAliveTime(60L, TimeUnit.SECONDS) //线程存活时间 .prestart() // 预初始化所有核心线程数 .build(); return PfinderContext.executorServiceWrapper(threadPoolExecutor);}build() 主要逻辑有3步,1.创建线程池 ,2.启动所有核心线程, 3.注册线程池监控点public abstract class AbstractExecutorBuilder { public synchronized E build() { //1.创建线程池 this.executor = createExecutor(); //2.启动所有核心线程 if (this.prestartAllCoreThreads) { executor.prestartAllCoreThreads(); } //3.创建监控 initMonitor(); return this.executor; }}initMonitor()创建PFinder线程池监控,即 活跃线程数、核心/最大线程数,队列数量等。格式为:executor.线程池名.activeCount. (注意线程池一定要有名字)gauge() 方法内部集成PFinder,使用代码编程的方式进行Gauge埋点,用于记录线程池的瞬时值指标:活动线程数、核心/最大、队列大小等。PFinder埋点方式详见PFinder文档。public abstract class MeterRegistry { public List gaugeExecutor(String executorName, ThreadPoolExecutor executor) { String namePrefix = "executor." + executorName; return gaugeExecutor0(namePrefix, executor); } private List gaugeExecutor0(String namePrefix, ThreadPoolExecutor executor) { namePrefix += "."; List gauges = new ArrayList(); if (getConfig().isThreadPoolAllMetricsEnabled()) { gauges.add(gauge(namePrefix + "taskCount", executor::getTaskCount)); gauges.add(gauge(namePrefix + "completedTaskCount", executor::getCompletedTaskCount)); } gauges.add(gauge(namePrefix + "activeCount", executor::getActiveCount)); gauges.add(gauge(namePrefix + "corePoolSize", executor::getCorePoolSize)); gauges.add(gauge(namePrefix + "maxPoolSize", executor::getMaximumPoolSize)); gauges.add(gauge(namePrefix + "poolSize", executor::getPoolSize)); gauges.add(gauge(namePrefix + "queueSize", () -> executor.getQueue().size())); // return gauges; }}04 避坑指南理解,首先 MCube 会依据模板缓存状态判断是否需要网络获取最新模板,当获取到模板后进行模板加载,加载阶段会将产物转换为视图树的结构,转换完成后将通过表达式引擎解析表达式并取得正确的值,通过事件解析引擎解析用户自定义事件并完成事件的绑定,完成解析赋值以及事件绑定后进行视图的渲染,最终将线程池必须有名字,监控依赖,并且不能重名。当系统有问题时也便于通过jstack等工具排查定位问题。应用需开启pfinder监控并且PFinder SDK 要和 agent版本兼容线程池创建后,线程不会立即启动,而是在有任务提交时才启动,启动的瞬间会因为创建线程的开销造成性能“抖动”,可以使用prestartAllCoreThreads() 预热核心线程。线程池的核心线程,默认是不会回收的,如果一个线程池活跃度长时间很低,建议调整核心线程数,过多的线程会浪费内存资源,影响系统稳定性。Future、CompletableFuture异步任务使用线程池时设置合理的超时时间,避免因外部服务故障或网络等问题导致任务长时间阻塞,造成资源浪费,严重甚至拖垮整个线程池,导致线上问题。同理,系统中请求外部Http请求时,必须设置超时时间,避免资源被长时间占用无法释放,影响系统性能和稳定性。推荐阅读万字长文浅谈三高系统建设方法论和实践业务复杂度治理方法论--十年系统设计经验总结Proxyless的多活流量和微服务治理【京东保险-技术平台部-平台研发部】一群AI卖保险的程序员关注【京东技术】后台回复【加入京东】获取专属社招和校招内推码!底层能力:维护用户基础数据、行为数据建模、用户画像分析、精准营销策略的制定功能支撑:会员成长体系、等级计算策略、权益体系、营销底层能力支持用户活跃:会员关怀、用户触达、活跃活动、业务线交叉获客、拉新促活
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2025-1-1 09:48 , Processed in 0.712445 second(s), 25 queries .

Powered by Discuz! X3.5

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表