Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: orxonox.OLD/branches/proxy/src/lib/network/message_manager.cc @ 9522

Last change on this file since 9522 was 9522, checked in by patrick, 18 years ago

now the message handler checks whether a message is also addressed to the local host; if it is not, the system tries to relay the message

File size: 14.6 KB
Line 
1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11   ### File Specific:
12   main-programmer: Christoph Renner
13   co-programmer: ...
14*/
15
16//#define DEBUG_SPECIAL_MODULE DEBUG_MODULE_
17
18#include "message_manager.h"
19
20#include "network_stream.h"
21#include "shared_network_data.h"
22#include "converter.h"
23#include <cassert>
24
25
26
27MessageManager* MessageManager::singletonRef = NULL;
28
29
30/**
31 * standard constructor
32*/
33MessageManager::MessageManager ()
34{
35  this->setClassID( CL_MESSAGE_MANAGER, "MessageManager" );
36  newNumber = 1;
37  setSynchronized( true );
38}
39
40
41/**
42 * standard deconstructor
43*/
44MessageManager::~MessageManager ()
45{
46  for ( MessageQueue::iterator it = outgoingMessageQueue.begin(); it != outgoingMessageQueue.end(); it++ )
47  {
48    for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ )
49    {
50      if ( it2->data )
51      {
52        delete [] it2->data;
53        it2->data = NULL;
54      }
55    }
56
57    it->second.messages.clear();
58    it->second.toAck.clear();
59  }
60
61  outgoingMessageQueue.clear();
62
63  this->messageHandlerMap.clear();
64
65  MessageManager::singletonRef = NULL;
66}
67
68/**
69 * get the diff to last acked state of userId
70 *
71 * this class does not use the normal SynchronizeableVars for zynchronisation. instead
72 * it defines its own protocol
73 *
74 * @param userId user to create diff for
75 * @param data buffer to copy diff in
76 * @param maxLength max bytes to copy into data
77 * @param stateId id of current state
78 * @param fromStateId the reference state for the delta state
79 * @param priorityTH tells getStateDiff to not send element with priority \< priorityTH
80 * @return n bytes copied into data
81 */
82int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH )
83{
84  int i = 0;
85  int n;
86
87  n = Converter::intToByteArray( outgoingMessageQueue[userId].toAck.size(), data + i, maxLength );
88  i += n;
89  assert( n == INTSIZE );
90
91  for ( std::list<int>::iterator it = outgoingMessageQueue[userId].toAck.begin(); it != outgoingMessageQueue[userId].toAck.end(); it++)
92  {
93    n = Converter::intToByteArray( *it, data + i, maxLength );
94    i += n;
95    assert( n == INTSIZE );
96  }
97
98  outgoingMessageQueue[userId].toAck.clear();
99
100  n = Converter::intToByteArray( outgoingMessageQueue[userId].messages.size(), data + i, maxLength );
101  i += n;
102  assert( n == INTSIZE );
103
104  // write the message down, a message has this structure:
105  //   | data_length | serial_number | message_type | source_id | dest_id | ...data... |
106  //      4byte        4byte            4byte         4byte      4byte     data_length
107  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
108  {
109    n = Converter::intToByteArray( it->length, data + i, maxLength );
110    i += n;
111    assert( n == INTSIZE );
112
113    n = Converter::intToByteArray( it->number, data + i, maxLength );
114    i += n;
115    assert( n == INTSIZE );
116
117    n = Converter::intToByteArray( it->messageType, data + i, maxLength );
118    i += n;
119    assert( n == INTSIZE );
120
121    n = Converter::intToByteArray( it->senderId, data + i, maxLength );
122    i += n;
123    assert( n == INTSIZE );
124
125    n = Converter::intToByteArray( it->destinationId, data + i, maxLength );
126    i += n;
127    assert( n == INTSIZE );
128
129    assert( i + it->length <= maxLength );
130    memcpy( data + i, it->data, it->length );
131    i += it->length;
132  }
133
134  return i;
135}
136
137/**
138 * sets a new state out of a diff created on another host
139 * @param userId hostId of user who send me that diff
140 * @param data pointer to diff
141 * @param length length of diff
142 * @param stateId id of current state
143 * @param fromStateId id of the base state id
144 * @return number bytes read
145 * @todo check for permissions
146 */
147int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId )
148{
149  int i = 0;
150  int n;
151
152  int nAcks;
153
154
155  assert( i + INTSIZE <= length );
156  n = Converter::byteArrayToInt( data + i, &nAcks );
157  assert( n == INTSIZE );
158  i += n;
159
160  std::list<int> acks;
161
162  int number;
163
164  for ( int j = 0; j < nAcks; j++ )
165  {
166    assert( i + INTSIZE <= length );
167    n = Converter::byteArrayToInt( data + i, &number );
168    assert( n == INTSIZE );
169    i += n;
170
171    acks.push_back( number );
172  }
173
174  int nMessages;
175
176  assert( i + INTSIZE <= length );
177  n = Converter::byteArrayToInt( data + i, &nMessages );
178  assert( n == INTSIZE );
179  i += n;
180
181  int messageLength, messageType;
182  int senderId, destinationId;
183
184  // now go through all newly received messages and assemble them
185  for ( int j = 0; j < nMessages; j++ )
186  {
187    // read the length
188    assert( i + INTSIZE <= length );
189    n = Converter::byteArrayToInt( data + i, &messageLength );
190    assert( n == INTSIZE );
191    i += n;
192
193    // read the serial number
194    assert( i + INTSIZE <= length );
195    n = Converter::byteArrayToInt( data + i, &number );
196    assert( n == INTSIZE );
197    i += n;
198
199    // read the message type
200    assert( i + INTSIZE <= length );
201    n = Converter::byteArrayToInt( data + i, &messageType );
202    assert( n == INTSIZE );
203    i += n;
204
205    // read the sender id
206    assert( i + INTSIZE <= length );
207    n = Converter::byteArrayToInt( data + i, &senderId );
208    assert( n == INTSIZE );
209    i += n;
210
211    //read the destination id
212    assert( i + INTSIZE <= length );
213    n = Converter::byteArrayToInt( data + i, &destinationId);
214    assert( n == INTSIZE );
215    i += n;
216
217
218    if ( number > 0 )
219      outgoingMessageQueue[userId].toAck.push_back( number );
220
221    assert( i + messageLength <= length );
222    // make sure there is a message handler for this message type
223    assert( messageHandlerMap.find( (MessageType)messageType ) != messageHandlerMap.end() );
224
225
226    if ( std::find( outgoingMessageQueue[userId].recievedMessages.begin(), outgoingMessageQueue[userId].recievedMessages.end(), number ) ==
227         outgoingMessageQueue[userId].recievedMessages.end() )
228    {
229
230//       RT_ALL_BUT_ME = 1,   //!< message is sent to all users but myself
231//       RT_ALL_ME,           //!< message is sent to all users
232//       RT_USER,             //!< message is only sent to reciever
233//       RT_NOT_USER,         //!< message is sent to all but reciever
234//       RT_SERVER
235
236      // find out if this message is addressed for this client too
237      if( messageType == RT_ALL_BUT_ME ||
238          messageType == RT_ALL_ME ||
239          messageType == RT_USER  && SharedNetworkData::getInstance()->getHostID() == destinationId ||
240          messageType == RT_SERVER && SharedNetworkData::getInstance()->isMasterServer())
241      {
242
243      // call the handler function and handle errors
244        if ( !(*(messageHandlerMap[(MessageType)messageType].cb))( (MessageType)messageType, data + i, messageLength,
245                 messageHandlerMap[(MessageType)messageType].someData, senderId, destinationId ) )
246        {
247        // if the message is not handled correctly, bush it back to the incoming packets
248          NetworkMessage msg;
249
250          msg.data = new byte[messageLength];
251          memcpy( msg.data, data + i, messageLength );
252          msg.length = messageLength;
253          msg.messageType = (MessageType)messageType;
254          msg.number = userId;
255          msg.senderId = senderId;
256          msg.destinationId = destinationId;
257
258          incomingMessageQueue.push_back( msg );
259        }
260      // save the serial number for ack signaling
261        outgoingMessageQueue[userId].recievedMessages.push_back( number );
262        PRINTF(0)("<<< MessageManager: got msg with type: %i, from sender %i, to rec: %i\n", messageType, senderId, destinationId);
263      }
264      // or else forward the message to the other servers
265      else
266      {
267        PRINTF(0)("===========>> Forwarding Message\n");
268        NetworkMessage msg;
269
270        msg.data = new byte[messageLength];
271        memcpy( msg.data, data + i, messageLength );
272        msg.length = messageLength;
273        msg.messageType = (MessageType)messageType;
274        msg.number = userId;
275        msg.senderId = senderId;
276        msg.destinationId = destinationId;
277
278        incomingMessageQueue.push_back( msg );
279      }
280    }
281
282    i += messageLength;
283  }
284
285  // now call the message handlers with the new message
286  //TODO maybe handle incomingMessage in tick function. else local messages will not be handled if no clients are connected
287  for ( std::list<NetworkMessage>::iterator it = incomingMessageQueue.begin(); it != incomingMessageQueue.end();  )
288  {
289    if ( (*(messageHandlerMap[it->messageType].cb))( it->messageType, it->data, it->length, messageHandlerMap[it->messageType].someData,
290                                                     /*it->number, */it->senderId, it->destinationId ) )
291    {
292      std::list<NetworkMessage>::iterator delIt = it;
293      if ( it->data )
294        delete it->data;
295      it++;
296      incomingMessageQueue.erase( delIt );
297      continue;
298    }
299    it++;
300  }
301
302  //walk throu message queue and remove acked messages
303  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end();  )
304  {
305    if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() )
306    {
307      std::list<NetworkMessage>::iterator delIt = it;
308      it++;
309      outgoingMessageQueue[userId].messages.erase( delIt );
310      continue;
311    }
312    it++;
313  }
314
315  //TODO find bether way. maybe with timestamp
316  if ( outgoingMessageQueue[userId].recievedMessages.size() > 1000 )
317  {
318    for ( int j = 0; j < outgoingMessageQueue[userId].recievedMessages.size() - 1000; j++ )
319      outgoingMessageQueue[userId].recievedMessages.erase( outgoingMessageQueue[userId].recievedMessages.begin() );
320  }
321
322  return i;
323}
324
325
326
327/**
328 * clean up memory reserved for user
329 * @param userId userid
330 */
331void MessageManager::cleanUpUser( int userId )
332{
333  if ( outgoingMessageQueue.find( userId ) == outgoingMessageQueue.end() )
334    return;
335
336  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
337  {
338    if ( it->data )
339      delete it->data;
340    it->data = NULL;
341  }
342
343  outgoingMessageQueue[userId].toAck.clear();
344
345  outgoingMessageQueue.erase( userId );
346}
347
348/**
349 * registers function to handle messages with id messageType. someData is passed to callbackfuntion
350 * @param messageType message id to handle
351 * @param cb function pointer to callback function
352 * @param someData this pointer is passed to callback function without modification
353 * @return true on success
354 */
355bool MessageManager::registerMessageHandler( MessageType messageType, MessageCallback cb, void * someData )
356{
357  MessageHandler messageHandler;
358
359  messageHandler.cb = cb;
360  messageHandler.messageType = messageType;
361  messageHandler.someData = someData;
362
363  messageHandlerMap[messageType] = messageHandler;
364
365  return true;
366}
367
368/**
369 * initializes buffers for user
370 * @param userId userId
371 */
372void MessageManager::initUser( int userId )
373{
374  // just do something so map creates a new entry
375  outgoingMessageQueue[userId].toAck.clear();
376  //assert( outgoingMessageQueue[userId].messages.size() == 0 );
377}
378
379/**
380 * send a message to one or more clients
381 * recieverType:
382 *               RT_ALL send to all users. reciever is ignored
383 *               RT_USER send only to reciever
384 *               RT_NOT_USER send to all but reciever
385 *
386 * @param messageType message id
387 * @param data pointer to data
388 * @param dataLength length of data
389 * @param recieverType type of the receiver
390 * @param reciever the userId of the receiver if needed (depends on the ReceiverType)
391 */
392void MessageManager::sendMessage( MessageType messageType, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority )
393{
394  PRINTF(0)(" >>> MessageManager: sending msg with type: %i, recieverType: %i, reciever %i\n", messageType, recieverType, reciever);
395
396  // go through all outgoing message queues and add the message if its appropriate
397  for ( MessageQueue::iterator it = this->outgoingMessageQueue.begin(); it != this->outgoingMessageQueue.end(); it++ )
398  {
399
400    // relay server special handling
401    if( SharedNetworkData::getInstance()->isMasterServer() && SharedNetworkData::getInstance()->isUserProxyServerActive(it->first))
402    {
403      NetworkMessage msg;
404
405      msg.data = new byte[dataLength];
406      memcpy( msg.data, data, dataLength );
407      msg.length = dataLength;
408      msg.messageType = messageType;
409      msg.number = this->newNumber++;
410      msg.senderId = SharedNetworkData::getInstance()->getHostID();
411      msg.destinationId = reciever;
412      msg.priority = messagePriority;
413
414      it->second.messages.push_back( msg );
415    }
416    // proxy server to master server
417    else if( SharedNetworkData::getInstance()->isProxyServerActive() && SharedNetworkData::getInstance()->isUserMasterServer(it->first))
418    {
419      NetworkMessage msg;
420
421      msg.data = new byte[dataLength];
422      memcpy( msg.data, data, dataLength );
423      msg.length = dataLength;
424      msg.messageType = messageType;
425      msg.number = this->newNumber++;
426      msg.senderId = SharedNetworkData::getInstance()->getHostID();
427      msg.destinationId = reciever;
428      msg.priority = messagePriority;
429
430      it->second.messages.push_back( msg );
431    }
432    // check for every node if the message is for it also
433    else if (
434         recieverType == RT_ALL_ME ||
435         recieverType == RT_ALL_BUT_ME ||
436         recieverType == RT_USER && it->first == reciever ||
437         recieverType == RT_NOT_USER && it->first != reciever ||
438         recieverType == RT_SERVER && getNetworkStream()->isUserMasterServer( it->first ) ||
439         recieverType == RT_SERVER && getNetworkStream()->isUserProxyServerActive( it->first )
440       )
441    {
442      NetworkMessage msg;
443
444      msg.data = new byte[dataLength];
445      memcpy( msg.data, data, dataLength );
446      msg.length = dataLength;
447      msg.messageType = messageType;
448      msg.number = this->newNumber++;
449      msg.senderId = SharedNetworkData::getInstance()->getHostID();
450      msg.destinationId = reciever;
451      msg.priority = messagePriority;
452
453      it->second.messages.push_back( msg );
454    }
455
456
457  }
458
459
460  // if the message is also for myself, handle it here
461  if ( recieverType == RT_ALL_ME )
462  {
463    NetworkMessage msg;
464
465    msg.data = new byte[dataLength];
466    memcpy( msg.data, data, dataLength );
467    msg.length = dataLength;
468    msg.messageType = messageType;
469    msg.number = SharedNetworkData::getInstance()->getHostID();
470    msg.senderId = SharedNetworkData::getInstance()->getHostID();
471    msg.destinationId = reciever;
472    msg.priority = messagePriority;
473
474    incomingMessageQueue.push_back( msg );
475  }
476}
477
478
Note: See TracBrowser for help on using the repository browser.