Changeset 8327 for code/trunk/src/libraries/network/Connection.cc
- Timestamp:
- Apr 25, 2011, 8:22:36 PM (13 years ago)
- Location:
- code/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
code/trunk
- Property svn:mergeinfo changed
/code/branches/network6 (added) merged: 7823-7825,7875,7878,7881-7882,7898,7900,7931,8315
- Property svn:mergeinfo changed
-
code/trunk/src/libraries/network/Connection.cc
r7801 r8327 38 38 39 39 #include "packet/Packet.h" 40 #include <util/Sleep.h> 40 41 41 42 namespace orxonox 42 43 { 43 const boost::posix_time::millisec NETWORK_COMMUNICATION_THREAD_WAIT_TIME(20); 44 45 Connection::Connection(): 46 host_(0), bCommunicationThreadRunning_(false) 44 const boost::posix_time::millisec NETWORK_COMMUNICATION_THREAD_WAIT_TIME(200); 45 const unsigned int NETWORK_DISCONNECT_TIMEOUT = 500; 46 47 Connection::Connection(uint32_t firstPeerID): 48 host_(0), bCommunicationThreadRunning_(false), nextPeerID_(firstPeerID) 47 49 { 48 50 enet_initialize(); … … 50 52 this->incomingEventsMutex_ = new boost::mutex; 51 53 this->outgoingEventsMutex_ = new boost::mutex; 54 // this->overallMutex_ = new boost::mutex; 52 55 } 53 56 … … 75 78 } 76 79 77 78 // int Connection::service(ENetEvent* event) { 79 // return enet_host_service( this->host_, event, NETWORK_WAIT_TIMEOUT ); 80 // } 81 82 void Connection::disconnectPeer(ENetPeer *peer) 83 { 84 assert(peer); 85 outgoingEvent outEvent = { peer, outgoingEventType::disconnectPeer, (ENetPacket*)10, 15 }; 80 void Connection::disconnectPeer(uint32_t peerID) 81 { 82 // this->overallMutex_->lock(); 83 outgoingEvent outEvent = { peerID, outgoingEventType::disconnectPeer, 0, 0 }; 86 84 87 85 this->outgoingEventsMutex_->lock(); 88 86 this->outgoingEvents_.push_back(outEvent); 89 87 this->outgoingEventsMutex_->unlock(); 90 } 91 92 void Connection::addPacket(ENetPacket *packet, ENetPeer *peer, uint8_t channelID)93 {94 assert(peer);95 outgoingEvent outEvent = { peer, outgoingEventType::sendPacket, packet, channelID};88 // this->overallMutex_->unlock(); 89 } 90 91 void Connection::disconnectPeers() 92 { 93 outgoingEvent outEvent = { 0, outgoingEventType::disconnectPeers, 0, 0 }; 96 94 97 95 this->outgoingEventsMutex_->lock(); … … 99 97 this->outgoingEventsMutex_->unlock(); 100 98 } 99 100 void Connection::addPacket(ENetPacket* packet, uint32_t peerID, uint8_t channelID) 101 { 102 // this->overallMutex_->lock(); 103 outgoingEvent outEvent = { peerID, outgoingEventType::sendPacket, packet, channelID }; 104 105 this->outgoingEventsMutex_->lock(); 106 this->outgoingEvents_.push_back(outEvent); 107 this->outgoingEventsMutex_->unlock(); 108 // this->overallMutex_->unlock(); 109 } 101 110 102 111 void Connection::broadcastPacket(ENetPacket* packet, uint8_t channelID) 103 112 { 104 outgoingEvent outEvent = { (ENetPeer*)15, outgoingEventType::broadcastPacket, packet, channelID }; 113 // this->overallMutex_->lock(); 114 outgoingEvent outEvent = { 0, outgoingEventType::broadcastPacket, packet, channelID }; 105 115 106 116 this->outgoingEventsMutex_->lock(); 107 117 this->outgoingEvents_.push_back(outEvent); 108 118 this->outgoingEventsMutex_->unlock(); 119 // this->overallMutex_->unlock(); 109 120 } 110 121 … … 114 125 ENetEvent event; 115 126 127 // this->overallMutex_->lock(); 116 128 while( bCommunicationThreadRunning_ ) 117 129 { … … 119 131 while( enet_host_check_events( this->host_, &event ) > 0 ) 120 132 { 121 // COUT(0) << "incoming event" << endl;122 // received an event123 this->incomingEventsMutex_->lock();124 this->incomingEvents_.push_back(event);125 this->incomingEventsMutex_->unlock();126 } 133 processIncomingEvent(event); 134 } 135 136 // this->overallMutex_->unlock(); 137 msleep(1); 138 // this->overallMutex_->lock(); 127 139 128 140 // Send all waiting outgoing packets … … 138 150 this->outgoingEventsMutex_->unlock(); 139 151 140 switch( outEvent.type ) 141 { 142 case outgoingEventType::sendPacket: 143 enet_peer_send( outEvent.peer, outEvent.channelID, outEvent.packet ); 144 break; 145 case outgoingEventType::disconnectPeer: 146 enet_peer_disconnect(outEvent.peer, 0); 147 break; 148 case outgoingEventType::broadcastPacket: 149 enet_host_broadcast( this->host_, outEvent.channelID, outEvent.packet ); 150 break; 151 default: 152 assert(0); 153 } 152 processOutgoingEvent(outEvent); 153 154 154 this->outgoingEventsMutex_->lock(); 155 155 outgoingEventsCount = this->outgoingEvents_.size(); … … 160 160 if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 ) 161 161 { 162 // COUT(0) << "incoming event after wait" << endl; 163 //received an event 164 this->incomingEventsMutex_->lock(); 165 this->incomingEvents_.push_back(event); 166 this->incomingEventsMutex_->unlock(); 162 processIncomingEvent(event); 163 } 164 } 165 // this->overallMutex_->unlock(); 166 } 167 168 void Connection::processIncomingEvent(ENetEvent& event) 169 { 170 incomingEvent inEvent; 171 // preprocess event 172 switch( event.type ) 173 { 174 case ENET_EVENT_TYPE_CONNECT: 175 inEvent = preprocessConnectEvent(event); 176 break; 177 case ENET_EVENT_TYPE_RECEIVE: 178 inEvent = preprocessReceiveEvent(event); 179 break; 180 case ENET_EVENT_TYPE_DISCONNECT: 181 inEvent = preprocessDisconnectEvent(event); 182 break; 183 case ENET_EVENT_TYPE_NONE: 184 default: 185 return; 186 } 187 188 // pushing event to queue 189 this->incomingEventsMutex_->lock(); 190 this->incomingEvents_.push_back(inEvent); 191 this->incomingEventsMutex_->unlock(); 192 } 193 194 void Connection::processOutgoingEvent(outgoingEvent& event) 195 { 196 ENetPeer* peer; 197 switch( event.type ) 198 { 199 case outgoingEventType::sendPacket: 200 // check whether the peer is still/already in the peer list 201 if( this->peerMap_.find(event.peerID) != this->peerMap_.end() ) 202 { 203 peer = this->peerMap_[event.peerID]; 204 enet_peer_send( peer, event.channelID, event.packet ); 205 } 206 else 207 { 208 // peer probably already disconnected so just discard packet 209 assert(event.peerID<this->nextPeerID_); 210 enet_packet_destroy(event.packet); 211 } 212 break; 213 case outgoingEventType::disconnectPeer: 214 if( this->peerMap_.find(event.peerID) != this->peerMap_.end() ) 215 { 216 peer = this->peerMap_[event.peerID]; 217 enet_peer_disconnect(peer, 0); 218 } 219 else 220 { 221 // peer probably already disconnected so just discard disconnect event 222 assert(event.peerID<this->nextPeerID_); 223 } 224 break; 225 case outgoingEventType::disconnectPeers: 226 disconnectPeersInternal(); 227 break; 228 case outgoingEventType::broadcastPacket: 229 enet_host_broadcast( this->host_, event.channelID, event.packet ); 230 break; 231 default: 232 assert(0); 233 } 234 } 235 236 237 void Connection::disconnectPeersInternal() 238 { 239 std::map<uint32_t, ENetPeer*>::iterator it; 240 for( it=this->peerMap_.begin(); it!=this->peerMap_.end(); ++it ) 241 { 242 enet_peer_disconnect(it->second, 0); 243 } 244 uint32_t iterations = NETWORK_DISCONNECT_TIMEOUT/NETWORK_WAIT_TIMEOUT; 245 uint32_t i = 0; 246 while( this->peerMap_.size() && i++ < iterations ) 247 { 248 ENetEvent event; 249 if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 ) 250 { 251 processIncomingEvent(event); 167 252 } 168 253 } … … 171 256 void Connection::processQueue() 172 257 { 173 ENetEvent event;258 incomingEvent inEvent; 174 259 175 260 this->incomingEventsMutex_->lock(); … … 178 263 while( incomingEventsCount > 0 ) 179 264 { 180 packet::Packet* p;265 // pop event from queue 181 266 this->incomingEventsMutex_->lock(); 182 event = this->incomingEvents_.front();267 inEvent = this->incomingEvents_.front(); 183 268 this->incomingEvents_.pop_front(); 184 269 this->incomingEventsMutex_->unlock(); 185 270 186 switch(event.type)187 {188 // log handling ================189 case ENET_EVENT_TYPE_CONNECT:190 addPeer( &event);271 // process event 272 switch( inEvent.type ) 273 { 274 case incomingEventType::peerConnect: 275 addPeer(inEvent.peerID); 191 276 break; 192 case ENET_EVENT_TYPE_DISCONNECT:193 removePeer( &event);277 case incomingEventType::peerDisconnect: 278 removePeer(inEvent.peerID); 194 279 break; 195 case ENET_EVENT_TYPE_RECEIVE: 196 // COUT(0) << "ENET_EVENT_TYPE_RECEIVE" << endl; 197 p = createPacket( &event ); 198 processPacket(p); 280 case incomingEventType::receivePacket: 281 processPacket(inEvent.packet); 199 282 break; 200 case ENET_EVENT_TYPE_NONE:283 default: 201 284 break; 202 285 } 203 286 287 // check whether there are still events in the queue 204 288 this->incomingEventsMutex_->lock(); 205 289 incomingEventsCount = this->incomingEvents_.size(); … … 207 291 } 208 292 } 209 210 packet::Packet* Connection::createPacket(ENetEvent* event) 211 { 212 packet::Packet *p = packet::Packet::createPacket(event->packet, event->peer); 213 return p; 214 // return p->process(); 215 } 293 294 void Connection::waitOutgoingQueue() 295 { 296 uint32_t outgoingEventsCount; 297 this->outgoingEventsMutex_->lock(); 298 outgoingEventsCount = this->outgoingEvents_.size(); 299 this->outgoingEventsMutex_->unlock(); 300 while( outgoingEventsCount ) 301 { 302 msleep(1); 303 this->outgoingEventsMutex_->lock(); 304 outgoingEventsCount = this->outgoingEvents_.size(); 305 this->outgoingEventsMutex_->unlock(); 306 } 307 } 308 309 310 incomingEvent Connection::preprocessConnectEvent(ENetEvent& event) 311 { 312 // make sure this peer doesn't exist 313 assert( this->peerMap_.find(this->nextPeerID_) == this->peerMap_.end() ); 314 assert( this->peerIDMap_.find(event.peer) == this->peerIDMap_.end() ); 315 316 // give peer a new id and increase peerID for next peer 317 uint32_t peerID = this->nextPeerID_; 318 ++this->nextPeerID_; 319 320 // add peer/peerID into peerMap_ and peerIDMap_ 321 this->peerMap_[peerID] = event.peer; 322 this->peerIDMap_[event.peer] = peerID; 323 324 // create new peerEvent and return it 325 incomingEvent inEvent = { peerID, incomingEventType::peerConnect, 0 }; 326 return inEvent; 327 } 328 329 incomingEvent Connection::preprocessDisconnectEvent(ENetEvent& event) 330 { 331 // assert that the peer exists and get peerID 332 assert( this->peerIDMap_.find(event.peer) != this->peerIDMap_.end() ); 333 uint32_t peerID = this->peerIDMap_[event.peer]; 334 335 // remove peer/peerID from maps 336 this->peerIDMap_.erase(this->peerIDMap_.find(event.peer)); 337 this->peerMap_.erase(this->peerMap_.find(peerID)); 338 339 // create new peerEvent and return it 340 incomingEvent inEvent = { peerID, incomingEventType::peerDisconnect, 0 }; 341 return inEvent; 342 } 343 344 incomingEvent Connection::preprocessReceiveEvent(ENetEvent& event) 345 { 346 // assert that the peer exists and get peerID 347 assert( this->peerIDMap_.find(event.peer) != this->peerIDMap_.end() ); 348 uint32_t peerID = this->peerIDMap_[event.peer]; 349 350 // create new Packet from ENetPacket 351 packet::Packet* p = packet::Packet::createPacket(event.packet, peerID); 352 353 // create new peerEvent and return it 354 incomingEvent inEvent = { peerID, incomingEventType::receivePacket, p }; 355 return inEvent; 356 } 357 216 358 217 359 void Connection::enableCompression()
Note: See TracChangeset
for help on using the changeset viewer.