Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: code/branches/console/src/core/TclThreadManager.cc @ 1255

Last change on this file since 1255 was 1255, checked in by landauf, 16 years ago
  • After a total rewrite of the TclThreadManager, there are some new problems, but most of the old ones are fixed. I commit this version to have a backup, not because it's finished.
  • 'exec' command is now called 'source' (tcl and orxonox)
  • 'orxonox' command is now called 'query' (tcl only)
  • added 'crossquery' command (tcl only)
File size: 18.6 KB
RevLine 
[1230]1/*
2 *   ORXONOX - the hottest 3D action shooter ever to exist
3 *                    > www.orxonox.net <
4 *
5 *
6 *   License notice:
7 *
8 *   This program is free software; you can redistribute it and/or
9 *   modify it under the terms of the GNU General Public License
10 *   as published by the Free Software Foundation; either version 2
11 *   of the License, or (at your option) any later version.
12 *
13 *   This program is distributed in the hope that it will be useful,
14 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 *   GNU General Public License for more details.
17 *
18 *   You should have received a copy of the GNU General Public License
19 *   along with this program; if not, write to the Free Software
20 *   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
21 *
22 *   Author:
23 *      Fabian 'x3n' Landau
24 *   Co-authors:
25 *      ...
26 *
27 */
28
29#include <iostream>
30#include <string>
31
32#include <boost/thread/thread.hpp>
33#include <boost/bind.hpp>
34
35#include <OgreTimer.h>
36
37#include "CoreIncludes.h"
38#include "ConsoleCommand.h"
39#include "CommandExecutor.h"
40#include "Debug.h"
41#include "TclBind.h"
42#include "TclThreadManager.h"
43#include "util/Convert.h"
44
// Upper bound for the number of commands waiting in one queue; both pushCommandToQueue() overloads compare the queue size against it.
[1255]45#define TCLTHREADMANAGER_MAX_QUEUE_LENGTH 1024
// Fraction of the frame time (dt) that tick() is allowed to spend on executing commands from the main queue.
46#define TCLTHREADMANAGER_MAX_CPU_USAGE 0.50
[1230]47
48namespace orxonox
49{
// Console command registrations for the static member functions of TclThreadManager.
// NOTE(review): ConsoleCommand is a macro declared elsewhere; presumably it exposes the named function to the console - confirm against ConsoleCommand.h.
[1255]50//    ConsoleCommand(TclThreadManager, tclthread, AccessLevel::None, true);
[1230]51    ConsoleCommand(TclThreadManager, create,    AccessLevel::None, false);
52    ConsoleCommand(TclThreadManager, destroy,   AccessLevel::None, false);
53    ConsoleCommand(TclThreadManager, execute,   AccessLevel::None, false);
54    ConsoleCommand(TclThreadManager, query,     AccessLevel::None, false);
[1255]55//    ConsoleCommand(TclThreadManager, status,    AccessLevel::None, false);
56//    ConsoleCommand(TclThreadManager, dump,      AccessLevel::None, false);
[1230]57
// Calling getInstance() here constructs the manager singleton during static initialization.
58    TclThreadManager* instance = &TclThreadManager::getInstance();
59
60    TclThreadManager::TclThreadManager()
61    {
62        RegisterRootObject(TclThreadManager);
63
[1255]64        this->threadCounter_ = 0;
65        this->orxonoxInterpreterBundle_.id_ = 0;
66        this->orxonoxInterpreterBundle_.interpreter_ = TclBind::getInstance().getTclInterpreter();
[1230]67    }
68
69    TclThreadManager& TclThreadManager::getInstance()
70    {
71        static TclThreadManager instance;
72        return instance;
73    }
74
[1255]75    unsigned int TclThreadManager::create()
[1230]76    {
[1255]77        boost::mutex::scoped_lock lock(TclThreadManager::getInstance().bundlesMutex_);
78        TclThreadManager::getInstance().threadCounter_++;
79        std::string name = getConvertedValue<unsigned int, std::string>(TclThreadManager::getInstance().threadCounter_);
[1230]80
[1255]81        TclInterpreterBundle* bundle = new TclInterpreterBundle;
82        bundle->id_ = TclThreadManager::getInstance().threadCounter_;
83        bundle->interpreter_ = TclThreadManager::getInstance().createNewTclInterpreter(name);
84        bundle->interpreterName_ = name;
85        bundle->running_ = true;
86
87        TclThreadManager::getInstance().interpreterBundles_[TclThreadManager::getInstance().threadCounter_] = bundle;
88        COUT(0) << "Created new Tcl-interpreter with ID " << TclThreadManager::getInstance().threadCounter_ << std::endl;
89        return TclThreadManager::getInstance().threadCounter_;
[1230]90    }
91
92    void TclThreadManager::destroy(unsigned int threadID)
93    {
[1255]94        TclInterpreterBundle* bundle = TclThreadManager::getInstance().getInterpreterBundle(threadID);
95        if (bundle)
[1230]96        {
[1255]97            {
98                boost::mutex::scoped_lock lock(bundle->runningMutex_);
99                bundle->running_ = false;
100            }
101            {
102                boost::mutex::scoped_lock lock(bundle->finishedMutex_);
103                while (!bundle->finished_)
104                    bundle->finishedCondition_.wait(lock);
105            }
106            {
107                boost::mutex::scoped_lock lock(bundle->interpreterMutex_);
108                delete bundle->interpreter_;
109            }
110            delete bundle;
111            {
112                boost::mutex::scoped_lock lock(TclThreadManager::getInstance().bundlesMutex_);
113                TclThreadManager::getInstance().interpreterBundles_.erase(threadID);
114            }
[1230]115        }
116    }
117
118    void TclThreadManager::execute(unsigned int threadID, const std::string& command)
119    {
[1255]120        TclThreadManager::getInstance().pushCommandToQueue(threadID, command);
[1230]121    }
122
123    std::string TclThreadManager::query(unsigned int threadID, const std::string& command)
124    {
[1255]125        return TclThreadManager::getInstance().evalQuery(TclThreadManager::getInstance().orxonoxInterpreterBundle_.id_, threadID, command);
[1230]126    }
127
[1255]128    void TclThreadManager::tcl_execute(Tcl::object const &args)
[1247]129    {
[1255]130        std::string command = args.get();
131        while (command.size() >= 2 && command[0] == '{' && command[command.size() - 1] == '}')
132            command = command.substr(1, command.size() - 2);
[1247]133
[1255]134        TclThreadManager::getInstance().pushCommandToQueue(command);
[1247]135    }
136
[1255]137    std::string TclThreadManager::tcl_query(int querierID, Tcl::object const &args)
[1247]138    {
[1255]139        std::string command = args.get();
140        while (command.size() >= 2 && command[0] == '{' && command[command.size() - 1] == '}')
141            command = command.substr(1, command.size() - 2);
[1247]142
[1255]143        return TclThreadManager::getInstance().evalQuery((unsigned int)querierID, command);
[1247]144    }
145
[1255]146    std::string TclThreadManager::tcl_crossquery(int querierID, int threadID, Tcl::object const &args)
[1230]147    {
148        std::string command = args.get();
[1255]149        while (command.size() >= 2 && command[0] == '{' && command[command.size() - 1] == '}')
[1230]150            command = command.substr(1, command.size() - 2);
151
[1255]152        return TclThreadManager::getInstance().evalQuery((unsigned int)querierID, (unsigned int)threadID, command);
[1230]153    }
154
[1255]155    bool TclThreadManager::tcl_running(int threadID)
[1230]156    {
[1255]157        TclInterpreterBundle* bundle = TclThreadManager::getInstance().getInterpreterBundle((unsigned int)threadID);
158        if (bundle)
[1247]159        {
[1255]160            boost::mutex::scoped_lock lock(bundle->runningMutex_);
161            return bundle->running_;
[1247]162        }
[1255]163        return false;
[1230]164    }
165
[1255]166    Tcl::interpreter* TclThreadManager::createNewTclInterpreter(const std::string& threadID)
[1230]167    {
[1247]168        Tcl::interpreter* i = 0;
169        i = new Tcl::interpreter(TclBind::getInstance().getTclLibPath());
[1230]170
[1247]171        try
172        {
[1255]173            i->def("orxonox::query", TclThreadManager::tcl_query, Tcl::variadic());
174            i->def("orxonox::crossquery", TclThreadManager::tcl_crossquery, Tcl::variadic());
175            i->def("orxonox::execute", TclThreadManager::tcl_execute, Tcl::variadic());
176            i->def("orxonox::running", TclThreadManager::tcl_running);
177
178            i->def("execute", TclThreadManager::tcl_execute, Tcl::variadic());
179            i->eval("proc query args { orxonox::query " + threadID + " $args }");
180            i->eval("proc crossquery {id args} { orxonox::crossquery " + threadID + " $id $args }");
181            i->eval("set id " + threadID);
182
183            i->eval("rename exit tcl::exit");
184            i->eval("proc exit {} { orxonox TclThreadManager destroy " + threadID + " }");
185
[1247]186            i->eval("redef_puts");
[1255]187
188            i->eval("rename while tcl::while");
189            i->eval("proc while {test command} { tcl::while {$test && [orxonox::running " + threadID + "]} \"$command\" }");
190//            i->eval("rename for tcl::for");
191//            i->eval("proc for {start test next command} { uplevel tcl::for \"$start\" \"$test\" \"$next\" \"$command\" }");
[1247]192        }
193        catch (Tcl::tcl_error const &e)
[1255]194        {   COUT(1) << "Tcl error while creating Tcl-interpreter (" << threadID << "): " << e.what() << std::endl;   }
[1247]195        catch (std::exception const &e)
[1255]196        {   COUT(1) << "Error while creating Tcl-interpreter (" << threadID << "): " << e.what() << std::endl;   }
[1230]197
198        return i;
199    }
200
[1255]201    TclInterpreterBundle* TclThreadManager::getInterpreterBundle(unsigned int threadID)
[1230]202    {
[1255]203        if (threadID == 0)
204            return &this->orxonoxInterpreterBundle_;
205
206        boost::mutex::scoped_lock lock(this->bundlesMutex_);
207        std::map<unsigned int, TclInterpreterBundle*>::iterator it = this->interpreterBundles_.find(threadID);
208        if (it != this->interpreterBundles_.end())
[1230]209        {
[1255]210            return (*it).second;
[1230]211        }
[1255]212        else
[1230]213        {
[1255]214            this->forceCommandToFrontOfQueue("error Error: No Tcl-interpreter with ID " + getConvertedValue<unsigned int, std::string>(threadID) + " existing.");
215            return 0;
[1230]216        }
217    }
218
[1255]219    std::string TclThreadManager::dumpList(const std::list<unsigned int>& list)
[1230]220    {
[1255]221        std::string output = "";
222        for (std::list<unsigned int>::const_iterator it = list.begin(); it != list.end(); ++it)
[1230]223        {
[1255]224            if (it != list.begin())
225                output += " ";
[1230]226
[1255]227            output += getConvertedValue<unsigned int, std::string>(*it);
[1230]228        }
[1255]229        return output;
[1230]230    }
231
[1255]232    void TclThreadManager::pushCommandToQueue(const std::string& command)
[1247]233    {
[1255]234        boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.queueMutex_);
235        while (this->orxonoxInterpreterBundle_.queue_.size() >= TCLTHREADMANAGER_MAX_QUEUE_LENGTH)
236            this->fullQueueCondition_.wait(lock);
237
238        this->orxonoxInterpreterBundle_.queue_.push_back(command);
[1247]239    }
240
[1255]241    void TclThreadManager::forceCommandToFrontOfQueue(const std::string& command)
[1247]242    {
[1255]243        boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.queueMutex_);
244        this->orxonoxInterpreterBundle_.queue_.push_front(command);
[1247]245    }
246
[1255]247    std::string TclThreadManager::popCommandFromQueue()
[1230]248    {
[1255]249        boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.queueMutex_);
250        std::string temp = this->orxonoxInterpreterBundle_.queue_.front();
251        this->orxonoxInterpreterBundle_.queue_.pop_front();
252        this->fullQueueCondition_.notify_one();
253        return temp;
[1230]254    }
255
[1255]256    bool TclThreadManager::queueIsEmpty()
[1230]257    {
[1255]258        boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.queueMutex_);
259        return this->orxonoxInterpreterBundle_.queue_.empty();
[1230]260    }
261
[1255]262    void TclThreadManager::pushCommandToQueue(unsigned int threadID, const std::string& command)
[1230]263    {
[1255]264        TclInterpreterBundle* bundle = this->getInterpreterBundle(threadID);
265        if (bundle)
266        {
267            boost::mutex::scoped_lock lock(bundle->queueMutex_);
268            if (bundle->queue_.size() >= TCLTHREADMANAGER_MAX_QUEUE_LENGTH)
269            {
270                this->forceCommandToFrontOfQueue("error Error: Queue of Tcl-interpreter " + getConvertedValue<unsigned int, std::string>(threadID) + " is full, couldn't add command.");
271                return;
272            }
[1230]273
[1255]274            bundle->queue_.push_back(command);
275        }
[1230]276    }
277
[1255]278    std::string TclThreadManager::popCommandFromQueue(unsigned int threadID)
[1230]279    {
[1255]280        TclInterpreterBundle* bundle = this->getInterpreterBundle(threadID);
281        if (bundle)
[1230]282        {
[1255]283            boost::mutex::scoped_lock lock(bundle->queueMutex_);
284            std::string temp = bundle->queue_.front();
285            bundle->queue_.pop_front();
286            return temp;
[1230]287        }
[1255]288        return "";
[1230]289    }
290
[1255]291    bool TclThreadManager::queueIsEmpty(unsigned int threadID)
[1230]292    {
[1255]293        TclInterpreterBundle* bundle = this->getInterpreterBundle(threadID);
294        if (bundle)
[1230]295        {
[1255]296            boost::mutex::scoped_lock lock(bundle->queueMutex_);
297            return bundle->queue_.empty();
[1230]298        }
[1255]299        return true;
[1230]300    }
301
[1255]302    bool TclThreadManager::updateQueriersList(TclInterpreterBundle* querier, TclInterpreterBundle* target)
[1230]303    {
[1255]304        if (querier == target)
305            return false;
[1230]306
[1255]307        boost::mutex::scoped_lock lock(target->queriersMutex_);
308
[1230]309        {
[1255]310            boost::mutex::scoped_lock lock(querier->queriersMutex_);
311            target->queriers_.insert(target->queriers_.end(), querier->queriers_.begin(), querier->queriers_.end());
[1230]312        }
[1255]313
314        target->queriers_.insert(target->queriers_.end(), querier->id_);
315
316        if (std::find(target->queriers_.begin(), target->queriers_.end(), target->id_) != target->queriers_.end())
[1230]317        {
[1255]318            this->forceCommandToFrontOfQueue("error Error: Circular query (" + this->dumpList(target->queriers_) + " -> " + getConvertedValue<unsigned int, std::string>(target->id_) + "), couldn't query Tcl-interpreter with ID " + getConvertedValue<unsigned int, std::string>(target->id_) + " from other interpreter with ID " + getConvertedValue<unsigned int, std::string>(querier->id_) + ".");
319            return false;
[1230]320        }
[1255]321
322        return true;
[1230]323    }
324
[1255]325    std::string TclThreadManager::evalQuery(unsigned int querierID, const std::string& command)
[1230]326    {
[1255]327        TclInterpreterBundle* querier = this->getInterpreterBundle(querierID);
328        std::string output = "";
329        if (querier)
330        {
331            if (this->updateQueriersList(querier, &this->orxonoxInterpreterBundle_))
332            {
333                boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.interpreterMutex_);
334                this->orxonoxEvalCondition_.wait(lock);
[1230]335
[1255]336                if (!CommandExecutor::execute(command, false))
337                    this->forceCommandToFrontOfQueue("error Error: Can't execute command \"" + command + "\"!");
[1230]338
[1255]339                if (CommandExecutor::getLastEvaluation().hasReturnvalue())
340                    output = CommandExecutor::getLastEvaluation().getReturnvalue().toString();
341            }
[1230]342
[1255]343            boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.queriersMutex_);
344            this->orxonoxInterpreterBundle_.queriers_.clear();
345        }
346        return output;
[1230]347    }
348
[1255]349    std::string TclThreadManager::evalQuery(unsigned int querierID, unsigned int threadID, const std::string& command)
[1230]350    {
[1255]351        TclInterpreterBundle* target = this->getInterpreterBundle(threadID);
352        std::string output = "";
353        if (target)
[1247]354        {
[1255]355            TclInterpreterBundle* querier = 0;
356            if (querierID)
357                querier = this->getInterpreterBundle(querierID);
358            else
359                querier = &this->orxonoxInterpreterBundle_;
[1230]360
[1255]361            if (querier)
[1230]362            {
[1255]363                if (this->updateQueriersList(querier, target))
[1230]364                {
[1255]365                    boost::mutex::scoped_lock lock(target->interpreterMutex_, boost::defer_lock_t());
366                    bool successfullyLocked = false;
[1247]367                    try
368                    {
[1255]369                        if (querierID == 0 || std::find(querier->queriers_.begin(), querier->queriers_.end(), (unsigned int)0) != querier->queriers_.end())
370                            successfullyLocked = lock.try_lock();
371                        else
372                        {
373                            while (!lock.try_lock())
374                                boost::this_thread::yield();
375
376                            successfullyLocked = true;
377                        }
378                    } catch (...) {}
379
380                    if (successfullyLocked)
[1247]381                    {
[1255]382                        try
383                        {   output = (std::string)target->interpreter_->eval(command);   }
384                        catch (Tcl::tcl_error const &e)
385                        {   this->forceCommandToFrontOfQueue("error Tcl error: " + (std::string)e.what());   }
386                        catch (std::exception const &e)
387                        {   this->forceCommandToFrontOfQueue("error Error while executing Tcl: " + (std::string)e.what());   }
[1247]388                    }
[1255]389                    else
[1247]390                    {
[1255]391                        this->forceCommandToFrontOfQueue("error Error: Couldn't query Tcl-interpreter with ID " + getConvertedValue<unsigned int, std::string>(threadID) + ", interpreter is busy right now.");
[1247]392                    }
[1230]393                }
[1255]394
395                boost::mutex::scoped_lock lock(target->queriersMutex_);
396                target->queriers_.clear();
[1230]397            }
398        }
[1255]399        return output;
[1230]400    }
401
402    void TclThreadManager::tick(float dt)
403    {
404        {
[1255]405            this->orxonoxEvalCondition_.notify_one();
406            boost::this_thread::yield();
[1230]407        }
408
409        {
[1255]410            boost::mutex::scoped_lock lock(this->bundlesMutex_);
411            for (std::map<unsigned int, TclInterpreterBundle*>::iterator it = this->interpreterBundles_.begin(); it != this->interpreterBundles_.end(); ++it)
[1230]412            {
[1255]413                boost::mutex::scoped_lock lock((*it).second->queueMutex_);
414                if (!(*it).second->queue_.empty())
[1230]415                {
[1255]416                    std::string command = (*it).second->queue_.front();
417                    (*it).second->queue_.pop_front();
418
[1247]419                    {
[1255]420                        boost::mutex::scoped_lock lock((*it).second->finishedMutex_);
421                        (*it).second->finished_ = false;
[1247]422                    }
[1255]423
424                    boost::thread(boost::bind(&tclThread, (*it).second, command));
[1230]425                }
[1247]426            }
427        }
428
429        {
[1255]430            boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.interpreterMutex_);
431            unsigned long maxtime = (unsigned long)(dt * 1000000 * TCLTHREADMANAGER_MAX_CPU_USAGE);
432            Ogre::Timer timer;
433            while (!this->queueIsEmpty())
[1247]434            {
[1255]435                CommandExecutor::execute(this->popCommandFromQueue(), false);
436                if (timer.getMicroseconds() > maxtime)
437                    break;
[1230]438            }
[1255]439        }
440    }
[1230]441
[1255]442    void tclThread(TclInterpreterBundle* interpreterBundle, std::string command)
443    {
444        try
445        {
446            boost::mutex::scoped_lock lock(interpreterBundle->interpreterMutex_);
447            interpreterBundle->interpreter_->eval(command);
448        }
449        catch (Tcl::tcl_error const &e)
450        {
451            TclThreadManager::getInstance().forceCommandToFrontOfQueue("error Tcl (ID " + getConvertedValue<unsigned int, std::string>(interpreterBundle->id_) + ") error: " + e.what());
452        }
453        catch (std::exception const &e)
454        {
455            TclThreadManager::getInstance().forceCommandToFrontOfQueue("error Error while executing Tcl (ID " + getConvertedValue<unsigned int, std::string>(interpreterBundle->id_) + "): " + e.what());
456        }
457
458        boost::mutex::scoped_lock lock(interpreterBundle->finishedMutex_);
459        interpreterBundle->finished_ = true;
460        interpreterBundle->finishedCondition_.notify_all();
[1230]461    }
462}
Note: See TracBrowser for help on using the repository browser.