Using cSocketThreads for client outgoing packets. Unfortunately had to put in one intermediate thread (cServer::cNotifyWriteThread) to avoid deadlocks. Still, seems we have a proper multithreading for clients and no more per-client threads, yay :)
git-svn-id: http://mc-server.googlecode.com/svn/trunk@328 0a769ca7-a7f5-676a-18bf-c427514a06d6
This commit is contained in:
@@ -77,7 +77,8 @@
|
||||
case 2: (z)-=(amount); break; case 3: (z)+=(amount); break;\
|
||||
case 4: (x)-=(amount); break; case 5: (x)+=(amount); break; }
|
||||
|
||||
#define MAX_SEMAPHORES (2000)
|
||||
/// If the number of queued outgoing packets reaches this, the client will be kicked
|
||||
#define MAX_OUTGOING_PACKETS 2000
|
||||
|
||||
|
||||
|
||||
@@ -89,9 +90,7 @@
|
||||
cClientHandle::cClientHandle(const cSocket & a_Socket, int a_ViewDistance)
|
||||
: m_ViewDistance(a_ViewDistance)
|
||||
, m_ProtocolVersion(23)
|
||||
, m_pSendThread(NULL)
|
||||
, m_Socket(a_Socket)
|
||||
, m_Semaphore(MAX_SEMAPHORES)
|
||||
, m_bDestroyed(false)
|
||||
, m_Player(NULL)
|
||||
, m_bKicking(false)
|
||||
@@ -135,11 +134,6 @@ cClientHandle::cClientHandle(const cSocket & a_Socket, int a_ViewDistance)
|
||||
m_PacketMap[E_RESPAWN] = new cPacket_Respawn;
|
||||
m_PacketMap[E_PING] = new cPacket_Ping;
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
m_pSendThread = new cThread(SendThread, this, "cClientHandle::SendThread");
|
||||
m_pSendThread->Start (true);
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
|
||||
LOG("New ClientHandle created at %p", this);
|
||||
}
|
||||
|
||||
@@ -149,7 +143,7 @@ cClientHandle::cClientHandle(const cSocket & a_Socket, int a_ViewDistance)
|
||||
|
||||
cClientHandle::~cClientHandle()
|
||||
{
|
||||
LOG("Deleting client \"%s\"", GetUsername().c_str());
|
||||
LOG("Deleting client \"%s\" at %p", GetUsername().c_str(), this);
|
||||
|
||||
// Remove from cSocketThreads, we're not to be called anymore:
|
||||
cRoot::Get()->GetServer()->ClientDestroying(this);
|
||||
@@ -173,20 +167,13 @@ cClientHandle::~cClientHandle()
|
||||
}
|
||||
}
|
||||
|
||||
// First stop sending thread
|
||||
m_bKeepThreadGoing = false;
|
||||
|
||||
if (m_Socket.IsValid())
|
||||
{
|
||||
cPacket_Disconnect Disconnect;
|
||||
Disconnect.m_Reason = "Server shut down? Kthnxbai";
|
||||
m_Socket.Send(&Disconnect);
|
||||
m_Socket.CloseSocket();
|
||||
}
|
||||
|
||||
m_Semaphore.Signal();
|
||||
delete m_pSendThread;
|
||||
|
||||
|
||||
if (m_Player != NULL)
|
||||
{
|
||||
m_Player->SetClientHandle(NULL);
|
||||
@@ -198,19 +185,31 @@ cClientHandle::~cClientHandle()
|
||||
delete m_PacketMap[i];
|
||||
}
|
||||
|
||||
// Queue all remaining outgoing packets to cSocketThreads:
|
||||
{
|
||||
cCSLock Lock(m_SendCriticalSection);
|
||||
cCSLock Lock(m_CSPackets);
|
||||
for (PacketList::iterator itr = m_PendingNrmSendPackets.begin(); itr != m_PendingNrmSendPackets.end(); ++itr)
|
||||
{
|
||||
AString Data;
|
||||
(*itr)->Serialize(Data);
|
||||
cRoot::Get()->GetServer()->WriteToClient(&m_Socket, Data);
|
||||
delete *itr;
|
||||
}
|
||||
m_PendingNrmSendPackets.clear();
|
||||
for (PacketList::iterator itr = m_PendingLowSendPackets.begin(); itr != m_PendingLowSendPackets.end(); ++itr)
|
||||
{
|
||||
AString Data;
|
||||
(*itr)->Serialize(Data);
|
||||
cRoot::Get()->GetServer()->WriteToClient(&m_Socket, Data);
|
||||
delete *itr;
|
||||
}
|
||||
m_PendingLowSendPackets.clear();
|
||||
}
|
||||
|
||||
LOG("ClientHandle at %p destroyed", this);
|
||||
// Queue the socket to close as soon as it sends all outgoing data:
|
||||
cRoot::Get()->GetServer()->QueueClientClose(&m_Socket);
|
||||
|
||||
LOG("ClientHandle at %p deleted", this);
|
||||
}
|
||||
|
||||
|
||||
@@ -295,8 +294,8 @@ void cClientHandle::Authenticate(void)
|
||||
Send(Health);
|
||||
|
||||
m_Player->Initialize(World);
|
||||
m_State = csDownloadingWorld;
|
||||
StreamChunks();
|
||||
m_State = csDownloadingWorld;
|
||||
}
|
||||
|
||||
|
||||
@@ -305,7 +304,7 @@ void cClientHandle::Authenticate(void)
|
||||
|
||||
void cClientHandle::StreamChunks(void)
|
||||
{
|
||||
if (m_State < csDownloadingWorld)
|
||||
if (m_State < csAuthenticating)
|
||||
{
|
||||
return;
|
||||
}
|
||||
@@ -323,7 +322,7 @@ void cClientHandle::StreamChunks(void)
|
||||
m_LastStreamedChunkZ = ChunkPosZ;
|
||||
|
||||
// DEBUG:
|
||||
LOGINFO("Streaming chunks centered on [%d, %d]", ChunkPosX, ChunkPosZ);
|
||||
LOGINFO("Streaming chunks centered on [%d, %d], view distance %d", ChunkPosX, ChunkPosZ, m_ViewDistance);
|
||||
|
||||
cWorld * World = m_Player->GetWorld();
|
||||
ASSERT(World != NULL);
|
||||
@@ -1645,7 +1644,10 @@ void cClientHandle::Tick(float a_Dt)
|
||||
if (m_State >= csDownloadingWorld)
|
||||
{
|
||||
cWorld * World = m_Player->GetWorld();
|
||||
|
||||
cCSLock Lock(m_CSChunkLists);
|
||||
|
||||
// Send the chunks:
|
||||
int NumSent = 0;
|
||||
for (cChunkCoordsList::iterator itr = m_ChunksToSend.begin(); itr != m_ChunksToSend.end();)
|
||||
{
|
||||
@@ -1662,7 +1664,8 @@ void cClientHandle::Tick(float a_Dt)
|
||||
break;
|
||||
}
|
||||
} // for itr - m_ChunksToSend[]
|
||||
|
||||
Lock.Unlock();
|
||||
|
||||
// Check even if we didn't send anything - a chunk may have sent a notification that we'd miss otherwise
|
||||
CheckIfWorldDownloaded();
|
||||
}
|
||||
@@ -1707,8 +1710,7 @@ void cClientHandle::Send(const cPacket * a_Packet, ENUM_PRIORITY a_Priority /* =
|
||||
}
|
||||
}
|
||||
|
||||
bool bSignalSemaphore = true;
|
||||
cCSLock Lock(m_SendCriticalSection);
|
||||
cCSLock Lock(m_CSPackets);
|
||||
if (a_Priority == E_PRIORITY_NORMAL)
|
||||
{
|
||||
if (a_Packet->m_PacketID == E_REL_ENT_MOVE_LOOK)
|
||||
@@ -1727,7 +1729,6 @@ void cClientHandle::Send(const cPacket * a_Packet, ENUM_PRIORITY a_Priority /* =
|
||||
{
|
||||
Packets.erase(itr);
|
||||
bBreak = true;
|
||||
bSignalSemaphore = false; // Because 1 packet is removed, semaphore count is the same
|
||||
delete PacketData;
|
||||
break;
|
||||
}
|
||||
@@ -1747,10 +1748,9 @@ void cClientHandle::Send(const cPacket * a_Packet, ENUM_PRIORITY a_Priority /* =
|
||||
m_PendingLowSendPackets.push_back(a_Packet->Clone());
|
||||
}
|
||||
Lock.Unlock();
|
||||
if (bSignalSemaphore)
|
||||
{
|
||||
m_Semaphore.Signal();
|
||||
}
|
||||
|
||||
// Notify SocketThreads that we have something to write:
|
||||
cRoot::Get()->GetServer()->NotifyClientWrite(this);
|
||||
}
|
||||
|
||||
|
||||
@@ -1797,90 +1797,6 @@ void cClientHandle::SendConfirmPosition(void)
|
||||
|
||||
|
||||
|
||||
void cClientHandle::SendThread(void *lpParam)
|
||||
{
|
||||
cClientHandle* self = (cClientHandle*)lpParam;
|
||||
PacketList & NrmSendPackets = self->m_PendingNrmSendPackets;
|
||||
PacketList & LowSendPackets = self->m_PendingLowSendPackets;
|
||||
|
||||
|
||||
while (self->m_bKeepThreadGoing && self->m_Socket.IsValid())
|
||||
{
|
||||
self->m_Semaphore.Wait();
|
||||
cCSLock Lock(self->m_SendCriticalSection);
|
||||
if (NrmSendPackets.size() + LowSendPackets.size() > MAX_SEMAPHORES)
|
||||
{
|
||||
LOGERROR("ERROR: Too many packets in queue for player %s !!", self->m_Username.c_str());
|
||||
cPacket_Disconnect DC("Too many packets in queue.");
|
||||
self->m_Socket.Send(DC);
|
||||
|
||||
cSleep::MilliSleep(1000); // Give packet some time to be received
|
||||
|
||||
Lock.Unlock();
|
||||
self->Destroy();
|
||||
break;
|
||||
}
|
||||
|
||||
if (NrmSendPackets.size() == 0 && LowSendPackets.size() == 0)
|
||||
{
|
||||
ASSERT(!self->m_bKeepThreadGoing);
|
||||
if (self->m_bKeepThreadGoing)
|
||||
{
|
||||
LOGERROR("ERROR: Semaphore was signaled while no packets to send");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (NrmSendPackets.size() > MAX_SEMAPHORES / 2)
|
||||
{
|
||||
LOGINFO("Pending packets: %i Last: 0x%02x", NrmSendPackets.size(), (*NrmSendPackets.rbegin())->m_PacketID);
|
||||
}
|
||||
|
||||
cPacket * Packet = NULL;
|
||||
if (!NrmSendPackets.empty())
|
||||
{
|
||||
Packet = *NrmSendPackets.begin();
|
||||
NrmSendPackets.erase(NrmSendPackets.begin());
|
||||
}
|
||||
else if (!LowSendPackets.empty())
|
||||
{
|
||||
Packet = *LowSendPackets.begin();
|
||||
LowSendPackets.erase(LowSendPackets.begin());
|
||||
}
|
||||
Lock.Unlock();
|
||||
|
||||
if (!self->m_Socket.IsValid())
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
// LOG("Sending packet 0x%02x to \"%s\" (\"%s\")", Packet->m_PacketID, self->m_Socket.GetIPString().c_str(), self->m_Username.c_str());
|
||||
|
||||
bool bSuccess = self->m_Socket.Send(Packet);
|
||||
|
||||
if (!bSuccess)
|
||||
{
|
||||
LOGERROR("ERROR: While sending packet 0x%02x to client \"%s\"", Packet->m_PacketID, self->m_Username.c_str());
|
||||
delete Packet;
|
||||
self->Destroy();
|
||||
break;
|
||||
}
|
||||
delete Packet;
|
||||
|
||||
if (self->m_bKicking && (NrmSendPackets.size() + LowSendPackets.size() == 0)) // Disconnect player after all packets have been sent
|
||||
{
|
||||
cSleep::MilliSleep(1000); // Give all packets some time to be received
|
||||
self->Destroy();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
const AString & cClientHandle::GetUsername(void) const
|
||||
{
|
||||
return m_Username;
|
||||
@@ -1967,7 +1883,49 @@ void cClientHandle::GetOutgoingData(AString & a_Data)
|
||||
{
|
||||
// Data can be sent to client
|
||||
|
||||
// TODO
|
||||
cCSLock Lock(m_CSPackets);
|
||||
if (m_PendingNrmSendPackets.size() + m_PendingLowSendPackets.size() > MAX_OUTGOING_PACKETS)
|
||||
{
|
||||
LOGERROR("ERROR: Too many packets in queue for player %s !!", m_Username.c_str());
|
||||
cPacket_Disconnect DC("Too many packets in queue.");
|
||||
m_Socket.Send(DC);
|
||||
Lock.Unlock();
|
||||
Destroy();
|
||||
return;
|
||||
}
|
||||
|
||||
if ((m_PendingNrmSendPackets.size() == 0) && (m_PendingLowSendPackets.size() == 0))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if (m_PendingNrmSendPackets.size() > MAX_OUTGOING_PACKETS / 2)
|
||||
{
|
||||
LOGINFO("Suspiciously many pending packets: %i; client \"%s\", LastType: 0x%02x", m_PendingNrmSendPackets.size(), m_Username.c_str(), (*m_PendingNrmSendPackets.rbegin())->m_PacketID);
|
||||
}
|
||||
|
||||
AString Data;
|
||||
if (!m_PendingNrmSendPackets.empty())
|
||||
{
|
||||
m_PendingNrmSendPackets.front()->Serialize(Data);
|
||||
delete m_PendingNrmSendPackets.front();
|
||||
m_PendingNrmSendPackets.erase(m_PendingNrmSendPackets.begin());
|
||||
}
|
||||
else if (!m_PendingLowSendPackets.empty())
|
||||
{
|
||||
m_PendingLowSendPackets.front()->Serialize(Data);
|
||||
delete m_PendingLowSendPackets.front();
|
||||
m_PendingLowSendPackets.erase(m_PendingLowSendPackets.begin());
|
||||
}
|
||||
Lock.Unlock();
|
||||
|
||||
a_Data.append(Data);
|
||||
|
||||
// Disconnect player after all packets have been sent
|
||||
if (m_bKicking && (m_PendingNrmSendPackets.size() + m_PendingLowSendPackets.size() == 0))
|
||||
{
|
||||
Destroy();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user