# 并发编程场景解决方案面试题2025

# 1、设计一个异步任务执行框架,如何利用多线程来执行耗时的任务,并在任务完成后通知主线程(美团地图)

问的这个东西,其实Java中自带的工具已经实现了。利用CompletableFuture就可以实现这个效果,可以点一嘴CompletableFuture的一些API,证明你会。

比如CompletableFuture.supplyAsync知道好具体要执行的任务,然后在后续,追加一个thenApplyAsync,等任务完成后,执行thenApplyAsync的回调。

如果舍弃掉CompletableFuture,自己主动的去实现一个异步任务的执行框架,需要考虑哪些事情,怎么去落地实现。

1、优先构建好Java提供的线程池,ThreadPoolExecutor,用于多线程执行任务。

2、要求主动通知主线程,需要一个回调的函数,不能让主线程自己去get。针对回调函数,至少要提供成功与失败的方法。 onComplete,onError。

3、提供一个提交任务的方法,这个方法至少两个参数,一个是具体的任务Runnable,一个回调Callback。也可以提供一个执行Callable的提交任务的方法。

4、比如任务执行超时后,想要去走超时的回调,onTimeout啥的,直接在提交任务后面去追加。

5、比如任务执行一段时间后,要取消,一样可以提供任务的返回结果,让主线能拿到任务并且走cancel。

package org.example;

import java.util.concurrent.*;

/**
 * 要自己设计的一个异步框架。
 * 1、优先构建好Java提供的线程池,ThreadPoolExecutor,用于多线程执行任务。
 *
 * 2、要求主动通知主线程,需要一个回调的函数,不能让主线程自己去get。针对回调函数,至少要提供成功与失败的方法。 onComplete,onError。
 *
 * 3、提供一个提交任务的方法,这个方法至少两个参数,一个是具体的任务Runnable,一个回调Callback。也可以提供一个执行Callable的提交任务的方法。
 */
public class AsyncTaskFramework {

    // 偷个懒,基于Executors先构建一个。。。
    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    // 利用回调的方式,实现任务完成后,通知主线程。
    private interface Callback{
        default void onComplete(){};
        default void onComplete(Object result){};
        default void onError(Exception e){};
        default void onTimeout(){}
    }

    /**
     * 执行Runnable,提供callback的机制。
     * @param task
     * @param callback
     */
    public void submitTask(Runnable task, Callback callback){
        // 提交任务到线程池时,自己写一套业务,确认可以在任务完成后,执行回调
        executor.execute(() -> {
            // 代码执行到这,已经是线程池里的线程在执行了,已经是异步的了。
            try {
                // 执行任务
                task.run();
                // 任务执行完毕,没出现异常
                if(callback != null){
                    callback.onComplete();
                }
            } catch (Exception e) {
                if(callback != null){
                    callback.onError(e);
                }
            }
        });
    }

    /**
     * 执行Callable,提交callback回调。
     * @param task
     * @param callback
     */
    public void submitTask(Callable task, Callback callback){
        executor.execute(() -> {
            try {
                Object result = task.call();
                if(callback != null){
                    callback.onComplete(result);
                }
            } catch (Exception e) {
                if(callback != null){
                    callback.onError(e);
                }
            }
        });
    }

    /**
     * 追加超时的逻辑
     * @param task
     * @param callback
     * @param timeout
     * @param unit
     */
    public void submitTask(Runnable task, Callback callback, long timeout, TimeUnit unit){
        Future<?> future = executor.submit(() -> {
            try {
                task.run();
            } catch (Exception e) {
                if (callback != null){
                    callback.onError(e);
                }
            }
        });
        // 判断超时与否
        try {
            future.get(timeout, unit);
            // 成功的逻辑
            if(callback != null){
                callback.onComplete();
            }
        } catch (TimeoutException e) {
            // 超时的逻辑
            if(callback != null){
                callback.onTimeout();
            }
        }catch (InterruptedException | ExecutionException e) {
            // 异常的逻辑
            if(callback != null){
                callback.onError(e);
            }
        }
    }

    public static void main(String[] args) {
        AsyncTaskFramework framework = new AsyncTaskFramework();
        framework.submitTask(() -> {

            System.out.println("异步执行:xxxxx");
            int i = 1 / 0;
        }, new Callback() {
            @Override
            public void onComplete() {
                System.out.println("主线程在任务完毕后的回调");
            }

            @Override
            public void onError(Exception e) {
                System.out.println("任务执行出现异常,通知主线程。");
            }
        });
    }

}

# 2、设计一个后台任务调度系统,如何利用多线程来并行执行多个定时任务,提高任务的处理效率(滴滴司乘)

后台调度就是一个定时任务的套路。

正常咱们很少自己去写这种东西,市面上开源好用的很多,比如单体上Quartz,分布式微服务的环境下,可以上Elastic-Job,XXL-Job等等…………开源定时任务框架非常非常多。。。

如果要自己设计的话,可以参考这个开源的框架去自己实现一套,造个轮子。

1、首先第一点,自己设计的定时任务框架要支持cron表达式,为了解析cron表达式,可以先导入一个cron-utils的小工具,来解析cron表达式。

2、其次构建好Java提供的线程池,ThreadPoolExecutor,用于多线程执行多个定时任务。

3、另外,这种基于cron的定时任务,单纯的Runnable没有办法支撑。自己要封装一个任务类。

4、因为是定时任务框架,任务要以周期性执行,需要有一个容器可以把任务存储起来,而且涉及到多个任务,所以需要一个线程安全的容器,直接上ConcurrentHashMap。

5、可以给当前任务调度框架添加上移除任务和添加任务的功能,暂停任务和启动任务的功能,还有立即执行。

6、在定时任务框架开始的时候,就直接要在一个死循环里遍历ConcurrentHashMap中的任务,查询每一个任务是否现在可以执行,可以就直接执行,不可以就拉倒。直到遍历结束,再开启下一次循环~。

优先把上述基本的功能实现,先发版,再迭代!!!!

7、支持分布式锁,还需要考虑在任务执行前,获取一下Redis的分布式锁,确保就一个服务执行……

8、日志,任务失败的重试等等…………

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.cronutils</groupId>
        <artifactId>cron-utils</artifactId>
        <version>9.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>
package org.example;

import com.cronutils.model.Cron;
import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinition;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import lombok.extern.slf4j.Slf4j;
import org.apache.logging.log4j.message.AsynchronouslyFormattable;

import java.time.ZonedDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 异步定时任务框架
 * 1、首先第一点,自己设计的定时任务框架要支持cron表达式,为了解析cron表达式,可以先导入一个cron-util的小工具,来解析cron表达式。
 *
 * 2、其次构建好Java提供的线程池,ThreadPoolExecutor,用于多线程执行多个定时任务。
 *
 * 3、另外,这种基于cron的定时任务,单纯的Runnable没有办法支撑。自己要封装一个任务类。
 *
 * 4、因为是定时任务框架,任务要以周期性执行,需要有一个容器可以把任务存储起来,而且涉及到多个任务,所以需要一个线程安全的容器,直接上ConcurrentHashMap。
 *
 * 5、可以给当前任务调度框架添加上移除任务和添加任务的功能,暂停任务和启动任务的功能,还有立即执行。
 *
 * 6、在定时任务框架开始的时候,就直接要在一个死循环里遍历ConcurrentHashMap中的任务,查询每一个任务是否现在可以执行,可以就直接执行,不可以就拉倒。知道遍历结束。
 */
@Slf4j
public class AsyncScheduleFramework {
    // 偷个懒,基于Executors先构建一个。。。
    private final ExecutorService executor = Executors.newFixedThreadPool(20);
    // 任务需要容器存储,声明CHM
    private final Map<String,ScheduleTask> tasks = new ConcurrentHashMap<>();

    private static class ScheduleTask{
        // 具体的任务
        private Runnable task;
        // 下次什么时间执行
        ZonedDateTime nextExecuteTime;
        // 计算下次执行时间的对象
        ExecutionTime executionTime;
        // 任务是否开启了可以执行的权限!!
        volatile boolean started = true;
        // 任务重复执行,第一次确保running为false,才能去执行,在执行前,将running改为true,在执行执行完毕后,再将任务修改为false
        volatile boolean running = false;

        public ScheduleTask(Runnable task, ZonedDateTime nextExecuteTime,ExecutionTime executionTime) {
            this.task = task;
            this.nextExecuteTime = nextExecuteTime;
            this.executionTime = executionTime;
        }

        // 计算下次执行的时间。
        void computeNextExecutionTime(ZonedDateTime nextExecuteTime) {
            this.nextExecuteTime = executionTime.nextExecution(nextExecuteTime).get();
        }
    }

    /**
     * 添加任务。
     * @param taskName  任务标识,方便后期对任务做启动,停止等操作。
     * @param cronExpression  cron表达式
     * @param task  具体任务
     */
    public void addTask(String taskName,String cronExpression,Runnable task){
        // 1、 创建解析cron表达式的对象
        CronParser parser = new CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ));
        // 2、 解析cron表达式
        Cron cron = parser.parse(cronExpression);
        // 3、将解析后的cron表达式转换为ExecutionTime
        ExecutionTime executionTime = ExecutionTime.forCron(cron);
        // 4、基于executionTime获取到下次任务要执行的时间。而且获取到的时间,最好是java.time下的类。
        ZonedDateTime nextExecutionTime = executionTime.nextExecution(ZonedDateTime.now()).get();
        // 5、创建任务对象
        ScheduleTask scheduleTask = new ScheduleTask(task,nextExecutionTime,executionTime);
        // 6、任务添加到ConcurrentHashMap
        // TODO (这里要注意一下,先确保内部没有,咱们再put,忽然会覆盖。)
        tasks.put(taskName,scheduleTask);
    }

    /**
     * 任务启动
     * @param taskName
     */
    public void start(String taskName){
        ScheduleTask scheduleTask = tasks.get(taskName);
        if(scheduleTask != null){
            scheduleTask.started = true;
        }
    }
    /**
     * 任务暂停
     * @param taskName
     */
    public void stop(String taskName){
        ScheduleTask scheduleTask = tasks.get(taskName);
        if(scheduleTask != null){
            scheduleTask.started = false;
        }
    }

    public AsyncScheduleFramework(){
        // 在这个框架创建出来的时候,就开始遍历tasks集合,拿到任务就判断是否到了执行时间,并且isRunning为true,才可以执行。
        Thread t = new Thread(() -> {
            while(true){
                for (Map.Entry<String, ScheduleTask> taskMap : tasks.entrySet()) {
                    // 获取到具体的任务
                    ScheduleTask task = taskMap.getValue();
                    // 判断执行时间是否满足要求,并且isRunning是否为true
                    if(task.nextExecuteTime.isBefore(ZonedDateTime.now()) && task.started && !task.running){
                        // TODO   获取一波Redis分布式锁的资源,key就可以前缀加任务名称,拿到锁的,才能执行后面的代码。拿锁失败的,就直接计算下次执行的时间
                        log.info("各种日志,当前服务的标识,具体的任务名是啥,等等……");
                        // 将running设置true,代表要开始执行了
                        task.running = true;
                        // 执行时间到了,并且isRunning为true,代表可以执行。
                        executor.execute(() -> {
                            try {
                                // 执行任务
                                task.task.run();

                            } catch (Exception e){
                                // TODO  重试,还是做其他的操作~~~
                            }finally {
                                // 计算任务下次执行的时间
                                task.computeNextExecutionTime(task.nextExecuteTime);
                                task.running = false;
                            }
                        });
                    }
                    // 现在设计的ConcurrentHashMap作为容器,就是会存在比较大的时间误差,想误差小,就上优先级队列存储任务。线程始终在task先级队列~
                    // 第一版发出来,后面再优化成阻塞队列的形式。。。
                }
                Thread.yield();
            }
        });
        t.start();
    }

    /**
     * 第一版有点小bug,咱们没排查出来~~~
     * @param args
     */
    public static void main(String[] args) {
        AsyncScheduleFramework framework = new AsyncScheduleFramework();
        framework.addTask("test","0/5 * * ? * *",() -> {
            System.out.println(Thread.currentThread().getName() + "任务执行啦,时间:" + ZonedDateTime.now());
        });
    }

    /*
     * 1、 首先new  AsyncScheduleFramework,就会开启一个线程,去遍历ConcurrentHashMap
     * 获取到里面存储的任务
     * 拿到任务后先判断任务的执行时间是否到了 && 判断任务现在是否开启  && 任务是第一次在定时任务执行
     * 在内部准备开始执行任务。
     *      1.1 先将第一次任务执行的标识修改掉,确保只执行一次。
     *      1.2 开启一个线程开始执行任务 ---  任务执行完毕,计算下次执行任务的时间,并且将第一次任务执行的标识重置
     *
     * 2、添加任务时的操作。
     * 基于cron表达式,计算出这个任务下次执行的时间
     * 封装ScheduleTask任务,并且将任务添加到ConcurrentHashMap集合里。
     */

}

# 第二题当时的问题解决

直接画图说了。

image.png


image.png


image.png


image.png


image.png


image.png

这样问题就出现了,所以解决问题的方式很简单,把判断的顺序更改一下就可以了。

将这个顺序

if (task.nextExecuteTime.isBefore(ZonedDateTime.now()) && task.started && !task.running) {

修改为

if (!task.running && task.started && task.nextExecuteTime.isBefore(ZonedDateTime.now())) {

这样再走上述的逻辑,就不会出现重复执行任务的问题。

只要线程池的线程没有执行完毕task.running = false,就不会进入到if的逻辑,哪怕出现了上述图中的逻辑,也会基于时间的第三个判断,进不去if,规避重复问题。

# 3、描述如何使用多线程来处理大数据集,例如在数据分析应用中,将数据拆分成多个部分并行处理(58同城)

这里最基本的线程池 + CountDownLatch就可以解决,至于代码实现。

核心思路自然就将大数据集拆分成多个部分,分别交给线程池去处理,等到所有任务才完成后,汇总即可。

实现部署步骤,大致分为几步:

  • 首先需要将大数据集拆分成若干部分,比如数据体量在1000W,那可以将每个任务拆分成100W,分别交给线程去处理。
  • 还需要计算好任务的个数,利用CountDownLatch来做计数器,确保得知任务是否全部完成。
  • 讲封装好的任务投递到线程池中执行。
  • 主线程等待任务完毕后,拿到每一个结果进行汇总即可。

这里有几个小细节需要考虑。

  • 首先是任务的切分,这里需要根据任务的情况来考虑
    • 如果分析是单纯的CPU密集任务,那需要跟CPU内核个数考虑好,做好测试,如果并行的线程出太多,导致线程的上下文切换,会带来一定的性能损耗。
    • 如果分析是IO密集的任务,这种IO密集的任务,线程数需要的数量会大一些,这里具体的线程数需要通过压测去测试,其次如果是JDK21版本,可以考虑引入虚拟线程来提升处理的效率。
  • 另外就是这种大规模的数据集,JVM内存是否能够一次性装下,如果在内存无法满足一次性获取时,需要做好分批次的处理流程。
  • 另外如果数据集的处理之前存在一些前后相互依赖的关系时,做好同步操作的事情,最好的方式,是利用CompletableFuture做好任务的编排工作。
  • 还有就是这种异步处理,咱们需要考虑可能存在一些异常问题,提前添加好处理异常的逻辑,出现问题时,要有一个补偿的措施。

# 4、描述如何实现一个网络爬虫,利用多线程并行抓取多个网页,提高数据抓取的效率(百度AI)

咱们面的是Java岗位,别一看爬虫就想Python,首先Python爬数据,确实比Java好,但是Java也能抓一些数据的。

爬取数据的整体过程,无非是获取对应的URL,访问URL获取对应的响应信息,Java就是利用Jsoup提供的API对数据进行解析,拿到对应的数据,落库即可。

影响抓取数据效率的位置:

  • 访问URL的过程。网络IO的时间成本高。
  • 利用Jsoup去解析数据时,如果单个URL反馈的数据体量太大,串行解析的成本比较高。
  • URL可能重复,重复获取或者解析数据
  • URL体量比较大,每次解析完都单独落库的话,对应MySQL的压力不小,而且多次找MySQL的网络IO,磁盘IO成本都很高。

问题有了,解决起来就方便了。

  • 解决URL的网络IO成本,那就是直接线程池并行去访问URL,避免单个线程阻塞长时间等待。多个线程并行去访问URL。
  • 每个线程在获取的URL之后,根据单个URL的数据体量考虑,是否需要多个线程并行的利用Jsoup去解析,这里属于CPU密集的业务类型,线程个数不宜太高,当然如果单个URL数据体量不大,这里不需要并行去解析。
  • 可以在获取URL之后,甚至是发起请求到URL前,先利用Redis或者本地的CHM,确保访问一次即可,减少重复访问的问题。
  • 可以让每个解析完数据的线程,将数据扔到一个阻塞队列里,当数据个数达到一定程度,比如500,比如1000等等,根据具体的情况,做批量入库操作。

还可以再考虑点事:

  • 做过爬虫的同学们都知道,不能随便爬,最好看看平台提供的rebots文件,确保满足对方的协议。。。
  • 毕竟是爬数据嘛,请求的体量不要太大,避免人家有反扒机制,封你IP,不过封IP也不怕,咱们可以上代理高动态IP。
  • 比如访问URL失败,做好重试或者是记录的一些操作。

# 5、描述如何在Java应用中使用多线程执行数据库的批量插入或更新操作,以提高数据库的写入效率(猫眼)

核心就是任务划分,多线程并行执行批量插入操作。

1、你需要根据你的数据集大小以及系统资源情况来合理的划分任务。比如你有10W的数据需要插入,咱们每个线程批量插入1000条数据,那就可以家境任务拆分成100份,让每个线程每次批量插入1000条数据即可。

2、准备线程池,内部的线程个数要根据MySQL服务器那边的系统资源考虑,插入操作IO涉及的多一些,CPU资源还好,内存资源和连接数需要重点考虑。

可以提前测试批量插入,查看一下MySQL所在服务的负载情况,主要查看越好CPU和内存的资源,不要达到太高的负载。。

3、同样,Java端也需要考虑好线程池中的资源,比如阻塞队列,数据体量如果太大,内存能否抗住。线程数也要考虑好CPU的资源情况。

一样,也是要做好压测的事情。

4、准备好线程池,准备好任务,干活!!!!


毕竟批量跟数据库操作,而且是多线程,需要考虑好死锁的问题。

建议事务的隔离级别,上read committed。可以规避掉好多修改和插入并行的死锁问题。。。