Многопоточность в C++. Защёлки и барьеры (latches and barriers)
В C++20 в стандартной библиотеке появились барьеры.
Защелки (latches) и барьеры (barriers) – это механизм синхронизации потоков, который позволяет блокировать любое количество потоков до тех пор, пока ожидаемое количество потоков не достигнет барьера. Защелки нельзя использовать повторно, барьеры можно использовать повторно.
Эти механизмы синхронизации используются, когда выполнение параллельного алгоритма можно разделить на несколько этапов, разделённых барьерами. В частности, с помощью барьера можно организовать точку сбора частичных результатов вычислений, в которой подводится итог этапа вычислений. Например, если стоит задача отфильтровать изображение с помощью двух разных фильтров, и разные потоки фильтруют разные части изображения, то перед началом второй фильтрации следует дождаться, когда первая фильтрация будет полностью завершена, то есть все потоки должны дойти до барьера между двумя этапами фильтрации.
Барьер для группы потоков в исходном коде означает, что каждый поток должен остановиться в этой точке и подождать достижения барьера другими потоками группы. Когда все потоки достигли барьера, их выполнение продолжается.
std::latch
std::latch – это уменьшающийся счетчик. Значение счетчика инициализируется при создании. Потоки уменьшают значение счётчика и блокируются на защёлке до тех пор, пока счетчик не уменьшится до нуля. Нет возможности увеличить или сбросить счетчик, что делает защелку одноразовым барьером.
В отличие от std::barrier , std::latch может быть уменьшен одним потоком более одного раза.
Использовать защёлки очень просто. В нашем распоряжении несколько методов:
- count_down(value) уменьшает значение счётчика на value (по умолчанию 1) без блокировки потока. Если значение счётчика становится отрицательным, то поведение не определено.
- try_wait() позволяет проверить, не достигло ли значение счётчика нуля. С низкой вероятностью может ложно возвращать false .
- wait() блокирует текущий поток до тех пор, пока счётчик не достигнет нулевого значения. Если значение счётчика уже равно 0, то управление возвращается немедленно.
- arrive_and_wait(value) уменьшает значение счётчика на value (по умолчанию 1) и блокирует текущий поток до тех пор, пока счётчик не достигнет нулевого значения. Если значение счётчика становится отрицательным, то поведение не определено.
std::barrier
Используется почти так же, как std::latch , но является многоразовым: как только ожидающие потоки разблокируются, значение счётчика устанавливается в начальное, и тот же самый барьер может быть использован повторно.
Работу барьера можно разбить на фазы. Фаза заканчивается, когда счётчик барьера обнуляется, затем начинается новая фаза. Фазы работы имеют идентификаторы, которые возвращаются некоторыми методами. Это нужно для того, чтобы мы не ждали конца фазы, которая уже завершена.
Синхронизаторы пакета concurrent
Объекты синхронизации Synchroniser из пакета java.util.concurrent включают :
Semaphore | объект синхронизации, ограничивающий количество потоков, которые могут «войти» в заданный участок кода; |
CountDownLatch | объект синхронизации, разрешающий вход в заданный участок кода при выполнении определенных условий; |
CyclicBarrier | объект синхронизации типа «барьер», блокирующий выполнение определенного кода для заданного количества потоков; |
Exchanger | объект синхронизации, позволяющий провести обмен данными между двумя потоками; |
Phaser | объект синхронизации типа «барьер», но в отличие от CyclicBarrier, предоставляет больше гибкости. |
Объект синхронизации Semaphore
Как было отмечено выше, Semaphore ограничивает доступ к определенному участку кода, иначе говоря, к общему ресурсу, в качестве которого могут выступать программые/аппаратные ресурсы или файловая система.
Для управления доступом к общему ресурсу Semaphore использует счетчик. Если значение счетчика больше нуля, то поток исполнения получает разрешение, после чего значение счетчика семафора уменьшается на единицу. При значении счетчика равным нулю очередному потоку исполнения в доступе будет отказано, и он будет заблокирован до тех пор, пока не будут освобождены ресурсы.
Как только один из потоков исполнения освободит ресурсы, т.е. завершит исполнение определенного участка кода, то значение счетчика семафора увеличивается на единицу. Если в это время имеется ожидающий разрешения другой поток исполнения, то он сразу же его получает.
Конструкторы Semaphore
Класс Semaphore имеет два приведенных ниже конструктора :
Параметр permits определяет исходное значение счетчика разрешений, т.е. количество потоков исполнения, которым может быть одновременно предоставлен доступ к общему ресурсу. По умолчанию ожидающим потокам предоставляется разрешение в неопределенном порядке. Если же использовать второй конструктор и параметру справедливости fair присвоить значение true, то разрешения будут предоставляться ожидающим потокам исполнения в том порядке, в каком они его запрашивали.
Метод получения разрешения acquire
Чтобы получить у семафора разрешение необходимо вызвать у него один из перегруженных методов acquire :
Первый метод без параметра запрашивает одно разрешение, а второй в качестве параметра использует количество разрешений. Обычно используется первый метод. Если разрешение не будет получено, то исполнение вызывающего потока будет приостановлено до тех пор, пока не будет получено разрешение, т.е. поток блокируется.
Освобождение ресурса
Чтобы освободить разрешение у семафора следует вызвать у него один из перегруженных методов release :
В первом методе освобождается одно разрешение, а во втором — количество разрешений, обозначенное параметром number.
Пример использования Semaphore
В примере несколько всадников с лошадьми должны пройти контроль перед скачками. Количество контроллеров меньше количества всадников, поэтому некоторые всадники будут дожидаться, пока не освободиться один из контроллеров.
Общий ресурс CONTROL_PLACES, символизирующий контролеров и используемый всеми потоками, выделен оператором synchronized. С помощью семафора осуществляется контроль доступа только одному потоку.
Результат выполнения примера с семафором
Обратите внимание, что «всадник 3» завершил проверку, после чего «всадник 6» вошел в блокируемый участок кода. А вот «всадник 7» успел раньше вывести сообщение о входе в блокируемый участок кода, чем «всадник 1» сообщил о его «покидании».
Объект синхронизации CountDownLatch
Объект синхронизации потоков CountDownLatch представляет собой «защелку с обратным отсчетом» : несколько потоков, выполняя определенный код, блокируются до тех пор, пока не будут выполнены заданные условия. Количество условий определяются счетчиком. Как только счетчик обнулится, т.е. будут выполнены все условия, самоблокировки выполняемых потоков снимаются, и они продолжают выполнение кода.
Таким образом, CountDownLatch также, как и Semaphore, работает со счетчиком, обнуление которого снимает самоблокировки выполняемых потоков. Конструктор CountDownLatch :
Параметр number определяет количество событий, которые должны произойти до того момента, когда будет снята самоблокировка.
Метод самоблокировки await
CountDownLatch имеет два перегруженных метода await для самоблокировки :
В первом методе ожидание длится до тех пор, пока счетчик CountDownLatch не достигнет нуля. А во втором методе ожидание длится только в течение определенного периода времени, определяемого параметром ожидание wait. Время ожидания указывается в единицах unit объекта перечисления TimeUnit, определяюший временно́е разбиение. Существуют следующие значения данного перечисления :
- DAYS
- HOURS
- MINUTES
- SECONDS
- MICROSECONDS
- MILLISECONDS
- NANOSECONDS
Метод уменьшения счетчика countDown
Чтобы уменьшить счетчик объекта CountDownLatch следует вызвать метод countDown :
Примером CountDownLatch может служить паром, собирающий определенное количество транспорта и пассажиров, или экскурсовод, собирающий группу из заданного количества туристов.
Пример использования CountDownLatch
В примере несколько всадников должны подъехать к барьеру. Как только все всадники выстроятся перед барьером, будут даны команды «На старт», «Внимание», «Марш». После этого барьер опустится и начнутся скачки. Объект синхронизации CountDownLatch выполняет роль счетчика количества всадников и команд.
При «выходе на старт» поток вызывает методы countDown, уменьшая значение счетчика на 1, и await, блокируя самого себя в ожидании обнуления счетчика «защелки». Как только все потоки выстроятся на «старте» с интервалом в 1 сек. подаются команды. Каждая команда сопровождается уменьшением счетчика. После обнуления счетчика «защелки» потоки продолжают выполнение дальнейшего кода.
Результат выполнения примера
Объект синхронизации CyclicBarrier
Объект синхронизации CyclicBarrier представляет собой барьерную синхронизацию, используемую, как правило, в распределённых вычислениях. Особенно эффективно использование барьеров при циклических расчетах. При барьерной синхронизации алгоритм расчета делят на несколько потоков. С помощью барьера организуют точку сбора частичных результатов вычислений, в которой подводится итог этапа вычислений.
В исходном коде барьер для группы потоков означает, что каждый поток должен остановиться в определенном месте и ожидать прихода остальных потоков группы. Как только все потоки достигнут барьера, их выполнение продолжится.
Класс CyclicBarrier имеет 2 конструктора :
В первом конструкторе задается количество потоков, которые должны достигнуть барьера, чтобы после этого одновременно продолжить выполнение кода. Во втором конструкторе дополнительно задается реализующий интерфейс Runnable класс, который должен быть запущен после прихода к барьеру всех потоков. Поток запускать самостоятельно НЕ НУЖНО. CyclicBarrier это делает автоматически.
Для указания потоку о достижении барьера нужно вызвать один из перегруженных методов await :
Назначение параметров wait и unit у второго метода описано выше (см. CountDownLatch).
Циклический барьер CyclicBarrier похож на CountDownLatch. Главное различие между ними связано с тем, что «защелку» нельзя использовать повторно после того, как её счётчик обнулится, а барьер можно использовать (в цикле). С точки зрения API циклический барьер CyclicBarrier имеет только метод самоблокировки await и не имеет метода декрементации счетчика, а также позволяет подключить и автоматически запускать дополнительный потоковый класс при достижении барьера всех исполняемых потоков.
Пример использования CyclicBarrier
В примере организуется переправа. Паром может вместить только 3 автомобиля. Количество автомобилей 9. Роль парома выполняет объект синхронизации FerryBarrier, которому в качестве второго параметра передается реализующий интерфейс Runnable класс FerryBoat. Как только 3 потока достигнут барьера автоматически будет запущен FerryBoat, после завершения работы которого потоки продолжают свою работу.
Обратите внимание, что потоки подходят к барьеру с интервалом в 400 ms. Время задержки у барьера/переправы (после того, как собралось необходимое количество потоков), составляет 500 ms, если не считать время вывода сообщений в консоль. За это время к барьеру успевает подойти еще один поток. Что мы и видим при выводе сообщений в консоль.
Результат выполнения примера
Варьируя временем на переправе и временем прихода автомобилей на переправу, можно либо заставить паром простаивать, либо будут простаивать автомобили на переправе.
Объект синхронизации Exchanger
Класс Exchanger (обменник) предназначен для упрощения процесса обмена данными между двумя потоками исполнения. Принцип действия класса Exchanger связан с ожиданием того, что два отдельных потока должны вызвать его метод exchange. Как только это произойдет, Exchanger произведет обмен данными, предоставляемыми обоими потоками.
Обменник является обобщенным классом, он параметризируется типом объекта передачи :
Необходимо отметить, что обменник поддерживает передачу NULL значения, что дает возможность использовать его для передачи объекта в одну сторону или места синхронизации двух потоков.
Exchanger содержит перегруженный метод exchange, имеющий следующие формы :
Параметр buffer является ссылкой на обмениваемые данные. Метод возвращает данные из другого потока исполнения. Вторая форма метода позволяет определить время ожидания. Параметры wait и unit описаны выше. Метод exchange, вызванный в одном потоке, не завершится успешно до тех пор, пока он не будет вызван из второго потока исполнения.
Пример использования Exchanger
В примере использования объекта синхронизации Exchanger два почтальона из пунктов А и Б отправляются в соседние поселки В и Г доставить письма. Каждый из почтальонов должен доставить по письму в каждый из поселков. Чтобы не делать лишний круг, они встречаются в промежуточном поселке Д и обмениваются одним письмом. В результате этого каждому из почтальонов придется доставить письма только в один поселок. В примере все «шаги» почтальонов фиксируются выводом соответствующих сообщений в консоль.
Результат выполнения примера
В консоль будет выведена следующая информация :
Объект синхронизации Phaser
Phaser (фазировщик), как и CyclicBarrier, является реализацией объекта синхронизации типа «Барьер» (CyclicBarrier). В отличии от CyclicBarrier, Phaser предоставляет больше гибкости. Чтобы лучше понять Phaser, можно привести два наглядно демонстрирующих его использование примера.
В качестве первого примера можно рассмотреть несколько потоков исполнения, реализующих процесс обработки заказов из трех стадий. На первой стадии отдельные потоки исполнения проверяют сведения о клиенте, наличие товара на складе и их стоимость. На второй стадии вычисляется стоимость заказа и стоимость доставки. На заключительной стадии подтверждается оплата и определяется ориентировочное время доставки. Во втором примере несколько потоков реализуют перевозку пассажиров городским транспортом. Пассажиры ожидают транспорт на разных остановках. Транспорт, останавливаясь на остановках, одних пассажиров «сажает», других «высаживает».
В этих примерах общим является то, что один объект синхронизации Phaser, исполняющий роль заказа и транспорта, играет главную роль, а другие потоки вступают в работу при определенном состоянии Phaser. Таким образом, класс Phaser позволяет определить объект синхронизации, ожидающий завершения определенной фазы. После этого он переходит к следующей фазе и снова ожидает ее завершения.
Важные особенности Phaser :
- Phaser может иметь несколько фаз (барьеров). Если количество фаз равно 1, то плавно переходим к CyclicBarrier (осталось только все исполнительные потоки остановить у барьера).
- Каждая фаза (цикл синхронизации) имеет свой номер.
- Количество участников-потоков для каждой фазы жестко не задано и может меняться. Исполнительный поток может регистрироваться в качестве участника и отменять свое участие;
- Исполнительный поток не обязан ожидать, пока все остальные участники соберутся у барьера. Достаточно только сообщить о своем прибытии.
Для создания объекта Phaser используется один из конструкторов :
Параметр parties определяет количество участников, которые должны пройти все фазы. Первый конструктор создает объект Phaser без каких-либо участников. Второй конструктор регистрирует передаваемое в конструктор количество участников. Третий и четвертый конструкторы дополнительно устанавливают родительский объект Phaser.
При создании экземпляр класса Phaser находится в нулевой фазе. В очередном состоянии (фазе) синхронизатор находится в ожидании до тех пор, пока все зарегистрированные потоки не завершат данную фазу. Потоки извещают об этом, вызывая один из методов arrive() или arriveAndAwaitAdvance().
Методы объекта синхронизации Phaser
Метод | Описание |
---|---|
int register() | Метод регистририрует участника и возвращает номер текущей фазы. |
int arrive() | Метод указывает на завершения выполнения текущей фазы и возвращает номер фазы. Если же работа Phaser закончена, то метод вернет отрицательное число. При вызове метода arrive поток не приостанавливается, а продолжает выполняться. |
int arriveAndAwaitAdvance() | Метод вызывается потоком/участником, чтобы указать, что он завершил текущую фазу. Это аналог метода CyclicBarrier.await(), сообщающего о прибытии к барьеру. |
int arriveAndDeregister() | Метод arriveAndDeregister сообщает о завершении всех фаз участником и снимается с регистрации. Данный метод возвращает номер текущей фазы или отрицательное число, если Phaser завершил свою работу |
int getPhase() | Получение номера текущей фазы. |
Следующее анимационное изображение наглядно демонстрирует работу объекта синхронизации Phaser — участник регистрируется в определенной фазе синхронизатора; при переходе синхронизатора в заданное состояние (фазу) участник снимается с регистрации. Количество участников в разных фазах синхронизатора может отличаться.
Пример использования Phaser
В примере PhaserExample создается несколько потоков, играющих роль пассажиров. Phaser играет роль метро, которое должно проследовать вдоль нескольких станций. Каждая станция (фаза) имеет свой номер. Класс Passenger играет роль пассажира, который на одной из станции должен зайти в вагон, а на другой выйти. Количество пассажиров, а также их места посадки и высадки, формируются случайным образом.
Листинг класса Passenger
Конструктор класса Passenger получает значение идентификатора, номера станций посадки и назначения (высадки). При создании объекта в консоль выводится информация о пассажире (метод toString).
Как только Phaser переходит в определенную фазу, номер которой соответствует станции посадки пассажира, то поток данного Passenger стартует (run) и выводит в консоль сообщение, что пассажир вошел в вагон, т.е. находится в ожидании следующей станции/фазы (arriveAndAwaitAdvance). Если следующая станция/фаза не будет соответствовать станции назначения, то Passenger продолжит свой путь. Как только Phaser перейдет в фазу, номер которой соответствует номеру станции назначания пассажира, то цикл контроля завершится и поток продолжит работу. С задержкой в 500 ms он сообщит, что покинул вагон и отменит регистрацию в Phaser (arriveAndDeregister).
Таким образом, поток/пассажир дожидается свой фазы/станции в цикле, выделенной в коде пунктирными комментариями. Вызов метода arriveAndAwaitAdvance возвращает значение следующего номера фазы, т.е. участник будет вызван при переходе Phaser в новое состояние. Если в этом состоянии значение фазы (getPhase) будет соответствовать номеру destination, то цикл прервется, в противном случае, ожидание следующей фазы и повторное выполнение проверки условия while.
Примечание : Passenger является внутренним классом примера/класса PhaserExample, и для описания вынесен из общего кода, чтобы не загромождать листинг.
Листинг примера PhaserExample
В примере сначала создается объект синхронизации PHASER. После этого формируется массив пассажиров. При создании объекта Passenger случайным образом определяются станция посадки и станция назначения. После того, как массив пассажиров подготовлен, PHASER в цикле начинает менять свое состояние. На каждом шаге выполняется проверка «станции посадки пассажира». Если она соответствует значению фазы, то данный пассажир входит в вагон метро, т.е. регистрируется в PHASER и поток стартует. Таким образом, регистрация участников (исполнительных потоков) выполняется при нахождении PHASER в определенном состоянии/фазе. Пассажир покинет вагон при достижении метро станции назначения, т.е. при нахождении PHASER в соответствующей фазе. Но это произойдет уже в коде класса Passenger, рассмотренного выше.
Introduction
Barrier synchronizers (barriers) are a kind of synchronizer that ensures that any threads must stop at a certain point and cannot proceed further until all other threads reach this point.
By purpose, barriers can be grouped into the following categories:
- entry barriers, that prevents threads from starting processing
- exit barriers, that waiting for all threads to finish processing
Barriers also can be grouped by the number of iterations (one-time or cyclic) and by the number of parties/threads (fixed or variable).
In Java 7+ there are 3 predefined barrier classes: CountDownLatch, CyclicBarrier, Phaser.
The CountDownLatch class
The CountDownLatch class is a one-time barrier that allows threads to wait until the given count of operations is performed in other threads.
A latch is initialized with a given count. The await methods (waiting and timed waiting) wait until the current count reaches 0 due to calls of the countDown() method. After that, all waiting threads are released, and any subsequent calls of the await methods return immediately.
Threads registration
The CountDownLatch(int count) constructor creates a latch with the given count. The current count cannot be reset without recreating a new latch object.
Threads waiting
The void await() method causes the current thread to wait until one of the events occurs:
- the latch has counted down to 0 due to calls of the countDown() method
- the thread is interrupted
If the current count is 0 then this method returns immediately.
The boolean await(long timeout, TimeUnit unit) method causes the current thread to wait until one of the events occurs:
- the given timeout elapses
- the latch has counted down to 0 due to calls of the countDown() method
- the thread is interrupted
The method returns true if the current count reached 0 and false if the timeout elapsed before the current count reached 0. If the current count is 0 then this method returns true immediately.
Threads arrival
The countDown() method decrements the current count, releasing all waiting threads if the count reaches 0. If the current count equals 0 then nothing happens.
Latch monitoring
The long getCount() method returns the current count of the latch.
Example
In the example are used 2 latches: first as a one-time entry barrier, second as a one-time exit barrier.
The CyclicBarrier class
The CyclicBarrier class is a reusable synchronization barrier that allows threads to wait for each other at a certain point.
A barrier is initialized with a given number of threads. The await methods (waiting and timed waiting) wait until all threads reach the barrier. Then all threads trip the barrier, and the barrier is automatically reset for the next cycle.
Threads registration
The CyclicBarrier(int parties) constructor creates a new barrier that will trip when the given number of threads are waiting upon it.
The CyclicBarrier(int parties, Runnable barrierAction) constructor creates a new barrier that will trip when the given number of threads are waiting upon it. When the barrier is tripped, the given barrier action will be performed by the last thread entering the barrier.
Threads arrival and waiting
The int await() method causes the current thread to wait until one of the events occurs:
- the last thread arrives at the barrier
- the barrier is broken (by the reasons described below)
If the barrier is broken, then depending on the reason InterruptedException, BrokenBarrierException are thrown.
The int await(long timeout, TimeUnit unit) method causes the current thread to wait until one of the events occurs:
- the given timeout elapses
- the last thread arrives at the barrier
- the barrier is broken (by the reasons described below)
If the specified timeout elapses, then a TimeoutException is thrown. If the barrier is broken, then depending on the reason InterruptedException, BrokenBarrierException are thrown.
The await method returns the arrival index of the current thread.
Barrier reset
The void reset() method resets the barrier to its initial state. If any threads are waiting at the barrier on the await methods, the methods will throw a BrokenBarrierException.
Barrier monitoring
The int getParties() method returns the number of parties required to trip the barrier.
The int getNumberWaiting() method returns the number of parties currently waiting at the barrier.
The boolean isBroken() method returns true if this barrier has been broken by one of the reasons:
- interruption
- timeout elapsing
- calling the reset() method
- the barrier action failure due to an exception
Example
In the example are used 2 barriers: first as a cyclic entry barrier, second as a cyclic exit barrier.
The Phaser class
The Phaser class is a reusable barrier that allows a variable number of parties/threads. Because of this, it’s more flexible, however much more complicated.
To support a variable number of parties, a phaser contains the number of registered, arrived, and unarrived parties. The number of registered parties always equals the sum of the numbers of arrived and unarrived parties (registered==arrived+unarrived). To support cyclic iterations, a phaser contains a number of the current phase.
Parties registration
The Phaser() constructor creates a phaser with initial phase number 0 and no registered parties (phase=0, registered=0). The Phaser(int parties) constructor creates a phaser with initial phase number 0 and the given number of registered parties (phase=0, registered=parties).
The int register() method adds an unarrived party to the phaser (registered++). The int bulkRegister(int parties) method adds the given number of unarrived parties to the phaser (registered+=parties). These methods return the arrival phase number to which this registration is applied.
Parties synchronization
The int arrive() method marks a party arriving at the phaser, without waiting for other parties to arrive (arrived++, unarrived — ).
The int awaitAdvance(int phase) method awaits the phase of the phaser to advance from the given phase number. The method returns immediately if the current phase number is not equal to the given phase number.
The int arriveAndAwaitAdvance() marks a party arriving at the phaser and awaits other parties to arrive (arrived++, unarrived — ).
The int arriveAndDeregister() method marks a party arriving at the phaser and deregisters from it without waiting for other parties to arrive (registered — , arrived++, unarrived — ).
The arrive, arriveAndAwaitAdvance, arriveAndDeregister methods return the arrival phase number. The awaitAdvance method returns the next arrival phase number.
Phases iterations
The current phase is finished when all registered parties arrive (registered==arrived, unarrived==0). To decide whether to start the next phase or to terminate the phaser is used the protected boolean onAdvance(int phase, int registeredParties) method.
If the onAdvance method returns true, then the phaser is terminated (phase<0, terminated=true). If the onAdvance method returns false, then the phaser starts a new phase (phase++, arrived=0, unarrived=registered). The onAdvance method can also be used to perform a barrier action.
By default the onAdvance method returns true when the number of registered parties has become 0 as the result of calls the arriveAndDeregister method:
The overridden onAdvance method for one-time process:
The overridden onAdvance method for infinite iterations:
The overridden onAdvance method for maxPhase iterations:
Phaser termination
Phaser is terminated automatically when the onAdvance method returns true. It’s possible to terminate the phaser manually by calling the forceTermination() method.
The arrive, awaitAdvance, arriveAndAwaitAdvance, arriveAndDeregister methods return negative values if the phaser has already terminated.
Phaser monitoring
The methods to monitor parties numbers:
- int getRegisteredParties() — returns the number of parties registered at the phaser
- int getArrivedParties() — returns the number of registered parties that have arrived at the current phase of the phaser
- int getUnarrivedParties() — returns the number of registered parties that have not yet arrived at the current phase of the phaser
The int getPhase() method returns the current phase number.
The boolean isTerminated() method returns true if this phaser has been terminated.
Examples
In the example are used the basic phaser methods.
In the example, a phaser is used to implement a one-time entry barrier.
In the example, a phaser is used to implement one-time entry and exit barriers.
In the example, a phaser is used to implement cyclic entry and exit barriers.
Conclusion
The CountDownLatch class is suitable for one-time iteration with a fixed number of parties.
The CyclicBarrier class is suitable for one-time and cyclic iterations with a fixed number of parties.
The Phaser class is suitable for one-time and cyclic iterations with a variable number of parties. It also can be used with a fixed number of parties, however, it is an excess.
Справочник по синхронизаторам java.util.concurrent.*
Целью данной публикации не является полный анализ синхронизаторов из пакета java.util.concurrent. Пишу её, прежде всего, как справочник, который облегчит вхождение в тему и покажет возможности практического применения классов для синхронизации потоков (далее поток = thread).
В java.util.concurrent много различных классов, которые по функционалу можно поделить на группы: Concurrent Collections, Executors, Atomics и т.д. Одной из этих групп будет Synchronizers (синхронизаторы).
Синхронизаторы – вспомогательные утилиты для синхронизации потоков, которые дают возможность разработчику регулировать и/или ограничивать работу потоков и предоставляют более высокий уровень абстракции, чем основные примитивы языка (мониторы).
Semaphore
Синхронизатор Semaphore реализует шаблон синхронизации Семафор. Чаще всего, семафоры необходимы, когда нужно ограничить доступ к некоторому общему ресурсу. В конструктор этого класса ( Semaphore(int permits) или Semaphore(int permits, boolean fair) ) обязательно передается количество потоков, которому семафор будет разрешать одновременно использовать заданный ресурс.
Доступ управляется с помощью счётчика: изначально значение счётчика равно int permits , когда поток заходит в заданный блок кода, то значение счётчика уменьшается на единицу, когда поток его покидает, то увеличивается. Если значение счётчика равно нулю, то текущий поток блокируется, пока кто-нибудь не выйдет из блока (в качестве примера из жизни с permits = 1 , можно привести очередь в кабинет в поликлинике: когда пациент покидает кабинет, мигает лампа, и заходит следующий пациент).
Рассмотрим следующий пример. Существует парковка, которая одновременно может вмещать не более 5 автомобилей. Если парковка заполнена полностью, то вновь прибывший автомобиль должен подождать пока не освободится хотя бы одно место. После этого он сможет припарковаться.
CountDownLatch
CountDownLatch (замок с обратным отсчетом) предоставляет возможность любому количеству потоков в блоке кода ожидать до тех пор, пока не завершится определенное количество операций, выполняющихся в других потоках, перед тем как они будут «отпущены», чтобы продолжить свою деятельность. В конструктор CountDownLatch ( CountDownLatch(int count) ) обязательно передается количество операций, которое должно быть выполнено, чтобы замок «отпустил» заблокированные потоки.
Блокировка потоков снимается с помощью счётчика: любой действующий поток, при выполнении определенной операции уменьшает значение счётчика. Когда счётчик достигает 0, все ожидающие потоки разблокируются и продолжают выполняться (примером CountDownLatch из жизни может служить сбор экскурсионной группы: пока не наберется определенное количество человек, экскурсия не начнется).
- Каждый из пяти автомобилей подъехал к стартовой прямой;
- Была дана команда «На старт!»;
- Была дана команда «Внимание!»;
- Была дана команда «Марш!».
CyclicBarrier
CyclicBarrier реализует шаблон синхронизации Барьер. Циклический барьер является точкой синхронизации, в которой указанное количество параллельных потоков встречается и блокируется. Как только все потоки прибыли, выполняется опционное действие (или не выполняется, если барьер был инициализирован без него), и, после того, как оно выполнено, барьер ломается и ожидающие потоки «освобождаются». В конструктор барьера ( CyclicBarrier(int parties) и CyclicBarrier(int parties, Runnable barrierAction) ) обязательно передается количество сторон, которые должны «встретиться», и, опционально, действие, которое должно произойти, когда стороны встретились, но перед тем когда они будут «отпущены».
Барьер похож на CountDownLatch, но главное различие между ними в том, что вы не можете заново использовать «замок» после того, как его счётчик достигнет нуля, а барьер вы можете использовать снова, даже после того, как он сломается. CyclicBarrier является альтернативой метода join() , который «собирает» потоки только после того, как они выполнились.
Рассмотрим следующий пример. Существует паромная переправа. Паром может переправлять одновременно по три автомобиля. Чтобы не гонять паром лишний раз, нужно отправлять его, когда у переправы соберется минимум три автомобиля.
Exchanger<V>
Exchanger (обменник) может понадобиться, для того, чтобы обменяться данными между двумя потоками в определенной точки работы обоих потоков. Обменник — обобщенный класс, он параметризируется типом объекта для передачи.
Обменник является точкой синхронизации пары потоков: поток, вызывающий у обменника метод exchange() блокируется и ждет другой поток. Когда другой поток вызовет тот же метод, произойдет обмен объектами: каждая из них получит аргумент другой в методе exchange() . Стоит отметить, что обменник поддерживает передачу null значения. Это дает возможность использовать его для передачи объекта в одну сторону, или, просто как точку синхронизации двух потоков.
Рассмотрим следующий пример. Есть два грузовика: один едет из пункта A в пункт D, другой из пункта B в пункт С. Дороги AD и BC пересекаются в пункте E. Из пунктов A и B нужно доставить посылки в пункты C и D. Для этого грузовики в пункте E должны встретиться и обменяться соответствующими посылками.
Phaser
Phaser (фазер), как и CyclicBarrier, является реализацией шаблона синхронизации Барьер, но, в отличии от CyclicBarrier, предоставляет больше гибкости. Этот класс позволяет синхронизировать потоки, представляющие отдельную фазу или стадию выполнения общего действия. Как и CyclicBarrier, Phaser является точкой синхронизации, в которой встречаются потоки-участники. Когда все стороны прибыли, Phaser переходит к следующей фазе и снова ожидает ее завершения.
- Каждая фаза (цикл синхронизации) имеет номер;
- Количество сторон-участников жестко не задано и может меняться: поток может регистрироваться в качестве участника и отменять свое участие;
- Участник не обязан ожидать, пока все остальные участники соберутся на барьере. Чтобы продолжить свою работу достаточно сообщить о своем прибытии;
- Случайные свидетели могут следить за активностью в барьере;
- Поток может и не быть стороной-участником барьера, чтобы ожидать его преодоления;
- У фазера нет опционального действия.
Параметр parties указывает на количество сторон-участников, которые будут выполнять фазы действия. Первый конструктор создает объект Phaser без каких-либо сторон, при этом барьер в этом случае тоже «закрыт». Второй конструктор регистрирует передаваемое в конструктор количество сторон. Барьер открывается когда все стороны прибыли, или, если снимается последний участник. (У класса Phaser еще есть конструкторы, в которые передается родительский объект Phaser, но мы их рассматривать не будем.)
- int register() — регистрирует нового участника, который выполняет фазы. Возвращает номер текущей фазы;
- int getPhase() — возвращает номер текущей фазы;
- int arriveAndAwaitAdvance() — указывает что поток завершил выполнение фазы. Поток приостанавливается до момента, пока все остальные стороны не закончат выполнять данную фазу. Точный аналог CyclicBarrier.await() . Возвращает номер текущей фазы;
- int arrive() — сообщает, что сторона завершила фазу, и возвращает номер фазы. При вызове данного метода поток не приостанавливается, а продолжает выполнятся;
- int arriveAndDeregister() — сообщает о завершении всех фаз стороной и снимает ее с регистрации. Возвращает номер текущей фазы;
- int awaitAdvance(int phase) — если phase равно номеру текущей фазы, приостанавливает вызвавший его поток до её окончания. В противном случае сразу возвращает аргумент.
Рассмотрим следующий пример. Есть пять остановок. На первых четырех из них могут стоять пассажиры и ждать автобуса. Автобус выезжает из парка и останавливается на каждой остановке на некоторое время. После конечной остановки автобус едет в парк. Нам нужно забрать пассажиров и высадить их на нужных остановках.
Если кому-нибудь пригодилось, то я очень рад=)
Более подробно о Phaser здесь.
Почитать ещё о синхронизаторах и посмотреть примеры можно здесь.