Planet
navi: home | PPS | about | screenshots | download | development | forum

source: orxonox.OLD/branches/proxy/src/lib/network/message_manager.cc @ 9512

Last change on this file since 9512 was 9512, checked in by patrick, 18 years ago

now the proxy server has also special handling for message piping

File size: 11.4 KB
RevLine 
[7631]1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11   ### File Specific:
12   main-programmer: Christoph Renner
13   co-programmer: ...
14*/
15
16//#define DEBUG_SPECIAL_MODULE DEBUG_MODULE_
17
18#include "message_manager.h"
19
[8228]20#include "network_stream.h"
[8708]21#include "shared_network_data.h"
[9406]22#include "converter.h"
23#include <cassert>
[8228]24
[7631]25
[9406]26
[7671]27MessageManager* MessageManager::singletonRef = NULL;
[7631]28
[7671]29
[7631]30/**
31 * standard constructor
32*/
33MessageManager::MessageManager ()
34{
35  this->setClassID( CL_MESSAGE_MANAGER, "MessageManager" );
[7681]36  newNumber = 1;
37  setSynchronized( true );
[7631]38}
39
40
41/**
42 * standard deconstructor
43*/
44MessageManager::~MessageManager ()
45{
[9508]46  for ( MessageQueue::iterator it = outgoingMessageQueue.begin(); it != outgoingMessageQueue.end(); it++ )
[7671]47  {
48    for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ )
49    {
50      if ( it2->data )
51      {
[8623]52        delete [] it2->data;
[7671]53        it2->data = NULL;
54      }
55    }
[9406]56
[7671]57    it->second.messages.clear();
58    it->second.toAck.clear();
59  }
[9406]60
[9508]61  outgoingMessageQueue.clear();
[9406]62
[9059]63  this->messageHandlerMap.clear();
[9406]64
[9059]65  MessageManager::singletonRef = NULL;
[7631]66}
67
68/**
69 * get the diff to last acked state of userId
70 *
[7671]71 * this class does not use the normal SynchronizeableVars for zynchronisation. instead
72 * it defines its own protocol
[7631]73 *
74 * @param userId user to create diff for
75 * @param data buffer to copy diff in
76 * @param maxLength max bytes to copy into data
77 * @param stateId id of current state
78 * @param fromStateId the reference state for the delta state
79 * @param priorityTH tells getStateDiff to not send element with priority \< priorityTH
80 * @return n bytes copied into data
81 */
82int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH )
83{
[7671]84  int i = 0;
85  int n;
[9406]86
[9508]87  n = Converter::intToByteArray( outgoingMessageQueue[userId].toAck.size(), data + i, maxLength );
[7671]88  i += n;
89  assert( n == INTSIZE );
[9406]90
[9508]91  for ( std::list<int>::iterator it = outgoingMessageQueue[userId].toAck.begin(); it != outgoingMessageQueue[userId].toAck.end(); it++)
[7671]92  {
93    n = Converter::intToByteArray( *it, data + i, maxLength );
94    i += n;
95    assert( n == INTSIZE );
96  }
[9406]97
[9508]98  outgoingMessageQueue[userId].toAck.clear();
[9406]99
[9508]100  n = Converter::intToByteArray( outgoingMessageQueue[userId].messages.size(), data + i, maxLength );
[7671]101  i += n;
102  assert( n == INTSIZE );
[9406]103
[9508]104  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
[7671]105  {
106    n = Converter::intToByteArray( it->length, data + i, maxLength );
107    i += n;
108    assert( n == INTSIZE );
[9406]109
[7671]110    n = Converter::intToByteArray( it->number, data + i, maxLength );
111    i += n;
112    assert( n == INTSIZE );
[9406]113
[9508]114    n = Converter::intToByteArray( it->messageType, data + i, maxLength );
[7671]115    i += n;
116    assert( n == INTSIZE );
[9406]117
[7671]118    assert( i + it->length <= maxLength );
119    memcpy( data + i, it->data, it->length );
120    i += it->length;
121  }
[9406]122
[7671]123  return i;
[7631]124}
125
126/**
127 * sets a new state out of a diff created on another host
128 * @param userId hostId of user who send me that diff
129 * @param data pointer to diff
130 * @param length length of diff
131 * @param stateId id of current state
132 * @param fromStateId id of the base state id
133 * @return number bytes read
134 * @todo check for permissions
135 */
136int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId )
137{
[7671]138  int i = 0;
139  int n;
[9406]140
[7671]141  int nAcks;
[9406]142
[9504]143
[7678]144  assert( i + INTSIZE <= length );
[7671]145  n = Converter::byteArrayToInt( data + i, &nAcks );
146  assert( n == INTSIZE );
147  i += n;
[9406]148
[7671]149  std::list<int> acks;
[9406]150
[7671]151  int number;
[9406]152
[7671]153  for ( int j = 0; j < nAcks; j++ )
154  {
[7678]155    assert( i + INTSIZE <= length );
[7671]156    n = Converter::byteArrayToInt( data + i, &number );
157    assert( n == INTSIZE );
158    i += n;
[9406]159
[7671]160    acks.push_back( number );
161  }
[9406]162
[7671]163  int nMessages;
[9406]164
[7678]165  assert( i + INTSIZE <= length );
[7671]166  n = Converter::byteArrayToInt( data + i, &nMessages );
167  assert( n == INTSIZE );
168  i += n;
[7678]169
[9508]170  int messageLength, messageType;
[9406]171
[9508]172  // now go through all messages
[7671]173  for ( int j = 0; j < nMessages; j++ )
174  {
[7678]175    assert( i + INTSIZE <= length );
176    n = Converter::byteArrayToInt( data + i, &messageLength );
177    assert( n == INTSIZE );
178    i += n;
[9406]179
[7678]180    assert( i + INTSIZE <= length );
181    n = Converter::byteArrayToInt( data + i, &number );
182    assert( n == INTSIZE );
183    i += n;
[9406]184
[7678]185    assert( i + INTSIZE <= length );
[9508]186    n = Converter::byteArrayToInt( data + i, &messageType );
[7678]187    assert( n == INTSIZE );
188    i += n;
[9406]189
[7678]190    if ( number > 0 )
[9508]191      outgoingMessageQueue[userId].toAck.push_back( number );
[9406]192
[7678]193    assert( i + messageLength <= length );
[9508]194    // make sure there is a message handler for this message type
195    assert( messageHandlerMap.find( (MessageType)messageType ) != messageHandlerMap.end() );
196
197
198    if ( std::find( outgoingMessageQueue[userId].recievedMessages.begin(), outgoingMessageQueue[userId].recievedMessages.end(), number ) ==
199         outgoingMessageQueue[userId].recievedMessages.end() )
[7681]200    {
[9508]201      if ( !(*(messageHandlerMap[(MessageType)messageType].cb))( (MessageType)messageType, data + i, messageLength, messageHandlerMap[(MessageType)messageType].someData, userId ) )
[7693]202      {
203        NetworkMessage msg;
[9406]204
[7693]205        msg.data = new byte[messageLength];
206        memcpy( msg.data, data + i, messageLength );
207        msg.length = messageLength;
[9508]208        msg.messageType = (MessageType)messageType;
[7693]209        msg.number = userId;
[9406]210
[9508]211        incomingMessageQueue.push_back( msg );
[7693]212      }
[9508]213      outgoingMessageQueue[userId].recievedMessages.push_back( number );
[7681]214    }
[7678]215    i += messageLength;
[7671]216  }
[9406]217
218
[8708]219  //TODO maybe handle incomingMessage in tick function. else local messages will not be handled if no clients are connected
[9508]220  for ( std::list<NetworkMessage>::iterator it = incomingMessageQueue.begin(); it != incomingMessageQueue.end();  )
[7693]221  {
[9508]222    if ( (*(messageHandlerMap[it->messageType].cb))( it->messageType, it->data, it->length, messageHandlerMap[it->messageType].someData, it->number ) )
[7693]223    {
224      std::list<NetworkMessage>::iterator delIt = it;
[7697]225      if ( it->data )
226        delete it->data;
[7693]227      it++;
[9508]228      incomingMessageQueue.erase( delIt );
[7693]229      continue;
230    }
231    it++;
232  }
[9406]233
[7681]234  //walk throu message queue and remove acked messages
[9508]235  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end();  )
[7681]236  {
237    if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() )
238    {
239      std::list<NetworkMessage>::iterator delIt = it;
240      it++;
[9508]241      outgoingMessageQueue[userId].messages.erase( delIt );
[7681]242      continue;
243    }
244    it++;
245  }
[9406]246
[7681]247  //TODO find bether way. maybe with timestamp
[9508]248  if ( outgoingMessageQueue[userId].recievedMessages.size() > 1000 )
[7681]249  {
[9508]250    for ( int j = 0; j < outgoingMessageQueue[userId].recievedMessages.size() - 1000; j++ )
251      outgoingMessageQueue[userId].recievedMessages.erase( outgoingMessageQueue[userId].recievedMessages.begin() );
[7681]252  }
[7678]253
254  return i;
[7631]255}
256
[9504]257
258
[7631]259/**
260 * clean up memory reserved for user
261 * @param userId userid
262 */
263void MessageManager::cleanUpUser( int userId )
264{
[9508]265  if ( outgoingMessageQueue.find( userId ) == outgoingMessageQueue.end() )
[7678]266    return;
[9406]267
[9508]268  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
[7678]269  {
270    if ( it->data )
271      delete it->data;
272    it->data = NULL;
273  }
[9406]274
[9508]275  outgoingMessageQueue[userId].toAck.clear();
[9406]276
[9508]277  outgoingMessageQueue.erase( userId );
[7631]278}
[7671]279
280/**
[9508]281 * registers function to handle messages with id messageType. someData is passed to callbackfuntion
282 * @param messageType message id to handle
[7671]283 * @param cb function pointer to callback function
284 * @param someData this pointer is passed to callback function without modification
285 * @return true on success
286 */
[9508]287bool MessageManager::registerMessageHandler( MessageType messageType, MessageCallback cb, void * someData )
[7671]288{
[7678]289  MessageHandler messageHandler;
[9406]290
[7678]291  messageHandler.cb = cb;
[9508]292  messageHandler.messageType = messageType;
[7678]293  messageHandler.someData = someData;
[9406]294
[9508]295  messageHandlerMap[messageType] = messageHandler;
[9406]296
[7678]297  return true;
[7671]298}
299
300/**
301 * initializes buffers for user
302 * @param userId userId
303 */
304void MessageManager::initUser( int userId )
305{
[7678]306  // just do something so map creates a new entry
[9508]307  outgoingMessageQueue[userId].toAck.clear();
308  //assert( outgoingMessageQueue[userId].messages.size() == 0 );
[7671]309}
[7681]310
311/**
312 * send a message to one or more clients
313 * recieverType:
314 *               RT_ALL send to all users. reciever is ignored
315 *               RT_USER send only to reciever
316 *               RT_NOT_USER send to all but reciever
317 *
[9508]318 * @param messageType message id
[7681]319 * @param data pointer to data
320 * @param dataLength length of data
[9508]321 * @param recieverType type of the receiver
322 * @param reciever the userId of the receiver if needed (depends on the ReceiverType)
[7681]323 */
[9508]324void MessageManager::sendMessage( MessageType messageType, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority )
[7681]325{
[9509]326  // go through all outgoing message queues and add the message if its appropriate
[9508]327  for ( MessageQueue::iterator it = this->outgoingMessageQueue.begin(); it != this->outgoingMessageQueue.end(); it++ )
[7681]328  {
[9511]329
[9512]330    // relay server special handling
[9511]331    if( SharedNetworkData::getInstance()->isMasterServer() && SharedNetworkData::getInstance()->isUserProxyServerActive(it->first))
[7681]332    {
333      NetworkMessage msg;
[8623]334
[7681]335      msg.data = new byte[dataLength];
336      memcpy( msg.data, data, dataLength );
337      msg.length = dataLength;
[9508]338      msg.messageType = messageType;
[9509]339      msg.number = this->newNumber++;
[7681]340      msg.priority = messagePriority;
[8623]341
[7681]342      it->second.messages.push_back( msg );
343    }
[9512]344    // proxy server to master server
345    else if( SharedNetworkData::getInstance()->isProxyServerActive() && SharedNetworkData::getInstance()->isUserMasterServer(it->first))
346    {
347      NetworkMessage msg;
348
349      msg.data = new byte[dataLength];
350      memcpy( msg.data, data, dataLength );
351      msg.length = dataLength;
352      msg.messageType = messageType;
353      msg.number = this->newNumber++;
354      msg.priority = messagePriority;
355
356      it->second.messages.push_back( msg );
357    }
[9511]358    // check for every node if the message is for it also
359    else if (
360         recieverType == RT_ALL_ME ||
361         recieverType == RT_ALL_BUT_ME ||
362         recieverType == RT_USER && it->first == reciever ||
363         recieverType == RT_NOT_USER && it->first != reciever ||
364         recieverType == RT_SERVER && getNetworkStream()->isUserMasterServer( it->first ) ||
365         recieverType == RT_SERVER && getNetworkStream()->isUserProxyServerActive( it->first )
366       )
[9509]367    {
368      NetworkMessage msg;
369
370      msg.data = new byte[dataLength];
371      memcpy( msg.data, data, dataLength );
372      msg.length = dataLength;
373      msg.messageType = messageType;
374      msg.number = this->newNumber++;
375      msg.priority = messagePriority;
376
377      it->second.messages.push_back( msg );
378    }
[9511]379
380
[7681]381  }
[9406]382
[9509]383
384  // if the message is also for myself, handle it here
[8623]385  if ( recieverType == RT_ALL_ME )
386  {
387    NetworkMessage msg;
388
389    msg.data = new byte[dataLength];
390    memcpy( msg.data, data, dataLength );
391    msg.length = dataLength;
[9508]392    msg.messageType = messageType;
[8708]393    msg.number = SharedNetworkData::getInstance()->getHostID();
[8623]394    msg.priority = messagePriority;
395
[9508]396    incomingMessageQueue.push_back( msg );
[8623]397  }
[7681]398}
399
400
Note: See TracBrowser for help on using the repository browser.