博主这里的大数据量、高并发业务处理优化基于博主线上项目实践以及全网资料整理而来,在这里分享给大家
线上业务后台项目有一个消息推送的功能,通过上传包含用户 id 的文件,给指定用户推送系统消息
通常情况下大部分用户都会使用 excel 文件,但是相比 excel 文件还有一种更加推荐的文件格式,那就是 csv 文件,相比 excel 文件它可以直接在记事本编辑,excel 也可以打开 cvs 文件,且占用内存更少(画重点),对于上传的 csv 文件过于庞大,也可以采用流式读取,读一部分写一部分
由于大批量数据插入是一个耗时操作(可能几秒也可能几分钟),所以需要保存批量插入是否成功的状态,在后台中可以显现出这条消息推送记录是成功还是失败,方便运营回溯消息推送状态
博主这里给出两种方案利弊:
综上:在大数据量下,我们要是追求极致性能可以不启用事务,具体选择也需各位结合自身业务情况
建议功能设计上,可以屏蔽对失败消息再进行操作,这样不需要再处理之前推送失败写入的脏数据,直接新增消息推送即可
rewriteBatchedStatements=true
在 jdbc 驱动上启动批量写入功能,如下spring.datasource.master.jdbc-url=jdbc:mysql://localhost:3306/test_db?allowMultiQueries=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&rewriteBatchedStatements=true
insert into table(id, name) values(1, 'tom'),(2, 'jack')
模式,建议一次写入个数不要太多,MySQL 对于 sql 长度是有限制的,对于这种字段少的表,一次写入 500 - 1000 问题不大,字段多了需要降低这个写入量insert into im_notice_app_ref(notice_id, app_id, create_time)
values
<foreach collection="list" separator="," item="item">
(#{item.noticeId}, #{item.appId}, #{item.createTime})
</foreach>
一般情况下大家都知道第二条优化,但是可能会忽略 jdbc 参数携带 rewriteBatchedStatements=true
,这个参数能在第二条的基础上启用批量执行 SQL ,进一步提升写入性能
@Transactional
大于 Spring
提供得事务注解,许多人都知道,但是在高并发下,不建议使用,推荐通过编程式事务来手动控制事务提交或者回滚,减少事务影响范围
如下是一段订单超时未支付回滚业务数据得代码,采用 @Transactional
事务注解
@Transactional(rollbackFor = Exception.class)
public void doUnPaidTask(Long orderId) {
// 1. 查询订单是否存在
Order order = orderService.getById(orderId);
if (order == null) {
throw new BusinessException(String.format("订单不存在,orderId:%s", orderId));
}
if (order.getOrderStatus() != OrderStatusEnum.ORDER_PRE_PAY.getOrderStatus()) {
throw new BusinessException(String.format("订单状态错误,order:%s", order));
}
// 2. 设置订单为已取消状态
order.setOrderStatus((byte) OrderStatusEnum.ORDER_CLOSED_BY_EXPIRED.getOrderStatus());
order.setUpdateTime(new Date());
if (!orderService.updateById(order)) {
throw new BusinessException("更新数据已失效");
}
// 3.商品货品数量增加
LambdaQueryWrapper<OrderItem> queryWrapper = Wrappers.lambdaQuery();
queryWrapper.eq(OrderItem::getOrderId, orderId);
List<OrderItem> orderItems = orderItemService.list(queryWrapper);
for (OrderItem orderItem : orderItems) {
if (orderItem.getSeckillId() != null) { // 秒杀单商品项处理
Long seckillId = orderItem.getSeckillId();
SeckillService seckillService = SpringContextUtil.getBean(SeckillService.class);
if (!seckillService.addStock(seckillId)) {
throw new BusinessException("秒杀商品货品库存增加失败");
}
} else { // 普通单商品项处理
Long goodsId = orderItem.getGoodsId();
Integer goodsCount = orderItem.getGoodsCount();
if (!goodsDao.addStock(goodsId, goodsCount)) {
throw new BusinessException("秒杀商品货品库存增加失败");
}
}
}
// 4. 返还优惠券
couponService.releaseCoupon(orderId);
log.info("---------------订单 orderId:{},未支付超时取消成功", orderId);
}
采用编程式事务对其优化,代码如下:
@Resource
private PlatformTransactionManager platformTransactionManager;
@Resource
private TransactionDefinition transactionDefinition;
public void doUnPaidTask(Long orderId) {
// 启用编程式事务
// 1. 在开启事务钱查询订单是否存在
Order order = orderService.getById(orderId);
if (order == null) {
throw new BusinessException(String.format("订单不存在,orderId:%s", orderId));
}
if (order.getOrderStatus() != OrderStatusEnum.ORDER_PRE_PAY.getOrderStatus()) {
throw new BusinessException(String.format("订单状态错误,order:%s", order));
}
// 2. 开启事务
TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);
try {
// 3. 设置订单为已取消状态
order.setOrderStatus((byte) OrderStatusEnum.ORDER_CLOSED_BY_EXPIRED.getOrderStatus());
order.setUpdateTime(new Date());
if (!orderService.updateById(order)) {
throw new BusinessException("更新数据已失效");
}
// 4. 商品货品数量增加
LambdaQueryWrapper<OrderItem> queryWrapper = Wrappers.lambdaQuery();
queryWrapper.eq(OrderItem::getOrderId, orderId);
List<OrderItem> orderItems = orderItemService.list(queryWrapper);
for (OrderItem orderItem : orderItems) {
if (orderItem.getSeckillId() != null) { // 秒杀单商品项处理
Long seckillId = orderItem.getSeckillId();
SeckillService seckillService = SpringContextUtil.getBean(SeckillService.class);
RedisCache redisCache = SpringContextUtil.getBean(RedisCache.class);
if (!seckillService.addStock(seckillId)) {
throw new BusinessException("秒杀商品货品库存增加失败");
}
redisCache.increment(Constants.SECKILL_GOODS_STOCK_KEY + seckillId);
redisCache.deleteCacheSet(Constants.SECKILL_SUCCESS_USER_ID + seckillId, order.getUserId());
} else { // 普通单商品项处理
Long goodsId = orderItem.getGoodsId();
Integer goodsCount = orderItem.getGoodsCount();
if (!goodsDao.addStock(goodsId, goodsCount)) {
throw new BusinessException("秒杀商品货品库存增加失败");
}
}
}
// 5. 返还优惠券
couponService.releaseCoupon(orderId);
// 6. 所有更新操作完成后,提交事务
platformTransactionManager.commit(transaction);
log.info("---------------订单 orderId:{},未支付超时取消成功", orderId);
} catch (Exception e) {
log.info("---------------订单 orderId:{},未支付超时取消失败", orderId, e);
// 7. 发生异常,回滚事务
platformTransactionManager.rollback(transaction);
}
}
可以看到采用编程式事务后,我们将查询逻辑排除在事务之外,减小了其影响范围,也就提升了性能,在高并发场景下,性能优先的场景,我们甚至可以考虑不适用事务
线上项目客户端,采用 tcp 协议与日志采集服务建立连接,上报日志数据。业务高峰期下,会有同时成千个客户端建立连接实时上报日志数据
如上场景,高峰期下,对日志采集服务会造成不小的压力,处理服务处理不当,会造成高峰期下,服务卡顿、CPU 占用过高、内存溢出等。
这里给出海量日志高并发下优化点:
ArrayBlockingQueue
得生产者消费者模式,对日志数据进行异步批量处理,在此场景下,通过生产者将数据缓存再内存中,然后再消费者中批量保存入库。Disruptor
队列,也是基于内存队列的生产者消费者模型,消费速度对比 ArrayBlockingQueue
有一个数量级得性能提升,附简介说明: https://www.jianshu.com/p/bad7b4b44e48kfaka
消息队列中间件,持久日志数据,慢慢消费。虽然引入第三方依赖会增加系统复杂度,但是相比 kfaka
在大数据场景下提供的优秀表现,这一点也是值得。如上三种方案:大家可以结合自身项目实际体量选择
对上报后的日志如果要再发送给其他服务,推荐是对其进行压缩处理,避免消耗过多网络带宽以及最终数据落库选型:
Java
里通常是指序列化方式,Jdk
自带得序列化方式对比 Protobuf 、fst 、Hession
等在序列化速度和大小的表现上都没有优势,甚至可以用垃圾形容,博主这里直接给出 Java
得几种序列化方式对比链接: https://segmentfault.com/a/1190000039934578 ,
建议对传输大小要求较高可以使用 Avro
序列化, 对综合要求较高可采用 Protobuf
Clickhouse
进行存储,相同数据量下对比 MySql
占用存储更少,查询性能更高最后,附博主 github
地址: https://github.com/wayn111
1
byte10 2022-12-08 09:33:11 +08:00
想的有点复杂了吧,一个带事务的 MQ 就可以解决你的问题了??
|
3
byte10 2022-12-08 09:52:23 +08:00
1 、上传的数据写入 MQ ,消息推送事务,回滚 mq 消息。2 、读取 mq 数据写入数据库,这个瓶颈一般在于数据库上,多个客户端同时消费。3 、日志那边没啥问题
|