/* orxonox - the future of 3D-vertical-scrollers Copyright (C) 2004 orx This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2, or (at your option) any later version. ### File Specific: main-programmer: Christoph Renner co-programmer: ... */ //#define DEBUG_SPECIAL_MODULE DEBUG_MODULE_ #include "message_manager.h" #include "network_stream.h" #include "shared_network_data.h" #include "converter.h" #include MessageManager* MessageManager::singletonRef = NULL; /** * standard constructor */ MessageManager::MessageManager () { this->setClassID( CL_MESSAGE_MANAGER, "MessageManager" ); newNumber = 1; setSynchronized( true ); } /** * standard deconstructor */ MessageManager::~MessageManager () { for ( MessageQueue::iterator it = outgoingMessageQueue.begin(); it != outgoingMessageQueue.end(); it++ ) { for ( std::list::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ ) { if ( it2->data ) { delete [] it2->data; it2->data = NULL; } } it->second.messages.clear(); it->second.toAck.clear(); } outgoingMessageQueue.clear(); this->messageHandlerMap.clear(); MessageManager::singletonRef = NULL; } /** * get the diff to last acked state of userId * * this class does not use the normal SynchronizeableVars for zynchronisation. instead * it defines its own protocol * * @param userId user to create diff for * @param data buffer to copy diff in * @param maxLength max bytes to copy into data * @param stateId id of current state * @param fromStateId the reference state for the delta state * @param priorityTH tells getStateDiff to not send element with priority \< priorityTH * @return n bytes copied into data */ int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH ) { int i = 0; int n; n = Converter::intToByteArray( outgoingMessageQueue[userId].toAck.size(), data + i, maxLength ); i += n; assert( n == INTSIZE ); for ( std::list::iterator it = outgoingMessageQueue[userId].toAck.begin(); it != outgoingMessageQueue[userId].toAck.end(); it++) { n = Converter::intToByteArray( *it, data + i, maxLength ); i += n; assert( n == INTSIZE ); } outgoingMessageQueue[userId].toAck.clear(); n = Converter::intToByteArray( outgoingMessageQueue[userId].messages.size(), data + i, maxLength ); i += n; assert( n == INTSIZE ); // write the message down, a message has this structure: // | data_length | serial_number | message_type | source_id | dest_id | ...data... | // 4byte 4byte 4byte 4byte 4byte data_length for ( std::list::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ ) { // send data length n = Converter::intToByteArray( it->length, data + i, maxLength ); i += n; assert( n == INTSIZE ); // send serial number n = Converter::intToByteArray( it->number, data + i, maxLength ); i += n; assert( n == INTSIZE ); // send message type n = Converter::intToByteArray( it->messageType, data + i, maxLength ); i += n; assert( n == INTSIZE ); // send sender id n = Converter::intToByteArray( it->senderId, data + i, maxLength ); i += n; assert( n == INTSIZE ); // send destination id n = Converter::intToByteArray( it->destinationId, data + i, maxLength ); i += n; assert( n == INTSIZE ); // send receiver type n = Converter::intToByteArray( it->recieverType, data + i, maxLength ); i += n; assert( n == INTSIZE ); // and copy the data assert( i + it->length <= maxLength ); memcpy( data + i, it->data, it->length ); i += it->length; } return i; } /** * sets a new state out of a diff created on another host * @param userId hostId of user who send me that diff * @param data pointer to diff * @param length length of diff * @param stateId id of current state * @param fromStateId id of the base state id * @return number bytes read * @todo check for permissions */ int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId ) { int i = 0; int n; int nAcks; assert( i + INTSIZE <= length ); n = Converter::byteArrayToInt( data + i, &nAcks ); assert( n == INTSIZE ); i += n; std::list acks; int number; for ( int j = 0; j < nAcks; j++ ) { assert( i + INTSIZE <= length ); n = Converter::byteArrayToInt( data + i, &number ); assert( n == INTSIZE ); i += n; acks.push_back( number ); } int nMessages; assert( i + INTSIZE <= length ); n = Converter::byteArrayToInt( data + i, &nMessages ); assert( n == INTSIZE ); i += n; int messageLength, messageType; int senderId, destinationId, recieverType; // now go through all newly received messages and assemble them for ( int j = 0; j < nMessages; j++ ) { // read the length assert( i + INTSIZE <= length ); n = Converter::byteArrayToInt( data + i, &messageLength ); assert( n == INTSIZE ); i += n; // read the serial number assert( i + INTSIZE <= length ); n = Converter::byteArrayToInt( data + i, &number ); assert( n == INTSIZE ); i += n; // read the message type assert( i + INTSIZE <= length ); n = Converter::byteArrayToInt( data + i, &messageType ); assert( n == INTSIZE ); i += n; // read the sender id assert( i + INTSIZE <= length ); n = Converter::byteArrayToInt( data + i, &senderId ); assert( n == INTSIZE ); i += n; //read the destination id assert( i + INTSIZE <= length ); n = Converter::byteArrayToInt( data + i, &destinationId); assert( n == INTSIZE ); i += n; // read the receiver type assert( i + INTSIZE <= length ); n = Converter::byteArrayToInt( data + i, &recieverType); assert( n == INTSIZE ); i += n; if ( number > 0 ) outgoingMessageQueue[userId].toAck.push_back( number ); assert( i + messageLength <= length ); // make sure there is a message handler for this message type assert( messageHandlerMap.find( (MessageType)messageType ) != messageHandlerMap.end() ); if ( std::find( outgoingMessageQueue[userId].recievedMessages.begin(), outgoingMessageQueue[userId].recievedMessages.end(), number ) == outgoingMessageQueue[userId].recievedMessages.end() ) { // find out if this message is addressed for this client too if( recieverType == RT_ALL_BUT_ME && SharedNetworkData::getInstance()->getHostID() != destinationId || recieverType == RT_ALL_ME || recieverType == RT_NOT_USER && SharedNetworkData::getInstance()->getHostID() != destinationId || recieverType == RT_USER && SharedNetworkData::getInstance()->getHostID() == destinationId || recieverType == RT_SERVER && SharedNetworkData::getInstance()->isMasterServer() || recieverType == RT_SERVER && SharedNetworkData::getInstance()->isProxyServerActive()) { PRINTF(0)("<<< MessageManager: got msg with type: %i, from sender %i, to rec: %i\n", messageType, senderId, destinationId); // call the handler function and handle errors if ( !(*(messageHandlerMap[(MessageType)messageType].cb))( (MessageType)messageType, data + i, messageLength, messageHandlerMap[(MessageType)messageType].someData, senderId, destinationId ) ) { // if the message is not handled correctly, bush it back to the incoming packets NetworkMessage msg; msg.data = new byte[messageLength]; memcpy( msg.data, data + i, messageLength ); msg.length = messageLength; msg.messageType = (MessageType)messageType; msg.number = userId; msg.senderId = senderId; msg.recieverType = (RecieverType)recieverType; msg.destinationId = destinationId; incomingMessageQueue.push_back( msg ); } } // or else forward the message to the other servers else { // forwarding the messages but only if its a proxy if( /*!SharedNetworkData::getInstance()->isClient()*/ SharedNetworkData::getInstance()->isProxyServerActive()) { PRINTF(0)("===========>> Forwarding Message msg with type: %i, from sender %i, to rec: %i\n", messageType, senderId, destinationId); NetworkMessage msg; msg.data = new byte[messageLength]; memcpy( msg.data, data + i, messageLength ); msg.length = messageLength; msg.messageType = (MessageType)messageType; msg.number = userId; msg.senderId = senderId; msg.destinationId = destinationId; msg.recieverType = (RecieverType)recieverType; this->sendMessage(msg.messageType, msg.data, msg.length, msg.recieverType, msg.destinationId, MP_HIGHBANDWIDTH); } } // save the serial number for ack signaling outgoingMessageQueue[userId].recievedMessages.push_back( number ); } i += messageLength; } // now call the message handlers with the new message //TODO maybe handle incomingMessage in tick function. else local messages will not be handled if no clients are connected for ( std::list::iterator it = incomingMessageQueue.begin(); it != incomingMessageQueue.end(); ) { if ( (*(messageHandlerMap[it->messageType].cb))( it->messageType, it->data, it->length, messageHandlerMap[it->messageType].someData, /*it->number, */it->senderId, it->destinationId ) ) { std::list::iterator delIt = it; if ( it->data ) delete it->data; it++; incomingMessageQueue.erase( delIt ); continue; } it++; } //walk throu message queue and remove acked messages for ( std::list::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); ) { if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() ) { std::list::iterator delIt = it; it++; outgoingMessageQueue[userId].messages.erase( delIt ); continue; } it++; } //TODO find bether way. maybe with timestamp if ( outgoingMessageQueue[userId].recievedMessages.size() > 1000 ) { for ( int j = 0; j < outgoingMessageQueue[userId].recievedMessages.size() - 1000; j++ ) outgoingMessageQueue[userId].recievedMessages.erase( outgoingMessageQueue[userId].recievedMessages.begin() ); } return i; } /** * clean up memory reserved for user * @param userId userid */ void MessageManager::cleanUpUser( int userId ) { if ( outgoingMessageQueue.find( userId ) == outgoingMessageQueue.end() ) return; for ( std::list::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ ) { if ( it->data ) delete it->data; it->data = NULL; } outgoingMessageQueue[userId].toAck.clear(); outgoingMessageQueue.erase( userId ); } /** * registers function to handle messages with id messageType. someData is passed to callbackfuntion * @param messageType message id to handle * @param cb function pointer to callback function * @param someData this pointer is passed to callback function without modification * @return true on success */ bool MessageManager::registerMessageHandler( MessageType messageType, MessageCallback cb, void * someData ) { MessageHandler messageHandler; messageHandler.cb = cb; messageHandler.messageType = messageType; messageHandler.someData = someData; messageHandlerMap[messageType] = messageHandler; return true; } /** * initializes buffers for user * @param userId userId */ void MessageManager::initUser( int userId ) { // just do something so map creates a new entry outgoingMessageQueue[userId].toAck.clear(); //assert( outgoingMessageQueue[userId].messages.size() == 0 ); } /** * send a message to one or more clients * recieverType: * RT_ALL send to all users. reciever is ignored * RT_USER send only to reciever * RT_NOT_USER send to all but reciever * * @param messageType message id * @param data pointer to data * @param dataLength length of data * @param recieverType type of the receiver * @param reciever the userId of the receiver if needed (depends on the ReceiverType) */ void MessageManager::sendMessage( MessageType messageType, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority ) { PRINTF(0)(" >>> MessageManager: sending msg with type: %i, recieverType: %i, reciever %i\n", messageType, recieverType, reciever); // go through all outgoing message queues and add the message if its appropriate for ( MessageQueue::iterator it = this->outgoingMessageQueue.begin(); it != this->outgoingMessageQueue.end(); it++ ) { // relay server special handling // if( SharedNetworkData::getInstance()->isMasterServer() && SharedNetworkData::getInstance()->isUserProxyServerActive(it->first)) // { // NetworkMessage msg; // // msg.data = new byte[dataLength]; // memcpy( msg.data, data, dataLength ); // msg.length = dataLength; // msg.messageType = messageType; // msg.number = this->newNumber++; // msg.senderId = SharedNetworkData::getInstance()->getHostID(); // msg.destinationId = reciever; // msg.recieverType = recieverType; // msg.priority = messagePriority; // // it->second.messages.push_back( msg ); // } // // proxy server to master server // else if( SharedNetworkData::getInstance()->isProxyServerActive() && SharedNetworkData::getInstance()->isUserMasterServer(it->first)) // { // NetworkMessage msg; // // msg.data = new byte[dataLength]; // memcpy( msg.data, data, dataLength ); // msg.length = dataLength; // msg.messageType = messageType; // msg.number = this->newNumber++; // msg.senderId = SharedNetworkData::getInstance()->getHostID(); // msg.destinationId = reciever; // msg.recieverType = recieverType; // msg.priority = messagePriority; // // it->second.messages.push_back( msg ); // } // check for every node if the message is for it also /*else*/ if ( recieverType == RT_ALL_ME || recieverType == RT_ALL_BUT_ME || recieverType == RT_USER && it->first == reciever || recieverType == RT_NOT_USER && it->first != reciever || recieverType == RT_SERVER && getNetworkStream()->isUserMasterServer( it->first ) || recieverType == RT_SERVER && getNetworkStream()->isUserProxyServerActive( it->first ) ) { NetworkMessage msg; msg.data = new byte[dataLength]; memcpy( msg.data, data, dataLength ); msg.length = dataLength; msg.messageType = messageType; msg.number = this->newNumber++; msg.senderId = SharedNetworkData::getInstance()->getHostID(); msg.destinationId = reciever; msg.recieverType = recieverType; msg.priority = messagePriority; it->second.messages.push_back( msg ); } } // if the message is also for myself, handle it here if ( recieverType == RT_ALL_ME ) { NetworkMessage msg; msg.data = new byte[dataLength]; memcpy( msg.data, data, dataLength ); msg.length = dataLength; msg.messageType = messageType; msg.number = SharedNetworkData::getInstance()->getHostID(); msg.senderId = SharedNetworkData::getInstance()->getHostID(); msg.destinationId = reciever; msg.recieverType = recieverType; msg.priority = messagePriority; incomingMessageQueue.push_back( msg ); } }