Хобрук: Ваш путь к мастерству в программировании

C#/Сокеты TCP/Блокировка чтения/Как закрыть такие потоки

Недавно мне пришлось создать небольшое клиентское приложение TCP, которое подключается к прослушивателю TCP внешнего приложения и предназначено для работы с большими объемами данных и на высоких частотах.

Я создал класс-оболочку вокруг класса TCPClient, чтобы перехватывать исключения и сохранять ссылки на некоторые интересующие свойства (сетевой поток и т. д.). Вот обертка:

public class MyTCPClient
    {

        private string serverIP;
        private int serverPort;

        public TcpClient tcpClient = new TcpClient();
        private IPEndPoint serverEndPoint;
        private NetworkStream stream = null;

        public string name;

        public MyTCPClient(string serverIp, int serverPort, string parentName)
        {
            this.serverIP = serverIp;
            this.serverPort = serverPort;
            this.name = parentName + "_TCPClient";

            serverEndPoint = new IPEndPoint(IPAddress.Parse(serverIP), serverPort);

            tcpClient.ReceiveBufferSize = 1048576;

            this.TryConnect();
        }

        private bool TryConnect()
        {
            try
            {
                tcpClient.Connect(serverEndPoint);
            }
            catch (SocketException e1)
            {
                throw new ErrorOnConnectingException(e1, "SocketException while connecting. (see msdn Remarks section for more details. ) Error code: " + e1.ErrorCode);
            }
            catch (ArgumentNullException e2)
            {
                throw new ErrorOnConnectingException(e2, "ArgumentNullException while connecting. (The hostname parameter is null.) Message: " + e2.Message);
            }
            catch (ArgumentOutOfRangeException e3)
            {
                throw new ErrorOnConnectingException(e3, "ArgumentOutOfRangeException while connecting (The port parameter is not between MinPort and MaxPort. ). Message: " + e3.Message);
            }
            catch (ObjectDisposedException e4)
            {
                throw new ErrorOnConnectingException(e4, "ObjectDisposedException while connecting. (TcpClient is closed. ) Message: " + e4.Message);
            }


            try
            {
                stream = this.tcpClient.GetStream();
            }
            catch (ObjectDisposedException e1)
            {
                throw new ErrorOnGettingStreamException(e1, "ObjectDisposedException while acquiring Network stream. (The TcpClient has been closed. ) Message: " + e1.Message);
            }
            catch (InvalidOperationException e2)
            {
                throw new ErrorOnGettingStreamException(e2, "ArgumentOutOfRangeException while acquiring Network stream (The TcpClient is not connected to a remote host.  ). Message: " + e2.Message);
            }

            return true;
        }

        public string ReadData()
        {
            try
            {
                ASCIIEncoding encoder = new ASCIIEncoding();

                byte[] dataHeader = new byte[12];
                if (this.tcpClient.Connected)
                {
                    stream.Read(dataHeader, 0, 12);
                }
                else
                {
                    throw new ErrorOnReadingException(null, "The underlying TCP tcpClient is not connected any more");
                }

                var strHeaderMessage = System.Text.Encoding.Default.GetString(dataHeader);

                Utils.logToTimeStampedFile(strHeaderMessage, name);

                int bodyAndTailCount = Convert.ToInt32(strHeaderMessage.Replace("#", ""));
                byte[] dataBodyAndTail = new byte[bodyAndTailCount];

                if (this.tcpClient.Connected)
                {
                    stream.Read(dataBodyAndTail, 0, bodyAndTailCount);
                }
                else
                {
                    throw new ErrorOnReadingException(null, "The underlying TCP tcpClient is not connected any more");
                }

                var strBodyAndTailMessage = System.Text.Encoding.Default.GetString(dataBodyAndTail);

                Utils.logToTimeStampedFile(strBodyAndTailMessage, name);

                return strBodyAndTailMessage;

            }
            catch (FormatException e0)
            {
                CloseAllLeft();
                throw new ErrorOnReadingException(e0, "FormatException while reading data. (Bytes red are null or does not correspond to specification, happens on closing Server) Message: " + e0.Message);
            }
            catch (ArgumentNullException e1)
            {
                CloseAllLeft();
                throw new ErrorOnReadingException(e1, "ArgumentNullException while reading data. (The buffer parameter is null.) Message: " + e1.Message);
            }
            catch (ArgumentOutOfRangeException e2)
            {
                CloseAllLeft();
                throw new ErrorOnReadingException(e2, "ArgumentOutOfRangeException while reading data. (see msdn description) Message: " + e2.Message);
            }
            catch (IOException e3)
            {
                CloseAllLeft();
                throw new ErrorOnReadingException(e3, "IOException while reading data. (The underlying Socket is closed.) Message: " + e3.Message);
            }
            catch (ObjectDisposedException e4)
            {
                CloseAllLeft();
                throw new ErrorOnReadingException(e4, "ArgumentOutOfRangeException while reading data. (see msdn description) Message: " + e4.Message);
            }
        }

        public void CloseAllLeft()
        {
            try
            {
                stream.Close();
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception closing tcp network stream: " + e.Message);
            }
            try
            {
                tcpClient.Close();
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception closing tcpClient: " + e.Message);
            }
        }
    }

Тем не менее, ничего не упоминается о потоках, использующих этот MyTCPClient. Приложение должно иметь два таких TCP-клиента, подключающихся к разным портам и выполняющих разные задания. Я был новичком в программировании TCP, и после некоторого блуждания по свойствам я решил использовать подход блокирующего чтения - т.е. по умолчанию метод TCPClient.Read() будет блокировать поток, пока не появятся новые данные. Мне нужен такой подход, потому что я не контролирую прослушиватель внешнего приложения, и единственным способом распознать закрытие сервера было «нулевые байты», отправленные в соответствии со спецификациями TCP Sockets.

Итак, я создаю абстрактный класс, который будет поддерживать и контролировать потоки, которые позже будут использовать вышеупомянутый класс MyTCPClient (который по замыслу в конечном итоге может блокировать родительские объявления). Вот код моего абстрактного TCPManager:

/// <summary>
    /// Serves as a dispatcher for the high frequency readings from the TCP pipe.
    /// Each time the thread is started it initializes new TCPClients which will attempt to connect to server.
    /// Once established a TCP socket connection is alive until the thread is not requested to stop.
    ///
    /// Error hanling level here:
    ///
    /// Resources lke NetworkStream and TCPClients are ensured to be closed already within the myTCPClient class, and the error handling here
    /// is steps on top of that - sending proper emails, notifications and logging.
    ///
    /// </summary>
    public abstract class AbstractmyTCPClientManager
    {

        public string name;
        public string serverIP;
        public int serverPort;

        public Boolean requestStop = false;
        public Boolean MyTCPClientThreadRunning = false;
        public Boolean requestStart = false;

        public myTCPClient myTCPClient;

        public int sleepInterval;

        public Thread MyTCPClientThread;

        public AbstractmyTCPClientManager(string name, string serverIP, int serverPort)
        {
            this.name = name;
            this.serverIP = serverIP;
            this.serverPort = serverPort;
        }

        public void ThreadRun()
        {
            MyTCPClientThreadRunning = false;
            bool TCPSocketConnected = false;
            bool AdditionalInitializationOK = false;

            // keep trying to init requested tcp clients
            while (!MyTCPClientThreadRunning && !requestStop) // and we are not suggested to stop
            {
                while (!TCPSocketConnected && !requestStop) // and we are not suggested to stop)
                {
                    try
                    {
                        myTCPClient = new myTCPClient(serverIP, serverPort, name);

                        TCPSocketConnected = true;
                    }
                    catch (ErrorOnConnectingException e0)
                    {

                        // nah, too long message
                        string detail = e0.originalException != null ? e0.originalException.Message : "No inner exception";
                        //Utils.logToTimeStampedFile("Creating connection attempt failed.(1." + e0.customMessage + " 2." + detail + "). Will retry in 10 seconds...", name);
                        //Utils.logToTimeStampedFile(e0.customMessage + " (" + detail + "). Will retry in 10 seconds...", name);
                        Utils.logToTimeStampedFile(detail + ". Will retry in 10 seconds...", name);

                        Thread.Sleep(10000);
                    }
                    catch (ErrorOnGettingStreamException e1)
                    {
                        // nah, too long message
                        string detail = e1.originalException != null ? e1.originalException.Message : "No inner exception";
                        //Utils.logToTimeStampedFile("Getting network stream attempt failed. (1." + e1.customMessage + " 2." + detail + "). Will retry in 10 seconds...", name);
                        //Utils.logToTimeStampedFile(e1.customMessage + " (" + detail + "). Will retry in 10 seconds...", name);

                        Utils.logToTimeStampedFile(detail + ". Will retry in 10 seconds...", name);
                        Thread.Sleep(10000);
                    }
                }
                Utils.logToTimeStampedFile("TCP Communication established", name);

                while (!AdditionalInitializationOK && !requestStop) // or we are not suggested to stop
                {
                    try
                    {
                        AdditionalInitialization();

                        AdditionalInitializationOK = true;

                    }
                    catch (AdditionalInitializationException e1)
                    {
                        string detail = e1.originalException != null ? e1.originalException.Message : "No inner exception";

                        //Utils.logToTimeStampedFile("Additional initialization failed (1." + e1.customMessage + " 2." + detail + "). Will retry in 10 seconds", name);

                        Utils.logToTimeStampedFile(e1.customMessage + ". Will retry in 10 seconds", name);
                        Thread.Sleep(10000);
                    }
                }

                MyTCPClientThreadRunning = TCPSocketConnected && AdditionalInitializationOK;
                ViewModelLocator.ControlTabStatic.updateUIButtons();
            }
            Utils.logToTimeStampedFile("Additional Initialization successfully completed, thread started", name);

            // while all normal (i.e nobody request a stop) continiously sync with server (read data)
            while (!requestStop)
            {
                try
                {
                    syncWithInterface();
                }
                catch (ErrorOnReadingException e1)
                {
                    string detail = e1.originalException != null ? e1.originalException.Message : "No inner exception";

                    //Utils.logToTimeStampedFile("Error ocured while reading data. (1." + e1.customMessage + " 2." + detail + ")", name);
                    Utils.logToTimeStampedFile(e1.customMessage, name);

                    if (!requestStop) // i.e if this indeed is an exception, during a normal flow, and nobody requested a thread stop (which migh cause read exceptions as a consequence)
                    {
                        Utils.logToTimeStampedFile("There was no external stop request, when the error occured, doing tcp client restart.", name);
                        requestStop = true;
                        requestStart = true;
                    }
                }

                Thread.Sleep(sleepInterval);
            }

            // we need to close all after execution, but the execution may be closed before/while resources were still initializing
            if (TCPSocketConnected)
            {
                myTCPClient.CloseAllLeft();
            }
            if (AdditionalInitializationOK)
            {
                ReleaseAdditionalResources();
            }

            // remember that thread is stoped
            MyTCPClientThreadRunning = false;
            Utils.logToTimeStampedFile("Thread stoped", name);
            ViewModelLocator.ControlTabStatic.updateUIButtons();

            // this serves as a restart
            if (requestStart)
            {
                Utils.logToTimeStampedFile("Restarting thread...", name);
                this.requestStop = false;
                this.requestStart = false; // we are already processing a request start event, so reset this flag

                this.MyTCPClientThread = new Thread(new ThreadStart(this.ThreadRun));
                this.MyTCPClientThread.Name = this.name;
                this.MyTCPClientThread.IsBackground = true;
                this.MyTCPClientThread.Start();
            }
        }

        /// <summary>
        /// this method empties the entire TCP buffer, cycling through it
        /// </summary>
        private void syncWithInterface()
        {
            int counter = 0;
            // read at most 100 messages at once (we assume that for 3 sec interval there might not be more,
            //even if they are, it is still OK, they just will be processed next time)
            while (counter < 100)
            {
                counter++;
                string data = myTCPClient.ReadData();
                ForwardData(data);
            }

            // below is left for testing:
            /*
             * "Sleep(0) or Yield is occasionally useful in production code for
             * advanced performance tweaks. It’s also an excellent diagnostic tool
             * for helping to uncover thread safety issues: if inserting Thread.Yield()
             * anywhere in your code makes or breaks the program, you almost certainly have a bug."*/
            Thread.Yield();
        }

        /// <summary>
        /// Left for implementing in the caller that initialized the object. Meaning: one and the same way for receiving market/order data. Different ways of processing this data
        /// </summary>
        /// <param name="data"></param>
        public abstract void ForwardData(string data);

        /// <summary>
        /// left for implementing in child classes. Its purpose is to initialize any additional resources needed for the thread to operate.
        /// If something goes wrong while getting this additional resources,
        /// an AdditionalInitialization exception should be thrown, which is than handled from the initialization phase in the caller.
        /// </summary>
        public abstract void AdditionalInitialization();

        // countrapart of AdditionalInitialization method - what is initialized should be then closed
        public abstract void ReleaseAdditionalResources();
    }

Позже каждый необходимый канал связи TCP будет иметь выделенную реализацию для вышеупомянутого абстрактного класса, обеспечивающую реализацию методов ForwardData (т.е. что делать с этими данными) и AdditionalInitialization (т.е. что еще необходимо инициализировать перед конкретной обработкой связи TCP). Например, для одного из моих потоков требуется дополнительное хранилище. Поток должен быть инициализирован до получения данных).

Все было хорошо, кроме закрытия обработки TCP. У меня были эти переменные requestStop, чтобы контролировать, должен ли поток завершаться или продолжаться, но дело в том, что метод Read() может попасть в непрерывную блокировку, предотвращая чтение даже переменной requestStop (я должен сказать, что два tcp-канала, которые мне нужны, процесса сильно отличаются тем, что один из них получает данные очень часто, а другой — спорадически). Я все еще хотел бы, чтобы они реализовывали тот же дизайн. Итак, из того, что я читаю до сих пор, мне нужно реализовать другой, «родительский», или «контролирующий», или «оболочный» поток, который фактически возьмет на себя работу по наблюдению за параметром requestStop.

Я ищу решения, подобные этому post или таймеры, подобные этому post

Любые предложения будут ценны. Спасибо!


Ответы:


1

Я бы рекомендовал вызвать метод ReadAsync NetworkStream и передать ему CancellationToken. . Таким образом, операция чтения может быть легко отменена (из другого потока), когда наблюдается событие остановки запроса:

public class MyTCPClient : IDisposable
{
  ...
  private CancellationTokenSource cancellationTokenSource = new CancellationTokenSource ();
  ...

  public string ReadData()
  {
     ...
     byte[] dataHeader = new byte[12];
     if (this.tcpClient.Connected)
     {
         stream.ReadAsync(dataHeader, 0, 12, cancellationTokenSource.Token).Wait();
     } ...
12.03.2013
  • Большое спасибо, я видел, что эти методы доступны только в .Net 4.5 (извините, забыл упомянуть, что я работаю с win 7, .Net 4 и в целом - я немного новичок в программировании на С#). Но этими настройками можно управлять в рабочей среде, поэтому определенно лучше использовать .Net 4.5. Сейчас я устанавливаю/читаю и т.д. Думаю, у меня возникнут дополнительные вопросы по этому объекту CancellationTokenSource, но я попытаюсь сначала прочитать/кодировать, а затем вернуться сюда. 12.03.2013
  • Ок, возник еще вопрос. Сразу скажу - я все еще осматриваюсь и читаю, потому что оказалось, что для использования .Net 4.5 я также получил бы лицензию на Visual Studio 2012. 13.03.2013
  • Как (кто) гарантирует, что я буду получать все сообщения в том порядке, в котором они пришли. Возможно, это совсем не так, но я представляю себе ситуацию, когда один асинхронный вызов запускается, читает одно сообщение и по какой-то причине следующий начавшийся асинхронный вызов завершается раньше первого. (сообщения должны быть проанализированы, сопоставлены с некоторыми номенклатурами из БД и т. д.) В конце у меня есть очередь, содержащая объекты, построенные на основе байтов в сообщении tcp, и для моего приложения важно, чтобы они были в том же порядке. Является ли это Метод Wait(), вызванный для объекта Task, возвращен из формы ReadAsync? 13.03.2013
  • @user2160353 user2160353 Да, порядок получения вами событий не изменится, приведенный выше фрагмент кода функционально эквивалентен вашему опубликованному коду — вызов Wait() блокирует выполнение потока, который вызывает ожидание завершения задачи. 13.03.2013
  • Хорошо. Я думаю, теперь это просто некоторая инфраструктура, которую нужно сделать вокруг VS, .Net 4.5 и т. Д. Еще раз спасибо! 13.03.2013

  • 2

    Лично я бы использовал для этого асинхронные сокеты: http://msdn.microsoft.com/en-us/library/bbx2eya8.aspx

    Однако, если вы все еще хотите использовать блокирующие чтения, вы можете просто закрыть () сокет из другого потока.

    Я надеюсь, что это поможет.

    12.03.2013

    3

    Установите логическое значение requestStop и закройте клиентский сокет из другого потока. Это приводит к тому, что вызов read() возвращает "рано" с ошибкой/исключением. Клиентский поток может проверять 'requestStop' после каждого возврата read() и очищать/выходить, если требуется.

    TBH, я все равно редко заморачиваюсь явным отключением таких клиентов. Я просто оставляю их, пока приложение не закроется.

    12.03.2013
  • О, если этот ответ полезен, пожалуйста, проголосуйте за @brain - он предлагал то же самое ранее, но я пропустил это. 12.03.2013
  • Новые материалы

    Понимание СТРУКТУРЫ ДАННЫХ И АЛГОРИТМА.
    Что такое структуры данных и алгоритмы? Термин «структура данных» используется для описания того, как данные хранятся, а алгоритм используется для описания того, как данные сжимаются. И данные, и..

    Как интегрировать модель машинного обучения на ios с помощью CoreMl
    С выпуском новых функций, таких как CoreML, которые упростили преобразование модели машинного обучения в модель coreML. Доступная модель машинного обучения, которую можно преобразовать в модель..

    Создание успешной организации по науке о данных
    "Рабочие часы" Создание успешной организации по науке о данных Как создать эффективную группу по анализу данных! Введение Это обзорная статья о том, как создать эффективную группу по..

    Технологии и проблемы будущей работы
    Изучение преимуществ и недостатков технологий в образовании В быстро меняющемся мире технологии являются решающим фактором в формировании будущего работы. Многие отрасли уже были..

    Игорь Минар из Google приедет на #ReactiveConf2017
    Мы рады сообщить еще одну замечательную новость: один из самых востребованных спикеров приезжает в Братиславу на ReactiveConf 2017 ! Возможно, нет двух других кланов разработчиков с более..

    Я собираюсь научить вас Python шаг за шагом
    Привет, уважаемый энтузиаст Python! 👋 Готовы погрузиться в мир Python? Сегодня я приготовил для вас кое-что интересное, что сделает ваше путешествие более приятным, чем шарик мороженого в..

    Альтернатива шаблону исходящих сообщений для архитектуры микросервисов
    Познакомьтесь с двухэтапным сообщением В этой статье предлагается альтернативный шаблон для папки Исходящие : двухэтапное сообщение. Он основан не на очереди сообщений, а на..