Недавно мне пришлось создать небольшое клиентское приложение TCP, которое подключается к прослушивателю TCP внешнего приложения и предназначено для работы с большими объемами данных и на высоких частотах.
Я создал класс-оболочку вокруг класса TCPClient, чтобы перехватывать исключения и сохранять ссылки на некоторые интересующие свойства (сетевой поток и т. д.). Вот обертка:
public class MyTCPClient
{
private string serverIP;
private int serverPort;
public TcpClient tcpClient = new TcpClient();
private IPEndPoint serverEndPoint;
private NetworkStream stream = null;
public string name;
public MyTCPClient(string serverIp, int serverPort, string parentName)
{
this.serverIP = serverIp;
this.serverPort = serverPort;
this.name = parentName + "_TCPClient";
serverEndPoint = new IPEndPoint(IPAddress.Parse(serverIP), serverPort);
tcpClient.ReceiveBufferSize = 1048576;
this.TryConnect();
}
private bool TryConnect()
{
try
{
tcpClient.Connect(serverEndPoint);
}
catch (SocketException e1)
{
throw new ErrorOnConnectingException(e1, "SocketException while connecting. (see msdn Remarks section for more details. ) Error code: " + e1.ErrorCode);
}
catch (ArgumentNullException e2)
{
throw new ErrorOnConnectingException(e2, "ArgumentNullException while connecting. (The hostname parameter is null.) Message: " + e2.Message);
}
catch (ArgumentOutOfRangeException e3)
{
throw new ErrorOnConnectingException(e3, "ArgumentOutOfRangeException while connecting (The port parameter is not between MinPort and MaxPort. ). Message: " + e3.Message);
}
catch (ObjectDisposedException e4)
{
throw new ErrorOnConnectingException(e4, "ObjectDisposedException while connecting. (TcpClient is closed. ) Message: " + e4.Message);
}
try
{
stream = this.tcpClient.GetStream();
}
catch (ObjectDisposedException e1)
{
throw new ErrorOnGettingStreamException(e1, "ObjectDisposedException while acquiring Network stream. (The TcpClient has been closed. ) Message: " + e1.Message);
}
catch (InvalidOperationException e2)
{
throw new ErrorOnGettingStreamException(e2, "ArgumentOutOfRangeException while acquiring Network stream (The TcpClient is not connected to a remote host. ). Message: " + e2.Message);
}
return true;
}
public string ReadData()
{
try
{
ASCIIEncoding encoder = new ASCIIEncoding();
byte[] dataHeader = new byte[12];
if (this.tcpClient.Connected)
{
stream.Read(dataHeader, 0, 12);
}
else
{
throw new ErrorOnReadingException(null, "The underlying TCP tcpClient is not connected any more");
}
var strHeaderMessage = System.Text.Encoding.Default.GetString(dataHeader);
Utils.logToTimeStampedFile(strHeaderMessage, name);
int bodyAndTailCount = Convert.ToInt32(strHeaderMessage.Replace("#", ""));
byte[] dataBodyAndTail = new byte[bodyAndTailCount];
if (this.tcpClient.Connected)
{
stream.Read(dataBodyAndTail, 0, bodyAndTailCount);
}
else
{
throw new ErrorOnReadingException(null, "The underlying TCP tcpClient is not connected any more");
}
var strBodyAndTailMessage = System.Text.Encoding.Default.GetString(dataBodyAndTail);
Utils.logToTimeStampedFile(strBodyAndTailMessage, name);
return strBodyAndTailMessage;
}
catch (FormatException e0)
{
CloseAllLeft();
throw new ErrorOnReadingException(e0, "FormatException while reading data. (Bytes red are null or does not correspond to specification, happens on closing Server) Message: " + e0.Message);
}
catch (ArgumentNullException e1)
{
CloseAllLeft();
throw new ErrorOnReadingException(e1, "ArgumentNullException while reading data. (The buffer parameter is null.) Message: " + e1.Message);
}
catch (ArgumentOutOfRangeException e2)
{
CloseAllLeft();
throw new ErrorOnReadingException(e2, "ArgumentOutOfRangeException while reading data. (see msdn description) Message: " + e2.Message);
}
catch (IOException e3)
{
CloseAllLeft();
throw new ErrorOnReadingException(e3, "IOException while reading data. (The underlying Socket is closed.) Message: " + e3.Message);
}
catch (ObjectDisposedException e4)
{
CloseAllLeft();
throw new ErrorOnReadingException(e4, "ArgumentOutOfRangeException while reading data. (see msdn description) Message: " + e4.Message);
}
}
public void CloseAllLeft()
{
try
{
stream.Close();
}
catch (Exception e)
{
Console.WriteLine("Exception closing tcp network stream: " + e.Message);
}
try
{
tcpClient.Close();
}
catch (Exception e)
{
Console.WriteLine("Exception closing tcpClient: " + e.Message);
}
}
}
Тем не менее, ничего не упоминается о потоках, использующих этот MyTCPClient. Приложение должно иметь два таких TCP-клиента, подключающихся к разным портам и выполняющих разные задания. Я был новичком в программировании TCP, и после некоторого блуждания по свойствам я решил использовать подход блокирующего чтения - т.е. по умолчанию метод TCPClient.Read() будет блокировать поток, пока не появятся новые данные. Мне нужен такой подход, потому что я не контролирую прослушиватель внешнего приложения, и единственным способом распознать закрытие сервера было «нулевые байты», отправленные в соответствии со спецификациями TCP Sockets.
Итак, я создаю абстрактный класс, который будет поддерживать и контролировать потоки, которые позже будут использовать вышеупомянутый класс MyTCPClient (который по замыслу в конечном итоге может блокировать родительские объявления). Вот код моего абстрактного TCPManager:
/// <summary>
/// Serves as a dispatcher for the high frequency readings from the TCP pipe.
/// Each time the thread is started it initializes new TCPClients which will attempt to connect to server.
/// Once established a TCP socket connection is alive until the thread is not requested to stop.
///
/// Error hanling level here:
///
/// Resources lke NetworkStream and TCPClients are ensured to be closed already within the myTCPClient class, and the error handling here
/// is steps on top of that - sending proper emails, notifications and logging.
///
/// </summary>
public abstract class AbstractmyTCPClientManager
{
public string name;
public string serverIP;
public int serverPort;
public Boolean requestStop = false;
public Boolean MyTCPClientThreadRunning = false;
public Boolean requestStart = false;
public myTCPClient myTCPClient;
public int sleepInterval;
public Thread MyTCPClientThread;
public AbstractmyTCPClientManager(string name, string serverIP, int serverPort)
{
this.name = name;
this.serverIP = serverIP;
this.serverPort = serverPort;
}
public void ThreadRun()
{
MyTCPClientThreadRunning = false;
bool TCPSocketConnected = false;
bool AdditionalInitializationOK = false;
// keep trying to init requested tcp clients
while (!MyTCPClientThreadRunning && !requestStop) // and we are not suggested to stop
{
while (!TCPSocketConnected && !requestStop) // and we are not suggested to stop)
{
try
{
myTCPClient = new myTCPClient(serverIP, serverPort, name);
TCPSocketConnected = true;
}
catch (ErrorOnConnectingException e0)
{
// nah, too long message
string detail = e0.originalException != null ? e0.originalException.Message : "No inner exception";
//Utils.logToTimeStampedFile("Creating connection attempt failed.(1." + e0.customMessage + " 2." + detail + "). Will retry in 10 seconds...", name);
//Utils.logToTimeStampedFile(e0.customMessage + " (" + detail + "). Will retry in 10 seconds...", name);
Utils.logToTimeStampedFile(detail + ". Will retry in 10 seconds...", name);
Thread.Sleep(10000);
}
catch (ErrorOnGettingStreamException e1)
{
// nah, too long message
string detail = e1.originalException != null ? e1.originalException.Message : "No inner exception";
//Utils.logToTimeStampedFile("Getting network stream attempt failed. (1." + e1.customMessage + " 2." + detail + "). Will retry in 10 seconds...", name);
//Utils.logToTimeStampedFile(e1.customMessage + " (" + detail + "). Will retry in 10 seconds...", name);
Utils.logToTimeStampedFile(detail + ". Will retry in 10 seconds...", name);
Thread.Sleep(10000);
}
}
Utils.logToTimeStampedFile("TCP Communication established", name);
while (!AdditionalInitializationOK && !requestStop) // or we are not suggested to stop
{
try
{
AdditionalInitialization();
AdditionalInitializationOK = true;
}
catch (AdditionalInitializationException e1)
{
string detail = e1.originalException != null ? e1.originalException.Message : "No inner exception";
//Utils.logToTimeStampedFile("Additional initialization failed (1." + e1.customMessage + " 2." + detail + "). Will retry in 10 seconds", name);
Utils.logToTimeStampedFile(e1.customMessage + ". Will retry in 10 seconds", name);
Thread.Sleep(10000);
}
}
MyTCPClientThreadRunning = TCPSocketConnected && AdditionalInitializationOK;
ViewModelLocator.ControlTabStatic.updateUIButtons();
}
Utils.logToTimeStampedFile("Additional Initialization successfully completed, thread started", name);
// while all normal (i.e nobody request a stop) continiously sync with server (read data)
while (!requestStop)
{
try
{
syncWithInterface();
}
catch (ErrorOnReadingException e1)
{
string detail = e1.originalException != null ? e1.originalException.Message : "No inner exception";
//Utils.logToTimeStampedFile("Error ocured while reading data. (1." + e1.customMessage + " 2." + detail + ")", name);
Utils.logToTimeStampedFile(e1.customMessage, name);
if (!requestStop) // i.e if this indeed is an exception, during a normal flow, and nobody requested a thread stop (which migh cause read exceptions as a consequence)
{
Utils.logToTimeStampedFile("There was no external stop request, when the error occured, doing tcp client restart.", name);
requestStop = true;
requestStart = true;
}
}
Thread.Sleep(sleepInterval);
}
// we need to close all after execution, but the execution may be closed before/while resources were still initializing
if (TCPSocketConnected)
{
myTCPClient.CloseAllLeft();
}
if (AdditionalInitializationOK)
{
ReleaseAdditionalResources();
}
// remember that thread is stoped
MyTCPClientThreadRunning = false;
Utils.logToTimeStampedFile("Thread stoped", name);
ViewModelLocator.ControlTabStatic.updateUIButtons();
// this serves as a restart
if (requestStart)
{
Utils.logToTimeStampedFile("Restarting thread...", name);
this.requestStop = false;
this.requestStart = false; // we are already processing a request start event, so reset this flag
this.MyTCPClientThread = new Thread(new ThreadStart(this.ThreadRun));
this.MyTCPClientThread.Name = this.name;
this.MyTCPClientThread.IsBackground = true;
this.MyTCPClientThread.Start();
}
}
/// <summary>
/// this method empties the entire TCP buffer, cycling through it
/// </summary>
private void syncWithInterface()
{
int counter = 0;
// read at most 100 messages at once (we assume that for 3 sec interval there might not be more,
//even if they are, it is still OK, they just will be processed next time)
while (counter < 100)
{
counter++;
string data = myTCPClient.ReadData();
ForwardData(data);
}
// below is left for testing:
/*
* "Sleep(0) or Yield is occasionally useful in production code for
* advanced performance tweaks. It’s also an excellent diagnostic tool
* for helping to uncover thread safety issues: if inserting Thread.Yield()
* anywhere in your code makes or breaks the program, you almost certainly have a bug."*/
Thread.Yield();
}
/// <summary>
/// Left for implementing in the caller that initialized the object. Meaning: one and the same way for receiving market/order data. Different ways of processing this data
/// </summary>
/// <param name="data"></param>
public abstract void ForwardData(string data);
/// <summary>
/// left for implementing in child classes. Its purpose is to initialize any additional resources needed for the thread to operate.
/// If something goes wrong while getting this additional resources,
/// an AdditionalInitialization exception should be thrown, which is than handled from the initialization phase in the caller.
/// </summary>
public abstract void AdditionalInitialization();
// countrapart of AdditionalInitialization method - what is initialized should be then closed
public abstract void ReleaseAdditionalResources();
}
Позже каждый необходимый канал связи TCP будет иметь выделенную реализацию для вышеупомянутого абстрактного класса, обеспечивающую реализацию методов ForwardData (т.е. что делать с этими данными) и AdditionalInitialization (т.е. что еще необходимо инициализировать перед конкретной обработкой связи TCP). Например, для одного из моих потоков требуется дополнительное хранилище. Поток должен быть инициализирован до получения данных).
Все было хорошо, кроме закрытия обработки TCP. У меня были эти переменные requestStop, чтобы контролировать, должен ли поток завершаться или продолжаться, но дело в том, что метод Read() может попасть в непрерывную блокировку, предотвращая чтение даже переменной requestStop (я должен сказать, что два tcp-канала, которые мне нужны, процесса сильно отличаются тем, что один из них получает данные очень часто, а другой — спорадически). Я все еще хотел бы, чтобы они реализовывали тот же дизайн. Итак, из того, что я читаю до сих пор, мне нужно реализовать другой, «родительский», или «контролирующий», или «оболочный» поток, который фактически возьмет на себя работу по наблюдению за параметром requestStop.
Я ищу решения, подобные этому post или таймеры, подобные этому post
Любые предложения будут ценны. Спасибо!
Wait()
блокирует выполнение потока, который вызывает ожидание завершения задачи. 13.03.2013