Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: orxonox.OLD/branches/network/src/lib/network/network_stream.cc @ 7565

Last change on this file since 7565 was 7565, checked in by rennerc, 18 years ago

reimplemented NetworkStream

File size: 16.3 KB
Line 
1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11### File Specific:
12   main-programmer: claudio
13   co-programmer:
14*/
15
16
17/* this is for debug output. It just says, that all calls to PRINT() belong to the DEBUG_MODULE_NETWORK module
18   For more information refere to https://www.orxonox.net/cgi-bin/trac.cgi/wiki/DebugOutput
19*/
20#define DEBUG_MODULE_NETWORK
21
22
23#include "base_object.h"
24#include "network_protocol.h"
25#include "udp_socket.h"
26#include "udp_server_socket.h"
27#include "connection_monitor.h"
28#include "synchronizeable.h"
29#include "network_game_manager.h"
30#include "shared_network_data.h"
31
32#include "lib/util/loading/factory.h"
33
34#include "debug.h"
35#include "class_list.h"
36#include <algorithm>
37
38/* include your own header */
39#include "network_stream.h"
40
41/* probably unnecessary */
42using namespace std;
43
44
45#define PACKAGE_SIZE  256
46
47
48NetworkStream::NetworkStream()
49    : DataStream()
50{
51  this->init();
52  /* initialize the references */
53  this->type = NET_CLIENT;
54  this->networkProtocol = new NetworkProtocol();
55  this->connectionMonitor = new ConnectionMonitor();
56}
57
58
59NetworkStream::NetworkStream( std::string host, int port )
60{
61  this->type = NET_CLIENT;
62  this->init();
63  this->peers[0].socket = new UdpSocket( host, port );
64  this->networkProtocol = new NetworkProtocol();
65  this->connectionMonitor = new ConnectionMonitor();
66  this->maxConnections = MAX_CONNECTIONS;
67}
68
69
70NetworkStream::NetworkStream( int port )
71{
72  this->type = NET_SERVER;
73  this->init();
74  this->serverSocket = new UdpServerSocket(port);
75  this->networkProtocol = new NetworkProtocol();
76  this->connectionMonitor = new ConnectionMonitor();
77  this->bActive = true;
78}
79
80
81void NetworkStream::init()
82{
83  /* set the class id for the base object */
84  this->setClassID(CL_NETWORK_STREAM, "NetworkStream");
85  this->bActive = false;
86  this->serverSocket = NULL;
87  this->networkGameManager = NULL;
88  myHostId = 0;
89  currentState = 0;
90}
91
92
93NetworkStream::~NetworkStream()
94{
95  if ( this->serverSocket )
96  {
97    serverSocket->close();
98    delete serverSocket;
99  }
100
101  for ( PeerList::iterator i = peers.begin(); i!=peers.end(); i++)
102  {
103    if ( i->second.socket )
104    {
105      i->second.socket->disconnectServer();
106      delete i->second.socket;
107      i->second.socket = NULL;
108    }
109   
110    if ( i->second.handshake )
111    {
112      delete i->second.handshake;
113      i->second.handshake = NULL;
114    }
115  }
116 
117  if ( serverSocket )
118  {
119    delete serverSocket;
120    serverSocket = NULL;
121  }
122
123  delete connectionMonitor;
124  delete networkProtocol;
125}
126
127
128void NetworkStream::createNetworkGameManager()
129{
130  this->networkGameManager = NetworkGameManager::getInstance();
131  // setUniqueID( maxCon+2 ) because we need one id for every handshake
132  // and one for handshake to reject client maxCon+1
133  this->networkGameManager->setUniqueID( this->maxConnections + 2 );
134
135}
136
137
138void NetworkStream::startHandshake()
139{
140  Handshake* hs = new Handshake(false);
141  hs->setUniqueID( 0 );
142  assert( peers[0].handshake == NULL );
143  peers[0].handshake = hs;
144  //this->connectSynchronizeable(*hs);
145  PRINTF(0)("NetworkStream: %s\n", hs->getName());
146}
147
148
149void NetworkStream::connectSynchronizeable(Synchronizeable& sync)
150{
151  this->synchronizeables.push_back(&sync);
152  sync.setNetworkStream( this );
153
154  this->bActive = true;
155}
156
157
158void NetworkStream::disconnectSynchronizeable(Synchronizeable& sync)
159{
160  // removing the Synchronizeable from the List.
161  std::list<Synchronizeable*>::iterator disconnectSynchro = std::find(this->synchronizeables.begin(), this->synchronizeables.end(), &sync);
162  if (disconnectSynchro != this->synchronizeables.end())
163    this->synchronizeables.erase(disconnectSynchro);
164
165  this->bActive = false;
166}
167
168
169void NetworkStream::processData()
170{
171  currentState++;
172 
173  if ( this->type == NET_SERVER )
174    this->updateConnectionList();
175  else
176  {
177    if ( peers[0].socket && !peers[0].socket->isOk() )
178    {
179      PRINTF(1)("lost connection to server\n");
180
181      peers[0].socket->disconnectServer();
182      delete peers[0].socket;
183      peers[0].socket = NULL;
184
185      if ( peers[0].handshake )
186        delete peers[0].handshake;
187      peers[0].handshake = NULL;
188    }
189  }
190
191
192
193
194  /* DOWNSTREAM */
195#if 0
196
197
198  int dataLength;
199  int reciever;
200  Header header;
201  int counter;
202
203  for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
204  {
205    counter = 0;
206
207    if ( (*it)!=NULL && (*it)->beSynchronized() /*&& (*it)->getOwner() == myHostId*/ )
208    {
209      do {
210        counter++;
211
212        reciever = 0;
213#warning fix this
214dataLength = 0;
215//TODO fix
216        //dataLength = (*it)->readBytes(downBuffer, DATA_STREAM_BUFFER_SIZE, &reciever);
217
218        if ( dataLength<=0 ){
219          reciever = 0;
220          continue;
221        }
222
223        dataLength = networkProtocol->createHeader((byte*)downBuffer, dataLength, DATA_STREAM_BUFFER_SIZE, static_cast<const Synchronizeable&>(*(*it)));
224
225        Header* header = (Header*)downBuffer;
226        if ( header->synchronizeableID < this->maxConnections+2 )
227        {
228          //if ( !isServer() ) PRINTF(0)("RESET UNIQUEID FROM %d TO 0 maxCon=%d\n", header->synchronizeableID, this->maxConnections);
229          header->synchronizeableID = 0;
230        }
231        else
232        {
233          //if ( !isServer() ) PRINTF(0)("UNIQUEID=%d\n", header->synchronizeableID);
234        }
235
236        if ( dataLength<=0 )
237          continue;
238
239        if ( reciever!=0 )
240        {
241          if ( reciever < 0)
242          {
243            for ( int i = 0; i<networkSockets.size(); i++)
244            {
245              if ( i!=abs(reciever) && networkSockets[i] != NULL )
246              {
247                PRINTF(0)("write %d bytes to socket %d uniqueid %d reciever %d\n", dataLength, i, (*it)->getUniqueID(), reciever);
248                networkSockets[i]->writePacket(downBuffer, dataLength);
249              }
250            }
251          }
252          else
253          {
254            if ( networkSockets[reciever] != NULL )
255            {
256              PRINTF(5)("write %d bytes to socket %d\n", dataLength, reciever);
257              networkSockets[reciever]->writePacket(downBuffer, dataLength);
258            }
259            else
260            {
261              PRINTF(1)("networkSockets[reciever] == NULL\n");
262            }
263          }
264        }
265        else
266        {
267          for ( int i = 0; i<networkSockets.size(); i++)
268          {
269            if ( networkSockets[i] != NULL )
270            {
271              PRINTF(5)("write %d bytes to socket %d\n", dataLength, i);
272              networkSockets[i]->writePacket(downBuffer, dataLength);
273            }
274          }
275        }
276
277      } while( reciever!=0 );
278    }
279  }
280
281  /* UPSTREAM */
282
283  for ( int i = 0; i<networkSockets.size(); i++)
284  {
285    if ( networkSockets[i] )
286    {
287      do {
288        dataLength = networkSockets[i]->readPacket(upBuffer, DATA_STREAM_BUFFER_SIZE);
289
290        if ( dataLength<=0 )
291          continue;
292
293        header = networkProtocol->extractHeader(upBuffer, dataLength);
294        dataLength -= sizeof(header);
295
296        PRINTF(5)("read %d bytes from socket uniqueID = %d\n", dataLength, header.synchronizeableID);
297
298        if ( dataLength != header.length )
299        {
300          PRINTF(1)("packetsize in header and real packetsize do not match! %d:%d\n", dataLength, header.length);
301          continue;
302        }
303
304        if ( header.synchronizeableID == 0 )
305        {
306          header.synchronizeableID = i;
307        }
308
309        for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
310        {
311#warning fix this
312
313          if ( *it && (*it)->getUniqueID()==header.synchronizeableID )
314          {
315            if ( (*it)->writeBytes(upBuffer+sizeof(header), dataLength, i) != header.length )
316            {
317              PRINTF(1)("%s did not read all the data id = %d!\n", (*it)->getClassName(), (*it)->getUniqueID());
318              break;
319            }
320            continue;
321          }
322
323        }
324
325      } while ( dataLength>0 );
326    }
327
328  }
329#endif
330}
331
332void NetworkStream::updateConnectionList( )
333{
334  //check for new connections
335
336  NetworkSocket* tempNetworkSocket = serverSocket->getNewSocket();
337
338  if ( tempNetworkSocket )
339  {
340    int clientId;
341    if ( freeSocketSlots.size() >0 )
342    {
343      clientId = freeSocketSlots.back();
344      freeSocketSlots.pop_back();
345      peers[clientId].socket = tempNetworkSocket;
346      peers[clientId].handshake = new Handshake(true, clientId, this->networkGameManager->getUniqueID());
347      peers[clientId].handshake->setUniqueID(clientId);
348      peers[clientId].userId = clientId;
349    } else
350    {
351      clientId = 0;
352     
353      for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ )
354        if ( it->first >= clientId )
355          clientId = it->first + 1;
356     
357      peers[clientId].socket = tempNetworkSocket;
358      peers[clientId].handshake = new Handshake(true, clientId, this->networkGameManager->getUniqueID());
359      peers[clientId].handshake->setUniqueID(clientId);
360      peers[clientId].userId = clientId;
361    }
362
363    if ( clientId > this->maxConnections )
364    {
365      peers[clientId].handshake->doReject( "too many connections" );
366      PRINTF(0)("Will reject client %d because there are to many connections!\n", clientId);
367    }
368    else
369
370    PRINTF(0)("New Client: %d\n", clientId);
371
372    //this->connectSynchronizeable(*handshakes[clientId]);
373  }
374
375
376  //check if connections are ok else remove them
377  for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ )
378  {
379    if ( it->second.socket && !it->second.socket->isOk() )
380    {
381      PRINTF(0)("Client is gone: %d\n", it->second.userId);
382
383      it->second.socket->disconnectServer();
384      delete it->second.socket;
385      it->second.socket = NULL;
386
387      if ( it->second.handshake )
388        delete it->second.handshake;
389      it->second.handshake = NULL;
390
391
392      NetworkGameManager::getInstance()->signalLeftPlayer(it->second.userId);
393
394      freeSocketSlots.push_back( it->second.userId );
395
396    }
397  }
398
399
400}
401
402void NetworkStream::debug()
403{
404  if( this->isServer())
405    PRINT(0)(" Host ist Server with ID: %i\n", this->myHostId);
406  else
407    PRINT(0)(" Host ist Client with ID: %i\n", this->myHostId);
408
409  PRINT(0)(" Got %i connected Synchronizeables, showing active Syncs:\n", this->synchronizeables.size());
410  for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
411  {
412    if( (*it)->beSynchronized() == true)
413      PRINT(0)("  Synchronizeable of class: %s::%s, with unique ID: %i, Synchronize: %i\n", (*it)->getClassName(), (*it)->getName(),
414               (*it)->getUniqueID(), (*it)->beSynchronized());
415  }
416  PRINT(0)(" Maximal Connections: %i\n", this->maxConnections);
417
418}
419
420
421int NetworkStream::getSyncCount()
422{
423  int n = 0;
424  for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
425    if( (*it)->beSynchronized() == true)
426      ++n;
427
428  //return synchronizeables.size();
429  return n;
430}
431
432/**
433 * check if handshakes completed
434 */
435void NetworkStream::handleHandshakes( )
436{
437  for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ )
438  {
439    if ( it->second.handshake )
440    {
441      if ( it->second.handshake->completed() )
442      {
443        if ( it->second.handshake->ok() )
444        {
445          if ( type != NET_SERVER )
446          {
447            SharedNetworkData::getInstance()->setHostID( it->second.handshake->getHostId() );
448            myHostId = SharedNetworkData::getInstance()->getHostID();
449
450            this->networkGameManager = NetworkGameManager::getInstance();
451            this->networkGameManager->setUniqueID( it->second.handshake->getNetworkGameManagerId() );
452          }
453
454          PRINT(0)("handshake finished id=%d\n", it->second.handshake->getNetworkGameManagerId());
455
456
457          delete it->second.handshake;
458          it->second.handshake = NULL;
459        }
460        else
461        {
462          PRINT(1)("handshake failed!\n");
463          it->second.socket->disconnectServer();
464        }
465      }
466    }
467  }
468}
469
470/**
471 * handle upstream network traffic
472 */
473void NetworkStream::handleUpstream( )
474{
475  byte buf[UDP_PACKET_SIZE];
476  int offset;
477  int n;
478 
479  for ( PeerList::iterator peer = peers.begin(); peer != peers.end(); peer++ )
480  {
481    offset = INTSIZE; //make already space for length
482   
483    if ( peer->second.socket )
484      continue;
485   
486    n = Converter::intToByteArray( currentState, buf + offset, UDP_PACKET_SIZE - offset );
487    assert( n == INTSIZE );
488    offset += n;
489   
490    n = Converter::intToByteArray( peer->second.lastAckedState, buf + offset, UDP_PACKET_SIZE - offset );
491    assert( n == INTSIZE );
492    offset += n;
493   
494    n = Converter::intToByteArray( peer->second.lastRecvedState, buf + offset, UDP_PACKET_SIZE - offset );
495    assert( n == INTSIZE );
496    offset += n;
497   
498    for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ )
499    {
500      Synchronizeable & sync = **it;
501     
502      assert( offset + INTSIZE <= UDP_PACKET_SIZE );
503     
504      n = Converter::intToByteArray( sync.getUniqueID(), buf + offset, UDP_PACKET_SIZE - offset );
505      assert( n == INTSIZE );
506      offset += n;
507     
508      offset += sync.getStateDiff( peer->second.userId, buf + offset, UDP_PACKET_SIZE-offset, currentState, peer->second.lastAckedState, 0 );
509    }
510   
511    assert( Converter::intToByteArray( offset, buf, INTSIZE ) == INTSIZE );
512   
513    assert( peer->second.socket->writePacket( buf, offset ) );
514  }
515}
516
517/**
518 * handle downstream network traffic
519 */
520void NetworkStream::handleDownstream( )
521{
522  byte buf[UDP_PACKET_SIZE];
523  int offset = 0;
524 
525  int length = 0;
526  int packetLength = 0;
527  int uniqueId = 0;
528  int state = 0;
529  int ackedState = 0;
530  int fromState = 0;
531 
532  for ( PeerList::iterator peer = peers.begin(); peer != peers.end(); peer++ )
533  {
534    packetLength = peer->second.socket->readPacket( buf, UDP_PACKET_SIZE );
535   
536    assert( Converter::byteArrayToInt( buf, &length ) == INTSIZE );
537    assert( Converter::byteArrayToInt( buf + INTSIZE, &state ) == INTSIZE );
538    assert( Converter::byteArrayToInt( buf + 2*INTSIZE, &fromState ) == INTSIZE );
539    assert( Converter::byteArrayToInt( buf + 3*INTSIZE, &ackedState ) == INTSIZE );
540   
541    //if this is an old state drop it
542    if ( state <= peer->second.lastRecvedState )
543      continue;
544   
545    if ( packetLength != length )
546    {
547      PRINTF(1)("real packet length (%d) and transmitted packet length (%d) do not match!\n", packetLength, length);
548      peer->second.socket->disconnectServer();
549      continue;
550    }
551   
552    offset = 4*INTSIZE;
553   
554    while ( offset < length )
555    {
556      assert( Converter::byteArrayToInt( buf + offset, &uniqueId ) == INTSIZE );
557      offset += INTSIZE;
558     
559      Synchronizeable * sync = NULL;
560     
561      for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ )
562      {
563        if ( (*it)->getUniqueID() == uniqueId )
564        {
565          sync = *it;
566          break;
567        }
568      }
569     
570      if ( sync == NULL )
571      {
572        //TODO dont accept new object from all peers (probably only servers)
573        int leafClassId;
574        if ( INTSIZE > length - offset )
575          break;
576       
577        Converter::byteArrayToInt( buf + offset, &leafClassId );
578       
579        BaseObject * b;
580        /* These are some small exeptions in creation: Not all objects can/should be created via Factory */
581        /* Exception 1: NullParent */
582        if( leafClassId == CL_NULL_PARENT)
583        {
584          PRINTF(1)("Can not create Class with ID %x!", (int)leafClassId);
585          break;
586        }
587        else
588          b = Factory::fabricate( (ClassID)leafClassId );
589
590        if ( !b )
591        {
592          PRINTF(1)("Could not fabricate Object with classID %x\n", leafClassId);
593          break;
594        }
595       
596        if ( b->isA(CL_SYNCHRONIZEABLE) )
597        {
598          sync = dynamic_cast<Synchronizeable*>(b);
599          sync->setUniqueID( uniqueId );
600          sync->setSynchronized(true);
601 
602          PRINTF(0)("Fabricated %s with id %d\n", sync->getClassName(), sync->getUniqueID());
603        }
604        else
605        {
606          PRINTF(1)("Class with ID %x is not a synchronizeable!", (int)leafClassId);
607          delete b;
608          break;
609        }
610      }
611     
612      offset += sync->setStateDiff( peer->second.userId, buf+offset, length-offset, state, fromState );
613    }
614   
615    if ( offset != length )
616    {
617      peer->second.socket->disconnectServer();
618    }
619   
620    peer->second.lastAckedState = ackedState;
621  }
622}
623
624
625
626
627
628
Note: See TracBrowser for help on using the repository browser.