分布式系统设计
RuoYi-Plus-UniApp 全栈框架分布式架构设计方案
本文档详细介绍项目的分布式系统设计,包括数据源切换、分布式锁、消息队列、任务调度等分布式特性的实现方案与最佳实践。
架构概览
分布式架构设计
RuoYi-Plus-UniApp采用单体架构 + 分布式组件的混合设计模式,在保持单体应用简洁性的同时,通过引入分布式组件来解决高并发、高可用等问题。
┌─────────────────────────────────────────────────────────────────┐
│ RuoYi-Plus-UniApp 应用 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Web 应用层 │ │ Mobile 应用层 │ │ Admin 应用层 │ │
│ │ (Nginx) │ │ (UniApp) │ │ (Vue 3) │ │
│ └──────┬───────┘ └──────┬────────┘ └──────┬───────┘ │
│ │ │ │ │
│ └─────────────────┴───────────────────┘ │
│ │ │
│ ┌──────────────────▼──────────────────┐ │
│ │ Spring Boot 应用服务层 │ │
│ │ ┌────────────────────────────────┐ │ │
│ │ │ 业务模块 (ruoyi-modules) │ │ │
│ │ ├────────────────────────────────┤ │ │
│ │ │ 通用模块 (ruoyi-common-*) │ │ │
│ │ └────────────────────────────────┘ │ │
│ └──────────────────┬──────────────────┘ │
│ │ │
│ ┌───────────────────────┴───────────────────────┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ 持久化层 │ │ 分布式组件 │ │
│ ├──────────────┤ ├──────────────┤ │
│ │ 主库(Master) │ │ Redis 集群 │ │
│ │ 从库(Slave) │ │ (缓存+锁) │ │
│ │ 读写分离 │ └──────────────┘ │
│ └──────────────┘ │ │
│ ┌────────┴────────┐ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐│
│ │ RocketMQ │ │ SnailJob ││
│ │ (消息队列) │ │ (任务调度) ││
│ └──────────────┘ └──────────────┘│
└─────────────────────────────────────────────────────────────────┘核心特性
RuoYi-Plus-UniApp的分布式设计具有以下核心特性:
- 读写分离 - 基于Dynamic-Datasource实现主从数据库自动切换
- 分布式锁 - 支持Redisson和Lock4j两种分布式锁方案
- 消息队列 - 集成RocketMQ实现异步解耦和削峰填谷
- 任务调度 - 集成SnailJob实现分布式定时任务管理
- 多数据源 - 支持MySQL、Oracle、PostgreSQL、SQLServer多数据库
- 事务管理 - 支持分布式事务和跨数据源事务
参考: ruoyi-plus-uniapp/pom.xml:1-540
读写分离
架构设计
项目使用Dynamic-Datasource实现读写分离,通过主从数据库配置和@DS注解实现数据源的动态切换。
┌─────────────────┐
│ 应用服务层 │
└────────┬────────┘
│
┌───────────▼──────────┐
│ Dynamic-Datasource │
│ (动态数据源路由) │
└────┬────────────┬────┘
│ │
写操作 ────►│ │◄──── 读操作
│ │
┌────▼────┐ ┌────▼────┐
│ Master │ │ Slave │
│ (主库) │ │ (从库) │
│ 写+读 │ │ 只读 │
└─────────┘ └─────────┘
│ │
MySQL Replication ──►配置方式
1. 数据源配置
在application.yml中配置主从数据源:
################## 数据源配置 ##################
spring:
datasource:
type: com.zaxxer.hikari.HikariDataSource
dynamic:
# 设置默认数据源(写操作)
primary: master
# 严格模式 匹配不到数据源则报错
strict: true
# 各个数据源配置
datasource:
# 主库数据源(读写)
master:
type: ${spring.datasource.type}
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://${DB_HOST:127.0.0.1}:${DB_PORT:3306}/${DB_NAME:ryplus_uni}?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true
username: ${DB_USERNAME:root}
password: ${DB_PASSWORD:root}
# 从库数据源(只读)
slave:
lazy: true # 懒加载
type: ${spring.datasource.type}
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://${DB_SLAVE_HOST:127.0.0.1}:${DB_SLAVE_PORT:3306}/${DB_SLAVE_NAME:ryplus_uni}?useUnicode=true&characterEncoding=utf8
username: ${DB_SLAVE_USERNAME:root}
password: ${DB_SLAVE_PASSWORD:root}
# 连接池配置
hikari:
maxPoolSize: 20
minIdle: 10
connectionTimeout: 30000
idleTimeout: 600000
maxLifetime: 1800000参考: ruoyi-admin/src/main/resources/application-dev.yml:28-100
2. 使用@DS注解切换数据源
在Service或Mapper方法上使用@DS注解指定数据源:
import com.baomidou.dynamic.datasource.annotation.DS;
import org.springframework.stereotype.Service;
/**
* 代码生成业务表服务实现类
*/
@Service
public class GenTableServiceImpl implements IGenTableService {
/**
* 分页查询数据库表列表
* 使用动态数据源切换到指定数据源
*
* @param genTable 查询条件
* @param pageQuery 分页参数
* @return 分页结果
*/
@DS("#genTable.dataName") // 动态切换到指定数据源
@Override
public PageResult<GenTable> pageDbTables(GenTable genTable, PageQuery pageQuery) {
// 查询数据库表元数据
LinkedHashMap<String, Table<?>> tablesMap = ServiceProxy.metadata().tables();
// 处理查询结果...
return PageResult.of(page);
}
/**
* 根据表名查询列信息
*
* @param tableName 表名称
* @param dataName 数据源名称
* @return 列信息列表
*/
@DS("#dataName") // SpEL表达式,支持参数动态切换
@Override
public List<GenTableColumn> listDbTableColumnsByName(String tableName, String dataName) {
Table<?> table = ServiceProxy.metadata().table(tableName);
// 处理列信息...
return tableColumns;
}
}参考: ruoyi-modules/ruoyi-generator/src/main/java/plus/ruoyi/generator/service/GenTableServiceImpl.java:124-362
@DS注解详解
1. 支持的注解位置
// 1. 类级别 - 整个类的方法都使用该数据源
@DS("slave")
public class UserServiceImpl {
// 所有方法默认使用slave数据源
}
// 2. 方法级别 - 方法级别优先级高于类级别
@DS("slave")
public class UserServiceImpl {
@DS("master") // 该方法使用master数据源
public void saveUser(User user) {
// 写操作使用主库
}
public List<User> listUsers() {
// 读操作使用从库(类级别的slave)
}
}2. SpEL表达式支持
// 1. 参数值切换
@DS("#dataName")
public void query(String dataName) {
// 根据参数值动态切换数据源
}
// 2. 对象属性切换
@DS("#user.tenantId")
public void saveUser(User user) {
// 根据对象属性动态切换
}
// 3. 方法返回值切换
@DS("#session.getAttribute('tenant')")
public void process() {
// 根据会话属性切换
}3. 数据源标识
内置数据源标识:
master- 主库(默认)slave- 从库- 其他自定义标识(如
oracle、postgres等)
分布式事务支持
1. @DSTransactional注解
Dynamic-Datasource提供了@DSTransactional注解来支持跨数据源事务:
import com.baomidou.dynamic.datasource.annotation.DSTransactional;
/**
* 导入数据库表结构
* 跨数据源事务支持
*/
@DSTransactional // 支持跨数据源事务
@Override
public void importGenTables(List<GenTable> tableList, String dataName) {
try {
for (GenTable table : tableList) {
// 初始化表信息
GenUtils.initTable(table);
table.setDataName(dataName);
// 保存表信息(可能在不同数据源)
boolean result = genTableDao.insert(table);
if (result) {
// 查询并保存列信息(使用@DS切换数据源)
List<GenTableColumn> columns =
SpringUtils.getAopProxy(this)
.listDbTableColumnsByName(tableName, dataName);
genTableColumnDao.batchSave(columns);
}
}
} catch (Exception e) {
throw ServiceException.of("导入失败:" + e.getMessage());
}
}参考: ruoyi-modules/ruoyi-generator/src/main/java/plus/ruoyi/generator/service/GenTableServiceImpl.java:289-323
2. 同步数据库事务
/**
* 同步数据库表结构
* 支持跨数据源事务
*/
@DSTransactional
@Override
public void syncDatabase(Long tableId) {
GenTable table = genTableDao.getGenTableById(tableId);
// 从数据库重新获取列信息(可能跨数据源)
List<GenTableColumn> dbTableColumns =
SpringUtils.getAopProxy(this)
.listDbTableColumnsByName(table.getTableName(), table.getDataName());
// 批量保存更新后的列信息
if (CollUtil.isNotEmpty(saveColumns)) {
genTableColumnDao.batchSave(saveColumns);
}
// 删除数据库中已不存在的列
if (CollUtil.isNotEmpty(delColumns)) {
List<Long> ids = StreamUtils.toList(delColumns, GenTableColumn::getColumnId);
genTableColumnDao.deleteByIds(ids);
}
}参考: ruoyi-modules/ruoyi-generator/src/main/java/plus/ruoyi/generator/service/GenTableServiceImpl.java:710-786
读写分离最佳实践
1. 主库写,从库读
public class UserServiceImpl {
/**
* 保存用户 - 使用主库
*/
@DS("master") // 明确指定主库
public void saveUser(User user) {
userMapper.insert(user);
}
/**
* 查询用户列表 - 使用从库
*/
@DS("slave") // 使用从库
public List<User> listUsers() {
return userMapper.selectList(null);
}
}2. 主从延迟处理
/**
* 处理主从延迟问题
* 插入后立即查询使用主库
*/
public Long saveAndGetId(User user) {
// 1. 插入使用主库
userMapper.insert(user);
Long userId = user.getUserId();
// 2. 立即查询也使用主库(避免主从延迟)
User result = userMapper.selectById(userId);
return result.getUserId();
}3. 强制使用主库
某些场景下需要强制使用主库读取最新数据:
/**
* 强制从主库读取
* 场景: 支付后立即查询订单状态
*/
@DS("master") // 强制主库
public Order getOrderById(Long orderId) {
return orderMapper.selectById(orderId);
}分布式锁
架构设计
项目支持两种分布式锁实现方案:
- Redisson分布式锁 - 基于Redis实现,支持可重入、公平锁、读写锁等
- Lock4j注解锁 - 基于AOP实现,支持注解声明式锁
┌─────────────────┐
│ 应用服务层 │
└────────┬────────┘
│
┌────────────────┼────────────────┐
│ │ │
┌────▼────┐ ┌────▼────┐ ┌────▼────┐
│Redisson │ │ Lock4j │ │手动锁 │
│ 锁API │ │ @Lock4j │ │ RedLock │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└───────────────┴───────────────┘
│
┌───────▼───────┐
│ Redis 集群 │
│ (锁存储) │
└───────────────┘Redisson分布式锁
1. 基本用法
import org.redisson.api.RLock;
import plus.ruoyi.common.redis.utils.RedisUtils;
/**
* Redisson分布式锁基本使用
*/
public class OrderService {
/**
* 使用分布式锁防止重复下单
*/
public void createOrder(Long userId, Long productId) {
// 构建锁的key
String lockKey = "order:create:" + userId + ":" + productId;
// 获取锁对象
RLock lock = RedisUtils.getLock(lockKey);
try {
// 尝试获取锁,最多等待10秒,锁自动释放时间30秒
boolean acquired = lock.tryLock(10, 30, TimeUnit.SECONDS);
if (acquired) {
try {
// 执行业务逻辑
// 1. 检查库存
// 2. 创建订单
// 3. 扣减库存
processOrder(userId, productId);
} finally {
// 释放锁
lock.unlock();
}
} else {
throw new ServiceException("系统繁忙,请稍后再试");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ServiceException("获取锁失败");
}
}
}2. RedisUtils工具类
项目封装了RedisUtils工具类简化锁的使用:
package plus.ruoyi.common.redis.utils;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
/**
* Redis工具类
*/
public class RedisUtils {
private static RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
/**
* 获取可重入锁
*
* @param key 锁的key
* @return RLock对象
*/
public static RLock getLock(String key) {
return CLIENT.getLock(key);
}
/**
* 获取公平锁
*
* @param key 锁的key
* @return RLock对象
*/
public static RLock getFairLock(String key) {
return CLIENT.getFairLock(key);
}
/**
* 获取读锁
*
* @param key 锁的key
* @return RLock对象
*/
public static RLock getReadLock(String key) {
return CLIENT.getReadWriteLock(key).readLock();
}
/**
* 获取写锁
*
* @param key 锁的key
* @return RLock对象
*/
public static RLock getWriteLock(String key) {
return CLIENT.getReadWriteLock(key).writeLock();
}
}参考: ruoyi-common/ruoyi-common-redis/src/main/java/plus/ruoyi/common/redis/utils/RedisUtils.java:618-627
3. 可重入锁示例
/**
* 可重入锁示例
* 同一线程可以多次获取同一把锁
*/
public class InventoryService {
public void updateInventory(Long productId, Integer quantity) {
String lockKey = "inventory:" + productId;
RLock lock = RedisUtils.getLock(lockKey);
try {
lock.lock(30, TimeUnit.SECONDS);
// 业务逻辑1
deductInventory(productId, quantity);
// 可重入: 同一线程再次获取锁
updateInventoryLog(productId, quantity);
} finally {
lock.unlock();
}
}
private void updateInventoryLog(Long productId, Integer quantity) {
String lockKey = "inventory:" + productId;
RLock lock = RedisUtils.getLock(lockKey);
try {
// 可重入: 再次获取同一把锁
lock.lock(10, TimeUnit.SECONDS);
// 记录日志
inventoryLogMapper.insert(new InventoryLog(productId, quantity));
} finally {
lock.unlock();
}
}
}4. 公平锁示例
/**
* 公平锁示例
* 按照请求的顺序获取锁
*/
public class TicketService {
/**
* 抢购火车票(按先后顺序)
*/
public void bookTicket(Long userId, Long trainId) {
String lockKey = "ticket:book:" + trainId;
RLock lock = RedisUtils.getFairLock(lockKey); // 公平锁
try {
// 公平锁会按照请求的顺序依次获取
lock.lock(30, TimeUnit.SECONDS);
// 1. 检查余票
// 2. 创建订单
// 3. 扣减余票
processBooking(userId, trainId);
} finally {
lock.unlock();
}
}
}5. 读写锁示例
/**
* 读写锁示例
* 读锁共享,写锁独占
*/
public class ConfigService {
/**
* 读取配置(共享锁)
*/
public String getConfig(String key) {
String lockKey = "config:" + key;
RLock readLock = RedisUtils.getReadLock(lockKey);
try {
readLock.lock(10, TimeUnit.SECONDS);
// 多个线程可以同时持有读锁
return configMapper.selectByKey(key);
} finally {
readLock.unlock();
}
}
/**
* 更新配置(独占锁)
*/
public void updateConfig(String key, String value) {
String lockKey = "config:" + key;
RLock writeLock = RedisUtils.getWriteLock(lockKey);
try {
writeLock.lock(30, TimeUnit.SECONDS);
// 写锁是独占的,其他读写都会阻塞
configMapper.updateByKey(key, value);
// 清除缓存
RedisUtils.deleteObject("cache:config:" + key);
} finally {
writeLock.unlock();
}
}
}Lock4j注解锁
1. @Lock4j注解基本用法
Lock4j提供了声明式的分布式锁,通过注解即可使用:
import com.baomidou.lock.annotation.Lock4j;
/**
* 登录服务
*/
@Service
public class SysLoginService {
/**
* 绑定第三方用户
* 使用Lock4j防止并发绑定
*/
@Lock4j // 自动加锁,方法执行完自动释放
public void bindSocialAccount(AuthUser authUserData) {
// 构建第三方用户唯一标识
String authId = authUserData.getSource() + authUserData.getUuid();
// 转换第三方用户信息
SysSocialBo bo = BeanUtil.toBean(authUserData, SysSocialBo.class);
BeanUtil.copyProperties(authUserData.getToken(), bo);
// 设置关联信息
Long userId = LoginHelper.getUserId();
bo.setUserId(userId);
bo.setAuthId(authId);
// 检查该第三方账号是否已被其他用户绑定
List<SysSocialVo> checkList = sysSocialService.listSocialsByAuthId(authId);
if (CollUtil.isNotEmpty(checkList)) {
throw ServiceException.of("此三方账号已经被绑定!");
}
// 查询当前用户是否已绑定同平台的其他账号
SysSocialBo params = new SysSocialBo();
params.setUserId(userId);
params.setSource(bo.getSource());
List<SysSocialVo> list = sysSocialService.list(params);
if (CollUtil.isEmpty(list)) {
// 新增绑定
sysSocialService.add(bo);
} else {
// 更新绑定
bo.setId(list.get(0).getId());
sysSocialService.update(bo);
}
}
}参考: ruoyi-modules/ruoyi-system/src/main/java/plus/ruoyi/system/auth/service/SysLoginService.java:86-126
2. 自定义锁key
/**
* 使用SpEL表达式自定义锁key
*/
@Service
public class OrderService {
/**
* 创建订单
* key支持SpEL表达式
*/
@Lock4j(keys = {"#userId", "#productId"},
expire = 30000, // 锁过期时间30秒
acquireTimeout = 10000) // 获取锁超时时间10秒
public void createOrder(Long userId, Long productId, Integer quantity) {
// 锁的key为: order:userId:productId
// 业务逻辑...
}
/**
* 支付订单
* 使用订单号作为锁key
*/
@Lock4j(keys = "#orderNo")
public void payOrder(String orderNo) {
// 锁的key为: order:orderNo
// 业务逻辑...
}
}3. Lock4j注解参数详解
@Lock4j(
// 锁的名称(支持SpEL表达式)
name = "custom-lock",
// 锁的key(支持SpEL表达式)
keys = {"#userId", "#orderId"},
// 锁过期时间(毫秒)
expire = 30000,
// 获取锁超时时间(毫秒)
acquireTimeout = 10000,
// 锁类型(可重入锁、公平锁、读锁、写锁)
lockType = Lock4jType.REENTRANT
)分布式锁最佳实践
1. 锁粒度要细
// ❌ 错误: 锁粒度太粗
@Lock4j
public void updateUserInfo(User user) {
// 更新用户信息
}
// ✅ 正确: 按用户ID加锁
@Lock4j(keys = "#user.userId")
public void updateUserInfo(User user) {
// 只锁定当前用户
}2. 设置合理的超时时间
// ❌ 错误: 没有设置超时,可能永久等待
RLock lock = RedisUtils.getLock(lockKey);
lock.lock(); // 永久等待
// ✅ 正确: 设置超时时间
RLock lock = RedisUtils.getLock(lockKey);
boolean acquired = lock.tryLock(10, 30, TimeUnit.SECONDS);
if (acquired) {
try {
// 业务逻辑
} finally {
lock.unlock();
}
}3. 避免死锁
// ❌ 错误: 可能导致死锁
public void transfer(Long fromId, Long toId, BigDecimal amount) {
RLock lock1 = RedisUtils.getLock("account:" + fromId);
RLock lock2 = RedisUtils.getLock("account:" + toId);
lock1.lock();
lock2.lock(); // 可能导致死锁
// 业务逻辑
}
// ✅ 正确: 按ID顺序加锁,避免死锁
public void transfer(Long fromId, Long toId, BigDecimal amount) {
// 按ID大小排序,保证加锁顺序一致
Long first = fromId < toId ? fromId : toId;
Long second = fromId < toId ? toId : fromId;
RLock lock1 = RedisUtils.getLock("account:" + first);
RLock lock2 = RedisUtils.getLock("account:" + second);
try {
lock1.lock(30, TimeUnit.SECONDS);
lock2.lock(30, TimeUnit.SECONDS);
// 业务逻辑
} finally {
lock2.unlock();
lock1.unlock();
}
}4. 锁续期
/**
* 长时间任务使用看门狗机制
*/
public void processLongTask(Long taskId) {
String lockKey = "task:" + taskId;
RLock lock = RedisUtils.getLock(lockKey);
try {
// 不指定leaseTime,使用看门狗机制自动续期
lock.lock();
// 长时间任务(看门狗会自动续期)
processTask(taskId);
} finally {
lock.unlock();
}
}消息队列
架构设计
项目集成了RocketMQ消息队列,用于实现异步处理、系统解耦和削峰填谷。
┌─────────────────────────────────────────────────────────────┐
│ RocketMQ 架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Producer │───── 发送消息 ─────────►│ NameServer │ │
│ │ (生产者) │ │ (路由中心) │ │
│ └──────────────┘ └───────┬──────┘ │
│ │ │ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Broker │◄────── 拉取路由 ───────│ Consumer │ │
│ │ (消息存储) │ │ (消费者) │ │
│ │ │ │ │ │
│ │ ┌──────────┐ │ │ ┌──────────┐ │ │
│ │ │ Topic1 │ │ │ │ Listener │ │ │
│ │ ├──────────┤ │ │ └──────────┘ │ │
│ │ │ Topic2 │ │ └──────────────┘ │
│ │ └──────────┘ │ │
│ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘配置方式
1. RocketMQ配置
在application.yml中配置RocketMQ:
################## 消息队列配置 ##################
--- # rocketmq 配置
rocketmq:
# 是否启用 RocketMQ
enabled: ${ROCKETMQ_ENABLED:false}
# NameServer 地址(多个用分号分隔)
name-server: ${ROCKETMQ_NAME_SERVER:127.0.0.1:9876}
# 集群名称
cluster-name: ${ROCKETMQ_CLUSTER_NAME:RuoYiCluster}
# Broker 地址
broker-addr: ${ROCKETMQ_BROKER_ADDR:127.0.0.1:10911}
# 生产者配置
producer:
# 生产者组名
group: ${ROCKETMQ_PRODUCER_GROUP:ruoyi-producer-group-dev}
# 发送消息超时时间(毫秒)
send-message-timeout: ${ROCKETMQ_PRODUCER_MESSAGE_TIMEOUT:3000}
# 消息最大大小(4MB)
max-message-size: ${ROCKETMQ_PRODUCER_MESSAGE_SIZE:4194304}
# 发送失败重试次数
retry-times-when-send-failed: ${ROCKETMQ_PRODUCER_RETRY_FAILED:2}
# 异步发送失败重试次数
retry-times-when-send-async-failed: ${ROCKETMQ_PRODUCER_RETRY_ASYNC_FAILED:2}
# 压缩消息阈值(超过 4KB 自动压缩)
compress-message-body-threshold: ${ROCKETMQ_PRODUCER_MESSAGE_BODY_THRESHOLD:4096}
# 是否自动创建 Topic
auto-create-topic: ${ROCKETMQ_PRODUCER_AUTO_CREATE_TOPIC:true}
# 批量发送消息的最大数量
batch-size: ${ROCKETMQ_PRODUCER_BATCH_SIZE:100}
# 是否启用消息发送日志
enable-log: ${ROCKETMQ_PRODUCER_ENABLE_LOG:true}
# 消费者配置
consumer:
# 消费线程池最小线程数
consume-thread-min: 20
# 消费线程池最大线程数
consume-thread-max: 64
# 消息拉取批次大小
pull-batch-size: 32参考: ruoyi-admin/src/main/resources/application-dev.yml:164-200
2. 自动配置类
项目封装了RocketMQ自动配置类:
package plus.ruoyi.common.rocketmq.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import plus.ruoyi.common.rocketmq.util.RMDiagnosticUtil;
import plus.ruoyi.common.rocketmq.util.RMSendUtil;
import plus.ruoyi.common.rocketmq.util.RMTopicUtil;
/**
* RocketMQ 自动配置类
* <p>
* 集成 RocketMQ 消息队列框架,提供以下功能:
* <ul>
* <li>自动配置生产者和消费者</li>
* <li>提供 RocketMQTemplate 工具类</li>
* <li>支持注解式消费者 @RocketMQMessageListener</li>
* <li>自动初始化静态工具类(RMSendUtil、RMTopicUtil、RMDiagnosticUtil)</li>
* </ul>
* <p>
* 配置启用条件:需要在配置文件中设置 rocketmq.enabled=true
*
* @author 路北
* @date 2025-11-02
*/
@Slf4j
@AutoConfiguration
@ConditionalOnProperty(prefix = "rocketmq", name = "enabled", havingValue = "true")
@EnableConfigurationProperties(RocketMQProperties.class)
public class RocketMQAutoConfiguration {
private final RocketMQProperties properties;
/**
* 构造函数:打印 RocketMQ 配置信息
*/
public RocketMQAutoConfiguration(RocketMQProperties properties) {
this.properties = properties;
log.info("========================================");
log.info("🚀 RocketMQ 模块开始初始化");
log.info(" - NameServer: {}", properties.getNameServer());
log.info(" - 集群名称: {}", properties.getClusterName());
log.info(" - Broker地址: {}", properties.getBrokerAddr());
log.info(" - 生产者组: {}", properties.getProducer().getGroup());
log.info(" - 发送超时: {}ms", properties.getProducer().getSendMsgTimeout());
log.info(" - 自动创建Topic: {}", properties.getProducer().getAutoCreateTopic());
log.info(" - 消费线程: {}-{}",
properties.getConsumer().getConsumeThreadMin(),
properties.getConsumer().getConsumeThreadMax());
log.info("========================================");
}
/**
* 初始化 RMSendUtil 工具类
*/
@Bean
public Object rmSendUtilInitializer(RocketMQTemplate rocketMQTemplate) {
RMSendUtil.init(rocketMQTemplate, properties);
return new Object();
}
/**
* 初始化 RMTopicUtil 工具类
*/
@Bean
public Object rmTopicUtilInitializer() {
RMTopicUtil.init(properties);
return new Object();
}
/**
* 初始化 RMDiagnosticUtil 工具类
*/
@Bean
public Object rmDiagnosticUtilInitializer() {
RMDiagnosticUtil.init(properties);
return new Object();
}
/**
* RocketMQ 启动完成日志 + 自动诊断
*/
@Bean
public Object rocketMQStartupLogger() {
log.info("========================================");
log.info("✅ RocketMQ 客户端启动完成!");
log.info("========================================");
// 自动诊断 Broker 连接状态
try {
RMDiagnosticUtil.diagnose();
} catch (Exception e) {
log.warn("⚠️ Broker 状态检查失败: {}", e.getMessage());
}
return new Object();
}
}参考: ruoyi-common/ruoyi-common-rocketmq/src/main/java/plus/ruoyi/common/rocketmq/config/RocketMQAutoConfiguration.java:1-112
生产者
1. RMSendUtil工具类
项目封装了RMSendUtil工具类简化消息发送:
package plus.ruoyi.common.rocketmq.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import plus.ruoyi.common.rocketmq.enums.DelayLevel;
/**
* RocketMQ 消息生产者工具类
* <p>
* 提供简化的消息发送 API,封装 RocketMQTemplate 的常用操作
* </p>
*
* @author 路北
* @date 2025-11-03
*/
@Slf4j
public class RMSendUtil {
private static RocketMQTemplate rocketMQTemplate;
private static RocketMQProperties properties;
/**
* 初始化工具类
*/
public static void init(RocketMQTemplate template, RocketMQProperties props) {
rocketMQTemplate = template;
properties = props;
log.debug("RMSendUtil 初始化完成");
}
// ==================== 同步发送 ====================
/**
* 同步发送消息
* <p>最简单的发送方式,阻塞等待响应,保证可靠性</p>
*
* @param topic Topic 名称
* @param message 消息对象
* @return 发送结果
*/
public static SendResult send(String topic, Object message) {
return send(topic, message, properties.getProducer().getSendMsgTimeout());
}
/**
* 同步发送消息(自定义超时时间)
*
* @param topic Topic 名称
* @param message 消息对象
* @param timeout 超时时间(毫秒)
* @return 发送结果
*/
public static SendResult send(String topic, Object message, int timeout) {
checkTemplateAvailable();
// 自动创建 Topic(如果启用)
if (properties.getProducer().getAutoCreateTopic()) {
autoCreateTopic(topic);
}
return doSyncSend(() -> rocketMQTemplate.syncSend(topic, message, timeout),
topic, "同步发送");
}
// ==================== 异步发送 ====================
/**
* 异步发送消息(完整回调)
* <p>不阻塞主线程,通过回调通知发送结果</p>
*
* @param topic Topic 名称
* @param message 消息对象
* @param callback 回调接口
*/
public static void sendAsync(String topic, Object message, SendCallback callback) {
sendAsync(topic, message, callback, properties.getProducer().getSendMsgTimeout());
}
/**
* 异步发送消息(简化回调,仅处理成功)
*
* @param topic Topic 名称
* @param message 消息对象
* @param successCallback 成功回调
*/
public static void sendAsync(String topic, Object message,
Consumer<SendResult> successCallback) {
sendAsync(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
successCallback.accept(sendResult);
}
@Override
public void onException(Throwable e) {
log.error("❌ 异步消息发送失败: topic={}", topic, e);
}
});
}
// ==================== 单向发送 ====================
/**
* 单向发送消息(不等待响应)
* <p>性能最高但不保证可靠性,适合日志、监控等不重要消息</p>
*
* @param topic Topic 名称
* @param message 消息对象
*/
public static void sendOneWay(String topic, Object message) {
checkTemplateAvailable();
if (properties.getProducer().getAutoCreateTopic()) {
autoCreateTopic(topic);
}
if (properties.getProducer().getEnableLog()) {
log.info("📤 单向发送消息: topic={}, message={}", topic, message);
}
rocketMQTemplate.sendOneWay(topic, message);
}
// ==================== 延迟消息 ====================
/**
* 发送延迟消息
* <p>RocketMQ 支持 18 个固定延迟级别</p>
*
* @param topic Topic 名称
* @param message 消息对象
* @param delayLevel 延迟级别枚举
* @return 发送结果
*/
public static SendResult sendDelay(String topic, Object message, DelayLevel delayLevel) {
return sendDelay(topic, message, delayLevel.getLevel());
}
// ==================== 带标签消息 ====================
/**
* 发送带标签的消息
* <p>消费者可以通过标签过滤消息</p>
*
* @param topic Topic 名称
* @param tag 消息标签
* @param message 消息对象
* @return 发送结果
*/
public static SendResult sendWithTag(String topic, String tag, Object message) {
return sendWithTag(topic, tag, message,
properties.getProducer().getSendMsgTimeout());
}
// ==================== 批量发送 ====================
/**
* 批量发送消息
*
* @param topic Topic 名称
* @param messages 消息集合
*/
public static void sendBatch(String topic, Collection<?> messages) {
checkTemplateAvailable();
if (messages == null || messages.isEmpty()) {
log.warn("⚠️ 批量发送消息为空,跳过发送");
return;
}
if (properties.getProducer().getAutoCreateTopic()) {
autoCreateTopic(topic);
}
if (properties.getProducer().getEnableLog()) {
log.info("📤 批量发送消息: topic={}, count={}", topic, messages.size());
}
// 分批发送
int batchSize = properties.getProducer().getBatchSize();
int count = 0;
for (Object message : messages) {
rocketMQTemplate.sendOneWay(topic, message);
count++;
if (count % batchSize == 0 && properties.getProducer().getEnableLog()) {
log.info(" 已发送 {} / {} 条消息", count, messages.size());
}
}
if (properties.getProducer().getEnableLog()) {
log.info("✅ 批量发送完成: 共发送 {} 条消息", messages.size());
}
}
// ==================== 事务消息 ====================
/**
* 发送事务消息
* <p>事务消息需要配合 @RocketMQTransactionListener 使用</p>
*
* @param topic Topic 名称
* @param message 消息对象
*/
public static void sendTransaction(String topic, Object message) {
String transactionId = String.valueOf(System.currentTimeMillis());
sendTransaction(topic, message, transactionId, null);
}
// ==================== 私有辅助方法 ====================
private static void checkTemplateAvailable() {
if (rocketMQTemplate == null) {
throw new IllegalStateException(
"RocketMQTemplate 未初始化,请检查:\n" +
"1. rocketmq.enabled 是否设置为 true\n" +
"2. rocketmq-spring-boot-starter 依赖是否已引入\n" +
"3. Spring 容器是否已启动"
);
}
}
private static void autoCreateTopic(String topic) {
try {
String topicName = topic.contains(":") ? topic.split(":")[0] : topic;
RMTopicUtil.createTopic(topicName);
} catch (Exception e) {
log.warn("⚠️ 自动创建Topic失败: {}, 错误: {}", topic, e.getMessage());
}
}
}参考: ruoyi-common/ruoyi-common-rocketmq/src/main/java/plus/ruoyi/common/rocketmq/util/RMSendUtil.java:1-544
2. 同步发送示例
/**
* 同步发送消息
* 适用于重要消息,需要确认发送结果
*/
public class OrderService {
/**
* 创建订单后发送消息
*/
public void createOrder(Order order) {
// 1. 保存订单
orderMapper.insert(order);
// 2. 同步发送消息
SendResult result = RMSendUtil.send("order-topic", order);
// 3. 检查发送结果
if (result.getSendStatus() == SendStatus.SEND_OK) {
log.info("订单消息发送成功: msgId={}", result.getMsgId());
} else {
log.error("订单消息发送失败");
// 处理发送失败情况
}
}
}3. 异步发送示例
/**
* 异步发送消息
* 适用于不需要立即确认结果的场景
*/
public class NotificationService {
/**
* 发送通知消息
*/
public void sendNotification(Notification notification) {
// 异步发送,不阻塞主流程
RMSendUtil.sendAsync("notification-topic", notification, result -> {
// 成功回调
log.info("通知消息发送成功: msgId={}", result.getMsgId());
});
// 立即返回,不等待发送结果
}
}4. 延迟消息示例
/**
* 延迟消息示例
* 用于订单超时取消等场景
*/
public class OrderTimeoutService {
/**
* 发送订单超时检查消息
* 30分钟后检查订单是否支付
*/
public void scheduleOrderTimeout(Long orderId) {
OrderTimeoutMessage message = new OrderTimeoutMessage();
message.setOrderId(orderId);
message.setCreateTime(System.currentTimeMillis());
// 发送延迟消息: 30分钟后消费
SendResult result = RMSendUtil.sendDelay(
"order-timeout-topic",
message,
DelayLevel.THIRTY_MINUTES // 延迟级别
);
log.info("订单超时检查消息已发送: orderId={}, msgId={}",
orderId, result.getMsgId());
}
}5. 带标签消息示例
/**
* 带标签消息
* 消费者可以根据标签过滤消息
*/
public class UserService {
/**
* 发送用户注册消息
*/
public void registerUser(User user) {
// 根据用户类型设置标签
String tag = user.getUserType(); // VIP, NORMAL等
// 发送带标签的消息
RMSendUtil.sendWithTag("user-register-topic", tag, user);
}
}消费者
1. @RocketMQMessageListener注解
使用@RocketMQMessageListener注解创建消费者:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 订单消息消费者
*/
@Component
@RocketMQMessageListener(
topic = "order-topic", // Topic名称
consumerGroup = "order-consumer", // 消费者组
selectorExpression = "*" // Tag过滤表达式, *表示接收所有
)
public class OrderMessageListener implements RocketMQListener<Order> {
/**
* 消费消息
*
* @param order 订单对象
*/
@Override
public void onMessage(Order order) {
try {
log.info("收到订单消息: orderId={}", order.getOrderId());
// 处理业务逻辑
// 1. 更新库存
// 2. 发送通知
// 3. 记录日志
processOrder(order);
log.info("订单消息处理成功: orderId={}", order.getOrderId());
} catch (Exception e) {
log.error("订单消息处理失败: orderId={}", order.getOrderId(), e);
// 抛出异常会触发重试
throw e;
}
}
private void processOrder(Order order) {
// 业务处理逻辑
}
}2. Tag过滤消费
/**
* VIP用户注册消息消费者
* 只消费tag=VIP的消息
*/
@Component
@RocketMQMessageListener(
topic = "user-register-topic",
consumerGroup = "vip-user-consumer",
selectorExpression = "VIP" // 只消费VIP标签的消息
)
public class VipUserListener implements RocketMQListener<User> {
@Override
public void onMessage(User user) {
log.info("VIP用户注册: userId={}", user.getUserId());
// VIP用户专属处理逻辑
}
}
/**
* 普通用户注册消息消费者
* 只消费tag=NORMAL的消息
*/
@Component
@RocketMQMessageListener(
topic = "user-register-topic",
consumerGroup = "normal-user-consumer",
selectorExpression = "NORMAL" // 只消费NORMAL标签的消息
)
public class NormalUserListener implements RocketMQListener<User> {
@Override
public void onMessage(User user) {
log.info("普通用户注册: userId={}", user.getUserId());
// 普通用户处理逻辑
}
}3. 并发消费
/**
* 并发消费示例
*/
@Component
@RocketMQMessageListener(
topic = "log-topic",
consumerGroup = "log-consumer",
consumeMode = ConsumeMode.CONCURRENTLY, // 并发消费模式
consumeThreadMax = 64 // 最大消费线程数
)
public class LogMessageListener implements RocketMQListener<String> {
@Override
public void onMessage(String logMessage) {
// 多线程并发消费,提升吞吐量
processLog(logMessage);
}
}4. 顺序消费
/**
* 顺序消费示例
* 保证同一订单的消息按顺序消费
*/
@Component
@RocketMQMessageListener(
topic = "order-status-topic",
consumerGroup = "order-status-consumer",
consumeMode = ConsumeMode.ORDERLY // 顺序消费模式
)
public class OrderStatusListener implements RocketMQListener<OrderStatus> {
@Override
public void onMessage(OrderStatus status) {
// 同一订单的状态变更消息会按顺序消费
// 创建 -> 支付 -> 发货 -> 完成
updateOrderStatus(status);
}
}消息队列最佳实践
1. 消息幂等性
/**
* 保证消息幂等性
* 防止重复消费
*/
@Component
@RocketMQMessageListener(
topic = "payment-topic",
consumerGroup = "payment-consumer"
)
public class PaymentListener implements RocketMQListener<PaymentMessage> {
@Override
public void onMessage(PaymentMessage message) {
String messageId = message.getMessageId();
// 1. 检查消息是否已经处理过
if (RedisUtils.hasKey("payment:processed:" + messageId)) {
log.warn("消息已处理,跳过: messageId={}", messageId);
return;
}
try {
// 2. 处理支付逻辑
processPayment(message);
// 3. 标记消息已处理(设置过期时间24小时)
RedisUtils.setCacheObject(
"payment:processed:" + messageId,
true,
Duration.ofHours(24)
);
} catch (Exception e) {
log.error("支付处理失败: messageId={}", messageId, e);
throw e;
}
}
}2. 消息重试策略
/**
* 消息重试策略
*/
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer",
maxReconsumeTimes = 3 // 最大重试次数
)
public class OrderListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
try {
// 处理订单
processOrder(order);
} catch (BusinessException e) {
// 业务异常,不重试,记录日志
log.error("业务处理失败: orderId={}, reason={}",
order.getOrderId(), e.getMessage());
// 不抛出异常,消息不会重试
} catch (Exception e) {
// 系统异常,重试
log.error("系统异常: orderId={}", order.getOrderId(), e);
// 抛出异常,触发重试
throw e;
}
}
}3. 消息积压处理
/**
* 动态调整消费线程数处理积压
*/
@Component
@RocketMQMessageListener(
topic = "high-traffic-topic",
consumerGroup = "high-traffic-consumer",
consumeThreadMin = 20, // 最小线程数
consumeThreadMax = 64 // 最大线程数
)
public class HighTrafficListener implements RocketMQListener<Message> {
@Override
public void onMessage(Message message) {
// 快速处理,避免积压
processMessage(message);
}
}任务调度
架构设计
项目集成了SnailJob分布式任务调度框架,支持定时任务的统一管理和执行。
┌─────────────────────────────────────────────────────────────┐
│ SnailJob 任务调度架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ SnailJob 调度中心(Server) │ │
│ │ ┌────────────────────────────────────────────────┐ │ │
│ │ │ 任务管理 │ 调度管理 │ 日志收集 │ 监控告警 │ │ │
│ │ └────────────────────────────────────────────────┘ │ │
│ └──────────────────┬───────────────────────────────────┘ │
│ │ │
│ ┌──────────┼──────────┐ │
│ │ │ │ │
│ ┌────▼────┐┌────▼────┐┌───▼─────┐ │
│ │ Client1 ││ Client2 ││ Client3 │ │
│ │ (节点1) ││ (节点2) ││ (节点3) │ │
│ └─────────┘└─────────┘└─────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ 任务执行流程 │ │
│ │ 1. Server: 触发任务调度 │ │
│ │ 2. Client: 接收任务,执行@Scheduled方法 │ │
│ │ 3. Client: 上报执行结果和日志 │ │
│ │ 4. Server: 记录执行日志,监控任务状态 │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘配置方式
1. SnailJob配置
在application.yml中配置SnailJob:
################## 定时任务配置 ##################
--- # snail-job 配置
snail-job:
# 是否启用定时任务
enabled: ${SNAIL_JOB_ENABLED:false}
# 需要在 SnailJob 后台组管理创建对应名称的组
group: ${app.id}
# SnailJob 接入验证令牌 详见 script/sql/ry_job.sql `sj_group_config`表
token: ${SNAIL_JOB_TOKEN:SJ_cKqBTPzCsWA3VyuCfFoccmuIEGXjr5KT}
server:
# 调度中心地址
host: ${SNAIL_JOB_HOST:127.0.0.1}
# 调度中心端口
port: ${SNAIL_JOB_PORT:17888}
# 命名空间UUID 详见 script/sql/ry_job.sql `sj_namespace`表`unique_id`字段
namespace: ${spring.profiles.active}
# 随主应用端口漂移
port: 2${server.port}
# 客户端ip指定
host: ${SNAIL_JOB_CLIENT_HOST:}
# RPC类型: netty, grpc
rpc-type: grpc参考: ruoyi-admin/src/main/resources/application-dev.yml:142-162
2. 自动配置类
package plus.ruoyi.common.job.config;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.aizuda.snailjob.client.common.appender.SnailLogbackAppender;
import com.aizuda.snailjob.client.common.event.SnailClientStartingEvent;
import com.aizuda.snailjob.client.starter.EnableSnailJob;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* SnailJob 定时任务自动配置类
* <p>
* 集成 SnailJob 分布式任务调度框架,提供以下功能:
* <ul>
* <li>启用 Spring 定时任务支持</li>
* <li>启用 SnailJob 客户端</li>
* <li>配置日志收集器,用于任务执行日志的统一管理</li>
* </ul>
* <p>
* 配置启用条件:需要在配置文件中设置 snail-job.enabled=true
*
* @author opensnail
* @date 2024-05-17
*/
@AutoConfiguration
@ConditionalOnProperty(prefix = "snail-job", name = "enabled", havingValue = "true")
@EnableScheduling // 启用Spring定时任务
@EnableSnailJob // 启用SnailJob客户端
public class JobAutoConfiguration {
/**
* SnailJob 客户端启动事件监听器
* <p>
* 在 SnailJob 客户端启动时自动配置日志收集器,
* 将应用的日志输出发送到 SnailJob 服务端进行统一管理和查看
*
* @param event SnailJob 客户端启动事件
*/
@EventListener(SnailClientStartingEvent.class)
public void onStarting(SnailClientStartingEvent event) {
// 获取 Logback 日志上下文
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
// 创建 SnailJob 专用的日志追加器
SnailLogbackAppender<ILoggingEvent> ca = new SnailLogbackAppender<>();
ca.setName("snail_log_appender");
ca.start();
// 将日志追加器添加到根日志记录器,实现日志统一收集
Logger rootLogger = lc.getLogger(Logger.ROOT_LOGGER_NAME);
rootLogger.addAppender(ca);
}
}参考: ruoyi-common/ruoyi-common-job/src/main/java/plus/ruoyi/common/job/config/JobAutoConfiguration.java:1-60
定时任务使用
1. 创建定时任务
使用Spring的@Scheduled注解创建定时任务:
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* 系统数据统计任务
*/
@Component
public class DataStatisticsJob {
/**
* 每小时统计一次用户数据
* Cron表达式: 0 0 * * * ? (每小时的0分0秒执行)
*/
@Scheduled(cron = "0 0 * * * ?")
public void statisticsUserData() {
log.info("开始统计用户数据...");
try {
// 1. 统计活跃用户数
// 2. 统计新增用户数
// 3. 统计用户留存率
doStatistics();
log.info("用户数据统计完成");
} catch (Exception e) {
log.error("用户数据统计失败", e);
}
}
/**
* 每天凌晨2点清理过期数据
* Cron表达式: 0 0 2 * * ? (每天凌晨2点执行)
*/
@Scheduled(cron = "0 0 2 * * ?")
public void cleanExpiredData() {
log.info("开始清理过期数据...");
try {
// 1. 清理过期日志
// 2. 清理过期文件
// 3. 清理过期会话
cleanData();
log.info("过期数据清理完成");
} catch (Exception e) {
log.error("过期数据清理失败", e);
}
}
/**
* 每5分钟检查一次订单超时
* fixedDelay: 上次执行完成后5分钟再执行
*/
@Scheduled(fixedDelay = 300000) // 5分钟 = 300000毫秒
public void checkOrderTimeout() {
log.info("开始检查订单超时...");
try {
// 1. 查询未支付订单
// 2. 检查是否超时
// 3. 取消超时订单
checkTimeout();
log.info("订单超时检查完成");
} catch (Exception e) {
log.error("订单超时检查失败", e);
}
}
}2. Cron表达式说明
Cron表达式格式: 秒 分 时 日 月 周 [年]
常用示例:
- 每天凌晨2点: 0 0 2 * * ?
- 每小时整点: 0 0 * * * ?
- 每30分钟: 0 */30 * * * ?
- 每天中午12点: 0 0 12 * * ?
- 每周一上午9点: 0 0 9 ? * MON
- 每月1号凌晨1点: 0 0 1 1 * ?
- 每季度第一天: 0 0 0 1 1,4,7,10 ?3. SnailJob后台管理
在SnailJob后台可以:
- 任务管理: 启动/停止/暂停任务
- 执行记录: 查看任务执行历史
- 日志查看: 实时查看任务执行日志
- 告警配置: 配置任务失败告警
- 手动触发: 手动执行定时任务
分布式任务调度特性
1. 任务分片
/**
* 任务分片示例
* 将大任务拆分到多个节点并行执行
*/
@Component
public class DataProcessJob {
@Scheduled(cron = "0 0 1 * * ?")
public void processData() {
// SnailJob会自动分配分片参数
// 例如: 总分片数=10, 当前分片=0
int shardingTotal = getShardingTotal();
int shardingIndex = getShardingIndex();
log.info("开始处理数据: 分片={}/{}", shardingIndex, shardingTotal);
// 根据分片参数处理数据
// 例如: 节点1处理userId % 10 = 0的数据
// 节点2处理userId % 10 = 1的数据
List<User> users = userMapper.selectBySharding(shardingIndex, shardingTotal);
for (User user : users) {
processUser(user);
}
log.info("数据处理完成: 分片={}/{}", shardingIndex, shardingTotal);
}
}2. 任务故障转移
/**
* 任务故障转移
* 某个节点宕机后,任务会自动转移到其他节点执行
*/
@Component
public class ImportantJob {
@Scheduled(cron = "0 */10 * * * ?")
public void execute() {
log.info("节点 {} 开始执行任务", getNodeId());
try {
// 执行重要任务
executeImportantTask();
} catch (Exception e) {
log.error("任务执行失败", e);
// 如果节点宕机,SnailJob会将任务转移到其他节点
}
}
}3. 任务依赖
/**
* 任务依赖关系
* 在SnailJob后台配置任务依赖
*/
@Component
public class DependentJobs {
/**
* 任务A: 数据同步
*/
@Scheduled(cron = "0 0 1 * * ?")
public void syncData() {
log.info("开始同步数据...");
// 同步数据
}
/**
* 任务B: 数据统计
* 依赖任务A完成后执行
*/
@Scheduled(cron = "0 0 2 * * ?")
public void statisticsData() {
log.info("开始统计数据...");
// 统计数据
}
/**
* 任务C: 生成报表
* 依赖任务B完成后执行
*/
@Scheduled(cron = "0 0 3 * * ?")
public void generateReport() {
log.info("开始生成报表...");
// 生成报表
}
}分布式事务
本地事务
1. @Transactional注解
import org.springframework.transaction.annotation.Transactional;
/**
* 本地事务示例
*/
@Service
public class OrderService {
/**
* 创建订单
* 使用Spring事务管理
*/
@Transactional(rollbackFor = Exception.class)
public void createOrder(Order order) {
// 1. 保存订单
orderMapper.insert(order);
// 2. 扣减库存
inventoryMapper.deduct(order.getProductId(), order.getQuantity());
// 3. 增加积分
pointsMapper.add(order.getUserId(), order.getAmount());
// 如果任何步骤失败,所有操作都会回滚
}
}2. 事务传播行为
/**
* 事务传播行为示例
*/
@Service
public class UserService {
/**
* REQUIRED: 加入当前事务,如果没有则新建
*/
@Transactional(propagation = Propagation.REQUIRED)
public void method1() {
// 加入外层事务
}
/**
* REQUIRES_NEW: 总是新建事务
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void method2() {
// 独立事务,不受外层事务影响
}
/**
* NESTED: 嵌套事务
*/
@Transactional(propagation = Propagation.NESTED)
public void method3() {
// 嵌套事务,可以独立回滚
}
}跨数据源事务
1. @DSTransactional注解
import com.baomidou.dynamic.datasource.annotation.DSTransactional;
/**
* 跨数据源事务示例
* 支持多数据源之间的事务
*/
@Service
public class DataSyncService {
/**
* 同步数据到多个数据源
*/
@DSTransactional // 跨数据源事务
public void syncData(Data data) {
// 1. 保存到主库
saveTurbo(data);
// 2. 同步到其他数据源
syncToOracle(data);
syncToPostgres(data);
// 如果任何步骤失败,所有数据源的操作都会回滚
}
@DS("master")
private void saveToMaster(Data data) {
dataMapper.insert(data);
}
@DS("oracle")
private void syncToOracle(Data data) {
oracleMapper.insert(data);
}
@DS("postgres")
private void syncToPostgres(Data data) {
postgresMapper.insert(data);
}
}参考: ruoyi-modules/ruoyi-generator/src/main/java/plus/ruoyi/generator/service/GenTableServiceImpl.java:289-323
最终一致性
对于跨系统的分布式事务,推荐使用消息队列实现最终一致性:
/**
* 基于消息队列的最终一致性
*/
@Service
public class OrderService {
/**
* 创建订单
*/
@Transactional(rollbackFor = Exception.class)
public void createOrder(Order order) {
// 1. 本地事务: 保存订单
orderMapper.insert(order);
// 2. 发送消息(本地事务提交后)
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronization() {
@Override
public void afterCommit() {
// 事务提交后发送消息
RMSendUtil.send("order-created-topic", order);
}
}
);
}
}
/**
* 库存服务消费消息
*/
@Component
@RocketMQMessageListener(topic = "order-created-topic",
consumerGroup = "inventory-consumer")
public class InventoryListener implements RocketMQListener<Order> {
@Override
@Transactional(rollbackFor = Exception.class)
public void onMessage(Order order) {
// 3. 本地事务: 扣减库存
inventoryMapper.deduct(order.getProductId(), order.getQuantity());
// 如果扣减失败,消息会重试,保证最终一致性
}
}服务容错
限流
1. 基于Redis的限流
/**
* 基于Redis的限流
* 使用令牌桶算法
*/
public class RateLimitService {
/**
* 限流: 每秒最多10个请求
*/
public boolean tryAcquire(String key) {
// 令牌桶算法
// rate: 每秒生成的令牌数
// rateInterval: 时间间隔(秒)
long permits = RedisUtils.rateLimiter(
"rate:limit:" + key,
RateType.OVERALL, // 全局限流
10, // 每秒10个令牌
1 // 1秒
);
return permits >= 0; // >= 0表示获取成功
}
/**
* API接口限流
*/
public void apiRequest(String apiKey) {
if (!tryAcquire(apiKey)) {
throw new ServiceException("请求过于频繁,请稍后再试");
}
// 执行业务逻辑
processApi(apiKey);
}
}参考: ruoyi-common/ruoyi-common-redis/src/main/java/plus/ruoyi/common/redis/utils/RedisUtils.java:587-602
2. @RateLimiter注解限流
import plus.ruoyi.common.ratelimiter.annotation.RateLimiter;
/**
* 注解式限流
*/
@RestController
@RequestMapping("/api")
public class ApiController {
/**
* 登录接口限流
* 每个IP每分钟最多5次请求
*/
@RateLimiter(key = "#request.remoteAddr",
time = 60,
count = 5,
limitType = LimitType.IP)
@PostMapping("/login")
public R<LoginVo> login(@RequestBody LoginBo loginBo, HttpServletRequest request) {
// 登录逻辑
return R.ok(loginVo);
}
/**
* 发送短信限流
* 每个用户每天最多10次
*/
@RateLimiter(key = "#phone",
time = 86400, // 24小时
count = 10,
limitType = LimitType.DEFAULT)
@PostMapping("/sms/send")
public R<Void> sendSms(@RequestParam String phone) {
// 发送短信
return R.ok();
}
}降级
/**
* 服务降级示例
*/
@Service
public class ProductService {
/**
* 查询商品详情
* 失败时返回降级数据
*/
public Product getProductDetail(Long productId) {
try {
// 尝试从数据库查询
return productMapper.selectById(productId);
} catch (Exception e) {
log.error("查询商品详情失败,返回降级数据: productId={}", productId, e);
// 返回降级数据(缓存或默认数据)
return getProductFromCache(productId);
}
}
/**
* 从缓存获取降级数据
*/
private Product getProductFromCache(Long productId) {
Product product = RedisUtils.getCacheObject("product:" + productId);
if (product == null) {
// 返回默认数据
product = new Product();
product.setProductId(productId);
product.setProductName("商品暂时无法查看");
}
return product;
}
}熔断
/**
* 熔断器示例
*/
@Service
public class PaymentService {
private AtomicInteger failureCount = new AtomicInteger(0);
private AtomicBoolean circuitOpen = new AtomicBoolean(false);
private long circuitOpenTime = 0;
private static final int FAILURE_THRESHOLD = 5; // 失败阈值
private static final long TIMEOUT = 60000; // 熔断器打开时间60秒
/**
* 调用第三方支付
*/
public PaymentResult callPayment(PaymentRequest request) {
// 1. 检查熔断器状态
if (circuitOpen.get()) {
// 熔断器打开,检查是否可以半开
if (System.currentTimeMillis() - circuitOpenTime > TIMEOUT) {
// 尝试半开状态
circuitOpen.set(false);
failureCount.set(0);
} else {
// 熔断器仍然打开,快速失败
throw new ServiceException("支付服务暂时不可用,请稍后再试");
}
}
try {
// 2. 调用第三方支付
PaymentResult result = thirdPartyPayment(request);
// 3. 成功,重置计数器
failureCount.set(0);
return result;
} catch (Exception e) {
// 4. 失败,增加计数
int failures = failureCount.incrementAndGet();
if (failures >= FAILURE_THRESHOLD) {
// 5. 达到阈值,打开熔断器
circuitOpen.set(true);
circuitOpenTime = System.currentTimeMillis();
log.error("支付服务熔断器打开: 连续失败{}次", failures);
}
throw new ServiceException("支付失败", e);
}
}
}最佳实践
1. 数据源使用原则
// ✅ 推荐: 明确指定数据源
@DS("slave")
public List<User> queryUsers() {
return userMapper.selectList(null);
}
// ❌ 不推荐: 依赖默认数据源
public List<User> queryUsers() {
// 不明确,容易出错
return userMapper.selectList(null);
}2. 分布式锁使用原则
// ✅ 推荐: 锁粒度细,超时时间合理
@Lock4j(keys = "#userId", expire = 30000, acquireTimeout = 10000)
public void updateUser(Long userId) {
// 只锁定当前用户
}
// ❌ 不推荐: 锁粒度粗,没有超时
@Lock4j
public void updateUser(Long userId) {
// 锁定整个方法,粒度太粗
}3. 消息队列使用原则
// ✅ 推荐: 保证幂等性
@Override
public void onMessage(Order order) {
String messageId = order.getMessageId();
if (RedisUtils.hasKey("processed:" + messageId)) {
return; // 已处理,跳过
}
processOrder(order);
RedisUtils.setCacheObject("processed:" + messageId, true, Duration.ofHours(24));
}
// ❌ 不推荐: 没有幂等性保护
@Override
public void onMessage(Order order) {
processOrder(order); // 可能重复消费
}4. 任务调度使用原则
// ✅ 推荐: 任务执行时间合理,有异常处理
@Scheduled(cron = "0 0 2 * * ?")
public void cleanData() {
try {
doClean();
} catch (Exception e) {
log.error("清理数据失败", e);
// 发送告警
}
}
// ❌ 不推荐: 任务执行时间长,没有异常处理
@Scheduled(fixedDelay = 1000) // 每秒执行
public void heavyTask() {
// 执行耗时任务,可能导致任务堆积
}常见问题
1. 读写分离主从延迟问题
问题: 主库写入后立即从从库读取,读不到最新数据
原因: 主从数据库存在同步延迟
解决方案:
/**
* 方案1: 强制读主库
*/
@DS("master")
public User getUserAfterSave(Long userId) {
// 强制从主库读取
return userMapper.selectById(userId);
}
/**
* 方案2: 延迟读取
*/
public User getUserWithDelay(Long userId) {
// 等待主从同步
Thread.sleep(100);
return userMapper.selectById(userId);
}
/**
* 方案3: 使用缓存
*/
public void saveUser(User user) {
// 1. 保存到主库
userMapper.insert(user);
// 2. 同步写入缓存
RedisUtils.setCacheObject("user:" + user.getUserId(), user);
}2. 分布式锁死锁问题
问题: 多个线程相互等待锁,导致死锁
原因: 加锁顺序不一致
解决方案:
/**
* 解决方案: 按固定顺序加锁
*/
public void transfer(Long fromId, Long toId, BigDecimal amount) {
// 按ID大小排序,保证加锁顺序一致
Long first = fromId < toId ? fromId : toId;
Long second = fromId < toId ? toId : fromId;
RLock lock1 = RedisUtils.getLock("account:" + first);
RLock lock2 = RedisUtils.getLock("account:" + second);
try {
lock1.lock(30, TimeUnit.SECONDS);
lock2.lock(30, TimeUnit.SECONDS);
// 转账逻辑
} finally {
lock2.unlock();
lock1.unlock();
}
}3. 消息重复消费问题
问题: 消息被重复消费,导致数据重复
原因: 网络抖动、消费失败重试等
解决方案:
/**
* 解决方案: 幂等性处理
*/
@Override
public void onMessage(PaymentMessage message) {
String messageId = message.getMessageId();
// 1. 使用分布式锁+缓存保证幂等
String lockKey = "payment:lock:" + messageId;
RLock lock = RedisUtils.getLock(lockKey);
try {
if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
// 2. 检查是否已处理
if (RedisUtils.hasKey("payment:processed:" + messageId)) {
return;
}
// 3. 处理支付
processPayment(message);
// 4. 标记已处理
RedisUtils.setCacheObject(
"payment:processed:" + messageId,
true,
Duration.ofDays(1)
);
}
} finally {
lock.unlock();
}
}4. 任务重复执行问题
问题: 多个节点同时执行同一个定时任务
原因: 每个节点都会执行@Scheduled任务
解决方案:
/**
* 解决方案: 使用分布式锁
*/
@Component
public class ScheduledTask {
@Scheduled(cron = "0 0 1 * * ?")
public void execute() {
String lockKey = "scheduled:task:daily";
RLock lock = RedisUtils.getLock(lockKey);
try {
// 尝试获取锁,只有一个节点能获取成功
if (lock.tryLock(0, 3600, TimeUnit.SECONDS)) {
try {
// 执行任务
executeTask();
} finally {
lock.unlock();
}
} else {
log.info("任务正在其他节点执行,跳过");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}5. RocketMQ消息积压问题
问题: 消费速度慢,消息大量积压
原因: 消费线程数不足,业务处理慢
解决方案:
/**
* 解决方案1: 增加消费线程数
*/
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer",
consumeThreadMin = 20, // 增加最小线程数
consumeThreadMax = 64 // 增加最大线程数
)
public class OrderListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
// 快速处理
}
}
/**
* 解决方案2: 异步处理
*/
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer"
)
public class OrderListener implements RocketMQListener<Order> {
@Async // 异步处理
@Override
public void onMessage(Order order) {
// 耗时处理放到线程池
processOrder(order);
}
}文档版本: v1.0.0 最后更新: 2025-01-10 维护者: RuoYi-Plus-UniApp 开发团队
