Starshatter_Open
Open source Starshatter engine
 All Classes Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
NetLink.cpp
Go to the documentation of this file.
1 /* Project nGenEx
2  Destroyer Studios LLC
3  Copyright © 1997-2004. All Rights Reserved.
4 
5  SUBSYSTEM: NetEx.lib
6  FILE: NetLink.cpp
7  AUTHOR: John DiCamillo
8 
9 
10  OVERVIEW
11  ========
12  Network (IP) Socket Wrapper Implementation
13 */
14 
15 
16 #include "MemDebug.h"
17 #include "NetLink.h"
18 #include "NetGram.h"
19 #include "NetMsg.h"
20 #include "NetPeer.h"
21 #include "NetLayer.h"
22 
23 // +-------------------------------------------------------------------+
24 
25 DWORD WINAPI NetLinkProc(LPVOID link);
26 
27 const DWORD UDP_HEADER_SIZE = 34;
28 
29 // +-------------------------------------------------------------------+
30 // client-side ctor
32  : hnet(0), shutdown(false), traffic_time(50), resend_time(300),
33  packets_sent(0), packets_recv(0), bytes_sent(0), bytes_recv(0), retries(0), drops(0), lag(100)
34 {
35  ZeroMemory(lag_samples, sizeof(lag_samples));
36  lag_index = 0;
37 
38  DWORD thread_id = 0;
39  hnet = CreateThread(0, 4096, NetLinkProc, (LPVOID) this, 0, &thread_id);
40 }
41 
42 // server-side ctor
44  : addr(a), hnet(0), shutdown(false), traffic_time(50), resend_time(300),
45  packets_sent(0), packets_recv(0), bytes_sent(0), bytes_recv(0), retries(0), drops(0), lag(100)
46 {
47  ZeroMemory(lag_samples, sizeof(lag_samples));
48  lag_index = 0;
49 
50  sock.bind(addr);
51  DWORD thread_id = 0;
52  hnet = CreateThread(0, 4096, NetLinkProc, (LPVOID) this, 0, &thread_id);
53 }
54 
56 {
57  if (!shutdown) {
58  shutdown = true;
59  }
60 
61  if (hnet) {
62  WaitForSingleObject(hnet, 2000);
63  CloseHandle(hnet);
64  }
65 
66  send_list.destroy(); // packets waiting to be ack'ed must be destroyed
67  recv_list.clear(); // received messages are owned by the peers
68  peer_list.destroy(); // but the net link owns the peers!
69 }
70 
71 // +--------------------------------------------------------------------+
72 
73 static DWORD base_netid = 1000;
74 
75 DWORD
76 NetLink::AddPeer(const char* a, WORD p)
77 {
78  return AddPeer(NetAddr(a, p));
79 }
80 
81 DWORD
82 NetLink::AddPeer(DWORD a, WORD p)
83 {
84  return AddPeer(NetAddr(a, p));
85 }
86 
87 DWORD
89 {
90  if (!a.IPAddr())
91  return 0;
92 
93  AutoThreadSync auto_sync(sync);
94 
95  NetPeer* peer = FindPeer(a);
96 
97  if (!peer) {
98  peer = new(__FILE__, __LINE__) NetPeer(a, base_netid++);
99  if (peer)
100  peer_list.append(peer);
101  }
102 
103  if (peer)
104  return peer->NetID();
105 
106  return 0;
107 }
108 
109 // +--------------------------------------------------------------------+
110 
111 bool
112 NetLink::SendMessage(DWORD nid, void* d, int l, BYTE f)
113 {
114  return SendMessage(new(__FILE__,__LINE__) NetMsg(nid, d, l, f));
115 }
116 
117 bool
118 NetLink::SendMessage(DWORD nid, BYTE type, const char* text, int len, BYTE f)
119 {
120  return SendMessage(new(__FILE__,__LINE__) NetMsg(nid, type, text, len, f));
121 }
122 
123 bool
125 {
126  if (msg) {
127  if (msg->Type() != NetMsg::INVALID &&
128  msg->Type() < NetMsg::RESERVED &&
129  msg->NetID()) {
130 
131  NetPeer* p = FindPeer(msg->NetID());
132  if (p)
133  return p->SendMessage(msg);
134  }
135 
136  delete msg;
137  }
138 
139  return false;
140 }
141 
142 // +--------------------------------------------------------------------+
143 
144 NetMsg*
146 {
147  NetMsg* msg = 0;
148 
149  // receive from specific host:
150  if (netid) {
151  NetPeer* p = FindPeer(netid);
152  if (p) {
153  msg = p->GetMessage();
154 
155  sync.acquire();
156  recv_list.remove(msg);
157  sync.release();
158  }
159  }
160 
161  return msg;
162 }
163 
164 // +--------------------------------------------------------------------+
165 
166 NetMsg*
168 {
169  NetMsg* msg = 0;
170 
171  // get first available packet:
172 
173  // Double-checked locking:
174  if (recv_list.size()) {
175  sync.acquire();
176  if (recv_list.size()) {
177  msg = recv_list.removeIndex(0);
178  }
179  sync.release();
180 
181  if (msg && msg->NetID()) {
182  NetPeer* p = FindPeer(msg->NetID());
183  if (p) {
184  p->GetMessage(); // remove message from peer's list
185  // don't do this inside of sync block -
186  // it might cause a deadlock
187  }
188  }
189  }
190 
191  return msg;
192 }
193 
194 // +--------------------------------------------------------------------+
195 
196 void
198 {
199  shutdown = true;
200 }
201 
202 // +--------------------------------------------------------------------+
203 
204 DWORD WINAPI NetLinkProc(LPVOID link)
205 {
206  NetLink* netlink = (NetLink*) link;
207 
208  if (netlink)
209  return netlink->DoSendRecv();
210 
211  return (DWORD) E_POINTER;
212 }
213 
214 DWORD
216 {
217  while (!shutdown) {
218  ReadPackets();
219  SendPackets();
220 
221  // discard reeeeally old peers:
222  sync.acquire();
223 
225  while (!shutdown && ++iter) {
226  NetPeer* peer = iter.value();
227 
228  if ((NetLayer::GetUTC() - peer->LastReceiveTime()) > 300)
229  delete iter.removeItem();
230  }
231 
232  sync.release();
233  Sleep(traffic_time);
234  }
235 
236  return 0;
237 }
238 
239 void
241 {
242  while (!shutdown && sock.select(NetSock::SELECT_READ) > 0) {
243  NetGram* gram = RecvNetGram();
244 
245  if (gram && gram->IsReliable()) {
246  if (gram->IsAck()) {
247  ProcessAck(gram);
248  delete gram;
249  }
250  else {
251  AckNetGram(gram);
252  QueueNetGram(gram);
253  }
254  }
255  else {
256  QueueNetGram(gram);
257  }
258  }
259 }
260 
261 void
263 {
264  if (shutdown)
265  return;
266 
267  if (sock.select(NetSock::SELECT_WRITE) > 0) {
268  DoRetries();
269  }
270 
271  AutoThreadSync auto_sync(sync);
272 
274  while (!shutdown && ++iter) {
275  NetPeer* p = iter.value();
276  NetGram* g = 0;
277 
278  do {
279  if (sock.select(NetSock::SELECT_WRITE) > 0) {
280  g = p->ComposeGram();
281  if (g) {
282  SendNetGram(g);
283  }
284  }
285  else {
286  g = 0;
287  }
288  }
289  while (!shutdown && g);
290  }
291 }
292 
293 // +--------------------------------------------------------------------+
294 
295 void
297 {
298  if (gram) {
299  if (gram->IsReliable()) {
300  send_list.append(gram);
301  }
302 
303  int err = sock.sendto(gram->Body(), gram->Address());
304 
305  if (err < 0) {
306  err = NetLayer::GetLastError();
307  }
308  else {
309  packets_sent += 1;
310  bytes_sent += gram->Size() + UDP_HEADER_SIZE;
311  }
312 
313  if (!gram->IsReliable())
314  delete gram;
315  }
316 }
317 
318 NetGram*
320 {
321  NetAddr from;
322  Text msg = sock.recvfrom(&from);
323 
324  packets_recv += 1;
325  bytes_recv += msg.length() + UDP_HEADER_SIZE;
326 
327  return new(__FILE__, __LINE__) NetGram(from, msg);
328 }
329 
330 // +--------------------------------------------------------------------+
331 
332 void
334 {
335  if (gram) {
336  NetGram ack = gram->Ack();
337 
338  int err = sock.sendto(ack.Body(), gram->Address());
339  if (err < 0)
340  err = NetLayer::GetLastError();
341  }
342  else {
343  Print("NetLink::AckNetGram( NULL!!! )\n");
344  }
345 }
346 
347 void
349 {
350  if (!shutdown && send_list.size()) {
351  AutoThreadSync auto_sync(sync);
352 
353  // remove the ack flag:
354  gram->ClearAck();
355 
356  // find a matching outgoing packet:
357  int sent = send_list.index(gram);
358  if (sent >= 0) {
359  NetGram* orig = send_list.removeIndex(sent);
360  DWORD time = NetLayer::GetTime();
361  DWORD msec = time - orig->SendTime();
362  double dlag = 0.75 * lag + 0.25 * msec;
363 
364  if (lag_index >= 10) lag_index = 0;
365  lag_samples[lag_index++] = msec;
366 
367  NetPeer* peer = FindPeer(orig->Address());
368  if (peer)
370 
371  delete orig;
372 
373  lag = (DWORD) dlag;
374 
375  if (lag > 100)
376  resend_time = 3 * lag;
377  else
378  resend_time = 300;
379  }
380  }
381 }
382 
383 void
385 {
386  if (!shutdown) {
387  AutoThreadSync auto_sync(sync);
388 
389  DWORD sequence = 0;
390  NetPeer* peer = FindPeer(gram->Address());
391 
392  if (peer) {
393  sequence = peer->Sequence();
394  }
395  else {
396  peer = new(__FILE__, __LINE__) NetPeer(gram->Address(), base_netid++);
397  if (peer)
398  peer_list.append(peer);
399  }
400 
401  if (!gram->IsReliable()) {
402  if (gram->Sequence() < sequence) { // discard, too old
403  delete gram;
404  return;
405  }
406  }
407 
408  // sort this gram into the recv list(s) based on sequence:
409  if (peer) {
410  peer->ReceiveGram(gram, &recv_list);
411  }
412  }
413 }
414 
415 // +--------------------------------------------------------------------+
416 
417 void
419 {
420  if (!shutdown) {
421  AutoThreadSync auto_sync(sync);
422 
423  if (send_list.size()) {
424  int time = (int) NetLayer::GetTime();
425 
427  while (!shutdown && ++iter) {
428  NetGram* gram = iter.value();
429 
430  // still trying ?
431  if (gram->Retries() > 0) {
432  DWORD last_send = gram->SendTime();
433  DWORD delta = time - last_send;
434 
435  if (delta > resend_time) {
436  gram->Retry();
437  sock.sendto(gram->Body(), gram->Address());
438  retries++;
439  }
440  }
441 
442  // oh, give it up:
443  else {
444  iter.removeItem();
445  delete gram;
446  drops++;
447  }
448  }
449  }
450  }
451 }
452 
453 // +--------------------------------------------------------------------+
454 
455 NetPeer*
456 NetLink::FindPeer(DWORD netid)
457 {
458  AutoThreadSync auto_sync(sync);
459  NetPeer* peer = 0;
460 
462  while (++iter && !peer) {
463  NetPeer* p = iter.value();
464 
465  if (p->NetID() == netid)
466  peer = p;
467  }
468 
469  return peer;
470 }
471 
472 NetPeer*
474 {
475  AutoThreadSync auto_sync(sync);
476  NetPeer* peer = 0;
477 
479  while (++iter && !peer) {
480  NetPeer* p = iter.value();
481 
482  if (p->Address() == a)
483  peer = p;
484  }
485 
486  return peer;
487 }