Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

Ignore:
Timestamp:
Dec 21, 2010, 6:09:09 PM (13 years ago)
Author:
scheusso
Message:

merged network5 into presentation2 branch (untested)

Location:
code/branches/presentation2
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • code/branches/presentation2

  • code/branches/presentation2/src/libraries/network/Connection.cc

    r7163 r7788  
    3030
    3131#include <cassert>
     32#include <deque>
    3233#define WIN32_LEAN_AND_MEAN
    3334#include <enet/enet.h>
     35#include <boost/thread.hpp>
     36#include <boost/thread/mutex.hpp>
     37#include <boost/date_time.hpp>
     38
    3439#include "packet/Packet.h"
    3540
    3641namespace orxonox
    3742{
    38 //   Connection *Connection::instance_=0;
     43  const boost::posix_time::millisec NETWORK_COMMUNICATION_THREAD_WAIT_TIME(20);
    3944
    4045  Connection::Connection():
    41     host_(0)
    42   {
    43 //     assert(instance_==0);
    44 //     Connection::instance_=this;
     46    host_(0), bCommunicationThreadRunning_(false)
     47  {
    4548    enet_initialize();
    4649    atexit(enet_deinitialize);
    47   }
    48 
    49   Connection::~Connection(){
    50 //     Connection::instance_=0;
    51   }
    52 
    53   int Connection::service(ENetEvent* event) {
    54     return enet_host_service( this->host_, event, NETWORK_WAIT_TIMEOUT );
    55   }
    56 
    57   void Connection::disconnectPeer(ENetPeer *peer) {
    58     enet_peer_disconnect(peer, 0);
    59   }
    60 
    61   bool Connection::addPacket(ENetPacket *packet, ENetPeer *peer) {
    62     if(enet_peer_send(peer, NETWORK_DEFAULT_CHANNEL, packet)!=0)
    63       return false;
    64     else
    65       return true;
    66   }
    67 
    68   bool Connection::sendPackets() {
    69     if ( /*!Connection::instance_ || */this->host_==NULL )
    70       return false;
    71     enet_host_flush(this->host_);
    72     return true;
    73   }
    74 
    75   void Connection::processQueue() {
     50    this->incomingEventsMutex_ = new boost::mutex;
     51    this->outgoingEventsMutex_ = new boost::mutex;
     52  }
     53
     54  Connection::~Connection()
     55  {
     56    delete this->incomingEventsMutex_;
     57    delete this->outgoingEventsMutex_;
     58  }
     59
     60  void Connection::startCommunicationThread()
     61  {
     62    this->bCommunicationThreadRunning_ = true;
     63    this->communicationThread_ = new boost::thread(&Connection::communicationThread, this);
     64  }
     65 
     66  void Connection::stopCommunicationThread()
     67  {
     68    this->bCommunicationThreadRunning_ = false;
     69    if( !this->communicationThread_->timed_join(NETWORK_COMMUNICATION_THREAD_WAIT_TIME) )
     70    {
     71      // force thread to stop
     72      this->communicationThread_->interrupt();
     73    }
     74    delete this->communicationThread_;
     75  }
     76
     77
     78//   int Connection::service(ENetEvent* event) {
     79//     return enet_host_service( this->host_, event, NETWORK_WAIT_TIMEOUT );
     80//   }
     81
     82  void Connection::disconnectPeer(ENetPeer *peer)
     83  {
     84    assert(peer);
     85    outgoingEvent outEvent = { peer, outgoingEventType::disconnectPeer, (ENetPacket*)10, 15 };
     86   
     87    this->outgoingEventsMutex_->lock();
     88    this->outgoingEvents_.push_back(outEvent);
     89    this->outgoingEventsMutex_->unlock();
     90  }
     91
     92  void Connection::addPacket(ENetPacket *packet, ENetPeer *peer, uint8_t channelID)
     93  {
     94    assert(peer);
     95    outgoingEvent outEvent = { peer, outgoingEventType::sendPacket, packet, channelID };
     96   
     97    this->outgoingEventsMutex_->lock();
     98    this->outgoingEvents_.push_back(outEvent);
     99    this->outgoingEventsMutex_->unlock();
     100  }
     101 
     102  void Connection::broadcastPacket(ENetPacket* packet, uint8_t channelID)
     103  {
     104    outgoingEvent outEvent = { (ENetPeer*)15, outgoingEventType::broadcastPacket, packet, channelID };
     105   
     106    this->outgoingEventsMutex_->lock();
     107    this->outgoingEvents_.push_back(outEvent);
     108    this->outgoingEventsMutex_->unlock();
     109  }
     110
     111 
     112  void Connection::communicationThread()
     113  {
     114    COUT(0) << "starting communication thread" << endl;
    76115    ENetEvent event;
    77 
    78     assert(this->host_);
    79 
    80     while( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 )
     116   
     117    while( bCommunicationThreadRunning_ )
    81118    {
    82       switch(event.type){
     119      // Receive all pending incoming Events (such as packets, connects and disconnects)
     120      while( enet_host_check_events( this->host_, &event ) > 0 )
     121      {
     122//         COUT(0) << "incoming event" << endl;
     123        // received an event
     124        this->incomingEventsMutex_->lock();
     125        this->incomingEvents_.push_back(event);
     126        this->incomingEventsMutex_->unlock();
     127      }
     128     
     129      // Send all waiting outgoing packets
     130      this->outgoingEventsMutex_->lock();
     131      uint32_t outgoingEventsCount = this->outgoingEvents_.size();
     132      this->outgoingEventsMutex_->unlock();
     133      while( outgoingEventsCount > 0 )
     134      {
     135//         COUT(0) << "outgoing event" << endl;
     136        this->outgoingEventsMutex_->lock();
     137        outgoingEvent outEvent = this->outgoingEvents_.front();
     138        this->outgoingEvents_.pop_front();
     139        this->outgoingEventsMutex_->unlock();
     140       
     141        switch( outEvent.type )
     142        {
     143          case outgoingEventType::sendPacket:
     144            enet_peer_send( outEvent.peer, outEvent.channelID, outEvent.packet );
     145            break;
     146          case outgoingEventType::disconnectPeer:
     147            enet_peer_disconnect(outEvent.peer, 0);
     148            break;
     149          case outgoingEventType::broadcastPacket:
     150            enet_host_broadcast( this->host_, outEvent.channelID, outEvent.packet );
     151            break;
     152          default:
     153            assert(0);
     154        }
     155        this->outgoingEventsMutex_->lock();
     156        outgoingEventsCount = this->outgoingEvents_.size();
     157        this->outgoingEventsMutex_->unlock();
     158      }
     159     
     160      // Wait for incoming events (at most NETWORK_WAIT_TIMEOUT ms)
     161      if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 )
     162      {
     163//         COUT(0) << "incoming event after wait" << endl;
     164        //received an event
     165        this->incomingEventsMutex_->lock();
     166        this->incomingEvents_.push_back(event);
     167        this->incomingEventsMutex_->unlock();
     168      }
     169    }
     170  }
     171
     172  void Connection::processQueue()
     173  {
     174    ENetEvent event;
     175
     176    this->incomingEventsMutex_->lock();
     177    uint32_t incomingEventsCount = this->incomingEvents_.size();
     178    this->incomingEventsMutex_->unlock();
     179    while( incomingEventsCount > 0 )
     180    {
     181      packet::Packet* p;
     182      this->incomingEventsMutex_->lock();
     183      event = this->incomingEvents_.front();
     184      this->incomingEvents_.pop_front();
     185      this->incomingEventsMutex_->unlock();
     186     
     187      switch(event.type)
     188      {
    83189        // log handling ================
    84190        case ENET_EVENT_TYPE_CONNECT:
     
    89195          break;
    90196        case ENET_EVENT_TYPE_RECEIVE:
    91           processPacket( &event );
     197//           COUT(0) << "ENET_EVENT_TYPE_RECEIVE" << endl;
     198          p = createPacket( &event );
     199          processPacket(p);
    92200          break;
    93201        case ENET_EVENT_TYPE_NONE:
    94202          break;
    95203      }
     204     
     205      this->incomingEventsMutex_->lock();
     206      incomingEventsCount = this->incomingEvents_.size();
     207      this->incomingEventsMutex_->unlock();
    96208    }
    97209  }
    98210
    99   bool Connection::processPacket(ENetEvent* event) {
     211  packet::Packet* Connection::createPacket(ENetEvent* event)
     212  {
    100213    packet::Packet *p = packet::Packet::createPacket(event->packet, event->peer);
    101     return p->process();
    102   }
     214    return p;
     215//     return p->process();
     216  }
     217 
     218  void Connection::enableCompression()
     219  {
     220    enet_host_compress_with_range_coder( this->host_ );
     221  }
     222
    103223
    104224}
Note: See TracChangeset for help on using the changeset viewer.