Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: orxonox.OLD/branches/proxy/src/lib/network/message_manager.cc @ 9520

Last change on this file since 9520 was 9520, checked in by patrick, 19 years ago

The sender now also includes the source and destination userId so that they can be propagated correctly through the network.

File size: 13.4 KB
Line 
1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11   ### File Specific:
12   main-programmer: Christoph Renner
13   co-programmer: ...
14*/
15
16//#define DEBUG_SPECIAL_MODULE DEBUG_MODULE_
17
18#include "message_manager.h"
19
20#include "network_stream.h"
21#include "shared_network_data.h"
22#include "converter.h"
23#include <cassert>
24
25
26
27MessageManager* MessageManager::singletonRef = NULL;
28
29
30/**
31 * standard constructor
32*/
33MessageManager::MessageManager ()
34{
35  this->setClassID( CL_MESSAGE_MANAGER, "MessageManager" );
36  newNumber = 1;
37  setSynchronized( true );
38}
39
40
41/**
42 * standard deconstructor
43*/
44MessageManager::~MessageManager ()
45{
46  for ( MessageQueue::iterator it = outgoingMessageQueue.begin(); it != outgoingMessageQueue.end(); it++ )
47  {
48    for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ )
49    {
50      if ( it2->data )
51      {
52        delete [] it2->data;
53        it2->data = NULL;
54      }
55    }
56
57    it->second.messages.clear();
58    it->second.toAck.clear();
59  }
60
61  outgoingMessageQueue.clear();
62
63  this->messageHandlerMap.clear();
64
65  MessageManager::singletonRef = NULL;
66}
67
68/**
69 * get the diff to last acked state of userId
70 *
71 * this class does not use the normal SynchronizeableVars for zynchronisation. instead
72 * it defines its own protocol
73 *
74 * @param userId user to create diff for
75 * @param data buffer to copy diff in
76 * @param maxLength max bytes to copy into data
77 * @param stateId id of current state
78 * @param fromStateId the reference state for the delta state
79 * @param priorityTH tells getStateDiff to not send element with priority \< priorityTH
80 * @return n bytes copied into data
81 */
82int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH )
83{
84  int i = 0;
85  int n;
86
87  n = Converter::intToByteArray( outgoingMessageQueue[userId].toAck.size(), data + i, maxLength );
88  i += n;
89  assert( n == INTSIZE );
90
91  for ( std::list<int>::iterator it = outgoingMessageQueue[userId].toAck.begin(); it != outgoingMessageQueue[userId].toAck.end(); it++)
92  {
93    n = Converter::intToByteArray( *it, data + i, maxLength );
94    i += n;
95    assert( n == INTSIZE );
96  }
97
98  outgoingMessageQueue[userId].toAck.clear();
99
100  n = Converter::intToByteArray( outgoingMessageQueue[userId].messages.size(), data + i, maxLength );
101  i += n;
102  assert( n == INTSIZE );
103
104  // write the message down, a message has this structure:
105  //   | data_length | serial_number | message_type | source_id | dest_id | ...data... |
106  //      4byte        4byte            4byte         4byte      4byte     data_length
107  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
108  {
109    n = Converter::intToByteArray( it->length, data + i, maxLength );
110    i += n;
111    assert( n == INTSIZE );
112
113    n = Converter::intToByteArray( it->number, data + i, maxLength );
114    i += n;
115    assert( n == INTSIZE );
116
117    n = Converter::intToByteArray( it->messageType, data + i, maxLength );
118    i += n;
119    assert( n == INTSIZE );
120
121    n = Converter::intToByteArray( it->senderId, data + i, maxLength );
122    i += n;
123    assert( n == INTSIZE );
124
125    n = Converter::intToByteArray( it->destinationId, data + i, maxLength );
126    i += n;
127    assert( n == INTSIZE );
128
129    assert( i + it->length <= maxLength );
130    memcpy( data + i, it->data, it->length );
131    i += it->length;
132  }
133
134  return i;
135}
136
137/**
138 * sets a new state out of a diff created on another host
139 * @param userId hostId of user who send me that diff
140 * @param data pointer to diff
141 * @param length length of diff
142 * @param stateId id of current state
143 * @param fromStateId id of the base state id
144 * @return number bytes read
145 * @todo check for permissions
146 */
147int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId )
148{
149  int i = 0;
150  int n;
151
152  int nAcks;
153
154
155  assert( i + INTSIZE <= length );
156  n = Converter::byteArrayToInt( data + i, &nAcks );
157  assert( n == INTSIZE );
158  i += n;
159
160  std::list<int> acks;
161
162  int number;
163
164  for ( int j = 0; j < nAcks; j++ )
165  {
166    assert( i + INTSIZE <= length );
167    n = Converter::byteArrayToInt( data + i, &number );
168    assert( n == INTSIZE );
169    i += n;
170
171    acks.push_back( number );
172  }
173
174  int nMessages;
175
176  assert( i + INTSIZE <= length );
177  n = Converter::byteArrayToInt( data + i, &nMessages );
178  assert( n == INTSIZE );
179  i += n;
180
181  int messageLength, messageType;
182  int senderId, destinationId;
183
184  // now go through all newly received messages and assemble them
185  for ( int j = 0; j < nMessages; j++ )
186  {
187    // read the length
188    assert( i + INTSIZE <= length );
189    n = Converter::byteArrayToInt( data + i, &messageLength );
190    assert( n == INTSIZE );
191    i += n;
192
193    // read the serial number
194    assert( i + INTSIZE <= length );
195    n = Converter::byteArrayToInt( data + i, &number );
196    assert( n == INTSIZE );
197    i += n;
198
199    // read the message type
200    assert( i + INTSIZE <= length );
201    n = Converter::byteArrayToInt( data + i, &messageType );
202    assert( n == INTSIZE );
203    i += n;
204
205    // read the sender id
206    assert( i + INTSIZE <= length );
207    n = Converter::byteArrayToInt( data + i, &senderId );
208    assert( n == INTSIZE );
209    i += n;
210
211    //read the destination id
212    assert( i + INTSIZE <= length );
213    n = Converter::byteArrayToInt( data + i, &destinationId);
214    assert( n == INTSIZE );
215    i += n;
216
217
218    if ( number > 0 )
219      outgoingMessageQueue[userId].toAck.push_back( number );
220
221    assert( i + messageLength <= length );
222    // make sure there is a message handler for this message type
223    assert( messageHandlerMap.find( (MessageType)messageType ) != messageHandlerMap.end() );
224
225
226    if ( std::find( outgoingMessageQueue[userId].recievedMessages.begin(), outgoingMessageQueue[userId].recievedMessages.end(), number ) ==
227         outgoingMessageQueue[userId].recievedMessages.end() )
228    {
229      // call the handler function and handle errors
230      if ( !(*(messageHandlerMap[(MessageType)messageType].cb))( (MessageType)messageType, data + i, messageLength,
231                                                                 messageHandlerMap[(MessageType)messageType].someData, userId ) )
232      {
233        // if the message is not handled correctly, bush it back to the incoming packets
234        NetworkMessage msg;
235
236        msg.data = new byte[messageLength];
237        memcpy( msg.data, data + i, messageLength );
238        msg.length = messageLength;
239        msg.messageType = (MessageType)messageType;
240        msg.number = userId;
241        msg.senderId = senderId;
242        msg.destinationId = destinationId;
243
244        incomingMessageQueue.push_back( msg );
245      }
246      // save the serial number for ack signaling
247      outgoingMessageQueue[userId].recievedMessages.push_back( number );
248      PRINTF(0)("<<< MessageManager: got msg with type: %i\n", messageType);
249    }
250    i += messageLength;
251  }
252
253  // now call the message handlers with the new message
254  //TODO maybe handle incomingMessage in tick function. else local messages will not be handled if no clients are connected
255  for ( std::list<NetworkMessage>::iterator it = incomingMessageQueue.begin(); it != incomingMessageQueue.end();  )
256  {
257    if ( (*(messageHandlerMap[it->messageType].cb))( it->messageType, it->data, it->length, messageHandlerMap[it->messageType].someData, it->number ) )
258    {
259      std::list<NetworkMessage>::iterator delIt = it;
260      if ( it->data )
261        delete it->data;
262      it++;
263      incomingMessageQueue.erase( delIt );
264      continue;
265    }
266    it++;
267  }
268
269  //walk throu message queue and remove acked messages
270  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end();  )
271  {
272    if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() )
273    {
274      std::list<NetworkMessage>::iterator delIt = it;
275      it++;
276      outgoingMessageQueue[userId].messages.erase( delIt );
277      continue;
278    }
279    it++;
280  }
281
282  //TODO find bether way. maybe with timestamp
283  if ( outgoingMessageQueue[userId].recievedMessages.size() > 1000 )
284  {
285    for ( int j = 0; j < outgoingMessageQueue[userId].recievedMessages.size() - 1000; j++ )
286      outgoingMessageQueue[userId].recievedMessages.erase( outgoingMessageQueue[userId].recievedMessages.begin() );
287  }
288
289  return i;
290}
291
292
293
294/**
295 * clean up memory reserved for user
296 * @param userId userid
297 */
298void MessageManager::cleanUpUser( int userId )
299{
300  if ( outgoingMessageQueue.find( userId ) == outgoingMessageQueue.end() )
301    return;
302
303  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
304  {
305    if ( it->data )
306      delete it->data;
307    it->data = NULL;
308  }
309
310  outgoingMessageQueue[userId].toAck.clear();
311
312  outgoingMessageQueue.erase( userId );
313}
314
315/**
316 * registers function to handle messages with id messageType. someData is passed to callbackfuntion
317 * @param messageType message id to handle
318 * @param cb function pointer to callback function
319 * @param someData this pointer is passed to callback function without modification
320 * @return true on success
321 */
322bool MessageManager::registerMessageHandler( MessageType messageType, MessageCallback cb, void * someData )
323{
324  MessageHandler messageHandler;
325
326  messageHandler.cb = cb;
327  messageHandler.messageType = messageType;
328  messageHandler.someData = someData;
329
330  messageHandlerMap[messageType] = messageHandler;
331
332  return true;
333}
334
335/**
336 * initializes buffers for user
337 * @param userId userId
338 */
339void MessageManager::initUser( int userId )
340{
341  // just do something so map creates a new entry
342  outgoingMessageQueue[userId].toAck.clear();
343  //assert( outgoingMessageQueue[userId].messages.size() == 0 );
344}
345
346/**
347 * send a message to one or more clients
348 * recieverType:
349 *               RT_ALL send to all users. reciever is ignored
350 *               RT_USER send only to reciever
351 *               RT_NOT_USER send to all but reciever
352 *
353 * @param messageType message id
354 * @param data pointer to data
355 * @param dataLength length of data
356 * @param recieverType type of the receiver
357 * @param reciever the userId of the receiver if needed (depends on the ReceiverType)
358 */
359void MessageManager::sendMessage( MessageType messageType, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority )
360{
361  PRINTF(0)(" >>> MessageManager: sending msg with type: %i, recieverType: %i, reciever %i\n", messageType, recieverType, reciever);
362
363  // go through all outgoing message queues and add the message if its appropriate
364  for ( MessageQueue::iterator it = this->outgoingMessageQueue.begin(); it != this->outgoingMessageQueue.end(); it++ )
365  {
366
367    // relay server special handling
368    if( SharedNetworkData::getInstance()->isMasterServer() && SharedNetworkData::getInstance()->isUserProxyServerActive(it->first))
369    {
370      NetworkMessage msg;
371
372      msg.data = new byte[dataLength];
373      memcpy( msg.data, data, dataLength );
374      msg.length = dataLength;
375      msg.messageType = messageType;
376      msg.number = this->newNumber++;
377      msg.senderId = SharedNetworkData::getInstance()->getHostID();
378      msg.destinationId = reciever;
379      msg.priority = messagePriority;
380
381      it->second.messages.push_back( msg );
382    }
383    // proxy server to master server
384    else if( SharedNetworkData::getInstance()->isProxyServerActive() && SharedNetworkData::getInstance()->isUserMasterServer(it->first))
385    {
386      NetworkMessage msg;
387
388      msg.data = new byte[dataLength];
389      memcpy( msg.data, data, dataLength );
390      msg.length = dataLength;
391      msg.messageType = messageType;
392      msg.number = this->newNumber++;
393      msg.senderId = SharedNetworkData::getInstance()->getHostID();
394      msg.destinationId = reciever;
395      msg.priority = messagePriority;
396
397      it->second.messages.push_back( msg );
398    }
399    // check for every node if the message is for it also
400    else if (
401         recieverType == RT_ALL_ME ||
402         recieverType == RT_ALL_BUT_ME ||
403         recieverType == RT_USER && it->first == reciever ||
404         recieverType == RT_NOT_USER && it->first != reciever ||
405         recieverType == RT_SERVER && getNetworkStream()->isUserMasterServer( it->first ) ||
406         recieverType == RT_SERVER && getNetworkStream()->isUserProxyServerActive( it->first )
407       )
408    {
409      NetworkMessage msg;
410
411      msg.data = new byte[dataLength];
412      memcpy( msg.data, data, dataLength );
413      msg.length = dataLength;
414      msg.messageType = messageType;
415      msg.number = this->newNumber++;
416      msg.senderId = SharedNetworkData::getInstance()->getHostID();
417      msg.destinationId = reciever;
418      msg.priority = messagePriority;
419
420      it->second.messages.push_back( msg );
421    }
422
423
424  }
425
426
427  // if the message is also for myself, handle it here
428  if ( recieverType == RT_ALL_ME )
429  {
430    NetworkMessage msg;
431
432    msg.data = new byte[dataLength];
433    memcpy( msg.data, data, dataLength );
434    msg.length = dataLength;
435    msg.messageType = messageType;
436    msg.number = SharedNetworkData::getInstance()->getHostID();
437    msg.senderId = SharedNetworkData::getInstance()->getHostID();
438    msg.destinationId = reciever;
439    msg.priority = messagePriority;
440
441    incomingMessageQueue.push_back( msg );
442  }
443}
444
445
Note: See TracBrowser for help on using the repository browser.