Хобрук: Ваш путь к мастерству в программировании

ExecutorService должен дождаться завершения пакета taksk перед повторным запуском

Эта проблема:

Я анализирую большой файл журнала (около 625_000_000 строк) и сохраняю его в базе данных.

public class LogScheduler {

static int fileNumber = 1;

public Importer(IRequestService service) {
    this.service = service;
}

@Override
public  void run() {
    try {
        service.saveAll(getRequestListFromFile("segment_directory/Log_segment_"+fileNumber+".txt"));
        
    } catch (IOException e) {
        e.printStackTrace();
    }

   }
}

Метод, который запускает этот поток:

 public void scheduledDataSave() throws InterruptedException {
    int availableCores = Runtime.getRuntime().availableProcessors();
    String directory = "segment_directory";

    int filesInDirectory = Objects.requireNonNull(new File(directory).list()).length;

    ExecutorService executorService = Executors.newFixedThreadPool(availableCores);

    for (int i = 1; i <= filesInDirectory; i++) {
        executorService.execute(new Importer(service));
    }
    executorService.shutdown();

    }

Вставка метода Thread.sleep(); после того, как executorService.execute(new Importer(service)); спит после выполнения каждого потока, а не 8 потоков, как должно, поскольку они находятся в Executorservice И я понятия не имею, почему это происходит, поскольку он не должен так себя вести. Насколько я понимаю, ExecutorService должен запускать 8 потоков параллельно, завершать их, спать и снова запускать пул.

Как спать после каждых 8 потоков?


  • The problem is the CPU spike - почему именно? 19.03.2021
  • @Eugene Я имею в виду постоянную 100% загрузку процессора. Я хотел бы найти способ уменьшить его и включить сон для этого. 19.03.2021
  • это может быть связано не с ExecutorService, а с потоками сборки мусора, которые необходимо очистить после всех этих файлов 19.03.2021
  • Ваш вопрос не ясен. Вместо того, чтобы говорить о том, как вы модифицировали показанный код, просто покажите нам код, который вы на самом деле запустили. И расскажите нам о своей цели, чего вы пытаетесь достичь, так как это нечетко и неясно. Я второй, кто проголосовал за закрытие как неясное. Если вопрос закрыт, внесите свой вклад в редактирование своего Вопроса, так как его можно открыть повторно, если недостатки будут исправлены. 19.03.2021
  • @BasilBourque спасибо за подсказки. Надеюсь, это более понятно 19.03.2021
  • Этот код все еще неполный. (a) Ваш первый код ссылается на this.service, но не показывает ни одного такого поля участника. (б) В вашей прозе все еще говорится о звонке Thread.sleep, но я не вижу такого звонка. См.: Как создать минимальный воспроизводимый пример 20.03.2021

Ответы:


1

Спящий поток, отправляющий задачи, не приостанавливает отправленные задачи

Ваш вопрос не ясен, но, по-видимому, он связан с вашим ожиданием того, что добавление Thread.sleep после каждого вызова executorService.execute приведет к спячке всех потоков службы-исполнителя.

    for ( int i = 1 ; i <= filesInDirectory ; i++ ) {
        executorService.execute( new Importer( service ) );   // Executor service assigns this task to one of the background threads in its backing pool of threads.
        Thread.sleep( Duration.ofMillis( 100 ).toMillis() ) ; // Sleeping this thread doing the looping. *Not* sleeping the background threads managed by the executor service.
    }

Ваше ожидание неверно.

Этот Thread.sleep спит в потоке, выполняющем цикл for.

Служба-исполнитель имеет собственный резервный пул потоков. На эти потоки не влияет Thread.sleep какой-то другой поток. Эти фоновые потоки будут спать только в том случае, если вы вызовете Thread.sleep внутри кода, работающего в каждом из этих потоков.

Итак, вы передаете первую задачу службе-исполнителю. Служба-исполнитель немедленно отправляет эту работу одному из своих вспомогательных потоков. Эта задача выполняется немедленно (если поток доступен немедленно и не занят предыдущими задачами).

После назначения этой задачи ваш цикл for спит в течение ста миллисекунд, в этом примере кода, показанном здесь. Пока цикл for находится в спящем режиме, службе-исполнителю не назначаются никакие другие задачи. Но пока цикл for спит, отправленная задача выполняется в фоновом потоке. Этот фоновый поток не спит.

В конце концов, ваш поток цикла for просыпается, назначает вторую задачу и снова засыпает. Тем временем фоновый поток выполняется на полной скорости впереди.

Таким образом, приостановка потока, отправляющего задачи, не приостанавливает уже отправленные задачи.

Ожидание выполнения отправленных задач

Ваш заголовок спрашивает:

ExecutorService должен дождаться завершения пакета taksk перед повторным запуском

После отправки задач вызовите shutdown и awaitTermination в службе-исполнителе. После этих вызовов ваш код блокируется, ожидая завершения/отмены/сбоя всех отправленных задач.

ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
… submit tasks to that executor service …
executorService.shutdown() ;
executorSerivce.awaitTermination() ;  // At this point, the flow-of-control blocks until the submitted tasks are done.
System.out.println( "INFO - Tasks on background threads are done. " + Instant.now() );

Я бы предложил использовать файл ExecutorService#submit вместо метода ExecutorService#execute. Разница в том, что первый метод возвращает объект Future. Вы можете собирать эти Future объекты при отправке задач службе-исполнителю. После shutdown и awaitTermination вы можете изучить свою коллекцию объектов Future, чтобы проверить статус их завершения.

Проект Ткацкий станок

Если Project Loom удастся, такой код будет немного проще и понятнее. Экспериментальные сборки технологии Project Loom доступны сейчас на основе раннего доступа к Java 17. Команда Loom ищет обратную связь сейчас.

В Project Loom ExecutorService становится AutoCloseable. Это означает, что мы можем использовать синтаксис try-with-resources для автоматического вызова нового метода close для ExecutorService. Этот close метод сначала блокируется до тех пор, пока все задачи не будут завершены/отменены/сбоем, а затем отключает службу-исполнитель. Нет необходимости звонить shutdown или awaitTermination.

Кстати, в Project Loom тоже есть виртуальные нити (волокна). Это, вероятно, значительно повысит производительность вашего кода, потому что это требует большого количества блокировок для ввода-вывода в хранилище и доступа к базе данных.

try (
        ExecutorService executorService = Executors.newVirtualThreadExecutor() ;
)
{
    … submit tasks to that executor service …
}
// At this point, with Project Loom technology, the flow-of-control blocks until the submitted tasks are done.
// Also, the `ExecutorService` is automatically closed/shutdown by this point, via try-with-resources syntax.
System.out.println( "INFO - Tasks on background threads are done. " + Instant.now() );

С помощью Project Loom вы можете собрать возвращенные Future объектов таким же образом, как описано выше, чтобы проверить статус завершения.


У вас есть другие проблемы в вашем коде. Но вы не раскрыли достаточно, чтобы ответить на них все.

19.03.2021

2

Как спать после каждых 8 потоков?

Так что, если вы делаете что-то подобное, то это не то, что вы думаете.

for (int i = 1; i <= filesInDirectory; i++) {
    executorService.execute(new Importer(service));
    Thread.sleep(...);
}

Это приводит к тому, что поток, который запускает фоновые задания, приостанавливается и не влияет на выполнение каждого из заданий. Я считаю, что вам не хватает дождаться завершения пула потоков:

 executorService.shutdown();
 executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

Это ожидает завершения всех заданий в пуле потоков, прежде чем продолжить.

Еще кое-что. Я использую executorService.submit(...) вместо execute(...). Вот описание их различий. Для меня еще одно отличие состоит в том, что любые исключения, создаваемые задачами, запущенными с execute(...), приводят к завершению текущего потока и, возможно, к его перезапуску. С submit(...) это позволяет вам получить это исключение, если это необходимо, и предотвращает ненужное повторное создание потоков.

Если вы объясните немного больше о том, что вы пытаетесь достичь, мы должны быть в состоянии помочь.

20.03.2021
Новые материалы

HMTL - Многозадачное обучение для решения задач НЛП
Достижение результатов SOTA путем передачи знаний между задачами Область обработки естественного языка включает в себя десятки задач, среди которых машинный перевод, распознавание именованных..

Решения DBA Metrix
DBA Metrix Solutions предоставляет удаленного администратора базы данных (DBA), который несет ответственность за внедрение, обслуживание, настройку, восстановление базы данных, а также другие..

Начало работы с Блум
Обзор и Codelab для генерации текста с помощью Bloom Оглавление Что такое Блум? Некоторые предостережения Настройка среды Скачивание предварительно обученного токенизатора и модели..

Создание кнопочного меню с использованием HTML, CSS и JavaScript
Вы будете создавать кнопочное меню, которое имеет состояние наведения, а также позволяет вам выбирать кнопку при нажатии на нее. Финальный проект можно увидеть в этом Codepen . Шаг 1..

Внедрите OAuth в свои веб-приложения для повышения безопасности
OAuth — это широко распространенный стандарт авторизации, который позволяет приложениям получать доступ к ресурсам от имени пользователя, не раскрывая его пароль. Это позволяет пользователям..

Классы в JavaScript
class является образцом java Script Object. Конструкция «class» позволяет определять классы на основе прототипов с чистым, красивым синтаксисом. // define class Human class Human {..

Как свинг-трейдеры могут использовать ИИ для больших выигрышей
По мере того как все больше и больше профессиональных трейдеров и активных розничных трейдеров узнают о возможностях, которые предоставляет искусственный интеллект и машинное обучение для улучшения..