Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: orxonox.OLD/branches/proxy/src/lib/network/network_stream.cc @ 9583

Last change on this file since 9583 was 9583, checked in by patrick, 18 years ago

integration of the networkmonitor into the shared network data, using the new getPeerByUserId

File size: 35.5 KB
Line 
1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11### File Specific:
12   main-programmer: Christoph Renner rennerc@ee.ethz.ch
13   co-programmer:   Patrick Boenzli  patrick@orxonox.ethz.ch
14
15     June 2006: finishing work on the network stream for pps presentation (rennerc@ee.ethz.ch)
16     July 2006: some code rearangement and integration of the proxy server mechanism (patrick@orxonox.ethz.ch)
17*/
18
19
20#define DEBUG_MODULE_NETWORK
21
22#include "proxy/proxy_control.h"
23
24#include "base_object.h"
25#include "network_protocol.h"
26#include "udp_socket.h"
27#include "udp_server_socket.h"
28#include "monitor/connection_monitor.h"
29#include "monitor/network_monitor.h"
30#include "synchronizeable.h"
31#include "ip.h"
32#include "network_game_manager.h"
33#include "shared_network_data.h"
34#include "message_manager.h"
35#include "preferences.h"
36#include "zip.h"
37
38#include "src/lib/util/loading/resource_manager.h"
39
40#include "network_log.h"
41
42#include "player_stats.h"
43
44#include "lib/util/loading/factory.h"
45
46#include "debug.h"
47#include "class_list.h"
48#include <algorithm>
49
50
51#include "network_stream.h"
52
53
54#include "converter.h"
55
56
57#define PACKAGE_SIZE  256
58
59
60/**
61 * empty constructor
62 */
63NetworkStream::NetworkStream()
64    : DataStream()
65{
66  this->init();
67  /* initialize the references */
68  this->pInfo->nodeType = NET_UNASSIGNED;
69}
70
71
72NetworkStream::NetworkStream( int nodeType)
73{
74  this->init();
75
76  this->pInfo->nodeType = nodeType;
77
78  switch( nodeType)
79  {
80    case NET_MASTER_SERVER:
81      // init the shared network data
82      SharedNetworkData::getInstance()->setHostID(NET_ID_MASTER_SERVER);
83      break;
84    case NET_PROXY_SERVER_ACTIVE:
85      // init the shared network data
86      SharedNetworkData::getInstance()->setHostID(NET_ID_PROXY_SERVER_01);
87      break;
88    case NET_PROXY_SERVER_PASSIVE:
89      // init the shared network data
90      SharedNetworkData::getInstance()->setHostID(NET_ID_PROXY_SERVER_01);
91      break;
92    case NET_CLIENT:
93      SharedNetworkData::getInstance()->setHostID(NET_ID_UNASSIGNED);
94      break;
95  }
96
97  SharedNetworkData::getInstance()->setDefaultSyncStream(this);
98
99  // get the local ip address
100  IPaddress ip;
101  SDLNet_ResolveHost( &ip, NULL, 0);
102  this->pInfo->ip = ip;
103}
104
105
106
107/**
108 * generic init functions
109 */
110void NetworkStream::init()
111{
112  /* set the class id for the base object */
113  this->setClassID(CL_NETWORK_STREAM, "NetworkStream");
114  this->clientSocket = NULL;
115  this->proxySocket = NULL;
116  this->networkGameManager = NULL;
117  this->networkMonitor = NULL;
118
119  this->pInfo = new PeerInfo();
120  this->pInfo->userId = NET_UID_UNASSIGNED;
121  this->pInfo->lastAckedState = 0;
122  this->pInfo->lastRecvedState = 0;
123
124  this->bRedirect = false;
125
126  this->currentState = 0;
127
128  remainingBytesToWriteToDict = Preferences::getInstance()->getInt( "compression", "writedict", 0 );
129
130  assert( Zip::getInstance()->loadDictionary( "testdict" ) >= 0 );
131  this->dictClient = Zip::getInstance()->loadDictionary( "dict2pl_client" );
132  assert( this->dictClient >= 0 );
133  this->dictServer = Zip::getInstance()->loadDictionary( "dict2p_server" );
134  assert( this->dictServer >= 0 );
135}
136
137
138/**
139 * deconstructor
140 */
141NetworkStream::~NetworkStream()
142{
143  if ( this->clientSocket )
144  {
145    clientSocket->close();
146    delete clientSocket;
147    clientSocket = NULL;
148  }
149  if ( this->proxySocket)
150  {
151    proxySocket->close();
152    delete proxySocket;
153    proxySocket = NULL;
154  }
155  for ( PeerList::iterator i = peers.begin(); i!=peers.end(); i++)
156  {
157    if ( i->second.socket )
158    {
159      i->second.socket->disconnectServer();
160      delete i->second.socket;
161      i->second.socket = NULL;
162    }
163
164    if ( i->second.handshake )
165    {
166      delete i->second.handshake;
167      i->second.handshake = NULL;
168    }
169
170    if ( i->second.connectionMonitor )
171    {
172      delete i->second.connectionMonitor;
173      i->second.connectionMonitor = NULL;
174    }
175  }
176  for ( SynchronizeableList::const_iterator it = getSyncBegin(); it != getSyncEnd(); it ++ )
177    (*it)->setNetworkStream( NULL );
178
179  if( this->pInfo)
180    delete this->pInfo;
181
182  if( this->networkMonitor)
183    delete this->networkMonitor;
184}
185
186
187/**
188 * establish a connection to a remote master server
189 * @param host: host name
190 * @param port: the port number
191 */
192void NetworkStream::connectToMasterServer(std::string host, int port)
193{
194  int node = NET_ID_MASTER_SERVER;
195  // this create the new node in the peers map
196  this->peers[node].socket = new UdpSocket( host, port );
197  this->peers[node].userId = NET_ID_MASTER_SERVER;
198
199  this->peers[node].nodeType = NET_MASTER_SERVER;
200  this->peers[node].connectionMonitor = new ConnectionMonitor( NET_ID_MASTER_SERVER );
201  this->peers[node].ip = this->peers[node].socket->getRemoteAddress();
202}
203
204
205/**
206 * establish a connection to a remote proxy server
207 * @param host: host name
208 * @param port: the port number
209 */
210void NetworkStream::connectToProxyServer(int proxyId, std::string host, int port)
211{
212  PRINTF(0)("connect to proxy %s, this is proxyId %i\n", host.c_str(), proxyId);
213
214  // this creates the new proxyId in the peers map
215  this->peers[proxyId].socket = new UdpSocket( host, port );
216  this->peers[proxyId].userId = proxyId;
217
218  this->peers[proxyId].nodeType = NET_PROXY_SERVER_ACTIVE;
219  this->peers[proxyId].connectionMonitor = new ConnectionMonitor( proxyId );
220  this->peers[proxyId].ip = this->peers[proxyId].socket->getRemoteAddress();
221}
222
223
224/**
225 * create a server
226 * @param port: interface port for all clients
227 */
228void NetworkStream::createServer(int clientPort, int proxyPort)
229{
230  PRINTF(0)(" Creating new Server: listening for clients on port %i and for proxies on port %i", clientPort, proxyPort);
231  this->clientSocket= new UdpServerSocket(clientPort);
232  this->proxySocket = new UdpServerSocket(proxyPort);
233}
234
235
236/**
237 * creates a new instance of the network game manager
238 */
239void NetworkStream::createNetworkGameManager()
240{
241  this->networkGameManager = NetworkGameManager::getInstance();
242
243  this->networkGameManager->setUniqueID( SharedNetworkData::getInstance()->getNewUniqueID() );
244  MessageManager::getInstance()->setUniqueID( SharedNetworkData::getInstance()->getNewUniqueID() );
245}
246
247
248/**
249 * starts the network handshake
250 * handsakes are always initialized from the client side first. this starts the handshake and therefore is only
251 * executed as client
252 * @param userId: start handshake for this user id (optional, default == 0)
253 */
254void NetworkStream::startHandshake(int userId)
255{
256  Handshake* hs = new Handshake(this->pInfo->nodeType);
257  // fake the unique id
258  hs->setUniqueID( NET_UID_HANDSHAKE );
259  assert( peers[userId].handshake == NULL );
260  peers[userId].handshake = hs;
261
262  // set the preferred nick name
263  hs->setPreferedNickName( Preferences::getInstance()->getString( "multiplayer", "nickname", "Player" ) );
264
265  PRINTF(0)("NetworkStream: Handshake created: %s\n", hs->getCName());
266}
267
268
269/**
270 * this functions connects a synchronizeable to the networkstream, therefore synchronizeing
271 * it all over the network and creating it on the other platforms (if and only if it is a
272 * server
273 * @param sync: the synchronizeable to add
274 */
275void NetworkStream::connectSynchronizeable(Synchronizeable& sync)
276{
277  this->synchronizeables.push_back(&sync);
278  sync.setNetworkStream( this );
279}
280
281
282/**
283 * removes the synchronizeable from the list of synchronized entities
284 * @param sync: the syncronizeable to remove
285 */
286void NetworkStream::disconnectSynchronizeable(Synchronizeable& sync)
287{
288  // removing the Synchronizeable from the List.
289  std::list<Synchronizeable*>::iterator disconnectSynchro = std::find(this->synchronizeables.begin(), this->synchronizeables.end(), &sync);
290  if (disconnectSynchro != this->synchronizeables.end())
291    this->synchronizeables.erase(disconnectSynchro);
292
293  oldSynchronizeables[sync.getUniqueID()] = SDL_GetTicks();
294}
295
296
297/**
298 * this is called to process data from the network socket to the synchronizeable and vice versa
299 */
300void NetworkStream::processData()
301{
302  // create the network monitor after all the init work and before there is any connection handlings
303  if( this->networkMonitor == NULL)
304  {
305    this->networkMonitor = new NetworkMonitor(this);
306    SharedNetworkData::getInstance()->setNetworkMonitor( this->networkMonitor);
307  }
308
309
310  int tick = SDL_GetTicks();
311
312  this->currentState++;
313  // there was a wrap around
314  if( this->currentState < 0)
315  {
316    PRINTF(1)("A wrap around in the state variable as occured. The server was running so long? Pls restart server or write a mail to the supporters!\n");
317  }
318
319  if ( SharedNetworkData::getInstance()->isMasterServer())
320  {
321    // execute everytthing the master server shoudl do
322    if ( this->clientSocket )
323      this->clientSocket->update();
324    if( this->proxySocket)
325      this->proxySocket->update();
326
327    this->updateConnectionList();
328  }
329  else if( SharedNetworkData::getInstance()->isProxyServerActive())
330  {
331    //execute everything the proxy server should do
332    if ( this->clientSocket )
333      this->clientSocket->update();
334    if( this->proxySocket)
335      this->proxySocket->update();
336
337    this->updateConnectionList();
338  }
339
340#warning make this more modular: every proxy/master server connection should be watched for termination
341  if( !SharedNetworkData::getInstance()->isMasterServer())
342  {
343    // check if the connection is ok else terminate and remove
344    if ( !peers.empty() && peers[NET_ID_MASTER_SERVER].socket &&
345          ( !peers[NET_ID_MASTER_SERVER].socket->isOk() ||
346          peers[NET_ID_MASTER_SERVER].connectionMonitor->hasTimedOut() ) )
347    {
348      this->handleDisconnect( NET_ID_MASTER_SERVER);
349      PRINTF(1)("lost connection to server\n");
350    }
351    // check if there is a redirection command
352    if( this->bRedirect)
353    {
354      this->handleReconnect( NET_ID_MASTER_SERVER);
355    }
356  }
357
358  this->cleanUpOldSyncList();
359  this->handleHandshakes();
360
361  // update the network monitor
362  this->networkMonitor->process();
363
364  // order of up/downstream is important!!!!
365  // don't change it
366  this->handleDownstream( tick );
367  this->handleUpstream( tick );
368}
369
370
371/**
372 * @brief handles incoming connections
373 *
374 * if we are a NET_MASTER_SERVER or NET_PROXY_SERVER_ACTIVE update the connection list to accept new connections (clients)
375 * start and initialize the handsake for the new clients
376 */
377void NetworkStream::updateConnectionList( )
378{
379  //check for new connections
380
381  NetworkSocket* tempNetworkSocket = NULL;
382  int userId;
383
384  if( this->clientSocket != NULL)
385  {
386    tempNetworkSocket = this->clientSocket->getNewSocket();
387
388    // we got new NET_CLIENT connecting
389    if ( tempNetworkSocket )
390    {
391      // get a userId
392//       if ( freeSocketSlots.size() > 0 )
393//       {
394//         // this should never be called
395//         userId = freeSocketSlots.back();
396//         freeSocketSlots.pop_back();
397//       }
398//       else
399      {
400        // each server (proxy and master) have an address space for new network nodes of 1000 nodes
401        userId = SharedNetworkData::getInstance()->getHostID() * 1000 + 1;
402
403        for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ )
404          if ( it->first >= userId )
405            userId = it->first + 1;
406
407        // make sure that this server only uses an address space of 1000
408        assert( userId < (SharedNetworkData::getInstance()->getHostID() + 1) * 1000);
409      }
410      // this creates a new entry in the peers list
411      peers[userId].socket = tempNetworkSocket;
412      peers[userId].nodeType = NET_CLIENT;
413
414      // handle the newly connected client
415      this->handleConnect(userId);
416
417      PRINTF(0)("New Client: %d\n", userId);
418    }
419  }
420
421
422  if( this->proxySocket != NULL)
423  {
424    tempNetworkSocket = this->proxySocket->getNewSocket();
425
426    // we got new NET_PROXY_SERVER_ACTIVE connecting
427    if ( tempNetworkSocket )
428    {
429      // determine the network node id
430      if ( freeSocketSlots.size() > 0 )
431      {
432        userId = freeSocketSlots.back();
433        freeSocketSlots.pop_back();
434      }
435      else
436      {
437        userId = 1;
438
439        for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ )
440          if ( it->first >= userId )
441            userId = it->first + 1;
442      }
443
444      // this creates a new entry in the peers list
445      peers[userId].socket = tempNetworkSocket;
446      peers[userId].nodeType = NET_PROXY_SERVER_ACTIVE;
447
448      // handle the newly connected proxy server
449      this->handleConnect(userId);
450
451      PRINTF(0)("New proxy connected: %d\n", userId);
452    }
453  }
454
455
456
457  //check if connections are ok else remove them
458  for ( PeerList::iterator it = peers.begin(); it != peers.end(); )
459  {
460    if (
461          it->second.socket &&
462          (
463            !it->second.socket->isOk()  ||
464            it->second.connectionMonitor->hasTimedOut()
465          )
466       )
467    {
468      std::string reason = "disconnected";
469      if ( it->second.connectionMonitor->hasTimedOut() )
470        reason = "timeout";
471      PRINTF(0)("Client is gone: %d (%s)\n", it->second.userId, reason.c_str());
472
473
474      this->handleDisconnect( it->second.userId);
475
476      if( SharedNetworkData::getInstance()->isProxyServerActive())
477        ProxyControl::getInstance()->signalLeaveClient(it->second.userId);
478
479      it++;
480      continue;
481    }
482
483    it++;
484  }
485
486
487}
488
489
490/**
491 * this handles new connections
492 * @param userId: the id of the new user node
493 */
494void NetworkStream::handleConnect( int userId)
495{
496  // create new handshake and init its variables
497  peers[userId].handshake = new Handshake(this->pInfo->nodeType, userId, this->networkGameManager->getUniqueID(), MessageManager::getInstance()->getUniqueID());
498  peers[userId].handshake->setUniqueID(userId);
499
500  peers[userId].connectionMonitor = new ConnectionMonitor( userId );
501  peers[userId].userId = userId;
502
503  PRINTF(0)("num sync: %d\n", synchronizeables.size());
504
505  // get the proxy server informations and write them to the handshake, if any (proxy)
506  assert( this->networkMonitor != NULL);
507  PeerInfo* pi = this->networkMonitor->getFirstChoiceProxy();
508  if( pi != NULL)
509  {
510    peers[userId].handshake->setProxy1Address( pi->ip);
511  }
512  pi = this->networkMonitor->getSecondChoiceProxy();
513  if( pi != NULL)
514    peers[userId].handshake->setProxy2Address( pi->ip);
515
516  // check if the connecting client should reconnect to a proxy server
517  if( SharedNetworkData::getInstance()->isMasterServer())
518    peers[userId].handshake->setRedirect(/*this->networkMonitor->isReconnectNextClient()*/false);
519
520  // the connecting node of course is a client
521  peers[userId].ip = peers[userId].socket->getRemoteAddress();
522}
523
524
525
526/**
527 * some debug output
528 */
529void NetworkStream::debug()
530{
531  if( SharedNetworkData::getInstance()->isMasterServer()) {
532    PRINT(0)(" Host ist Master Server with ID: %i\n", this->pInfo->userId);
533  }
534  else if( SharedNetworkData::getInstance()->isProxyServerActive()) {
535    PRINT(0)(" Host ist Proxy Server with ID: %i\n", this->pInfo->userId);
536  }
537  else {
538    PRINT(0)(" Host ist Client with ID: %i\n", this->pInfo->userId);
539  }
540
541  PRINT(0)(" Current number of connections is: %i\n", this->peers.size());
542  for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ )
543  {
544    PRINT(0)("  peers[%i] with uniqueId %i and address: %s\n", it->first, it->second.userId, it->second.ip.ipString().c_str());
545  }
546  PRINT(0)("\n\n");
547
548
549  PRINT(0)(" Got %i connected Synchronizeables, showing active Syncs:\n", this->synchronizeables.size());
550  for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
551  {
552    if( (*it)->beSynchronized() == true)
553      PRINT(0)("  Synchronizeable of class: %s::%s, with unique ID: %i, Synchronize: %i\n", (*it)->getClassCName(), (*it)->getCName(),
554               (*it)->getUniqueID(), (*it)->beSynchronized());
555  }
556  PRINT(0)(" Maximal Connections: %i\n", SharedNetworkData::getInstance()->getMaxPlayer() );
557
558}
559
560
561/**
562 * @returns the number of synchronizeables registered to this stream
563 */
564int NetworkStream::getSyncCount()
565{
566  int n = 0;
567  for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
568    if( (*it)->beSynchronized() == true)
569      ++n;
570
571  //return synchronizeables.size();
572  return n;
573}
574
575
576/**
577 * check if handshakes completed. if so create the network game manager else remove it again
578 */
579void NetworkStream::handleHandshakes( )
580{
581  for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ )
582  {
583    if ( it->second.handshake )
584    {
585      // handshake finished
586      if ( it->second.handshake->completed() )
587      {
588        //handshake is correct
589        if ( it->second.handshake->ok() )
590        {
591          // write the first informations into the node so they can be read from there for case differentiation
592          it->second.nodeType = it->second.handshake->getRemoteNodeType();
593
594          // the counter part didn't mark it free for deletion yet
595          if ( !it->second.handshake->allowDel() )
596          {
597            // make sure this is a connection:
598            // - client       <==> master server
599            // - proxy server <==> master server
600            if(  SharedNetworkData::getInstance()->isClient() ||
601                 SharedNetworkData::getInstance()->isProxyServerActive() &&
602                 SharedNetworkData::getInstance()->isUserMasterServer(it->second.userId))
603            {
604              PRINTF(4)("Handshake: i am in client role\n");
605
606              SharedNetworkData::getInstance()->setHostID( it->second.handshake->getHostId() );
607              this->pInfo->userId = SharedNetworkData::getInstance()->getHostID();
608
609#warning the ip address is not set here because it results in a segfault when connecting to a proxy server => trace this later
610//               it->second.ip = it->second.socket->getRemoteAddress();
611
612              // it->second.nodeType = it->second.handshake->getRemoteNodeType();
613              // it->second.ip = it->second.socket->getRemoteAddress();
614              // add the new server to the nodes list (it can be a NET_MASTER_SERVER or NET_PROXY_SERVER)
615              this->networkMonitor->addNode(&it->second);
616              // get proxy 1 address and add it
617              this->networkMonitor->addNode(it->second.handshake->getProxy1Address(), NET_PROXY_SERVER_ACTIVE);
618              // get proxy 2 address and add it
619              this->networkMonitor->addNode(it->second.handshake->getProxy2Address(), NET_PROXY_SERVER_ACTIVE);
620
621              // now check if the server accepted the connection
622              if( SharedNetworkData::getInstance()->isClient() && it->second.handshake->redirect() )
623              {
624                this->bRedirect = true;
625              }
626
627              // create the new network game manager and init it
628              this->networkGameManager = NetworkGameManager::getInstance();
629              this->networkGameManager->setUniqueID( it->second.handshake->getNetworkGameManagerId() );
630              // init the new message manager
631              MessageManager::getInstance()->setUniqueID( it->second.handshake->getMessageManagerId() );
632            }
633
634            PRINT(0)("handshake finished id=%d\n", it->second.handshake->getNetworkGameManagerId());
635            it->second.handshake->del();
636
637          }
638          else
639          {
640            // handshake finished registring new player
641            if ( it->second.handshake->canDel() )
642            {
643
644              if (  SharedNetworkData::getInstance()->isMasterServer() )
645              {
646                it->second.ip = it->second.socket->getRemoteAddress();
647
648                this->networkMonitor->addNode(&it->second);
649
650                this->handleNewClient( it->second.userId );
651
652                if ( PlayerStats::getStats( it->second.userId ) && it->second.handshake->getPreferedNickName() != "" )
653                {
654                  PlayerStats::getStats( it->second.userId )->setNickName( it->second.handshake->getPreferedNickName() );
655                }
656              }
657              else if ( SharedNetworkData::getInstance()->isProxyServerActive() && it->second.isClient() )
658              {
659                PRINTF(4)("Handshake: Proxy in server role: connecting %i\n", it->second.userId);
660
661                it->second.ip = it->second.socket->getRemoteAddress();
662
663                this->networkMonitor->addNode(&it->second);
664
665                // work with the ProxyControl to init the new client
666                ProxyControl::getInstance()->signalNewClient( it->second.userId);
667
668#warning dont know if this works: nick name handling
669                if ( PlayerStats::getStats( it->second.userId ) && it->second.handshake->getPreferedNickName() != "" )
670                {
671                  PlayerStats::getStats( it->second.userId )->setNickName( it->second.handshake->getPreferedNickName() );
672                }
673              }
674
675              PRINT(0)("handshake finished delete it\n");
676              delete it->second.handshake;
677              it->second.handshake = NULL;
678            }
679          }
680
681        }
682        else
683        {
684          PRINT(1)("handshake failed!\n");
685          it->second.socket->disconnectServer();
686        }
687      }
688    }
689  }
690}
691
692
693/**
694 * this functions handles a reconnect event received from the a NET_MASTER_SERVER or NET_PROXY_SERVER
695 */
696void NetworkStream::handleReconnect(int userId)
697{
698  this->bRedirect = false;
699#warning this peer will be created if it does not yet exist: dangerous
700  PeerInfo* pInfo = &this->peers[userId];
701
702  PRINTF(0)("===============================================\n");
703  PRINTF(0)("Client is redirected to the other proxy servers\n");
704  PRINTF(0)("  user id: %i\n", userId);
705  PRINTF(0)("  connecting to: %s\n", this->networkMonitor->getFirstChoiceProxy()->ip.ipString().c_str());
706  PRINTF(0)("===============================================\n");
707
708  // flush the old synchronization states, since the numbering could be completely different
709  pInfo->lastAckedState = 0;
710  pInfo->lastRecvedState = 0;
711
712  // temp save the ip address here
713  IP proxyIP = pInfo->handshake->getProxy1Address();
714
715  // disconnect from the current server and reconnect to proxy server
716  this->handleDisconnect( userId);
717  this->connectToProxyServer(NET_ID_PROXY_SERVER_01, proxyIP.ipString(), 9999);
718  #warning the ports are not yet integrated correctly in the ip class
719
720  // and restart the handshake
721  this->startHandshake( userId);
722}
723
724
725/**
726 * handles the disconnect event
727 * @param userId id of the user to remove
728 */
729void NetworkStream::handleDisconnect( int userId )
730{
731  peers[userId].socket->disconnectServer();
732  delete peers[userId].socket;
733  peers[userId].socket = NULL;
734
735  if ( peers[userId].handshake )
736    delete peers[userId].handshake;
737  peers[userId].handshake = NULL;
738
739  if ( peers[userId].connectionMonitor )
740    delete peers[userId].connectionMonitor;
741  peers[userId].connectionMonitor = NULL;
742
743
744  for ( SynchronizeableList::iterator it2 = synchronizeables.begin(); it2 != synchronizeables.end(); it2++ )  {
745    (*it2)->cleanUpUser( userId );
746  }
747
748  if( SharedNetworkData::getInstance()->isMasterServer())
749    NetworkGameManager::getInstance()->signalLeftPlayer(userId);
750
751  this->freeSocketSlots.push_back( userId );
752
753  peers.erase( userId);
754}
755
756
757
758/**
759 * handle upstream network traffic
760 * @param tick: seconds elapsed since last update
761 */
762void NetworkStream::handleUpstream( int tick )
763{
764  int offset;
765  int n;
766
767  for ( PeerList::reverse_iterator peer = peers.rbegin(); peer != peers.rend(); peer++ )
768  {
769    offset = INTSIZE; // reserve enough space for the packet length
770
771    // continue with the next peer if this peer has no socket assigned (therefore no network)
772    if ( !peer->second.socket )
773      continue;
774
775    // header informations: current state
776    n = Converter::intToByteArray( currentState, buf + offset, UDP_PACKET_SIZE - offset );
777    assert( n == INTSIZE );
778    offset += n;
779
780    // header informations: last acked state
781    n = Converter::intToByteArray( peer->second.lastAckedState, buf + offset, UDP_PACKET_SIZE - offset );
782    assert( n == INTSIZE );
783    offset += n;
784
785    // header informations: last recved state
786    n = Converter::intToByteArray( peer->second.lastRecvedState, buf + offset, UDP_PACKET_SIZE - offset );
787    assert( n == INTSIZE );
788    offset += n;
789
790    // now write all synchronizeables in the packet
791    for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ )
792    {
793
794      int oldOffset = offset;
795      Synchronizeable & sync = **it;
796
797
798      // do not include synchronizeables with uninit id and syncs that don't want to be synchronized
799      if ( !sync.beSynchronized() || sync.getUniqueID() <= NET_UID_UNASSIGNED )
800        continue;
801
802      // if handshake not finished only sync handshake
803      if ( peer->second.handshake && sync.getLeafClassID() != CL_HANDSHAKE )
804        continue;
805
806      // if we are a server (both master and proxy servers) and this is not our handshake
807      if ( ( SharedNetworkData::getInstance()->isMasterServer() ||
808             SharedNetworkData::getInstance()->isProxyServerActive() &&  peer->second.isClient())
809             && sync.getLeafClassID() == CL_HANDSHAKE && sync.getUniqueID() != peer->second.userId )
810        continue;
811
812      /* list of synchronizeables that will never be synchronized over the network: */
813      // do not sync null parent
814      if ( sync.getLeafClassID() == CL_NULL_PARENT )
815        continue;
816
817
818      assert( sync.getLeafClassID() != 0);
819
820      assert( offset + INTSIZE <= UDP_PACKET_SIZE );
821
822      // server fakes uniqueid == 0 for handshake synchronizeable
823      if ( ( SharedNetworkData::getInstance()->isMasterServer() ||
824             SharedNetworkData::getInstance()->isProxyServerActive() &&  peer->second.isClient() ) &&
825             ( sync.getUniqueID() >= 1000 || sync.getUniqueID() <= SharedNetworkData::getInstance()->getMaxPlayer() + 1)
826             /*<= SharedNetworkData::getInstance()->getMaxPlayer() + 1*/) // plus one to handle one client more than the max to redirect it
827        n = Converter::intToByteArray( 0, buf + offset, UDP_PACKET_SIZE - offset );
828      else
829        n = Converter::intToByteArray( sync.getUniqueID(), buf + offset, UDP_PACKET_SIZE - offset );
830
831
832      assert( n == INTSIZE );
833      offset += n;
834
835      // make space for packet size
836      offset += INTSIZE;
837
838      n = sync.getStateDiff( peer->second.userId, buf + offset, UDP_PACKET_SIZE-offset, currentState, peer->second.lastAckedState, -1000 );
839      offset += n;
840
841      assert( Converter::intToByteArray( n, buf + offset - n - INTSIZE, INTSIZE ) == INTSIZE );
842
843      // check if all data bytes == 0 -> remove data and the synchronizeable from the sync process since there is no update
844      // TODO not all synchronizeables like this maybe add Synchronizeable::canRemoveZeroDiff()
845      bool allZero = true;
846      for ( int i = 0; i < n; i++ )
847      {
848         if ( buf[i+oldOffset+2*INTSIZE] != 0 )
849           allZero = false;
850      }
851      // if there is no new data in this synchronizeable reset the data offset to the last state -> dont synchronizes
852      // data that hast not changed
853      if ( allZero )
854      {
855        offset = oldOffset;
856      }
857    } // all synchronizeables written
858
859
860
861    for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ )
862    {
863      Synchronizeable & sync = **it;
864
865      // again exclude all unwanted syncs
866      if ( !sync.beSynchronized() || sync.getUniqueID() <= NET_UID_UNASSIGNED)
867        continue;
868
869      sync.handleSentState( peer->second.userId, currentState, peer->second.lastAckedState );
870    }
871
872
873    assert( Converter::intToByteArray( offset, buf, INTSIZE ) == INTSIZE );
874
875    // now compress the data with the zip library
876    int compLength = 0;
877    if ( SharedNetworkData::getInstance()->isMasterServer() ||
878         SharedNetworkData::getInstance()->isProxyServerActive())
879      compLength = Zip::getInstance()->zip( buf, offset, compBuf, UDP_PACKET_SIZE, dictServer );
880    else
881      compLength = Zip::getInstance()->zip( buf, offset, compBuf, UDP_PACKET_SIZE, dictClient );
882
883    if ( compLength <= 0 )
884    {
885      PRINTF(1)("compression failed!\n");
886      continue;
887    }
888
889    assert( peer->second.socket->writePacket( compBuf, compLength ) );
890
891    if ( this->remainingBytesToWriteToDict > 0 )
892      writeToNewDict( buf, offset, true );
893
894    peer->second.connectionMonitor->processUnzippedOutgoingPacket( tick, buf, offset, currentState );
895    peer->second.connectionMonitor->processZippedOutgoingPacket( tick, compBuf, compLength, currentState );
896
897  }
898}
899
900/**
901 * handle downstream network traffic
902 */
903void NetworkStream::handleDownstream( int tick )
904{
905  int offset = 0;
906
907  int length = 0;
908  int packetLength = 0;
909  int compLength = 0;
910  int uniqueId = 0;
911  int state = 0;
912  int ackedState = 0;
913  int fromState = 0;
914  int syncDataLength = 0;
915
916  for ( PeerList::iterator peer = peers.begin(); peer != peers.end(); peer++ )
917  {
918
919    if ( !peer->second.socket )
920      continue;
921
922    while ( 0 < (compLength = peer->second.socket->readPacket( compBuf, UDP_PACKET_SIZE )) )
923    {
924      peer->second.connectionMonitor->processZippedIncomingPacket( tick, compBuf, compLength );
925
926      packetLength = Zip::getInstance()->unZip( compBuf, compLength, buf, UDP_PACKET_SIZE );
927
928      if ( packetLength < 4*INTSIZE )
929      {
930        if ( packetLength != 0 )
931          PRINTF(1)("got too small packet: %d\n", packetLength);
932        continue;
933      }
934
935      if ( this->remainingBytesToWriteToDict > 0 )
936        writeToNewDict( buf, packetLength, false );
937
938      assert( Converter::byteArrayToInt( buf, &length ) == INTSIZE );
939      assert( Converter::byteArrayToInt( buf + INTSIZE, &state ) == INTSIZE );
940      assert( Converter::byteArrayToInt( buf + 2*INTSIZE, &fromState ) == INTSIZE );
941      assert( Converter::byteArrayToInt( buf + 3*INTSIZE, &ackedState ) == INTSIZE );
942      offset = 4*INTSIZE;
943
944      peer->second.connectionMonitor->processUnzippedIncomingPacket( tick, buf, packetLength, state, ackedState );
945
946
947      //if this is an old state drop it
948      if ( state <= peer->second.lastRecvedState )
949        continue;
950
951      if ( packetLength != length )
952      {
953        PRINTF(1)("real packet length (%d) and transmitted packet length (%d) do not match!\n", packetLength, length);
954        peer->second.socket->disconnectServer();
955        continue;
956      }
957
958      while ( offset + 2 * INTSIZE < length )
959      {
960        // read the unique id of the sync
961        assert( offset > 0 );
962        assert( Converter::byteArrayToInt( buf + offset, &uniqueId ) == INTSIZE );
963        offset += INTSIZE;
964
965        // read the data length
966        assert( Converter::byteArrayToInt( buf + offset, &syncDataLength ) == INTSIZE );
967        offset += INTSIZE;
968
969        assert( syncDataLength > 0 );
970        assert( syncDataLength < 10000 );
971
972        Synchronizeable * sync = NULL;
973
974        // look for the synchronizeable in question
975        for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ )
976        {
977          // client thinks his handshake has a special id: hostId * 1000 (host id of this server)
978          if ( (*it)->getUniqueID() == uniqueId ||
979                 ( uniqueId == 0  && (*it)->getUniqueID() == peer->second.userId ) )
980          {
981            sync = *it;
982            break;
983          }
984        }
985
986        // this synchronizeable does not yet exist! create it
987        if ( sync == NULL )
988        {
989          PRINTF(0)("could not find sync with id %d. try to create it\n", uniqueId);
990
991          // if it is an old synchronizeable already removed, ignore it
992          if ( oldSynchronizeables.find( uniqueId ) != oldSynchronizeables.end() )
993          {
994            offset += syncDataLength;
995            continue;
996          }
997
998          // if the node we got this unknown sync we ignore it if:
999          //  - the remote host is a client
1000          //  - the remote host is a proxy server and we are master server
1001          // (since it has no rights to create a new sync)
1002          if ( peers[peer->second.userId].isClient() ||
1003               (peers[peer->second.userId].isProxyServerActive() && SharedNetworkData::getInstance()->isMasterServer()))
1004          {
1005            offset += syncDataLength;
1006            continue;
1007          }
1008
1009          int leafClassId;
1010          if ( INTSIZE > length - offset )
1011          {
1012            offset += syncDataLength;
1013            continue;
1014          }
1015
1016          Converter::byteArrayToInt( buf + offset, &leafClassId );
1017
1018          assert( leafClassId != 0 );
1019
1020
1021          BaseObject * b = NULL;
1022          /* These are some small exeptions in creation: Not all objects can/should be created via Factory */
1023          /* Exception 1: NullParent */
1024          if( leafClassId == CL_NULL_PARENT || leafClassId == CL_SYNCHRONIZEABLE || leafClassId == CL_NETWORK_GAME_MANAGER )
1025          {
1026            PRINTF(1)("Don't create Object with ID %x, ignored!\n", (int)leafClassId);
1027            offset += syncDataLength;
1028            continue;
1029          }
1030          else
1031            b = Factory::fabricate( (ClassID)leafClassId );
1032
1033          if ( !b )
1034          {
1035            PRINTF(1)("Could not fabricate Object with classID %x\n", leafClassId);
1036            offset += syncDataLength;
1037            continue;
1038          }
1039
1040          if ( b->isA(CL_SYNCHRONIZEABLE) )
1041          {
1042            sync = dynamic_cast<Synchronizeable*>(b);
1043            sync->setUniqueID( uniqueId );
1044            sync->setSynchronized(true);
1045
1046            PRINTF(0)("Fabricated %s with id %d\n", sync->getClassCName(), sync->getUniqueID());
1047          }
1048          else
1049          {
1050            PRINTF(1)("Class with ID %x is not a synchronizeable!\n", (int)leafClassId);
1051            delete b;
1052            offset += syncDataLength;
1053            continue;
1054          }
1055        }
1056
1057
1058        int n = sync->setStateDiff( peer->second.userId, buf+offset, syncDataLength, state, fromState );
1059        offset += n;
1060
1061      }
1062
1063      if ( offset != length )
1064      {
1065        PRINTF(0)("offset (%d) != length (%d)\n", offset, length);
1066        peer->second.socket->disconnectServer();
1067      }
1068
1069
1070      for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ )
1071      {
1072        Synchronizeable & sync = **it;
1073
1074        if ( !sync.beSynchronized() || sync.getUniqueID() <= NET_UID_UNASSIGNED )
1075          continue;
1076
1077        sync.handleRecvState( peer->second.userId, state, fromState );
1078      }
1079
1080      assert( peer->second.lastAckedState <= ackedState );
1081      peer->second.lastAckedState = ackedState;
1082
1083      assert( peer->second.lastRecvedState < state );
1084      peer->second.lastRecvedState = state;
1085
1086    }
1087
1088  }
1089
1090}
1091
1092/**
1093 * is executed when a handshake has finished
1094 */
1095void NetworkStream::handleNewClient( int userId )
1096{
1097  // init and assign the message manager
1098  MessageManager::getInstance()->initUser( userId );
1099  // do all game relevant stuff here
1100  networkGameManager->signalNewPlayer( userId );
1101}
1102
1103
1104/**
1105 * removes old items from oldSynchronizeables
1106 */
1107void NetworkStream::cleanUpOldSyncList( )
1108{
1109  int now = SDL_GetTicks();
1110
1111  for ( std::map<int,int>::iterator it = oldSynchronizeables.begin(); it != oldSynchronizeables.end();  )
1112  {
1113    if ( it->second < now - 10*1000 )
1114    {
1115      std::map<int,int>::iterator delIt = it;
1116      it++;
1117      oldSynchronizeables.erase( delIt );
1118      continue;
1119    }
1120    it++;
1121  }
1122}
1123
1124/**
1125 * writes data to DATA/dicts/newdict
1126 * @param data pointer to data
1127 * @param length length
1128 */
1129void NetworkStream::writeToNewDict( byte * data, int length, bool upstream )
1130{
1131  if ( remainingBytesToWriteToDict <= 0 )
1132    return;
1133
1134  if ( length > remainingBytesToWriteToDict )
1135    length = remainingBytesToWriteToDict;
1136
1137  std::string fileName = ResourceManager::getInstance()->getDataDir();
1138  fileName += "/dicts/newdict";
1139
1140  if ( upstream )
1141    fileName += "_upstream";
1142  else
1143    fileName += "_downstream";
1144
1145  FILE * f = fopen( fileName.c_str(), "a" );
1146
1147  if ( !f )
1148  {
1149    PRINTF(2)("could not open %s\n", fileName.c_str());
1150    remainingBytesToWriteToDict = 0;
1151    return;
1152  }
1153
1154  if ( fwrite( data, 1, length, f ) != length )
1155  {
1156    PRINTF(2)("could not write to file\n");
1157    fclose( f );
1158    return;
1159  }
1160
1161  fclose( f );
1162
1163  remainingBytesToWriteToDict -= length;
1164}
1165
1166
1167
1168
1169
1170
Note: See TracBrowser for help on using the repository browser.