Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: orxonox.OLD/branches/proxy/src/lib/network/message_manager.cc @ 9521

Last change on this file since 9521 was 9521, checked in by patrick, 18 years ago

extended the callback function for message handlers to match the new sender/dest message structure

File size: 13.5 KB
Line 
1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11   ### File Specific:
12   main-programmer: Christoph Renner
13   co-programmer: ...
14*/
15
16//#define DEBUG_SPECIAL_MODULE DEBUG_MODULE_
17
18#include "message_manager.h"
19
20#include "network_stream.h"
21#include "shared_network_data.h"
22#include "converter.h"
23#include <cassert>
24
25
26
27MessageManager* MessageManager::singletonRef = NULL;
28
29
30/**
31 * standard constructor
32*/
33MessageManager::MessageManager ()
34{
35  this->setClassID( CL_MESSAGE_MANAGER, "MessageManager" );
36  newNumber = 1;
37  setSynchronized( true );
38}
39
40
41/**
42 * standard deconstructor
43*/
44MessageManager::~MessageManager ()
45{
46  for ( MessageQueue::iterator it = outgoingMessageQueue.begin(); it != outgoingMessageQueue.end(); it++ )
47  {
48    for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ )
49    {
50      if ( it2->data )
51      {
52        delete [] it2->data;
53        it2->data = NULL;
54      }
55    }
56
57    it->second.messages.clear();
58    it->second.toAck.clear();
59  }
60
61  outgoingMessageQueue.clear();
62
63  this->messageHandlerMap.clear();
64
65  MessageManager::singletonRef = NULL;
66}
67
68/**
69 * get the diff to last acked state of userId
70 *
71 * this class does not use the normal SynchronizeableVars for zynchronisation. instead
72 * it defines its own protocol
73 *
74 * @param userId user to create diff for
75 * @param data buffer to copy diff in
76 * @param maxLength max bytes to copy into data
77 * @param stateId id of current state
78 * @param fromStateId the reference state for the delta state
79 * @param priorityTH tells getStateDiff to not send element with priority \< priorityTH
80 * @return n bytes copied into data
81 */
82int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH )
83{
84  int i = 0;
85  int n;
86
87  n = Converter::intToByteArray( outgoingMessageQueue[userId].toAck.size(), data + i, maxLength );
88  i += n;
89  assert( n == INTSIZE );
90
91  for ( std::list<int>::iterator it = outgoingMessageQueue[userId].toAck.begin(); it != outgoingMessageQueue[userId].toAck.end(); it++)
92  {
93    n = Converter::intToByteArray( *it, data + i, maxLength );
94    i += n;
95    assert( n == INTSIZE );
96  }
97
98  outgoingMessageQueue[userId].toAck.clear();
99
100  n = Converter::intToByteArray( outgoingMessageQueue[userId].messages.size(), data + i, maxLength );
101  i += n;
102  assert( n == INTSIZE );
103
104  // write the message down, a message has this structure:
105  //   | data_length | serial_number | message_type | source_id | dest_id | ...data... |
106  //      4byte        4byte            4byte         4byte      4byte     data_length
107  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
108  {
109    n = Converter::intToByteArray( it->length, data + i, maxLength );
110    i += n;
111    assert( n == INTSIZE );
112
113    n = Converter::intToByteArray( it->number, data + i, maxLength );
114    i += n;
115    assert( n == INTSIZE );
116
117    n = Converter::intToByteArray( it->messageType, data + i, maxLength );
118    i += n;
119    assert( n == INTSIZE );
120
121    n = Converter::intToByteArray( it->senderId, data + i, maxLength );
122    i += n;
123    assert( n == INTSIZE );
124
125    n = Converter::intToByteArray( it->destinationId, data + i, maxLength );
126    i += n;
127    assert( n == INTSIZE );
128
129    assert( i + it->length <= maxLength );
130    memcpy( data + i, it->data, it->length );
131    i += it->length;
132  }
133
134  return i;
135}
136
137/**
138 * sets a new state out of a diff created on another host
139 * @param userId hostId of user who send me that diff
140 * @param data pointer to diff
141 * @param length length of diff
142 * @param stateId id of current state
143 * @param fromStateId id of the base state id
144 * @return number bytes read
145 * @todo check for permissions
146 */
147int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId )
148{
149  int i = 0;
150  int n;
151
152  int nAcks;
153
154
155  assert( i + INTSIZE <= length );
156  n = Converter::byteArrayToInt( data + i, &nAcks );
157  assert( n == INTSIZE );
158  i += n;
159
160  std::list<int> acks;
161
162  int number;
163
164  for ( int j = 0; j < nAcks; j++ )
165  {
166    assert( i + INTSIZE <= length );
167    n = Converter::byteArrayToInt( data + i, &number );
168    assert( n == INTSIZE );
169    i += n;
170
171    acks.push_back( number );
172  }
173
174  int nMessages;
175
176  assert( i + INTSIZE <= length );
177  n = Converter::byteArrayToInt( data + i, &nMessages );
178  assert( n == INTSIZE );
179  i += n;
180
181  int messageLength, messageType;
182  int senderId, destinationId;
183
184  // now go through all newly received messages and assemble them
185  for ( int j = 0; j < nMessages; j++ )
186  {
187    // read the length
188    assert( i + INTSIZE <= length );
189    n = Converter::byteArrayToInt( data + i, &messageLength );
190    assert( n == INTSIZE );
191    i += n;
192
193    // read the serial number
194    assert( i + INTSIZE <= length );
195    n = Converter::byteArrayToInt( data + i, &number );
196    assert( n == INTSIZE );
197    i += n;
198
199    // read the message type
200    assert( i + INTSIZE <= length );
201    n = Converter::byteArrayToInt( data + i, &messageType );
202    assert( n == INTSIZE );
203    i += n;
204
205    // read the sender id
206    assert( i + INTSIZE <= length );
207    n = Converter::byteArrayToInt( data + i, &senderId );
208    assert( n == INTSIZE );
209    i += n;
210
211    //read the destination id
212    assert( i + INTSIZE <= length );
213    n = Converter::byteArrayToInt( data + i, &destinationId);
214    assert( n == INTSIZE );
215    i += n;
216
217
218    if ( number > 0 )
219      outgoingMessageQueue[userId].toAck.push_back( number );
220
221    assert( i + messageLength <= length );
222    // make sure there is a message handler for this message type
223    assert( messageHandlerMap.find( (MessageType)messageType ) != messageHandlerMap.end() );
224
225
226    if ( std::find( outgoingMessageQueue[userId].recievedMessages.begin(), outgoingMessageQueue[userId].recievedMessages.end(), number ) ==
227         outgoingMessageQueue[userId].recievedMessages.end() )
228    {
229      // call the handler function and handle errors
230      if ( !(*(messageHandlerMap[(MessageType)messageType].cb))( (MessageType)messageType, data + i, messageLength,
231                                                                 messageHandlerMap[(MessageType)messageType].someData, senderId, destinationId ) )
232      {
233        // if the message is not handled correctly, bush it back to the incoming packets
234        NetworkMessage msg;
235
236        msg.data = new byte[messageLength];
237        memcpy( msg.data, data + i, messageLength );
238        msg.length = messageLength;
239        msg.messageType = (MessageType)messageType;
240        msg.number = userId;
241        msg.senderId = senderId;
242        msg.destinationId = destinationId;
243
244        incomingMessageQueue.push_back( msg );
245      }
246      // save the serial number for ack signaling
247      outgoingMessageQueue[userId].recievedMessages.push_back( number );
248      PRINTF(0)("<<< MessageManager: got msg with type: %i\n", messageType);
249    }
250    i += messageLength;
251  }
252
253  // now call the message handlers with the new message
254  //TODO maybe handle incomingMessage in tick function. else local messages will not be handled if no clients are connected
255  for ( std::list<NetworkMessage>::iterator it = incomingMessageQueue.begin(); it != incomingMessageQueue.end();  )
256  {
257    if ( (*(messageHandlerMap[it->messageType].cb))( it->messageType, it->data, it->length, messageHandlerMap[it->messageType].someData,
258                                                     /*it->number, */it->senderId, it->destinationId ) )
259    {
260      std::list<NetworkMessage>::iterator delIt = it;
261      if ( it->data )
262        delete it->data;
263      it++;
264      incomingMessageQueue.erase( delIt );
265      continue;
266    }
267    it++;
268  }
269
270  //walk throu message queue and remove acked messages
271  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end();  )
272  {
273    if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() )
274    {
275      std::list<NetworkMessage>::iterator delIt = it;
276      it++;
277      outgoingMessageQueue[userId].messages.erase( delIt );
278      continue;
279    }
280    it++;
281  }
282
283  //TODO find bether way. maybe with timestamp
284  if ( outgoingMessageQueue[userId].recievedMessages.size() > 1000 )
285  {
286    for ( int j = 0; j < outgoingMessageQueue[userId].recievedMessages.size() - 1000; j++ )
287      outgoingMessageQueue[userId].recievedMessages.erase( outgoingMessageQueue[userId].recievedMessages.begin() );
288  }
289
290  return i;
291}
292
293
294
295/**
296 * clean up memory reserved for user
297 * @param userId userid
298 */
299void MessageManager::cleanUpUser( int userId )
300{
301  if ( outgoingMessageQueue.find( userId ) == outgoingMessageQueue.end() )
302    return;
303
304  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
305  {
306    if ( it->data )
307      delete it->data;
308    it->data = NULL;
309  }
310
311  outgoingMessageQueue[userId].toAck.clear();
312
313  outgoingMessageQueue.erase( userId );
314}
315
316/**
317 * registers function to handle messages with id messageType. someData is passed to callbackfuntion
318 * @param messageType message id to handle
319 * @param cb function pointer to callback function
320 * @param someData this pointer is passed to callback function without modification
321 * @return true on success
322 */
323bool MessageManager::registerMessageHandler( MessageType messageType, MessageCallback cb, void * someData )
324{
325  MessageHandler messageHandler;
326
327  messageHandler.cb = cb;
328  messageHandler.messageType = messageType;
329  messageHandler.someData = someData;
330
331  messageHandlerMap[messageType] = messageHandler;
332
333  return true;
334}
335
336/**
337 * initializes buffers for user
338 * @param userId userId
339 */
340void MessageManager::initUser( int userId )
341{
342  // just do something so map creates a new entry
343  outgoingMessageQueue[userId].toAck.clear();
344  //assert( outgoingMessageQueue[userId].messages.size() == 0 );
345}
346
347/**
348 * send a message to one or more clients
349 * recieverType:
350 *               RT_ALL send to all users. reciever is ignored
351 *               RT_USER send only to reciever
352 *               RT_NOT_USER send to all but reciever
353 *
354 * @param messageType message id
355 * @param data pointer to data
356 * @param dataLength length of data
357 * @param recieverType type of the receiver
358 * @param reciever the userId of the receiver if needed (depends on the ReceiverType)
359 */
360void MessageManager::sendMessage( MessageType messageType, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority )
361{
362  PRINTF(0)(" >>> MessageManager: sending msg with type: %i, recieverType: %i, reciever %i\n", messageType, recieverType, reciever);
363
364  // go through all outgoing message queues and add the message if its appropriate
365  for ( MessageQueue::iterator it = this->outgoingMessageQueue.begin(); it != this->outgoingMessageQueue.end(); it++ )
366  {
367
368    // relay server special handling
369    if( SharedNetworkData::getInstance()->isMasterServer() && SharedNetworkData::getInstance()->isUserProxyServerActive(it->first))
370    {
371      NetworkMessage msg;
372
373      msg.data = new byte[dataLength];
374      memcpy( msg.data, data, dataLength );
375      msg.length = dataLength;
376      msg.messageType = messageType;
377      msg.number = this->newNumber++;
378      msg.senderId = SharedNetworkData::getInstance()->getHostID();
379      msg.destinationId = reciever;
380      msg.priority = messagePriority;
381
382      it->second.messages.push_back( msg );
383    }
384    // proxy server to master server
385    else if( SharedNetworkData::getInstance()->isProxyServerActive() && SharedNetworkData::getInstance()->isUserMasterServer(it->first))
386    {
387      NetworkMessage msg;
388
389      msg.data = new byte[dataLength];
390      memcpy( msg.data, data, dataLength );
391      msg.length = dataLength;
392      msg.messageType = messageType;
393      msg.number = this->newNumber++;
394      msg.senderId = SharedNetworkData::getInstance()->getHostID();
395      msg.destinationId = reciever;
396      msg.priority = messagePriority;
397
398      it->second.messages.push_back( msg );
399    }
400    // check for every node if the message is for it also
401    else if (
402         recieverType == RT_ALL_ME ||
403         recieverType == RT_ALL_BUT_ME ||
404         recieverType == RT_USER && it->first == reciever ||
405         recieverType == RT_NOT_USER && it->first != reciever ||
406         recieverType == RT_SERVER && getNetworkStream()->isUserMasterServer( it->first ) ||
407         recieverType == RT_SERVER && getNetworkStream()->isUserProxyServerActive( it->first )
408       )
409    {
410      NetworkMessage msg;
411
412      msg.data = new byte[dataLength];
413      memcpy( msg.data, data, dataLength );
414      msg.length = dataLength;
415      msg.messageType = messageType;
416      msg.number = this->newNumber++;
417      msg.senderId = SharedNetworkData::getInstance()->getHostID();
418      msg.destinationId = reciever;
419      msg.priority = messagePriority;
420
421      it->second.messages.push_back( msg );
422    }
423
424
425  }
426
427
428  // if the message is also for myself, handle it here
429  if ( recieverType == RT_ALL_ME )
430  {
431    NetworkMessage msg;
432
433    msg.data = new byte[dataLength];
434    memcpy( msg.data, data, dataLength );
435    msg.length = dataLength;
436    msg.messageType = messageType;
437    msg.number = SharedNetworkData::getInstance()->getHostID();
438    msg.senderId = SharedNetworkData::getInstance()->getHostID();
439    msg.destinationId = reciever;
440    msg.priority = messagePriority;
441
442    incomingMessageQueue.push_back( msg );
443  }
444}
445
446
Note: See TracBrowser for help on using the repository browser.