Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

Changeset 7954 in orxonox.OLD for trunk/src/lib/network/network_stream.cc


Ignore:
Timestamp:
May 29, 2006, 3:28:41 PM (19 years ago)
Author:
patrick
Message:

trunk: merged the network branche back to trunk.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/lib/network/network_stream.cc

    r6959 r7954  
    2323#include "base_object.h"
    2424#include "network_protocol.h"
    25 #include "network_socket.h"
     25#include "udp_socket.h"
     26#include "udp_server_socket.h"
    2627#include "connection_monitor.h"
    2728#include "synchronizeable.h"
    2829#include "network_game_manager.h"
    2930#include "shared_network_data.h"
     31#include "message_manager.h"
     32#include "preferences.h"
     33#include "zip.h"
     34
     35#include "src/lib/util/loading/resource_manager.h"
     36
     37#include "network_log.h"
     38
     39
     40#include "lib/util/loading/factory.h"
    3041
    3142#include "debug.h"
     
    4960  /* initialize the references */
    5061  this->type = NET_CLIENT;
    51   this->networkProtocol = new NetworkProtocol();
    52   this->connectionMonitor = new ConnectionMonitor();
    53 }
    54 
    55 
    56 NetworkStream::NetworkStream(IPaddress& address)
     62}
     63
     64
     65NetworkStream::NetworkStream( std::string host, int port )
    5766{
    5867  this->type = NET_CLIENT;
    5968  this->init();
    60   this->networkSockets.push_back(new NetworkSocket(address));
    61   this->networkProtocol = new NetworkProtocol();
    62   this->connectionMonitor = new ConnectionMonitor();
    63   this->maxConnections = 1;
    64 }
    65 
    66 
    67 NetworkStream::NetworkStream(unsigned int port)
     69  this->peers[0].socket = new UdpSocket( host, port );
     70  this->peers[0].userId = 0;
     71  this->peers[0].isServer = true;
     72  this->peers[0].connectionMonitor = new ConnectionMonitor( 0 );
     73}
     74
     75
     76NetworkStream::NetworkStream( int port )
    6877{
    6978  this->type = NET_SERVER;
    7079  this->init();
    71   this->serverSocket = new ServerSocket(port);
    72   this->networkProtocol = new NetworkProtocol();
    73   this->connectionMonitor = new ConnectionMonitor();
    74   this->networkSockets.push_back( NULL );
    75   this->networkSockets[0] = NULL; //TODO: remove this
    76   this->handshakes.push_back( NULL );
     80  this->serverSocket = new UdpServerSocket(port);
    7781  this->bActive = true;
    7882}
     
    8791  this->networkGameManager = NULL;
    8892  myHostId = 0;
     93  currentState = 0;
     94 
     95  remainingBytesToWriteToDict = Preferences::getInstance()->getInt( "compression", "writedict", 0 );
     96 
     97  assert( Zip::getInstance()->loadDictionary( "testdict" ) );
    8998}
    9099
     
    98107  }
    99108
    100   for (NetworkSocketVector::iterator i = networkSockets.begin(); i!=networkSockets.end(); i++)
    101   {
    102     if ( *i )
    103     {
    104       (*i)->disconnectServer();
    105       (*i)->destroy();
    106     }
    107   }
    108 
    109   for (HandshakeVector::iterator i = handshakes.begin(); i!=handshakes.end(); i++)
    110   {
    111     if ( *i )
    112     {
    113       delete (*i);
    114     }
    115   }
    116 
    117   delete connectionMonitor;
    118   delete networkProtocol;
     109  for ( PeerList::iterator i = peers.begin(); i!=peers.end(); i++)
     110  {
     111    if ( i->second.socket )
     112    {
     113      i->second.socket->disconnectServer();
     114      delete i->second.socket;
     115      i->second.socket = NULL;
     116    }
     117   
     118    if ( i->second.handshake )
     119    {
     120      delete i->second.handshake;
     121      i->second.handshake = NULL;
     122    }
     123  }
     124 
     125  if ( serverSocket )
     126  {
     127    delete serverSocket;
     128    serverSocket = NULL;
     129  }
     130
    119131}
    120132
     
    125137  // setUniqueID( maxCon+2 ) because we need one id for every handshake
    126138  // and one for handshake to reject client maxCon+1
    127   this->networkGameManager->setUniqueID( this->maxConnections + 2 );
    128   //this->connectSynchronizeable( *(this->networkGameManager) );
    129   this->setMaxConnections( 10 );
     139  this->networkGameManager->setUniqueID( SharedNetworkData::getInstance()->getNewUniqueID() );
     140  MessageManager::getInstance()->setUniqueID( SharedNetworkData::getInstance()->getNewUniqueID() );
    130141}
    131142
     
    135146  Handshake* hs = new Handshake(false);
    136147  hs->setUniqueID( 0 );
    137   this->handshakes.push_back(hs);
     148  assert( peers[0].handshake == NULL );
     149  peers[0].handshake = hs;
     150//   peers[0].handshake->setSynchronized( true );
    138151  //this->connectSynchronizeable(*hs);
    139   PRINTF(0)("NetworkStream: %s\n", hs->getName());
     152  //this->connectSynchronizeable(*hs);
     153  PRINTF(0)("NetworkStream: Handshake created: %s\n", hs->getName());
    140154}
    141155
     
    146160  sync.setNetworkStream( this );
    147161
    148   if( this->networkSockets.size()>0 )
    149     this->bActive = true;
     162  this->bActive = true;
    150163}
    151164
     
    157170  if (disconnectSynchro != this->synchronizeables.end())
    158171    this->synchronizeables.erase(disconnectSynchro);
    159 
    160   if( this->networkSockets.size()<=0 )
    161     this->bActive = false;
     172 
     173  oldSynchronizeables[sync.getUniqueID()] = SDL_GetTicks();
    162174}
    163175
     
    165177void NetworkStream::processData()
    166178{
     179  currentState++;
     180 
    167181  if ( this->type == NET_SERVER )
     182  {
     183    if ( serverSocket )
     184      serverSocket->update();
     185   
    168186    this->updateConnectionList();
     187  }
    169188  else
    170189  {
    171     if ( networkSockets[0] && !networkSockets[0]->isOk() )
     190    if ( peers[0].socket && ( !peers[0].socket->isOk() || peers[0].connectionMonitor->hasTimedOut() ) )
    172191    {
    173192      PRINTF(1)("lost connection to server\n");
    174193
    175       //delete networkSockets[i];
    176       networkSockets[0]->disconnectServer();
    177       networkSockets[0]->destroy();
    178       networkSockets[0] = NULL;
    179 
    180       if ( handshakes[0] )
    181         delete handshakes[0];
    182       handshakes[0] = NULL;
    183     }
    184   }
    185 
    186   for (int i = 0; i<handshakes.size(); i++)
    187   {
    188     if ( handshakes[i] )
    189     {
    190       if ( handshakes[i]->completed() )
     194      peers[0].socket->disconnectServer();
     195      delete peers[0].socket;
     196      peers[0].socket = NULL;
     197
     198      if ( peers[0].handshake )
     199        delete peers[0].handshake;
     200      peers[0].handshake = NULL;
     201    }
     202  }
     203
     204  cleanUpOldSyncList();
     205  handleHandshakes();
     206 
     207  // order of up/downstream is important!!!!
     208  // don't change it
     209  handleDownstream();
     210  handleUpstream();
     211
     212}
     213
     214void NetworkStream::updateConnectionList( )
     215{
     216  //check for new connections
     217
     218  NetworkSocket* tempNetworkSocket = serverSocket->getNewSocket();
     219
     220  if ( tempNetworkSocket )
     221  {
     222    int clientId;
     223    if ( freeSocketSlots.size() >0 )
     224    {
     225      clientId = freeSocketSlots.back();
     226      freeSocketSlots.pop_back();
     227      peers[clientId].socket = tempNetworkSocket;
     228      peers[clientId].handshake = new Handshake(true, clientId, this->networkGameManager->getUniqueID(), MessageManager::getInstance()->getUniqueID() );
     229      peers[clientId].connectionMonitor = new ConnectionMonitor( clientId );
     230      peers[clientId].handshake->setUniqueID(clientId);
     231      peers[clientId].userId = clientId;
     232      peers[clientId].isServer = false;
     233    } else
     234    {
     235      clientId = 1;
     236     
     237      for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ )
     238        if ( it->first >= clientId )
     239          clientId = it->first + 1;
     240     
     241      peers[clientId].socket = tempNetworkSocket;
     242      peers[clientId].handshake = new Handshake(true, clientId, this->networkGameManager->getUniqueID(), MessageManager::getInstance()->getUniqueID());
     243      peers[clientId].handshake->setUniqueID(clientId);
     244      peers[clientId].connectionMonitor = new ConnectionMonitor( clientId );
     245      peers[clientId].userId = clientId;
     246      peers[clientId].isServer = false;
     247     
     248      PRINTF(0)("num sync: %d\n", synchronizeables.size());
     249    }
     250
     251    if ( clientId > MAX_CONNECTIONS )
     252    {
     253      peers[clientId].handshake->doReject( "too many connections" );
     254      PRINTF(0)("Will reject client %d because there are to many connections!\n", clientId);
     255    }
     256    else
     257
     258    PRINTF(0)("New Client: %d\n", clientId);
     259
     260    //this->connectSynchronizeable(*handshakes[clientId]);
     261  }
     262
     263  //check if connections are ok else remove them
     264  for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ )
     265  {
     266    if ( 
     267          it->second.socket &&
     268          (
     269            !it->second.socket->isOk()  ||
     270            it->second.connectionMonitor->hasTimedOut()
     271          )
     272       )
     273    {
     274      std::string reason = "disconnected";
     275      if ( it->second.connectionMonitor->hasTimedOut() )
     276        reason = "timeout";
     277      PRINTF(0)("Client is gone: %d (%s)\n", it->second.userId, reason.c_str());
     278     
     279      assert(false);
     280
     281      it->second.socket->disconnectServer();
     282      delete it->second.socket;
     283      it->second.socket = NULL;
     284
     285      if ( it->second.handshake )
     286        delete it->second.handshake;
     287      it->second.handshake = NULL;
     288     
     289      for ( SynchronizeableList::iterator it2 = synchronizeables.begin(); it2 != synchronizeables.end(); it2++ )
    191290      {
    192         if ( handshakes[i]->ok() )
     291        (*it2)->cleanUpUser( it->second.userId );
     292      }
     293
     294      NetworkGameManager::getInstance()->signalLeftPlayer(it->second.userId);
     295
     296      freeSocketSlots.push_back( it->second.userId );
     297
     298    }
     299  }
     300
     301
     302}
     303
     304void NetworkStream::debug()
     305{
     306  if( this->isServer())
     307    PRINT(0)(" Host ist Server with ID: %i\n", this->myHostId);
     308  else
     309    PRINT(0)(" Host ist Client with ID: %i\n", this->myHostId);
     310
     311  PRINT(0)(" Got %i connected Synchronizeables, showing active Syncs:\n", this->synchronizeables.size());
     312  for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
     313  {
     314    if( (*it)->beSynchronized() == true)
     315      PRINT(0)("  Synchronizeable of class: %s::%s, with unique ID: %i, Synchronize: %i\n", (*it)->getClassName(), (*it)->getName(),
     316               (*it)->getUniqueID(), (*it)->beSynchronized());
     317  }
     318  PRINT(0)(" Maximal Connections: %i\n", MAX_CONNECTIONS );
     319
     320}
     321
     322
     323int NetworkStream::getSyncCount()
     324{
     325  int n = 0;
     326  for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
     327    if( (*it)->beSynchronized() == true)
     328      ++n;
     329
     330  //return synchronizeables.size();
     331  return n;
     332}
     333
     334/**
     335 * check if handshakes completed
     336 */
     337void NetworkStream::handleHandshakes( )
     338{
     339  for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ )
     340  {
     341    if ( it->second.handshake )
     342    {
     343      if ( it->second.handshake->completed() )
     344      {
     345        if ( it->second.handshake->ok() )
    193346        {
    194           if ( type != NET_SERVER )
    195           {
    196             SharedNetworkData::getInstance()->setHostID( handshakes[i]->getHostId() );
    197             myHostId = SharedNetworkData::getInstance()->getHostID();
    198 
    199             this->networkGameManager = NetworkGameManager::getInstance();
    200             this->networkGameManager->setUniqueID( handshakes[i]->getNetworkGameManagerId() );
    201             //this->connectSynchronizeable( *(this->networkGameManager) );
     347          if ( !it->second.handshake->allowDel() )
     348          {
     349            if ( type != NET_SERVER )
     350            {
     351              SharedNetworkData::getInstance()->setHostID( it->second.handshake->getHostId() );
     352              myHostId = SharedNetworkData::getInstance()->getHostID();
     353
     354              this->networkGameManager = NetworkGameManager::getInstance();
     355              this->networkGameManager->setUniqueID( it->second.handshake->getNetworkGameManagerId() );
     356              MessageManager::getInstance()->setUniqueID( it->second.handshake->getMessageManagerId() );
     357            }
     358             
     359
     360            PRINT(0)("handshake finished id=%d\n", it->second.handshake->getNetworkGameManagerId());
     361
     362            it->second.handshake->del();
    202363          }
    203364          else
    204365          {
    205 
    206           }
    207           PRINT(0)("handshake finished id=%d\n", handshakes[i]->getNetworkGameManagerId());
    208 
    209 
    210           delete handshakes[i];
    211           handshakes[i] = NULL;
     366            if ( it->second.handshake->canDel() )
     367            {
     368              if ( type == NET_SERVER )
     369              {
     370                handleNewClient( it->second.userId );
     371              }
     372             
     373              PRINT(0)("handshake finished delete it\n");
     374              delete it->second.handshake;
     375              it->second.handshake = NULL;
     376            }
     377          }
     378
    212379        }
    213380        else
    214381        {
    215382          PRINT(1)("handshake failed!\n");
    216           networkSockets[i]->disconnectServer();
    217           delete handshakes[i];
    218           handshakes[i] = NULL;
    219           //TODO: handle error
     383          it->second.socket->disconnectServer();
    220384        }
    221385      }
    222386    }
    223387  }
    224 
    225 
    226   /* DOWNSTREAM */
    227 
    228 
    229 
    230   int dataLength;
    231   int reciever;
    232   Header header;
    233   int counter;
    234 
    235   for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
    236   {
    237     counter = 0;
    238 
    239     if ( (*it)!=NULL && (*it)->beSynchronized() /*&& (*it)->getOwner() == myHostId*/ )
    240     {
    241       do {
    242         counter++;
    243 
    244         //check for endless loop
    245         if ( counter > 50 )
     388}
     389
     390/**
     391 * handle upstream network traffic
     392 */
     393void NetworkStream::handleUpstream( )
     394{
     395  int offset;
     396  int n;
     397 
     398  for ( PeerList::iterator peer = peers.begin(); peer != peers.end(); peer++ )
     399  {
     400    offset = INTSIZE; //make already space for length
     401   
     402    if ( !peer->second.socket )
     403      continue;
     404   
     405    n = Converter::intToByteArray( currentState, buf + offset, UDP_PACKET_SIZE - offset );
     406    assert( n == INTSIZE );
     407    offset += n;
     408   
     409    n = Converter::intToByteArray( peer->second.lastAckedState, buf + offset, UDP_PACKET_SIZE - offset );
     410    assert( n == INTSIZE );
     411    offset += n;
     412   
     413    n = Converter::intToByteArray( peer->second.lastRecvedState, buf + offset, UDP_PACKET_SIZE - offset );
     414    assert( n == INTSIZE );
     415    offset += n;
     416   
     417    for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ )
     418    {
     419      int oldOffset = offset;
     420      Synchronizeable & sync = **it;
     421     
     422      if ( !sync.beSynchronized() || sync.getUniqueID() < 0 )
     423        continue;
     424
     425      //if handshake not finished only sync handshake
     426      if ( peer->second.handshake && sync.getLeafClassID() != CL_HANDSHAKE )
     427        continue;
     428     
     429      if ( isServer() && sync.getLeafClassID() == CL_HANDSHAKE && sync.getUniqueID() != peer->second.userId )
     430        continue;
     431     
     432      //do not sync null parent
     433      if ( sync.getLeafClassID() == CL_NULL_PARENT )
     434        continue;
     435
     436      assert( offset + INTSIZE <= UDP_PACKET_SIZE );
     437     
     438      //server fakes uniqueid=0 for handshake
     439      if ( this->isServer() && sync.getUniqueID() < MAX_CONNECTIONS - 1 )
     440        n = Converter::intToByteArray( 0, buf + offset, UDP_PACKET_SIZE - offset );
     441      else
     442        n = Converter::intToByteArray( sync.getUniqueID(), buf + offset, UDP_PACKET_SIZE - offset );
     443      assert( n == INTSIZE );
     444      offset += n;
     445     
     446      //make space for size
     447      offset += INTSIZE;
     448
     449      n = sync.getStateDiff( peer->second.userId, buf + offset, UDP_PACKET_SIZE-offset, currentState, peer->second.lastAckedState, -1000 );
     450      offset += n;
     451      //NETPRINTF(0)("GGGGGEEEEETTTTT: %s (%d) %d\n",sync.getClassName(), sync.getUniqueID(), n);
     452     
     453      assert( Converter::intToByteArray( n, buf + offset - n - INTSIZE, INTSIZE ) == INTSIZE );
     454     
     455      //check if all bytes == 0 -> remove data
     456      //TODO not all synchronizeables like this maybe add Synchronizeable::canRemoveZeroDiff()
     457      bool allZero = true;
     458      for ( int i = 0; i < n; i++ )
     459      {
     460         if ( buf[i+oldOffset+2*INTSIZE] != 0 )
     461           allZero = false;
     462      }
     463
     464      if ( allZero )
     465      {
     466        //NETPRINTF(n)("REMOVE ZERO DIFF: %s (%d)\n", sync.getClassName(), sync.getUniqueID());
     467        offset = oldOffset;
     468      }
     469
     470     
     471    }
     472   
     473    for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ )
     474    {
     475      Synchronizeable & sync = **it;
     476     
     477      if ( !sync.beSynchronized() || sync.getUniqueID() < 0 )
     478        continue;
     479     
     480      sync.handleSentState( peer->second.userId, currentState, peer->second.lastAckedState );
     481    }
     482   
     483    assert( Converter::intToByteArray( offset, buf, INTSIZE ) == INTSIZE );
     484   
     485    int compLength = Zip::getInstance()->zip( buf, offset, compBuf, UDP_PACKET_SIZE );
     486   
     487    if ( compLength < 0 )
     488    {
     489      PRINTF(1)("compression failed!\n");
     490      continue;
     491    }
     492   
     493    assert( peer->second.socket->writePacket( compBuf, compLength ) );
     494   
     495    if ( this->remainingBytesToWriteToDict > 0 )
     496      writeToNewDict( buf, offset );
     497   
     498    peer->second.connectionMonitor->processUnzippedOutgoingPacket( buf, offset, currentState );
     499    peer->second.connectionMonitor->processZippedOutgoingPacket( compBuf, compLength, currentState );
     500   
     501    //NETPRINTF(n)("send packet: %d userId = %d\n", offset, peer->second.userId);
     502  }
     503}
     504
     505/**
     506 * handle downstream network traffic
     507 */
     508void NetworkStream::handleDownstream( )
     509{
     510  int offset = 0;
     511 
     512  int length = 0;
     513  int packetLength = 0;
     514  int compLength = 0;
     515  int uniqueId = 0;
     516  int state = 0;
     517  int ackedState = 0;
     518  int fromState = 0;
     519  int syncDataLength = 0;
     520 
     521  for ( PeerList::iterator peer = peers.begin(); peer != peers.end(); peer++ )
     522  {
     523   
     524    if ( !peer->second.socket )
     525      continue;
     526
     527    while ( 0 < (compLength = peer->second.socket->readPacket( compBuf, UDP_PACKET_SIZE )) )
     528    {
     529      //TODO tell monitor about zipped packet. because dropped packets dont count to bandwidth
     530      //PRINTF(0)("GGGGGOOOOOOOOOOTTTTTTTT: %d\n", compLength);
     531      packetLength = Zip::getInstance()->unZip( compBuf, compLength, buf, UDP_PACKET_SIZE );
     532     
     533      if ( packetLength < 4*INTSIZE )
     534      {
     535        if ( packetLength != 0 )
     536          PRINTF(1)("got too small packet: %d\n", packetLength);
     537        continue;
     538      }
     539     
     540      if ( this->remainingBytesToWriteToDict > 0 )
     541        writeToNewDict( buf, packetLength );
     542   
     543      assert( Converter::byteArrayToInt( buf, &length ) == INTSIZE );
     544      assert( Converter::byteArrayToInt( buf + INTSIZE, &state ) == INTSIZE );
     545      assert( Converter::byteArrayToInt( buf + 2*INTSIZE, &fromState ) == INTSIZE );
     546      assert( Converter::byteArrayToInt( buf + 3*INTSIZE, &ackedState ) == INTSIZE );
     547      //NETPRINTF(n)("ackedstate: %d\n", ackedState);
     548      offset = 4*INTSIZE;
     549
     550      //NETPRINTF(n)("got packet: %d, %d\n", length, packetLength);
     551   
     552    //if this is an old state drop it
     553      if ( state <= peer->second.lastRecvedState )
     554        continue;
     555   
     556      if ( packetLength != length )
     557      {
     558        PRINTF(1)("real packet length (%d) and transmitted packet length (%d) do not match!\n", packetLength, length);
     559        peer->second.socket->disconnectServer();
     560        continue;
     561      }
     562     
     563      while ( offset + 2*INTSIZE < length )
     564      {
     565        assert( offset > 0 );
     566        assert( Converter::byteArrayToInt( buf + offset, &uniqueId ) == INTSIZE );
     567        offset += INTSIZE;
     568     
     569        assert( Converter::byteArrayToInt( buf + offset, &syncDataLength ) == INTSIZE );
     570        offset += INTSIZE;
     571       
     572        assert( syncDataLength > 0 );
     573        assert( syncDataLength < 10000 );
     574     
     575        Synchronizeable * sync = NULL;
     576       
     577        for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ )
     578        {
     579        //                                        client thinks his handshake has id 0!!!!!
     580          if ( (*it)->getUniqueID() == uniqueId || ( uniqueId == 0 && (*it)->getUniqueID() == peer->second.userId ) )
     581          {
     582            sync = *it;
     583            break;
     584          }
     585        }
     586       
     587        if ( sync == NULL )
    246588        {
    247           PRINTF(1)("there seems to be an error in readBytes of %s\n", (*it)->getClassName());
    248           assert(false);
     589          PRINTF(0)("could not find sync with id %d. try to create it\n", uniqueId);
     590          if ( oldSynchronizeables.find( uniqueId ) != oldSynchronizeables.end() )
     591          {
     592            offset += syncDataLength;
     593            continue;
     594          }
     595         
     596          if ( !peers[peer->second.userId].isServer )
     597          {
     598            offset += syncDataLength;
     599            continue;
     600          }
     601         
     602          int leafClassId;
     603          if ( INTSIZE > length - offset )
     604          {
     605            offset += syncDataLength;
     606            continue;
     607          }
     608
     609          Converter::byteArrayToInt( buf + offset, &leafClassId );
     610         
     611          assert( leafClassId != 0 );
     612       
     613          BaseObject * b = NULL;
     614          /* These are some small exeptions in creation: Not all objects can/should be created via Factory */
     615          /* Exception 1: NullParent */
     616          if( leafClassId == CL_NULL_PARENT || leafClassId == CL_SYNCHRONIZEABLE || leafClassId == CL_NETWORK_GAME_MANAGER )
     617          {
     618            PRINTF(1)("Can not create Class with ID %x!\n", (int)leafClassId);
     619            offset += syncDataLength;
     620            continue;
     621          }
     622          else
     623            b = Factory::fabricate( (ClassID)leafClassId );
     624
     625          if ( !b )
     626          {
     627            PRINTF(1)("Could not fabricate Object with classID %x\n", leafClassId);
     628            offset += syncDataLength;
     629            continue;
     630          }
     631
     632          if ( b->isA(CL_SYNCHRONIZEABLE) )
     633          {
     634            sync = dynamic_cast<Synchronizeable*>(b);
     635            sync->setUniqueID( uniqueId );
     636            sync->setSynchronized(true);
     637 
     638            PRINTF(0)("Fabricated %s with id %d\n", sync->getClassName(), sync->getUniqueID());
     639          }
     640          else
     641          {
     642            PRINTF(1)("Class with ID %x is not a synchronizeable!\n", (int)leafClassId);
     643            delete b;
     644            offset += syncDataLength;
     645            continue;
     646          }
    249647        }
    250648
    251         reciever = 0;
    252         dataLength = (*it)->readBytes(downBuffer, DATA_STREAM_BUFFER_SIZE, &reciever);
    253 
    254         if ( dataLength<=0 ){
    255           reciever = 0;
     649        int n = sync->setStateDiff( peer->second.userId, buf+offset, syncDataLength, state, fromState );
     650        offset += n;
     651        //NETPRINTF(0)("SSSSSEEEEETTTTT: %s %d\n",sync->getClassName(), n);
     652
     653      }
     654     
     655      if ( offset != length )
     656      {
     657        PRINTF(0)("offset (%d) != length (%d)\n", offset, length);
     658        peer->second.socket->disconnectServer();
     659      }
     660     
     661      //TODO REMOVE THIS
     662      int saveOffset = offset;
     663     
     664      for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ )
     665      {
     666        Synchronizeable & sync = **it;
     667     
     668        if ( !sync.beSynchronized() || sync.getUniqueID() < 0 )
    256669          continue;
    257         }
    258 
    259         dataLength = networkProtocol->createHeader((byte*)downBuffer, dataLength, DATA_STREAM_BUFFER_SIZE, static_cast<const Synchronizeable&>(*(*it)));
    260 
    261         Header* header = (Header*)downBuffer;
    262         if ( header->synchronizeableID < this->maxConnections+2 )
    263         {
    264           //if ( !isServer() ) PRINTF(0)("RESET UNIQUEID FROM %d TO 0 maxCon=%d\n", header->synchronizeableID, this->maxConnections);
    265           header->synchronizeableID = 0;
    266         }
    267         else
    268         {
    269           //if ( !isServer() ) PRINTF(0)("UNIQUEID=%d\n", header->synchronizeableID);
    270         }
    271 
    272         if ( dataLength<=0 )
    273           continue;
    274 
    275         if ( reciever!=0 )
    276         {
    277           if ( reciever < 0)
    278           {
    279             for ( int i = 0; i<networkSockets.size(); i++)
    280             {
    281               if ( i!=abs(reciever) && networkSockets[i] != NULL )
    282               {
    283                 PRINTF(0)("write %d bytes to socket %d uniqueid %d reciever %d\n", dataLength, i, (*it)->getUniqueID(), reciever);
    284                 networkSockets[i]->writePacket(downBuffer, dataLength);
    285               }
    286             }
    287           }
    288           else
    289           {
    290             if ( networkSockets[reciever] != NULL )
    291             {
    292               PRINTF(5)("write %d bytes to socket %d\n", dataLength, reciever);
    293               networkSockets[reciever]->writePacket(downBuffer, dataLength);
    294             }
    295             else
    296             {
    297               PRINTF(1)("networkSockets[reciever] == NULL\n");
    298             }
    299           }
    300         }
    301         else
    302         {
    303           for ( int i = 0; i<networkSockets.size(); i++)
    304           {
    305             if ( networkSockets[i] != NULL )
    306             {
    307               PRINTF(5)("write %d bytes to socket %d\n", dataLength, i);
    308               networkSockets[i]->writePacket(downBuffer, dataLength);
    309             }
    310           }
    311         }
    312 
    313       } while( reciever!=0 );
    314     }
    315   }
    316 
    317   /* UPSTREAM */
    318 
    319   for ( int i = 0; i<networkSockets.size(); i++)
    320   {
    321     if ( networkSockets[i] )
    322     {
    323       do {
    324         dataLength = networkSockets[i]->readPacket(upBuffer, DATA_STREAM_BUFFER_SIZE);
    325 
    326         if ( dataLength<=0 )
    327           continue;
    328 
    329         header = networkProtocol->extractHeader(upBuffer, dataLength);
    330         dataLength -= sizeof(header);
    331 
    332         PRINTF(5)("read %d bytes from socket uniqueID = %d\n", dataLength, header.synchronizeableID);
    333 
    334         if ( dataLength != header.length )
    335         {
    336           PRINTF(1)("packetsize in header and real packetsize do not match! %d:%d\n", dataLength, header.length);
    337           continue;
    338         }
    339 
    340         if ( header.synchronizeableID == 0 )
    341         {
    342           header.synchronizeableID = i;
    343         }
    344 
    345         for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
    346         {
    347           if ( *it && (*it)->getUniqueID()==header.synchronizeableID )
    348           {
    349             if ( (*it)->writeBytes(upBuffer+sizeof(header), dataLength, i) != header.length )
    350             {
    351               PRINTF(1)("%s did not read all the data id = %d!\n", (*it)->getClassName(), (*it)->getUniqueID());
    352               break;
    353             }
    354             continue;
    355           }
    356         }
    357 
    358       } while ( dataLength>0 );
    359     }
    360   }
    361 }
    362 
    363 void NetworkStream::updateConnectionList( )
    364 {
    365   //check for new connections
    366 
    367   NetworkSocket* tempNetworkSocket = serverSocket->getNewSocket();
    368 
    369   if ( tempNetworkSocket )
    370   {
    371     int clientId;
    372     if ( freeSocketSlots.size() >0 )
    373     {
    374       clientId = freeSocketSlots.back();
    375       freeSocketSlots.pop_back();
    376       networkSockets[clientId] = tempNetworkSocket;
    377       handshakes[clientId] = new Handshake(true, clientId, this->networkGameManager->getUniqueID());
    378       handshakes[clientId]->setUniqueID(clientId);
    379     } else
    380     {
    381       clientId = networkSockets.size();
    382       networkSockets.push_back(tempNetworkSocket);
    383       Handshake* tHs = new Handshake(true, clientId, this->networkGameManager->getUniqueID());
    384       tHs->setUniqueID(clientId);
    385       handshakes.push_back(tHs);
    386     }
    387 
    388     if ( clientId > this->maxConnections )
    389     {
    390       handshakes[clientId]->doReject();
    391       PRINTF(0)("Will reject client %d because there are to many connections!\n", clientId);
    392     }
    393     else
    394 
    395     PRINTF(0)("New Client: %d\n", clientId);
    396 
    397     //this->connectSynchronizeable(*handshakes[clientId]);
    398   }
    399 
    400 
    401   //check if connections are ok else remove them
    402   for ( int i = 1; i<networkSockets.size(); i++)
    403   {
    404     if ( networkSockets[i] && !networkSockets[i]->isOk() )
    405     {
    406       //TODO: tell EntityManager that this player left the game
    407       PRINTF(0)("Client is gone: %d\n", i);
    408 
    409       //delete networkSockets[i];
    410       networkSockets[i]->disconnectServer();
    411       networkSockets[i]->destroy();
    412       networkSockets[i] = NULL;
    413 
    414       if ( handshakes[i] )
    415         delete handshakes[i];
    416       handshakes[i] = NULL;
    417 
    418 
    419       NetworkGameManager::getInstance()->signalLeftPlayer(i);
    420 
    421       if ( i == networkSockets.size()-1 )
    422       {
    423         networkSockets.pop_back();
    424         handshakes.pop_back();
     670     
     671        sync.handleRecvState( peer->second.userId, state, fromState );
    425672      }
    426       else
    427       {
    428         freeSocketSlots.push_back(i);
    429       }
    430     }
    431   }
    432 
    433 
    434 }
    435 
    436 void NetworkStream::setMaxConnections( int n )
    437 {
    438   if ( !this->isServer() )
    439   {
    440     PRINTF(1)("Cannot set maxConnections because I am no server.\n");
    441   }
    442   if ( this->networkSockets.size() > 1 )
    443   {
    444     PRINTF(1)("Cannot set maxConnections because there are already %d connections.\n", this->networkSockets.size());
     673     
     674      peer->second.connectionMonitor->processZippedIncomingPacket( compBuf, compLength, state, ackedState );
     675      peer->second.connectionMonitor->processUnzippedIncomingPacket( buf, offset, state, ackedState );
     676   
     677      assert( peer->second.lastAckedState <= ackedState );
     678      peer->second.lastAckedState = ackedState;
     679     
     680      assert( peer->second.lastRecvedState < state );
     681      peer->second.lastRecvedState = state;
     682     
     683      assert( saveOffset == offset );
     684     
     685    }
     686 
     687  }
     688 
     689}
     690
     691/**
     692 * is executed when a handshake has finished
     693 * @todo create playable for new user
     694 */
     695void NetworkStream::handleNewClient( int userId )
     696{
     697  MessageManager::getInstance()->initUser( userId );
     698 
     699  networkGameManager->signalNewPlayer( userId );
     700}
     701
     702/**
     703 * removes old items from oldSynchronizeables
     704 */
     705void NetworkStream::cleanUpOldSyncList( )
     706{
     707  int now = SDL_GetTicks();
     708 
     709  for ( std::map<int,int>::iterator it = oldSynchronizeables.begin(); it != oldSynchronizeables.end();  )
     710  {
     711    if ( it->second < now - 10*1000 )
     712    {
     713      std::map<int,int>::iterator delIt = it;
     714      it++;
     715      oldSynchronizeables.erase( delIt );
     716      continue;
     717    }
     718    it++;
     719  }
     720}
     721
     722/**
     723 * writes data to DATA/dicts/newdict
     724 * @param data pointer to data
     725 * @param length length
     726 */
     727void NetworkStream::writeToNewDict( byte * data, int length )
     728{
     729  if ( remainingBytesToWriteToDict <= 0 )
    445730    return;
    446   }
    447 
    448   if ( n > MAX_CONNECTIONS )
    449   {
    450     PRINTF(1)("Cannot set maxConnectiosn to %d because of hardcoded limit %d\n", n, MAX_CONNECTIONS);
     731 
     732  if ( length > remainingBytesToWriteToDict )
     733    length = remainingBytesToWriteToDict;
     734 
     735  std::string fileName = ResourceManager::getInstance()->getDataDir();
     736  fileName += "/dicts/newdict";
     737 
     738  FILE * f = fopen( fileName.c_str(), "a" );
     739 
     740  if ( !f )
     741  {
     742    PRINTF(2)("could not open %s\n", fileName.c_str());
     743    remainingBytesToWriteToDict = 0;
    451744    return;
    452745  }
    453 
    454   this->maxConnections = n;
    455   this->networkGameManager->setUniqueID( n+2 );
    456 }
    457 
    458 
    459 
    460 void NetworkStream::debug()
    461 {
    462   if( this->isServer())
    463     PRINT(0)(" Host ist Server with ID: %i\n", this->myHostId);
    464   else
    465     PRINT(0)(" Host ist Client with ID: %i\n", this->myHostId);
    466 
    467   PRINT(0)(" Got %i connected Synchronizeables, showing active Syncs:\n", this->synchronizeables.size());
    468   for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
    469   {
    470     if( (*it)->beSynchronized() == true)
    471       PRINT(0)("  Synchronizeable of class: %s::%s, with unique ID: %i, Synchronize: %i\n", (*it)->getClassName(), (*it)->getName(),
    472                (*it)->getUniqueID(), (*it)->beSynchronized());
    473   }
    474   PRINT(0)(" Maximal Connections: %i\n", this->maxConnections);
    475 
    476 }
    477 
    478 
    479 int NetworkStream::getSyncCount()
    480 {
    481   int n = 0;
    482   for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
    483     if( (*it)->beSynchronized() == true)
    484       ++n;
    485 
    486   //return synchronizeables.size();
    487   return n;
    488 }
    489 
    490 
    491 
    492 
    493 
    494 
     746 
     747  if ( fwrite( data, 1, length, f ) != length )
     748  {
     749    PRINTF(2)("could not write to file\n");
     750    fclose( f );
     751    return;
     752  }
     753 
     754  fclose( f );
     755 
     756  remainingBytesToWriteToDict -= length; 
     757}
     758
     759
     760
     761
     762
     763
Note: See TracChangeset for help on using the changeset viewer.