Changeset 9656 in orxonox.OLD for trunk/src/lib/network/message_manager.cc
- Timestamp:
- Aug 4, 2006, 11:01:28 PM (18 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/src/lib/network/message_manager.cc
r9494 r9656 11 11 ### File Specific: 12 12 main-programmer: Christoph Renner 13 co-programmer: ... 13 co-programmer: Patrick Boenzli (patrick@orxonox.ethz.ch) 14 15 June 2006: finishing work on the network stream for pps presentation (rennerc@ee.ethz.ch) 16 July 2006: some code rearangement and integration of the proxy server mechanism (boenzlip@ee.ethz.ch) 17 July 2006: message forwarding algorithms 14 18 */ 15 19 … … 44 48 MessageManager::~MessageManager () 45 49 { 46 for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ )50 for ( MessageQueue::iterator it = outgoingMessageQueue.begin(); it != outgoingMessageQueue.end(); it++ ) 47 51 { 48 52 for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ ) … … 59 63 } 60 64 61 messageQueue.clear();65 outgoingMessageQueue.clear(); 62 66 63 67 this->messageHandlerMap.clear(); … … 85 89 int n; 86 90 87 n = Converter::intToByteArray( messageQueue[userId].toAck.size(), data + i, maxLength );91 n = Converter::intToByteArray( outgoingMessageQueue[userId].toAck.size(), data + i, maxLength ); 88 92 i += n; 89 93 assert( n == INTSIZE ); 90 94 91 for ( std::list<int>::iterator it = messageQueue[userId].toAck.begin(); it != messageQueue[userId].toAck.end(); it++)95 for ( std::list<int>::iterator it = outgoingMessageQueue[userId].toAck.begin(); it != outgoingMessageQueue[userId].toAck.end(); it++) 92 96 { 93 97 n = Converter::intToByteArray( *it, data + i, maxLength ); … … 96 100 } 97 101 98 messageQueue[userId].toAck.clear();99 100 n = Converter::intToByteArray( messageQueue[userId].messages.size(), data + i, maxLength );102 outgoingMessageQueue[userId].toAck.clear(); 103 104 n = Converter::intToByteArray( outgoingMessageQueue[userId].messages.size(), data + i, maxLength ); 101 105 i += n; 102 106 assert( n == INTSIZE ); 103 107 104 for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ ) 105 { 108 // write the message down, a message has this structure: 109 // | data_length | serial_number | message_type | source_id | dest_id | ...data... | 110 // 4byte 4byte 4byte 4byte 4byte data_length 111 for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ ) 112 { 113 // send data length 106 114 n = Converter::intToByteArray( it->length, data + i, maxLength ); 107 115 i += n; 108 116 assert( n == INTSIZE ); 109 117 118 // send serial number 110 119 n = Converter::intToByteArray( it->number, data + i, maxLength ); 111 120 i += n; 112 121 assert( n == INTSIZE ); 113 122 114 n = Converter::intToByteArray( it->messageId, data + i, maxLength ); 115 i += n; 116 assert( n == INTSIZE ); 117 123 // send message type 124 n = Converter::intToByteArray( it->messageType, data + i, maxLength ); 125 i += n; 126 assert( n == INTSIZE ); 127 128 // send sender id 129 n = Converter::intToByteArray( it->senderId, data + i, maxLength ); 130 i += n; 131 assert( n == INTSIZE ); 132 133 // send destination id 134 n = Converter::intToByteArray( it->destinationId, data + i, maxLength ); 135 i += n; 136 assert( n == INTSIZE ); 137 138 // send receiver type 139 n = Converter::intToByteArray( it->recieverType, data + i, maxLength ); 140 i += n; 141 assert( n == INTSIZE ); 142 143 // and copy the data 118 144 assert( i + it->length <= maxLength ); 119 145 memcpy( data + i, it->data, it->length ); … … 141 167 int nAcks; 142 168 169 143 170 assert( i + INTSIZE <= length ); 144 171 n = Converter::byteArrayToInt( data + i, &nAcks ); … … 167 194 i += n; 168 195 169 int messageLength, messageId; 170 196 int messageLength, messageType; 197 int senderId, destinationId, recieverType; 198 199 // now go through all newly received messages and assemble them 171 200 for ( int j = 0; j < nMessages; j++ ) 172 201 { 202 // read the length 173 203 assert( i + INTSIZE <= length ); 174 204 n = Converter::byteArrayToInt( data + i, &messageLength ); … … 176 206 i += n; 177 207 208 // read the serial number 178 209 assert( i + INTSIZE <= length ); 179 210 n = Converter::byteArrayToInt( data + i, &number ); … … 181 212 i += n; 182 213 183 assert( i + INTSIZE <= length ); 184 n = Converter::byteArrayToInt( data + i, &messageId ); 214 // read the message type 215 assert( i + INTSIZE <= length ); 216 n = Converter::byteArrayToInt( data + i, &messageType ); 217 assert( n == INTSIZE ); 218 i += n; 219 220 // read the sender id 221 assert( i + INTSIZE <= length ); 222 n = Converter::byteArrayToInt( data + i, &senderId ); 223 assert( n == INTSIZE ); 224 i += n; 225 226 //read the destination id 227 assert( i + INTSIZE <= length ); 228 n = Converter::byteArrayToInt( data + i, &destinationId); 229 assert( n == INTSIZE ); 230 i += n; 231 232 // read the receiver type 233 assert( i + INTSIZE <= length ); 234 n = Converter::byteArrayToInt( data + i, &recieverType); 185 235 assert( n == INTSIZE ); 186 236 i += n; 187 237 188 238 if ( number > 0 ) 189 messageQueue[userId].toAck.push_back( number ); 190 239 outgoingMessageQueue[userId].toAck.push_back( number ); 240 241 // PRINTF(0)("got message with type: %i\n", messageType); 191 242 assert( i + messageLength <= length ); 192 assert( messageHandlerMap.find( (MessageId)messageId ) != messageHandlerMap.end() ); 193 if ( std::find( messageQueue[userId].recievedMessages.begin(), messageQueue[userId].recievedMessages.end(), number )== messageQueue[userId].recievedMessages.end() ) 243 // make sure there is a message handler for this message type 244 assert( messageHandlerMap.find( (MessageType)messageType ) != messageHandlerMap.end()); 245 246 247 if ( std::find( outgoingMessageQueue[userId].recievedMessages.begin(), outgoingMessageQueue[userId].recievedMessages.end(), number ) == 248 outgoingMessageQueue[userId].recievedMessages.end() ) 194 249 { 195 if ( !(*(messageHandlerMap[(MessageId)messageId].cb))( (MessageId)messageId, data + i, messageLength, messageHandlerMap[(MessageId)messageId].someData, userId ) ) 250 251 // find out if this message is addressed for this client too 252 if( recieverType == RT_ALL_BUT_ME && SharedNetworkData::getInstance()->getHostID() != senderId || 253 recieverType == RT_ALL_ME || 254 recieverType == RT_NOT_USER && SharedNetworkData::getInstance()->getHostID() != destinationId || 255 recieverType == RT_USER && SharedNetworkData::getInstance()->getHostID() == destinationId || 256 recieverType == RT_SERVER && SharedNetworkData::getInstance()->isMasterServer() || 257 recieverType == RT_SERVER && SharedNetworkData::getInstance()->isProxyServerActive()) 196 258 { 197 NetworkMessage msg; 198 199 msg.data = new byte[messageLength]; 200 memcpy( msg.data, data + i, messageLength ); 201 msg.length = messageLength; 202 msg.messageId = (MessageId)messageId; 203 msg.number = userId; 204 205 incomingMessageBuffer.push_back( msg ); 259 260 PRINTF(0)("<<< MessageManager: got msg with type: %i, from sender %i, to rec: %i\n", messageType, senderId, destinationId); 261 // call the handler function and handle errors 262 if ( !(*(messageHandlerMap[(MessageType)messageType].cb))( (MessageType)messageType, data + i, messageLength, 263 messageHandlerMap[(MessageType)messageType].someData, senderId, destinationId ) ) 264 { 265 // if the message is not handled correctly, bush it back to the incoming packets therefore trying it later 266 NetworkMessage msg; 267 268 msg.data = new byte[messageLength]; 269 memcpy( msg.data, data + i, messageLength ); 270 msg.length = messageLength; 271 msg.messageType = (MessageType)messageType; 272 msg.number = userId; 273 msg.senderId = senderId; 274 msg.recieverType = (RecieverType)recieverType; 275 msg.destinationId = destinationId; 276 277 incomingMessageQueue.push_back( msg ); 278 } 206 279 } 207 messageQueue[userId].recievedMessages.push_back( number ); 280 281 282 // check if the message needs to be forwarded 283 if( recieverType == RT_ALL_BUT_ME || 284 recieverType == RT_ALL_ME || 285 recieverType == RT_NOT_USER || 286 recieverType == RT_USER && SharedNetworkData::getInstance()->getHostID() != destinationId || 287 recieverType == RT_SERVER && SharedNetworkData::getInstance()->isProxyServerActive() ) 288 { 289 // forwarding the messages but only if its a proxy 290 if( SharedNetworkData::getInstance()->isProxyServerActive()) 291 { 292 PRINTF(0)("===========>> Forwarding Message msg with type: %i, from sender %i, to rec: %i\n", messageType, senderId, destinationId); 293 NetworkMessage msg; 294 295 msg.data = new byte[messageLength]; 296 memcpy( msg.data, data + i, messageLength ); 297 msg.length = messageLength; 298 msg.messageType = (MessageType)messageType; 299 msg.number = userId; 300 msg.senderId = senderId; 301 msg.destinationId = destinationId; 302 msg.recieverType = (RecieverType)recieverType; 303 304 this->sendMessage(msg.messageType, msg.data, msg.length, msg.recieverType, msg.senderId = senderId, msg.destinationId, MP_HIGHBANDWIDTH); 305 } 306 } 307 308 // save the serial number for ack signaling 309 outgoingMessageQueue[userId].recievedMessages.push_back( number ); 208 310 } 311 209 312 i += messageLength; 210 313 } 211 314 212 315 213 //TODO maybe handle incomingMessage in tick function. else local messages will not be handled if no clients are connected 214 for ( std::list<NetworkMessage>::iterator it = incomingMessageBuffer.begin(); it != incomingMessageBuffer.end(); ) 215 { 216 if ( (*(messageHandlerMap[it->messageId].cb))( it->messageId, it->data, it->length, messageHandlerMap[it->messageId].someData, it->number ) ) 316 //walk throu message queue and remove acked messages 317 for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); ) 318 { 319 if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() ) 320 { 321 std::list<NetworkMessage>::iterator delIt = it; 322 it++; 323 outgoingMessageQueue[userId].messages.erase( delIt ); 324 continue; 325 } 326 it++; 327 } 328 329 //TODO find bether way. maybe with timestamp 330 if ( outgoingMessageQueue[userId].recievedMessages.size() > 1000 ) 331 { 332 for ( int j = 0; j < (int)outgoingMessageQueue[userId].recievedMessages.size() - 1000; j++ ) 333 outgoingMessageQueue[userId].recievedMessages.erase( outgoingMessageQueue[userId].recievedMessages.begin() ); 334 } 335 336 return i; 337 } 338 339 340 341 342 /** 343 * processes the message manager data, specialy check for localy generated messages 344 */ 345 void MessageManager::processData() 346 { 347 // now call the message handlers with the new message 348 for ( std::list<NetworkMessage>::iterator it = incomingMessageQueue.begin(); it != incomingMessageQueue.end(); ) 349 { 350 PRINTF(0)("<<< MessageManager: got local msg with type: %i, from sender %i, to rec: %i\n", (*it).messageType, (*it).senderId, (*it).destinationId); 351 352 if ( (*(messageHandlerMap[it->messageType].cb))( it->messageType, it->data, it->length, messageHandlerMap[it->messageType].someData, 353 /*it->number, */it->senderId, it->destinationId ) ) 217 354 { 218 355 std::list<NetworkMessage>::iterator delIt = it; … … 220 357 delete it->data; 221 358 it++; 222 incomingMessage Buffer.erase( delIt );359 incomingMessageQueue.erase( delIt ); 223 360 continue; 224 361 } … … 226 363 } 227 364 228 //walk throu message queue and remove acked messages 229 for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); ) 230 { 231 if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() ) 232 { 233 std::list<NetworkMessage>::iterator delIt = it; 234 it++; 235 messageQueue[userId].messages.erase( delIt ); 236 continue; 237 } 238 it++; 239 } 240 241 //TODO find bether way. maybe with timestamp 242 if ( messageQueue[userId].recievedMessages.size() > 1000 ) 243 { 244 for ( int j = 0; j < messageQueue[userId].recievedMessages.size() - 1000; j++ ) 245 messageQueue[userId].recievedMessages.erase( messageQueue[userId].recievedMessages.begin() ); 246 } 247 248 return i; 249 } 365 } 366 367 368 250 369 251 370 /** … … 255 374 void MessageManager::cleanUpUser( int userId ) 256 375 { 257 if ( messageQueue.find( userId ) == messageQueue.end() )376 if ( outgoingMessageQueue.find( userId ) == outgoingMessageQueue.end() ) 258 377 return; 259 378 260 for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ )379 for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ ) 261 380 { 262 381 if ( it->data ) … … 265 384 } 266 385 267 messageQueue[userId].toAck.clear();268 269 messageQueue.erase( userId );270 } 271 272 /** 273 * registers function to handle messages with id message Id. someData is passed to callbackfuntion274 * @param message Idmessage id to handle386 outgoingMessageQueue[userId].toAck.clear(); 387 388 outgoingMessageQueue.erase( userId ); 389 } 390 391 /** 392 * registers function to handle messages with id messageType. someData is passed to callbackfuntion 393 * @param messageType message id to handle 275 394 * @param cb function pointer to callback function 276 395 * @param someData this pointer is passed to callback function without modification 277 396 * @return true on success 278 397 */ 279 bool MessageManager::registerMessageHandler( Message Id messageId, MessageCallback cb, void * someData )398 bool MessageManager::registerMessageHandler( MessageType messageType, MessageCallback cb, void * someData ) 280 399 { 281 400 MessageHandler messageHandler; 282 401 283 402 messageHandler.cb = cb; 284 messageHandler.message Id = messageId;403 messageHandler.messageType = messageType; 285 404 messageHandler.someData = someData; 286 405 287 messageHandlerMap[message Id] = messageHandler;406 messageHandlerMap[messageType] = messageHandler; 288 407 289 408 return true; … … 297 416 { 298 417 // just do something so map creates a new entry 299 messageQueue[userId].toAck.clear(); 300 //assert( messageQueue[userId].messages.size() == 0 ); 301 } 418 outgoingMessageQueue[userId].toAck.clear(); 419 //assert( outgoingMessageQueue[userId].messages.size() == 0 ); 420 } 421 422 302 423 303 424 /** … … 308 429 * RT_NOT_USER send to all but reciever 309 430 * 310 * @param message Idmessage id431 * @param messageType message id 311 432 * @param data pointer to data 312 433 * @param dataLength length of data 313 * @param recieverType 314 * @param reciever 315 */ 316 void MessageManager::sendMessage( MessageId messageId, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority ) 317 { 318 for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ ) 319 { 434 * @param recieverType type of the receiver 435 * @param reciever the userId of the receiver if needed (depends on the ReceiverType) 436 */ 437 void MessageManager::sendMessage( MessageType messageType, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority ) 438 { 439 this->sendMessage(messageType, data, dataLength, recieverType, SharedNetworkData::getInstance()->getHostID(), reciever, messagePriority); 440 } 441 442 443 /** 444 * send a message to one or more clients as a special client 445 * recieverType: 446 * RT_ALL send to all users. reciever is ignored 447 * RT_USER send only to reciever 448 * RT_NOT_USER send to all but reciever 449 * 450 * @param messageType message id 451 * @param data pointer to data 452 * @param dataLength length of data 453 * @param recieverType type of the receiver 454 * @param sender the userId of the sender if there is need for shadowing it (eg. for msg forwarding) 455 * @param reciever the userId of the receiver if needed (depends on the ReceiverType) 456 */ 457 void MessageManager::sendMessage( MessageType messageType, byte * data, int dataLength, RecieverType recieverType, int sender, int reciever, MessagePriority messagePriority ) 458 { 459 PRINTF(0)(" >>> MessageManager: sending msg with type: %i, recieverType: %i, reciever %i\n", messageType, recieverType, reciever); 460 461 // go through all outgoing message queues and add the message if its appropriate 462 for ( MessageQueue::iterator it = this->outgoingMessageQueue.begin(); it != this->outgoingMessageQueue.end(); it++ ) 463 { 464 320 465 if ( 321 recieverType == RT_ALL_ME || 322 recieverType == RT_ALL_BUT_ME || 323 recieverType == RT_USER && it->first == reciever || 324 recieverType == RT_NOT_USER && it->first != reciever || 325 recieverType == RT_SERVER && getNetworkStream()->isUserMasterServer( it->first ) || 326 recieverType == RT_SERVER && getNetworkStream()->isUserProxyServerActive( it->first ) 327 ) 466 recieverType == RT_ALL_ME || 467 recieverType == RT_ALL_BUT_ME || 468 recieverType == RT_USER && it->first == reciever || 469 recieverType == RT_USER && reciever == NET_ID_MASTER_SERVER && !getNetworkStream()->isUserMasterServer( it->first ) || //(*) 470 recieverType == RT_NOT_USER && it->first != reciever || 471 recieverType == RT_SERVER && getNetworkStream()->isUserMasterServer( it->first ) || 472 recieverType == RT_SERVER && getNetworkStream()->isUserProxyServerActive( it->first ) 473 )// (*) special case: forward 328 474 { 329 475 NetworkMessage msg; … … 332 478 memcpy( msg.data, data, dataLength ); 333 479 msg.length = dataLength; 334 msg.messageId = messageId; 335 msg.number = newNumber++; 480 msg.messageType = messageType; 481 msg.number = this->newNumber++; 482 msg.senderId = sender; 483 msg.destinationId = reciever; 484 msg.recieverType = recieverType; 336 485 msg.priority = messagePriority; 337 486 338 487 it->second.messages.push_back( msg ); 339 488 } 340 } 341 342 if ( recieverType == RT_ALL_ME ) 489 490 491 } 492 493 494 // if the message is also for myself, handle it here 495 if ( recieverType == RT_ALL_ME || 496 recieverType == RT_USER && reciever == SharedNetworkData::getInstance()->getHostID() 497 ) 343 498 { 344 499 NetworkMessage msg; … … 347 502 memcpy( msg.data, data, dataLength ); 348 503 msg.length = dataLength; 349 msg.message Id = messageId;504 msg.messageType = messageType; 350 505 msg.number = SharedNetworkData::getInstance()->getHostID(); 506 msg.senderId = sender; 507 msg.destinationId = reciever; 508 msg.recieverType = recieverType; 351 509 msg.priority = messagePriority; 352 510 353 incomingMessageBuffer.push_back( msg );354 } 355 } 356 357 511 this->incomingMessageQueue.push_back( msg ); 512 } 513 } 514 515
Note: See TracChangeset
for help on using the changeset viewer.