前言:
当遇到大量数据导入时,为了提高处理的速度,可以选择使用多线程来批量处理这些处理。常见的场景有:
- 大文件导入数据库(这个文件不一定是标准的CSV可导入文件或者需要在内存中经过一定的处理)
- 数据同步(从第三方接口拉取数据处理后写入自己的数据库)
以上的场景有一个共性,这类数据导入的场景简单来说就是将数据从一个数据源移动到另外一个数据源,而其中必定可以分为两步
- 数据读取:从数据源读取数据到内存
- 数据写入:将内存中的数据写入到另外一个数据源,可能存在数据处理
而且根据读取的速度一般会比数据写入的速度快很多,即读取快,写入慢。
设计思路
由于场景的特点是读取快,写入慢,如果是使用多线程处理,建议是数据写入部分改造为多线程。而数据读取可以改造成批量读取数据。简单来说就是两个要点:
示例
多线程批量处理最简单的方案是使用线程池来进行处理,下面会通过一个模拟批量读取和写入的服务,以及对这个服务的多线程写入调用作为示例,展示如何多线程批量数据导入。
模拟服务
import java.util.concurrent.atomic.AtomicLong;/*** 数据批量写入用的模拟服务** @author RJH* create at 2019-04-01*/public class MockService {/*** 可读取总数*/private long canReadTotal;/*** 写入总数*/private AtomicLong writeTotal=new AtomicLong(0);/*** 写入休眠时间(单位:毫秒)*/private final long sleepTime;/*** 构造方法** @param canReadTotal* @param sleepTime*/public MockService(long canReadTotal, long sleepTime) {this.canReadTotal = canReadTotal;this.sleepTime = sleepTime;}/*** 批量读取数据接口** @param num* @return*/public synchronized long readData(int num) {long readNum;if (canReadTotal >= num) {canReadTotal -= num;readNum = num;} else {readNum = canReadTotal;canReadTotal = 0;}//System.out.println("read data size:" + readNum);return readNum;}/*** 写入数据接口*/public void writeData() {try {// 休眠一定时间模拟写入速度慢Thread.sleep(sleepTime);} catch (InterruptedException e) {e.printStackTrace();}// 写入总数自增System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());}/*** 获取写入的总数** @return*/public long getWriteTotal() {return writeTotal.get();}}
批量数据处理器
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/*** 基于线程池的多线程批量写入处理器* @author RJH* create at 2019-04-01*/public class SimpleBatchHandler {private ExecutorService executorService;private MockService service;/*** 每次批量读取的数据量*/private int batch;/*** 线程个数*/private int threadNum;public SimpleBatchHandler(MockService service, int batch,int threadNum) {this.service = service;this.batch = batch;//使用固定数目的线程池this.executorService = Executors.newFixedThreadPool(threadNum);}/*** 开始处理*/public void startHandle() {// 开始处理的时间long startTime = System.currentTimeMillis();System.out.println("start handle time:" + startTime);long readData;while ((readData = service.readData(batch)) != 0) {// 批量读取数据,知道读取不到数据才停止for (long i = 0; i < readData; i++) {executorService.execute(() -> service.writeData());}}// 关闭线程池executorService.shutdown();while (!executorService.isTerminated()) {//等待线程池中的线程执行完}// 结束时间long endTime = System.currentTimeMillis();System.out.println("end handle time:" + endTime);// 总耗时System.out.println("total handle time:" + (endTime - startTime) + "ms");// 写入总数System.out.println("total write num:" + service.getWriteTotal());}}
测试类
/*** SimpleBatchHandler的测试类* @author RJH* create at 2019-04-01*/public class SimpleBatchHandlerTest {public static void main(String[] args) {// 总数long total=100000;// 休眠时间long sleepTime=100;// 每次拉取的数量int batch=100;// 线程个数int threadNum=16;MockService mockService=new MockService(total,sleepTime);SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum);handler.startHandle();}}
运行结果
start handle time:1554298681755thread:Thread[pool-1-thread-2,5,main] write data:1thread:Thread[pool-1-thread-1,5,main] write data:2...省略部分输出thread:Thread[pool-1-thread-4,5,main] write data:100000end handle time:1554299330202total handle time:648447mstotal write num:100000
分析
在单线程情况下的执行时间应该为total*sleepTime,即10000000ms,而改造为多线程后执行时间为648447ms。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。