背景
在系统中经常涉及到较大数据的写入问题 例如在系统启动的时候可能瞬间需要写入很多初始化数据到db 或者是在系统运行的高峰期出现大量并发写入耗尽连接池,但是直接采用Kafka等消息队列来解耦写入操作又有点小题大做,因此可以封装一个简易的批处理逻辑 来定时入库。
采用阻塞队列 + 定时读取
用Spring的定时任务循环检查队列中是否有数据,如果达到一定量 就通过DrainTo方法获取 写入到db,这里有几个注意事项
优雅停机问题,在停机的时候需要先保存数据
批量写入的间隔问题,需要根据产生数据的速度进行评估,如果消费过慢 会导致内存压力过大
数据库并发写入的连接池参数配置问题 和第二点有点类似 也需要评估写入一批数据的数量
JPA需要开启批处理
jpa:
properties:
hibernate:
generate_statistics: false
jdbc:
batch_size: 40
package com.rms.service;
import com.rms.domain.entity.DeviceLog;
import com.rms.domain.entity.Prop;
import com.rms.domain.entity.Unit;
import com.rms.repo.LogRepo;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@Service
@Slf4j
public class LogService {
private static final int BATCH_SIZE = 40; // 每批次处理的大小
private final LogRepo logRepo;
private final BlockingQueue<DeviceLog> queue = new LinkedBlockingQueue<>();
@Autowired
public LogService(LogRepo logRepo) {
this.logRepo = logRepo;
}
private Boolean saveBatch(){
List<DeviceLog> logs = new ArrayList<>();
queue.drainTo(logs,BATCH_SIZE);
if(!CollectionUtils.isEmpty(logs)){
logRepo.saveAll(logs);
return true;
}
return false;
}
@PostConstruct
public void schedule(){
Thread.ofVirtual().start(()->{
while (true){
saveBatch();
try {
Thread.sleep(50);
} catch (InterruptedException e) {
log.error("log service error",e);
}
}
});
}
public void saveLog(Prop<?> prop, Unit unit) {
DeviceLog logEntity = new DeviceLog();
logEntity.setLogMsg(prop.getFieldName());
logEntity.setValue(prop.getValue().toString());
logEntity.setUnit(unit);
logEntity.setCreateTime(new Date());
logEntity.setLastModifiedTime(new Date());
queue.add(logEntity);
}
@PreDestroy
public void destroy(){
saveBatch();
}
}