Planet
navi: home · PPS · about · screenshots · download · development · forum

source: code/branches/console/src/core/TclThreadManager.cc @ 1255

Last change on this file since 1255 was 1255, checked in by landauf, 16 years ago
  • After a total rewrite of the TclThreadManager, there are some new problems, but most of the old ones are fixed. I commit this version to have a backup, not because it's finished.
  • 'exec' command is now called 'source' (tcl and orxonox)
  • 'orxonox' command is now called 'query' (tcl only)
  • added 'crossquery' command (tcl only)
File size: 18.6 KB
Line 
1/*
2 *   ORXONOX - the hottest 3D action shooter ever to exist
3 *                    > www.orxonox.net <
4 *
5 *
6 *   License notice:
7 *
8 *   This program is free software; you can redistribute it and/or
9 *   modify it under the terms of the GNU General Public License
10 *   as published by the Free Software Foundation; either version 2
11 *   of the License, or (at your option) any later version.
12 *
13 *   This program is distributed in the hope that it will be useful,
14 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 *   GNU General Public License for more details.
17 *
18 *   You should have received a copy of the GNU General Public License
19 *   along with this program; if not, write to the Free Software
20 *   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
21 *
22 *   Author:
23 *      Fabian 'x3n' Landau
24 *   Co-authors:
25 *      ...
26 *
27 */
28
29#include <iostream>
30#include <string>
31
32#include <boost/thread/thread.hpp>
33#include <boost/bind.hpp>
34
35#include <OgreTimer.h>
36
37#include "CoreIncludes.h"
38#include "ConsoleCommand.h"
39#include "CommandExecutor.h"
40#include "Debug.h"
41#include "TclBind.h"
42#include "TclThreadManager.h"
43#include "util/Convert.h"
44
45#define TCLTHREADMANAGER_MAX_QUEUE_LENGTH 1024
46#define TCLTHREADMANAGER_MAX_CPU_USAGE 0.50
47
48namespace orxonox
49{
50//    ConsoleCommand(TclThreadManager, tclthread, AccessLevel::None, true);
// ConsoleCommand is a project macro whose definition is not part of this file; presumably each
// line below exposes the named TclThreadManager function (create, destroy, execute, query) as a
// console command. The meaning of the last two arguments is not visible here.
51    ConsoleCommand(TclThreadManager, create,    AccessLevel::None, false);
52    ConsoleCommand(TclThreadManager, destroy,   AccessLevel::None, false);
53    ConsoleCommand(TclThreadManager, execute,   AccessLevel::None, false);
54    ConsoleCommand(TclThreadManager, query,     AccessLevel::None, false);
55//    ConsoleCommand(TclThreadManager, status,    AccessLevel::None, false);
56//    ConsoleCommand(TclThreadManager, dump,      AccessLevel::None, false);
57
      // Namespace-scope pointer to the singleton. Its initializer calls getInstance(), so the
      // manager object is constructed no later than the initialization of this variable.
58    TclThreadManager* instance = &TclThreadManager::getInstance();
59
// Constructor: invokes the RegisterRootObject macro for this class, resets the thread counter
// and fills in the bundle that represents the main interpreter.
60    TclThreadManager::TclThreadManager()
61    {
62        RegisterRootObject(TclThreadManager);
63        this->threadCounter_ = 0;
          // The main bundle gets ID 0 and the interpreter handed out by TclBind.
64        TclInterpreterBundle& mainBundle = this->orxonoxInterpreterBundle_;
65        mainBundle.id_ = 0;
66        mainBundle.interpreter_ = TclBind::getInstance().getTclInterpreter();
67    }
68
// Returns the one TclThreadManager object, a function-local static that is constructed the
// first time this function is called.
69    TclThreadManager& TclThreadManager::getInstance()
70    {
71        static TclThreadManager theManager;
72        return theManager;
73    }
74
75    unsigned int TclThreadManager::create()
76    {
77        boost::mutex::scoped_lock lock(TclThreadManager::getInstance().bundlesMutex_);
78        TclThreadManager::getInstance().threadCounter_++;
79        std::string name = getConvertedValue<unsigned int, std::string>(TclThreadManager::getInstance().threadCounter_);
80
81        TclInterpreterBundle* bundle = new TclInterpreterBundle;
82        bundle->id_ = TclThreadManager::getInstance().threadCounter_;
83        bundle->interpreter_ = TclThreadManager::getInstance().createNewTclInterpreter(name);
84        bundle->interpreterName_ = name;
85        bundle->running_ = true;
86
87        TclThreadManager::getInstance().interpreterBundles_[TclThreadManager::getInstance().threadCounter_] = bundle;
88        COUT(0) << "Created new Tcl-interpreter with ID " << TclThreadManager::getInstance().threadCounter_ << std::endl;
89        return TclThreadManager::getInstance().threadCounter_;
90    }
91
92    void TclThreadManager::destroy(unsigned int threadID)
93    {
94        TclInterpreterBundle* bundle = TclThreadManager::getInstance().getInterpreterBundle(threadID);
95        if (bundle)
96        {
97            {
98                boost::mutex::scoped_lock lock(bundle->runningMutex_);
99                bundle->running_ = false;
100            }
101            {
102                boost::mutex::scoped_lock lock(bundle->finishedMutex_);
103                while (!bundle->finished_)
104                    bundle->finishedCondition_.wait(lock);
105            }
106            {
107                boost::mutex::scoped_lock lock(bundle->interpreterMutex_);
108                delete bundle->interpreter_;
109            }
110            delete bundle;
111            {
112                boost::mutex::scoped_lock lock(TclThreadManager::getInstance().bundlesMutex_);
113                TclThreadManager::getInstance().interpreterBundles_.erase(threadID);
114            }
115        }
116    }
117
// Appends the command to the queue of the interpreter with the given ID by forwarding to
// pushCommandToQueue(threadID, command) of the singleton.
118    void TclThreadManager::execute(unsigned int threadID, const std::string& command)
119    {
           TclThreadManager& manager = TclThreadManager::getInstance();
120        manager.pushCommandToQueue(threadID, command);
121    }
122
// Evaluates the command in the interpreter with the given ID and returns the result of
// evalQuery(). The ID of the main bundle (orxonoxInterpreterBundle_.id_) is passed as querier.
123    std::string TclThreadManager::query(unsigned int threadID, const std::string& command)
124    {
           TclThreadManager& manager = TclThreadManager::getInstance();
125        return manager.evalQuery(manager.orxonoxInterpreterBundle_.id_, threadID, command);
126    }
127
128    void TclThreadManager::tcl_execute(Tcl::object const &args)
129    {
130        std::string command = args.get();
131        while (command.size() >= 2 && command[0] == '{' && command[command.size() - 1] == '}')
132            command = command.substr(1, command.size() - 2);
133
134        TclThreadManager::getInstance().pushCommandToQueue(command);
135    }
136
137    std::string TclThreadManager::tcl_query(int querierID, Tcl::object const &args)
138    {
139        std::string command = args.get();
140        while (command.size() >= 2 && command[0] == '{' && command[command.size() - 1] == '}')
141            command = command.substr(1, command.size() - 2);
142
143        return TclThreadManager::getInstance().evalQuery((unsigned int)querierID, command);
144    }
145
146    std::string TclThreadManager::tcl_crossquery(int querierID, int threadID, Tcl::object const &args)
147    {
148        std::string command = args.get();
149        while (command.size() >= 2 && command[0] == '{' && command[command.size() - 1] == '}')
150            command = command.substr(1, command.size() - 2);
151
152        return TclThreadManager::getInstance().evalQuery((unsigned int)querierID, (unsigned int)threadID, command);
153    }
154
// Callback bound to the Tcl command "orxonox::running" in createNewTclInterpreter().
// Returns the running_ flag of the bundle with the given ID (read under runningMutex_),
// or false if getInterpreterBundle() finds no such bundle.
155    bool TclThreadManager::tcl_running(int threadID)
156    {
157        TclInterpreterBundle* bundle = TclThreadManager::getInstance().getInterpreterBundle(static_cast<unsigned int>(threadID));
158
159        if (!bundle)
160            return false;
161
162        boost::mutex::scoped_lock lock(bundle->runningMutex_);
163        return bundle->running_;
164    }
165
// Allocates a new Tcl::interpreter (constructed with TclBind's Tcl library path) and prepares
// it for use as a threaded interpreter: C++ callbacks are bound to Tcl commands and a few Tcl
// procs are defined that splice the interpreter's own ID into their bodies.
//
// threadID: the ID of the new interpreter as text; it is concatenated into the Tcl code below.
// Returns the new interpreter. Exceptions thrown during the setup are only logged with COUT(1),
// so the interpreter is returned even if the setup stopped halfway.
166    Tcl::interpreter* TclThreadManager::createNewTclInterpreter(const std::string& threadID)
167    {
168        Tcl::interpreter* i = 0;
169        i = new Tcl::interpreter(TclBind::getInstance().getTclLibPath());
170
171        try
172        {
               // Bind the static callbacks of this class to Tcl commands.
173            i->def("orxonox::query", TclThreadManager::tcl_query, Tcl::variadic());
174            i->def("orxonox::crossquery", TclThreadManager::tcl_crossquery, Tcl::variadic());
175            i->def("orxonox::execute", TclThreadManager::tcl_execute, Tcl::variadic());
176            i->def("orxonox::running", TclThreadManager::tcl_running);
177
               // Short forms: query and crossquery pass this interpreter's ID as the querier,
               // and the Tcl variable "id" holds that ID.
178            i->def("execute", TclThreadManager::tcl_execute, Tcl::variadic());
179            i->eval("proc query args { orxonox::query " + threadID + " $args }");
180            i->eval("proc crossquery {id args} { orxonox::crossquery " + threadID + " $id $args }");
181            i->eval("set id " + threadID);
182
               // The original exit is kept as tcl::exit; the new exit asks for this interpreter
               // to be destroyed instead.
               // NOTE(review): no Tcl command named "orxonox" is defined in this function, and the
               // changeset message says that command is now called "query" - confirm this still works.
183            i->eval("rename exit tcl::exit");
184            i->eval("proc exit {} { orxonox TclThreadManager destroy " + threadID + " }");
185
               // redef_puts is not defined in this file - presumably provided elsewhere; TODO confirm.
186            i->eval("redef_puts");
187
               // Replacement for while: the loop condition additionally requires that
               // orxonox::running still reports true for this interpreter.
188            i->eval("rename while tcl::while");
189            i->eval("proc while {test command} { tcl::while {$test && [orxonox::running " + threadID + "]} \"$command\" }");
190//            i->eval("rename for tcl::for");
191//            i->eval("proc for {start test next command} { uplevel tcl::for \"$start\" \"$test\" \"$next\" \"$command\" }");
192        }
193        catch (Tcl::tcl_error const &e)
194        {   COUT(1) << "Tcl error while creating Tcl-interpreter (" << threadID << "): " << e.what() << std::endl;   }
195        catch (std::exception const &e)
196        {   COUT(1) << "Error while creating Tcl-interpreter (" << threadID << "): " << e.what() << std::endl;   }
197
198        return i;
199    }
200
201    TclInterpreterBundle* TclThreadManager::getInterpreterBundle(unsigned int threadID)
202    {
203        if (threadID == 0)
204            return &this->orxonoxInterpreterBundle_;
205
206        boost::mutex::scoped_lock lock(this->bundlesMutex_);
207        std::map<unsigned int, TclInterpreterBundle*>::iterator it = this->interpreterBundles_.find(threadID);
208        if (it != this->interpreterBundles_.end())
209        {
210            return (*it).second;
211        }
212        else
213        {
214            this->forceCommandToFrontOfQueue("error Error: No Tcl-interpreter with ID " + getConvertedValue<unsigned int, std::string>(threadID) + " existing.");
215            return 0;
216        }
217    }
218
219    std::string TclThreadManager::dumpList(const std::list<unsigned int>& list)
220    {
221        std::string output = "";
222        for (std::list<unsigned int>::const_iterator it = list.begin(); it != list.end(); ++it)
223        {
224            if (it != list.begin())
225                output += " ";
226
227            output += getConvertedValue<unsigned int, std::string>(*it);
228        }
229        return output;
230    }
231
// Appends the command to the queue of the main interpreter. While the queue already holds
// TCLTHREADMANAGER_MAX_QUEUE_LENGTH entries, the caller waits on fullQueueCondition_, which
// popCommandFromQueue() notifies after removing an entry.
232    void TclThreadManager::pushCommandToQueue(const std::string& command)
233    {
234        TclInterpreterBundle& mainBundle = this->orxonoxInterpreterBundle_;
235        boost::mutex::scoped_lock lock(mainBundle.queueMutex_);
236        while (!(mainBundle.queue_.size() < TCLTHREADMANAGER_MAX_QUEUE_LENGTH))
237            this->fullQueueCondition_.wait(lock);
238        mainBundle.queue_.push_back(command);
239    }
240
// Puts the command at the front of the main interpreter's queue. Unlike pushCommandToQueue(),
// this neither checks the queue length nor waits. The code in this file uses it for "error ..."
// commands.
241    void TclThreadManager::forceCommandToFrontOfQueue(const std::string& command)
242    {
           TclInterpreterBundle& mainBundle = this->orxonoxInterpreterBundle_;
243        boost::mutex::scoped_lock lock(mainBundle.queueMutex_);
244        mainBundle.queue_.push_front(command);
245    }
246
// Removes the first command from the main interpreter's queue and returns it. One thread
// waiting in pushCommandToQueue() for free space is notified afterwards.
//
// Fix: an empty queue now yields an empty string. Previously front()/pop_front() were called
// without any check, which relied on every caller testing queueIsEmpty() first.
247    std::string TclThreadManager::popCommandFromQueue()
248    {
249        boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.queueMutex_);
           // Nothing was removed, so there is no free slot to announce either.
           if (this->orxonoxInterpreterBundle_.queue_.empty())
               return "";

250        std::string temp = this->orxonoxInterpreterBundle_.queue_.front();
251        this->orxonoxInterpreterBundle_.queue_.pop_front();
252        this->fullQueueCondition_.notify_one();
253        return temp;
254    }
255
// Returns whether the queue of the main interpreter is empty, read under its queueMutex_.
256    bool TclThreadManager::queueIsEmpty()
257    {
           TclInterpreterBundle& mainBundle = this->orxonoxInterpreterBundle_;
258        boost::mutex::scoped_lock lock(mainBundle.queueMutex_);
259        return mainBundle.queue_.empty();
260    }
261
262    void TclThreadManager::pushCommandToQueue(unsigned int threadID, const std::string& command)
263    {
264        TclInterpreterBundle* bundle = this->getInterpreterBundle(threadID);
265        if (bundle)
266        {
267            boost::mutex::scoped_lock lock(bundle->queueMutex_);
268            if (bundle->queue_.size() >= TCLTHREADMANAGER_MAX_QUEUE_LENGTH)
269            {
270                this->forceCommandToFrontOfQueue("error Error: Queue of Tcl-interpreter " + getConvertedValue<unsigned int, std::string>(threadID) + " is full, couldn't add command.");
271                return;
272            }
273
274            bundle->queue_.push_back(command);
275        }
276    }
277
278    std::string TclThreadManager::popCommandFromQueue(unsigned int threadID)
279    {
280        TclInterpreterBundle* bundle = this->getInterpreterBundle(threadID);
281        if (bundle)
282        {
283            boost::mutex::scoped_lock lock(bundle->queueMutex_);
284            std::string temp = bundle->queue_.front();
285            bundle->queue_.pop_front();
286            return temp;
287        }
288        return "";
289    }
290
// Returns whether the queue of the interpreter with the given ID is empty, read under the
// bundle's queueMutex_. An unknown ID counts as an empty queue.
291    bool TclThreadManager::queueIsEmpty(unsigned int threadID)
292    {
293        TclInterpreterBundle* bundle = this->getInterpreterBundle(threadID);
294
295        if (!bundle)
296            return true;
297
298        boost::mutex::scoped_lock lock(bundle->queueMutex_);
299        return bundle->queue_.empty();
300    }
301
// Records in target->queriers_ that querier is about to query target: the querier's own list
// of queriers and then the querier's ID are appended to the target's list.
//
// Returns false if querier and target are the same bundle, or if the target's own ID shows up
// in its list afterwards (a circular query; an error command is queued in that case).
// Returns true otherwise.
// Note that the target's list has already been extended when the circular case is detected;
// both evalQuery() overloads clear the list after the query.
302    bool TclThreadManager::updateQueriersList(TclInterpreterBundle* querier, TclInterpreterBundle* target)
303    {
304        if (querier == target)
305            return false;
306
           // Held until the function returns, i.e. also while the error command is queued below.
307        boost::mutex::scoped_lock lock(target->queriersMutex_);
308
309        {
               // NOTE(review): the querier's mutex is taken while the target's is held. Two
               // interpreters querying each other would take the two mutexes in opposite order -
               // confirm that this cannot deadlock.
310            boost::mutex::scoped_lock lock(querier->queriersMutex_);
311            target->queriers_.insert(target->queriers_.end(), querier->queriers_.begin(), querier->queriers_.end());
312        }
313
314        target->queriers_.insert(target->queriers_.end(), querier->id_);
315
           // The target being among its own (direct or indirect) queriers means the chain is circular.
316        if (std::find(target->queriers_.begin(), target->queriers_.end(), target->id_) != target->queriers_.end())
317        {
318            this->forceCommandToFrontOfQueue("error Error: Circular query (" + this->dumpList(target->queriers_) + " -> " + getConvertedValue<unsigned int, std::string>(target->id_) + "), couldn't query Tcl-interpreter with ID " + getConvertedValue<unsigned int, std::string>(target->id_) + " from other interpreter with ID " + getConvertedValue<unsigned int, std::string>(querier->id_) + ".");
319            return false;
320        }
321
322        return true;
323    }
324
// Executes an Orxonox command on behalf of the interpreter with ID querierID and returns the
// command's return value as text (an empty string if there is none, if the querier is unknown,
// or if updateQueriersList() rejects the query).
//
// The calling thread locks the main interpreter's interpreterMutex_ and then waits on
// orxonoxEvalCondition_, which tick() notifies once per call, before it runs the command
// through CommandExecutor.
325    std::string TclThreadManager::evalQuery(unsigned int querierID, const std::string& command)
326    {
327        TclInterpreterBundle* querier = this->getInterpreterBundle(querierID);
328        std::string output = "";
329        if (querier)
330        {
331            if (this->updateQueriersList(querier, &this->orxonoxInterpreterBundle_))
332            {
333                boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.interpreterMutex_);
                   // NOTE(review): the wait has no predicate, so a spurious wake-up would run the
                   // command without a notification from tick(). The mutex is held either way.
334                this->orxonoxEvalCondition_.wait(lock);
335
336                if (!CommandExecutor::execute(command, false))
337                    this->forceCommandToFrontOfQueue("error Error: Can't execute command \"" + command + "\"!");
338
339                if (CommandExecutor::getLastEvaluation().hasReturnvalue())
340                    output = CommandExecutor::getLastEvaluation().getReturnvalue().toString();
341            }
342
               // The list of queriers of the main bundle is reset whether or not the query ran.
343            boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.queriersMutex_);
344            this->orxonoxInterpreterBundle_.queriers_.clear();
345        }
346        return output;
347    }
348
// Evaluates a Tcl command in the interpreter with ID threadID on behalf of the interpreter
// with ID querierID (0 = the main interpreter) and returns the result as text.
// An empty string is returned if either bundle is unknown, if updateQueriersList() rejects the
// query, if the target interpreter could not be locked, or if the evaluation threw. In the
// last two cases an error command is put at the front of the main queue.
349    std::string TclThreadManager::evalQuery(unsigned int querierID, unsigned int threadID, const std::string& command)
350    {
351        TclInterpreterBundle* target = this->getInterpreterBundle(threadID);
352        std::string output = "";
353        if (target)
354        {
355            TclInterpreterBundle* querier = 0;
356            if (querierID)
357                querier = this->getInterpreterBundle(querierID);
358            else
359                querier = &this->orxonoxInterpreterBundle_;
360
361            if (querier)
362            {
363                if (this->updateQueriersList(querier, target))
364                {
                       // The lock object is created without locking; the mutex is acquired below.
365                    boost::mutex::scoped_lock lock(target->interpreterMutex_, boost::defer_lock_t());
366                    bool successfullyLocked = false;
367                    try
368                    {
                           // If the main interpreter (ID 0) is the querier or appears in the
                           // querier's chain, the lock is tried exactly once; otherwise the
                           // thread keeps yielding until it gets the lock.
                           // NOTE(review): querier->queriers_ is read here without queriersMutex_.
369                        if (querierID == 0 || std::find(querier->queriers_.begin(), querier->queriers_.end(), (unsigned int)0) != querier->queriers_.end())
370                            successfullyLocked = lock.try_lock();
371                        else
372                        {
373                            while (!lock.try_lock())
374                                boost::this_thread::yield();
375
376                            successfullyLocked = true;
377                        }
                       // Any exception from locking is swallowed and treated like "busy" below.
378                    } catch (...) {}
379
380                    if (successfullyLocked)
381                    {
382                        try
383                        {   output = (std::string)target->interpreter_->eval(command);   }
384                        catch (Tcl::tcl_error const &e)
385                        {   this->forceCommandToFrontOfQueue("error Tcl error: " + (std::string)e.what());   }
386                        catch (std::exception const &e)
387                        {   this->forceCommandToFrontOfQueue("error Error while executing Tcl: " + (std::string)e.what());   }
388                    }
389                    else
390                    {
391                        this->forceCommandToFrontOfQueue("error Error: Couldn't query Tcl-interpreter with ID " + getConvertedValue<unsigned int, std::string>(threadID) + ", interpreter is busy right now.");
392                    }
393                }
394
                   // The target's list of queriers is reset whether or not the query ran.
395                boost::mutex::scoped_lock lock(target->queriersMutex_);
396                target->queriers_.clear();
397            }
398        }
399        return output;
400    }
401
// Per-tick work of the manager, in three steps:
//  1. wake one thread that waits in evalQuery(querierID, command),
//  2. start a thread for the next queued command of every threaded interpreter,
//  3. execute queued commands of the main interpreter within a time budget.
//
// dt: used only to compute the time budget of step 3 - presumably the duration of the last
//     tick in seconds (dt * 1000000 is compared with microseconds); TODO confirm.
402    void TclThreadManager::tick(float dt)
403    {
404        {
               // Lets one waiting query proceed, then gives other threads a chance to run.
405            this->orxonoxEvalCondition_.notify_one();
406            boost::this_thread::yield();
407        }
408
409        {
410            boost::mutex::scoped_lock lock(this->bundlesMutex_);
411            for (std::map<unsigned int, TclInterpreterBundle*>::iterator it = this->interpreterBundles_.begin(); it != this->interpreterBundles_.end(); ++it)
412            {
                   // At most one command per interpreter is taken out of its queue per tick.
413                boost::mutex::scoped_lock lock((*it).second->queueMutex_);
414                if (!(*it).second->queue_.empty())
415                {
416                    std::string command = (*it).second->queue_.front();
417                    (*it).second->queue_.pop_front();
418
419                    {
                           // Cleared before the thread starts; tclThread() sets it again at its end.
420                        boost::mutex::scoped_lock lock((*it).second->finishedMutex_);
421                        (*it).second->finished_ = false;
422                    }
423
                       // The thread object is a temporary and is not joined here.
                       // NOTE(review): nothing here checks whether an earlier thread of the same
                       // bundle is still running; such threads serialize on interpreterMutex_ in
                       // tclThread().
424                    boost::thread(boost::bind(&tclThread, (*it).second, command));
425                }
426            }
427        }
428
429        {
               // The main interpreter's mutex is held while the queued commands are executed.
430            boost::mutex::scoped_lock lock(this->orxonoxInterpreterBundle_.interpreterMutex_);
               // Budget in microseconds: the TCLTHREADMANAGER_MAX_CPU_USAGE fraction of dt * 1000000.
431            unsigned long maxtime = (unsigned long)(dt * 1000000 * TCLTHREADMANAGER_MAX_CPU_USAGE);
432            Ogre::Timer timer;
433            while (!this->queueIsEmpty())
434            {
                   // The budget is checked after each command, so at least one command runs.
435                CommandExecutor::execute(this->popCommandFromQueue(), false);
436                if (timer.getMicroseconds() > maxtime)
437                    break;
438            }
439        }
440    }
441
// Thread function started by TclThreadManager::tick(): evaluates one command in the given
// bundle's interpreter and then marks the bundle as finished.
//
// interpreterBundle: the bundle whose interpreter evaluates the command.
// command:           the Tcl code to evaluate (taken by value).
// Tcl errors and other std::exceptions are turned into "error ..." commands at the front of
// the main queue; they do not leave this function.
442    void tclThread(TclInterpreterBundle* interpreterBundle, std::string command)
443    {
444        try
445        {
               // The interpreter mutex is held only for the evaluation; the lock ends with this block.
446            boost::mutex::scoped_lock lock(interpreterBundle->interpreterMutex_);
447            interpreterBundle->interpreter_->eval(command);
448        }
449        catch (Tcl::tcl_error const &e)
450        {
451            TclThreadManager::getInstance().forceCommandToFrontOfQueue("error Tcl (ID " + getConvertedValue<unsigned int, std::string>(interpreterBundle->id_) + ") error: " + e.what());
452        }
453        catch (std::exception const &e)
454        {
455            TclThreadManager::getInstance().forceCommandToFrontOfQueue("error Error while executing Tcl (ID " + getConvertedValue<unsigned int, std::string>(interpreterBundle->id_) + "): " + e.what());
456        }
457
           // Signals TclThreadManager::destroy(), which waits on finishedCondition_ until finished_ is true.
458        boost::mutex::scoped_lock lock(interpreterBundle->finishedMutex_);
459        interpreterBundle->finished_ = true;
460        interpreterBundle->finishedCondition_.notify_all();
461    }
462}
Note: See TracBrowser for help on using the repository browser.