Changeset 7801 for code/trunk/src/libraries/network/Connection.cc
- Timestamp:
- Dec 22, 2010, 7:24:24 PM (13 years ago)
- Location:
- code/trunk
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
code/trunk
- Property svn:mergeinfo changed
-
code/trunk/src/libraries/network/Connection.cc
r7163 r7801 30 30 31 31 #include <cassert> 32 #include <deque> 32 33 #define WIN32_LEAN_AND_MEAN 33 34 #include <enet/enet.h> 35 #include <boost/thread.hpp> 36 #include <boost/thread/mutex.hpp> 37 #include <boost/date_time.hpp> 38 34 39 #include "packet/Packet.h" 35 40 36 41 namespace orxonox 37 42 { 38 // Connection *Connection::instance_=0;43 const boost::posix_time::millisec NETWORK_COMMUNICATION_THREAD_WAIT_TIME(20); 39 44 40 45 Connection::Connection(): 41 host_(0) 42 { 43 // assert(instance_==0); 44 // Connection::instance_=this; 46 host_(0), bCommunicationThreadRunning_(false) 47 { 45 48 enet_initialize(); 46 49 atexit(enet_deinitialize); 47 } 48 49 Connection::~Connection(){ 50 // Connection::instance_=0; 51 } 52 53 int Connection::service(ENetEvent* event) { 54 return enet_host_service( this->host_, event, NETWORK_WAIT_TIMEOUT ); 55 } 56 57 void Connection::disconnectPeer(ENetPeer *peer) { 58 enet_peer_disconnect(peer, 0); 59 } 60 61 bool Connection::addPacket(ENetPacket *packet, ENetPeer *peer) { 62 if(enet_peer_send(peer, NETWORK_DEFAULT_CHANNEL, packet)!=0) 63 return false; 64 else 65 return true; 66 } 67 68 bool Connection::sendPackets() { 69 if ( /*!Connection::instance_ || */this->host_==NULL ) 70 return false; 71 enet_host_flush(this->host_); 72 return true; 73 } 74 75 void Connection::processQueue() { 50 this->incomingEventsMutex_ = new boost::mutex; 51 this->outgoingEventsMutex_ = new boost::mutex; 52 } 53 54 Connection::~Connection() 55 { 56 delete this->incomingEventsMutex_; 57 delete this->outgoingEventsMutex_; 58 } 59 60 void Connection::startCommunicationThread() 61 { 62 this->bCommunicationThreadRunning_ = true; 63 this->communicationThread_ = new boost::thread(&Connection::communicationThread, this); 64 } 65 66 void Connection::stopCommunicationThread() 67 { 68 this->bCommunicationThreadRunning_ = false; 69 if( !this->communicationThread_->timed_join(NETWORK_COMMUNICATION_THREAD_WAIT_TIME) ) 70 { 71 // force thread to stop 72 this->communicationThread_->interrupt(); 73 } 74 delete this->communicationThread_; 75 } 76 77 78 // int Connection::service(ENetEvent* event) { 79 // return enet_host_service( this->host_, event, NETWORK_WAIT_TIMEOUT ); 80 // } 81 82 void Connection::disconnectPeer(ENetPeer *peer) 83 { 84 assert(peer); 85 outgoingEvent outEvent = { peer, outgoingEventType::disconnectPeer, (ENetPacket*)10, 15 }; 86 87 this->outgoingEventsMutex_->lock(); 88 this->outgoingEvents_.push_back(outEvent); 89 this->outgoingEventsMutex_->unlock(); 90 } 91 92 void Connection::addPacket(ENetPacket *packet, ENetPeer *peer, uint8_t channelID) 93 { 94 assert(peer); 95 outgoingEvent outEvent = { peer, outgoingEventType::sendPacket, packet, channelID }; 96 97 this->outgoingEventsMutex_->lock(); 98 this->outgoingEvents_.push_back(outEvent); 99 this->outgoingEventsMutex_->unlock(); 100 } 101 102 void Connection::broadcastPacket(ENetPacket* packet, uint8_t channelID) 103 { 104 outgoingEvent outEvent = { (ENetPeer*)15, outgoingEventType::broadcastPacket, packet, channelID }; 105 106 this->outgoingEventsMutex_->lock(); 107 this->outgoingEvents_.push_back(outEvent); 108 this->outgoingEventsMutex_->unlock(); 109 } 110 111 112 void Connection::communicationThread() 113 { 76 114 ENetEvent event; 77 78 assert(this->host_); 79 80 while( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 ) 115 116 while( bCommunicationThreadRunning_ ) 81 117 { 82 switch(event.type){ 118 // Receive all pending incoming Events (such as packets, connects and disconnects) 119 while( enet_host_check_events( this->host_, &event ) > 0 ) 120 { 121 // COUT(0) << "incoming event" << endl; 122 // received an event 123 this->incomingEventsMutex_->lock(); 124 this->incomingEvents_.push_back(event); 125 this->incomingEventsMutex_->unlock(); 126 } 127 128 // Send all waiting outgoing packets 129 this->outgoingEventsMutex_->lock(); 130 uint32_t outgoingEventsCount = this->outgoingEvents_.size(); 131 this->outgoingEventsMutex_->unlock(); 132 while( outgoingEventsCount > 0 ) 133 { 134 // COUT(0) << "outgoing event" << endl; 135 this->outgoingEventsMutex_->lock(); 136 outgoingEvent outEvent = this->outgoingEvents_.front(); 137 this->outgoingEvents_.pop_front(); 138 this->outgoingEventsMutex_->unlock(); 139 140 switch( outEvent.type ) 141 { 142 case outgoingEventType::sendPacket: 143 enet_peer_send( outEvent.peer, outEvent.channelID, outEvent.packet ); 144 break; 145 case outgoingEventType::disconnectPeer: 146 enet_peer_disconnect(outEvent.peer, 0); 147 break; 148 case outgoingEventType::broadcastPacket: 149 enet_host_broadcast( this->host_, outEvent.channelID, outEvent.packet ); 150 break; 151 default: 152 assert(0); 153 } 154 this->outgoingEventsMutex_->lock(); 155 outgoingEventsCount = this->outgoingEvents_.size(); 156 this->outgoingEventsMutex_->unlock(); 157 } 158 159 // Wait for incoming events (at most NETWORK_WAIT_TIMEOUT ms) 160 if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 ) 161 { 162 // COUT(0) << "incoming event after wait" << endl; 163 //received an event 164 this->incomingEventsMutex_->lock(); 165 this->incomingEvents_.push_back(event); 166 this->incomingEventsMutex_->unlock(); 167 } 168 } 169 } 170 171 void Connection::processQueue() 172 { 173 ENetEvent event; 174 175 this->incomingEventsMutex_->lock(); 176 uint32_t incomingEventsCount = this->incomingEvents_.size(); 177 this->incomingEventsMutex_->unlock(); 178 while( incomingEventsCount > 0 ) 179 { 180 packet::Packet* p; 181 this->incomingEventsMutex_->lock(); 182 event = this->incomingEvents_.front(); 183 this->incomingEvents_.pop_front(); 184 this->incomingEventsMutex_->unlock(); 185 186 switch(event.type) 187 { 83 188 // log handling ================ 84 189 case ENET_EVENT_TYPE_CONNECT: … … 89 194 break; 90 195 case ENET_EVENT_TYPE_RECEIVE: 91 processPacket( &event ); 196 // COUT(0) << "ENET_EVENT_TYPE_RECEIVE" << endl; 197 p = createPacket( &event ); 198 processPacket(p); 92 199 break; 93 200 case ENET_EVENT_TYPE_NONE: 94 201 break; 95 202 } 203 204 this->incomingEventsMutex_->lock(); 205 incomingEventsCount = this->incomingEvents_.size(); 206 this->incomingEventsMutex_->unlock(); 96 207 } 97 208 } 98 209 99 bool Connection::processPacket(ENetEvent* event) { 210 packet::Packet* Connection::createPacket(ENetEvent* event) 211 { 100 212 packet::Packet *p = packet::Packet::createPacket(event->packet, event->peer); 101 return p->process(); 102 } 213 return p; 214 // return p->process(); 215 } 216 217 void Connection::enableCompression() 218 { 219 enet_host_compress_with_range_coder( this->host_ ); 220 } 221 103 222 104 223 }
Note: See TracChangeset
for help on using the changeset viewer.