Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

Changeset 9656 in orxonox.OLD for trunk/src/lib/network/message_manager.cc


Ignore:
Timestamp:
Aug 4, 2006, 11:01:28 PM (18 years ago)
Author:
bensch
Message:

orxonox/trunk: merged the proxy bache back with no conflicts

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/src/lib/network/message_manager.cc

    r9494 r9656  
    1111   ### File Specific:
    1212   main-programmer: Christoph Renner
    13    co-programmer: ...
     13   co-programmer: Patrick Boenzli (patrick@orxonox.ethz.ch)
     14
     15     June 2006: finishing work on the network stream for pps presentation (rennerc@ee.ethz.ch)
     16     July 2006: some code rearangement and integration of the proxy server mechanism (boenzlip@ee.ethz.ch)
     17     July 2006: message forwarding algorithms
    1418*/
    1519
     
    4448MessageManager::~MessageManager ()
    4549{
    46   for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ )
     50  for ( MessageQueue::iterator it = outgoingMessageQueue.begin(); it != outgoingMessageQueue.end(); it++ )
    4751  {
    4852    for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ )
     
    5963  }
    6064
    61   messageQueue.clear();
     65  outgoingMessageQueue.clear();
    6266
    6367  this->messageHandlerMap.clear();
     
    8589  int n;
    8690
    87   n = Converter::intToByteArray( messageQueue[userId].toAck.size(), data + i, maxLength );
     91  n = Converter::intToByteArray( outgoingMessageQueue[userId].toAck.size(), data + i, maxLength );
    8892  i += n;
    8993  assert( n == INTSIZE );
    9094
    91   for ( std::list<int>::iterator it = messageQueue[userId].toAck.begin(); it != messageQueue[userId].toAck.end(); it++)
     95  for ( std::list<int>::iterator it = outgoingMessageQueue[userId].toAck.begin(); it != outgoingMessageQueue[userId].toAck.end(); it++)
    9296  {
    9397    n = Converter::intToByteArray( *it, data + i, maxLength );
     
    96100  }
    97101
    98   messageQueue[userId].toAck.clear();
    99 
    100   n = Converter::intToByteArray( messageQueue[userId].messages.size(), data + i, maxLength );
     102  outgoingMessageQueue[userId].toAck.clear();
     103
     104  n = Converter::intToByteArray( outgoingMessageQueue[userId].messages.size(), data + i, maxLength );
    101105  i += n;
    102106  assert( n == INTSIZE );
    103107
    104   for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ )
    105   {
     108  // write the message down, a message has this structure:
     109  //   | data_length | serial_number | message_type | source_id | dest_id | ...data... |
     110  //      4byte        4byte            4byte         4byte      4byte     data_length
     111  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
     112  {
     113    // send data length
    106114    n = Converter::intToByteArray( it->length, data + i, maxLength );
    107115    i += n;
    108116    assert( n == INTSIZE );
    109117
     118    // send serial number
    110119    n = Converter::intToByteArray( it->number, data + i, maxLength );
    111120    i += n;
    112121    assert( n == INTSIZE );
    113122
    114     n = Converter::intToByteArray( it->messageId, data + i, maxLength );
    115     i += n;
    116     assert( n == INTSIZE );
    117 
     123    // send message type
     124    n = Converter::intToByteArray( it->messageType, data + i, maxLength );
     125    i += n;
     126    assert( n == INTSIZE );
     127
     128    // send sender id
     129    n = Converter::intToByteArray( it->senderId, data + i, maxLength );
     130    i += n;
     131    assert( n == INTSIZE );
     132
     133    // send destination id
     134    n = Converter::intToByteArray( it->destinationId, data + i, maxLength );
     135    i += n;
     136    assert( n == INTSIZE );
     137
     138    // send receiver type
     139    n = Converter::intToByteArray( it->recieverType, data + i, maxLength );
     140    i += n;
     141    assert( n == INTSIZE );
     142
     143    // and copy the data
    118144    assert( i + it->length <= maxLength );
    119145    memcpy( data + i, it->data, it->length );
     
    141167  int nAcks;
    142168
     169
    143170  assert( i + INTSIZE <= length );
    144171  n = Converter::byteArrayToInt( data + i, &nAcks );
     
    167194  i += n;
    168195
    169   int messageLength, messageId;
    170 
     196  int messageLength, messageType;
     197  int senderId, destinationId, recieverType;
     198
     199  // now go through all newly received messages and assemble them
    171200  for ( int j = 0; j < nMessages; j++ )
    172201  {
     202    // read the length
    173203    assert( i + INTSIZE <= length );
    174204    n = Converter::byteArrayToInt( data + i, &messageLength );
     
    176206    i += n;
    177207
     208    // read the serial number
    178209    assert( i + INTSIZE <= length );
    179210    n = Converter::byteArrayToInt( data + i, &number );
     
    181212    i += n;
    182213
    183     assert( i + INTSIZE <= length );
    184     n = Converter::byteArrayToInt( data + i, &messageId );
     214    // read the message type
     215    assert( i + INTSIZE <= length );
     216    n = Converter::byteArrayToInt( data + i, &messageType );
     217    assert( n == INTSIZE );
     218    i += n;
     219
     220    // read the sender id
     221    assert( i + INTSIZE <= length );
     222    n = Converter::byteArrayToInt( data + i, &senderId );
     223    assert( n == INTSIZE );
     224    i += n;
     225
     226    //read the destination id
     227    assert( i + INTSIZE <= length );
     228    n = Converter::byteArrayToInt( data + i, &destinationId);
     229    assert( n == INTSIZE );
     230    i += n;
     231
     232    // read the receiver type
     233    assert( i + INTSIZE <= length );
     234    n = Converter::byteArrayToInt( data + i, &recieverType);
    185235    assert( n == INTSIZE );
    186236    i += n;
    187237
    188238    if ( number > 0 )
    189       messageQueue[userId].toAck.push_back( number );
    190 
     239      outgoingMessageQueue[userId].toAck.push_back( number );
     240
     241//     PRINTF(0)("got message with type: %i\n", messageType);
    191242    assert( i + messageLength <= length );
    192     assert( messageHandlerMap.find( (MessageId)messageId ) != messageHandlerMap.end() );
    193     if ( std::find( messageQueue[userId].recievedMessages.begin(), messageQueue[userId].recievedMessages.end(), number )== messageQueue[userId].recievedMessages.end() )
     243    // make sure there is a message handler for this message type
     244    assert( messageHandlerMap.find( (MessageType)messageType ) != messageHandlerMap.end());
     245
     246
     247    if ( std::find( outgoingMessageQueue[userId].recievedMessages.begin(), outgoingMessageQueue[userId].recievedMessages.end(), number ) ==
     248         outgoingMessageQueue[userId].recievedMessages.end() )
    194249    {
    195       if ( !(*(messageHandlerMap[(MessageId)messageId].cb))( (MessageId)messageId, data + i, messageLength, messageHandlerMap[(MessageId)messageId].someData, userId ) )
     250
     251      // find out if this message is addressed for this client too
     252      if( recieverType == RT_ALL_BUT_ME  && SharedNetworkData::getInstance()->getHostID() != senderId ||
     253          recieverType == RT_ALL_ME ||
     254          recieverType == RT_NOT_USER && SharedNetworkData::getInstance()->getHostID() != destinationId ||
     255          recieverType == RT_USER  && SharedNetworkData::getInstance()->getHostID() == destinationId ||
     256          recieverType == RT_SERVER && SharedNetworkData::getInstance()->isMasterServer() ||
     257          recieverType == RT_SERVER && SharedNetworkData::getInstance()->isProxyServerActive())
    196258      {
    197         NetworkMessage msg;
    198 
    199         msg.data = new byte[messageLength];
    200         memcpy( msg.data, data + i, messageLength );
    201         msg.length = messageLength;
    202         msg.messageId = (MessageId)messageId;
    203         msg.number = userId;
    204 
    205         incomingMessageBuffer.push_back( msg );
     259
     260        PRINTF(0)("<<< MessageManager: got msg with type: %i, from sender %i, to rec: %i\n", messageType, senderId, destinationId);
     261      // call the handler function and handle errors
     262        if ( !(*(messageHandlerMap[(MessageType)messageType].cb))( (MessageType)messageType, data + i, messageLength,
     263                 messageHandlerMap[(MessageType)messageType].someData, senderId, destinationId ) )
     264        {
     265        // if the message is not handled correctly, bush it back to the incoming packets therefore trying it later
     266          NetworkMessage msg;
     267
     268          msg.data = new byte[messageLength];
     269          memcpy( msg.data, data + i, messageLength );
     270          msg.length = messageLength;
     271          msg.messageType = (MessageType)messageType;
     272          msg.number = userId;
     273          msg.senderId = senderId;
     274          msg.recieverType = (RecieverType)recieverType;
     275          msg.destinationId = destinationId;
     276
     277          incomingMessageQueue.push_back( msg );
     278        }
    206279      }
    207       messageQueue[userId].recievedMessages.push_back( number );
     280
     281
     282      // check if the message needs to be forwarded
     283      if( recieverType == RT_ALL_BUT_ME ||
     284          recieverType == RT_ALL_ME ||
     285          recieverType == RT_NOT_USER ||
     286          recieverType == RT_USER  && SharedNetworkData::getInstance()->getHostID() != destinationId ||
     287          recieverType == RT_SERVER && SharedNetworkData::getInstance()->isProxyServerActive() )
     288      {
     289        // forwarding the messages but only if its a proxy
     290        if( SharedNetworkData::getInstance()->isProxyServerActive())
     291        {
     292          PRINTF(0)("===========>> Forwarding Message msg with type: %i, from sender %i, to rec: %i\n", messageType, senderId, destinationId);
     293          NetworkMessage msg;
     294
     295          msg.data = new byte[messageLength];
     296          memcpy( msg.data, data + i, messageLength );
     297          msg.length = messageLength;
     298          msg.messageType = (MessageType)messageType;
     299          msg.number = userId;
     300          msg.senderId = senderId;
     301          msg.destinationId = destinationId;
     302          msg.recieverType = (RecieverType)recieverType;
     303
     304          this->sendMessage(msg.messageType, msg.data, msg.length, msg.recieverType, msg.senderId = senderId, msg.destinationId, MP_HIGHBANDWIDTH);
     305        }
     306      }
     307
     308      // save the serial number for ack signaling
     309      outgoingMessageQueue[userId].recievedMessages.push_back( number );
    208310    }
     311
    209312    i += messageLength;
    210313  }
    211314
    212315
    213   //TODO maybe handle incomingMessage in tick function. else local messages will not be handled if no clients are connected
    214   for ( std::list<NetworkMessage>::iterator it = incomingMessageBuffer.begin(); it != incomingMessageBuffer.end();  )
    215   {
    216     if ( (*(messageHandlerMap[it->messageId].cb))( it->messageId, it->data, it->length, messageHandlerMap[it->messageId].someData, it->number ) )
     316  //walk throu message queue and remove acked messages
     317  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end();  )
     318  {
     319    if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() )
     320    {
     321      std::list<NetworkMessage>::iterator delIt = it;
     322      it++;
     323      outgoingMessageQueue[userId].messages.erase( delIt );
     324      continue;
     325    }
     326    it++;
     327  }
     328
     329  //TODO find bether way. maybe with timestamp
     330  if ( outgoingMessageQueue[userId].recievedMessages.size() > 1000 )
     331  {
     332    for ( int j = 0; j < (int)outgoingMessageQueue[userId].recievedMessages.size() - 1000; j++ )
     333      outgoingMessageQueue[userId].recievedMessages.erase( outgoingMessageQueue[userId].recievedMessages.begin() );
     334  }
     335
     336  return i;
     337}
     338
     339
     340
     341
     342/**
     343 * processes the message manager data, specialy check for localy generated messages
     344 */
     345void MessageManager::processData()
     346{
     347  // now call the message handlers with the new message
     348  for ( std::list<NetworkMessage>::iterator it = incomingMessageQueue.begin(); it != incomingMessageQueue.end();  )
     349  {
     350    PRINTF(0)("<<< MessageManager: got local msg with type: %i, from sender %i, to rec: %i\n", (*it).messageType, (*it).senderId, (*it).destinationId);
     351
     352    if ( (*(messageHandlerMap[it->messageType].cb))( it->messageType, it->data, it->length, messageHandlerMap[it->messageType].someData,
     353                                                     /*it->number, */it->senderId, it->destinationId ) )
    217354    {
    218355      std::list<NetworkMessage>::iterator delIt = it;
     
    220357        delete it->data;
    221358      it++;
    222       incomingMessageBuffer.erase( delIt );
     359      incomingMessageQueue.erase( delIt );
    223360      continue;
    224361    }
     
    226363  }
    227364
    228   //walk throu message queue and remove acked messages
    229   for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end();  )
    230   {
    231     if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() )
    232     {
    233       std::list<NetworkMessage>::iterator delIt = it;
    234       it++;
    235       messageQueue[userId].messages.erase( delIt );
    236       continue;
    237     }
    238     it++;
    239   }
    240 
    241   //TODO find bether way. maybe with timestamp
    242   if ( messageQueue[userId].recievedMessages.size() > 1000 )
    243   {
    244     for ( int j = 0; j < messageQueue[userId].recievedMessages.size() - 1000; j++ )
    245       messageQueue[userId].recievedMessages.erase( messageQueue[userId].recievedMessages.begin() );
    246   }
    247 
    248   return i;
    249 }
     365}
     366
     367
     368
    250369
    251370/**
     
    255374void MessageManager::cleanUpUser( int userId )
    256375{
    257   if ( messageQueue.find( userId ) == messageQueue.end() )
     376  if ( outgoingMessageQueue.find( userId ) == outgoingMessageQueue.end() )
    258377    return;
    259378
    260   for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ )
     379  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
    261380  {
    262381    if ( it->data )
     
    265384  }
    266385
    267   messageQueue[userId].toAck.clear();
    268 
    269   messageQueue.erase( userId );
    270 }
    271 
    272 /**
    273  * registers function to handle messages with id messageId. someData is passed to callbackfuntion
    274  * @param messageId message id to handle
     386  outgoingMessageQueue[userId].toAck.clear();
     387
     388  outgoingMessageQueue.erase( userId );
     389}
     390
     391/**
     392 * registers function to handle messages with id messageType. someData is passed to callbackfuntion
     393 * @param messageType message id to handle
    275394 * @param cb function pointer to callback function
    276395 * @param someData this pointer is passed to callback function without modification
    277396 * @return true on success
    278397 */
    279 bool MessageManager::registerMessageHandler( MessageId messageId, MessageCallback cb, void * someData )
     398bool MessageManager::registerMessageHandler( MessageType messageType, MessageCallback cb, void * someData )
    280399{
    281400  MessageHandler messageHandler;
    282401
    283402  messageHandler.cb = cb;
    284   messageHandler.messageId = messageId;
     403  messageHandler.messageType = messageType;
    285404  messageHandler.someData = someData;
    286405
    287   messageHandlerMap[messageId] = messageHandler;
     406  messageHandlerMap[messageType] = messageHandler;
    288407
    289408  return true;
     
    297416{
    298417  // just do something so map creates a new entry
    299   messageQueue[userId].toAck.clear();
    300   //assert( messageQueue[userId].messages.size() == 0 );
    301 }
     418  outgoingMessageQueue[userId].toAck.clear();
     419  //assert( outgoingMessageQueue[userId].messages.size() == 0 );
     420}
     421
     422
    302423
    303424/**
     
    308429 *               RT_NOT_USER send to all but reciever
    309430 *
    310  * @param messageId message id
     431 * @param messageType message id
    311432 * @param data pointer to data
    312433 * @param dataLength length of data
    313  * @param recieverType
    314  * @param reciever
    315  */
    316 void MessageManager::sendMessage( MessageId messageId, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority )
    317 {
    318   for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ )
    319   {
     434 * @param recieverType type of the receiver
     435 * @param reciever the userId of the receiver if needed (depends on the ReceiverType)
     436 */
     437void MessageManager::sendMessage( MessageType messageType, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority )
     438{
     439  this->sendMessage(messageType, data, dataLength, recieverType, SharedNetworkData::getInstance()->getHostID(), reciever, messagePriority);
     440}
     441
     442
     443/**
     444 * send a message to one or more clients as a special client
     445 * recieverType:
     446 *               RT_ALL send to all users. reciever is ignored
     447 *               RT_USER send only to reciever
     448 *               RT_NOT_USER send to all but reciever
     449 *
     450 * @param messageType message id
     451 * @param data pointer to data
     452 * @param dataLength length of data
     453 * @param recieverType type of the receiver
     454 * @param sender the userId of the sender if there is need for shadowing it (eg. for msg forwarding)
     455 * @param reciever the userId of the receiver if needed (depends on the ReceiverType)
     456 */
     457    void MessageManager::sendMessage( MessageType messageType, byte * data, int dataLength, RecieverType recieverType, int sender, int reciever, MessagePriority messagePriority )
     458{
     459  PRINTF(0)(" >>> MessageManager: sending msg with type: %i, recieverType: %i, reciever %i\n", messageType, recieverType, reciever);
     460
     461  // go through all outgoing message queues and add the message if its appropriate
     462  for ( MessageQueue::iterator it = this->outgoingMessageQueue.begin(); it != this->outgoingMessageQueue.end(); it++ )
     463  {
     464
    320465    if (
    321          recieverType == RT_ALL_ME ||
    322          recieverType == RT_ALL_BUT_ME ||
    323          recieverType == RT_USER && it->first == reciever ||
    324          recieverType == RT_NOT_USER && it->first != reciever ||
    325          recieverType == RT_SERVER && getNetworkStream()->isUserMasterServer( it->first ) ||
    326          recieverType == RT_SERVER && getNetworkStream()->isUserProxyServerActive( it->first )
    327        )
     466         recieverType == RT_ALL_ME      ||
     467         recieverType == RT_ALL_BUT_ME  ||
     468         recieverType == RT_USER        && it->first == reciever ||
     469         recieverType == RT_USER        && reciever == NET_ID_MASTER_SERVER && !getNetworkStream()->isUserMasterServer( it->first ) ||  //(*)
     470         recieverType == RT_NOT_USER    && it->first != reciever ||
     471         recieverType == RT_SERVER      && getNetworkStream()->isUserMasterServer( it->first ) ||
     472         recieverType == RT_SERVER      && getNetworkStream()->isUserProxyServerActive( it->first )
     473       )// (*) special case: forward
    328474    {
    329475      NetworkMessage msg;
     
    332478      memcpy( msg.data, data, dataLength );
    333479      msg.length = dataLength;
    334       msg.messageId = messageId;
    335       msg.number = newNumber++;
     480      msg.messageType = messageType;
     481      msg.number = this->newNumber++;
     482      msg.senderId = sender;
     483      msg.destinationId = reciever;
     484      msg.recieverType = recieverType;
    336485      msg.priority = messagePriority;
    337486
    338487      it->second.messages.push_back( msg );
    339488    }
    340   }
    341 
    342   if ( recieverType == RT_ALL_ME )
     489
     490
     491  }
     492
     493
     494  // if the message is also for myself, handle it here
     495  if ( recieverType == RT_ALL_ME ||
     496       recieverType == RT_USER   && reciever == SharedNetworkData::getInstance()->getHostID()
     497     )
    343498  {
    344499    NetworkMessage msg;
     
    347502    memcpy( msg.data, data, dataLength );
    348503    msg.length = dataLength;
    349     msg.messageId = messageId;
     504    msg.messageType = messageType;
    350505    msg.number = SharedNetworkData::getInstance()->getHostID();
     506    msg.senderId = sender;
     507    msg.destinationId = reciever;
     508    msg.recieverType = recieverType;
    351509    msg.priority = messagePriority;
    352510
    353     incomingMessageBuffer.push_back( msg );
    354   }
    355 }
    356 
    357 
     511    this->incomingMessageQueue.push_back( msg );
     512  }
     513}
     514
     515
Note: See TracChangeset for help on using the changeset viewer.