duckflew
duckflew
Published on 2024-08-06 / 24 Visits
0
0

阻塞队列批量写入数据库实现

背景

在系统中经常涉及到较大数据的写入问题 例如在系统启动的时候可能瞬间需要写入很多初始化数据到db 或者是在系统运行的高峰期出现大量并发写入耗尽连接池,但是直接采用Kafka等消息队列来解耦写入操作又有点小题大做,因此可以封装一个简易的批处理逻辑 来定时入库。

采用阻塞队列 + 定时读取

用Spring的定时任务循环检查队列中是否有数据,如果达到一定量 就通过DrainTo方法获取 写入到db,这里有几个注意事项

  • 优雅停机问题,在停机的时候需要先保存数据

  • 批量写入的间隔问题,需要根据产生数据的速度进行评估,如果消费过慢 会导致内存压力过大

  • 数据库并发写入的连接池参数配置问题 和第二点有点类似 也需要评估写入一批数据的数量

  • JPA需要开启批处理

  jpa:
    properties:
      hibernate:
        generate_statistics: false
        jdbc:
          batch_size: 40
package com.rms.service;

import com.rms.domain.entity.DeviceLog;
import com.rms.domain.entity.Prop;
import com.rms.domain.entity.Unit;
import com.rms.repo.LogRepo;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@Service
@Slf4j
public class LogService {
    private static final int BATCH_SIZE = 40; // 每批次处理的大小
    private final LogRepo logRepo;
    private final BlockingQueue<DeviceLog> queue = new LinkedBlockingQueue<>();
    @Autowired
    public LogService(LogRepo logRepo) {
        this.logRepo = logRepo;
    }

    private Boolean saveBatch(){
        List<DeviceLog> logs = new ArrayList<>();
        queue.drainTo(logs,BATCH_SIZE);
        if(!CollectionUtils.isEmpty(logs)){
            logRepo.saveAll(logs);
            return true;
        }
        return false;
    }
    @PostConstruct
    public void schedule(){
        Thread.ofVirtual().start(()->{
            while (true){
                saveBatch();
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    log.error("log service error",e);
                }
            }
        });
    }
    public  void saveLog(Prop<?> prop, Unit unit) {
        DeviceLog logEntity = new DeviceLog();
        logEntity.setLogMsg(prop.getFieldName());
        logEntity.setValue(prop.getValue().toString());
        logEntity.setUnit(unit);
        logEntity.setCreateTime(new Date());
        logEntity.setLastModifiedTime(new Date());
        queue.add(logEntity);
    }

    @PreDestroy
    public void destroy(){
        saveBatch();
    }
}


Comment