Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

Ignore:
Timestamp:
Jun 28, 2009, 3:04:30 PM (15 years ago)
Author:
scheusso
Message:

a lot of cleanup
some bugfixes (Thread, ThreadPool)
the biggest part of the network (~80% cpu time) is now multithreaded (1 thread for each client)

Location:
code/branches/netp6/src/core
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • code/branches/netp6/src/core/CorePrereqs.h

    r3231 r3240  
    189189    // multithreading
    190190    class Thread;
    191     class ThreadGroup;
     191    class ThreadPool;
    192192}
    193193
  • code/branches/netp6/src/core/Thread.cc

    r3231 r3240  
    3636
    3737#include "util/Sleep.h"
    38 #include "Functor.h"
     38#include "Executor.h"
    3939
    4040namespace orxonox
     
    4444   
    4545    Thread::Thread():
    46         functor_(0),
     46        executor_(0),
    4747        isWorking_(false),
    4848        stopThread_(false)
    4949    {
    50         this->communicationMutex_ = new boost::mutex;
     50        this->executorMutex_ = new boost::mutex;
     51        this->isWorkingMutex_ = new boost::mutex;
     52        this->stopThreadMutex_ = new boost::mutex;
    5153        this->workerThread_ = new boost::thread( boost::bind(&Thread::threadLoop, this) );
    5254    }
     
    5456    Thread::~Thread()
    5557    {
     58        this->stopThreadMutex_->lock();
    5659        this->stopThread_ = true;
     60        this->stopThreadMutex_->unlock();
    5761        if( !this->workerThread_->timed_join( THREAD_WAIT_BEFORE_DETACH ) )
    5862            assert(0); // this should not happen
    5963        delete this->workerThread_;
    60         delete this->communicationMutex_;
     64        delete this->executorMutex_;
     65        delete this->stopThreadMutex_;
     66        delete this->isWorkingMutex_;
    6167    }
    6268   
    63     bool Thread::evaluateFunctor( Functor* functor )
     69    bool Thread::isWorking()
    6470    {
    65         if( this->communicationMutex_->try_lock() )
    66         {
    67             this->functor_ = functor;
    68             this->communicationMutex_->unlock();
    69             return true;
    70         }
    71         else
    72             return false;
     71      this->isWorkingMutex_->lock();
     72      bool isWorking = this->isWorking_;
     73      this->isWorkingMutex_->unlock();
     74      return isWorking;
     75    }
     76   
     77    bool Thread::evaluateExecutor( Executor* executor )
     78    {
     79        this->isWorkingMutex_->lock();
     80        this->isWorking_=true;
     81        this->isWorkingMutex_->unlock();
     82        this->executorMutex_->lock();
     83        this->executor_ = executor;
     84        this->executorMutex_->unlock();
     85        return true;
    7386    }
    7487   
    7588    void Thread::threadLoop()
    7689    {
    77         while( !this->stopThread_ )
     90        bool stopThread = false;
     91        while( !stopThread )
    7892        {
    79             this->communicationMutex_->lock();
    80             if( this->functor_ )
     93            this->executorMutex_->lock();
     94            Executor* executor = this->executor_;
     95            this->executorMutex_->unlock();
     96            if( executor )
    8197            {
    82                 (*this->functor_)();
    83                 this->communicationMutex_->unlock();
     98                (*executor)();
     99                this->executorMutex_->lock();
     100                delete this->executor_;
     101                this->executor_ = 0;
     102                this->executorMutex_->unlock();
     103                this->isWorkingMutex_->lock();
     104                this->isWorking_=false;
     105                this->isWorkingMutex_->unlock();
    84106            }
    85107            else
    86108            {
    87                 this->communicationMutex_->unlock();
    88109                this->workerThread_->yield();
    89110            }
     111            this->stopThreadMutex_->lock();
     112            stopThread = this->stopThread_;
     113            this->stopThreadMutex_->unlock();
    90114        }
    91115    }
     
    96120        while( stillWorking )
    97121        {
    98             this->communicationMutex_->lock();
     122            this->isWorkingMutex_->lock();
    99123            stillWorking = this->isWorking_;
    100             this->communicationMutex_->unlock();
     124            this->isWorkingMutex_->unlock();
    101125            if( stillWorking )
    102126                msleep( 1 );
  • code/branches/netp6/src/core/Thread.h

    r3231 r3240  
    3232#include "CorePrereqs.h"
    3333
     34namespace boost{
     35  class recursive_mutex;
     36}
     37
    3438 namespace orxonox
    3539{
     
    4044        virtual ~Thread();
    4145
    42         inline bool isWorking() { return this->isWorking_; }
     46        bool isWorking();
    4347        void waitUntilFinished();
    44         bool evaluateFunctor( Functor* functor );
     48        bool evaluateExecutor( Executor* executor );
    4549
    4650    private:
    4751        void            threadLoop();
    4852       
    49         Functor*        functor_;
     53        Executor*       executor_;
    5054        bool            isWorking_;
    5155        bool            stopThread_;
    5256        boost::thread*  workerThread_;
    53         boost::mutex*   communicationMutex_;
     57        boost::mutex*   executorMutex_;
     58        boost::mutex*     isWorkingMutex_;
     59        boost::mutex*   stopThreadMutex_;
    5460    };
    5561
  • code/branches/netp6/src/core/ThreadPool.cc

    r3231 r3240  
    2828
    2929#include "ThreadPool.h"
     30#include "Thread.h"
    3031#include <cassert>
    3132
     
    3940    ThreadPool::~ThreadPool()
    4041    {
     42        unsigned int a = this->setNrOfThreads(0);
     43        assert(a == 0);
    4144    }
    4245   
     
    4447    {
    4548        for( unsigned int i=0; i<nr; i++ )
    46             this->threadPool_.push_back(Thread());
     49            this->threadPool_.push_back(new Thread());
    4750    }
    4851    unsigned int ThreadPool::removeThreads( unsigned int nr )
    4952    {
    5053        unsigned int i=0;
    51         std::vector<Thread>::iterator it;
    52         for( it = this->threadPool_.begin(); it != threadPool_.end() && i<nr; ++it )
     54        std::vector<Thread*>::iterator it;
     55        for( it = this->threadPool_.begin(); it != threadPool_.end() && i<nr; )
    5356        {
    54             if( ! it->isWorking() )
     57            if( ! (*it)->isWorking() )
    5558            {
    56                 this->threadPool_.erase( it++ );
     59                Thread* temp = *it;
     60                it=this->threadPool_.erase( it );
     61                delete temp;
    5762                ++i;
    5863            }
     64            else
     65              ++it;
    5966        }
    6067        return i;
     
    7481    }
    7582   
    76     bool ThreadPool::passFunction( Functor* functor, bool addThread )
     83    bool ThreadPool::passFunction( Executor* executor, bool addThread )
    7784    {
    78         std::vector<Thread>::iterator it;
     85        std::vector<Thread*>::iterator it;
    7986        for ( it=this->threadPool_.begin(); it!=this->threadPool_.end(); ++it )
    8087        {
    81             if ( ! it->isWorking() )
     88            if ( ! (*it)->isWorking() )
    8289            {
    83                 bool b = it->evaluateFunctor( functor );
     90                bool b = (*it)->evaluateExecutor( executor );
    8491                assert(b); // if b is false then there is some code error
    8592                return true;
     
    8996        {
    9097            addThreads( 1 );
    91             this->threadPool_.back().evaluateFunctor( functor ); // access the last element
     98            bool b = this->threadPool_.back()->evaluateExecutor( executor ); // access the last element
     99            assert(b);
    92100            return true;
    93101        }
     
    98106    void ThreadPool::synchronise()
    99107    {
    100         std::vector<Thread>::iterator it;
     108        std::vector<Thread*>::iterator it;
    101109        for ( it=this->threadPool_.begin(); it!=this->threadPool_.end(); ++it )
    102110        {
    103             it->waitUntilFinished();
     111            (*it)->waitUntilFinished();
    104112        }
    105113    }
  • code/branches/netp6/src/core/ThreadPool.h

    r3231 r3240  
    3333
    3434#include <vector>
    35 #include "Thread.h"
    3635
    3736 namespace orxonox
     
    4746        unsigned int setNrOfThreads( unsigned int nr );
    4847       
    49         bool passFunction( Functor* functor, bool addThread=false );
     48        bool passFunction( Executor* executor, bool addThread=false );
    5049        void synchronise();
    5150       
    5251    private:
    53         std::vector<Thread> threadPool_;
     52        std::vector<Thread*> threadPool_;
    5453       
    5554    };
Note: See TracChangeset for help on using the changeset viewer.