Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: downloads/boost_1_33_1/libs/thread/src/condition.cpp @ 20

Last change on this file since 20 was 12, checked in by landauf, 18 years ago

added boost

File size: 17.2 KB
RevLine 
[12]1// Copyright (C) 2001-2003
2// William E. Kempf
3//
4// Permission to use, copy, modify, distribute and sell this software
5// and its documentation for any purpose is hereby granted without fee,
6// provided that the above copyright notice appear in all copies and
7// that both that copyright notice and this permission notice appear
8// in supporting documentation.  William E. Kempf makes no representations
9// about the suitability of this software for any purpose.
10// It is provided "as is" without express or implied warranty.
11
12#include <boost/thread/detail/config.hpp>
13
14#include <boost/thread/condition.hpp>
15#include <boost/thread/xtime.hpp>
16#include <boost/thread/thread.hpp>
17#include <boost/thread/exceptions.hpp>
18#include <boost/limits.hpp>
19#include <cassert>
20#include "timeconv.inl"
21
22#if defined(BOOST_HAS_WINTHREADS)
23#   ifndef NOMINMAX
24#      define NOMINMAX
25#   endif
26#   include <windows.h>
27#elif defined(BOOST_HAS_PTHREADS)
28#   include <errno.h>
29#elif defined(BOOST_HAS_MPTASKS)
30#   include <MacErrors.h>
31#   include "mac/init.hpp"
32#   include "mac/safe.hpp"
33#endif
34
35namespace boost {
36
37namespace detail {
38
39#if defined(BOOST_HAS_WINTHREADS)
40condition_impl::condition_impl()
41    : m_gone(0), m_blocked(0), m_waiting(0)
42{
43    m_gate = reinterpret_cast<void*>(CreateSemaphore(0, 1, 1, 0));
44    m_queue = reinterpret_cast<void*>(
45        CreateSemaphore(0, 0, (std::numeric_limits<long>::max)(), 0));
46    m_mutex = reinterpret_cast<void*>(CreateMutex(0, 0, 0));
47
48    if (!m_gate || !m_queue || !m_mutex)
49    {
50        int res = 0;
51        if (m_gate)
52        {
53            res = CloseHandle(reinterpret_cast<HANDLE>(m_gate));
54            assert(res);
55        }
56        if (m_queue)
57        {
58            res = CloseHandle(reinterpret_cast<HANDLE>(m_queue));
59            assert(res);
60        }
61        if (m_mutex)
62        {
63            res = CloseHandle(reinterpret_cast<HANDLE>(m_mutex));
64            assert(res);
65        }
66
67        throw thread_resource_error();
68    }
69}
70
71condition_impl::~condition_impl()
72{
73    int res = 0;
74    res = CloseHandle(reinterpret_cast<HANDLE>(m_gate));
75    assert(res);
76    res = CloseHandle(reinterpret_cast<HANDLE>(m_queue));
77    assert(res);
78    res = CloseHandle(reinterpret_cast<HANDLE>(m_mutex));
79    assert(res);
80}
81
82void condition_impl::notify_one()
83{
84    unsigned signals = 0;
85
86    int res = 0;
87    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
88    assert(res == WAIT_OBJECT_0);
89
90    if (m_waiting != 0) // the m_gate is already closed
91    {
92        if (m_blocked == 0)
93        {
94            res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
95            assert(res);
96            return;
97        }
98
99        ++m_waiting;
100        --m_blocked;
101        signals = 1;
102    }
103    else
104    {
105        res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
106        assert(res == WAIT_OBJECT_0);
107        if (m_blocked > m_gone)
108        {
109            if (m_gone != 0)
110            {
111                m_blocked -= m_gone;
112                m_gone = 0;
113            }
114            signals = m_waiting = 1;
115            --m_blocked;
116        }
117        else
118        {
119            res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
120            assert(res);
121        }
122    }
123
124    res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
125    assert(res);
126
127    if (signals)
128    {
129        res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_queue), signals, 0);
130        assert(res);
131    }
132}
133
134void condition_impl::notify_all()
135{
136    unsigned signals = 0;
137
138    int res = 0;
139    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
140    assert(res == WAIT_OBJECT_0);
141
142    if (m_waiting != 0) // the m_gate is already closed
143    {
144        if (m_blocked == 0)
145        {
146            res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
147            assert(res);
148            return;
149        }
150
151        m_waiting += (signals = m_blocked);
152        m_blocked = 0;
153    }
154    else
155    {
156        res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
157        assert(res == WAIT_OBJECT_0);
158        if (m_blocked > m_gone)
159        {
160            if (m_gone != 0)
161            {
162                m_blocked -= m_gone;
163                m_gone = 0;
164            }
165            signals = m_waiting = m_blocked;
166            m_blocked = 0;
167        }
168        else
169        {
170            res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
171            assert(res);
172        }
173    }
174
175    res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
176    assert(res);
177
178    if (signals)
179    {
180        res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_queue), signals, 0);
181        assert(res);
182    }
183}
184
185void condition_impl::enter_wait()
186{
187    int res = 0;
188    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
189    assert(res == WAIT_OBJECT_0);
190    ++m_blocked;
191    res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
192    assert(res);
193}
194
195void condition_impl::do_wait()
196{
197    int res = 0;
198    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue), INFINITE);
199    assert(res == WAIT_OBJECT_0);
200
201    unsigned was_waiting=0;
202    unsigned was_gone=0;
203
204    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
205    assert(res == WAIT_OBJECT_0);
206    was_waiting = m_waiting;
207    was_gone = m_gone;
208    if (was_waiting != 0)
209    {
210        if (--m_waiting == 0)
211        {
212            if (m_blocked != 0)
213            {
214                res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1,
215                    0); // open m_gate
216                assert(res);
217                was_waiting = 0;
218            }
219            else if (m_gone != 0)
220                m_gone = 0;
221        }
222    }
223    else if (++m_gone == ((std::numeric_limits<unsigned>::max)() / 2))
224    {
225        // timeout occured, normalize the m_gone count
226        // this may occur if many calls to wait with a timeout are made and
227        // no call to notify_* is made
228        res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
229        assert(res == WAIT_OBJECT_0);
230        m_blocked -= m_gone;
231        res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
232        assert(res);
233        m_gone = 0;
234    }
235    res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
236    assert(res);
237
238    if (was_waiting == 1)
239    {
240        for (/**/ ; was_gone; --was_gone)
241        {
242            // better now than spurious later
243            res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue),
244                INFINITE);
245            assert(res == WAIT_OBJECT_0);
246        }
247        res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
248        assert(res);
249    }
250}
251
252bool condition_impl::do_timed_wait(const xtime& xt)
253{
254    bool ret = false;
255    unsigned int res = 0;
256
257    for (;;)
258    {
259        int milliseconds;
260        to_duration(xt, milliseconds);
261
262        res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue),
263            milliseconds);
264        assert(res != WAIT_FAILED && res != WAIT_ABANDONED);
265        ret = (res == WAIT_OBJECT_0);
266
267        if (res == WAIT_TIMEOUT)
268        {
269            xtime cur;
270            xtime_get(&cur, TIME_UTC);
271            if (xtime_cmp(xt, cur) > 0)
272                continue;
273        }
274
275        break;
276    }
277
278    unsigned was_waiting=0;
279    unsigned was_gone=0;
280
281    res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_mutex), INFINITE);
282    assert(res == WAIT_OBJECT_0);
283    was_waiting = m_waiting;
284    was_gone = m_gone;
285    if (was_waiting != 0)
286    {
287        if (!ret) // timeout
288        {
289            if (m_blocked != 0)
290                --m_blocked;
291            else
292                ++m_gone; // count spurious wakeups
293        }
294        if (--m_waiting == 0)
295        {
296            if (m_blocked != 0)
297            {
298                res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1,
299                    0); // open m_gate
300                assert(res);
301                was_waiting = 0;
302            }
303            else if (m_gone != 0)
304                m_gone = 0;
305        }
306    }
307    else if (++m_gone == ((std::numeric_limits<unsigned>::max)() / 2))
308    {
309        // timeout occured, normalize the m_gone count
310        // this may occur if many calls to wait with a timeout are made and
311        // no call to notify_* is made
312        res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_gate), INFINITE);
313        assert(res == WAIT_OBJECT_0);
314        m_blocked -= m_gone;
315        res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
316        assert(res);
317        m_gone = 0;
318    }
319    res = ReleaseMutex(reinterpret_cast<HANDLE>(m_mutex));
320    assert(res);
321
322    if (was_waiting == 1)
323    {
324        for (/**/ ; was_gone; --was_gone)
325        {
326            // better now than spurious later
327            res = WaitForSingleObject(reinterpret_cast<HANDLE>(m_queue),
328                INFINITE);
329            assert(res ==  WAIT_OBJECT_0);
330        }
331        res = ReleaseSemaphore(reinterpret_cast<HANDLE>(m_gate), 1, 0);
332        assert(res);
333    }
334
335    return ret;
336}
337#elif defined(BOOST_HAS_PTHREADS)
338condition_impl::condition_impl()
339{
340    int res = 0;
341    res = pthread_cond_init(&m_condition, 0);
342    if (res != 0)
343        throw thread_resource_error();
344}
345
346condition_impl::~condition_impl()
347{
348    int res = 0;
349    res = pthread_cond_destroy(&m_condition);
350    assert(res == 0);
351}
352
353void condition_impl::notify_one()
354{
355    int res = 0;
356    res = pthread_cond_signal(&m_condition);
357    assert(res == 0);
358}
359
360void condition_impl::notify_all()
361{
362    int res = 0;
363    res = pthread_cond_broadcast(&m_condition);
364    assert(res == 0);
365}
366
367void condition_impl::do_wait(pthread_mutex_t* pmutex)
368{
369    int res = 0;
370    res = pthread_cond_wait(&m_condition, pmutex);
371    assert(res == 0);
372}
373
374bool condition_impl::do_timed_wait(const xtime& xt, pthread_mutex_t* pmutex)
375{
376    timespec ts;
377    to_timespec(xt, ts);
378
379    int res = 0;
380    res = pthread_cond_timedwait(&m_condition, pmutex, &ts);
381    assert(res == 0 || res == ETIMEDOUT);
382
383    return res != ETIMEDOUT;
384}
385#elif defined(BOOST_HAS_MPTASKS)
386
387using threads::mac::detail::safe_enter_critical_region;
388using threads::mac::detail::safe_wait_on_semaphore;
389
390condition_impl::condition_impl()
391    : m_gone(0), m_blocked(0), m_waiting(0)
392{
393    threads::mac::detail::thread_init();
394
395    OSStatus lStatus = noErr;
396
397    lStatus = MPCreateSemaphore(1, 1, &m_gate);
398    if(lStatus == noErr)
399        lStatus = MPCreateSemaphore(ULONG_MAX, 0, &m_queue);
400
401    if(lStatus != noErr || !m_gate || !m_queue)
402    {
403        if (m_gate)
404        {
405            lStatus = MPDeleteSemaphore(m_gate);
406            assert(lStatus == noErr);
407        }
408        if (m_queue)
409        {
410            lStatus = MPDeleteSemaphore(m_queue);
411            assert(lStatus == noErr);
412        }
413
414        throw thread_resource_error();
415    }
416}
417
418condition_impl::~condition_impl()
419{
420    OSStatus lStatus = noErr;
421    lStatus = MPDeleteSemaphore(m_gate);
422    assert(lStatus == noErr);
423    lStatus = MPDeleteSemaphore(m_queue);
424    assert(lStatus == noErr);
425}
426
427void condition_impl::notify_one()
428{
429    unsigned signals = 0;
430
431    OSStatus lStatus = noErr;
432    lStatus = safe_enter_critical_region(m_mutex, kDurationForever,
433        m_mutex_mutex);
434    assert(lStatus == noErr);
435
436    if (m_waiting != 0) // the m_gate is already closed
437    {
438        if (m_blocked == 0)
439        {
440            lStatus = MPExitCriticalRegion(m_mutex);
441            assert(lStatus == noErr);
442            return;
443        }
444
445        ++m_waiting;
446        --m_blocked;
447    }
448    else
449    {
450        lStatus = safe_wait_on_semaphore(m_gate, kDurationForever);
451        assert(lStatus == noErr);
452        if (m_blocked > m_gone)
453        {
454            if (m_gone != 0)
455            {
456                m_blocked -= m_gone;
457                m_gone = 0;
458            }
459            signals = m_waiting = 1;
460            --m_blocked;
461        }
462        else
463        {
464            lStatus = MPSignalSemaphore(m_gate);
465            assert(lStatus == noErr);
466        }
467
468        lStatus = MPExitCriticalRegion(m_mutex);
469        assert(lStatus == noErr);
470
471        while (signals)
472        {
473            lStatus = MPSignalSemaphore(m_queue);
474            assert(lStatus == noErr);
475            --signals;
476        }
477    }
478}
479
480void condition_impl::notify_all()
481{
482    unsigned signals = 0;
483
484    OSStatus lStatus = noErr;
485    lStatus = safe_enter_critical_region(m_mutex, kDurationForever,
486        m_mutex_mutex);
487    assert(lStatus == noErr);
488
489    if (m_waiting != 0) // the m_gate is already closed
490    {
491        if (m_blocked == 0)
492        {
493            lStatus = MPExitCriticalRegion(m_mutex);
494            assert(lStatus == noErr);
495            return;
496        }
497
498        m_waiting += (signals = m_blocked);
499        m_blocked = 0;
500    }
501    else
502    {
503        lStatus = safe_wait_on_semaphore(m_gate, kDurationForever);
504        assert(lStatus == noErr);
505        if (m_blocked > m_gone)
506        {
507            if (m_gone != 0)
508            {
509                m_blocked -= m_gone;
510                m_gone = 0;
511            }
512            signals = m_waiting = m_blocked;
513            m_blocked = 0;
514        }
515        else
516        {
517            lStatus = MPSignalSemaphore(m_gate);
518            assert(lStatus == noErr);
519        }
520
521        lStatus = MPExitCriticalRegion(m_mutex);
522        assert(lStatus == noErr);
523
524        while (signals)
525        {
526            lStatus = MPSignalSemaphore(m_queue);
527            assert(lStatus == noErr);
528            --signals;
529        }
530    }
531}
532
533void condition_impl::enter_wait()
534{
535    OSStatus lStatus = noErr;
536    lStatus = safe_wait_on_semaphore(m_gate, kDurationForever);
537    assert(lStatus == noErr);
538    ++m_blocked;
539    lStatus = MPSignalSemaphore(m_gate);
540    assert(lStatus == noErr);
541}
542
543void condition_impl::do_wait()
544{
545    OSStatus lStatus = noErr;
546    lStatus = safe_wait_on_semaphore(m_queue, kDurationForever);
547    assert(lStatus == noErr);
548
549    unsigned was_waiting=0;
550    unsigned was_gone=0;
551
552    lStatus = safe_enter_critical_region(m_mutex, kDurationForever,
553        m_mutex_mutex);
554    assert(lStatus == noErr);
555    was_waiting = m_waiting;
556    was_gone = m_gone;
557    if (was_waiting != 0)
558    {
559        if (--m_waiting == 0)
560        {
561            if (m_blocked != 0)
562            {
563                lStatus = MPSignalSemaphore(m_gate); // open m_gate
564                assert(lStatus == noErr);
565                was_waiting = 0;
566            }
567            else if (m_gone != 0)
568                m_gone = 0;
569        }
570    }
571    else if (++m_gone == ((std::numeric_limits<unsigned>::max)() / 2))
572    {
573        // timeout occured, normalize the m_gone count
574        // this may occur if many calls to wait with a timeout are made and
575        // no call to notify_* is made
576        lStatus = safe_wait_on_semaphore(m_gate, kDurationForever);
577        assert(lStatus == noErr);
578        m_blocked -= m_gone;
579        lStatus = MPSignalSemaphore(m_gate);
580        assert(lStatus == noErr);
581        m_gone = 0;
582    }
583    lStatus = MPExitCriticalRegion(m_mutex);
584    assert(lStatus == noErr);
585
586    if (was_waiting == 1)
587    {
588        for (/**/ ; was_gone; --was_gone)
589        {
590            // better now than spurious later
591            lStatus = safe_wait_on_semaphore(m_queue, kDurationForever);
592            assert(lStatus == noErr);
593        }
594        lStatus = MPSignalSemaphore(m_gate);
595        assert(lStatus == noErr);
596    }
597}
598
599bool condition_impl::do_timed_wait(const xtime& xt)
600{
601    int milliseconds;
602    to_duration(xt, milliseconds);
603
604    OSStatus lStatus = noErr;
605    lStatus = safe_wait_on_semaphore(m_queue, milliseconds);
606    assert(lStatus == noErr || lStatus == kMPTimeoutErr);
607
608    bool ret = (lStatus == noErr);
609
610    unsigned was_waiting=0;
611    unsigned was_gone=0;
612
613    lStatus = safe_enter_critical_region(m_mutex, kDurationForever,
614        m_mutex_mutex);
615    assert(lStatus == noErr);
616    was_waiting = m_waiting;
617    was_gone = m_gone;
618    if (was_waiting != 0)
619    {
620        if (!ret) // timeout
621        {
622            if (m_blocked != 0)
623                --m_blocked;
624            else
625                ++m_gone; // count spurious wakeups
626        }
627        if (--m_waiting == 0)
628        {
629            if (m_blocked != 0)
630            {
631                lStatus = MPSignalSemaphore(m_gate); // open m_gate
632                assert(lStatus == noErr);
633                was_waiting = 0;
634            }
635            else if (m_gone != 0)
636                m_gone = 0;
637        }
638    }
639    else if (++m_gone == ((std::numeric_limits<unsigned>::max)() / 2))
640    {
641        // timeout occured, normalize the m_gone count
642        // this may occur if many calls to wait with a timeout are made and
643        // no call to notify_* is made
644        lStatus = safe_wait_on_semaphore(m_gate, kDurationForever);
645        assert(lStatus == noErr);
646        m_blocked -= m_gone;
647        lStatus = MPSignalSemaphore(m_gate);
648        assert(lStatus == noErr);
649        m_gone = 0;
650    }
651    lStatus = MPExitCriticalRegion(m_mutex);
652    assert(lStatus == noErr);
653
654    if (was_waiting == 1)
655    {
656        for (/**/ ; was_gone; --was_gone)
657        {
658            // better now than spurious later
659            lStatus = safe_wait_on_semaphore(m_queue, kDurationForever);
660            assert(lStatus == noErr);
661        }
662        lStatus = MPSignalSemaphore(m_gate);
663        assert(lStatus == noErr);
664    }
665
666    return ret;
667}
668#endif
669
670} // namespace detail
671
672} // namespace boost
673
674// Change Log:
675//    8 Feb 01  WEKEMPF Initial version.
676//   22 May 01  WEKEMPF Modified to use xtime for time outs.
677//    3 Jan 03  WEKEMPF Modified for DLL implementation.
Note: See TracBrowser for help on using the repository browser.