[c++, pthread, noob]

serge18

хочу сделать что-то похожее на pipeline с использованием pthreads - хочу заставить треды последовательно циклично получать доступ к ресурсу, например ко входному потоку данных.
 
pthread_mutex_t in_mutex = PTHREAD_MUTEX_INITIALIZER;
int cur_id = 0;

thread_func(int id)
{
while(work)
{
pthread_mutex_lock(&in_mutex);
int cur_val = cur_id;
while(cur_val != id)
{
pthread_mutex_unlock(&in_mutex);
pthread_mutex_lock(&in_mutex);
cur_val = cur_id;
}
read_data;
cur_val = (cur_val + 1) % thread_num;
pthread_mutex_unlock(&in_mutex);
process_data;
...
}
}

наличие огромного количества локов\анлоков во внутреннем цикле в неактивных тредах огорчает, есть возможность что оно просто будет крутиться в одном состоянии
если же добавить засыпание и просыпание через pthread_condition:
 
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t in_mutex = PTHREAD_MUTEX_INITIALIZER;
int cur_id = 0;


thread_func(int id)
{
while(work)
{
pthread_mutex_lock(&in_mutex);
int cur_val = cur_id;
while(cur_val != id)
{
pthread_cond_wait( &cond, &in_mutex );
cur_val = cur_id;
pthread_mutex_lock(&in_mutex);
}
read_data;
cur_val = (cur_val + 1) % thread_num;
pthread_cond_broadcast( &cond );
pthread_mutex_unlock(in_mutex);
process_data;
...
}
}

но тут возможна ситуация когда нулевой тред отработает и пошлет сигнал, а следующий тред еще не дошел до pthread_cond_wait тогда на этом все закончится.
можно ли как-то средствами pthreads реализовать последовательный доступ тредов к ресурсу?

karkar

Каждый поток ждет свою cond variable. Дождавшись и поработав с ресурсом, сигнализирует следующему "теперь можешь ты".

elenangel

но тут возможна ситуация когда нулевой тред отработает и пошлет сигнал, а следующий тред еще не дошел до pthread_cond_wait тогда на этом все закончится.
можно использовать блокировку с двойной проверкой
можно попробовать применить барьеры по одному на каждую пару тредов; фактически, для кольца получится N барьеров = N тредов, каждый тред зацеплен на 2 барьера, один - в начале работы, другой - в конце, точка входа в первый тред находится за первым барьером и перед вторым, точка входа в остальные - перед первым барьером.

elenangel

pthread_cond_wait( &cond, &in_mutex ); cur_val = cur_id; pthread_mutex_lock(&in_mutex);
здесь lock не нужен, мьютекс из wait'а возвращается в заблокированном состоянии.

elenangel

ситуация когда нулевой тред отработает и пошлет сигнал, а следующий тред еще не дошел до pthread_cond_wait
вообще говоря, не возможна, если посылать broadcast из под блокировки.
при входе в wait атомарно останавливается и входит в ожидание тред и отпускается мьютекс, который защищает условную переменную. таким образом, broadcast будет послан только одним тредом в момент, когда остальные в wait. если же сигнал послан первым тредом до входа в блокировку вторым тредом, то второй тред уйдет в ожидание только в случае если пропущенный сигнал был не ему, т.к. перед усыплением он проверяет id.

serge18

это вроде не исправляет от проблемы когда сигнал от предыдущего потока может быть отправлен до того как текущий поток начнет ждать. Например, полагаю, в случае с одним потоком это не сработает.

serge18

my bad. невнимательно прочитал man
Upon successful return, the mutex shall have been locked and shall be owned by the calling thread.
хмммм, тогда как работает broadcast? я думал он сразу же релизит все потоки, находящиеся в ожидание на этой cond переменной? если так, то получается что они все одновременно лочат мьютекс? я где-то не прав и не понимаю этого.
с signal в этом случае, похоже, понятно

serge18

звучит разумно, но вчера с дебажными выводами провел пару неудачных опытов. возможно как раз из-зи лишнего lock после wait. пойду поиграюсь еще.
спасибо!

elenangel

после броадкаста будятся все треды и начинают бороться за залочку мьютекса. залочку получает кто-то один, остальные обратно засыпают в ожидании на данном мьютексе. вроде так.

elenangel

получается, тебе надо перед wait в треде делать broadcast остальным. как-то это странно. видимо, лучше действительно каждому треду персональную условную переменную.

kokoc88

хочу заставить треды последовательно циклично получать доступ к ресурсу
Последовательность получения важна? Например, поток номер XXX всегда должен получать доступ первым, а поток YYY всегда вторым?

karkar

это вроде не исправляет от проблемы когда сигнал от предыдущего потока может быть отправлен до того как текущий поток начнет ждать.
Так это не проблема совершенно. Поток работает с ресурсом, только если разрешающий сигнал ему пришел. Не важно, чем он в тот момент занимался - ждал или еще чем-то другим был занят.

serge18

да, я бы хотел добить с такой заморочкой. иначе достаточно было бы того что есть - просто пул тредов, и первый кто готов - хватает задачу

kokoc88

но тут возможна ситуация когда нулевой тред отработает и пошлет сигнал, а следующий тред еще не дошел до pthread_cond_wait тогда на этом все закончится.
Это невозможно, у тебя только один поток может выполнять чтение и запись cur_val

kokoc88

да, я бы хотел добить с такой заморочкой. иначе достаточно было бы того что есть - просто пул тредов, и первый кто готов - хватает задачу
Я бы написал, что тебе не нужна эта заморочка. И действительно, а зачем она? :)

serge18

в данном конкретном случае часть входного потока должна быть доступна двум вычислениям идущим последовательно.
например параллельный поиск подстроки длины 16 в строке, я хочу чтобы каждый тред обрабатывал по 1024 символа - для этого ему нужно будет прочитать 1024 + 15 символов. Эти 15 символов следующий тред должен обработать - для этого их надо как-то вытянуть из данных предыдущего, т.к. во входном стриме их уже нет.

serge18

примерно в этом ошибка и была - я читал переменную не из под мьютекса и на основе этого делал решения, и в этот момент переменная менялась. в общем хреново отконтролил шаренные данные

serge18

в общем, как всегда обманул себя сам. просто еще раз внимательно проследил за обращениями к разделяемым ресурсам и нашел багу =(
всем спасибо!

Dasar

Эти 15 символов следующий тред должен обработать - для этого их надо как-то вытянуть из данных предыдущего
лучше эти 15 байт "передать" как часть задачи для следующего произвольно-проснувшегося треда.
ps
псевдокод примерно такой:

Task(Stream f, byte[] prev)
byte[] buffer = new byte[1024];
f.Read(buffer, 0, 1024);

start new Task(f, buffer.GetRange(buffer.Length-15, 15;

process(prev, buffer);

serge18

т.е. хранить еще одну разделяемую переменную (будет роазделяться вместе с доступом на чтнеие) с указателем на актуальный (последний прочтенный) этот участок памяти?
а насколько обслуживание условной переменной, cond_wait и signal затратны по ресурсам?

kokoc88

в данном конкретном случае часть входного потока должна быть доступна двум вычислениям идущим последовательно.
например параллельный поиск подстроки длины 16 в строке, я хочу чтобы каждый тред обрабатывал по 1024 символа - для этого ему нужно будет прочитать 1024 + 15 символов. Эти 15 символов следующий тред должен обработать - для этого их надо как-то вытянуть из данных предыдущего, т.к. во входном стриме их уже нет.
Ты неправильно решаешь задачу, для этого не обязательно делать такую сложную логику. В одном потоке считывай данные, делай из них задачи, клади задачи на пул потоков.
Оставить комментарий
Имя или ник:
Комментарий: