Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: code/trunk/src/libraries/network/Connection.cc @ 12038

Last change on this file since 12038 was 11071, checked in by landauf, 10 years ago

merged branch cpp11_v3 back to trunk

  • Property svn:eol-style set to native
File size: 10.9 KB
Line 
1/*
2 *   ORXONOX - the hottest 3D action shooter ever to exist
3 *                    > www.orxonox.net <
4 *
5 *
6 *   License notice:
7 *
8 *   This program is free software; you can redistribute it and/or
9 *   modify it under the terms of the GNU General Public License
10 *   as published by the Free Software Foundation; either version 2
11 *   of the License, or (at your option) any later version.
12 *
13 *   This program is distributed in the hope that it will be useful,
14 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 *   GNU General Public License for more details.
17 *
18 *   You should have received a copy of the GNU General Public License
19 *   along with this program; if not, write to the Free Software
20 *   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
21 *
22 *   Author:
23 *      Oliver Scheuss
24 *   Co-authors:
25 *      ...
26 *
27 */
28
29#include "Connection.h"
30
31#include <cassert>
32#include <deque>
33#define WIN32_LEAN_AND_MEAN
34#include <enet/enet.h>
35#include <boost/thread.hpp>
36#include <boost/thread/mutex.hpp>
37#include <boost/date_time.hpp>
38
39#include "packet/Packet.h"
40#include <util/Sleep.h>
41
42namespace orxonox
43{
44  const boost::posix_time::millisec NETWORK_COMMUNICATION_THREAD_WAIT_TIME(200);
45  const unsigned int                NETWORK_DISCONNECT_TIMEOUT = 500;
46
47  Connection::Connection(uint32_t firstPeerID):
48    host_(nullptr), bCommunicationThreadRunning_(false), nextPeerID_(firstPeerID)
49  {
50    enet_initialize();
51    atexit(enet_deinitialize);
52    this->incomingEventsMutex_ = new boost::mutex;
53    this->outgoingEventsMutex_ = new boost::mutex;
54//     this->overallMutex_ = new boost::mutex;
55  }
56
57  Connection::~Connection()
58  {
59    delete this->incomingEventsMutex_;
60    delete this->outgoingEventsMutex_;
61  }
62
63  void Connection::startCommunicationThread()
64  {
65    this->bCommunicationThreadRunning_ = true;
66    this->communicationThread_ = new boost::thread(&Connection::communicationThread, this);
67  }
68 
69  void Connection::stopCommunicationThread()
70  {
71    this->bCommunicationThreadRunning_ = false;
72    if( !this->communicationThread_->timed_join(NETWORK_COMMUNICATION_THREAD_WAIT_TIME) )
73    {
74      // force thread to stop
75      this->communicationThread_->interrupt();
76    }
77    delete this->communicationThread_;
78  }
79
80  void Connection::disconnectPeer(uint32_t peerID)
81  {
82//     this->overallMutex_->lock();
83    outgoingEvent outEvent = { peerID, OutgoingEventType::disconnectPeer, nullptr, 0 };
84   
85    this->outgoingEventsMutex_->lock();
86    this->outgoingEvents_.push_back(outEvent);
87    this->outgoingEventsMutex_->unlock();
88//     this->overallMutex_->unlock();
89  }
90 
91  void Connection::disconnectPeers()
92  {
93    outgoingEvent outEvent = { 0, OutgoingEventType::disconnectPeers, nullptr, 0 };
94   
95    this->outgoingEventsMutex_->lock();
96    this->outgoingEvents_.push_back(outEvent);
97    this->outgoingEventsMutex_->unlock();
98  }
99
100  void Connection::addPacket(ENetPacket* packet, uint32_t peerID, uint8_t channelID)
101  {
102//     this->overallMutex_->lock();
103    outgoingEvent outEvent = { peerID, OutgoingEventType::sendPacket, packet, channelID };
104   
105    this->outgoingEventsMutex_->lock();
106    this->outgoingEvents_.push_back(outEvent);
107    this->outgoingEventsMutex_->unlock();
108//     this->overallMutex_->unlock();
109  }
110 
111  void Connection::broadcastPacket(ENetPacket* packet, uint8_t channelID)
112  {
113//     this->overallMutex_->lock();
114    outgoingEvent outEvent = { 0, OutgoingEventType::broadcastPacket, packet, channelID };
115   
116    this->outgoingEventsMutex_->lock();
117    this->outgoingEvents_.push_back(outEvent);
118    this->outgoingEventsMutex_->unlock();
119//     this->overallMutex_->unlock();
120  }
121
122 
123  void Connection::communicationThread()
124  {
125    ENetEvent event;
126   
127//     this->overallMutex_->lock();
128    while( bCommunicationThreadRunning_ )
129    {
130      // Receive all pending incoming Events (such as packets, connects and disconnects)
131      while( enet_host_check_events( this->host_, &event ) > 0 )
132      {
133        processIncomingEvent(event);
134      }
135     
136//       this->overallMutex_->unlock();
137      msleep(1);
138//       this->overallMutex_->lock();
139     
140      // Send all waiting outgoing packets
141      this->outgoingEventsMutex_->lock();
142      uint32_t outgoingEventsCount = this->outgoingEvents_.size();
143      this->outgoingEventsMutex_->unlock();
144      while( outgoingEventsCount > 0 )
145      {
146//         orxout(verbose, context::network) << "outgoing event" << endl;
147        this->outgoingEventsMutex_->lock();
148        outgoingEvent outEvent = this->outgoingEvents_.front();
149        this->outgoingEvents_.pop_front();
150        this->outgoingEventsMutex_->unlock();
151       
152        processOutgoingEvent(outEvent);
153       
154        this->outgoingEventsMutex_->lock();
155        outgoingEventsCount = this->outgoingEvents_.size();
156        this->outgoingEventsMutex_->unlock();
157      }
158     
159      // Wait for incoming events (at most NETWORK_WAIT_TIMEOUT ms)
160      if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 )
161      {
162        processIncomingEvent(event);
163      }
164    }
165//     this->overallMutex_->unlock();
166  }
167 
168  void Connection::processIncomingEvent(ENetEvent& event)
169  {
170    incomingEvent inEvent;
171    // preprocess event
172    switch( event.type )
173    {
174      case ENET_EVENT_TYPE_CONNECT:
175        inEvent = preprocessConnectEvent(event);
176        break;
177      case ENET_EVENT_TYPE_RECEIVE:
178        inEvent = preprocessReceiveEvent(event);
179        break;
180      case ENET_EVENT_TYPE_DISCONNECT:
181        inEvent = preprocessDisconnectEvent(event);
182        break;
183      case ENET_EVENT_TYPE_NONE:
184      default:
185        return;
186    }
187   
188    // pushing event to queue
189    this->incomingEventsMutex_->lock();
190    this->incomingEvents_.push_back(inEvent);
191    this->incomingEventsMutex_->unlock();
192  }
193 
194  void Connection::processOutgoingEvent(outgoingEvent& event)
195  {
196    ENetPeer* peer;
197    switch( event.type )
198    {
199      case OutgoingEventType::sendPacket:
200        // check whether the peer is still/already in the peer list
201        if( this->peerMap_.find(event.peerID) != this->peerMap_.end() )
202        {
203          peer = this->peerMap_[event.peerID];
204          enet_peer_send( peer, event.channelID, event.packet );
205        }
206        else
207        {
208          // peer probably already disconnected so just discard packet
209          assert(event.peerID<this->nextPeerID_);
210          enet_packet_destroy(event.packet);
211        }
212        break;
213      case OutgoingEventType::disconnectPeer:
214        if( this->peerMap_.find(event.peerID) != this->peerMap_.end() )
215        {
216          peer = this->peerMap_[event.peerID];
217          enet_peer_disconnect(peer, 0);
218        }
219        else
220        {
221          // peer probably already disconnected so just discard disconnect event
222          assert(event.peerID<this->nextPeerID_);
223        }
224        break;
225      case OutgoingEventType::disconnectPeers:
226        disconnectPeersInternal();
227        break;
228      case OutgoingEventType::broadcastPacket:
229        enet_host_broadcast( this->host_, event.channelID, event.packet );
230        break;
231      default:
232        assert(0);
233    }
234  }
235
236
237  void Connection::disconnectPeersInternal()
238  {
239    for( const auto& mapEntry : this->peerMap_ )
240    {
241      enet_peer_disconnect(mapEntry.second, 0);
242    }
243    uint32_t iterations = NETWORK_DISCONNECT_TIMEOUT/NETWORK_WAIT_TIMEOUT;
244    uint32_t i = 0;
245    while( this->peerMap_.size() && i++ < iterations )
246    {
247      ENetEvent event;
248      if( enet_host_service( this->host_, &event, NETWORK_WAIT_TIMEOUT ) > 0 )
249      {
250        processIncomingEvent(event);
251      }
252    }
253  }
254
255  void Connection::processQueue()
256  {
257    incomingEvent inEvent;
258
259    this->incomingEventsMutex_->lock();
260    uint32_t incomingEventsCount = this->incomingEvents_.size();
261    this->incomingEventsMutex_->unlock();
262    while( incomingEventsCount > 0 )
263    {
264      // pop event from queue
265      this->incomingEventsMutex_->lock();
266      inEvent = this->incomingEvents_.front();
267      this->incomingEvents_.pop_front();
268      this->incomingEventsMutex_->unlock();
269     
270      // process event
271      switch( inEvent.type )
272      {
273        case IncomingEventType::peerConnect:
274          addPeer(inEvent.peerID);
275          break;
276        case IncomingEventType::peerDisconnect:
277          removePeer(inEvent.peerID);
278          break;
279        case IncomingEventType::receivePacket:
280          processPacket(inEvent.packet);
281          break;
282        default:
283          break;
284      }
285     
286      // check whether there are still events in the queue
287      this->incomingEventsMutex_->lock();
288      incomingEventsCount = this->incomingEvents_.size();
289      this->incomingEventsMutex_->unlock();
290    }
291  }
292 
293  void Connection::waitOutgoingQueue()
294  {
295    uint32_t outgoingEventsCount;
296    this->outgoingEventsMutex_->lock();
297    outgoingEventsCount = this->outgoingEvents_.size();
298    this->outgoingEventsMutex_->unlock();
299    while( outgoingEventsCount )
300    {
301      msleep(1);
302      this->outgoingEventsMutex_->lock();
303      outgoingEventsCount = this->outgoingEvents_.size();
304      this->outgoingEventsMutex_->unlock();
305    }
306  }
307
308
309  incomingEvent Connection::preprocessConnectEvent(ENetEvent& event)
310  {
311    // make sure this peer doesn't exist
312    assert( this->peerMap_.find(this->nextPeerID_) == this->peerMap_.end() );
313    assert( this->peerIDMap_.find(event.peer) == this->peerIDMap_.end() );
314   
315    // give peer a new id and increase peerID for next peer
316    uint32_t peerID = this->nextPeerID_;
317    ++this->nextPeerID_;
318   
319    // add peer/peerID into peerMap_ and peerIDMap_
320    this->peerMap_[peerID] = event.peer;
321    this->peerIDMap_[event.peer] = peerID;
322   
323    // create new peerEvent and return it
324    incomingEvent inEvent = { peerID, IncomingEventType::peerConnect, nullptr };
325    return inEvent;
326  }
327 
328  incomingEvent Connection::preprocessDisconnectEvent(ENetEvent& event)
329  {
330    // assert that the peer exists and get peerID
331    assert( this->peerIDMap_.find(event.peer) != this->peerIDMap_.end() );
332    uint32_t peerID = this->peerIDMap_[event.peer];
333   
334    // remove peer/peerID from maps
335    this->peerIDMap_.erase(this->peerIDMap_.find(event.peer));
336    this->peerMap_.erase(this->peerMap_.find(peerID));
337   
338    // create new peerEvent and return it
339    incomingEvent inEvent = { peerID, IncomingEventType::peerDisconnect, nullptr };
340    return inEvent;
341  }
342 
343  incomingEvent Connection::preprocessReceiveEvent(ENetEvent& event)
344  {
345    // assert that the peer exists and get peerID
346    assert( this->peerIDMap_.find(event.peer) != this->peerIDMap_.end() );
347    uint32_t peerID = this->peerIDMap_[event.peer];
348   
349    // create new Packet from ENetPacket
350    packet::Packet* p = packet::Packet::createPacket(event.packet, peerID);
351   
352    // create new peerEvent and return it
353    incomingEvent inEvent = { peerID, IncomingEventType::receivePacket, p };
354    return inEvent;
355  }
356
357 
358  void Connection::enableCompression()
359  {
360    enet_host_compress_with_range_coder( this->host_ );
361  }
362
363
364}
Note: See TracBrowser for help on using the repository browser.