Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: orxonox.OLD/branches/proxy/src/lib/network/message_manager.cc @ 9563

Last change on this file since 9563 was 9563, checked in by patrick, 18 years ago

playable now gets assigned correctly. but clients can't play it yet

File size: 15.5 KB
Line 
1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11   ### File Specific:
12   main-programmer: Christoph Renner
13   co-programmer: Patrick Boenzli (patrick@orxonox.ethz.ch)
14
15     June 2006: finishing work on the network stream for pps presentation (rennerc@ee.ethz.ch)
16     July 2006: some code rearangement and integration of the proxy server mechanism (boenzlip@ee.ethz.ch)
17     July 2006: message forwarding algorithms
18*/
19
20//#define DEBUG_SPECIAL_MODULE DEBUG_MODULE_
21
22#include "message_manager.h"
23
24#include "network_stream.h"
25#include "shared_network_data.h"
26#include "converter.h"
27#include <cassert>
28
29
30
31MessageManager* MessageManager::singletonRef = NULL;
32
33
34/**
35 * standard constructor
36*/
37MessageManager::MessageManager ()
38{
39  this->setClassID( CL_MESSAGE_MANAGER, "MessageManager" );
40  newNumber = 1;
41  setSynchronized( true );
42}
43
44
45/**
46 * standard deconstructor
47*/
48MessageManager::~MessageManager ()
49{
50  for ( MessageQueue::iterator it = outgoingMessageQueue.begin(); it != outgoingMessageQueue.end(); it++ )
51  {
52    for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ )
53    {
54      if ( it2->data )
55      {
56        delete [] it2->data;
57        it2->data = NULL;
58      }
59    }
60
61    it->second.messages.clear();
62    it->second.toAck.clear();
63  }
64
65  outgoingMessageQueue.clear();
66
67  this->messageHandlerMap.clear();
68
69  MessageManager::singletonRef = NULL;
70}
71
72/**
73 * get the diff to last acked state of userId
74 *
75 * this class does not use the normal SynchronizeableVars for zynchronisation. instead
76 * it defines its own protocol
77 *
78 * @param userId user to create diff for
79 * @param data buffer to copy diff in
80 * @param maxLength max bytes to copy into data
81 * @param stateId id of current state
82 * @param fromStateId the reference state for the delta state
83 * @param priorityTH tells getStateDiff to not send element with priority \< priorityTH
84 * @return n bytes copied into data
85 */
86int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH )
87{
88  int i = 0;
89  int n;
90
91  n = Converter::intToByteArray( outgoingMessageQueue[userId].toAck.size(), data + i, maxLength );
92  i += n;
93  assert( n == INTSIZE );
94
95  for ( std::list<int>::iterator it = outgoingMessageQueue[userId].toAck.begin(); it != outgoingMessageQueue[userId].toAck.end(); it++)
96  {
97    n = Converter::intToByteArray( *it, data + i, maxLength );
98    i += n;
99    assert( n == INTSIZE );
100  }
101
102  outgoingMessageQueue[userId].toAck.clear();
103
104  n = Converter::intToByteArray( outgoingMessageQueue[userId].messages.size(), data + i, maxLength );
105  i += n;
106  assert( n == INTSIZE );
107
108  // write the message down, a message has this structure:
109  //   | data_length | serial_number | message_type | source_id | dest_id | ...data... |
110  //      4byte        4byte            4byte         4byte      4byte     data_length
111  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
112  {
113    // send data length
114    n = Converter::intToByteArray( it->length, data + i, maxLength );
115    i += n;
116    assert( n == INTSIZE );
117
118    // send serial number
119    n = Converter::intToByteArray( it->number, data + i, maxLength );
120    i += n;
121    assert( n == INTSIZE );
122
123    // send message type
124    n = Converter::intToByteArray( it->messageType, data + i, maxLength );
125    i += n;
126    assert( n == INTSIZE );
127
128    // send sender id
129    n = Converter::intToByteArray( it->senderId, data + i, maxLength );
130    i += n;
131    assert( n == INTSIZE );
132
133    // send destination id
134    n = Converter::intToByteArray( it->destinationId, data + i, maxLength );
135    i += n;
136    assert( n == INTSIZE );
137
138    // send receiver type
139    n = Converter::intToByteArray( it->recieverType, data + i, maxLength );
140    i += n;
141    assert( n == INTSIZE );
142
143    // and copy the data
144    assert( i + it->length <= maxLength );
145    memcpy( data + i, it->data, it->length );
146    i += it->length;
147  }
148
149  return i;
150}
151
152/**
153 * sets a new state out of a diff created on another host
154 * @param userId hostId of user who send me that diff
155 * @param data pointer to diff
156 * @param length length of diff
157 * @param stateId id of current state
158 * @param fromStateId id of the base state id
159 * @return number bytes read
160 * @todo check for permissions
161 */
162int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId )
163{
164  int i = 0;
165  int n;
166
167  int nAcks;
168
169
170  assert( i + INTSIZE <= length );
171  n = Converter::byteArrayToInt( data + i, &nAcks );
172  assert( n == INTSIZE );
173  i += n;
174
175  std::list<int> acks;
176
177  int number;
178
179  for ( int j = 0; j < nAcks; j++ )
180  {
181    assert( i + INTSIZE <= length );
182    n = Converter::byteArrayToInt( data + i, &number );
183    assert( n == INTSIZE );
184    i += n;
185
186    acks.push_back( number );
187  }
188
189  int nMessages;
190
191  assert( i + INTSIZE <= length );
192  n = Converter::byteArrayToInt( data + i, &nMessages );
193  assert( n == INTSIZE );
194  i += n;
195
196  int messageLength, messageType;
197  int senderId, destinationId, recieverType;
198
199  // now go through all newly received messages and assemble them
200  for ( int j = 0; j < nMessages; j++ )
201  {
202    // read the length
203    assert( i + INTSIZE <= length );
204    n = Converter::byteArrayToInt( data + i, &messageLength );
205    assert( n == INTSIZE );
206    i += n;
207
208    // read the serial number
209    assert( i + INTSIZE <= length );
210    n = Converter::byteArrayToInt( data + i, &number );
211    assert( n == INTSIZE );
212    i += n;
213
214    // read the message type
215    assert( i + INTSIZE <= length );
216    n = Converter::byteArrayToInt( data + i, &messageType );
217    assert( n == INTSIZE );
218    i += n;
219
220    // read the sender id
221    assert( i + INTSIZE <= length );
222    n = Converter::byteArrayToInt( data + i, &senderId );
223    assert( n == INTSIZE );
224    i += n;
225
226    //read the destination id
227    assert( i + INTSIZE <= length );
228    n = Converter::byteArrayToInt( data + i, &destinationId);
229    assert( n == INTSIZE );
230    i += n;
231
232    // read the receiver type
233    assert( i + INTSIZE <= length );
234    n = Converter::byteArrayToInt( data + i, &recieverType);
235    assert( n == INTSIZE );
236    i += n;
237
238    if ( number > 0 )
239      outgoingMessageQueue[userId].toAck.push_back( number );
240
241    assert( i + messageLength <= length );
242    // make sure there is a message handler for this message type
243    assert( messageHandlerMap.find( (MessageType)messageType ) != messageHandlerMap.end() );
244
245
246    if ( std::find( outgoingMessageQueue[userId].recievedMessages.begin(), outgoingMessageQueue[userId].recievedMessages.end(), number ) ==
247         outgoingMessageQueue[userId].recievedMessages.end() )
248    {
249
250      // find out if this message is addressed for this client too
251      if( recieverType == RT_ALL_BUT_ME  && SharedNetworkData::getInstance()->getHostID() != destinationId ||
252          recieverType == RT_ALL_ME ||
253          recieverType == RT_NOT_USER && SharedNetworkData::getInstance()->getHostID() != destinationId ||
254          recieverType == RT_USER  && SharedNetworkData::getInstance()->getHostID() == destinationId ||
255          recieverType == RT_SERVER && SharedNetworkData::getInstance()->isMasterServer() ||
256          recieverType == RT_SERVER && SharedNetworkData::getInstance()->isProxyServerActive())
257      {
258
259        PRINTF(0)("<<< MessageManager: got msg with type: %i, from sender %i, to rec: %i\n", messageType, senderId, destinationId);
260      // call the handler function and handle errors
261        if ( !(*(messageHandlerMap[(MessageType)messageType].cb))( (MessageType)messageType, data + i, messageLength,
262                 messageHandlerMap[(MessageType)messageType].someData, senderId, destinationId ) )
263        {
264        // if the message is not handled correctly, bush it back to the incoming packets
265          NetworkMessage msg;
266
267          msg.data = new byte[messageLength];
268          memcpy( msg.data, data + i, messageLength );
269          msg.length = messageLength;
270          msg.messageType = (MessageType)messageType;
271          msg.number = userId;
272          msg.senderId = senderId;
273          msg.recieverType = (RecieverType)recieverType;
274          msg.destinationId = destinationId;
275
276          incomingMessageQueue.push_back( msg );
277        }
278      }
279      // or else forward the message to the other servers
280      else
281      {
282        // forwarding the messages but only if its a proxy
283        if( /*!SharedNetworkData::getInstance()->isClient()*/ SharedNetworkData::getInstance()->isProxyServerActive())
284        {
285          PRINTF(0)("===========>> Forwarding Message msg with type: %i, from sender %i, to rec: %i\n", messageType, senderId, destinationId);
286          NetworkMessage msg;
287
288          msg.data = new byte[messageLength];
289          memcpy( msg.data, data + i, messageLength );
290          msg.length = messageLength;
291          msg.messageType = (MessageType)messageType;
292          msg.number = userId;
293          msg.senderId = senderId;
294          msg.destinationId = destinationId;
295          msg.recieverType = (RecieverType)recieverType;
296
297          this->sendMessage(msg.messageType, msg.data, msg.length, msg.recieverType, msg.senderId = senderId, msg.destinationId, MP_HIGHBANDWIDTH);
298        }
299      }
300
301      // save the serial number for ack signaling
302      outgoingMessageQueue[userId].recievedMessages.push_back( number );
303    }
304
305    i += messageLength;
306  }
307
308  // now call the message handlers with the new message
309  //TODO maybe handle incomingMessage in tick function. else local messages will not be handled if no clients are connected
310  for ( std::list<NetworkMessage>::iterator it = incomingMessageQueue.begin(); it != incomingMessageQueue.end();  )
311  {
312    if ( (*(messageHandlerMap[it->messageType].cb))( it->messageType, it->data, it->length, messageHandlerMap[it->messageType].someData,
313                                                     /*it->number, */it->senderId, it->destinationId ) )
314    {
315      std::list<NetworkMessage>::iterator delIt = it;
316      if ( it->data )
317        delete it->data;
318      it++;
319      incomingMessageQueue.erase( delIt );
320      continue;
321    }
322    it++;
323  }
324
325  //walk throu message queue and remove acked messages
326  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end();  )
327  {
328    if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() )
329    {
330      std::list<NetworkMessage>::iterator delIt = it;
331      it++;
332      outgoingMessageQueue[userId].messages.erase( delIt );
333      continue;
334    }
335    it++;
336  }
337
338  //TODO find bether way. maybe with timestamp
339  if ( outgoingMessageQueue[userId].recievedMessages.size() > 1000 )
340  {
341    for ( int j = 0; j < outgoingMessageQueue[userId].recievedMessages.size() - 1000; j++ )
342      outgoingMessageQueue[userId].recievedMessages.erase( outgoingMessageQueue[userId].recievedMessages.begin() );
343  }
344
345  return i;
346}
347
348
349
350/**
351 * clean up memory reserved for user
352 * @param userId userid
353 */
354void MessageManager::cleanUpUser( int userId )
355{
356  if ( outgoingMessageQueue.find( userId ) == outgoingMessageQueue.end() )
357    return;
358
359  for ( std::list<NetworkMessage>::iterator it = outgoingMessageQueue[userId].messages.begin(); it != outgoingMessageQueue[userId].messages.end(); it++ )
360  {
361    if ( it->data )
362      delete it->data;
363    it->data = NULL;
364  }
365
366  outgoingMessageQueue[userId].toAck.clear();
367
368  outgoingMessageQueue.erase( userId );
369}
370
371/**
372 * registers function to handle messages with id messageType. someData is passed to callbackfuntion
373 * @param messageType message id to handle
374 * @param cb function pointer to callback function
375 * @param someData this pointer is passed to callback function without modification
376 * @return true on success
377 */
378bool MessageManager::registerMessageHandler( MessageType messageType, MessageCallback cb, void * someData )
379{
380  MessageHandler messageHandler;
381
382  messageHandler.cb = cb;
383  messageHandler.messageType = messageType;
384  messageHandler.someData = someData;
385
386  messageHandlerMap[messageType] = messageHandler;
387
388  return true;
389}
390
391/**
392 * initializes buffers for user
393 * @param userId userId
394 */
395void MessageManager::initUser( int userId )
396{
397  // just do something so map creates a new entry
398  outgoingMessageQueue[userId].toAck.clear();
399  //assert( outgoingMessageQueue[userId].messages.size() == 0 );
400}
401
402
403
404/**
405 * send a message to one or more clients
406 * recieverType:
407 *               RT_ALL send to all users. reciever is ignored
408 *               RT_USER send only to reciever
409 *               RT_NOT_USER send to all but reciever
410 *
411 * @param messageType message id
412 * @param data pointer to data
413 * @param dataLength length of data
414 * @param recieverType type of the receiver
415 * @param reciever the userId of the receiver if needed (depends on the ReceiverType)
416 */
417void MessageManager::sendMessage( MessageType messageType, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority )
418{
419  this->sendMessage(messageType, data, dataLength, recieverType, SharedNetworkData::getInstance()->getHostID(), reciever, messagePriority);
420}
421
422
423/**
424 * send a message to one or more clients as a special client
425 * recieverType:
426 *               RT_ALL send to all users. reciever is ignored
427 *               RT_USER send only to reciever
428 *               RT_NOT_USER send to all but reciever
429 *
430 * @param messageType message id
431 * @param data pointer to data
432 * @param dataLength length of data
433 * @param recieverType type of the receiver
434 * @param sender the userId of the sender if there is need for shadowing it (eg. for msg forwarding)
435 * @param reciever the userId of the receiver if needed (depends on the ReceiverType)
436 */
437    void MessageManager::sendMessage( MessageType messageType, byte * data, int dataLength, RecieverType recieverType, int sender, int reciever, MessagePriority messagePriority )
438{
439  PRINTF(0)(" >>> MessageManager: sending msg with type: %i, recieverType: %i, reciever %i\n", messageType, recieverType, reciever);
440
441  // go through all outgoing message queues and add the message if its appropriate
442  for ( MessageQueue::iterator it = this->outgoingMessageQueue.begin(); it != this->outgoingMessageQueue.end(); it++ )
443  {
444
445    if (
446         recieverType == RT_ALL_ME ||
447         recieverType == RT_ALL_BUT_ME ||
448         recieverType == RT_USER && it->first == reciever ||
449         recieverType == RT_NOT_USER && it->first != reciever ||
450         recieverType == RT_SERVER && getNetworkStream()->isUserMasterServer( it->first ) ||
451         recieverType == RT_SERVER && getNetworkStream()->isUserProxyServerActive( it->first )
452       )
453    {
454      NetworkMessage msg;
455
456      msg.data = new byte[dataLength];
457      memcpy( msg.data, data, dataLength );
458      msg.length = dataLength;
459      msg.messageType = messageType;
460      msg.number = this->newNumber++;
461      msg.senderId = sender;
462      msg.destinationId = reciever;
463      msg.recieverType = recieverType;
464      msg.priority = messagePriority;
465
466      it->second.messages.push_back( msg );
467    }
468
469
470  }
471
472
473  // if the message is also for myself, handle it here
474  if ( recieverType == RT_ALL_ME )
475  {
476    NetworkMessage msg;
477
478    msg.data = new byte[dataLength];
479    memcpy( msg.data, data, dataLength );
480    msg.length = dataLength;
481    msg.messageType = messageType;
482    msg.number = SharedNetworkData::getInstance()->getHostID();
483    msg.senderId = sender;
484    msg.destinationId = reciever;
485    msg.recieverType = recieverType;
486    msg.priority = messagePriority;
487
488    incomingMessageQueue.push_back( msg );
489  }
490}
491
492
Note: See TracBrowser for help on using the repository browser.