|
以dubbo源码为例-使用lambda重构面向对象模块
330 本文将以 Dubbo 源码为例,和您讨论如何使用 Lambda 和面向对象两种方法,对 Java 应用进行重构。并以实例展示了两者结合,写出简洁优雅的代码。1. 使用 Lambda 改善可读性和灵活性1.1 改善代码的可读性Java 8 的 Lambda 新特性也可以帮助提升代码的可读性:使用 Lambda,你可以减少冗长的代码,让代码更易于理解通过方法引用和 Stream API,你的代码会变得更直观1.2 使用场景在回调场景,可以使用 Lambda 简化代码。比如,我们在使用 JDBC 的时候,资源用完要马上释放,否则会资源泄漏,如代码示例。public String query(String sql) { connection = getConnection(); statement = connection.getStatement(sql); ResultSet rs = statement.executeQuery(); String result; try { while (rs.next()) { //执行具体操作 result = rs.getString("name"); } } finally { rs.close(); statement.close; } }query 函数中,除了取结果的 result = rs.getString("name") 这句,其他都是样板代码。这个时候,是不是可以使用模板方法的设计模式解决,抽象一个 AbstractQuery,并使用不同的子类实现查询,如下:public class AbstractQuery {public String query(String sql) { connection = getConnection(); statement = connection.getStatement(sql); ResultSet rs = statement.executeQuery(); String result; try { while (rs.next()) { //执行具体操作 result = getResult(rs); } return result; } finally { rs.close(); statement.close; } } abstract String getResult();}public class NameQuery extends AbstractQuery { public String query(ResultSet rs) { return rs.getString("name"); } public static void main(String[] args) { new NameQuery().query("xxxxx"); }}显然,这种方式不可取,因为查询的 sql 是动态的,得写无数类去实现。这个时候,我们可以使用函数式来实现。public class JbdcUtil {public String query(String sql, Function resFunc) { connection = getConnection(); statement = connection.getStatement(sql); ResultSet rs = statement.executeQuery(); String result; try { while (rs.next()) { //执行具体操作 result = resFunc.apply(rs); } return result; } finally { rs.close(); statement.close; } } public static void main(String[] args) { String result = JbdcUtil.query("select xxxxx", rs ->rs.getString("name")); }}这个样例中,使用了 Jdk 自带的 Function 来实现,因为 Function 的 apply 方法一个参数一个返回,和场景对应。通常,我们可以用以下几步来确定函数:1、查看需要的方法数量和是否需要返回2、根据1的结果,选择 jdk 自带函数,没有合适的则自己写。@FunctionalInterfacepublic interface Function { //根据参数和返回类型确定范型类型。上面示例中,我们需要根据 ResultSet 参数,获取 String 的返回 //则在参数中写 Function resFunc ,从而与 apply 方法对应 R apply(T t); } JbdcUtil.query("xxxxx", rs ->rs.getString("name")); 这里的实参 ,-> 前面的代表参数,后面则是 apply 的 override 实现,如果不习惯,可以写成以下形式,idea可以自动帮助简写。String result = JbdcUtil.query("select xxxxx", new Function() { @Override public String apply(ResultSet rs) { return rs.getString("name"); } });1.3 Dubbo 源码对 Lambda 的使用Dubbo 在 3.2 中,引入了完整的可观测性特性。可观测指标,需要对业务代码进行埋点,为统一埋点形式,使用了事件机制。如服务暴露时,需要获取向注册中心的注册 rt 延时、及异常次数指标。// rovider 启动,向注册中心注册当前实例public void register() { ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);}事件机制埋点 public void register() { MetricsEventBus.post(new RegisterEvent(System.currentMills())); try { ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel); } catch (Exception e) { MetricsEventBus.error(new RegisterEvent(System.currentMills())); throw e; } MetricsEventBus.finish(new RegisterEvent(System.currentMills())); }可以看到,加入事件后,比初始代码多了不少样板代码。使用 lambda 优化如下public void register() {MetricsEventBus.post(new RegisterEvent(), () -> { ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel); } );}//使用 jdk 自带 Supplier 函数,因为没有参数,只有返回public class MetricsEventBus { public static T post(MetricsEvent event, Supplier targetSupplier) { dispatcher.publishEvent(event); T result; try { result = targetSupplier.get(); } catch (Throwable e) { dispatcher.publishErrorEvent(event); throw e; } return result; }}以上代码对 Dubbo 源码进行了精简,保留了主要逻辑。2. 面向对象重构2.1 单次Rpc的请求指标源码分析Dubbo 收集的指标维度及类型非常的多,比如一次 rpc 请求,需要统计成功、失败次数,rt 方面,需要统计单次 rt,最近一次 rt,平均 rt,总 rt 等,需要一些容器来存放数据。 // lastRT, totalRT, rtCount, avgRT share a container, can utilize the system cache line private final ConcurrentMap rtSample = new ConcurrentHashMap(); private final ConcurrentMap minRT = new ConcurrentHashMap(); private final ConcurrentMap maxRT = new ConcurrentHashMap(); private final ConcurrentMap> rtGroupSample = new ConcurrentHashMap(); private final ConcurrentMap> groupMinRT = new ConcurrentHashMap(); private final ConcurrentMap> groupMaxRT = new ConcurrentHashMap();这些指标数据,有许多相同之处,也有部分差异,比如:初始化也不相同:不通类型的数据,初始化动作不同。如 int num = 0; long num = 0L; AtomicLongArray = new AtomicLongArray(4);计算方法不同:比较多的是自增,有取当前值(比如最近一次请求的rt), 也有比较大小(比如rt的最大值,每次需要当前值和集合中的最大值进行比较),还有计算平均值等等。导出方法不同:集合中数据,需要导出成统一格式(Dubbo 使用了 micrometer)。因为本身数据格式差异,导出方法也有相应差异。这一系列过程中,很容易把代码写出面向过程,比如对请求 rt 指标的初始化和计算:public void addRT(S source, Long rt) { MetricsCountSampleConfigurer sampleConfigure = new MetricsCountSampleConfigurer(); sampleConfigure.setSource(source); this.rtConfigure(sampleConfigure); M metric = sampleConfigure.getMetric(); // 初始化 AtomicLongArray 类型(不存在时) AtomicLongArray rtCalculator = ConcurrentHashMapUtils.computeIfAbsent(this.rtSample, metric, k -> new AtomicLongArray(4)); // 设置 rt 类型的值(last类型rt) rtCalculator.set(0, rt); // 自增类型的 rt 更新(累加类型rt) rtCalculator.addAndGet(1, rt); rtCalculator.incrementAndGet(2); // 计算 rt 最小值 LongAccumulator min = ConcurrentHashMapUtils.computeIfAbsent(minRT, metric, k -> new LongAccumulator(Long::min, Long.MAX_VALUE)); min.accumulate(rt); LongAccumulator max = ConcurrentHashMapUtils.computeIfAbsent(maxRT, metric, k -> new LongAccumulator(Long::max, Long.MIN_VALUE)); max.accumulate(rt); sampleConfigure.setRt(rt); sampleConfigure.getFireEventHandler().accept(sampleConfigure); }以上代码来自 SimpleMetricsCountSampler.addRT(),较明显的面向过程写法,把各容器相同阶段(初始化、计算,这里不包含导出)的不同计算操作(赋值、累加、平均值等),耦合在一个方法中。如果增加了一种容器及新类型计算(假如中位数),只能在addRt方法修改。面向过程的代码特点是,容易出 bug 且不易维护。2.2 注册中心指标重构实践1、容器类的抽象请求指标代码中,对不同数据容器,简单地使用 Map 存储。map 能存储数据,但是没有数据的处理能力,只能依赖调用代码执行处理。我们可以使用功能更全面的独立对象来表示容器。public class LongContainer extends ConcurrentHashMap { /** * 指标的 key 类型,比如注册、订阅、通知变更等 */ private final transient MetricsKeyWrapper metricsKeyWrapper; /** * 初始化函数 */ private final transient Function initFunc; /** * 计算函数 */ private final transient BiConsumer consumerFunc; /** * 导出函数 */ private transient Function valueSupplier; public LongContainer(MetricsKeyWrapper metricsKeyWrapper, Supplier initFunc, BiConsumer consumerFunc) { this.metricsKeyWrapper = metricsKeyWrapper; this.initFunc = s -> initFunc.get(); this.consumerFunc = consumerFunc; this.valueSupplier = k -> this.get(k).longValue(); } //省略其他代码}以上容器类,包含了数据和数据处理函数,代替之前的 Map。结合函数式编程,对容器类进行初始化,从而简化后续的计算、导出过程。public class RegistryStatComposite implements MetricsExport { public Map> applicationNumStats = new ConcurrentHashMap(); public Map> serviceNumStats = new ConcurrentHashMap(); //rt 容器变为 LongContainer 的集合,更方便遍历及统一处理 public List> rtStats = new ArrayList(); public RegistryStatComposite() { for (ApplicationType type : ApplicationType.values()) { // Application key and increment val applicationNumStats.put(type, new ConcurrentHashMap()); } for (ServiceType type : ServiceType.values()) { // Service key serviceNumStats.put(type, new ConcurrentHashMap()); } // App-level rtStats.addAll(initStats(OP_TYPE_REGISTER, false)); rtStats.addAll(initStats(OP_TYPE_SUBSCRIBE, false)); rtStats.addAll(initStats(OP_TYPE_NOTIFY, false)); // Service-level rtStats.addAll(initStats(OP_TYPE_REGISTER_SERVICE, true)); rtStats.addAll(initStats(OP_TYPE_SUBSCRIBE_SERVICE, true)); //如果需要新增指标类型,此处增加即可 } //初始化容器,设置初始、计算及导出函数 private List> initStats(String registryOpType, boolean isServiceLevel) { List> singleRtStats = new ArrayList(); singleRtStats.add(new AtomicLongContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_LAST, isServiceLevel))); singleRtStats.add(new LongAccumulatorContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_MIN, isServiceLevel), new LongAccumulator(Long::min, Long.MAX_VALUE))); singleRtStats.add(new LongAccumulatorContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_MAX, isServiceLevel), new LongAccumulator(Long::max, Long.MIN_VALUE))); singleRtStats.add(new AtomicLongContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_SUM, isServiceLevel), (responseTime, longAccumulator) -> longAccumulator.addAndGet(responseTime))); // AvgContainer 比较特殊,存储了总数,但是导出函数是平均数计算函数 AtomicLongContainer avgContainer = new AtomicLongContainer(new MetricsKeyWrapper(registryOpType, MetricsKey.METRIC_RT_AVG, isServiceLevel), (k, v) -> v.incrementAndGet()); avgContainer.setValueSupplier(applicationName -> { LongContainer totalContainer = rtStats.stream().filter(longContainer -> longContainer.isKeyWrapper(MetricsKey.METRIC_RT_SUM, registryOpType)).findFirst().get(); AtomicLong totalRtTimes = avgContainer.get(applicationName); AtomicLong totalRtSum = (AtomicLong) totalContainer.get(applicationName); return totalRtSum.get() / totalRtTimes.get(); }); singleRtStats.add(avgContainer); return singleRtStats; } }指标计算:public void calcApplicationRt(String applicationName, String registryOpType, Long responseTime) { for (LongContainer container : appRtStats.stream().filter(longContainer -> longContainer.specifyType(registryOpType)).collect(Collectors.toList())) { Number current = (Number) ConcurrentHashMapUtils.computeIfAbsent(container, applicationName, container.getInitFunc()); //使用容器的计算函数,执行埋点后的指标计算 container.getConsumerFunc().accept(responseTime, current); }}可以看出,数据容器,在使用对象代替 map 后,看上去变的精简,可维护性也上升了。
|
|