Planet
navi homePPSaboutscreenshotsdownloaddevelopmentforum

source: orxonox.OLD/branches/proxy/src/lib/network/message_manager.cc @ 9347

Last change on this file since 9347 was 9347, checked in by bensch, 18 years ago

orxonox/proxy: merged the proxy.old back again, and it seems to work.

Merged with command
svn merge -r9247:HEAD https://svn.orxonox.net/orxonox/branches/proxy.old .

no conflicts

File size: 9.7 KB
Line 
1/*
2   orxonox - the future of 3D-vertical-scrollers
3
4   Copyright (C) 2004 orx
5
6   This program is free software; you can redistribute it and/or modify
7   it under the terms of the GNU General Public License as published by
8   the Free Software Foundation; either version 2, or (at your option)
9   any later version.
10
11   ### File Specific:
12   main-programmer: Christoph Renner
13   co-programmer: ...
14*/
15
16//#define DEBUG_SPECIAL_MODULE DEBUG_MODULE_
17
18#include "message_manager.h"
19
20#include "network_stream.h"
21#include "shared_network_data.h"
22
23using namespace std;
24
25MessageManager* MessageManager::singletonRef = NULL;
26
27
28/**
29 * standard constructor
30*/
31MessageManager::MessageManager ()
32{
33  this->setClassID( CL_MESSAGE_MANAGER, "MessageManager" );
34  newNumber = 1;
35  setSynchronized( true );
36}
37
38
39/**
40 * standard deconstructor
41*/
42MessageManager::~MessageManager ()
43{
44  for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ )
45  {
46    for ( std::list<NetworkMessage>::iterator it2 = it->second.messages.begin(); it2 != it->second.messages.end(); it2++ )
47    {
48      if ( it2->data )
49      {
50        delete [] it2->data;
51        it2->data = NULL;
52      }
53    }
54
55    it->second.messages.clear();
56    it->second.toAck.clear();
57  }
58
59  messageQueue.clear();
60
61  this->messageHandlerMap.clear();
62
63  MessageManager::singletonRef = NULL;
64}
65
66/**
67 * get the diff to last acked state of userId
68 *
69 * this class does not use the normal SynchronizeableVars for zynchronisation. instead
70 * it defines its own protocol
71 *
72 * @param userId user to create diff for
73 * @param data buffer to copy diff in
74 * @param maxLength max bytes to copy into data
75 * @param stateId id of current state
76 * @param fromStateId the reference state for the delta state
77 * @param priorityTH tells getStateDiff to not send element with priority \< priorityTH
78 * @return n bytes copied into data
79 */
80int MessageManager::getStateDiff( int userId, byte * data, int maxLength, int stateId, int fromStateId, int priorityTH )
81{
82  int i = 0;
83  int n;
84
85  n = Converter::intToByteArray( messageQueue[userId].toAck.size(), data + i, maxLength );
86  i += n;
87  assert( n == INTSIZE );
88
89  for ( std::list<int>::iterator it = messageQueue[userId].toAck.begin(); it != messageQueue[userId].toAck.end(); it++)
90  {
91    n = Converter::intToByteArray( *it, data + i, maxLength );
92    i += n;
93    assert( n == INTSIZE );
94  }
95
96  messageQueue[userId].toAck.clear();
97
98  n = Converter::intToByteArray( messageQueue[userId].messages.size(), data + i, maxLength );
99  i += n;
100  assert( n == INTSIZE );
101
102  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ )
103  {
104    n = Converter::intToByteArray( it->length, data + i, maxLength );
105    i += n;
106    assert( n == INTSIZE );
107
108    n = Converter::intToByteArray( it->number, data + i, maxLength );
109    i += n;
110    assert( n == INTSIZE );
111
112    n = Converter::intToByteArray( it->messageId, data + i, maxLength );
113    i += n;
114    assert( n == INTSIZE );
115
116    assert( i + it->length <= maxLength );
117    memcpy( data + i, it->data, it->length );
118    i += it->length;
119  }
120
121  return i;
122}
123
124/**
125 * sets a new state out of a diff created on another host
126 * @param userId hostId of user who send me that diff
127 * @param data pointer to diff
128 * @param length length of diff
129 * @param stateId id of current state
130 * @param fromStateId id of the base state id
131 * @return number bytes read
132 * @todo check for permissions
133 */
134int MessageManager::setStateDiff( int userId, byte * data, int length, int stateId, int fromStateId )
135{
136  int i = 0;
137  int n;
138
139  int nAcks;
140
141  assert( i + INTSIZE <= length );
142  n = Converter::byteArrayToInt( data + i, &nAcks );
143  assert( n == INTSIZE );
144  i += n;
145
146  std::list<int> acks;
147
148  int number;
149
150  for ( int j = 0; j < nAcks; j++ )
151  {
152    assert( i + INTSIZE <= length );
153    n = Converter::byteArrayToInt( data + i, &number );
154    assert( n == INTSIZE );
155    i += n;
156
157    acks.push_back( number );
158  }
159
160  int nMessages;
161
162  assert( i + INTSIZE <= length );
163  n = Converter::byteArrayToInt( data + i, &nMessages );
164  assert( n == INTSIZE );
165  i += n;
166
167  int messageLength, messageId;
168
169  for ( int j = 0; j < nMessages; j++ )
170  {
171    assert( i + INTSIZE <= length );
172    n = Converter::byteArrayToInt( data + i, &messageLength );
173    assert( n == INTSIZE );
174    i += n;
175
176    assert( i + INTSIZE <= length );
177    n = Converter::byteArrayToInt( data + i, &number );
178    assert( n == INTSIZE );
179    i += n;
180
181    assert( i + INTSIZE <= length );
182    n = Converter::byteArrayToInt( data + i, &messageId );
183    assert( n == INTSIZE );
184    i += n;
185
186    if ( number > 0 )
187      messageQueue[userId].toAck.push_back( number );
188
189    assert( i + messageLength <= length );
190    assert( messageHandlerMap.find( (MessageId)messageId ) != messageHandlerMap.end() );
191    if ( std::find( messageQueue[userId].recievedMessages.begin(), messageQueue[userId].recievedMessages.end(), number )== messageQueue[userId].recievedMessages.end() )
192    {
193      if ( !(*(messageHandlerMap[(MessageId)messageId].cb))( (MessageId)messageId, data + i, messageLength, messageHandlerMap[(MessageId)messageId].someData, userId ) )
194      {
195        NetworkMessage msg;
196
197        msg.data = new byte[messageLength];
198        memcpy( msg.data, data + i, messageLength );
199        msg.length = messageLength;
200        msg.messageId = (MessageId)messageId;
201        msg.number = userId;
202
203        incomingMessageBuffer.push_back( msg );
204      }
205      messageQueue[userId].recievedMessages.push_back( number );
206    }
207    i += messageLength;
208  }
209
210
211  //TODO maybe handle incomingMessage in tick function. else local messages will not be handled if no clients are connected
212  for ( std::list<NetworkMessage>::iterator it = incomingMessageBuffer.begin(); it != incomingMessageBuffer.end();  )
213  {
214    if ( (*(messageHandlerMap[it->messageId].cb))( it->messageId, it->data, it->length, messageHandlerMap[it->messageId].someData, it->number ) )
215    {
216      std::list<NetworkMessage>::iterator delIt = it;
217      if ( it->data )
218        delete it->data;
219      it++;
220      incomingMessageBuffer.erase( delIt );
221      continue;
222    }
223    it++;
224  }
225
226  //walk throu message queue and remove acked messages
227  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end();  )
228  {
229    if ( std::find( acks.begin(), acks.end(), it->number) != acks.end() )
230    {
231      std::list<NetworkMessage>::iterator delIt = it;
232      it++;
233      messageQueue[userId].messages.erase( delIt );
234      continue;
235    }
236    it++;
237  }
238
239  //TODO find bether way. maybe with timestamp
240  if ( messageQueue[userId].recievedMessages.size() > 1000 )
241  {
242    for ( int j = 0; j < messageQueue[userId].recievedMessages.size() - 1000; j++ )
243      messageQueue[userId].recievedMessages.erase( messageQueue[userId].recievedMessages.begin() );
244  }
245
246  return i;
247}
248
249/**
250 * clean up memory reserved for user
251 * @param userId userid
252 */
253void MessageManager::cleanUpUser( int userId )
254{
255  if ( messageQueue.find( userId ) == messageQueue.end() )
256    return;
257
258  for ( std::list<NetworkMessage>::iterator it = messageQueue[userId].messages.begin(); it != messageQueue[userId].messages.end(); it++ )
259  {
260    if ( it->data )
261      delete it->data;
262    it->data = NULL;
263  }
264
265  messageQueue[userId].toAck.clear();
266
267  messageQueue.erase( userId );
268}
269
270/**
271 * registers function to handle messages with id messageId. someData is passed to callbackfuntion
272 * @param messageId message id to handle
273 * @param cb function pointer to callback function
274 * @param someData this pointer is passed to callback function without modification
275 * @return true on success
276 */
277bool MessageManager::registerMessageHandler( MessageId messageId, MessageCallback cb, void * someData )
278{
279  MessageHandler messageHandler;
280
281  messageHandler.cb = cb;
282  messageHandler.messageId = messageId;
283  messageHandler.someData = someData;
284
285  messageHandlerMap[messageId] = messageHandler;
286
287  return true;
288}
289
290/**
291 * initializes buffers for user
292 * @param userId userId
293 */
294void MessageManager::initUser( int userId )
295{
296  // just do something so map creates a new entry
297  messageQueue[userId].toAck.clear();
298  //assert( messageQueue[userId].messages.size() == 0 );
299}
300
301/**
302 * send a message to one or more clients
303 * recieverType:
304 *               RT_ALL send to all users. reciever is ignored
305 *               RT_USER send only to reciever
306 *               RT_NOT_USER send to all but reciever
307 *
308 * @param messageId message id
309 * @param data pointer to data
310 * @param dataLength length of data
311 * @param recieverType
312 * @param reciever
313 */
314void MessageManager::sendMessage( MessageId messageId, byte * data, int dataLength, RecieverType recieverType, int reciever, MessagePriority messagePriority )
315{
316  for ( MessageQueue::iterator it = messageQueue.begin(); it != messageQueue.end(); it++ )
317  {
318    if (
319         recieverType == RT_ALL_ME ||
320         recieverType == RT_ALL_NOT_ME ||
321         recieverType == RT_USER && it->first == reciever ||
322         recieverType == RT_NOT_USER && it->first != reciever ||
323         recieverType == RT_SERVER && getNetworkStream()->isUserMasterServer( it->first )
324       )
325    {
326      NetworkMessage msg;
327
328      msg.data = new byte[dataLength];
329      memcpy( msg.data, data, dataLength );
330      msg.length = dataLength;
331      msg.messageId = messageId;
332      msg.number = newNumber++;
333      msg.priority = messagePriority;
334
335      it->second.messages.push_back( msg );
336    }
337  }
338
339  if ( recieverType == RT_ALL_ME )
340  {
341    NetworkMessage msg;
342
343    msg.data = new byte[dataLength];
344    memcpy( msg.data, data, dataLength );
345    msg.length = dataLength;
346    msg.messageId = messageId;
347    msg.number = SharedNetworkData::getInstance()->getHostID();
348    msg.priority = messagePriority;
349
350    incomingMessageBuffer.push_back( msg );
351  }
352}
353
354
Note: See TracBrowser for help on using the repository browser.