Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: code/branches/cpp11_v2/src/libraries/network/Connection.cc @ 11006

Last change on this file since 11006 was 11006, checked in by landauf, 8 years ago

made some enums in network library strongly typed. for most other enums in network this isn't possible because they are often used like flags (converted to int and compared with binary operators).
packet::Type now uses uint8_t as underlying type which reduces the network traffic (by default the type was int)

  • Property svn:eol-style set to native
File size: 10.9 KB
Line 
1/*
2 *   ORXONOX - the hottest 3D action shooter ever to exist
3 *                    > www.orxonox.net <
4 *
5 *
6 *   License notice:
7 *
8 *   This program is free software; you can redistribute it and/or
9 *   modify it under the terms of the GNU General Public License
10 *   as published by the Free Software Foundation; either version 2
11 *   of the License, or (at your option) any later version.
12 *
13 *   This program is distributed in the hope that it will be useful,
14 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 *   GNU General Public License for more details.
17 *
18 *   You should have received a copy of the GNU General Public License
19 *   along with this program; if not, write to the Free Software
20 *   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
21 *
22 *   Author:
23 *      Oliver Scheuss
24 *   Co-authors:
25 *      ...
26 *
27 */
28
29#include "Connection.h"
30
31#include <cassert>
32#include <deque>
33#define WIN32_LEAN_AND_MEAN
34#include <enet/enet.h>
35#include <boost/thread.hpp>
36#include <boost/thread/mutex.hpp>
37#include <boost/date_time.hpp>
38
39#include "packet/Packet.h"
40#include <util/Sleep.h>
41
42namespace orxonox
43{
44  const boost::posix_time::millisec NETWORK_COMMUNICATION_THREAD_WAIT_TIME(200);
45  const unsigned int                NETWORK_DISCONNECT_TIMEOUT = 500;
46
47  Connection::Connection(uint32_t firstPeerID):
48    host_(nullptr), bCommunicationThreadRunning_(false), nextPeerID_(firstPeerID)
49  {
50    enet_initialize();
51    atexit(enet_deinitialize);
52    this->incomingEventsMutex_ = new boost::mutex;
53    this->outgoingEventsMutex_ = new boost::mutex;
54//     this->overallMutex_ = new boost::mutex;
55  }
56
57  Connection::~Connection()
58  {
59    delete this->incomingEventsMutex_;
60    delete this->outgoingEventsMutex_;
61  }
62
63  void Connection::startCommunicationThread()
64  {
65    this->bCommunicationThreadRunning_ = true;
66    this->communicationThread_ = new boost::thread(&Connection::communicationThread, this);
67  }
68 
69  void Connection::stopCommunicationThread()
70  {
71    this->bCommunicationThreadRunning_ = false;
72    if( !this->communicationThread_->timed_join(NETWORK_COMMUNICATION_THREAD_WAIT_TIME) )
73    {
74      // force thread to stop
75      this->communicationThread_->interrupt();
76    }
77    delete this->communicationThread_;
78  }
79
80  void Connection::disconnectPeer(uint32_t peerID)
81  {
82//     this->overallMutex_->lock();
83    outgoingEvent outEvent = { peerID, OutgoingEventType::disconnectPeer, nullptr, 0 };
84   
85    this->outgoingEventsMutex_->lock();
86    this->outgoingEvents_.push_back(outEvent);
87    this->outgoingEventsMutex_->unlock();
88//     this->overallMutex_->unlock();
89  }
90 
91  void Connection::disconnectPeers()
92  {
93    outgoingEvent outEvent = { 0, OutgoingEventType::disconnectPeers, nullptr, 0 };
94   
95    this->outgoingEventsMutex_->lock();
96    this->outgoingEvents_.push_back(outEvent);
97    this->outgoingEventsMutex_->unlock();
98  }
99
100  void Connection::addPacket(ENetPacket* packet, uint32_t peerID, uint8_t channelID)
101  {
102//     this->overallMutex_->lock();
103    outgoingEvent outEvent = { peerID, OutgoingEventType::sendPacket, packet, channelID };
104   
105    this->outgoingEventsMutex_->lock();
106    this->outgoingEvents_.push_back(outEvent);
107    this->outgoingEventsMutex_->unlock();
108//     this->overallMutex_->unlock();
109  }
110 
111  void Connection::broadcastPacket(ENetPacket* packet, uint8_t channelID)
112  {
113//     this->overallMutex_->lock();
114    outgoingEvent outEvent = { 0, OutgoingEventType::broadcastPacket, packet, channelID };
115   
116    this->outgoingEventsMutex_->lock();
117    this->outgoingEvents_.push_back(outEvent);
118    this->outgoingEventsMutex_->unlock();
119//     this->overallMutex_->unlock();
120  }
121
122 
123  void Connection::communicationThread()
124  {
125    ENetEvent event;
126   
127//     this->overallMutex_->lock();
128    while( bCommunicationThreadRunning_ )
129    {
130      // Receive all pending incoming Events (such as packets, connects and disconnects)
131      while( enet_host_check_events( this->host_, &event ) > 0 )
132      {
133        processIncomingEvent(event);
134      }
135     
136//       this->overallMutex_->unlock();
137      msleep(1);
138//       this->overallMutex_->lock();
139     
140      // Send all waiting outgoing packets
141      this->outgoingEventsMutex_->lock();
142      uint32_t outgoingEventsCount = this->outgoingEvents_.size();
143      this->outgoingEventsMutex_->unlock();
144      while( outgoingEventsCount > 0 )
145      {
146//         orxout(verbose, context::network) << "outgoing event" << endl;
147        this->outgoingEventsMutex_->lock();
148        outgoingEvent outEvent = this->outgoingEvents_.front();
149        this->outgoingEvents_.pop_front();
150        this->outgoingEventsMutex_->unlock();
151       
152        processOutgoingEvent(outEvent);
153       
154        this->outgoingEventsMutex_->lock();
155        outgoingEventsCount = this->outgoingEvents_.size();
156        this->outgoingEventsMutex_->unlock();
157      }
158     
159      // Wait for incoming events (at most NETWORK_WAIT_TIMEOUT ms)
160      if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 )
161      {
162        processIncomingEvent(event);
163      }
164    }
165//     this->overallMutex_->unlock();
166  }
167 
168  void Connection::processIncomingEvent(ENetEvent& event)
169  {
170    incomingEvent inEvent;
171    // preprocess event
172    switch( event.type )
173    {
174      case ENET_EVENT_TYPE_CONNECT:
175        inEvent = preprocessConnectEvent(event);
176        break;
177      case ENET_EVENT_TYPE_RECEIVE:
178        inEvent = preprocessReceiveEvent(event);
179        break;
180      case ENET_EVENT_TYPE_DISCONNECT:
181        inEvent = preprocessDisconnectEvent(event);
182        break;
183      case ENET_EVENT_TYPE_NONE:
184      default:
185        return;
186    }
187   
188    // pushing event to queue
189    this->incomingEventsMutex_->lock();
190    this->incomingEvents_.push_back(inEvent);
191    this->incomingEventsMutex_->unlock();
192  }
193 
194  void Connection::processOutgoingEvent(outgoingEvent& event)
195  {
196    ENetPeer* peer;
197    switch( event.type )
198    {
199      case OutgoingEventType::sendPacket:
200        // check whether the peer is still/already in the peer list
201        if( this->peerMap_.find(event.peerID) != this->peerMap_.end() )
202        {
203          peer = this->peerMap_[event.peerID];
204          enet_peer_send( peer, event.channelID, event.packet );
205        }
206        else
207        {
208          // peer probably already disconnected so just discard packet
209          assert(event.peerID<this->nextPeerID_);
210          enet_packet_destroy(event.packet);
211        }
212        break;
213      case OutgoingEventType::disconnectPeer:
214        if( this->peerMap_.find(event.peerID) != this->peerMap_.end() )
215        {
216          peer = this->peerMap_[event.peerID];
217          enet_peer_disconnect(peer, 0);
218        }
219        else
220        {
221          // peer probably already disconnected so just discard disconnect event
222          assert(event.peerID<this->nextPeerID_);
223        }
224        break;
225      case OutgoingEventType::disconnectPeers:
226        disconnectPeersInternal();
227        break;
228      case OutgoingEventType::broadcastPacket:
229        enet_host_broadcast( this->host_, event.channelID, event.packet );
230        break;
231      default:
232        assert(0);
233    }
234  }
235
236
237  void Connection::disconnectPeersInternal()
238  {
239    for( const auto& mapEntry : this->peerMap_ )
240    {
241      enet_peer_disconnect(mapEntry.second, 0);
242    }
243    uint32_t iterations = NETWORK_DISCONNECT_TIMEOUT/NETWORK_WAIT_TIMEOUT;
244    uint32_t i = 0;
245    while( this->peerMap_.size() && i++ < iterations )
246    {
247      ENetEvent event;
248      if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 )
249      {
250        processIncomingEvent(event);
251      }
252    }
253  }
254
255  void Connection::processQueue()
256  {
257    incomingEvent inEvent;
258
259    this->incomingEventsMutex_->lock();
260    uint32_t incomingEventsCount = this->incomingEvents_.size();
261    this->incomingEventsMutex_->unlock();
262    while( incomingEventsCount > 0 )
263    {
264      // pop event from queue
265      this->incomingEventsMutex_->lock();
266      inEvent = this->incomingEvents_.front();
267      this->incomingEvents_.pop_front();
268      this->incomingEventsMutex_->unlock();
269     
270      // process event
271      switch( inEvent.type )
272      {
273        case IncomingEventType::peerConnect:
274          addPeer(inEvent.peerID);
275          break;
276        case IncomingEventType::peerDisconnect:
277          removePeer(inEvent.peerID);
278          break;
279        case IncomingEventType::receivePacket:
280          processPacket(inEvent.packet);
281          break;
282        default:
283          break;
284      }
285     
286      // check whether there are still events in the queue
287      this->incomingEventsMutex_->lock();
288      incomingEventsCount = this->incomingEvents_.size();
289      this->incomingEventsMutex_->unlock();
290    }
291  }
292 
293  void Connection::waitOutgoingQueue()
294  {
295    uint32_t outgoingEventsCount;
296    this->outgoingEventsMutex_->lock();
297    outgoingEventsCount = this->outgoingEvents_.size();
298    this->outgoingEventsMutex_->unlock();
299    while( outgoingEventsCount )
300    {
301      msleep(1);
302      this->outgoingEventsMutex_->lock();
303      outgoingEventsCount = this->outgoingEvents_.size();
304      this->outgoingEventsMutex_->unlock();
305    }
306  }
307
308
309  incomingEvent Connection::preprocessConnectEvent(ENetEvent& event)
310  {
311    // make sure this peer doesn't exist
312    assert( this->peerMap_.find(this->nextPeerID_) == this->peerMap_.end() );
313    assert( this->peerIDMap_.find(event.peer) == this->peerIDMap_.end() );
314   
315    // give peer a new id and increase peerID for next peer
316    uint32_t peerID = this->nextPeerID_;
317    ++this->nextPeerID_;
318   
319    // add peer/peerID into peerMap_ and peerIDMap_
320    this->peerMap_[peerID] = event.peer;
321    this->peerIDMap_[event.peer] = peerID;
322   
323    // create new peerEvent and return it
324    incomingEvent inEvent = { peerID, IncomingEventType::peerConnect, nullptr };
325    return inEvent;
326  }
327 
328  incomingEvent Connection::preprocessDisconnectEvent(ENetEvent& event)
329  {
330    // assert that the peer exists and get peerID
331    assert( this->peerIDMap_.find(event.peer) != this->peerIDMap_.end() );
332    uint32_t peerID = this->peerIDMap_[event.peer];
333   
334    // remove peer/peerID from maps
335    this->peerIDMap_.erase(this->peerIDMap_.find(event.peer));
336    this->peerMap_.erase(this->peerMap_.find(peerID));
337   
338    // create new peerEvent and return it
339    incomingEvent inEvent = { peerID, IncomingEventType::peerDisconnect, nullptr };
340    return inEvent;
341  }
342 
343  incomingEvent Connection::preprocessReceiveEvent(ENetEvent& event)
344  {
345    // assert that the peer exists and get peerID
346    assert( this->peerIDMap_.find(event.peer) != this->peerIDMap_.end() );
347    uint32_t peerID = this->peerIDMap_[event.peer];
348   
349    // create new Packet from ENetPacket
350    packet::Packet* p = packet::Packet::createPacket(event.packet, peerID);
351   
352    // create new peerEvent and return it
353    incomingEvent inEvent = { peerID, IncomingEventType::receivePacket, p };
354    return inEvent;
355  }
356
357 
358  void Connection::enableCompression()
359  {
360    enet_host_compress_with_range_coder( this->host_ );
361  }
362
363
364}
Note: See TracBrowser for help on using the repository browser.