TCP Socket을 구현하는 방법들
TCP Socket은 네트워크 레벨에서의 기능을 담당한다. Connect, Read, Write, Disconnect 등등과 같은 기능들이 있을 것이다. 다만 이 소켓을 이용한 함수들을 이용하는데에는 필연적으로 직면하게 되는 문제가 있다. 네트워크 레벨을 통해서 데이터를 주고받아야 하기 때문에, 네트워크 레벨에서 버퍼에 데이터가 모두 준비되지 않았을 경우, 이 상황을 어떻게 처리해야할까? 이러한 문제에 대해 여러가지 방법으로 대응이 가능하다. 다음과 방법들이 있다.
1. Blocking I/O
TCP 소켓의에서 Blocking I/O는 가장 간단하고 일반적으로 사용되는 접근 방식이다. Blocking I/O를 이용하여 Socket의 기능을 이용하는 경우 어플리케이션은 Block 상태에 빠지고, 해당 작업이 끝날 때까지 return 하지 않을 것이다. 만약 읽을 수 있는 데이터가 없거나 전송 버퍼가 가득 찬 경우 Read 또는 Write 작업은 수행될 수 있을 때까지 기다린다. 이 접근 방식은 사용 및 이해가 간단하지만 I/O 작업이 완료될 때까지 애플리케이션이 멈추게 된다는 치명적인 단점이 있다.
2. NonBlocking I/O
NonBlocking I/O는 프로그램이 완료될 때까지 기다리지 않고 I/O 작업을 수행할 수 있도록 하는 접근 방식이다. 즉, 읽기 및 쓰기 작업을 즉시 수행할 수 없더라도 즉시 반환된다. 즉, 네트워크 레벨에서 버퍼에 요청한 사이즈의 데이터가 모두 준비되지 않더라도, 현재까지 쌓여있는 데이터만 받아서 반환된다는 것이다. 이러한 방식은 Read, Write 함수를 이용함에 있어서 훨씬 더 많은 예외처리를 해주어야 하지만, Blocking I/O에서의 어플리케이션이 멈춰버리는 문제를 어플리케이션 레벨에서 핸들링할 수 있도록 한다.
3. Async I/O
Blocking I/O와 NonBlocking I/O는 좀 더 소켓에 특화된 기능인 반면, Async I/O는 어플리케이션 레벨에서 Blocking을 핸들링하는 방법이다. NonBlocking I/O는 현재까지 준비된 데이터만 가져오는 형태로 Blocking I/O에서의 어플리케이션이 멈추는 문제를 처리했다. 하지만 이것은 스레드가 하나밖에 없을 때의 이야기이다. 여러 개의 스레드를 활용할 수 있다면, 굳이 NonBlocking I/O와 복잡한 Polling 알고리즘으로 문제를 처리하기보다는, Async I/O를 이용하여 문제를 해결하는것이 효과적일 것이다.
Async I/O는 기본적으로는 Blocking I/O를 이용한다. 다만, 현재 어플리케이션이 돌고있는 스레드에서는 요청만 하고, Blocking 상태에 빠져서 I/O의 완료를 기다리는것은 백그라운드 스레드에서 진행된다. 백그라운드 스레드는 요청이 완료되는 경우 준비된 데이터를 콜백의 형태로 어플리케이션 스레드에 전달한다.
TCP Socket을 이용한 Client 개발
언리얼에서도 소켓을 이용한 네트워킹이 가능하다. 언리얼엔진에서는 소켓을 편하게 이용하도록 FSocket과 FTcpSocketBuilder 객체를 제공해준다. 이를 이용하기 위해 Networking 모듈을 추가해주어야한다.
그리고 언리얼에서는 어떤 플랫폼에서 이 소켓이 사용될 지 모르기 때문에 소켓을 사용할 때 ISocketSubsystem이라는 인터페이스를 이용하는것을 권장한다. 이 인터페이스는 내부적으로 플랫폼에 맞는 소켓의 기능을 포워딩해주는 역할을 담당한다. 이를 위해 Sockets 모듈을 추가해준다.
Build.cs
PublicDependencyModuleNames.AddRange(new string[] { "Core", "CoreUObject", "Engine", "InputCore", "Sockets", "Networking" });
Message Header
소켓 통신을 하는 경우 항상 나오는 문제는 현재 연결된 소켓으로부터 얼마만큼의 데이터를 가져올 것인지이다. 보내는 데이터의 크기를 받는 쪽에서는 알 수 없기 때문에 생기는 문제이다. 이 문제를 해결하기 위해 몇 가지 방법이 있다.
1. 항상 일정한 사이즈의 데이터를 보내도록 프로토콜을 정한다.
2. 데이터의 끝을 결정하는 태그를 정하고, 읽기 과정에서 태그가 읽힐때까지 데이터를 가져온다.
3. 고정된 사이즈의 데이터 헤더를 정하고, 변동될 수 있는 바디 데이터의 사이즈 값을 헤더에 넣는다. Recv 과정에서는 먼저 헤더를 읽고, 헤더에 들어있는 Size 정보를 바탕으로 바디를 읽도록 한다.
이 예제에서는 3번째 방법을 이용할 것이다.
struct UNREALEXAMPLES_API FMessageHeader
{
uint32 Type;
uint32 Size;
FMessageHeader()
: Type(0), Size(0)
{
}
FMessageHeader(uint32 Type, int32 PayloadSize)
: Type(Type), Size(PayloadSize)
{
}
friend FArchive& operator<<(FArchive& Ar, FMessageHeader& Header)
{
Ar << Header.Type;
Ar << Header.Size;
return Ar;
}
};
Blocking
// Fill out your copyright notice in the Description page of Project Settings.
#pragma once
#include "CoreMinimal.h"
#include "NetworkMessage.h"
#include "TCPSocketClientUtils.h"
class FSocket;
class UNREALEXAMPLES_API FTCPSocketClient_Blocking
{
public:
static bool Receive(FSocket* Socket, uint8* Results, int32 Size);
static bool Send(FSocket* Socket, const uint8* Buffer, int32 Size);
static bool SendPacket(FSocket* Socket, uint32 Type, const TArray<uint8>& Payload);
static bool SendPacket(FSocket* Socket, uint32 Type, const uint8* Payload, int32 PayloadSize);
static bool ReceivePacket(FSocket* Socket, TArray<uint8>& OutPayload);
void Connect();
void Disconnect();
void Send(uint32 Type, const FString& Text);
void Recv();
private:
FSocket* Socket;
};
#include "Networking/TCPSocketClient/TCPSocketClient_Blocking.h"
#include "Sockets.h"
#include "Common/TcpSocketBuilder.h"
#include "Serialization/ArrayWriter.h"
#include "SocketSubsystem.h"
// if data size is too big for just one recv, it needs to be called multi times.
bool FTCPSocketClient_Blocking::Receive(FSocket* Socket, uint8* Results, int32 Size)
{
int32 NumRead = 0;
bool bSuccess = Socket->Recv(Results, Size, NumRead, ESocketReceiveFlags::Type::WaitAll);
return bSuccess;
}
bool FTCPSocketClient_Blocking::Send(FSocket* Socket, const uint8* Buffer, int32 Size)
{
int32 BytesSent = 0;
bool bSuccess = Socket->Send(Buffer, Size, BytesSent);
return bSuccess;
}
bool FTCPSocketClient_Blocking::SendPacket(FSocket* Socket, uint32 Type, const TArray<uint8>& Payload)
{
return SendPacket(Socket, Type, Payload.GetData(), Payload.Num());
}
bool FTCPSocketClient_Blocking::SendPacket(
FSocket* Socket, uint32 Type, const uint8* Payload, int32 PayloadSize)
{
// make a header for the payload
FMessageHeader Header(Type, PayloadSize);
constexpr static int32 HeaderSize = sizeof(FMessageHeader);
// serialize out the header
FBufferArchive Ar;
Ar << Header;
// append the payload bytes to send it in one network packet
Ar.Append(Payload, PayloadSize);
// Send it, and make sure it sent it all
if (!Send(Socket, Ar.GetData(), Ar.Num()))
{
UE_LOG(LogSockets, Error, TEXT("Unable To Send."));
return false;
}
return true;
}
bool FTCPSocketClient_Blocking::ReceivePacket(FSocket* Socket, TArray<uint8>& OutPayload)
{
TArray<uint8> HeaderBuffer;
int32 HeaderSize = sizeof(FMessageHeader);
HeaderBuffer.AddZeroed(HeaderSize);
// recv header
int32 BytesRead = 0;
if (!Receive(Socket, HeaderBuffer.GetData(), HeaderBuffer.Num()))
{
UE_LOG(LogTemp, Error, TEXT("Recv Header Failed."));
return false;
}
FMessageHeader Header;
{
FMemoryReader Reader(HeaderBuffer);
Reader << Header;
UE_LOG(LogTemp, Log, TEXT("Recv Header Type : %d Size : %d"), Header.Type, Header.Size);
}
int32 PayloadSize = Header.Size;
OutPayload.SetNumZeroed(PayloadSize);
if (Receive(Socket, OutPayload.GetData(), OutPayload.Num()))
{
return true;
}
return false;
}
void FTCPSocketClient_Blocking::Connect()
{
Socket = FTcpSocketBuilder(TEXT("ClientSocket"));
FString IPAddress = TEXT("127.0.0.1");
uint16 PortNumber = 11000;
if (FTCPSocketClientUtils::Connect(Socket, IPAddress, PortNumber))
{
UE_LOG(LogTemp, Log, TEXT("Socket Connected"));
Socket->SetNonBlocking(false);
}
else
{
UE_LOG(LogTemp, Error, TEXT("Socket Connect Failed."));
FTCPSocketClientUtils::DestroySocket(Socket);
}
}
void FTCPSocketClient_Blocking::Disconnect()
{
FTCPSocketClientUtils::DestroySocket(Socket);
}
void FTCPSocketClient_Blocking::Send(uint32 Type, const FString& Text)
{
SCOPE_CYCLE_COUNTER(STAT_Send);
FTCHARToUTF8 Convert(*Text);
FArrayWriter WriterArray;
WriterArray.Serialize((UTF8CHAR*)Convert.Get(), Convert.Length());
if (FTCPSocketClient_Blocking::SendPacket(Socket, Type, WriterArray))
{
UE_LOG(LogTemp, Log, TEXT("Sent Text : %s Size : %d"), *Text, WriterArray.Num());
}
}
void FTCPSocketClient_Blocking::Recv()
{
SCOPE_CYCLE_COUNTER(STAT_Recv);
TArray<uint8> Payload;
if (FTCPSocketClient_Blocking::ReceivePacket(Socket, Payload))
{
FString Data(Payload.Num(), (char*)Payload.GetData());
UE_LOG(LogTemp, Log, TEXT("Recv data success. data : %s Payload : %d size : %d"), *Data, Payload.Num(), Data.Len());
}
}
Non Blocking
다음으로 소켓 클라이언트 객체를 정의한다.
// Fill out your copyright notice in the Description page of Project Settings.
#pragma once
#include "CoreMinimal.h"
#include "NetworkMessage.h"
#include "TCPSocketClientUtils.h"
class FSocket;
class UNREALEXAMPLES_API FTCPSocketClient_NonBlocking
{
public:
static bool Receive(FSocket* Socket, uint8* Results, int32 Size);
static bool Send(FSocket* Socket, const uint8* Buffer, int32 Size);
static bool SendPacket(FSocket* Socket, uint32 Type, const TArray<uint8>& Payload);
static bool SendPacket(FSocket* Socket, uint32 Type, const uint8* Payload, int32 PayloadSize);
static bool ReceivePacket(FSocket* Socket, TArray<uint8>& OutPayload);
void Connect();
void Disconnect();
void Send(uint32 Type, const FString& Text);
void Recv();
private:
FSocket* Socket;
};
#include "Networking/TCPSocketClient/TCPSocketClient_NonBlocking.h"
#include "Sockets.h"
#include "Common/TcpSocketBuilder.h"
#include "Serialization/ArrayWriter.h"
#include "SocketSubsystem.h"
// if data size is too big for just one recv, it needs to be called multi times.
bool FTCPSocketClient_NonBlocking::Receive(FSocket* Socket, uint8* Results, int32 Size)
{
int32 Offset = 0;
while (Size > 0)
{
int32 NumRead = 0;
Socket->Recv(Results + Offset, Size, NumRead);
check(NumRead <= Size);
// make sure we were able to read at least something (and not too much)
if (NumRead <= 0)
{
return false;
}
// if we read a partial block, move along
Offset += NumRead;
Size -= NumRead;
}
return true;
}
bool FTCPSocketClient_NonBlocking::Send(FSocket* Socket, const uint8* Buffer, int32 Size)
{
while (Size > 0)
{
int32 BytesSent = 0;
if (!Socket->Send(Buffer, Size, BytesSent))
{
return false;
}
Size -= BytesSent;
Buffer += BytesSent;
}
return true;
}
bool FTCPSocketClient_NonBlocking::SendPacket(FSocket* Socket, uint32 Type, const TArray<uint8>& Payload)
{
return SendPacket(Socket, Type, Payload.GetData(), Payload.Num());
}
bool FTCPSocketClient_NonBlocking::SendPacket(
FSocket* Socket, uint32 Type, const uint8* Payload, int32 PayloadSize)
{
// make a header for the payload
FMessageHeader Header(Type, PayloadSize);
constexpr static int32 HeaderSize = sizeof(FMessageHeader);
// serialize out the header
FBufferArchive Ar;
Ar << Header;
// append the payload bytes to send it in one network packet
Ar.Append(Payload, PayloadSize);
// Send it, and make sure it sent it all
if (!Send(Socket, Ar.GetData(), Ar.Num()))
{
UE_LOG(LogSockets, Error, TEXT("Unable To Send."));
return false;
}
return true;
}
bool FTCPSocketClient_NonBlocking::ReceivePacket(FSocket* Socket, TArray<uint8>& OutPayload)
{
const int32 MaxPacketSize = 1024 * 1024;
uint32 PendingDataSize;
if (!Socket->HasPendingData(PendingDataSize) || PendingDataSize <= MaxPacketSize)
{
return false;
}
TArray<uint8> HeaderBuffer;
int32 HeaderSize = sizeof(FMessageHeader);
HeaderBuffer.AddZeroed(HeaderSize);
// recv header
int32 BytesRead = 0;
if (!Receive(Socket, HeaderBuffer.GetData(), HeaderBuffer.Num()))
{
UE_LOG(LogTemp, Error, TEXT("Recv Header Failed."));
return false;
}
FMessageHeader Header;
{
FMemoryReader Reader(HeaderBuffer);
Reader << Header;
UE_LOG(LogTemp, Log, TEXT("Recv Header Type : %d Size : %d"), Header.Type, Header.Size);
}
int32 PayloadSize = Header.Size;
OutPayload.SetNumZeroed(PayloadSize);
if (Receive(Socket, OutPayload.GetData(), OutPayload.Num()))
{
return true;
}
return false;
}
void FTCPSocketClient_NonBlocking::Connect()
{
int32 RecvBufferSize = 2 * 1024 * 1024;
Socket = FTcpSocketBuilder(TEXT("ClientSocket"));
Socket->SetNonBlocking(true);
Socket->SetReceiveBufferSize(RecvBufferSize, RecvBufferSize);
FString IPAddress = TEXT("127.0.0.1");
uint16 PortNumber = 11000;
if (FTCPSocketClientUtils::Connect(Socket, IPAddress, PortNumber))
{
UE_LOG(LogTemp, Log, TEXT("Socket Connected"));
}
else
{
FTCPSocketClientUtils::DestroySocket(Socket);
}
}
void FTCPSocketClient_NonBlocking::Disconnect()
{
FTCPSocketClientUtils::DestroySocket(Socket);
}
void FTCPSocketClient_NonBlocking::Send(uint32 Type, const FString& Text)
{
SCOPE_CYCLE_COUNTER(STAT_Send);
FTCHARToUTF8 Convert(*Text);
FArrayWriter WriterArray;
WriterArray.Serialize((UTF8CHAR*)Convert.Get(), Convert.Length());
if (FTCPSocketClient_NonBlocking::SendPacket(Socket, Type, WriterArray))
{
UE_LOG(LogTemp, Log, TEXT("Sent Text : %s Size : %d"), *Text, WriterArray.Num());
}
}
void FTCPSocketClient_NonBlocking::Recv()
{
SCOPE_CYCLE_COUNTER(STAT_Recv);
TArray<uint8> Payload;
if (FTCPSocketClient_NonBlocking::ReceivePacket(Socket, Payload))
{
FString Data(Payload.Num(), (char*)Payload.GetData());
UE_LOG(LogTemp, Log, TEXT("Recv data success. data : %s Payload : %d size : %d"), *Data, Payload.Num(), Data.Len());
}
}
Async
// Fill out your copyright notice in the Description page of Project Settings.
#pragma once
#include "CoreMinimal.h"
#include "NetworkMessage.h"
#include "TCPSocketClientUtils.h"
class UNREALEXAMPLES_API FTCPSocketClient_Async
{
public:
static TSharedPtr<FBufferArchive> CreatePacket(uint32 InType, const uint8* InPayload, int32 InPayloadSize);
void Connect();
void Disconnect();
TSharedPtr<FBufferArchive> CreatePacket(uint32 Type, const FString& Text);
private:
// all phase functions will be called at GameThread
void BeginSendPhase();
void EndSendPhase();
void BeginRecvPhase();
void EndRecvPhase();
void OnSendCompleted();
void OnSendFailed();
void OnRecvCompleted(const TArray<uint8>& InPayload);
void OnRecvFailed();
private:
FSocket* Socket;
};
#include "Networking/TCPSocketClient/TCPSocketClient_Async.h"
#include "Sockets.h"
#include "Common/TcpSocketBuilder.h"
#include "Serialization/ArrayWriter.h"
#include "SocketSubsystem.h"
#include "Networking.h"
#include "SocketSubsystemModule.h"
TSharedPtr<FBufferArchive> FTCPSocketClient_Async::CreatePacket(
uint32 InType, const uint8* InPayload, int32 InPayloadSize)
{
// make a header for the payload
FMessageHeader Header(InType, InPayloadSize);
constexpr static int32 HeaderSize = sizeof(FMessageHeader);
TSharedPtr<FBufferArchive> Packet = MakeShareable(new FBufferArchive());
// serialize out the header
(*Packet) << Header;
// append the payload bytes to send it in one network packet
Packet->Append(InPayload, InPayloadSize);
return Packet;
}
void FTCPSocketClient_Async::Connect()
{
Socket = FTcpSocketBuilder(TEXT("ClientSocket"));
FString IPAddress = TEXT("127.0.0.1");
uint16 PortNumber = 11000;
if (FTCPSocketClientUtils::Connect(Socket, IPAddress, PortNumber))
{
UE_LOG(LogTemp, Log, TEXT("Socket Connected"));
Socket->SetNonBlocking(false);
BeginSendPhase();
}
else
{
FTCPSocketClientUtils::DestroySocket(Socket);
}
}
void FTCPSocketClient_Async::Disconnect()
{
FTCPSocketClientUtils::DestroySocket(Socket);
Socket = nullptr;
}
TSharedPtr<FBufferArchive> FTCPSocketClient_Async::CreatePacket(uint32 Type, const FString& Text)
{
SCOPE_CYCLE_COUNTER(STAT_Send);
FTCHARToUTF8 Convert(*Text);
FArrayWriter WriterArray;
WriterArray.Serialize((UTF8CHAR*)Convert.Get(), Convert.Length());
TSharedPtr<FBufferArchive> Packet = CreatePacket(Type, WriterArray.GetData(), WriterArray.Num());
return Packet;
}
void FTCPSocketClient_Async::BeginSendPhase()
{
UE_LOG(LogTemp, Log, TEXT("BeginSendPhase"));
TSharedPtr<FBufferArchive> Packet = CreatePacket(0, TEXT("start packet"));
AsyncTask(ENamedThreads::AnyThread, [this, Packet]()
{
if (Socket == nullptr || this == nullptr)
{
return;
}
// send all things in queue
int32 NumSend;
bool bSuccess = Socket->Send(Packet->GetData(), Packet->Num(), NumSend);
AsyncTask(ENamedThreads::GameThread, [this, bSuccess]()
{
if (Socket == nullptr || this == nullptr)
{
return;
}
if (bSuccess)
{
// send complete
OnSendCompleted();
EndSendPhase();
}
else
{
// send failed
OnSendFailed();
EndSendPhase();
}
});
});
}
void FTCPSocketClient_Async::EndSendPhase()
{
UE_LOG(LogTemp, Log, TEXT("EndSendPhase"));
BeginRecvPhase();
}
void FTCPSocketClient_Async::BeginRecvPhase()
{
UE_LOG(LogTemp, Log, TEXT("BeginRecvPhase"));
AsyncTask(ENamedThreads::AnyThread, [this]()
{
if (Socket == nullptr || this == nullptr)
{
return;
}
TArray<uint8> HeaderBuffer;
int32 HeaderSize = sizeof(FMessageHeader);
HeaderBuffer.AddZeroed(HeaderSize);
// recv header
bool bSuccessRecvHeader = false;
int32 NumRead = 0;
bSuccessRecvHeader = Socket->Recv(
HeaderBuffer.GetData(), HeaderBuffer.Num(), NumRead, ESocketReceiveFlags::Type::WaitAll);
if (bSuccessRecvHeader)
{
// recv payload
FMessageHeader Header;
FMemoryReader Reader(HeaderBuffer);
Reader << Header;
int32 PayloadSize = Header.Size;
TArray<uint8> Payload;
Payload.SetNumZeroed(PayloadSize);
bool bSuccessRecvPayload = false;
bSuccessRecvPayload = Socket->Recv(
Payload.GetData(), Payload.Num(), NumRead, ESocketReceiveFlags::Type::WaitAll);
AsyncTask(ENamedThreads::GameThread, [this, bSuccessRecvPayload, Payload]()
{
if (Socket == nullptr || this == nullptr)
{
return;
}
if (bSuccessRecvPayload)
{
OnRecvCompleted(Payload);
EndRecvPhase();
}
else
{
UE_LOG(LogTemp, Error, TEXT("Recv Payload Failed."));
OnRecvFailed();
EndRecvPhase();
}
});
}
else
{
AsyncTask(ENamedThreads::GameThread, [this]()
{
if (Socket == nullptr || this == nullptr)
{
return;
}
UE_LOG(LogTemp, Error, TEXT("Recv Header Failed."));
OnRecvFailed();
EndRecvPhase();
});
}
});
}
void FTCPSocketClient_Async::EndRecvPhase()
{
UE_LOG(LogTemp, Log, TEXT("EndRecvPhase"));
BeginSendPhase();
}
void FTCPSocketClient_Async::OnSendCompleted()
{
UE_LOG(LogTemp, Log, TEXT("OnSendCompleted"));
}
void FTCPSocketClient_Async::OnSendFailed()
{
FString OutText;
FTCPSocketClientUtils::PrintSocketError(OutText);
UE_LOG(LogTemp, Log, TEXT("OnSendFailed Error : %s"), *OutText);
}
void FTCPSocketClient_Async::OnRecvCompleted(const TArray<uint8>& InPayload)
{
TArray<uint8> Payload;
Payload.Append(InPayload);
FString Data(Payload.Num(), (char*)Payload.GetData());
UE_LOG(LogTemp, Log, TEXT("OnRecvCompleted recv data success. data : %s Payload : %d size : %d"), *Data, Payload.Num(), Data.Len());
}
void FTCPSocketClient_Async::OnRecvFailed()
{
FString OutText;
FTCPSocketClientUtils::PrintSocketError(OutText);
UE_LOG(LogTemp, Log, TEXT("OnRecvFailed Error : %s"), *OutText);
}
Github
'게임 엔진 > Unreal' 카테고리의 다른 글
[Unreal] [DateTime] 시간 관리 구조체 (0) | 2023.09.05 |
---|---|
[Unreal] [Networking] Http Module 사용 및 API Handler 개발 (0) | 2022.10.03 |
[Unreal] [Example] Editor에 Asset 생성 및 저장 (7) | 2021.08.10 |
[Unreal] 플러그인과 모듈 (0) | 2021.03.24 |
[Unreal] Android 디버깅 방법들 (0) | 2021.03.18 |