From e33e19d0587146859d48a134ec9fd94e7b7ba5cd Mon Sep 17 00:00:00 2001 From: "FWoltermann@gmail.com" Date: Thu, 8 Dec 2011 14:53:40 +0000 Subject: Initial upload --- NetEx/NetPeer.cpp | 441 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 441 insertions(+) create mode 100644 NetEx/NetPeer.cpp (limited to 'NetEx/NetPeer.cpp') diff --git a/NetEx/NetPeer.cpp b/NetEx/NetPeer.cpp new file mode 100644 index 0000000..6bf5d18 --- /dev/null +++ b/NetEx/NetPeer.cpp @@ -0,0 +1,441 @@ +/* Project nGenEx + Destroyer Studios LLC + Copyright © 1997-2004. All Rights Reserved. + + SUBSYSTEM: NetEx.lib + FILE: NetPeer.cpp + AUTHOR: John DiCamillo + + + OVERVIEW + ======== + One side of a UDP net link connection +*/ + + +#include "MemDebug.h" +#include +#include "NetPeer.h" +#include "NetGram.h" +#include "NetMsg.h" +#include "NetLayer.h" + +#include + +// +-------------------------------------------------------------------+ + +const int MULTIPART_CHUNKSIZE = 232; +const int MULTIPART_HEADER = 16; +const int UDP_HEADER_SIZE = 34; + +static NetMsgMultipart multi_part_buffer; +static DWORD multi_msg_sequence = 1; + +// +-------------------------------------------------------------------+ + +NetPeer::NetPeer(const NetAddr& a, DWORD id) + : addr(a), netid(id), sequence(0), pps(0), bps(0), max_qsize(0), + status(OK), hist_indx(0), send_size(0), recv_size(0), + chunk_size(MULTIPART_CHUNKSIZE) +{ + ZeroMemory(hist_time, sizeof(hist_time)); + ZeroMemory(hist_size, sizeof(hist_size)); + + last_recv_time = NetLayer::GetUTC(); +} + +NetPeer::~NetPeer() +{ + send_list.destroy(); + recv_list.destroy(); + + multi_send_list.destroy(); + multi_recv_list.destroy(); +} + +// +-------------------------------------------------------------------+ + +bool +NetPeer::SendMessage(NetMsg* msg) +{ + if (msg) { + if (max_qsize > 0 && msg->Length() + send_size > max_qsize) { + status = SEND_OVERFLOW; + delete msg; + return false; + } + + // simple message + if (msg->Length() <= (int) chunk_size) { + if (msg->IsPriority()) + send_list.insert(msg); + else + send_list.append(msg); + + send_size += msg->Length(); + } + + // multipart message + else { + List* list = &send_list; + + if (msg->IsScatter()) + list = &multi_send_list; + + DWORD nparts = msg->Length() / chunk_size; + DWORD extra = msg->Length() % chunk_size; + + if (extra > 0) nparts++; + + multi_part_buffer.type = NetMsg::MULTIPART; + multi_part_buffer.msgid = multi_msg_sequence++; + multi_part_buffer.nparts = nparts; + + DWORD header_size = (DWORD) (&multi_part_buffer.payload) - + (DWORD) (&multi_part_buffer); + + const BYTE* p = msg->Data(); + + for (DWORD i = 0; i < nparts; i++) { + multi_part_buffer.partno = i; + NetMsg* part = 0; + DWORD part_size = chunk_size; + + if (i == nparts-1 && extra > 0) // last partial payload + part_size = extra; + + CopyMemory(multi_part_buffer.payload, p, part_size); + p += part_size; + part = new(__FILE__,__LINE__) NetMsg(msg->NetID(), + &multi_part_buffer, + header_size + part_size, + msg->Flags()); + + if (part) { + list->append(part); + send_size += part->Length(); + } + } + } + + return true; + } + + return false; +} + +// +-------------------------------------------------------------------+ + +NetMsg* +NetPeer::GetMessage() +{ + if (recv_list.size() > 0) { + NetMsg* msg = recv_list.removeIndex(0); + recv_size -= msg->Length(); + return msg; + } + + return 0; +} + +// +-------------------------------------------------------------------+ + +NetGram* +NetPeer::ComposeGram() +{ + NetGram* g = 0; + + if ((send_list.size() || multi_send_list.size()) && OKtoSend()) { + AutoThreadSync auto_sync(sync); + + int xmit_size = send_size; + int nmsg = send_list.size(); + int limit = NET_GRAM_MAX_SIZE; + bool reliable = false; + bool is_multi = false; + + NetMsg* multi_msg = 0; + List* list = &send_list; + + if (xmit_size > limit) { + xmit_size = 0; + nmsg = 0; + + if (send_list.size() > 0) { + NetMsg* msg = 0; + + // if there is regular traffic, and multipart traffic + if (multi_send_list.size()) { + // just send one multipart message in this packet + multi_msg = multi_send_list.removeIndex(0); + limit -= msg->Length(); + reliable = true; + is_multi = true; + } + + for (int i = 0; i < send_list.size(); i++) { + NetMsg* msg = send_list[i]; + + if (xmit_size + msg->Length() < limit) { + xmit_size += msg->Length(); + nmsg++; + } + else { + break; + } + } + } + else { + // if there is only multipart traffic, + // send as many multipart messages as will fit: + list = &multi_send_list; + reliable = true; + is_multi = true; + + for (int i = 0; i < multi_send_list.size(); i++) { + NetMsg* msg = multi_send_list[i]; + + if (xmit_size + msg->Length() < limit) { + xmit_size += msg->Length(); + nmsg++; + } + else { + break; + } + } + } + } + + if (xmit_size > 0 && nmsg > 0) { + BYTE* buffer = new(__FILE__,__LINE__) BYTE[xmit_size]; + BYTE* p = buffer; + + if (multi_msg) { + if (buffer) { + CopyMemory(p, multi_msg->Data(), multi_msg->Length()); + p[1] = multi_msg->Length(); + p += multi_msg->Length(); + } + delete multi_msg; + } + + while (nmsg-- && p < buffer + xmit_size) { + NetMsg* msg = list->removeIndex(0); + + if (msg) { + if (msg->IsReliable()) reliable = true; + if (buffer) { + CopyMemory(p, msg->Data(), msg->Length()); + p[1] = msg->Length(); + p += msg->Length(); + } + delete msg; + } + } + + if (buffer) { + Text user_data((const char*) buffer, xmit_size); + int retries = 0; + + if (reliable) + retries = 5; + + if (is_multi) + retries = 10; + + send_size -= xmit_size; + + hist_size[hist_indx] = xmit_size + UDP_HEADER_SIZE; + hist_time[hist_indx] = NetLayer::GetTime(); + hist_indx++; + + if (hist_indx >= HIST_SIZE) + hist_indx = 0; + + g = new(__FILE__,__LINE__) NetGram(addr, user_data, retries); + delete buffer; + } + } + + // the next msg is too big to fit in a single packet + else { + NetMsg* m = send_list.removeIndex(0); + send_size -= m->Length(); + delete m; + } + } + + return g; +} + +// +-------------------------------------------------------------------+ + +bool +NetPeer::ReceiveGram(NetGram* g, List* q) +{ + if (g) { + if (max_qsize > 0 && recv_size + g->Size() > max_qsize) { + status = RECV_OVERFLOW; + delete g; + return false; + } + + sequence = g->Sequence(); + recv_size += g->Size() - NET_GRAM_HEADER_SIZE; + + // PARSE THE BLOCKS: + BYTE* p = g->UserData(); + + while (p < g->Data() + g->Size()) { + BYTE block_type = p[0]; + BYTE block_size = p[1]; + + if (!block_type || !block_size) + break; + + NetMsg* msg = new(__FILE__,__LINE__) NetMsg(netid, p, block_size); + + if (msg) { + if (msg->Type() < NetMsg::RESERVED) { + msg->SetSequence(sequence); + + recv_list.insertSort(msg); + + if (q) + q->insertSort(msg); + + p += block_size; + } + + else if (msg->Type() == NetMsg::MULTIPART) { + multi_recv_list.insertSort(msg); + p += block_size; + + CheckMultiRecv(q); + } + } + } + + last_recv_time = NetLayer::GetUTC(); + + delete g; + return true; + } + + return false; +} + +// +-------------------------------------------------------------------+ + +bool +NetPeer::OKtoSend() const +{ + if (pps || bps) { + DWORD hist_total = 0; + DWORD hist_count = 0; + DWORD now = NetLayer::GetTime(); + DWORD hist_oldest = now; + DWORD hist_newest = 0; + + for (int i = 0; i < HIST_SIZE; i++) { + if (hist_size[i] > 0) { + hist_total += hist_size[i]; + hist_count++; + } + + if (hist_time[i] > 0) { + if (hist_time[i] < hist_oldest) + hist_oldest = hist_time[i]; + + if (hist_time[i] > hist_newest) + hist_newest = hist_time[i]; + } + } + + if (now - hist_newest < (DWORD) pps) + return false; + + DWORD delta = now - hist_oldest; + DWORD avg_bps = hist_total / delta; + + if (bps > 0 && avg_bps > (DWORD) bps) + return false; + } + + return true; +} + +// +-------------------------------------------------------------------+ + +struct PacketAssembly { + DWORD msgid; + DWORD netid; + int nreq; + int nparts; + int nbytes; +}; + +void +NetPeer::CheckMultiRecv(List* q) +{ + const int MAX_SIMULTANEOUS_MULTI_SEQUENCES = 8; + + PacketAssembly assy[MAX_SIMULTANEOUS_MULTI_SEQUENCES]; + ZeroMemory(assy, sizeof(assy)); + + DWORD header_size = (DWORD) (&multi_part_buffer.payload) - + (DWORD) (&multi_part_buffer); + + // Catalog how much of each multipart sequence has been received: + for (int i = 0; i < multi_recv_list.size(); i++) { + NetMsg* msg = multi_recv_list[i]; + NetMsgMultipart* m = (NetMsgMultipart*) msg->Data(); + + for (int n = 0; n < MAX_SIMULTANEOUS_MULTI_SEQUENCES; n++) { + PacketAssembly* a = assy + n; + + if (a->msgid == 0 || (a->msgid == m->msgid && a->netid == msg->NetID())) { + a->msgid = m->msgid; + a->netid = msg->NetID(); + a->nreq = m->nparts; + a->nparts += 1; + a->nbytes += m->len - header_size; + break; + } + } + } + + for (int n = 0; n < MAX_SIMULTANEOUS_MULTI_SEQUENCES; n++) { + PacketAssembly* a = assy + n; + + // is this sequence complete? + if (a->msgid && a->nparts == a->nreq) { + BYTE* buffer = new BYTE[a->nbytes]; + BYTE* p = buffer; + WORD nid = 0; + + ListIter iter = multi_recv_list; + while (++iter) { + netid = iter->NetID(); + NetMsgMultipart* m = (NetMsgMultipart*) iter->Data(); + + // found part of the sequence + if (m->msgid == a->msgid && netid == a->netid) { + // copy it into the buffer + CopyMemory(p, m->payload, m->len - header_size); + p += m->len - header_size; + + delete iter.removeItem(); + } + } + + NetMsg* msg = new(__FILE__,__LINE__) NetMsg(netid, buffer, a->nbytes, NetMsg::RELIABLE); + if (msg) { + recv_list.insertSort(msg); + + if (q) + q->insertSort(msg); + } + } + } +} -- cgit v1.1