Changeset 7954 in orxonox.OLD for trunk/src/lib/network/network_stream.cc
- Timestamp:
- May 29, 2006, 3:28:41 PM (19 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/lib/network/network_stream.cc
r6959 r7954 23 23 #include "base_object.h" 24 24 #include "network_protocol.h" 25 #include "network_socket.h" 25 #include "udp_socket.h" 26 #include "udp_server_socket.h" 26 27 #include "connection_monitor.h" 27 28 #include "synchronizeable.h" 28 29 #include "network_game_manager.h" 29 30 #include "shared_network_data.h" 31 #include "message_manager.h" 32 #include "preferences.h" 33 #include "zip.h" 34 35 #include "src/lib/util/loading/resource_manager.h" 36 37 #include "network_log.h" 38 39 40 #include "lib/util/loading/factory.h" 30 41 31 42 #include "debug.h" … … 49 60 /* initialize the references */ 50 61 this->type = NET_CLIENT; 51 this->networkProtocol = new NetworkProtocol(); 52 this->connectionMonitor = new ConnectionMonitor(); 53 } 54 55 56 NetworkStream::NetworkStream(IPaddress& address) 62 } 63 64 65 NetworkStream::NetworkStream( std::string host, int port ) 57 66 { 58 67 this->type = NET_CLIENT; 59 68 this->init(); 60 this-> networkSockets.push_back(new NetworkSocket(address));61 this-> networkProtocol = new NetworkProtocol();62 this-> connectionMonitor = new ConnectionMonitor();63 this-> maxConnections = 1;64 } 65 66 67 NetworkStream::NetworkStream( unsigned int port)69 this->peers[0].socket = new UdpSocket( host, port ); 70 this->peers[0].userId = 0; 71 this->peers[0].isServer = true; 72 this->peers[0].connectionMonitor = new ConnectionMonitor( 0 ); 73 } 74 75 76 NetworkStream::NetworkStream( int port ) 68 77 { 69 78 this->type = NET_SERVER; 70 79 this->init(); 71 this->serverSocket = new ServerSocket(port); 72 this->networkProtocol = new NetworkProtocol(); 73 this->connectionMonitor = new ConnectionMonitor(); 74 this->networkSockets.push_back( NULL ); 75 this->networkSockets[0] = NULL; //TODO: remove this 76 this->handshakes.push_back( NULL ); 80 this->serverSocket = new UdpServerSocket(port); 77 81 this->bActive = true; 78 82 } … … 87 91 this->networkGameManager = NULL; 88 92 myHostId = 0; 93 currentState = 0; 94 95 remainingBytesToWriteToDict = Preferences::getInstance()->getInt( "compression", "writedict", 0 ); 96 97 assert( Zip::getInstance()->loadDictionary( "testdict" ) ); 89 98 } 90 99 … … 98 107 } 99 108 100 for (NetworkSocketVector::iterator i = networkSockets.begin(); i!=networkSockets.end(); i++) 101 { 102 if ( *i ) 103 { 104 (*i)->disconnectServer(); 105 (*i)->destroy(); 106 } 107 } 108 109 for (HandshakeVector::iterator i = handshakes.begin(); i!=handshakes.end(); i++) 110 { 111 if ( *i ) 112 { 113 delete (*i); 114 } 115 } 116 117 delete connectionMonitor; 118 delete networkProtocol; 109 for ( PeerList::iterator i = peers.begin(); i!=peers.end(); i++) 110 { 111 if ( i->second.socket ) 112 { 113 i->second.socket->disconnectServer(); 114 delete i->second.socket; 115 i->second.socket = NULL; 116 } 117 118 if ( i->second.handshake ) 119 { 120 delete i->second.handshake; 121 i->second.handshake = NULL; 122 } 123 } 124 125 if ( serverSocket ) 126 { 127 delete serverSocket; 128 serverSocket = NULL; 129 } 130 119 131 } 120 132 … … 125 137 // setUniqueID( maxCon+2 ) because we need one id for every handshake 126 138 // and one for handshake to reject client maxCon+1 127 this->networkGameManager->setUniqueID( this->maxConnections + 2 ); 128 //this->connectSynchronizeable( *(this->networkGameManager) ); 129 this->setMaxConnections( 10 ); 139 this->networkGameManager->setUniqueID( SharedNetworkData::getInstance()->getNewUniqueID() ); 140 MessageManager::getInstance()->setUniqueID( SharedNetworkData::getInstance()->getNewUniqueID() ); 130 141 } 131 142 … … 135 146 Handshake* hs = new Handshake(false); 136 147 hs->setUniqueID( 0 ); 137 this->handshakes.push_back(hs); 148 assert( peers[0].handshake == NULL ); 149 peers[0].handshake = hs; 150 // peers[0].handshake->setSynchronized( true ); 138 151 //this->connectSynchronizeable(*hs); 139 PRINTF(0)("NetworkStream: %s\n", hs->getName()); 152 //this->connectSynchronizeable(*hs); 153 PRINTF(0)("NetworkStream: Handshake created: %s\n", hs->getName()); 140 154 } 141 155 … … 146 160 sync.setNetworkStream( this ); 147 161 148 if( this->networkSockets.size()>0 ) 149 this->bActive = true; 162 this->bActive = true; 150 163 } 151 164 … … 157 170 if (disconnectSynchro != this->synchronizeables.end()) 158 171 this->synchronizeables.erase(disconnectSynchro); 159 160 if( this->networkSockets.size()<=0 ) 161 this->bActive = false; 172 173 oldSynchronizeables[sync.getUniqueID()] = SDL_GetTicks(); 162 174 } 163 175 … … 165 177 void NetworkStream::processData() 166 178 { 179 currentState++; 180 167 181 if ( this->type == NET_SERVER ) 182 { 183 if ( serverSocket ) 184 serverSocket->update(); 185 168 186 this->updateConnectionList(); 187 } 169 188 else 170 189 { 171 if ( networkSockets[0] && !networkSockets[0]->isOk() )190 if ( peers[0].socket && ( !peers[0].socket->isOk() || peers[0].connectionMonitor->hasTimedOut() ) ) 172 191 { 173 192 PRINTF(1)("lost connection to server\n"); 174 193 175 //delete networkSockets[i]; 176 networkSockets[0]->disconnectServer(); 177 networkSockets[0]->destroy(); 178 networkSockets[0] = NULL; 179 180 if ( handshakes[0] ) 181 delete handshakes[0]; 182 handshakes[0] = NULL; 183 } 184 } 185 186 for (int i = 0; i<handshakes.size(); i++) 187 { 188 if ( handshakes[i] ) 189 { 190 if ( handshakes[i]->completed() ) 194 peers[0].socket->disconnectServer(); 195 delete peers[0].socket; 196 peers[0].socket = NULL; 197 198 if ( peers[0].handshake ) 199 delete peers[0].handshake; 200 peers[0].handshake = NULL; 201 } 202 } 203 204 cleanUpOldSyncList(); 205 handleHandshakes(); 206 207 // order of up/downstream is important!!!! 208 // don't change it 209 handleDownstream(); 210 handleUpstream(); 211 212 } 213 214 void NetworkStream::updateConnectionList( ) 215 { 216 //check for new connections 217 218 NetworkSocket* tempNetworkSocket = serverSocket->getNewSocket(); 219 220 if ( tempNetworkSocket ) 221 { 222 int clientId; 223 if ( freeSocketSlots.size() >0 ) 224 { 225 clientId = freeSocketSlots.back(); 226 freeSocketSlots.pop_back(); 227 peers[clientId].socket = tempNetworkSocket; 228 peers[clientId].handshake = new Handshake(true, clientId, this->networkGameManager->getUniqueID(), MessageManager::getInstance()->getUniqueID() ); 229 peers[clientId].connectionMonitor = new ConnectionMonitor( clientId ); 230 peers[clientId].handshake->setUniqueID(clientId); 231 peers[clientId].userId = clientId; 232 peers[clientId].isServer = false; 233 } else 234 { 235 clientId = 1; 236 237 for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ ) 238 if ( it->first >= clientId ) 239 clientId = it->first + 1; 240 241 peers[clientId].socket = tempNetworkSocket; 242 peers[clientId].handshake = new Handshake(true, clientId, this->networkGameManager->getUniqueID(), MessageManager::getInstance()->getUniqueID()); 243 peers[clientId].handshake->setUniqueID(clientId); 244 peers[clientId].connectionMonitor = new ConnectionMonitor( clientId ); 245 peers[clientId].userId = clientId; 246 peers[clientId].isServer = false; 247 248 PRINTF(0)("num sync: %d\n", synchronizeables.size()); 249 } 250 251 if ( clientId > MAX_CONNECTIONS ) 252 { 253 peers[clientId].handshake->doReject( "too many connections" ); 254 PRINTF(0)("Will reject client %d because there are to many connections!\n", clientId); 255 } 256 else 257 258 PRINTF(0)("New Client: %d\n", clientId); 259 260 //this->connectSynchronizeable(*handshakes[clientId]); 261 } 262 263 //check if connections are ok else remove them 264 for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ ) 265 { 266 if ( 267 it->second.socket && 268 ( 269 !it->second.socket->isOk() || 270 it->second.connectionMonitor->hasTimedOut() 271 ) 272 ) 273 { 274 std::string reason = "disconnected"; 275 if ( it->second.connectionMonitor->hasTimedOut() ) 276 reason = "timeout"; 277 PRINTF(0)("Client is gone: %d (%s)\n", it->second.userId, reason.c_str()); 278 279 assert(false); 280 281 it->second.socket->disconnectServer(); 282 delete it->second.socket; 283 it->second.socket = NULL; 284 285 if ( it->second.handshake ) 286 delete it->second.handshake; 287 it->second.handshake = NULL; 288 289 for ( SynchronizeableList::iterator it2 = synchronizeables.begin(); it2 != synchronizeables.end(); it2++ ) 191 290 { 192 if ( handshakes[i]->ok() ) 291 (*it2)->cleanUpUser( it->second.userId ); 292 } 293 294 NetworkGameManager::getInstance()->signalLeftPlayer(it->second.userId); 295 296 freeSocketSlots.push_back( it->second.userId ); 297 298 } 299 } 300 301 302 } 303 304 void NetworkStream::debug() 305 { 306 if( this->isServer()) 307 PRINT(0)(" Host ist Server with ID: %i\n", this->myHostId); 308 else 309 PRINT(0)(" Host ist Client with ID: %i\n", this->myHostId); 310 311 PRINT(0)(" Got %i connected Synchronizeables, showing active Syncs:\n", this->synchronizeables.size()); 312 for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++) 313 { 314 if( (*it)->beSynchronized() == true) 315 PRINT(0)(" Synchronizeable of class: %s::%s, with unique ID: %i, Synchronize: %i\n", (*it)->getClassName(), (*it)->getName(), 316 (*it)->getUniqueID(), (*it)->beSynchronized()); 317 } 318 PRINT(0)(" Maximal Connections: %i\n", MAX_CONNECTIONS ); 319 320 } 321 322 323 int NetworkStream::getSyncCount() 324 { 325 int n = 0; 326 for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++) 327 if( (*it)->beSynchronized() == true) 328 ++n; 329 330 //return synchronizeables.size(); 331 return n; 332 } 333 334 /** 335 * check if handshakes completed 336 */ 337 void NetworkStream::handleHandshakes( ) 338 { 339 for ( PeerList::iterator it = peers.begin(); it != peers.end(); it++ ) 340 { 341 if ( it->second.handshake ) 342 { 343 if ( it->second.handshake->completed() ) 344 { 345 if ( it->second.handshake->ok() ) 193 346 { 194 if ( type != NET_SERVER ) 195 { 196 SharedNetworkData::getInstance()->setHostID( handshakes[i]->getHostId() ); 197 myHostId = SharedNetworkData::getInstance()->getHostID(); 198 199 this->networkGameManager = NetworkGameManager::getInstance(); 200 this->networkGameManager->setUniqueID( handshakes[i]->getNetworkGameManagerId() ); 201 //this->connectSynchronizeable( *(this->networkGameManager) ); 347 if ( !it->second.handshake->allowDel() ) 348 { 349 if ( type != NET_SERVER ) 350 { 351 SharedNetworkData::getInstance()->setHostID( it->second.handshake->getHostId() ); 352 myHostId = SharedNetworkData::getInstance()->getHostID(); 353 354 this->networkGameManager = NetworkGameManager::getInstance(); 355 this->networkGameManager->setUniqueID( it->second.handshake->getNetworkGameManagerId() ); 356 MessageManager::getInstance()->setUniqueID( it->second.handshake->getMessageManagerId() ); 357 } 358 359 360 PRINT(0)("handshake finished id=%d\n", it->second.handshake->getNetworkGameManagerId()); 361 362 it->second.handshake->del(); 202 363 } 203 364 else 204 365 { 205 206 } 207 PRINT(0)("handshake finished id=%d\n", handshakes[i]->getNetworkGameManagerId()); 208 209 210 delete handshakes[i]; 211 handshakes[i] = NULL; 366 if ( it->second.handshake->canDel() ) 367 { 368 if ( type == NET_SERVER ) 369 { 370 handleNewClient( it->second.userId ); 371 } 372 373 PRINT(0)("handshake finished delete it\n"); 374 delete it->second.handshake; 375 it->second.handshake = NULL; 376 } 377 } 378 212 379 } 213 380 else 214 381 { 215 382 PRINT(1)("handshake failed!\n"); 216 networkSockets[i]->disconnectServer(); 217 delete handshakes[i]; 218 handshakes[i] = NULL; 219 //TODO: handle error 383 it->second.socket->disconnectServer(); 220 384 } 221 385 } 222 386 } 223 387 } 224 225 226 /* DOWNSTREAM */ 227 228 229 230 int dataLength; 231 int reciever; 232 Header header; 233 int counter; 234 235 for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++) 236 { 237 counter = 0; 238 239 if ( (*it)!=NULL && (*it)->beSynchronized() /*&& (*it)->getOwner() == myHostId*/ ) 240 { 241 do { 242 counter++; 243 244 //check for endless loop 245 if ( counter > 50 ) 388 } 389 390 /** 391 * handle upstream network traffic 392 */ 393 void NetworkStream::handleUpstream( ) 394 { 395 int offset; 396 int n; 397 398 for ( PeerList::iterator peer = peers.begin(); peer != peers.end(); peer++ ) 399 { 400 offset = INTSIZE; //make already space for length 401 402 if ( !peer->second.socket ) 403 continue; 404 405 n = Converter::intToByteArray( currentState, buf + offset, UDP_PACKET_SIZE - offset ); 406 assert( n == INTSIZE ); 407 offset += n; 408 409 n = Converter::intToByteArray( peer->second.lastAckedState, buf + offset, UDP_PACKET_SIZE - offset ); 410 assert( n == INTSIZE ); 411 offset += n; 412 413 n = Converter::intToByteArray( peer->second.lastRecvedState, buf + offset, UDP_PACKET_SIZE - offset ); 414 assert( n == INTSIZE ); 415 offset += n; 416 417 for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ ) 418 { 419 int oldOffset = offset; 420 Synchronizeable & sync = **it; 421 422 if ( !sync.beSynchronized() || sync.getUniqueID() < 0 ) 423 continue; 424 425 //if handshake not finished only sync handshake 426 if ( peer->second.handshake && sync.getLeafClassID() != CL_HANDSHAKE ) 427 continue; 428 429 if ( isServer() && sync.getLeafClassID() == CL_HANDSHAKE && sync.getUniqueID() != peer->second.userId ) 430 continue; 431 432 //do not sync null parent 433 if ( sync.getLeafClassID() == CL_NULL_PARENT ) 434 continue; 435 436 assert( offset + INTSIZE <= UDP_PACKET_SIZE ); 437 438 //server fakes uniqueid=0 for handshake 439 if ( this->isServer() && sync.getUniqueID() < MAX_CONNECTIONS - 1 ) 440 n = Converter::intToByteArray( 0, buf + offset, UDP_PACKET_SIZE - offset ); 441 else 442 n = Converter::intToByteArray( sync.getUniqueID(), buf + offset, UDP_PACKET_SIZE - offset ); 443 assert( n == INTSIZE ); 444 offset += n; 445 446 //make space for size 447 offset += INTSIZE; 448 449 n = sync.getStateDiff( peer->second.userId, buf + offset, UDP_PACKET_SIZE-offset, currentState, peer->second.lastAckedState, -1000 ); 450 offset += n; 451 //NETPRINTF(0)("GGGGGEEEEETTTTT: %s (%d) %d\n",sync.getClassName(), sync.getUniqueID(), n); 452 453 assert( Converter::intToByteArray( n, buf + offset - n - INTSIZE, INTSIZE ) == INTSIZE ); 454 455 //check if all bytes == 0 -> remove data 456 //TODO not all synchronizeables like this maybe add Synchronizeable::canRemoveZeroDiff() 457 bool allZero = true; 458 for ( int i = 0; i < n; i++ ) 459 { 460 if ( buf[i+oldOffset+2*INTSIZE] != 0 ) 461 allZero = false; 462 } 463 464 if ( allZero ) 465 { 466 //NETPRINTF(n)("REMOVE ZERO DIFF: %s (%d)\n", sync.getClassName(), sync.getUniqueID()); 467 offset = oldOffset; 468 } 469 470 471 } 472 473 for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ ) 474 { 475 Synchronizeable & sync = **it; 476 477 if ( !sync.beSynchronized() || sync.getUniqueID() < 0 ) 478 continue; 479 480 sync.handleSentState( peer->second.userId, currentState, peer->second.lastAckedState ); 481 } 482 483 assert( Converter::intToByteArray( offset, buf, INTSIZE ) == INTSIZE ); 484 485 int compLength = Zip::getInstance()->zip( buf, offset, compBuf, UDP_PACKET_SIZE ); 486 487 if ( compLength < 0 ) 488 { 489 PRINTF(1)("compression failed!\n"); 490 continue; 491 } 492 493 assert( peer->second.socket->writePacket( compBuf, compLength ) ); 494 495 if ( this->remainingBytesToWriteToDict > 0 ) 496 writeToNewDict( buf, offset ); 497 498 peer->second.connectionMonitor->processUnzippedOutgoingPacket( buf, offset, currentState ); 499 peer->second.connectionMonitor->processZippedOutgoingPacket( compBuf, compLength, currentState ); 500 501 //NETPRINTF(n)("send packet: %d userId = %d\n", offset, peer->second.userId); 502 } 503 } 504 505 /** 506 * handle downstream network traffic 507 */ 508 void NetworkStream::handleDownstream( ) 509 { 510 int offset = 0; 511 512 int length = 0; 513 int packetLength = 0; 514 int compLength = 0; 515 int uniqueId = 0; 516 int state = 0; 517 int ackedState = 0; 518 int fromState = 0; 519 int syncDataLength = 0; 520 521 for ( PeerList::iterator peer = peers.begin(); peer != peers.end(); peer++ ) 522 { 523 524 if ( !peer->second.socket ) 525 continue; 526 527 while ( 0 < (compLength = peer->second.socket->readPacket( compBuf, UDP_PACKET_SIZE )) ) 528 { 529 //TODO tell monitor about zipped packet. because dropped packets dont count to bandwidth 530 //PRINTF(0)("GGGGGOOOOOOOOOOTTTTTTTT: %d\n", compLength); 531 packetLength = Zip::getInstance()->unZip( compBuf, compLength, buf, UDP_PACKET_SIZE ); 532 533 if ( packetLength < 4*INTSIZE ) 534 { 535 if ( packetLength != 0 ) 536 PRINTF(1)("got too small packet: %d\n", packetLength); 537 continue; 538 } 539 540 if ( this->remainingBytesToWriteToDict > 0 ) 541 writeToNewDict( buf, packetLength ); 542 543 assert( Converter::byteArrayToInt( buf, &length ) == INTSIZE ); 544 assert( Converter::byteArrayToInt( buf + INTSIZE, &state ) == INTSIZE ); 545 assert( Converter::byteArrayToInt( buf + 2*INTSIZE, &fromState ) == INTSIZE ); 546 assert( Converter::byteArrayToInt( buf + 3*INTSIZE, &ackedState ) == INTSIZE ); 547 //NETPRINTF(n)("ackedstate: %d\n", ackedState); 548 offset = 4*INTSIZE; 549 550 //NETPRINTF(n)("got packet: %d, %d\n", length, packetLength); 551 552 //if this is an old state drop it 553 if ( state <= peer->second.lastRecvedState ) 554 continue; 555 556 if ( packetLength != length ) 557 { 558 PRINTF(1)("real packet length (%d) and transmitted packet length (%d) do not match!\n", packetLength, length); 559 peer->second.socket->disconnectServer(); 560 continue; 561 } 562 563 while ( offset + 2*INTSIZE < length ) 564 { 565 assert( offset > 0 ); 566 assert( Converter::byteArrayToInt( buf + offset, &uniqueId ) == INTSIZE ); 567 offset += INTSIZE; 568 569 assert( Converter::byteArrayToInt( buf + offset, &syncDataLength ) == INTSIZE ); 570 offset += INTSIZE; 571 572 assert( syncDataLength > 0 ); 573 assert( syncDataLength < 10000 ); 574 575 Synchronizeable * sync = NULL; 576 577 for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ ) 578 { 579 // client thinks his handshake has id 0!!!!! 580 if ( (*it)->getUniqueID() == uniqueId || ( uniqueId == 0 && (*it)->getUniqueID() == peer->second.userId ) ) 581 { 582 sync = *it; 583 break; 584 } 585 } 586 587 if ( sync == NULL ) 246 588 { 247 PRINTF(1)("there seems to be an error in readBytes of %s\n", (*it)->getClassName()); 248 assert(false); 589 PRINTF(0)("could not find sync with id %d. try to create it\n", uniqueId); 590 if ( oldSynchronizeables.find( uniqueId ) != oldSynchronizeables.end() ) 591 { 592 offset += syncDataLength; 593 continue; 594 } 595 596 if ( !peers[peer->second.userId].isServer ) 597 { 598 offset += syncDataLength; 599 continue; 600 } 601 602 int leafClassId; 603 if ( INTSIZE > length - offset ) 604 { 605 offset += syncDataLength; 606 continue; 607 } 608 609 Converter::byteArrayToInt( buf + offset, &leafClassId ); 610 611 assert( leafClassId != 0 ); 612 613 BaseObject * b = NULL; 614 /* These are some small exeptions in creation: Not all objects can/should be created via Factory */ 615 /* Exception 1: NullParent */ 616 if( leafClassId == CL_NULL_PARENT || leafClassId == CL_SYNCHRONIZEABLE || leafClassId == CL_NETWORK_GAME_MANAGER ) 617 { 618 PRINTF(1)("Can not create Class with ID %x!\n", (int)leafClassId); 619 offset += syncDataLength; 620 continue; 621 } 622 else 623 b = Factory::fabricate( (ClassID)leafClassId ); 624 625 if ( !b ) 626 { 627 PRINTF(1)("Could not fabricate Object with classID %x\n", leafClassId); 628 offset += syncDataLength; 629 continue; 630 } 631 632 if ( b->isA(CL_SYNCHRONIZEABLE) ) 633 { 634 sync = dynamic_cast<Synchronizeable*>(b); 635 sync->setUniqueID( uniqueId ); 636 sync->setSynchronized(true); 637 638 PRINTF(0)("Fabricated %s with id %d\n", sync->getClassName(), sync->getUniqueID()); 639 } 640 else 641 { 642 PRINTF(1)("Class with ID %x is not a synchronizeable!\n", (int)leafClassId); 643 delete b; 644 offset += syncDataLength; 645 continue; 646 } 249 647 } 250 648 251 reciever = 0; 252 dataLength = (*it)->readBytes(downBuffer, DATA_STREAM_BUFFER_SIZE, &reciever); 253 254 if ( dataLength<=0 ){ 255 reciever = 0; 649 int n = sync->setStateDiff( peer->second.userId, buf+offset, syncDataLength, state, fromState ); 650 offset += n; 651 //NETPRINTF(0)("SSSSSEEEEETTTTT: %s %d\n",sync->getClassName(), n); 652 653 } 654 655 if ( offset != length ) 656 { 657 PRINTF(0)("offset (%d) != length (%d)\n", offset, length); 658 peer->second.socket->disconnectServer(); 659 } 660 661 //TODO REMOVE THIS 662 int saveOffset = offset; 663 664 for ( SynchronizeableList::iterator it = synchronizeables.begin(); it != synchronizeables.end(); it++ ) 665 { 666 Synchronizeable & sync = **it; 667 668 if ( !sync.beSynchronized() || sync.getUniqueID() < 0 ) 256 669 continue; 257 } 258 259 dataLength = networkProtocol->createHeader((byte*)downBuffer, dataLength, DATA_STREAM_BUFFER_SIZE, static_cast<const Synchronizeable&>(*(*it))); 260 261 Header* header = (Header*)downBuffer; 262 if ( header->synchronizeableID < this->maxConnections+2 ) 263 { 264 //if ( !isServer() ) PRINTF(0)("RESET UNIQUEID FROM %d TO 0 maxCon=%d\n", header->synchronizeableID, this->maxConnections); 265 header->synchronizeableID = 0; 266 } 267 else 268 { 269 //if ( !isServer() ) PRINTF(0)("UNIQUEID=%d\n", header->synchronizeableID); 270 } 271 272 if ( dataLength<=0 ) 273 continue; 274 275 if ( reciever!=0 ) 276 { 277 if ( reciever < 0) 278 { 279 for ( int i = 0; i<networkSockets.size(); i++) 280 { 281 if ( i!=abs(reciever) && networkSockets[i] != NULL ) 282 { 283 PRINTF(0)("write %d bytes to socket %d uniqueid %d reciever %d\n", dataLength, i, (*it)->getUniqueID(), reciever); 284 networkSockets[i]->writePacket(downBuffer, dataLength); 285 } 286 } 287 } 288 else 289 { 290 if ( networkSockets[reciever] != NULL ) 291 { 292 PRINTF(5)("write %d bytes to socket %d\n", dataLength, reciever); 293 networkSockets[reciever]->writePacket(downBuffer, dataLength); 294 } 295 else 296 { 297 PRINTF(1)("networkSockets[reciever] == NULL\n"); 298 } 299 } 300 } 301 else 302 { 303 for ( int i = 0; i<networkSockets.size(); i++) 304 { 305 if ( networkSockets[i] != NULL ) 306 { 307 PRINTF(5)("write %d bytes to socket %d\n", dataLength, i); 308 networkSockets[i]->writePacket(downBuffer, dataLength); 309 } 310 } 311 } 312 313 } while( reciever!=0 ); 314 } 315 } 316 317 /* UPSTREAM */ 318 319 for ( int i = 0; i<networkSockets.size(); i++) 320 { 321 if ( networkSockets[i] ) 322 { 323 do { 324 dataLength = networkSockets[i]->readPacket(upBuffer, DATA_STREAM_BUFFER_SIZE); 325 326 if ( dataLength<=0 ) 327 continue; 328 329 header = networkProtocol->extractHeader(upBuffer, dataLength); 330 dataLength -= sizeof(header); 331 332 PRINTF(5)("read %d bytes from socket uniqueID = %d\n", dataLength, header.synchronizeableID); 333 334 if ( dataLength != header.length ) 335 { 336 PRINTF(1)("packetsize in header and real packetsize do not match! %d:%d\n", dataLength, header.length); 337 continue; 338 } 339 340 if ( header.synchronizeableID == 0 ) 341 { 342 header.synchronizeableID = i; 343 } 344 345 for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++) 346 { 347 if ( *it && (*it)->getUniqueID()==header.synchronizeableID ) 348 { 349 if ( (*it)->writeBytes(upBuffer+sizeof(header), dataLength, i) != header.length ) 350 { 351 PRINTF(1)("%s did not read all the data id = %d!\n", (*it)->getClassName(), (*it)->getUniqueID()); 352 break; 353 } 354 continue; 355 } 356 } 357 358 } while ( dataLength>0 ); 359 } 360 } 361 } 362 363 void NetworkStream::updateConnectionList( ) 364 { 365 //check for new connections 366 367 NetworkSocket* tempNetworkSocket = serverSocket->getNewSocket(); 368 369 if ( tempNetworkSocket ) 370 { 371 int clientId; 372 if ( freeSocketSlots.size() >0 ) 373 { 374 clientId = freeSocketSlots.back(); 375 freeSocketSlots.pop_back(); 376 networkSockets[clientId] = tempNetworkSocket; 377 handshakes[clientId] = new Handshake(true, clientId, this->networkGameManager->getUniqueID()); 378 handshakes[clientId]->setUniqueID(clientId); 379 } else 380 { 381 clientId = networkSockets.size(); 382 networkSockets.push_back(tempNetworkSocket); 383 Handshake* tHs = new Handshake(true, clientId, this->networkGameManager->getUniqueID()); 384 tHs->setUniqueID(clientId); 385 handshakes.push_back(tHs); 386 } 387 388 if ( clientId > this->maxConnections ) 389 { 390 handshakes[clientId]->doReject(); 391 PRINTF(0)("Will reject client %d because there are to many connections!\n", clientId); 392 } 393 else 394 395 PRINTF(0)("New Client: %d\n", clientId); 396 397 //this->connectSynchronizeable(*handshakes[clientId]); 398 } 399 400 401 //check if connections are ok else remove them 402 for ( int i = 1; i<networkSockets.size(); i++) 403 { 404 if ( networkSockets[i] && !networkSockets[i]->isOk() ) 405 { 406 //TODO: tell EntityManager that this player left the game 407 PRINTF(0)("Client is gone: %d\n", i); 408 409 //delete networkSockets[i]; 410 networkSockets[i]->disconnectServer(); 411 networkSockets[i]->destroy(); 412 networkSockets[i] = NULL; 413 414 if ( handshakes[i] ) 415 delete handshakes[i]; 416 handshakes[i] = NULL; 417 418 419 NetworkGameManager::getInstance()->signalLeftPlayer(i); 420 421 if ( i == networkSockets.size()-1 ) 422 { 423 networkSockets.pop_back(); 424 handshakes.pop_back(); 670 671 sync.handleRecvState( peer->second.userId, state, fromState ); 425 672 } 426 else 427 { 428 freeSocketSlots.push_back(i); 429 } 430 } 431 } 432 433 434 } 435 436 void NetworkStream::setMaxConnections( int n ) 437 { 438 if ( !this->isServer() ) 439 { 440 PRINTF(1)("Cannot set maxConnections because I am no server.\n"); 441 } 442 if ( this->networkSockets.size() > 1 ) 443 { 444 PRINTF(1)("Cannot set maxConnections because there are already %d connections.\n", this->networkSockets.size()); 673 674 peer->second.connectionMonitor->processZippedIncomingPacket( compBuf, compLength, state, ackedState ); 675 peer->second.connectionMonitor->processUnzippedIncomingPacket( buf, offset, state, ackedState ); 676 677 assert( peer->second.lastAckedState <= ackedState ); 678 peer->second.lastAckedState = ackedState; 679 680 assert( peer->second.lastRecvedState < state ); 681 peer->second.lastRecvedState = state; 682 683 assert( saveOffset == offset ); 684 685 } 686 687 } 688 689 } 690 691 /** 692 * is executed when a handshake has finished 693 * @todo create playable for new user 694 */ 695 void NetworkStream::handleNewClient( int userId ) 696 { 697 MessageManager::getInstance()->initUser( userId ); 698 699 networkGameManager->signalNewPlayer( userId ); 700 } 701 702 /** 703 * removes old items from oldSynchronizeables 704 */ 705 void NetworkStream::cleanUpOldSyncList( ) 706 { 707 int now = SDL_GetTicks(); 708 709 for ( std::map<int,int>::iterator it = oldSynchronizeables.begin(); it != oldSynchronizeables.end(); ) 710 { 711 if ( it->second < now - 10*1000 ) 712 { 713 std::map<int,int>::iterator delIt = it; 714 it++; 715 oldSynchronizeables.erase( delIt ); 716 continue; 717 } 718 it++; 719 } 720 } 721 722 /** 723 * writes data to DATA/dicts/newdict 724 * @param data pointer to data 725 * @param length length 726 */ 727 void NetworkStream::writeToNewDict( byte * data, int length ) 728 { 729 if ( remainingBytesToWriteToDict <= 0 ) 445 730 return; 446 } 447 448 if ( n > MAX_CONNECTIONS ) 449 { 450 PRINTF(1)("Cannot set maxConnectiosn to %d because of hardcoded limit %d\n", n, MAX_CONNECTIONS); 731 732 if ( length > remainingBytesToWriteToDict ) 733 length = remainingBytesToWriteToDict; 734 735 std::string fileName = ResourceManager::getInstance()->getDataDir(); 736 fileName += "/dicts/newdict"; 737 738 FILE * f = fopen( fileName.c_str(), "a" ); 739 740 if ( !f ) 741 { 742 PRINTF(2)("could not open %s\n", fileName.c_str()); 743 remainingBytesToWriteToDict = 0; 451 744 return; 452 745 } 453 454 this->maxConnections = n; 455 this->networkGameManager->setUniqueID( n+2 ); 456 } 457 458 459 460 void NetworkStream::debug() 461 { 462 if( this->isServer()) 463 PRINT(0)(" Host ist Server with ID: %i\n", this->myHostId); 464 else 465 PRINT(0)(" Host ist Client with ID: %i\n", this->myHostId); 466 467 PRINT(0)(" Got %i connected Synchronizeables, showing active Syncs:\n", this->synchronizeables.size()); 468 for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++) 469 { 470 if( (*it)->beSynchronized() == true) 471 PRINT(0)(" Synchronizeable of class: %s::%s, with unique ID: %i, Synchronize: %i\n", (*it)->getClassName(), (*it)->getName(), 472 (*it)->getUniqueID(), (*it)->beSynchronized()); 473 } 474 PRINT(0)(" Maximal Connections: %i\n", this->maxConnections); 475 476 } 477 478 479 int NetworkStream::getSyncCount() 480 { 481 int n = 0; 482 for (SynchronizeableList::iterator it = synchronizeables.begin(); it!=synchronizeables.end(); it++) 483 if( (*it)->beSynchronized() == true) 484 ++n; 485 486 //return synchronizeables.size(); 487 return n; 488 } 489 490 491 492 493 494 746 747 if ( fwrite( data, 1, length, f ) != length ) 748 { 749 PRINTF(2)("could not write to file\n"); 750 fclose( f ); 751 return; 752 } 753 754 fclose( f ); 755 756 remainingBytesToWriteToDict -= length; 757 } 758 759 760 761 762 763
Note: See TracChangeset
for help on using the changeset viewer.