找回密码
 会员注册
查看: 20|回复: 0

以dubbo源码为例-使用lambda重构面向对象模块

[复制链接]

6

主题

0

回帖

19

积分

新手上路

积分
19
发表于 2024-10-12 23:43:17 | 显示全部楼层 |阅读模式
以dubbo源码为例-使用lambda重构面向对象模块 330 本文将以 Dubbo 源码为例,和您讨论如何使用 Lambda 和面向对象两种方法,对 Java 应用进行重构。并以实例展示了两者结合,写出简洁优雅的代码。1. 使用 Lambda 改善可读性和灵活性1.1 改善代码的可读性Java 8 的 Lambda 新特性也可以帮助提升代码的可读性:使用 Lambda,你可以减少冗长的代码,让代码更易于理解通过方法引用和 Stream API,你的代码会变得更直观1.2 使用场景在回调场景,可以使用 Lambda 简化代码。比如,我们在使用 JDBC 的时候,资源用完要马上释放,否则会资源泄漏,如代码示例。public String query(String sql) {        connection = getConnection();        statement = connection.getStatement(sql);        ResultSet rs = statement.executeQuery();        String result;        try {            while (rs.next()) {                //执行具体操作                result = rs.getString("name");            }        } finally {            rs.close();            statement.close;        }    }query 函数中,除了取结果的 result = rs.getString("name") 这句,其他都是样板代码。这个时候,是不是可以使用模板方法的设计模式解决,抽象一个 AbstractQuery,并使用不同的子类实现查询,如下:public class AbstractQuery {public String query(String sql) {        connection = getConnection();        statement = connection.getStatement(sql);        ResultSet rs = statement.executeQuery();        String result;        try {            while (rs.next()) {                //执行具体操作                result = getResult(rs);            }          return result;        } finally {            rs.close();            statement.close;        }    }    abstract String getResult();}public class NameQuery extends AbstractQuery {  public String query(ResultSet rs) {    return rs.getString("name");  }    public static void main(String[] args) {        new NameQuery().query("xxxxx");    }}显然,这种方式不可取,因为查询的 sql 是动态的,得写无数类去实现。这个时候,我们可以使用函数式来实现。public class JbdcUtil {public String query(String sql, Function resFunc) {        connection = getConnection();        statement = connection.getStatement(sql);        ResultSet rs = statement.executeQuery();        String result;        try {            while (rs.next()) {                //执行具体操作                result = resFunc.apply(rs);            }            return result;        } finally {            rs.close();            statement.close;        }    }     public static void main(String[] args) {       String result = JbdcUtil.query("select xxxxx", rs ->rs.getString("name"));    }}这个样例中,使用了 Jdk 自带的 Function 来实现,因为 Function 的 apply 方法一个参数一个返回,和场景对应。通常,我们可以用以下几步来确定函数:1、查看需要的方法数量和是否需要返回2、根据1的结果,选择 jdk 自带函数,没有合适的则自己写。@FunctionalInterfacepublic interface Function {   //根据参数和返回类型确定范型类型。上面示例中,我们需要根据 ResultSet 参数,获取 String 的返回   //则在参数中写  Function resFunc ,从而与 apply 方法对应    R apply(T t); }    JbdcUtil.query("xxxxx", rs ->rs.getString("name")); 这里的实参 ,-> 前面的代表参数,后面则是 apply 的 override 实现,如果不习惯,可以写成以下形式,idea可以自动帮助简写。String result = JbdcUtil.query("select xxxxx", new Function() {            @Override            public String apply(ResultSet rs) {                return rs.getString("name");            }        });1.3 Dubbo 源码对 Lambda 的使用Dubbo 在 3.2 中,引入了完整的可观测性特性。可观测指标,需要对业务代码进行埋点,为统一埋点形式,使用了事件机制。如服务暴露时,需要获取向注册中心的注册 rt 延时、及异常次数指标。// rovider 启动,向注册中心注册当前实例public void register() {   ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);}事件机制埋点    public void register() {        MetricsEventBus.post(new RegisterEvent(System.currentMills()));        try {            ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);        } catch (Exception e) {            MetricsEventBus.error(new RegisterEvent(System.currentMills()));            throw e;        }        MetricsEventBus.finish(new RegisterEvent(System.currentMills()));    }可以看到,加入事件后,比初始代码多了不少样板代码。使用 lambda 优化如下public void register() {MetricsEventBus.post(new RegisterEvent(),                () -> {        ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);                }            );}//使用 jdk 自带 Supplier 函数,因为没有参数,只有返回public class MetricsEventBus {    public static  T post(MetricsEvent event, Supplier targetSupplier) {        dispatcher.publishEvent(event);        T result;        try {            result = targetSupplier.get();        } catch (Throwable e) {            dispatcher.publishErrorEvent(event);            throw e;        }        return result;    }}以上代码对 Dubbo 源码进行了精简,保留了主要逻辑。2. 面向对象重构2.1 单次Rpc的请求指标源码分析Dubbo 收集的指标维度及类型非常的多,比如一次 rpc 请求,需要统计成功、失败次数,rt 方面,需要统计单次 rt,最近一次 rt,平均 rt,总 rt 等,需要一些容器来存放数据。 // lastRT, totalRT, rtCount, avgRT share a container, can utilize the system cache line    private final ConcurrentMap rtSample = new ConcurrentHashMap();    private final ConcurrentMap minRT = new ConcurrentHashMap();    private final ConcurrentMap maxRT = new ConcurrentHashMap();    private final ConcurrentMap> rtGroupSample = new ConcurrentHashMap();    private final ConcurrentMap> groupMinRT = new ConcurrentHashMap();    private final ConcurrentMap> groupMaxRT = new ConcurrentHashMap();这些指标数据,有许多相同之处,也有部分差异,比如:初始化也不相同:不通类型的数据,初始化动作不同。如 int num = 0; long num = 0L; AtomicLongArray = new AtomicLongArray(4);计算方法不同:比较多的是自增,有取当前值(比如最近一次请求的rt), 也有比较大小(比如rt的最大值,每次需要当前值和集合中的最大值进行比较),还有计算平均值等等。导出方法不同:集合中数据,需要导出成统一格式(Dubbo 使用了 micrometer)。因为本身数据格式差异,导出方法也有相应差异。这一系列过程中,很容易把代码写出面向过程,比如对请求 rt 指标的初始化和计算:public void addRT(S source, Long rt) {        MetricsCountSampleConfigurer sampleConfigure = new MetricsCountSampleConfigurer();        sampleConfigure.setSource(source);        this.rtConfigure(sampleConfigure);        M metric = sampleConfigure.getMetric();        // 初始化 AtomicLongArray 类型(不存在时)        AtomicLongArray rtCalculator = ConcurrentHashMapUtils.computeIfAbsent(this.rtSample, metric, k -> new AtomicLongArray(4));        // 设置 rt 类型的值(last类型rt)        rtCalculator.set(0, rt);        // 自增类型的 rt 更新(累加类型rt)        rtCalculator.addAndGet(1, rt);        rtCalculator.incrementAndGet(2);        // 计算 rt 最小值        LongAccumulator min = ConcurrentHashMapUtils.computeIfAbsent(minRT, metric, k -> new LongAccumulator(Long::min, Long.MAX_VALUE));        min.accumulate(rt);        LongAccumulator max = ConcurrentHashMapUtils.computeIfAbsent(maxRT, metric, k -> new LongAccumulator(Long::max, Long.MIN_VALUE));        max.accumulate(rt);        sampleConfigure.setRt(rt);        sampleConfigure.getFireEventHandler().accept(sampleConfigure);    }以上代码来自 SimpleMetricsCountSampler.addRT(),较明显的面向过程写法,把各容器相同阶段(初始化、计算,这里不包含导出)的不同计算操作(赋值、累加、平均值等),耦合在一个方法中。如果增加了一种容器及新类型计算(假如中位数),只能在addRt方法修改。面向过程的代码特点是,容易出 bug 且不易维护。2.2 注册中心指标重构实践1、容器类的抽象请求指标代码中,对不同数据容器,简单地使用 Map 存储。map 能存储数据,但是没有数据的处理能力,只能依赖调用代码执行处理。我们可以使用功能更全面的独立对象来表示容器。public class LongContainer extends ConcurrentHashMap {    /**     * 指标的 key 类型,比如注册、订阅、通知变更等     */    private final transient MetricsKeyWrapper metricsKeyWrapper;    /**     *  初始化函数      */    private final transient Function initFunc;    /**     *  计算函数      */    private final transient BiConsumer consumerFunc;      /**     *  导出函数      */    private transient Function valueSupplier;    public LongContainer(MetricsKeyWrapper metricsKeyWrapper, Supplier initFunc, BiConsumer consumerFunc) {        this.metricsKeyWrapper = metricsKeyWrapper;        this.initFunc = s -> initFunc.get();        this.consumerFunc = consumerFunc;        this.valueSupplier = k -> this.get(k).longValue();    }   //省略其他代码}以上容器类,包含了数据和数据处理函数,代替之前的 Map。结合函数式编程,对容器类进行初始化,从而简化后续的计算、导出过程。public class RegistryStatComposite implements MetricsExport {    public Map> applicationNumStats = new ConcurrentHashMap();    public Map> serviceNumStats = new ConcurrentHashMap();   //rt 容器变为 LongContainer 的集合,更方便遍历及统一处理    public List> rtStats = new ArrayList();    public RegistryStatComposite() {        for (ApplicationType type : ApplicationType.values()) {            // Application key and increment val            applicationNumStats.put(type, new ConcurrentHashMap());        }        for (ServiceType type : ServiceType.values()) {            // Service key            serviceNumStats.put(type, new ConcurrentHashMap());        }        // App-level        rtStats.addAll(initStats(OP_TYPE_REGISTER, false));        rtStats.addAll(initStats(OP_TYPE_SUBSCRIBE, false));        rtStats.addAll(initStats(OP_TYPE_NOTIFY, false));        // Service-level        rtStats.addAll(initStats(OP_TYPE_REGISTER_SERVICE, true));        rtStats.addAll(initStats(OP_TYPE_SUBSCRIBE_SERVICE, true));              //如果需要新增指标类型,此处增加即可    }    //初始化容器,设置初始、计算及导出函数    private List> initStats(String registryOpType, boolean isServiceLevel) {        List> singleRtStats = new ArrayList();        singleRtStats.add(new AtomicLongContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_LAST, isServiceLevel)));        singleRtStats.add(new LongAccumulatorContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_MIN, isServiceLevel), new LongAccumulator(Long::min, Long.MAX_VALUE)));        singleRtStats.add(new LongAccumulatorContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_MAX, isServiceLevel), new LongAccumulator(Long::max, Long.MIN_VALUE)));        singleRtStats.add(new AtomicLongContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_SUM, isServiceLevel), (responseTime, longAccumulator) -> longAccumulator.addAndGet(responseTime)));        // AvgContainer 比较特殊,存储了总数,但是导出函数是平均数计算函数        AtomicLongContainer avgContainer = new AtomicLongContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_AVG, isServiceLevel), (k, v) -> v.incrementAndGet());        avgContainer.setValueSupplier(applicationName -> {            LongContainer totalContainer = rtStats.stream().filter(longContainer -> longContainer.isKeyWrapper(MetricsKey.METRIC_RT_SUM, registryOpType)).findFirst().get();            AtomicLong totalRtTimes = avgContainer.get(applicationName);            AtomicLong totalRtSum = (AtomicLong) totalContainer.get(applicationName);            return totalRtSum.get() / totalRtTimes.get();        });        singleRtStats.add(avgContainer);        return singleRtStats;    } }指标计算:public void calcApplicationRt(String applicationName, String registryOpType, Long responseTime) {    for (LongContainer container : appRtStats.stream().filter(longContainer -> longContainer.specifyType(registryOpType)).collect(Collectors.toList())) {        Number current = (Number) ConcurrentHashMapUtils.computeIfAbsent(container, applicationName, container.getInitFunc());        //使用容器的计算函数,执行埋点后的指标计算        container.getConsumerFunc().accept(responseTime, current);    }}可以看出,数据容器,在使用对象代替 map 后,看上去变的精简,可维护性也上升了。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2024-12-26 11:59 , Processed in 0.331856 second(s), 26 queries .

Powered by Discuz! X3.5

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表