- Timestamp:
- Dec 17, 2010, 10:41:24 AM (15 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
code/branches/network5/src/libraries/network/Connection.cc
r7163 r7772 30 30 31 31 #include <cassert> 32 #include <deque> 32 33 #define WIN32_LEAN_AND_MEAN 33 34 #include <enet/enet.h> 35 #include <boost/thread.hpp> 36 #include <boost/thread/mutex.hpp> 37 #include <boost/date_time.hpp> 38 34 39 #include "packet/Packet.h" 35 40 36 41 namespace orxonox 37 42 { 38 // Connection *Connection::instance_=0;43 const boost::posix_time::millisec NETWORK_COMMUNICATION_THREAD_WAIT_TIME(20); 39 44 40 45 Connection::Connection(): 41 host_(0) 42 { 43 // assert(instance_==0); 44 // Connection::instance_=this; 46 host_(0), bCommunicationThreadRunning_(false) 47 { 45 48 enet_initialize(); 46 49 atexit(enet_deinitialize); 47 } 48 49 Connection::~Connection(){ 50 // Connection::instance_=0; 51 } 52 53 int Connection::service(ENetEvent* event) { 54 return enet_host_service( this->host_, event, NETWORK_WAIT_TIMEOUT ); 55 } 56 57 void Connection::disconnectPeer(ENetPeer *peer) { 58 enet_peer_disconnect(peer, 0); 59 } 60 61 bool Connection::addPacket(ENetPacket *packet, ENetPeer *peer) { 62 if(enet_peer_send(peer, NETWORK_DEFAULT_CHANNEL, packet)!=0) 63 return false; 64 else 65 return true; 66 } 67 68 bool Connection::sendPackets() { 69 if ( /*!Connection::instance_ || */this->host_==NULL ) 70 return false; 71 enet_host_flush(this->host_); 72 return true; 73 } 74 75 void Connection::processQueue() { 50 this->incomingEventsMutex_ = new boost::mutex; 51 this->outgoingEventsMutex_ = new boost::mutex; 52 } 53 54 Connection::~Connection() 55 { 56 delete this->incomingEventsMutex_; 57 delete this->outgoingEventsMutex_; 58 } 59 60 void Connection::startCommunicationThread() 61 { 62 this->bCommunicationThreadRunning_ = true; 63 this->communicationThread_ = new boost::thread(&Connection::communicationThread, this); 64 } 65 66 void Connection::stopCommunicationThread() 67 { 68 this->bCommunicationThreadRunning_ = false; 69 if( !this->communicationThread_->timed_join(NETWORK_COMMUNICATION_THREAD_WAIT_TIME) ) 70 { 71 // force thread to stop 72 this->communicationThread_->interrupt(); 73 } 74 delete this->communicationThread_; 75 } 76 77 78 // int Connection::service(ENetEvent* event) { 79 // return enet_host_service( this->host_, event, NETWORK_WAIT_TIMEOUT ); 80 // } 81 82 void Connection::disconnectPeer(ENetPeer *peer) 83 { 84 outgoingEvent outEvent = { peer, outgoingEventType::disconnectPeer, (ENetPacket*)10, 15 }; 85 86 this->outgoingEventsMutex_->lock(); 87 this->outgoingEvents_.push_back(outEvent); 88 this->outgoingEventsMutex_->unlock(); 89 } 90 91 void Connection::addPacket(ENetPacket *packet, ENetPeer *peer, uint8_t channelID) 92 { 93 outgoingEvent outEvent = { peer, outgoingEventType::sendPacket, packet, channelID }; 94 95 this->outgoingEventsMutex_->lock(); 96 this->outgoingEvents_.push_back(outEvent); 97 this->outgoingEventsMutex_->unlock(); 98 } 99 100 void Connection::broadcastPacket(ENetPacket* packet, uint8_t channelID) 101 { 102 outgoingEvent outEvent = { 0, outgoingEventType::broadcastPacket, packet, channelID }; 103 104 this->outgoingEventsMutex_->lock(); 105 this->outgoingEvents_.push_back(outEvent); 106 this->outgoingEventsMutex_->unlock(); 107 } 108 109 110 void Connection::communicationThread() 111 { 112 COUT(0) << "starting communication thread" << endl; 76 113 ENetEvent event; 77 78 assert(this->host_); 79 80 while( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 ) 114 115 while( bCommunicationThreadRunning_ ) 81 116 { 82 switch(event.type){ 117 // Receive all pending incoming Events (such as packets, connects and disconnects) 118 while( enet_host_check_events( this->host_, &event ) > 0 ) 119 { 120 // COUT(0) << "incoming event" << endl; 121 // received an event 122 this->incomingEventsMutex_->lock(); 123 this->incomingEvents_.push_back(event); 124 this->incomingEventsMutex_->unlock(); 125 } 126 127 // Send all waiting outgoing packets 128 this->outgoingEventsMutex_->lock(); 129 uint32_t outgoingEventsCount = this->outgoingEvents_.size(); 130 this->outgoingEventsMutex_->unlock(); 131 while( outgoingEventsCount > 0 ) 132 { 133 // COUT(0) << "outgoing event" << endl; 134 this->outgoingEventsMutex_->lock(); 135 outgoingEvent outEvent = this->outgoingEvents_.front(); 136 this->outgoingEvents_.pop_front(); 137 this->outgoingEventsMutex_->unlock(); 138 139 switch( outEvent.type ) 140 { 141 case outgoingEventType::sendPacket: 142 enet_peer_send( outEvent.peer, outEvent.channelID, outEvent.packet ); 143 break; 144 case outgoingEventType::disconnectPeer: 145 enet_peer_disconnect(outEvent.peer, 0); 146 break; 147 case outgoingEventType::broadcastPacket: 148 enet_host_broadcast( this->host_, outEvent.channelID, outEvent.packet ); 149 break; 150 default: 151 assert(0); 152 } 153 this->outgoingEventsMutex_->lock(); 154 outgoingEventsCount = this->outgoingEvents_.size(); 155 this->outgoingEventsMutex_->unlock(); 156 } 157 158 // Wait for incoming events (at most NETWORK_WAIT_TIMEOUT ms) 159 if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 ) 160 { 161 // COUT(0) << "incoming event after wait" << endl; 162 //received an event 163 this->incomingEventsMutex_->lock(); 164 this->incomingEvents_.push_back(event); 165 this->incomingEventsMutex_->unlock(); 166 } 167 } 168 } 169 170 void Connection::processQueue() 171 { 172 ENetEvent event; 173 174 this->incomingEventsMutex_->lock(); 175 uint32_t incomingEventsCount = this->incomingEvents_.size(); 176 this->incomingEventsMutex_->unlock(); 177 while( incomingEventsCount > 0 ) 178 { 179 this->incomingEventsMutex_->lock(); 180 event = this->incomingEvents_.front(); 181 this->incomingEvents_.pop_front(); 182 this->incomingEventsMutex_->unlock(); 183 184 switch(event.type) 185 { 83 186 // log handling ================ 84 187 case ENET_EVENT_TYPE_CONNECT: … … 89 192 break; 90 193 case ENET_EVENT_TYPE_RECEIVE: 194 // COUT(0) << "ENET_EVENT_TYPE_RECEIVE" << endl; 91 195 processPacket( &event ); 92 196 break; … … 94 198 break; 95 199 } 200 201 this->incomingEventsMutex_->lock(); 202 incomingEventsCount = this->incomingEvents_.size(); 203 this->incomingEventsMutex_->unlock(); 96 204 } 97 205 } 98 206 99 bool Connection::processPacket(ENetEvent* event) { 207 bool Connection::processPacket(ENetEvent* event) 208 { 100 209 packet::Packet *p = packet::Packet::createPacket(event->packet, event->peer); 101 210 return p->process();
Note: See TracChangeset
for help on using the changeset viewer.