找回密码
 会员注册
查看: 15|回复: 0

MySQL亿级数据平滑迁移实战

[复制链接]

2万

主题

0

回帖

6万

积分

超级版主

积分
68148
发表于 2024-10-9 16:46:22 | 显示全部楼层 |阅读模式
MySQL亿级数据平滑迁移实战 服务器团队 vivo互联网技术 vivo互联网技术 维沃移动通信有限公司 分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。 436篇内容 2024年08月21日 20:00 广东 作者:来自 vivo 互联网服务器团队- Li Gang本文介绍了一次 MySQL 数据迁移的流程,通过方案选型、业务改造、双写迁移最终实现了亿级数据的迁移。一、背景预约业务是 vivo 游戏中心的重要业务之一。由于历史原因,预约业务数据表与其他业务数据表存储在同一个数据库中。当其他业务出现慢 SQL 等异常情况时,可能会直接影响到预约业务,从而降低系统整体的可靠性和稳定性。为了尽可能提高系统的稳定性和数据隔离性,我们迫切需要将预约相关数据表从原来的数据库中迁移出来,单独建立一个预约业务的数据库。二、方案选型常见的迁移方案大致可以分为以下几类:而预约业务有以下特点:读写场景多,频率高,在用户预约/取消预约/福利发放等场景均涉及到大量的读写。不可接受停机,停机不可避免的会造成经济损失,在有其他方案的情况下不适合选择此方案。大部分的场景能接受秒级的数据不一致,少部分不能。结合这些特点,我们再评估下上面的方案:停机迁移方案需要停机,不适用于预约场景。预约场景存在不活跃的用户数据,如果用渐进式迁移方案的话很难迁移干净,可能还需要再写一个迁移任务来辅助完成迁移。而双写方案最大的优势在于每一步操作都可向上回滚,能尽可能的保证业务不出问题。因此,最终选择的是双写方案。预约业务涉及到的读写场景多,每一个场景单独进行改造的成本大,采用 Mybatis 插件来实现迁移所需的双写等功能,可以有效降低改造成本。三、前期准备3.1 全量同步&增量同步&一致性校验这几步使用了公司提供的数据同步工具。全量同步基于 MySQLDump 实现;增量同步基于 binlog 实现;一致性校验通过在新老库各选一个分块,然后聚合列数据计算并对比其特征值实现。3.2 代码改造引入了新库,那自然就需要在项目里新建数据源,并创建表对应的 Mybatis Mapper 类。这里有一个小细节需要注意,Mybatis 默认的 BeanNameGenerator 是AnnotationBeanNameGenerator,它会使用类名作为 BeanName 注册到 Spring 的 ioc 容器中,Spring 启动时如果发现有了两个重名 Bean 就会启动失败,笔者这里给 Mybatis 设置了一个新的 BeanNameGenerator ,使用类的全路径名作为 BeanName 解决了问题。public class FullPathBeanNameGenerator implements BeanNameGenerator { @Override public String generateBeanName(BeanDefinition definition, BeanDefinitionRegistry registry) { return definition.getBeanClassName(); }}(左右滑动查看更多)还有一点是主键 id,本次预约迁移需要保证新老库主键 id 一致,预约业务没做分库分表,id 都是直接用 MySQL 的自增 id,没有用 id 生成器之类的中间件。因此插入新表时只需要使用插入老表后 Mybatis 自动设置好的 id 即可,这次迁移前先检查了一遍业务代码,确保插入语句都用了 Mybatis 的 useGeneratedKeys 功能来自动设置 id。3.3 插件实现Mybatis 插件可以拦截 SQL 语句执行过程中的某一点进行干预和处理,而 Executor 是 Mybatis 中负责执行 SQL 语句的核心组件。我们可以对 Executor 的 update 和 query 方法进行代理以实现迁移所需的功能。插件需要为读写场景分别实现以下功能:考虑到开关切换部分的代码逻辑较为简单,因此在下文中,笔者将不再过多介绍该部分的具体实现,而是着重介绍如何在插件中使用老库的执行语句来访问新的数据库。此外,代码里会涉及到 Mybatis 相关的一些概念,由于网上已经有较多详尽的资料,这里就不再赘述。迁移插件代理了 Executor 的 query 和 update 方法,首先在插件里获取到当前执行的 SQL 语句所在的 Mapper 路径。@Intercepts( { @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}), @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}), @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class, CacheKey.class, BoundSql.class}), })public class AppointMigrateInterceptor implements Interceptor { @Override public Object intercept(Invocation invocation) throws Throwable { Object[] args = invocation.getArgs(); // Mybatis插件代理的Executor的update或者query方法,第一个参数就是MappedStatement MappedStatement ms = (MappedStatement) args[0]; SqlCommandType sqlCommandType = ms.getSqlCommandType(); String id = ms.getId(); // 从MappedStatement id中获取对应的Mapper接口文件全路径 String sourceMapper = id.substring(0, id.lastIndexOf(".")); // ... } // ...}(左右滑动查看更多)得到老库 Mapper 路径后,将其转换为新库 Mapper 路径,再使用 Class.forName 获取到新库 Mapper 类,然后用新库的 sqlSessionFactory 开启 sqlSession,再获取反射调用所需的方法、对象、参数,在新库上执行语句。protected Object invoke(Invocation invocation, TableConfiguration tableConfiguration) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { // 获取 MappedStatement MappedStatement ms = (MappedStatement) invocation.getArgs()[0]; // 获取 Mybatis 封装好的入参,封装函数 MapperMethod.convertArgsToSqlCommandParam(Object[] args) Object parameter = invocation.getArgs()[1]; // 使用 Class.forName 获取到的新库 Mapper Class targetMapperClass = tableConfiguration.getTargetMapperClazz(); // 使用新库的 sqlSessionFactory 创建 sqlSession SqlSession sqlSession = sqlSessionFactory.openSession(); Object result = null; try{ // 使用新库的 Mapper 路径获取对应的 MapperProxy 对象 Object mapper = sqlSession.getMapper(targetMapperClass); // 将 Mybatis 封装好的参数转换为原始参数 Object[] paramValues = getParamValue(parameter); // 使用 mappedStatement Id 从新库对应的 Mapper 里获取对应的方法 Method method = getMethod(ms.getId(), targetMapperClass, paramValues); paramValues = fixNullParam(method, paramValues); // 反射调用新库 Mapper 的方法,本质上执行的是 MapperProxy.invoke result = method.invoke(mapper, paramValues); } finally { sqlSession.close(); } return result;} private Object[] fixNullParam(Method method, Object[] paramValues) { if (method.getParameterTypes().length > 0 & paramValues.length == 0) { return new Object[]{null}; } return paramValues;}(左右滑动查看更多)上述代码里,getMethod 方法负责从新库 Mapper 类里找到对应的方法,以用于后续的反射调用。private Method getMethod(String id, Class mapperClass) throws NoSuchMethodException { //获取参数对应的 class String methodName = id.substring(id.lastIndexOf(".") + 1); String key = id; // methodCache 用来缓存 MappedStatement 和对应的 Method,避免每次都从 Mapper 里查找 Method method = methodCache.get(key); if (method == null){ method = findMethodByMethodSignature(mapperClass, methodName); if (method == null){ throw new NoSuchMethodException("No such method " + methodName + " in class " + mapperClass.getName()); } methodCache.put(key,method); } return method;} private Method findMethodByMethodSignature(Class mapperClass,String methodName) throws NoSuchMethodException { // mybatis 的 Mapper 内的方法不支持重载,所以这里只要方法名匹配到了就行,不用进行参数的匹配 Method method = null; for (Method m : mapperClass.getMethods()) { if (m.getName().equals(methodName)) { method = m; break; } } return method;}(左右滑动查看更多)得到方法后,还需要得到反射调用所需的参数。Mybatis 执行到 Executor.update/query 方法时,参数已经经过 MapperMethod.convertArgsToSqlCommandParam(Object[] args) 方法封装,不能直接用来执行 MapperProxy.invoke ,需要转换后才可用。下图是MapperMethod.convertArgsToSqlCommandParam(Object[] args) 的封装过程,而下面的 getParamValue 是这个函数的逆过程。private Object[] getParamValue(Object parameter) { List paramValues = new ArrayList(); if (parameter instanceof Map) { Map paramMap = (Map) parameter; if (paramMap.containsKey("collection")) { paramValues.add(paramMap.get("collection")); } else if (paramMap.containsKey("array")) { paramValues.add(paramMap.get("array")); } else { int count = 1; while (count
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2025-1-4 05:27 , Processed in 0.509206 second(s), 26 queries .

Powered by Discuz! X3.5

© 2001-2025 Discuz! Team.

快速回复 返回顶部 返回列表