QNX/UNIX: Анатомия параллелизма - Олег Цилюрик
Шрифт:
Интервал:
Закладка:
const char VERSION[] = "vers. 1.03";
// имя, под которым будет регистрироваться в пространстве
// имен наш тестовый менеджер ресурса
static const char DEVNAME[_POSIX_PATH_MAX] = "/dev/srr";
// "базовая часть" команды devctl(), конкретный код команды будет
// формироваться динамически на основе этой части, но исходя
// из фактической длины блока передаваемых данных
const unsigned int DCMD_CMD = 1,
DCMD_SRR = _POSIX_DEVDIR_TOFROM + (_DCMD_NET << 8) + DCMD_CMD;
// структура ответов менеджера ресурса по запросу read()
struct result {
pid_t pid;
int chid;
uint64_t cps;
result(void) {
pid = getpid();
// если уж возвращать, то и служебную информацию ;)
cps = SYSPAGE_ENTRY(qtime)->cycles_per_sec;
}
}
// завершение с извещением кода причины
inline void exit(const char *msg) {
cout << 'r';
perror(msg);
exit(EXIT_FAILURE);
}
В этой части каких-либо комментариев заслуживает разве что структура result. Наш сервер устроен «наоборот»: информационный обмен данными он осуществляет по запросу devctl(), запрос read() нам «не нужен», и мы используем его только для возврата информации (PID + CHID) об автономном канале обмена сообщениями. Заодно мы передаем в поле cps этой структуры значение тактовой частоты процессора сервера, что позволяет знать, «с кем мы имеем дело» при экспериментах в сети.
Теперь мы вполне готовы написать код сервера. Этот сервер (файл srv.cc), в отличие от традиционных, имеет два независимых канала подключения: как менеджер ресурсов и как сервер простого обмена сообщениями. Как менеджер он по запросу read() возвращает адресные компоненты (PID, CHID) для обмена сообщениями (ND клиент должен восстановить самостоятельно). По запросу devctl(), а также по запросу по автономному каналу обмена сообщениями сервер просто ретранслирует обратно полученный от клиента блок данных (в каком-то смысле по обоим каналам обмена наш сервер является «зеркалом», отражающим данные).
Сервер запросовresult data;
//---------------------------------------------------------
// реализация обработчика read()
static int readfunc(resmgr_context_t *ctp, io_read_t *msg,
iofunc_ocb_t *ocb) {
int sts = iofunc_read_verify(ctp, msg, ocb, NULL);
if (sts != EOK) return sts;
// возвращать одни и те же статические данные...
MsgReply(ctp->rcvid, sizeof(result), &data, sizeof(result));
return _RESMGR_NOREPLY;
}
//---------------------------------------------------------
// реализация обработчика devctl.
static int devctlfunc(resmgr_context_t *ctp, io_devctl_t *msg,
iofunc_ocb_t *ocb) {
int sts = iofunc_devctl_default(ctp, msg, ocb);
if (sts != _RESMGR_DEFAULT) return sts;
// разбор динамически создаваемого кода devctl(),
// извлечение из него длины принятого блока
unsigned int nbytes = (msg->i.dcmd - DCMD_SRR) >> 16;
msg->o.nbytes = nbytes;
// и тут же ретрансляция блока назад
return _RESMGR_PTR(ctp, &msg->i, sizeof(msg->i) + nbytes);
}
//---------------------------------------------------------
// установка однопоточного менеджера, выполняемая
// в отдельном потоке
static void* install(void* data) {
dispatch_t *dpp;
if ((dpp = dispatch_create()) == NULL)
exit("dispatch allocate");
resmgr_attr_t resmgr_attr;
memset(&resmgr_attr, 0, sizeof(resmgr_attr));
resmgr_attr.nparts_max = 1;
resmgr_attr.msg_max_size = 2048;
static resmgr_connect_funcs_t connect_funcs;
static resmgr_io_funcs_t io_funcs;
iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs,
_RESMGR_IO_NFUNCS, &io_funcs);
// определяем обработку, отличную от обработки по умолчанию,
// только для двух команд: read() & devctl()
io_funcs.read = &readfunc;
io_funcs.devctl = &devctlfunc;
static iofunc_attr_t attr;
iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0);
// связываем менеджер с его префиксным именем
if (resmgr_attach(dpp, &resmgr_attr, DEVNAME,
_FTYPE_ANY, 0, &connect_funcs, &io_funcs, &attr) == -1)
exit("prefix attach");
dispatch_context_t* ctp = dispatch_context_alloc(dpp);
while (true) {
if ((ctp = dispatch_block(ctp)) == NULL)
exit("block error");
dispatch_handler(ctp);
}
}
// размер буфера для обмена сообщениями,
// этого нам хватит с большим запасом и надолго ;)
const int blk = 100000;
// обработчик низкоуровневых сообщений,
// также работающий в отдельном потоке
void* msginout(void* с) {
static uint8_t bufin[blk];
struct _msg_info info;
while (true) {
int rcvid = MsgReceive(data chid, &bufin, blk, &info);
if (rcvid < 0) exit("message receive");
if (MsgReply(rcvid, EOK, &bufin, info.msglen) < 0)
exit("message reply");
}
}
//--------------------------------------------------------
// "пустой" обработчик реакции на ^C (сигнал SIGINT)
inline static void empty(int signo) {}
//--------------------------------------------------------
// главная программа, которая все это "хозяйство" установит
// и будет безропотно ждать завершения по ^C ;)
int main(int argc, char *argv[]) {
cout << "SRR server: " << VERSION << endl;
// открывается менеджер ресурса ...
int fd = open(DEVNAME, O_RDONLY);
// если менеджер открылся, то это нам не нужно -
// дубликаты не создавать!
if (fd > 0)
close(fd), cout << "already in use " << DEVNAME << endl, exit(EXIT_FAILURE);
// перехватываем реакцию ^C:
cout << ". . . . . . waiting ^C. . . . . ." << flush;
signal(SIGINT, empty);
// создается канал для обмена низкоуровневыми сообщениями
data.chid = ChannelCreate(0);
// и запускается отдельным потоком ретранслятор с этого канала
if (pthread_create(NULL, NULL, msginout, NULL) != EOK)
exit("message thread");
// запускается менеджер ресурса
if (pthread_create(NULL, NULL, install, NULL) != EOK)
exit("manager thread");
// ... все! Мы свое дело сделали и ожидаем ^C ...
pause();
cout << "rFinalization... " << endl;
// ... очистка, завершение ...
ChannelDestroy(data.chid);
return EXIT_SUCCESS;
}
Первая клиентская программа (файл cli.cc) посылает серверу блок данных указанной длины (длина может изменяться в широких пределах указанием при запуске ключа -b) и ожидает от него ретрансляции, после чего замеряет время ответа от сервера. Этот процесс повторяется многократно (ключ -m).
Первый клиентский процесс#include "common.h"
static uint64_t *tim;
static int num = 10;
// вывод результатов с оценкой статистики: среднее, С.К.О...
static void outtim(void) {
double m = 0., s = 0.;
for (int i = 0; i < num; i++) {
double d = (double)tim[i];
m += d;
s += d * d;
}
m /= num;
s = sqrt(s / num - m * m);
cout << 't' << (uint64_t)floor(m + 5) << "t~" << (uint64_t)floor(s + .5) <<
"t{" << (uint64_t)floor(s / m * 100 + .5) << "%}" << endl;
}
int main(int argc, char **argv) {
cout << "SRR client: " << VERSION << endl;
int opt, val;
unsigned int blk = 100;
char PATH[_POSIX_PATH_MAX] = "";
while ((opt = getopt(argc, argv, "n:b:m:")) != -1) {
switch (opt) {
case 'n': // имя хоста сервера
strcpy(PATH, "/net/");
strcat(PATH, optarg);
break;
case 'b': // размер блока обмена (байт)
if (sscanf(optarg, "%i", &blk) != 1)
exit("parse command line failed");
break;
case 'm': // число повторений таких блоков
if (sscanf(optarg, "%i", &num) != 1)
exit("parse command line failed");
break;
default:
exit(EXIT_FAILURE);
}
}
// "составить" полное имя менеджера
strcat(PATH, DEVNAME);
cout << "server path. " << PATH << ", block size = "
<< blk << " bytes, repeat = " << num << endl;
// при инициализации мы сразу получаем скорость процессора клиента
result data;
cout << "CPU speed [с.p.s ]: client = " << data.cps;
// пытаемся подключиться к серверу-менеджеру
int fd = open(PATH, O_RDONLY);
if (fd < 0) exit("server not found");
// читаем его параметры
if (read(fd, &data, sizeof(result)) == -1)
exit("parameter block read");
cout << ", server = " << data.cps << endl;