QNX/UNIX: Анатомия параллелизма - Олег Цилюрик
Шрифт:
Интервал:
Закладка:
sigset_t sig;
sigfillset(&sig);
SignalProcmask(0, 0, SIG_BLOCK, &sig, NULL);
// разблокировать реакцию на свой сигнал
sigemptyset(&sig);
sigaddset(&sig, (int)data);
SignalProcmask(0, 0, SIG_UNBLOCK, &sig, NULL);
// цикл ожидания приходящих сигналов
while (true) pause();
}
int main() {
// для обработки всей группы сигналов управления потоками используем
// единую функцию реакции, иначе все было бы гораздо проще.
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_sigaction = handler;
act.sa_flags = SA_SIGINFO;
// создаем группу однотипных потоков
const int thrnum = 3;
for (int i = SIGRTMIN; i - SIGRTMIN < thrnum; i++) {
sigset_t sig;
sigemptyset(&sig);
sigaddset(&sig, 1);
// нам нужно, чтобы главный поток не реагировал:
sigprocmask(SIG_BLOCK, &sig, NULL);
if (sigaction(i, &act, NULL) < 0) perror("set signal handler: ");
// для передачи номера сигнала используется
// трюк с подменой типа параметра:
pthread_create(NULL, NULL, threadfunc, (void*)(i));
}
// начинаем циклическую синхронизацию потоков.
for (int i = 0; ; i++) {
sleep(1);
// посылку сигнала можно (так даже будет корректнее)
// сделать так:
// union sigval val;
// val.sival_int = i;
// sigqueue(getpid(), SIGRTMIN + i % thrnum, val);
// но мы сознательно демонстрируем и приемлемость kill:
kill(getpid(), SIGRTMIN + i % thrnum);
}
}
В этой программе главный поток циклически по таймеру активизирует поочередно каждый поток. Вот фрагмент вывода работающей программы:
SIG = 41; TID = 2
SIG = 42; TID = 3
SIG = 43; TID = 4
SIG = 41; TID = 2
SIG = 42; TID = 3
SIG = 43; TID = 4
SIG = 41; TID = 2
SIG = 42; TID = 3
SIG = 43; TID = 4
Часто приходится слышать: «…хотелось бы доставить сигнал всем потокам, уведомить всех потребителей и выполнить функцию реакции в каждом потоке…», и именно в такой последовательности действий понимается модель сигналов в потоках при поверхностном с ней ознакомлении. Иногда это представляется очень интересной возможностью, и мы реализуем такую схему взаимодействия в следующем фрагменте (файл s10.cc):
Множественная реакция на сигнал#include <stdio.h>
#include <iostream.h>
#include <signal.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/neutrino.h>
#include <vector>
static void handler(int signo, siginfo_t* info, void* context) {
cout << "SIG = " << signo << ", TID = " << pthread_self() << endl;
}
static void endhandler(int signo) {}
// сигнал, на который реагируют потоки:
const int SIGNUM = SIGRTMIN;
sigset_t sig;
struct threcord {
int tid;
bool noblock;
};
static vector<threcord> tharray; // вектор состояний потоков
void* threadfunc(void* data) {
// блокирование всех прочих сигналов:
sigset_t sigall;
sigfillset(&sigall);
SignalProcmask(0, 0, SIG_BLOCK, &sigall, NULL);
// передеспетчеризация для завершения формирования вектора
sched_yield();
tharray[(int)data].noblock =
(SignalProcmask(0, 0, SIG_UNBLOCK, &sig, NULL) != -1);
while (true) {
pause();
tharray[(int)data].noblock =
!(SignalProcmask(0, 0, SIG_BLOCK, &sig, NULL) != 1);
bool nolast = false;
for (vector<threcord>::iterator i = tharray.begin();
i != tharray.end(); i++)
if (nolast = i->noblock) break;
// последовательная пересылка сигнала следующему потоку
if (nolast) kill(getpid(), SIGNUM);
// ... когда пересылать больше некому -
// переинициализация масок
else
for (vector<threcord>::iterator i = tharray.begin();
i != tharray.end(); i++)
i->noblock = (SignalProcmask(0, i->tid, SIG_UNBLOCK, &sig, NULL) != -1);
}
}
int main() {
// переопределение реакции ^C в старой манере
signal(SIGINT, endhandler);
// маска блокирования-разблокирования
sigemptyset(&sig);
sigaddset(&sig, SIGNUM);
// блокировка в главном потоке приложения
sigprocmask(SIG_BLOCK, &sig, NULL);
cout << "Process " << getpid() << ", waiting for signal " << SIGNUM << endl;
// установка обработчика (для дочерних потоков)
struct sigaction act;
act.sa_mask = sig;
act.sa_sigaction = handler;
act.sa_flags = SA_SIGINFO;
if (sigaction(SIGNUM, &act, NULL) < 0) perror("set signal handler: ");
const int thrnum = 3;
for (int i = 0; i < thrnum; i++) {
threcord threc = { 0, false };
pthread_create(&threc.tid, NULL, threadfunc, (void*)i);
tharray.push_back(three);
}
pause();
// сюда мы попадаем после ^C для завершающих операций...
tharray.erase(tharray.begin(), tharray.end());
cout << "Clean vector" << endl;
}
Это приложение, в отличие от предыдущих, построено уже с использованием специфики С++, в нем используется контейнерный класс vector из библиотеки STL (Standard Template Library). Может быть множество вариаций на подобную тему. Приведенное нами приложение (как одна из вариаций) только подтверждает, что принятая в QNX модель достаточна для описания самых неожиданных потребностей. Логика работы приложения крайне проста: получая сигнал, поток блокирует повторную реакцию на этот сигнал, после чего возбуждает дубликат полученного сигнала от своего имени.
ПримечаниеПоказанное приложение в значительной степени искусственно и неэффективно. Мы приводим его здесь не как образец того, «как нужно делать», а только как иллюстрацию гибкости возможностей, предоставляемых в области параллельного программирования. При некоторой изобретательности можно заставить программу вести себя согласно вашим капризам, какими бы изощренными они ни оказались.
Запускаем полученное приложение:
# s10
Process 2089006, waiting for signal 41
После чего с другого терминала пошлем приложению ожидаемый им сигнал, например командой:
# kill -41 2089006
Посылаем этот сигнал несколько раз (в данном случае 3) и получаем вывод от приложения:
SIG = 41; TID = 4
SIG = 41; TID = 2
SIG = 41; TID = 3
SIG = 41; TID = 3
SIG = 41; TID = 4
SIG = 41; TID = 2
SIG = 41; TID = 2
SIG = 41; TID = 3
SIG = 41; TID = 4
^C
Clean vector
Видно, что реакция на каждый сигнал возбуждается несколько раз (по числу потоков), каждый раз выполняясь в контексте разного потока (TID). Интересно и изменение порядка активизации потоков от сигнала к сигналу, то есть потоки в очереди ожидающих «перетасовываются» при поступлении каждого сигнала.
ПримечаниеВ приложение добавлена реакция на ^C (сигнал SIGINT):
• начиная с некоторой сложности приложений, их завершению должна обязательно предшествовать некоторая последовательность действий; в данном случае мы условно показываем очистку вектора состояний потоков;
• реакция на SIGINT выполнена в «ненадежной» манере в смешении с моделью очереди сигналов для SIGRTMIN, что показывает возможность смешанного применения всех моделей в рамках одного приложения; все определяется требованиями и вопросами удобства.
Как мы уже видели, тот факт, что обработчик сигнала выполняется в контексте потока, который разблокировал реакцию на этот сигнал (независимо от того, в момент выполнения какого потока приходит сигнал), позволяет реализовать в обработчике сигнала обработку любой сложности в интересах этого потока. Для этого лишь требуется разместить все области данных, запрашиваемые в этой обработке, не в стеке потока (объявленные как локальные переменные потоковой функции), а в области собственных данных потока, которые мы детально рассмотрели ранее. Схематично это можно показать в коде так:
• Положим, нам нужно уведомлять о некоторых событиях N потоков.
Будем использовать для этого сигналы SIGRTMIN…SIGRTMIN + (N - 1):
for (int i = SIGRTMIN, i < SIGRTMIN + N; i++) {
pthread_create(NULL, NULL, threadfunc, (void*)(i));
}
• При запуске N потоков (из главного потока) потоковые функции, помимо устанавливания своих индивидуальных сигнальных масок (в точности так, как это показано выше в листинге «Чередование потоковых сигналов»), размещают экземпляры собственных потоковых данных:
class DataBlock {
~DataBlock(void) {...}
};
static pthread_key_t key;
static pthread_once_t once = PTHREAD_ONCE_INIT;