- Timestamp:
- Dec 21, 2010, 6:09:09 PM (13 years ago)
- Location:
- code/branches/presentation2
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
code/branches/presentation2
- Property svn:mergeinfo changed
/code/branches/network3 (added) merged: 7333,7336-7337,7344 /code/branches/network4 (added) merged: 7497,7718,7753-7755 /code/branches/network5 (added) merged: 7757-7759,7772-7778,7780-7781
- Property svn:mergeinfo changed
-
code/branches/presentation2/src/libraries/network/Connection.cc
r7163 r7788 30 30 31 31 #include <cassert> 32 #include <deque> 32 33 #define WIN32_LEAN_AND_MEAN 33 34 #include <enet/enet.h> 35 #include <boost/thread.hpp> 36 #include <boost/thread/mutex.hpp> 37 #include <boost/date_time.hpp> 38 34 39 #include "packet/Packet.h" 35 40 36 41 namespace orxonox 37 42 { 38 // Connection *Connection::instance_=0;43 const boost::posix_time::millisec NETWORK_COMMUNICATION_THREAD_WAIT_TIME(20); 39 44 40 45 Connection::Connection(): 41 host_(0) 42 { 43 // assert(instance_==0); 44 // Connection::instance_=this; 46 host_(0), bCommunicationThreadRunning_(false) 47 { 45 48 enet_initialize(); 46 49 atexit(enet_deinitialize); 47 } 48 49 Connection::~Connection(){ 50 // Connection::instance_=0; 51 } 52 53 int Connection::service(ENetEvent* event) { 54 return enet_host_service( this->host_, event, NETWORK_WAIT_TIMEOUT ); 55 } 56 57 void Connection::disconnectPeer(ENetPeer *peer) { 58 enet_peer_disconnect(peer, 0); 59 } 60 61 bool Connection::addPacket(ENetPacket *packet, ENetPeer *peer) { 62 if(enet_peer_send(peer, NETWORK_DEFAULT_CHANNEL, packet)!=0) 63 return false; 64 else 65 return true; 66 } 67 68 bool Connection::sendPackets() { 69 if ( /*!Connection::instance_ || */this->host_==NULL ) 70 return false; 71 enet_host_flush(this->host_); 72 return true; 73 } 74 75 void Connection::processQueue() { 50 this->incomingEventsMutex_ = new boost::mutex; 51 this->outgoingEventsMutex_ = new boost::mutex; 52 } 53 54 Connection::~Connection() 55 { 56 delete this->incomingEventsMutex_; 57 delete this->outgoingEventsMutex_; 58 } 59 60 void Connection::startCommunicationThread() 61 { 62 this->bCommunicationThreadRunning_ = true; 63 this->communicationThread_ = new boost::thread(&Connection::communicationThread, this); 64 } 65 66 void Connection::stopCommunicationThread() 67 { 68 this->bCommunicationThreadRunning_ = false; 69 if( !this->communicationThread_->timed_join(NETWORK_COMMUNICATION_THREAD_WAIT_TIME) ) 70 { 71 // force thread to stop 72 this->communicationThread_->interrupt(); 73 } 74 delete this->communicationThread_; 75 } 76 77 78 // int Connection::service(ENetEvent* event) { 79 // return enet_host_service( this->host_, event, NETWORK_WAIT_TIMEOUT ); 80 // } 81 82 void Connection::disconnectPeer(ENetPeer *peer) 83 { 84 assert(peer); 85 outgoingEvent outEvent = { peer, outgoingEventType::disconnectPeer, (ENetPacket*)10, 15 }; 86 87 this->outgoingEventsMutex_->lock(); 88 this->outgoingEvents_.push_back(outEvent); 89 this->outgoingEventsMutex_->unlock(); 90 } 91 92 void Connection::addPacket(ENetPacket *packet, ENetPeer *peer, uint8_t channelID) 93 { 94 assert(peer); 95 outgoingEvent outEvent = { peer, outgoingEventType::sendPacket, packet, channelID }; 96 97 this->outgoingEventsMutex_->lock(); 98 this->outgoingEvents_.push_back(outEvent); 99 this->outgoingEventsMutex_->unlock(); 100 } 101 102 void Connection::broadcastPacket(ENetPacket* packet, uint8_t channelID) 103 { 104 outgoingEvent outEvent = { (ENetPeer*)15, outgoingEventType::broadcastPacket, packet, channelID }; 105 106 this->outgoingEventsMutex_->lock(); 107 this->outgoingEvents_.push_back(outEvent); 108 this->outgoingEventsMutex_->unlock(); 109 } 110 111 112 void Connection::communicationThread() 113 { 114 COUT(0) << "starting communication thread" << endl; 76 115 ENetEvent event; 77 78 assert(this->host_); 79 80 while( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 ) 116 117 while( bCommunicationThreadRunning_ ) 81 118 { 82 switch(event.type){ 119 // Receive all pending incoming Events (such as packets, connects and disconnects) 120 while( enet_host_check_events( this->host_, &event ) > 0 ) 121 { 122 // COUT(0) << "incoming event" << endl; 123 // received an event 124 this->incomingEventsMutex_->lock(); 125 this->incomingEvents_.push_back(event); 126 this->incomingEventsMutex_->unlock(); 127 } 128 129 // Send all waiting outgoing packets 130 this->outgoingEventsMutex_->lock(); 131 uint32_t outgoingEventsCount = this->outgoingEvents_.size(); 132 this->outgoingEventsMutex_->unlock(); 133 while( outgoingEventsCount > 0 ) 134 { 135 // COUT(0) << "outgoing event" << endl; 136 this->outgoingEventsMutex_->lock(); 137 outgoingEvent outEvent = this->outgoingEvents_.front(); 138 this->outgoingEvents_.pop_front(); 139 this->outgoingEventsMutex_->unlock(); 140 141 switch( outEvent.type ) 142 { 143 case outgoingEventType::sendPacket: 144 enet_peer_send( outEvent.peer, outEvent.channelID, outEvent.packet ); 145 break; 146 case outgoingEventType::disconnectPeer: 147 enet_peer_disconnect(outEvent.peer, 0); 148 break; 149 case outgoingEventType::broadcastPacket: 150 enet_host_broadcast( this->host_, outEvent.channelID, outEvent.packet ); 151 break; 152 default: 153 assert(0); 154 } 155 this->outgoingEventsMutex_->lock(); 156 outgoingEventsCount = this->outgoingEvents_.size(); 157 this->outgoingEventsMutex_->unlock(); 158 } 159 160 // Wait for incoming events (at most NETWORK_WAIT_TIMEOUT ms) 161 if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 ) 162 { 163 // COUT(0) << "incoming event after wait" << endl; 164 //received an event 165 this->incomingEventsMutex_->lock(); 166 this->incomingEvents_.push_back(event); 167 this->incomingEventsMutex_->unlock(); 168 } 169 } 170 } 171 172 void Connection::processQueue() 173 { 174 ENetEvent event; 175 176 this->incomingEventsMutex_->lock(); 177 uint32_t incomingEventsCount = this->incomingEvents_.size(); 178 this->incomingEventsMutex_->unlock(); 179 while( incomingEventsCount > 0 ) 180 { 181 packet::Packet* p; 182 this->incomingEventsMutex_->lock(); 183 event = this->incomingEvents_.front(); 184 this->incomingEvents_.pop_front(); 185 this->incomingEventsMutex_->unlock(); 186 187 switch(event.type) 188 { 83 189 // log handling ================ 84 190 case ENET_EVENT_TYPE_CONNECT: … … 89 195 break; 90 196 case ENET_EVENT_TYPE_RECEIVE: 91 processPacket( &event ); 197 // COUT(0) << "ENET_EVENT_TYPE_RECEIVE" << endl; 198 p = createPacket( &event ); 199 processPacket(p); 92 200 break; 93 201 case ENET_EVENT_TYPE_NONE: 94 202 break; 95 203 } 204 205 this->incomingEventsMutex_->lock(); 206 incomingEventsCount = this->incomingEvents_.size(); 207 this->incomingEventsMutex_->unlock(); 96 208 } 97 209 } 98 210 99 bool Connection::processPacket(ENetEvent* event) { 211 packet::Packet* Connection::createPacket(ENetEvent* event) 212 { 100 213 packet::Packet *p = packet::Packet::createPacket(event->packet, event->peer); 101 return p->process(); 102 } 214 return p; 215 // return p->process(); 216 } 217 218 void Connection::enableCompression() 219 { 220 enet_host_compress_with_range_coder( this->host_ ); 221 } 222 103 223 104 224 }
Note: See TracChangeset
for help on using the changeset viewer.