Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

Changeset 6139 in orxonox.OLD for trunk/src/lib/network/network_stream.cc


Ignore:
Timestamp:
Dec 16, 2005, 6:45:32 PM (18 years ago)
Author:
patrick
Message:

trunk: merged branche network with trunk using command: svn merge -r5999:HEAD, conflicts resolved in favor of the trunk bla

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/lib/network/network_stream.cc

    r5996 r6139  
    2626#include "connection_monitor.h"
    2727#include "synchronizeable.h"
     28#include "network_manager.h"
    2829#include "list.h"
    2930#include "debug.h"
     31#include "class_list.h"
    3032
    3133/* include your own header */
     
    4547  /* initialize the references */
    4648  this->type = NET_CLIENT;
    47   this->networkSocket = new NetworkSocket();
    4849  this->networkProtocol = new NetworkProtocol();
    49   this->synchronizeables = NULL;
    5050  this->connectionMonitor = new ConnectionMonitor();
    5151}
    5252
    53 NetworkStream::NetworkStream(IPaddress& address, NodeType type)
    54 {
    55   this->type = type;
     53NetworkStream::NetworkStream(IPaddress& address)
     54{
     55  this->type = NET_CLIENT;
    5656  this->init();
    57   this->networkSocket = new NetworkSocket(address);
     57  this->networkSockets.push_back(new NetworkSocket(address));
    5858  this->networkProtocol = new NetworkProtocol();
    59   this->synchronizeables = NULL;
    6059  this->connectionMonitor = new ConnectionMonitor();
    61 }
    62 
    63 
    64 NetworkStream::NetworkStream(unsigned int port, NodeType type)
    65 {
    66   this->type = type;
     60
     61  Handshake* hs = new Handshake(false);
     62  hs->setUniqueID( 0 );
     63  this->handshakes.push_back(hs);
     64  this->connectSynchronizeable(*hs);
     65  PRINTF(0)("NetworkStream: %s\n", hs->getName());
     66}
     67
     68
     69NetworkStream::NetworkStream(unsigned int port)
     70{
     71  this->type = NET_SERVER;
    6772  this->init();
    68   this->networkSocket = new NetworkSocket();
    69 //  this->networkSocket->listen(port);
     73  this->serverSocket = new ServerSocket(port);
    7074  this->networkProtocol = new NetworkProtocol();
    71   this->synchronizeables = NULL;
    7275  this->connectionMonitor = new ConnectionMonitor();
    73 }
    74 
    75 
    76 NetworkStream::NetworkStream(IPaddress& address, Synchronizeable& sync, NodeType type)
    77     : DataStream()
    78 {
    79   this->type = type;
    80   this->init();
    81   this->networkSocket = new NetworkSocket(address);
    82   this->networkProtocol = new NetworkProtocol();
    83   this->synchronizeables = &sync;
    84   this->connectionMonitor = new ConnectionMonitor();
     76  this->networkSockets.push_back( NULL );
     77  this->handshakes.push_back( NULL );
    8578  this->bActive = true;
    86 }
    87 
    88 
    89 NetworkStream::NetworkStream(unsigned int port, Synchronizeable& sync, NodeType type)
    90     : DataStream()
    91 {
    92   this->type = type;
    93   this->init();
    94   this->networkSocket = new NetworkSocket();
    95 //  this->networkSocket->listen(port);
    96   this->networkProtocol = new NetworkProtocol();
    97   this->synchronizeables = &sync;
    98   this->connectionMonitor = new ConnectionMonitor();
    99   this->bActive = true;
     79
     80  this->setMaxConnections( 10 );
    10081}
    10182
     
    10586  /* set the class id for the base object */
    10687  this->setClassID(CL_NETWORK_STREAM, "NetworkStream");
    107   this->state = NET_REC_HEADER;
    10888  this->bActive = false;
     89  this->serverSocket = NULL;
     90  myHostId = 0;
    10991}
    11092
     
    11294NetworkStream::~NetworkStream()
    11395{
    114 
    115   networkSocket->disconnectServer();
    116 
    117   if( this->networkSocket)
    118     delete networkSocket;
     96  if ( this->serverSocket )
     97  {
     98    serverSocket->close();
     99    delete serverSocket;
     100  }
     101
     102  for (NetworkSocketVector::iterator i = networkSockets.begin(); i!=networkSockets.end(); i++)
     103  {
     104    if ( *i )
     105    {
     106      (*i)->disconnectServer();
     107      (*i)->destroy();
     108    }
     109  }
     110
     111  for (HandshakeVector::iterator i = handshakes.begin(); i!=handshakes.end(); i++)
     112  {
     113    if ( *i )
     114    {
     115      delete (*i);
     116    }
     117  }
    119118
    120119  delete connectionMonitor;
     
    125124void NetworkStream::connectSynchronizeable(Synchronizeable& sync)
    126125{
    127   this->synchronizeables = &sync;
    128   if( this->networkSocket != NULL)
     126  this->synchronizeables.push_back(&sync);
     127  sync.setNetworkStream( this );
     128
     129  if( this->networkSockets.size()>0 )
    129130    this->bActive = true;
    130131}
    131132
     133void NetworkStream::disconnectSynchronizeable(Synchronizeable& sync)
     134{
     135  this->synchronizeables.remove(&sync);
     136
     137  if( this->networkSockets.size()<=0 )
     138    this->bActive = false;
     139}
     140
    132141
    133142void NetworkStream::processData()
    134143{
    135   int dataLength = 0;
     144  if ( this->type == NET_SERVER )
     145    this->updateConnectionList();
     146  else
     147  {
     148    if ( networkSockets[0] && !networkSockets[0]->isOk() )
     149    {
     150      PRINTF(1)("lost connection to server\n");
     151
     152      //delete networkSockets[i];
     153      networkSockets[0]->disconnectServer();
     154      networkSockets[0]->destroy();
     155      networkSockets[0] = NULL;
     156      //TODO: delete handshake from synchronizeable list so i can delete it
     157      if ( handshakes[0] )
     158        delete handshakes[0];
     159      handshakes[0] = NULL;
     160    }
     161  }
     162
     163  for (int i = 0; i<handshakes.size(); i++)
     164  {
     165    if ( handshakes[i] )
     166    {
     167      if ( handshakes[i]->completed() )
     168      {
     169        if ( handshakes[i]->ok() )
     170        {
     171          if ( type != NET_SERVER )
     172          {
     173            NetworkManager::getInstance()->setHostID( handshakes[i]->getHostId() );
     174            myHostId = NetworkManager::getInstance()->getHostID();
     175          }
     176          PRINT(0)("handshake finished\n");
     177          delete handshakes[i];
     178          handshakes[i] = NULL;
     179          //TODO: replace handshake by entitymanager
     180        }
     181        else
     182        {
     183          PRINT(1)("handshake failed!\n");
     184          networkSockets[i]->disconnectServer();
     185          delete handshakes[i];
     186          handshakes[i] = NULL;
     187          //TODO: handle error
     188        }
     189      }
     190    }
     191  }
     192
    136193
    137194  /* DOWNSTREAM */
    138   printf("\nSynchronizeable: %s\n", this->synchronizeables->getName());
    139   PRINT(0)("============= DOWNSTREAM:===============\n");
    140   /* first of all read the synchronizeable's data: */
    141   if( this->isServer())
    142     dataLength = this->synchronizeables->readBytes((byte*)downBuffer);
    143 
    144   if( dataLength > 0)
    145   {
    146     /* send the received data to connectionMonitor */
    147     this->connectionMonitor->processPacket((byte*)downBuffer, dataLength);
    148 
    149     dataLength = this->networkProtocol->createHeader((byte*)downBuffer, dataLength, DATA_STREAM_BUFFER_SIZE,
    150                  *(this->synchronizeables), 12/* some random number (no real id)*/);
    151 
    152     /* pass the data to the network socket */
    153  //   dataLength = this->networkSocket->writeBytes((byte*)downBuffer, dataLength);
    154     /* check if there was an error */
    155     if( dataLength == -1)
    156     {
    157       PRINTF(0)("Error in writing data to the NetworkSocket\n");
    158     }
    159   }
    160 
     195
     196  int dataLength;
     197  int reciever;
     198  Header header;
     199  for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
     200  {
     201    //TODO: remove items from synchronizeables if they dont exist
     202    if ( (*it)!=NULL && (*it)->getOwner() == myHostId )
     203    {
     204      do {
     205        reciever = 0;
     206        dataLength = (*it)->readBytes(downBuffer, DATA_STREAM_BUFFER_SIZE, &reciever);
     207
     208
     209        if ( dataLength<=0 ){
     210          reciever = 0;
     211          continue;
     212        }
     213
     214        dataLength = networkProtocol->createHeader((byte*)downBuffer, dataLength, DATA_STREAM_BUFFER_SIZE, static_cast<const Synchronizeable&>(*(*it)));
     215
     216        //FIXME: this is a hack, find a better way
     217        Header* header = (Header*)downBuffer;
     218        if ( header->synchronizeableID<100 )
     219          header->synchronizeableID = 0;
     220
     221        if ( dataLength<=0 )
     222          continue;
     223
     224        if ( reciever!=0 )
     225        {
     226          if ( networkSockets[reciever] != NULL )
     227          {
     228            PRINTF(5)("write %d bytes to socket %d\n", dataLength, reciever);
     229            networkSockets[reciever]->writePacket(downBuffer, dataLength);
     230          }
     231          else
     232          {
     233            PRINTF(1)("networkSockets[reciever] == NULL\n");
     234          }
     235        }
     236        else
     237        {
     238          for ( int i = 0; i<networkSockets.size(); i++)
     239          {
     240            if ( networkSockets[i] != NULL )
     241            {
     242              PRINTF(5)("write %d bytes to socket %d\n", dataLength, reciever);
     243              networkSockets[i]->writePacket(downBuffer, dataLength);
     244            }
     245          }
     246        }
     247
     248      } while( reciever!=0 );
     249    }
     250    else
     251    {
     252      PRINTF(0)("synchronizeables == NULL");
     253    }
     254  }
    161255
    162256  /* UPSTREAM */
    163   dataLength = 0;
    164   PRINT(0)("============== UPSTREAM:================\n");
    165   /* first of all read the next Orxonox Network Header */
    166 
    167   if( this->state == NET_REC_HEADER)
    168   {
    169 //    dataLength = this->networkSocket->readBlock((byte*)upBuffer, sizeof(Header));
    170     if( dataLength == sizeof(Header))
    171     {
    172       this->packetHeader = this->networkProtocol->extractHeader((byte*) upBuffer , dataLength);
    173       printf("NetworkStream::processData() - Got Header: Protocol %u, Version: %u, Sender: %u, Receiver: %u, Length: %u\n",
    174              this->packetHeader.protocol, this->packetHeader.version, this->packetHeader.senderID,
    175              this->packetHeader.receiverID, this->packetHeader.length);
    176       /* FIXME: what if it was no this->packetHeader? catch? eg: the protocol identifier, receiver id*/
    177 
    178       this->state = NET_REC_DATA;
    179     }
    180   }
    181   if( this->state == NET_REC_DATA)
    182   {
    183     /* now read the data */
    184 //    dataLength = this->networkSocket->readBlock((byte*)upBuffer, this->packetHeader.length);
    185     /* check if the data is available and process it if so */
    186     if( dataLength == this->packetHeader.length)
    187     {
    188       printf("NetworkStream::processData() - Got Data: %i bytes\n", dataLength);
    189       /* send the received data to connectionMonitor */
    190       this->connectionMonitor->processPacket((byte*)upBuffer, this->packetHeader.length);
    191       /* now pass the data to the sync object */
    192       if( !this->isServer())
    193         this->synchronizeables->writeBytes((byte*)upBuffer, this->packetHeader.length);
    194 
    195       this->state = NET_REC_HEADER;
    196     }
    197   }
    198 
    199 
    200 }
     257
     258  for ( int i = 0; i<networkSockets.size(); i++)
     259  {
     260    if ( networkSockets[i] )
     261    {
     262      do {
     263        dataLength = networkSockets[i]->readPacket(upBuffer, DATA_STREAM_BUFFER_SIZE);
     264
     265        if ( dataLength<=0 )
     266          continue;
     267
     268        PRINTF(5)("read %d bytes from socket\n", dataLength);
     269        header = networkProtocol->extractHeader(upBuffer, dataLength);
     270        dataLength -= sizeof(header);
     271
     272        if ( dataLength != header.length )
     273        {
     274          PRINTF(1)("packetsize in header and real packetsize do not match! %d:%d\n", dataLength, header.length);
     275          continue;
     276        }
     277
     278        if ( header.synchronizeableID == 0 )
     279        {
     280          header.synchronizeableID = i;
     281        }
     282
     283        for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++)
     284        {
     285          if ( *it && (*it)->getUniqueID()==header.synchronizeableID )
     286            (*it)->writeBytes(upBuffer+sizeof(header), dataLength);
     287        }
     288
     289      } while ( dataLength>0 );
     290    }
     291  }
     292}
     293
     294void NetworkStream::updateConnectionList( )
     295{
     296  //check for new connections
     297
     298  NetworkSocket* tempNetworkSocket = serverSocket->getNewSocket();
     299
     300  if ( tempNetworkSocket )
     301  {
     302    int clientId;
     303    if ( freeSocketSlots.size() >0 )
     304    {
     305      clientId = freeSocketSlots.back();
     306      freeSocketSlots.pop_back();
     307      networkSockets[clientId] = tempNetworkSocket;
     308      handshakes[clientId] = new Handshake(true, clientId);
     309      handshakes[clientId]->setUniqueID(clientId);
     310    } else
     311    {
     312      clientId = networkSockets.size();
     313      networkSockets.push_back(tempNetworkSocket);
     314      Handshake* tHs = new Handshake(true, clientId);
     315      tHs->setUniqueID(clientId);
     316      handshakes.push_back(tHs);
     317    }
     318
     319    if ( clientId > this->maxConnections )
     320    {
     321      handshakes[clientId]->doReject();
     322      PRINTF(0)("Will reject client %d because there are to many connections!\n", clientId);
     323    }
     324    else
     325
     326    PRINTF(0)("New Client: %d\n", clientId);
     327
     328    this->connectSynchronizeable(*handshakes[clientId]);
     329  }
     330
     331
     332  //check if connections are ok else remove them
     333  for ( int i = 1; i<networkSockets.size(); i++)
     334  {
     335    if ( networkSockets[i] && !networkSockets[i]->isOk() )
     336    {
     337      //TODO: tell EntityManager that this player left the game
     338      PRINTF(0)("Client is gone: %d\n", i);
     339
     340      //delete networkSockets[i];
     341      networkSockets[i]->disconnectServer();
     342      networkSockets[i]->destroy();
     343      networkSockets[i] = NULL;
     344      //TODO: delete handshake from synchronizeable list so i can delete it
     345      if ( handshakes[i] )
     346        delete handshakes[i];
     347      handshakes[i] = NULL;
     348
     349      if ( i == networkSockets.size()-1 )
     350      {
     351        networkSockets.pop_back();
     352        handshakes.pop_back();
     353      }
     354      else
     355      {
     356        freeSocketSlots.push_back(i);
     357      }
     358    }
     359  }
     360
     361
     362}
     363
     364void NetworkStream::setMaxConnections( int n )
     365{
     366  if ( !this->isServer() )
     367  {
     368    PRINTF(1)("Cannot set maxConnections because I am no server.\n");
     369  }
     370  if ( this->networkSockets.size() > 1 )
     371  {
     372    PRINTF(1)("Cannot set maxConnections because there are already %d connections.\n", this->networkSockets.size());
     373    return;
     374  }
     375  this->maxConnections = n;
     376}
     377
     378
Note: See TracChangeset for help on using the changeset viewer.