Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: orxonox.OLD/branches/proxy/src/lib/network/network_stream.cc @ 9537

Last change on this file since 9537 was 9537, checked in by patrick, 18 years ago

found the bug. there is need for another control center for proxies to signal new clients and leavings. This is a central part of the proxy synchronization process

File size: 34.6 KB
Line 
1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11### File Specific:
12   main-programmer: Christoph Renner rennerc@ee.ethz.ch
13   co-programmer:   Patrick Boenzli  boenzlip@orxonox.ethz.ch
14
15     June 2006: finishing work on the network stream for pps presentation (rennerc@ee.ethz.ch)
16     July 2006: some code rearangement and integration of the proxy server mechanism (boenzlip@ee.ethz.ch)
17*/
18
19
20#define DEBUG_MODULE_NETWORK
21
22
23#include "base_object.h"
24#include "network_protocol.h"
25#include "udp_socket.h"
26#include "udp_server_socket.h"
27#include "monitor/connection_monitor.h"
28#include "monitor/network_monitor.h"
29#include "synchronizeable.h"
30#include "ip.h"
31#include "network_game_manager.h"
32#include "shared_network_data.h"
33#include "message_manager.h"
34#include "preferences.h"
35#include "zip.h"
36
37#include "src/lib/util/loading/resource_manager.h"
38
39#include "network_log.h"
40
41#include "player_stats.h"
42
43#include "lib/util/loading/factory.h"
44
45#include "debug.h"
46#include "class_list.h"
47#include <algorithm>
48
49
50#include "network_stream.h"
51
52
53#include "converter.h"
54
55
56#define PACKAGE_SIZE  256
57
58
59/**
60 * empty constructor
61 */
62NetworkStream::NetworkStream()
63    : DataStream()
64{
65  this->init();
66  /* initialize the references */
67  this->pInfo->nodeType = NET_UNASSIGNED;
68}
69
70
71NetworkStream::NetworkStream( int nodeType)
72{
73  this->init();
74
75  this->pInfo->nodeType = nodeType;
76
77  switch( nodeType)
78  {
79    case NET_MASTER_SERVER:
80      // init the shared network data
81      SharedNetworkData::getInstance()->setHostID(NET_ID_MASTER_SERVER);
82      break;
83    case NET_PROXY_SERVER_ACTIVE:
84      // init the shared network data
85      SharedNetworkData::getInstance()->setHostID(NET_ID_PROXY_SERVER_01);
86      break;
87    case NET_PROXY_SERVER_PASSIVE:
88      // init the shared network data
89      SharedNetworkData::getInstance()->setHostID(NET_ID_PROXY_SERVER_01);
90      break;
91    case NET_CLIENT:
92      SharedNetworkData::getInstance()->setHostID(NET_ID_UNASSIGNED);
93      break;
94  }
95
96  SharedNetworkData::getInstance()->setDefaultSyncStream(this);
97
98  // get the local ip address
99  IPaddress ip;
100  SDLNet_ResolveHost( &ip, NULL, 0);
101  this->pInfo->ip = ip;
102}
103
104
105
106/**
107 * generic init functions
108 */
109void NetworkStream::init()
110{
111  /* set the class id for the base object */
112  this->setClassID(CL_NETWORK_STREAM, "NetworkStream");
113  this->clientSocket = NULL;
114  this->proxySocket = NULL;
115  this->networkGameManager = NULL;
116  this->networkMonitor = NULL;
117
118  this->pInfo = new PeerInfo();
119  this->pInfo->userId = 0;
120  this->pInfo->lastAckedState = 0;
121  this->pInfo->lastRecvedState = 0;
122
123  this->bRedirect = false;
124
125  this->currentState = 0;
126
127  remainingBytesToWriteToDict = Preferences::getInstance()->getInt( "compression", "writedict", 0 );
128
129  assert( Zip::getInstance()->loadDictionary( "testdict" ) >= 0 );
130  this->dictClient = Zip::getInstance()->loadDictionary( "dict2pl_client" );
131  assert( this->dictClient >= 0 );
132  this->dictServer = Zip::getInstance()->loadDictionary( "dict2p_server" );
133  assert( this->dictServer >= 0 );
134}
135
136
137/**
138 * deconstructor
139 */
140NetworkStream::~NetworkStream()
141{
142  if ( this->clientSocket )
143  {
144    clientSocket->close();
145    delete clientSocket;
146    clientSocket = NULL;
147  }
148  if ( this->proxySocket)
149  {
150    proxySocket->close();
151    delete proxySocket;
152    proxySocket = NULL;
153  }
154  for ( PeerList::iterator i = peers.begin(); i!=peers.end(); i++)
155  {
156    if ( i->second.socket )
157    {
158      i->second.socket->disconnectServer();
159      delete i->second.socket;
160      i->second.socket = NULL;
161    }
162
163    if ( i->second.handshake )
164    {
165      delete i->second.handshake;
166      i->second.handshake = NULL;
167    }
168
169    if ( i->second.connectionMonitor )
170    {
171      delete i->second.connectionMonitor;
172      i->second.connectionMonitor = NULL;
173    }
174  }
175  for ( SynchronizeableList::const_iterator it = getSyncBegin(); it != getSyncEnd(); it ++ )
176    (*it)->setNetworkStream( NULL );
177
178  if( this->pInfo)
179    delete this->pInfo;
180
181  if( this->networkMonitor)
182    delete this->networkMonitor;
183}
184
185
186/**
187 * establish a connection to a remote master server
188 * @param host: host name
189 * @param port: the port number
190 */
191void NetworkStream::connectToMasterServer(std::string host, int port)
192{
193  int node = NET_ID_MASTER_SERVER;
194  // this create the new node in the peers map
195  this->peers[node].socket = new UdpSocket( host, port );
196  this->peers[node].userId = NET_ID_MASTER_SERVER;
197
198  this->peers[node].nodeType = NET_MASTER_SERVER;
199  this->peers[node].connectionMonitor = new ConnectionMonitor( NET_ID_MASTER_SERVER );
200  this->peers[node].ip = this->peers[node].socket->getRemoteAddress();
201}
202
203
204/**
205 * establish a connection to a remote proxy server
206 * @param host: host name
207 * @param port: the port number
208 */
209void NetworkStream::connectToProxyServer(int proxyId, std::string host, int port)
210{
211  PRINTF(0)("connect to proxy %s, this is proxyId %i\n", host.c_str(), proxyId);
212
213  // this creates the new proxyId in the peers map
214  this->peers[proxyId].socket = new UdpSocket( host, port );
215  this->peers[proxyId].userId = proxyId;
216
217  this->peers[proxyId].nodeType = NET_PROXY_SERVER_ACTIVE;
218  this->peers[proxyId].connectionMonitor = new ConnectionMonitor( proxyId );
219  this->peers[proxyId].ip = this->peers[proxyId].socket->getRemoteAddress();
220}
221
222
223/**
224 * create a server
225 * @param port: interface port for all clients
226 */
227void NetworkStream::createServer(int clientPort, int proxyPort)
228{
229  PRINTF(0)(" Creating new Server: listening for clients on port %i and for proxies on port %i", clientPort, proxyPort);
230  this->clientSocket= new UdpServerSocket(clientPort);
231  this->proxySocket = new UdpServerSocket(proxyPort);
232}
233
234
235/**
236 * creates a new instance of the network game manager
237 */
238void NetworkStream::createNetworkGameManager()
239{
240  this->networkGameManager = NetworkGameManager::getInstance();
241
242  this->networkGameManager->setUniqueID( SharedNetworkData::getInstance()->getNewUniqueID() );
243  MessageManager::getInstance()->setUniqueID( SharedNetworkData::getInstance()->getNewUniqueID() );
244}
245
246
247/**
248 * starts the network handshake
249 * handsakes are always initialized from the client side first. this starts the handshake and therefore is only
250 * executed as client
251 * @param userId: start handshake for this user id (optional, default == 0)
252 */
253void NetworkStream::startHandshake(int userId)
254{
255  Handshake* hs = new Handshake(this->pInfo->nodeType);
256  // fake the unique id
257  hs->setUniqueID( NET_UID_HANDSHAKE );
258  assert( peers[userId].handshake == NULL );
259  peers[userId].handshake = hs;
260
261  // set the preferred nick name
262  hs->setPreferedNickName( Preferences::getInstance()->getString( "multiplayer", "nickname", "Player" ) );
263
264  PRINTF(0)("NetworkStream: Handshake created: %s\n", hs->getCName());
265}
266
267
268/**
269 * this functions connects a synchronizeable to the networkstream, therefore synchronizeing
270 * it all over the network and creating it on the other platforms (if and only if it is a
271 * server
272 * @param sync: the synchronizeable to add
273 */
274void NetworkStream::connectSynchronizeable(Synchronizeable& sync)
275{
276  this->synchronizeables.push_back(&sync);
277  sync.setNetworkStream( this );
278}
279
280
281/**
282 * removes the synchronizeable from the list of synchronized entities
283 * @param sync: the syncronizeable to remove
284 */
285void NetworkStream::disconnectSynchronizeable(Synchronizeable& sync)
286{
287  // removing the Synchronizeable from the List.
288  std::list<Synchronizeable*>::iterator disconnectSynchro = std::find(this->synchronizeables.begin(), this->synchronizeables.end(), &sync);
289  if (disconnectSynchro != this->synchronizeables.end())
290    this->synchronizeables.erase(disconnectSynchro);
291
292  oldSynchronizeables[sync.getUniqueID()] = SDL_GetTicks();
293}
294
295
296/**
297 * this is called to process data from the network socket to the synchronizeable and vice versa
298 */
299void NetworkStream::processData()
300{
301  // create the network monitor after all the init work and before there is any connection handlings
302  if( this->networkMonitor == NULL)
303    this->networkMonitor = new NetworkMonitor(this);
304
305
306  int tick = SDL_GetTicks();
307
308  this->currentState++;
309  // there was a wrap around
310  if( this->currentState < 0)
311  {
312    PRINTF(1)("A wrap around in the state variable as occured. The server was running so long? Pls restart server or write a mail to the supporters!\n");
313  }
314
315  if ( SharedNetworkData::getInstance()->isMasterServer())
316  {
317    // execute everytthing the master server shoudl do
318    if ( this->clientSocket )
319      this->clientSocket->update();
320    if( this->proxySocket)
321      this->proxySocket->update();
322
323    this->updateConnectionList();
324  }
325  else if( SharedNetworkData::getInstance()->isProxyServerActive())
326  {
327    //execute everything the proxy server should do
328    if ( this->clientSocket )
329      this->clientSocket->update();
330    if( this->proxySocket)
331      this->proxySocket->update();
332
333    this->updateConnectionList();
334  }
335
336#warning make this more modular: every proxy/master server connection should be watched for termination
337  if( !SharedNetworkData::getInstance()->isMasterServer())
338  {
339    // check if the connection is ok else terminate and remove
340    if ( !peers.empty() && peers[NET_ID_MASTER_SERVER].socket &&
341          ( !peers[NET_ID_MASTER_SERVER].socket->isOk() ||
342          peers[NET_ID_MASTER_SERVER].connectionMonitor->hasTimedOut() ) )
343    {
344      this->handleDisconnect( NET_ID_MASTER_SERVER);
345      PRINTF(1)("lost connection to server\n");
346    }
347    // check if there is a redirection command
348    if( this->bRedirect)
349    {
350      this->handleReconnect( NET_ID_MASTER_SERVER);
351    }
352  }
353
354  this->cleanUpOldSyncList();
355  this->handleHandshakes();
356
357  // update the network monitor
358  this->networkMonitor->process();
359
360  // order of up/downstream is important!!!!
361  // don't change it
362  this->handleDownstream( tick );
363  this->handleUpstream( tick );
364}
365
366
367/**
368 * @brief handles incoming connections
369 *
370 * if we are a NET_MASTER_SERVER or NET_PROXY_SERVER_ACTIVE update the connection list to accept new connections (clients)
371 * start and initialize the handsake for the new clients
372 */
373void NetworkStream::updateConnectionList( )
374{
375  //check for new connections
376
377  NetworkSocket* tempNetworkSocket = NULL;
378  int userId;
379
380  if( this->clientSocket != NULL)
381  {
382    tempNetworkSocket = this->clientSocket->getNewSocket();
383
384    // we got new NET_CLIENT connecting
385    if ( tempNetworkSocket )
386    {
387      // determine the network node id
388      if ( freeSocketSlots.size() > 0 )
389      {
390        // this should never be called
391        assert(false);
392        userId = freeSocketSlots.back();
393        freeSocketSlots.pop_back();
394      }
395      else
396      {
397        //userId = 1;
398        // each server (proxy and master) have an address space for new network nodes of 1000 nodes
399        userId = SharedNetworkData::getInstance()->getHostID() * 1000;
400
401        for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ )
402          if ( it->first >= userId )
403            userId = it->first + 1;
404
405        assert( userId < (SharedNetworkData::getInstance()->getHostID() + 1) * 1000);
406      }
407      // this creates a new entry in the peers list
408      peers[userId].socket = tempNetworkSocket;
409      peers[userId].nodeType = NET_CLIENT;
410
411      // handle the newly connected client
412      this->handleConnect(userId);
413
414      PRINTF(0)("New Client: %d\n", userId);
415    }
416  }
417
418
419  if( this->proxySocket != NULL)
420  {
421    tempNetworkSocket = this->proxySocket->getNewSocket();
422
423    // we got new NET_PROXY_SERVER_ACTIVE connecting
424    if ( tempNetworkSocket )
425    {
426      // determine the network node id
427      if ( freeSocketSlots.size() > 0 )
428      {
429        userId = freeSocketSlots.back();
430        freeSocketSlots.pop_back();
431      }
432      else
433      {
434        userId = 1;
435
436        for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ )
437          if ( it->first >= userId )
438            userId = it->first + 1;
439      }
440
441      // this creates a new entry in the peers list
442      peers[userId].socket = tempNetworkSocket;
443      peers[userId].nodeType = NET_PROXY_SERVER_ACTIVE;
444
445      // handle the newly connected proxy server
446      this->handleConnect(userId);
447
448      PRINTF(0)("New proxy connected: %d\n", userId);
449    }
450  }
451
452
453  //check if connections are ok else remove them
454  for ( PeerList::iterator it = peers.begin(); it != peers.end(); )
455  {
456    if (
457          it->second.socket &&
458          (
459            !it->second.socket->isOk()  ||
460            it->second.connectionMonitor->hasTimedOut()
461          )
462       )
463    {
464      std::string reason = "disconnected";
465      if ( it->second.connectionMonitor->hasTimedOut() )
466        reason = "timeout";
467      PRINTF(0)("Client is gone: %d (%s)\n", it->second.userId, reason.c_str());
468
469      this->handleDisconnect( it->second.userId);
470
471      it++;
472      continue;
473    }
474
475    it++;
476  }
477
478
479}
480
481
482/**
483 * this handles new connections
484 * @param userId: the id of the new user node
485 */
486void NetworkStream::handleConnect( int userId)
487{
488  // create new handshake and init its variables
489  peers[userId].handshake = new Handshake(this->pInfo->nodeType, userId, this->networkGameManager->getUniqueID(), MessageManager::getInstance()->getUniqueID());
490  peers[userId].handshake->setUniqueID(userId);
491
492  peers[userId].connectionMonitor = new ConnectionMonitor( userId );
493  peers[userId].userId = userId;
494
495  PRINTF(0)("num sync: %d\n", synchronizeables.size());
496
497  // get the proxy server informations and write them to the handshake, if any (proxy)
498  assert( this->networkMonitor != NULL);
499  PeerInfo* pi = this->networkMonitor->getFirstChoiceProxy();
500  if( pi != NULL)
501  {
502    peers[userId].handshake->setProxy1Address( pi->ip);
503  }
504  pi = this->networkMonitor->getSecondChoiceProxy();
505  if( pi != NULL)
506    peers[userId].handshake->setProxy2Address( pi->ip);
507
508  // check if the connecting client should reconnect to a proxy server
509  if( SharedNetworkData::getInstance()->isMasterServer())
510    peers[userId].handshake->setRedirect(/*this->networkMonitor->isReconnectNextClient()*/false);
511
512  // the connecting node of course is a client
513  peers[userId].ip = peers[userId].socket->getRemoteAddress();
514}
515
516
517
518/**
519 * some debug output
520 */
521void NetworkStream::debug()
522{
523  if( SharedNetworkData::getInstance()->isMasterServer()) {
524    PRINT(0)(" Host ist Master Server with ID: %i\n", this->pInfo->userId);
525  }
526  else if( SharedNetworkData::getInstance()->isProxyServerActive()) {
527    PRINT(0)(" Host ist Proxy Server with ID: %i\n", this->pInfo->userId);
528  }
529  else {
530    PRINT(0)(" Host ist Client with ID: %i\n", this->pInfo->userId);
531  }
532
533  PRINT(0)(" Got %i connected Synchronizeables, showing active Syncs:\n", this->synchronizeables.size());
534  for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
535  {
536    if( (*it)->beSynchronized() == true)
537      PRINT(0)("  Synchronizeable of class: %s::%s, with unique ID: %i, Synchronize: %i\n", (*it)->getClassCName(), (*it)->getCName(),
538               (*it)->getUniqueID(), (*it)->beSynchronized());
539  }
540  PRINT(0)(" Maximal Connections: %i\n", SharedNetworkData::getInstance()->getMaxPlayer() );
541
542}
543
544
545/**
546 * @returns the number of synchronizeables registered to this stream
547 */
548int NetworkStream::getSyncCount()
549{
550  int n = 0;
551  for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
552    if( (*it)->beSynchronized() == true)
553      ++n;
554
555  //return synchronizeables.size();
556  return n;
557}
558
559
560/**
561 * check if handshakes completed. if so create the network game manager else remove it again
562 */
563void NetworkStream::handleHandshakes( )
564{
565  for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ )
566  {
567    if ( it->second.handshake )
568    {
569      // handshake finished
570      if ( it->second.handshake->completed() )
571      {
572        //handshake is correct
573        if ( it->second.handshake->ok() )
574        {
575          // write the first informations into the node so they can be read from there for case differentiation
576          it->second.nodeType = it->second.handshake->getRemoteNodeType();
577
578          // the counter part didn't mark it free for deletion yet
579          if ( !it->second.handshake->allowDel() )
580          {
581            // make sure this is a connection:
582            // - client       <==> master server
583            // - proxy server <==> master server
584            if(  SharedNetworkData::getInstance()->isClient() || SharedNetworkData::getInstance()->isProxyServerActive() && it->second.isMasterServer())
585            {
586              PRINTF(0)("Handshake: i am in client role\n");
587
588              SharedNetworkData::getInstance()->setHostID( it->second.handshake->getHostId() );
589              this->pInfo->userId = SharedNetworkData::getInstance()->getHostID();
590
591#warning the ip address is not set here because it results in a segfault when connecting to a proxy server => trace this later
592//               it->second.ip = it->second.socket->getRemoteAddress();
593
594              // it->second.nodeType = it->second.handshake->getRemoteNodeType();
595              // it->second.ip = it->second.socket->getRemoteAddress();
596              // add the new server to the nodes list (it can be a NET_MASTER_SERVER or NET_PROXY_SERVER)
597              this->networkMonitor->addNode(&it->second);
598              // get proxy 1 address and add it
599              this->networkMonitor->addNode(it->second.handshake->getProxy1Address(), NET_PROXY_SERVER_ACTIVE);
600              // get proxy 2 address and add it
601              this->networkMonitor->addNode(it->second.handshake->getProxy2Address(), NET_PROXY_SERVER_ACTIVE);
602
603              // now check if the server accepted the connection
604              if( SharedNetworkData::getInstance()->isClient() && it->second.handshake->redirect() )
605              {
606                this->bRedirect = true;
607              }
608
609              // create the new network game manager and init it
610              this->networkGameManager = NetworkGameManager::getInstance();
611              this->networkGameManager->setUniqueID( it->second.handshake->getNetworkGameManagerId() );
612              // init the new message manager
613              MessageManager::getInstance()->setUniqueID( it->second.handshake->getMessageManagerId() );
614            }
615
616            PRINT(0)("handshake finished id=%d\n", it->second.handshake->getNetworkGameManagerId());
617            it->second.handshake->del();
618
619          }
620          else
621          {
622            // handshake finished registring new player
623            if ( it->second.handshake->canDel() )
624            {
625
626              if (  SharedNetworkData::getInstance()->isMasterServer() )
627              {
628                it->second.ip = it->second.socket->getRemoteAddress();
629
630                this->networkMonitor->addNode(&it->second);
631
632                this->handleNewClient( it->second.userId );
633
634                if ( PlayerStats::getStats( it->second.userId ) && it->second.handshake->getPreferedNickName() != "" )
635                {
636                  PlayerStats::getStats( it->second.userId )->setNickName( it->second.handshake->getPreferedNickName() );
637                }
638              }
639              else if ( SharedNetworkData::getInstance()->isProxyServerActive() && it->second.isClient() )
640              {
641                PRINTF(0)("Handshake: i am in server role\n");
642
643                it->second.ip = it->second.socket->getRemoteAddress();
644
645                this->networkMonitor->addNode(&it->second);
646
647                // work with the ProxyControl to init the new client
648
649
650//                 this->handleNewClient( it->second.userId );
651
652//                 if ( PlayerStats::getStats( it->second.userId ) && it->second.handshake->getPreferedNickName() != "" )
653//                 {
654//                   PlayerStats::getStats( it->second.userId )->setNickName( it->second.handshake->getPreferedNickName() );
655//                 }
656              }
657
658              PRINT(0)("handshake finished delete it\n");
659              delete it->second.handshake;
660              it->second.handshake = NULL;
661            }
662          }
663
664        }
665        else
666        {
667          PRINT(1)("handshake failed!\n");
668          it->second.socket->disconnectServer();
669        }
670      }
671    }
672  }
673}
674
675
676/**
677 * this functions handles a reconnect event received from the a NET_MASTER_SERVER or NET_PROXY_SERVER
678 */
679void NetworkStream::handleReconnect(int userId)
680{
681  this->bRedirect = false;
682  PeerInfo* pInfo = &this->peers[userId];
683
684  PRINTF(0)("===============================================\n");
685  PRINTF(0)("Client is redirected to the other proxy servers\n");
686  PRINTF(0)("  user id: %i\n", userId);
687  PRINTF(0)("  connecting to: %s\n", this->networkMonitor->getFirstChoiceProxy()->ip.ipString().c_str());
688  PRINTF(0)("===============================================\n");
689
690  // flush the old synchronization states, since the numbering could be completely different
691  pInfo->lastAckedState = 0;
692  pInfo->lastRecvedState = 0;
693
694  // temp save the ip address here
695  IP proxyIP = pInfo->handshake->getProxy1Address();
696
697  // disconnect from the current server and reconnect to proxy server
698  this->handleDisconnect( userId);
699  this->connectToProxyServer(NET_ID_PROXY_SERVER_01, proxyIP.ipString(), 9999);
700  #warning the ports are not yet integrated correctly in the ip class
701
702  // and restart the handshake
703  this->startHandshake( userId);
704}
705
706
707/**
708 * handles the disconnect event
709 * @param userId id of the user to remove
710 */
711void NetworkStream::handleDisconnect( int userId )
712{
713  peers[userId].socket->disconnectServer();
714  delete peers[userId].socket;
715  peers[userId].socket = NULL;
716
717  if ( peers[userId].handshake )
718    delete peers[userId].handshake;
719  peers[userId].handshake = NULL;
720
721  if ( peers[userId].connectionMonitor )
722    delete peers[userId].connectionMonitor;
723  peers[userId].connectionMonitor = NULL;
724
725
726  for ( SynchronizeableList::iterator it2 = synchronizeables.begin(); it2 != synchronizeables.end(); it2++ )  {
727    (*it2)->cleanUpUser( userId );
728  }
729
730  if( SharedNetworkData::getInstance()->isMasterServer())
731    NetworkGameManager::getInstance()->signalLeftPlayer(userId);
732
733  this->freeSocketSlots.push_back( userId );
734
735  peers.erase( userId);
736}
737
738
739
740/**
741 * handle upstream network traffic
742 * @param tick: seconds elapsed since last update
743 */
744void NetworkStream::handleUpstream( int tick )
745{
746  int offset;
747  int n;
748
749  for ( PeerList::reverse_iterator peer = peers.rbegin(); peer != peers.rend(); peer++ )
750  {
751    offset = INTSIZE; // reserve enough space for the packet length
752
753    // continue with the next peer if this peer has no socket assigned (therefore no network)
754    if ( !peer->second.socket )
755      continue;
756
757    // header informations: current state
758    n = Converter::intToByteArray( currentState, buf + offset, UDP_PACKET_SIZE - offset );
759    assert( n == INTSIZE );
760    offset += n;
761
762    // header informations: last acked state
763    n = Converter::intToByteArray( peer->second.lastAckedState, buf + offset, UDP_PACKET_SIZE - offset );
764    assert( n == INTSIZE );
765    offset += n;
766
767    // header informations: last recved state
768    n = Converter::intToByteArray( peer->second.lastRecvedState, buf + offset, UDP_PACKET_SIZE - offset );
769    assert( n == INTSIZE );
770    offset += n;
771
772    // now write all synchronizeables in the packet
773    for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ )
774    {
775
776      int oldOffset = offset;
777      Synchronizeable & sync = **it;
778
779
780      // do not include synchronizeables with uninit id and syncs that don't want to be synchronized
781      if ( !sync.beSynchronized() || sync.getUniqueID() <= NET_UID_UNASSIGNED )
782        continue;
783
784      // if handshake not finished only sync handshake
785      if ( peer->second.handshake && sync.getLeafClassID() != CL_HANDSHAKE )
786        continue;
787
788      // if we are a server (both master and proxy servers) and this is not our handshake
789      if ( ( SharedNetworkData::getInstance()->isMasterServer() ||
790             SharedNetworkData::getInstance()->isProxyServerActive() &&  peer->second.isClient())
791             && sync.getLeafClassID() == CL_HANDSHAKE && sync.getUniqueID() != peer->second.userId )
792        continue;
793
794      /* list of synchronizeables that will never be synchronized over the network: */
795      // do not sync null parent
796      if ( sync.getLeafClassID() == CL_NULL_PARENT )
797        continue;
798
799
800      assert( sync.getLeafClassID() != 0);
801
802      assert( offset + INTSIZE <= UDP_PACKET_SIZE );
803
804      // server fakes uniqueid == 0 for handshake synchronizeable
805      if ( ( SharedNetworkData::getInstance()->isMasterServer() ||
806             SharedNetworkData::getInstance()->isProxyServerActive() &&  peer->second.isClient() ) &&
807             ( sync.getUniqueID() >= 1000 || sync.getUniqueID() <= SharedNetworkData::getInstance()->getMaxPlayer() + 1)
808             /*<= SharedNetworkData::getInstance()->getMaxPlayer() + 1*/) // plus one to handle one client more than the max to redirect it
809        n = Converter::intToByteArray( 0, buf + offset, UDP_PACKET_SIZE - offset );
810      else
811        n = Converter::intToByteArray( sync.getUniqueID(), buf + offset, UDP_PACKET_SIZE - offset );
812
813
814      assert( n == INTSIZE );
815      offset += n;
816
817      // make space for packet size
818      offset += INTSIZE;
819
820      n = sync.getStateDiff( peer->second.userId, buf + offset, UDP_PACKET_SIZE-offset, currentState, peer->second.lastAckedState, -1000 );
821      offset += n;
822
823      assert( Converter::intToByteArray( n, buf + offset - n - INTSIZE, INTSIZE ) == INTSIZE );
824
825      // check if all data bytes == 0 -> remove data and the synchronizeable from the sync process since there is no update
826      // TODO not all synchronizeables like this maybe add Synchronizeable::canRemoveZeroDiff()
827      bool allZero = true;
828      for ( int i = 0; i < n; i++ )
829      {
830         if ( buf[i+oldOffset+2*INTSIZE] != 0 )
831           allZero = false;
832      }
833      // if there is no new data in this synchronizeable reset the data offset to the last state -> dont synchronizes
834      // data that hast not changed
835      if ( allZero )
836      {
837        offset = oldOffset;
838      }
839    } // all synchronizeables written
840
841
842
843    for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ )
844    {
845      Synchronizeable & sync = **it;
846
847      // again exclude all unwanted syncs
848      if ( !sync.beSynchronized() || sync.getUniqueID() <= NET_UID_UNASSIGNED)
849        continue;
850
851      sync.handleSentState( peer->second.userId, currentState, peer->second.lastAckedState );
852    }
853
854
855    assert( Converter::intToByteArray( offset, buf, INTSIZE ) == INTSIZE );
856
857    // now compress the data with the zip library
858    int compLength = 0;
859    if ( SharedNetworkData::getInstance()->isMasterServer() ||
860         SharedNetworkData::getInstance()->isProxyServerActive())
861      compLength = Zip::getInstance()->zip( buf, offset, compBuf, UDP_PACKET_SIZE, dictServer );
862    else
863      compLength = Zip::getInstance()->zip( buf, offset, compBuf, UDP_PACKET_SIZE, dictClient );
864
865    if ( compLength <= 0 )
866    {
867      PRINTF(1)("compression failed!\n");
868      continue;
869    }
870
871    assert( peer->second.socket->writePacket( compBuf, compLength ) );
872
873    if ( this->remainingBytesToWriteToDict > 0 )
874      writeToNewDict( buf, offset, true );
875
876    peer->second.connectionMonitor->processUnzippedOutgoingPacket( tick, buf, offset, currentState );
877    peer->second.connectionMonitor->processZippedOutgoingPacket( tick, compBuf, compLength, currentState );
878
879  }
880}
881
882/**
883 * handle downstream network traffic
884 */
885void NetworkStream::handleDownstream( int tick )
886{
887  int offset = 0;
888
889  int length = 0;
890  int packetLength = 0;
891  int compLength = 0;
892  int uniqueId = 0;
893  int state = 0;
894  int ackedState = 0;
895  int fromState = 0;
896  int syncDataLength = 0;
897
898  for ( PeerList::iterator peer = peers.begin(); peer != peers.end(); peer++ )
899  {
900
901    if ( !peer->second.socket )
902      continue;
903
904    while ( 0 < (compLength = peer->second.socket->readPacket( compBuf, UDP_PACKET_SIZE )) )
905    {
906      peer->second.connectionMonitor->processZippedIncomingPacket( tick, compBuf, compLength );
907
908      packetLength = Zip::getInstance()->unZip( compBuf, compLength, buf, UDP_PACKET_SIZE );
909
910      if ( packetLength < 4*INTSIZE )
911      {
912        if ( packetLength != 0 )
913          PRINTF(1)("got too small packet: %d\n", packetLength);
914        continue;
915      }
916
917      if ( this->remainingBytesToWriteToDict > 0 )
918        writeToNewDict( buf, packetLength, false );
919
920      assert( Converter::byteArrayToInt( buf, &length ) == INTSIZE );
921      assert( Converter::byteArrayToInt( buf + INTSIZE, &state ) == INTSIZE );
922      assert( Converter::byteArrayToInt( buf + 2*INTSIZE, &fromState ) == INTSIZE );
923      assert( Converter::byteArrayToInt( buf + 3*INTSIZE, &ackedState ) == INTSIZE );
924      offset = 4*INTSIZE;
925
926      peer->second.connectionMonitor->processUnzippedIncomingPacket( tick, buf, packetLength, state, ackedState );
927
928
929      //if this is an old state drop it
930      if ( state <= peer->second.lastRecvedState )
931        continue;
932
933      if ( packetLength != length )
934      {
935        PRINTF(1)("real packet length (%d) and transmitted packet length (%d) do not match!\n", packetLength, length);
936        peer->second.socket->disconnectServer();
937        continue;
938      }
939
940      while ( offset + 2 * INTSIZE < length )
941      {
942        // read the unique id of the sync
943        assert( offset > 0 );
944        assert( Converter::byteArrayToInt( buf + offset, &uniqueId ) == INTSIZE );
945        offset += INTSIZE;
946
947        // read the data length
948        assert( Converter::byteArrayToInt( buf + offset, &syncDataLength ) == INTSIZE );
949        offset += INTSIZE;
950
951        assert( syncDataLength > 0 );
952        assert( syncDataLength < 10000 );
953
954        Synchronizeable * sync = NULL;
955
956        // look for the synchronizeable in question
957        for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ )
958        {
959          // client thinks his handshake has a special id: hostId * 1000 (host id of this server)
960          if ( (*it)->getUniqueID() == uniqueId ||
961                 ( uniqueId == 0  && (*it)->getUniqueID() == peer->second.userId ) )
962          {
963            sync = *it;
964            break;
965          }
966        }
967
968        // this synchronizeable does not yet exist! create it
969        if ( sync == NULL )
970        {
971          PRINTF(0)("could not find sync with id %d. try to create it\n", uniqueId);
972
973          // if it is an old synchronizeable already removed, ignore it
974          if ( oldSynchronizeables.find( uniqueId ) != oldSynchronizeables.end() )
975          {
976            offset += syncDataLength;
977            continue;
978          }
979
980          // if the node we got this unknown sync we ignore it if:
981          //  - the remote host is a client
982          //  - the remote host is a proxy server and we are master server
983          // (since it has no rights to create a new sync)
984          if ( peers[peer->second.userId].isClient() ||
985               (peers[peer->second.userId].isProxyServerActive() && SharedNetworkData::getInstance()->isMasterServer()))
986          {
987            offset += syncDataLength;
988            continue;
989          }
990
991          int leafClassId;
992          if ( INTSIZE > length - offset )
993          {
994            offset += syncDataLength;
995            continue;
996          }
997
998          Converter::byteArrayToInt( buf + offset, &leafClassId );
999
1000          assert( leafClassId != 0 );
1001
1002
1003          BaseObject * b = NULL;
1004          /* These are some small exeptions in creation: Not all objects can/should be created via Factory */
1005          /* Exception 1: NullParent */
1006          if( leafClassId == CL_NULL_PARENT || leafClassId == CL_SYNCHRONIZEABLE || leafClassId == CL_NETWORK_GAME_MANAGER )
1007          {
1008            PRINTF(1)("Don't create Object with ID %x, ignored!\n", (int)leafClassId);
1009            offset += syncDataLength;
1010            continue;
1011          }
1012          else
1013            b = Factory::fabricate( (ClassID)leafClassId );
1014
1015          if ( !b )
1016          {
1017            PRINTF(1)("Could not fabricate Object with classID %x\n", leafClassId);
1018            offset += syncDataLength;
1019            continue;
1020          }
1021
1022          if ( b->isA(CL_SYNCHRONIZEABLE) )
1023          {
1024            sync = dynamic_cast<Synchronizeable*>(b);
1025            sync->setUniqueID( uniqueId );
1026            sync->setSynchronized(true);
1027
1028            PRINTF(0)("Fabricated %s with id %d\n", sync->getClassCName(), sync->getUniqueID());
1029          }
1030          else
1031          {
1032            PRINTF(1)("Class with ID %x is not a synchronizeable!\n", (int)leafClassId);
1033            delete b;
1034            offset += syncDataLength;
1035            continue;
1036          }
1037        }
1038
1039
1040        int n = sync->setStateDiff( peer->second.userId, buf+offset, syncDataLength, state, fromState );
1041        offset += n;
1042
1043      }
1044
1045      if ( offset != length )
1046      {
1047        PRINTF(0)("offset (%d) != length (%d)\n", offset, length);
1048        peer->second.socket->disconnectServer();
1049      }
1050
1051
1052      for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ )
1053      {
1054        Synchronizeable & sync = **it;
1055
1056        if ( !sync.beSynchronized() || sync.getUniqueID() <= NET_UID_UNASSIGNED )
1057          continue;
1058
1059        sync.handleRecvState( peer->second.userId, state, fromState );
1060      }
1061
1062      assert( peer->second.lastAckedState <= ackedState );
1063      peer->second.lastAckedState = ackedState;
1064
1065      assert( peer->second.lastRecvedState < state );
1066      peer->second.lastRecvedState = state;
1067
1068    }
1069
1070  }
1071
1072}
1073
1074/**
1075 * is executed when a handshake has finished
1076 */
1077void NetworkStream::handleNewClient( int userId )
1078{
1079  // init and assign the message manager
1080  MessageManager::getInstance()->initUser( userId );
1081  // do all game relevant stuff here
1082  networkGameManager->signalNewPlayer( userId );
1083}
1084
1085
1086/**
1087 * removes old items from oldSynchronizeables
1088 */
1089void NetworkStream::cleanUpOldSyncList( )
1090{
1091  int now = SDL_GetTicks();
1092
1093  for ( std::map<int,int>::iterator it = oldSynchronizeables.begin(); it != oldSynchronizeables.end();  )
1094  {
1095    if ( it->second < now - 10*1000 )
1096    {
1097      std::map<int,int>::iterator delIt = it;
1098      it++;
1099      oldSynchronizeables.erase( delIt );
1100      continue;
1101    }
1102    it++;
1103  }
1104}
1105
1106/**
1107 * writes data to DATA/dicts/newdict
1108 * @param data pointer to data
1109 * @param length length
1110 */
1111void NetworkStream::writeToNewDict( byte * data, int length, bool upstream )
1112{
1113  if ( remainingBytesToWriteToDict <= 0 )
1114    return;
1115
1116  if ( length > remainingBytesToWriteToDict )
1117    length = remainingBytesToWriteToDict;
1118
1119  std::string fileName = ResourceManager::getInstance()->getDataDir();
1120  fileName += "/dicts/newdict";
1121
1122  if ( upstream )
1123    fileName += "_upstream";
1124  else
1125    fileName += "_downstream";
1126
1127  FILE * f = fopen( fileName.c_str(), "a" );
1128
1129  if ( !f )
1130  {
1131    PRINTF(2)("could not open %s\n", fileName.c_str());
1132    remainingBytesToWriteToDict = 0;
1133    return;
1134  }
1135
1136  if ( fwrite( data, 1, length, f ) != length )
1137  {
1138    PRINTF(2)("could not write to file\n");
1139    fclose( f );
1140    return;
1141  }
1142
1143  fclose( f );
1144
1145  remainingBytesToWriteToDict -= length;
1146}
1147
1148
1149
1150
1151
1152
Note: See TracBrowser for help on using the repository browser.