Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: orxonox.OLD/branches/proxy/src/lib/network/message_manager.cc @ 9519

Last change on this file since 9519 was 9519, checked in by patrick, 19 years ago

extending the message structure

File size: 12.2 KB
Line 
1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11   ### File Specific:
12   main-programmer: Christoph Renner
13   co-programmer: ...
14*/
15
16//#define DEBUG_SPECIAL_MODULE DEBUG_MODULE_
17
#include "message_manager.h"

#include "network_stream.h"
#include "shared_network_data.h"
#include "converter.h"

#include <algorithm>
#include <cassert>
#include <cstring>
#include <list>
24
25
26
27MessageManager* MessageManager::singletonRef = NULL;
28
29
30/**
31 * standard constructor
32*/
33MessageManager::MessageManager ()
34{
35  this->setClassID( CL_MESSAGE_MANAGER, "MessageManager" );
36  newNumber = 1;
37  setSynchronized( true );
38}
39
40
41/**
42 * standard deconstructor
43*/
44MessageManager::~MessageManager ()
45{
46  for ( MessageQueue::iterator it = outgoingMessageQueue.begin(); it != outgoingMessageQueue.end(); it++ )
47  {
48    for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ )
49    {
50      if ( it2->data )
51      {
52        delete [] it2->data;
53        it2->data = NULL;
54      }
55    }
56
57    it->second.messages.clear();
58    it->second.toAck.clear();
59  }
60
61  outgoingMessageQueue.clear();
62
63  this->messageHandlerMap.clear();
64
65  MessageManager::singletonRef = NULL;
66}
67
68/**
69 * get the diff to last acked state of userId
70 *
71 * this class does not use the normal SynchronizeableVars for zynchronisation. instead
72 * it defines its own protocol
73 *
74 * @param userId user to create diff for
75 * @param data buffer to copy diff in
76 * @param maxLength max bytes to copy into data
77 * @param stateId id of current state
78 * @param fromStateId the reference state for the delta state
79 * @param priorityTH tells getStateDiff to not send element with priority \< priorityTH
80 * @return n bytes copied into data
81 */
82int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH )
83{
84  int i = 0;
85  int n;
86
87  n = Converter::intToByteArray( outgoingMessageQueue[userId].toAck.size(), data + i, maxLength );
88  i += n;
89  assert( n == INTSIZE );
90
91  for ( std::list<int>::iterator it = outgoingMessageQueue[userId].toAck.begin(); it != outgoingMessageQueue[userId].toAck.end(); it++)
92  {
93    n = Converter::intToByteArray( *it, data + i, maxLength );
94    i += n;
95    assert( n == INTSIZE );
96  }
97
98  outgoingMessageQueue[userId].toAck.clear();
99
100  n = Converter::intToByteArray( outgoingMessageQueue[userId].messages.size(), data + i, maxLength );
101  i += n;
102  assert( n == INTSIZE );
103
104  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
105  {
106    n = Converter::intToByteArray( it->length, data + i, maxLength );
107    i += n;
108    assert( n == INTSIZE );
109
110    n = Converter::intToByteArray( it->number, data + i, maxLength );
111    i += n;
112    assert( n == INTSIZE );
113
114    n = Converter::intToByteArray( it->messageType, data + i, maxLength );
115    i += n;
116    assert( n == INTSIZE );
117
118    assert( i + it->length <= maxLength );
119    memcpy( data + i, it->data, it->length );
120    i += it->length;
121  }
122
123  return i;
124}
125
126/**
127 * sets a new state out of a diff created on another host
128 * @param userId hostId of user who send me that diff
129 * @param data pointer to diff
130 * @param length length of diff
131 * @param stateId id of current state
132 * @param fromStateId id of the base state id
133 * @return number bytes read
134 * @todo check for permissions
135 */
136int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId )
137{
138  int i = 0;
139  int n;
140
141  int nAcks;
142
143
144  assert( i + INTSIZE <= length );
145  n = Converter::byteArrayToInt( data + i, &nAcks );
146  assert( n == INTSIZE );
147  i += n;
148
149  std::list<int> acks;
150
151  int number;
152
153  for ( int j = 0; j < nAcks; j++ )
154  {
155    assert( i + INTSIZE <= length );
156    n = Converter::byteArrayToInt( data + i, &number );
157    assert( n == INTSIZE );
158    i += n;
159
160    acks.push_back( number );
161  }
162
163  int nMessages;
164
165  assert( i + INTSIZE <= length );
166  n = Converter::byteArrayToInt( data + i, &nMessages );
167  assert( n == INTSIZE );
168  i += n;
169
170  int messageLength, messageType;
171
172  // now go through all newly received messages
173  for ( int j = 0; j < nMessages; j++ )
174  {
175    assert( i + INTSIZE <= length );
176    n = Converter::byteArrayToInt( data + i, &messageLength );
177    assert( n == INTSIZE );
178    i += n;
179
180    assert( i + INTSIZE <= length );
181    n = Converter::byteArrayToInt( data + i, &number );
182    assert( n == INTSIZE );
183    i += n;
184
185    assert( i + INTSIZE <= length );
186    n = Converter::byteArrayToInt( data + i, &messageType );
187    assert( n == INTSIZE );
188    i += n;
189
190    if ( number > 0 )
191      outgoingMessageQueue[userId].toAck.push_back( number );
192
193    assert( i + messageLength <= length );
194    // make sure there is a message handler for this message type
195    assert( messageHandlerMap.find( (MessageType)messageType ) != messageHandlerMap.end() );
196
197
198    if ( std::find( outgoingMessageQueue[userId].recievedMessages.begin(), outgoingMessageQueue[userId].recievedMessages.end(), number ) ==
199         outgoingMessageQueue[userId].recievedMessages.end() )
200    {
201      // call the handler function and handle errors
202      if ( !(*(messageHandlerMap[(MessageType)messageType].cb))( (MessageType)messageType, data + i, messageLength,
203                                                                 messageHandlerMap[(MessageType)messageType].someData, userId ) )
204      {
205        NetworkMessage msg;
206
207        msg.data = new byte[messageLength];
208        memcpy( msg.data, data + i, messageLength );
209        msg.length = messageLength;
210        msg.messageType = (MessageType)messageType;
211        msg.number = userId;
212
213        incomingMessageQueue.push_back( msg );
214      }
215      outgoingMessageQueue[userId].recievedMessages.push_back( number );
216      PRINTF(0)("<<< MessageManager: got msg with type: %i\n", messageType);
217    }
218    i += messageLength;
219  }
220
221  // now call the message handlers with the new message
222  //TODO maybe handle incomingMessage in tick function. else local messages will not be handled if no clients are connected
223  for ( std::list<NetworkMessage>::iterator it = incomingMessageQueue.begin(); it != incomingMessageQueue.end();  )
224  {
225    if ( (*(messageHandlerMap[it->messageType].cb))( it->messageType, it->data, it->length, messageHandlerMap[it->messageType].someData, it->number ) )
226    {
227      std::list<NetworkMessage>::iterator delIt = it;
228      if ( it->data )
229        delete it->data;
230      it++;
231      incomingMessageQueue.erase( delIt );
232      continue;
233    }
234    it++;
235  }
236
237  //walk throu message queue and remove acked messages
238  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end();  )
239  {
240    if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() )
241    {
242      std::list<NetworkMessage>::iterator delIt = it;
243      it++;
244      outgoingMessageQueue[userId].messages.erase( delIt );
245      continue;
246    }
247    it++;
248  }
249
250  //TODO find bether way. maybe with timestamp
251  if ( outgoingMessageQueue[userId].recievedMessages.size() > 1000 )
252  {
253    for ( int j = 0; j < outgoingMessageQueue[userId].recievedMessages.size() - 1000; j++ )
254      outgoingMessageQueue[userId].recievedMessages.erase( outgoingMessageQueue[userId].recievedMessages.begin() );
255  }
256
257  return i;
258}
259
260
261
262/**
263 * clean up memory reserved for user
264 * @param userId userid
265 */
266void MessageManager::cleanUpUser( int userId )
267{
268  if ( outgoingMessageQueue.find( userId ) == outgoingMessageQueue.end() )
269    return;
270
271  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
272  {
273    if ( it->data )
274      delete it->data;
275    it->data = NULL;
276  }
277
278  outgoingMessageQueue[userId].toAck.clear();
279
280  outgoingMessageQueue.erase( userId );
281}
282
283/**
284 * registers function to handle messages with id messageType. someData is passed to callbackfuntion
285 * @param messageType message id to handle
286 * @param cb function pointer to callback function
287 * @param someData this pointer is passed to callback function without modification
288 * @return true on success
289 */
290bool MessageManager::registerMessageHandler( MessageType messageType, MessageCallback cb, void * someData )
291{
292  MessageHandler messageHandler;
293
294  messageHandler.cb = cb;
295  messageHandler.messageType = messageType;
296  messageHandler.someData = someData;
297
298  messageHandlerMap[messageType] = messageHandler;
299
300  return true;
301}
302
303/**
304 * initializes buffers for user
305 * @param userId userId
306 */
307void MessageManager::initUser( int userId )
308{
309  // just do something so map creates a new entry
310  outgoingMessageQueue[userId].toAck.clear();
311  //assert( outgoingMessageQueue[userId].messages.size() == 0 );
312}
313
314/**
315 * send a message to one or more clients
316 * recieverType:
317 *               RT_ALL send to all users. reciever is ignored
318 *               RT_USER send only to reciever
319 *               RT_NOT_USER send to all but reciever
320 *
321 * @param messageType message id
322 * @param data pointer to data
323 * @param dataLength length of data
324 * @param recieverType type of the receiver
325 * @param reciever the userId of the receiver if needed (depends on the ReceiverType)
326 */
327void MessageManager::sendMessage( MessageType messageType, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority )
328{
329  PRINTF(0)(" >>> MessageManager: sending msg with type: %i, recieverType: %i, reciever %i\n", messageType, recieverType, reciever);
330
331  // go through all outgoing message queues and add the message if its appropriate
332  for ( MessageQueue::iterator it = this->outgoingMessageQueue.begin(); it != this->outgoingMessageQueue.end(); it++ )
333  {
334
335    // relay server special handling
336    if( SharedNetworkData::getInstance()->isMasterServer() && SharedNetworkData::getInstance()->isUserProxyServerActive(it->first))
337    {
338      NetworkMessage msg;
339
340      msg.data = new byte[dataLength];
341      memcpy( msg.data, data, dataLength );
342      msg.length = dataLength;
343      msg.messageType = messageType;
344      msg.number = this->newNumber++;
345      msg.senderId = SharedNetworkData::getInstance()->getHostID();
346      msg.destinationId = reciever;
347      msg.priority = messagePriority;
348
349      it->second.messages.push_back( msg );
350    }
351    // proxy server to master server
352    else if( SharedNetworkData::getInstance()->isProxyServerActive() && SharedNetworkData::getInstance()->isUserMasterServer(it->first))
353    {
354      NetworkMessage msg;
355
356      msg.data = new byte[dataLength];
357      memcpy( msg.data, data, dataLength );
358      msg.length = dataLength;
359      msg.messageType = messageType;
360      msg.number = this->newNumber++;
361      msg.senderId = SharedNetworkData::getInstance()->getHostID();
362      msg.destinationId = reciever;
363      msg.priority = messagePriority;
364
365      it->second.messages.push_back( msg );
366    }
367    // check for every node if the message is for it also
368    else if (
369         recieverType == RT_ALL_ME ||
370         recieverType == RT_ALL_BUT_ME ||
371         recieverType == RT_USER && it->first == reciever ||
372         recieverType == RT_NOT_USER && it->first != reciever ||
373         recieverType == RT_SERVER && getNetworkStream()->isUserMasterServer( it->first ) ||
374         recieverType == RT_SERVER && getNetworkStream()->isUserProxyServerActive( it->first )
375       )
376    {
377      NetworkMessage msg;
378
379      msg.data = new byte[dataLength];
380      memcpy( msg.data, data, dataLength );
381      msg.length = dataLength;
382      msg.messageType = messageType;
383      msg.number = this->newNumber++;
384      msg.senderId = SharedNetworkData::getInstance()->getHostID();
385      msg.destinationId = reciever;
386      msg.priority = messagePriority;
387
388      it->second.messages.push_back( msg );
389    }
390
391
392  }
393
394
395  // if the message is also for myself, handle it here
396  if ( recieverType == RT_ALL_ME )
397  {
398    NetworkMessage msg;
399
400    msg.data = new byte[dataLength];
401    memcpy( msg.data, data, dataLength );
402    msg.length = dataLength;
403    msg.messageType = messageType;
404    msg.number = SharedNetworkData::getInstance()->getHostID();
405    msg.senderId = SharedNetworkData::getInstance()->getHostID();
406    msg.destinationId = reciever;
407    msg.priority = messagePriority;
408
409    incomingMessageQueue.push_back( msg );
410  }
411}
412
413
Note: See TracBrowser for help on using the repository browser.