Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

Ignore:
Timestamp:
Apr 25, 2011, 8:22:36 PM (13 years ago)
Author:
scheusso
Message:

merging network6 into trunk

Location:
code/trunk
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • code/trunk

  • code/trunk/src/libraries/network/Connection.cc

    r7801 r8327  
    3838
    3939#include "packet/Packet.h"
     40#include <util/Sleep.h>
    4041
    4142namespace orxonox
    4243{
    43   const boost::posix_time::millisec NETWORK_COMMUNICATION_THREAD_WAIT_TIME(20);
    44 
    45   Connection::Connection():
    46     host_(0), bCommunicationThreadRunning_(false)
     44  const boost::posix_time::millisec NETWORK_COMMUNICATION_THREAD_WAIT_TIME(200);
     45  const unsigned int                NETWORK_DISCONNECT_TIMEOUT = 500;
     46
     47  Connection::Connection(uint32_t firstPeerID):
     48    host_(0), bCommunicationThreadRunning_(false), nextPeerID_(firstPeerID)
    4749  {
    4850    enet_initialize();
     
    5052    this->incomingEventsMutex_ = new boost::mutex;
    5153    this->outgoingEventsMutex_ = new boost::mutex;
     54//     this->overallMutex_ = new boost::mutex;
    5255  }
    5356
     
    7578  }
    7679
    77 
    78 //   int Connection::service(ENetEvent* event) {
    79 //     return enet_host_service( this->host_, event, NETWORK_WAIT_TIMEOUT );
    80 //   }
    81 
    82   void Connection::disconnectPeer(ENetPeer *peer)
    83   {
    84     assert(peer);
    85     outgoingEvent outEvent = { peer, outgoingEventType::disconnectPeer, (ENetPacket*)10, 15 };
     80  void Connection::disconnectPeer(uint32_t peerID)
     81  {
     82//     this->overallMutex_->lock();
     83    outgoingEvent outEvent = { peerID, outgoingEventType::disconnectPeer, 0, 0 };
    8684   
    8785    this->outgoingEventsMutex_->lock();
    8886    this->outgoingEvents_.push_back(outEvent);
    8987    this->outgoingEventsMutex_->unlock();
    90   }
    91 
    92   void Connection::addPacket(ENetPacket *packet, ENetPeer *peer, uint8_t channelID)
    93   {
    94     assert(peer);
    95     outgoingEvent outEvent = { peer, outgoingEventType::sendPacket, packet, channelID };
     88//     this->overallMutex_->unlock();
     89  }
     90 
     91  void Connection::disconnectPeers()
     92  {
     93    outgoingEvent outEvent = { 0, outgoingEventType::disconnectPeers, 0, 0 };
    9694   
    9795    this->outgoingEventsMutex_->lock();
     
    9997    this->outgoingEventsMutex_->unlock();
    10098  }
     99
     100  void Connection::addPacket(ENetPacket* packet, uint32_t peerID, uint8_t channelID)
     101  {
     102//     this->overallMutex_->lock();
     103    outgoingEvent outEvent = { peerID, outgoingEventType::sendPacket, packet, channelID };
     104   
     105    this->outgoingEventsMutex_->lock();
     106    this->outgoingEvents_.push_back(outEvent);
     107    this->outgoingEventsMutex_->unlock();
     108//     this->overallMutex_->unlock();
     109  }
    101110 
    102111  void Connection::broadcastPacket(ENetPacket* packet, uint8_t channelID)
    103112  {
    104     outgoingEvent outEvent = { (ENetPeer*)15, outgoingEventType::broadcastPacket, packet, channelID };
     113//     this->overallMutex_->lock();
     114    outgoingEvent outEvent = { 0, outgoingEventType::broadcastPacket, packet, channelID };
    105115   
    106116    this->outgoingEventsMutex_->lock();
    107117    this->outgoingEvents_.push_back(outEvent);
    108118    this->outgoingEventsMutex_->unlock();
     119//     this->overallMutex_->unlock();
    109120  }
    110121
     
    114125    ENetEvent event;
    115126   
     127//     this->overallMutex_->lock();
    116128    while( bCommunicationThreadRunning_ )
    117129    {
     
    119131      while( enet_host_check_events( this->host_, &event ) > 0 )
    120132      {
    121 //         COUT(0) << "incoming event" << endl;
    122         // received an event
    123         this->incomingEventsMutex_->lock();
    124         this->incomingEvents_.push_back(event);
    125         this->incomingEventsMutex_->unlock();
    126       }
     133        processIncomingEvent(event);
     134      }
     135     
     136//       this->overallMutex_->unlock();
     137      msleep(1);
     138//       this->overallMutex_->lock();
    127139     
    128140      // Send all waiting outgoing packets
     
    138150        this->outgoingEventsMutex_->unlock();
    139151       
    140         switch( outEvent.type )
    141         {
    142           case outgoingEventType::sendPacket:
    143             enet_peer_send( outEvent.peer, outEvent.channelID, outEvent.packet );
    144             break;
    145           case outgoingEventType::disconnectPeer:
    146             enet_peer_disconnect(outEvent.peer, 0);
    147             break;
    148           case outgoingEventType::broadcastPacket:
    149             enet_host_broadcast( this->host_, outEvent.channelID, outEvent.packet );
    150             break;
    151           default:
    152             assert(0);
    153         }
     152        processOutgoingEvent(outEvent);
     153       
    154154        this->outgoingEventsMutex_->lock();
    155155        outgoingEventsCount = this->outgoingEvents_.size();
     
    160160      if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 )
    161161      {
    162 //         COUT(0) << "incoming event after wait" << endl;
    163         //received an event
    164         this->incomingEventsMutex_->lock();
    165         this->incomingEvents_.push_back(event);
    166         this->incomingEventsMutex_->unlock();
     162        processIncomingEvent(event);
     163      }
     164    }
     165//     this->overallMutex_->unlock();
     166  }
     167 
     168  void Connection::processIncomingEvent(ENetEvent& event)
     169  {
     170    incomingEvent inEvent;
     171    // preprocess event
     172    switch( event.type )
     173    {
     174      case ENET_EVENT_TYPE_CONNECT:
     175        inEvent = preprocessConnectEvent(event);
     176        break;
     177      case ENET_EVENT_TYPE_RECEIVE:
     178        inEvent = preprocessReceiveEvent(event);
     179        break;
     180      case ENET_EVENT_TYPE_DISCONNECT:
     181        inEvent = preprocessDisconnectEvent(event);
     182        break;
     183      case ENET_EVENT_TYPE_NONE:
     184      default:
     185        return;
     186    }
     187   
     188    // pushing event to queue
     189    this->incomingEventsMutex_->lock();
     190    this->incomingEvents_.push_back(inEvent);
     191    this->incomingEventsMutex_->unlock();
     192  }
     193 
     194  void Connection::processOutgoingEvent(outgoingEvent& event)
     195  {
     196    ENetPeer* peer;
     197    switch( event.type )
     198    {
     199      case outgoingEventType::sendPacket:
     200        // check whether the peer is still/already in the peer list
     201        if( this->peerMap_.find(event.peerID) != this->peerMap_.end() )
     202        {
     203          peer = this->peerMap_[event.peerID];
     204          enet_peer_send( peer, event.channelID, event.packet );
     205        }
     206        else
     207        {
     208          // peer probably already disconnected so just discard packet
     209          assert(event.peerID<this->nextPeerID_);
     210          enet_packet_destroy(event.packet);
     211        }
     212        break;
     213      case outgoingEventType::disconnectPeer:
     214        if( this->peerMap_.find(event.peerID) != this->peerMap_.end() )
     215        {
     216          peer = this->peerMap_[event.peerID];
     217          enet_peer_disconnect(peer, 0);
     218        }
     219        else
     220        {
     221          // peer probably already disconnected so just discard disconnect event
     222          assert(event.peerID<this->nextPeerID_);
     223        }
     224        break;
     225      case outgoingEventType::disconnectPeers:
     226        disconnectPeersInternal();
     227        break;
     228      case outgoingEventType::broadcastPacket:
     229        enet_host_broadcast( this->host_, event.channelID, event.packet );
     230        break;
     231      default:
     232        assert(0);
     233    }
     234  }
     235
     236
     237  void Connection::disconnectPeersInternal()
     238  {
     239    std::map<uint32_t, ENetPeer*>::iterator it;
     240    for( it=this->peerMap_.begin(); it!=this->peerMap_.end(); ++it )
     241    {
     242      enet_peer_disconnect(it->second, 0);
     243    }
     244    uint32_t iterations = NETWORK_DISCONNECT_TIMEOUT/NETWORK_WAIT_TIMEOUT;
     245    uint32_t i = 0;
     246    while( this->peerMap_.size() && i++ < iterations )
     247    {
     248      ENetEvent event;
     249      if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 )
     250      {
     251        processIncomingEvent(event);
    167252      }
    168253    }
     
    171256  void Connection::processQueue()
    172257  {
    173     ENetEvent event;
     258    incomingEvent inEvent;
    174259
    175260    this->incomingEventsMutex_->lock();
     
    178263    while( incomingEventsCount > 0 )
    179264    {
    180       packet::Packet* p;
     265      // pop event from queue
    181266      this->incomingEventsMutex_->lock();
    182       event = this->incomingEvents_.front();
     267      inEvent = this->incomingEvents_.front();
    183268      this->incomingEvents_.pop_front();
    184269      this->incomingEventsMutex_->unlock();
    185270     
    186       switch(event.type)
    187       {
    188         // log handling ================
    189         case ENET_EVENT_TYPE_CONNECT:
    190           addPeer( &event );
     271      // process event
     272      switch( inEvent.type )
     273      {
     274        case incomingEventType::peerConnect:
     275          addPeer(inEvent.peerID);
    191276          break;
    192         case ENET_EVENT_TYPE_DISCONNECT:
    193           removePeer( &event );
     277        case incomingEventType::peerDisconnect:
     278          removePeer(inEvent.peerID);
    194279          break;
    195         case ENET_EVENT_TYPE_RECEIVE:
    196 //           COUT(0) << "ENET_EVENT_TYPE_RECEIVE" << endl;
    197           p = createPacket( &event );
    198           processPacket(p);
     280        case incomingEventType::receivePacket:
     281          processPacket(inEvent.packet);
    199282          break;
    200         case ENET_EVENT_TYPE_NONE:
     283        default:
    201284          break;
    202285      }
    203286     
     287      // check whether there are still events in the queue
    204288      this->incomingEventsMutex_->lock();
    205289      incomingEventsCount = this->incomingEvents_.size();
     
    207291    }
    208292  }
    209 
    210   packet::Packet* Connection::createPacket(ENetEvent* event)
    211   {
    212     packet::Packet *p = packet::Packet::createPacket(event->packet, event->peer);
    213     return p;
    214 //     return p->process();
    215   }
     293 
     294  void Connection::waitOutgoingQueue()
     295  {
     296    uint32_t outgoingEventsCount;
     297    this->outgoingEventsMutex_->lock();
     298    outgoingEventsCount = this->outgoingEvents_.size();
     299    this->outgoingEventsMutex_->unlock();
     300    while( outgoingEventsCount )
     301    {
     302      msleep(1);
     303      this->outgoingEventsMutex_->lock();
     304      outgoingEventsCount = this->outgoingEvents_.size();
     305      this->outgoingEventsMutex_->unlock();
     306    }
     307  }
     308
     309
     310  incomingEvent Connection::preprocessConnectEvent(ENetEvent& event)
     311  {
     312    // make sure this peer doesn't exist
     313    assert( this->peerMap_.find(this->nextPeerID_) == this->peerMap_.end() );
     314    assert( this->peerIDMap_.find(event.peer) == this->peerIDMap_.end() );
     315   
     316    // give peer a new id and increase peerID for next peer
     317    uint32_t peerID = this->nextPeerID_;
     318    ++this->nextPeerID_;
     319   
     320    // add peer/peerID into peerMap_ and peerIDMap_
     321    this->peerMap_[peerID] = event.peer;
     322    this->peerIDMap_[event.peer] = peerID;
     323   
     324    // create new peerEvent and return it
     325    incomingEvent inEvent = { peerID, incomingEventType::peerConnect, 0 };
     326    return inEvent;
     327  }
     328 
     329  incomingEvent Connection::preprocessDisconnectEvent(ENetEvent& event)
     330  {
     331    // assert that the peer exists and get peerID
     332    assert( this->peerIDMap_.find(event.peer) != this->peerIDMap_.end() );
     333    uint32_t peerID = this->peerIDMap_[event.peer];
     334   
     335    // remove peer/peerID from maps
     336    this->peerIDMap_.erase(this->peerIDMap_.find(event.peer));
     337    this->peerMap_.erase(this->peerMap_.find(peerID));
     338   
     339    // create new peerEvent and return it
     340    incomingEvent inEvent = { peerID, incomingEventType::peerDisconnect, 0 };
     341    return inEvent;
     342  }
     343 
     344  incomingEvent Connection::preprocessReceiveEvent(ENetEvent& event)
     345  {
     346    // assert that the peer exists and get peerID
     347    assert( this->peerIDMap_.find(event.peer) != this->peerIDMap_.end() );
     348    uint32_t peerID = this->peerIDMap_[event.peer];
     349   
     350    // create new Packet from ENetPacket
     351    packet::Packet* p = packet::Packet::createPacket(event.packet, peerID);
     352   
     353    // create new peerEvent and return it
     354    incomingEvent inEvent = { peerID, incomingEventType::receivePacket, p };
     355    return inEvent;
     356  }
     357
    216358 
    217359  void Connection::enableCompression()
Note: See TracChangeset for help on using the changeset viewer.