V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
shayang888
V2EX  ›  Java

ThreadPoolTaskScheduler 怎么用多线程去跑任务呢

  •  
  •   shayang888 · 2018-12-27 13:44:04 +08:00 · 4193 次点击
    这是一个创建于 1937 天前的主题,其中的信息可能已经有所发展或是发生改变。
    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;
    
    private ScheduledFuture<?> future;
    
    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
    return new ThreadPoolTaskScheduler();
    }
    
    传任务的 id,然后开启对应的任务
    public void taskStartService(int id){
            try {
                Optional<Task> taskOptional = taskRepository.findById(id);
                if (taskOptional.isPresent()) {
                    future = threadPoolTaskScheduler.schedule(() -> {
                            System.out.println("task-" + id + ", " + Thread.currentThread().getName() + "-" + Thread.currentThread().getId());
                            threadMap.put(id, future);
                        }
                    }, new CronTrigger(taskOptional.get().getTaskTime()));
        }
    

    然后我启动了 2 个任务,打印出来确实 2 个任务都在运行,但是为什么线程打印出来只有 1 个线程在跑呢

    task-1, threadPoolTaskScheduler-1-54
    task-2, threadPoolTaskScheduler-1-54
    

    实在是不会了,请大佬给点提示

    第 1 条附言  ·  2018-12-27 19:33:04 +08:00

    我把线程的问题已经解决了,设置了poolsize,但是为什么我打印future得到的结果是一样的?而且在stopService里cancel方法并没有按我传递的任务id来停止对应的任务

    @Bean
        public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
            threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
            threadPoolTaskScheduler.setPoolSize(10);
            threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
            return threadPoolTaskScheduler;
        }
    public void taskStartService(int id){
                Optional<Task> taskOptional = taskRepository.findById(id);
                if (taskOptional.isPresent()) {
                    future = threadPoolTaskScheduler.schedule(() -> {
                        System.out.println("task-" + id + ", " + Thread.currentThread().getName() + "-" + Thread.currentThread().getId() + ", " + future);
                    }, new CronTrigger(taskOptional.get().getTaskTime()));
        }
    public void taskStopService(int id){
            if (taskRepository.findById(id).isPresent()) {
                if (future != null) {
                    future.cancel(true);
                }
        }
    
    31 条回复    2018-12-28 18:27:27 +08:00
    wccc
        1
    wccc  
       2018-12-27 15:52:00 +08:00   ❤️ 1
    @Bean(destroyMethod = "shutdown")
    public ThreadPoolTaskScheduler scheduledThreadPool() {
    ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
    scheduler.setThreadNamePrefix("scheduled-thread-");
    scheduler.setPoolSize(10);
    //等待任务完成后关闭
    scheduler.setWaitForTasksToCompleteOnShutdown(true);
    //最多等待 60s
    scheduler.setAwaitTerminationSeconds(60);
    scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    scheduler.initialize();
    return scheduler;
    }
    定义一个线程池
    默认实现只有一个线程的线程池
    shayang888
        2
    shayang888  
    OP
       2018-12-27 16:25:42 +08:00
    @wccc
    你好,谢谢大佬,多个线程执行的问题我已经解决了,不过很奇怪为什么我在停止一个任务的时候 发现都停止了,然后我把 future 打印出来 发现是一摸一样的两个 task,这是怎么回事呢
    ```
    task-2, threadPoolTaskScheduler-1-53, DelegatingErrorHandlingRunnable for com.apitest.service.TaskService$$Lambda$1086/0x0000000800a37840@3ebb0180
    task-1, threadPoolTaskScheduler-2-78, DelegatingErrorHandlingRunnable for com.apitest.service.TaskService$$Lambda$1086/0x0000000800a37840@3ebb0180
    ```
    这是我启动的两个任务,我在 startService 里打印了这两个 future 为什么这个值是一样的?,这样的话 我如果想停止某个任务,让另一个任务不受影响继续运行该怎么办了
    wccc
        3
    wccc  
       2018-12-27 16:29:08 +08:00   ❤️ 1
    建议打印 task 中执行日志
    shayang888
        4
    shayang888  
    OP
       2018-12-27 16:32:49 +08:00
    @wccc
    future = threadPoolTaskScheduler.schedule(() -> {
    System.out.println("task-" + id + ", " + Thread.currentThread().getName() + "-" + Thread.currentThread().getId() + ", " + future);
    }, new CronTrigger(taskOptional.get().getTaskTime()));
    就是这样的,我打印 future 就得到的是一样的,这样我在调用 future.cancel()的时候 整个就停掉了,可是我只想停止单个的
    wccc
        5
    wccc  
       2018-12-27 16:38:48 +08:00
    不了解 ThreadPoolTaskScheduler 用于定时任务多线程的
    多线程 线程池通常不用这个
    shayang888
        6
    shayang888  
    OP
       2018-12-27 16:40:10 +08:00
    @wccc 确实 我现在就是需要创建不同 cron 的任务来独自运行 或者你有比较好的办法吗
    wccc
        7
    wccc  
       2018-12-27 16:46:00 +08:00
    mq 延时消息
    kkjinping
        8
    kkjinping  
       2018-12-27 17:54:15 +08:00
    默认 private volatile int poolSize = 1;
    kkjinping
        9
    kkjinping  
       2018-12-27 17:55:27 +08:00
    需要手动调用 setPoolSize(int size)
    kkjinping
        10
    kkjinping  
       2018-12-27 18:00:54 +08:00   ❤️ 1
    @shayang888 feature 会不会是因为并发导致的,打印出来的都是最后一个。
    kkjinping
        11
    kkjinping  
       2018-12-27 18:02:13 +08:00   ❤️ 1
    @shayang888 threadMap.put(id, future);放到 lambda 外面可以吗
    kkjinping
        12
    kkjinping  
       2018-12-27 18:03:36 +08:00   ❤️ 1
    @shayang888 private ScheduledFuture<?> future; 改成方法内局部变量应该也行
    shayang888
        13
    shayang888  
    OP
       2018-12-27 19:13:15 +08:00
    @kkjinping 哇 谢谢大佬的回复 你有用过这个来做定时任务吗
    shayang888
        14
    shayang888  
    OP
       2018-12-27 19:16:34 +08:00
    @kkjinping 放在局部里不行的 我这后面还有个 stopTaskService(int id) 是用来指定任务的停止的, 不过没按照我的需求生效我不知道为什么
    public void taskStopService(int id){
    if (taskRepository.findById(id).isPresent()) {
    if (future != null) {
    future.cancel(true);
    }
    }
    DsuineGP
        15
    DsuineGP  
       2018-12-28 10:06:22 +08:00
    @shayang888 调用 future.cancel 只是向执行任务的线程发送中断指令,具体怎么响应这个指令需要在执行任务的线程中检测并作出相应的处理
    kkjinping
        16
    kkjinping  
       2018-12-28 10:16:02 +08:00
    @shayang888 threadMap.put(id, future); 你不是有这个吗,id 和 future 映射,那你关闭的时候就可以从 map 中拿到 future 了。所以可以把 future 放到局部变量。threadMap 是实例变量。
    shayang888
        17
    shayang888  
    OP
       2018-12-28 11:00:18 +08:00
    @kkjinping 我这么做了 可是并没有用 拿到的 future 在 map 里显示出来是一摸一样的 future.cancel 就全给取消了
    {1=DelegatingErrorHandlingRunnable for com.apitest.service.TaskService$$Lambda$1106/0x0000000800a92840@45f08f0a, 2=DelegatingErrorHandlingRunnable for com.apitest.service.TaskService$$Lambda$1106/0x0000000800a92840@45f08f0a}
    你看,future 的值是一摸一样的
    kkjinping
        18
    kkjinping  
       2018-12-28 11:56:27 +08:00
    @shayang888 你有改成局部变量吗,future。要两部结合起来。future 放到局部变量。threadMap 是实例变量。
    kkjinping
        19
    kkjinping  
       2018-12-28 11:56:56 +08:00
    @shayang888 threadMap.put(id, future);还要放到 lambda 外面
    kkjinping
        20
    kkjinping  
       2018-12-28 11:57:53 +08:00
    Optional<Task> taskOptional = taskRepository.findById(id);
    if (taskOptional.isPresent()) {
    future = threadPoolTaskScheduler.schedule(() -> {
    System.out.println("task-" + id + ", " + Thread.currentThread().getName() + "-" + Thread.currentThread().getId());

    }
    }, new CronTrigger(taskOptional.get().getTaskTime()));

    threadMap.put(id, future);
    }
    kkjinping
        21
    kkjinping  
       2018-12-28 11:58:52 +08:00
    Optional<Task> taskOptional = taskRepository.findById(id);
    if (taskOptional.isPresent()) {
    ScheduledFuture future = threadPoolTaskScheduler.schedule(() -> {
    System.out.println("task-" + id + ", " + Thread.currentThread().getName() + "-" + Thread.currentThread().getId());

    }
    }, new CronTrigger(taskOptional.get().getTaskTime()));

    threadMap.put(id, future);
    }
    shayang888
        22
    shayang888  
    OP
       2018-12-28 12:16:05 +08:00
    @kkjinping 对呀 我代码就是这样的 map 是全局的 future 是局部的 可是得到的 future 都是一样的 所以在执行 stopTaskService 的时候,future.cancel()就会把所有的任务都取消了
    threadMap.get(id).cancel()
    kkjinping
        23
    kkjinping  
       2018-12-28 14:24:46 +08:00   ❤️ 1
    @shayang888

    @Resource
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    private Map<Integer, Future> futureMap = new HashMap<>(16);

    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler(){
    ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
    threadPoolTaskScheduler.setPoolSize(10);
    return threadPoolTaskScheduler;
    }

    public void start(int id){
    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.SECOND,5);
    ScheduledFuture future = threadPoolTaskScheduler.schedule(() -> {
    System.out.println(id + " " + Thread.currentThread().getName());
    }, calendar.getTime());
    futureMap.put(id,future);
    }

    public void stop(int id){
    Future future = futureMap.get(id);
    if(future!=null){
    future.cancel(true);
    }
    }


    我试了下没有问题,关闭不壶关全部的
    shayang888
        24
    shayang888  
    OP
       2018-12-28 14:47:42 +08:00
    @kkjinping 谢谢 按你说的弄好了 不过我很想知道为什么两个 future 的值是一样的 能告诉下吗 或者有相关文档吗 google 了一圈也没找到
    kkjinping
        25
    kkjinping  
       2018-12-28 14:51:38 +08:00   ❤️ 1
    @shayang888 你可以发下代码
    shayang888
        26
    shayang888  
    OP
       2018-12-28 15:48:41 +08:00
    @kkjinping 代码跟你的一样 我就是在 stop 方法里 把 futureMap 打印了一下 发现每个 value 都是一样的 你没发现吗
    kkjinping
        27
    kkjinping  
       2018-12-28 17:04:32 +08:00
    @shayang888 我打印发现不一样呀
    shayang888
        28
    shayang888  
    OP
       2018-12-28 17:15:26 +08:00
    @kkjinping 大佬 我的意思是 比如执行了 2 个 task 即传了两个不同的 id 在 start 方法里 然后在 start 方法里打印出 future 此时 1 的 value 和 2 的 value 是一样的
    kkjinping
        29
    kkjinping  
       2018-12-28 17:56:35 +08:00
    @shayang888 是按照我发的写的吗。我打印了不一样啊。
    shayang888
        30
    shayang888  
    OP
       2018-12-28 17:58:28 +08:00
    @kkjinping 大佬方便加个 qq 吗 请教你下
    kkjinping
        31
    kkjinping  
       2018-12-28 18:27:27 +08:00
    @shayang888 2290968582
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   1634 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 16:53 · PVG 00:53 · LAX 09:53 · JFK 12:53
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.