提问者:小点点

两个网络流上的ReadAsync()--我正确地使用了“await”吗?


我使用两个readAsync()调用和task.whenAny()来处理两个网络流(TcpClient)。

下面的等待代码会错过任何数据捕获吗?

  • 代码中的Q1:如果两个流在完全相同的时间都有新数据,会发生什么?
  • 代码中的Q2:writeSync()是否会占用太长时间并丢失存储缓冲区?
  • 问3:有没有更好的方法来解决这个问题

我正在编写一段代码,用于充当TCP流的中间人过滤器(以便以后允许过滤/监视某些数据包)

概括的逻辑应该是:

  • 客户端建立到筛选器的连接,然后筛选器建立到所选服务器的新连接
  • 如果有任何数据从客户端到达,请将其保存并发送到服务器
  • 如果有任何数据从服务器到达,请将其保存并发送到客户端

存在错误处理(在列出的位置)。。我错过什么重要的东西了吗?

对于一个关于“用于套接字编程的.NET4.5异步特性”的问题,我使用了以下答案作为起点:

var read_task_from_client = rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
var read_task_from_server = tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);

try
{
  while (true)
  {
     Task<int> read_task_occurred;
     try
     {
        read_task_occurred = await Task.WhenAny(read_task_from_client, read_task_from_server);
            //Q1: What happens if both streams have new data at EXACTLY the same time?

        if (read_task_occurred.Status != TaskStatus.RanToCompletion)
        {
          Trace.WriteLine(string.Format("[{0}] - Task failure", ID, read_task_occurred.ToString()));
          break;
        }
     }
     catch (AggregateException aex)
     {
        for (int i = 0; i < aex.Data.Values.Count; i++)
        {
          var aex_item = aex.Data[i];
          Trace.WriteLine(string.Format("[{0}] - Aggregate failure {1} - {2}", ID, i, aex_item));
        }
        break;
     }

     var bytes_read = read_task_occurred.Result;
     if (read_task_occurred.Result == 0)
     {
        // If a read-operation returns zero, the stream has closed.
        OneStreamHasClosed(read_task_from_client, read_task_from_server, read_task_occurred);
        break;
     }

     if (read_task_occurred == read_task_from_client)
     {
        BytesFromClient += read_task_from_client.Result;
        Trace.WriteLine(string.Format("[{0}] - Client-to-Server: {1}", ID, bytes_read));
        await tx_stream.WriteAsync(rx_buffer, 0, bytes_read);
        await FileStream_Incoming.WriteAsync(rx_buffer, 0, bytes_read);
            // Q2: Any chance of the WriteAsync taking too long?
            //    (e.g. rx_buffer begins to be filled again before being written to tx_stream or the filestream)

        read_task_from_client = rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
     }
     else if (read_task_occurred == read_task_from_server)
     {
        BytesFromServer += read_task_from_server.Result;
        Trace.WriteLine(string.Format("[{0}] - Server-to-Client: {1}", ID, bytes_read));
        await rx_stream.WriteAsync(tx_buffer, 0, bytes_read);
        await FileStream_Outgoing.WriteAsync(tx_buffer, 0, bytes_read);

        read_task_from_server = tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
     }
  }
}
finally
{
  FileStream_Incoming.Close();
  FileStream_Outgoing.Close();
}

到目前为止,这似乎如预期的那样工作,捕获并记录多个流。。。但是,我不确定是否安全地使用了await语句

这将在以后的多个线程中运行(可能每个传入连接一个线程,但这是一个单独的主题)

通过重构原始的“await tx_stream.write.。。”和“Await xxx_filestream.write.。。”如下所述,我相信我已经能够在第二季度改善一个主要的比赛状态。仍然不确定这是否是“最佳/推荐”的解决方案:

// Code changed to a call to MultiWrite
private void MultiWrite(byte[] buffer, int bytes_read, Stream s1, Stream s2)
{
  Task writer1 = s1.WriteAsync(buffer, 0, bytes_read);
  Task writer2 = s2.WriteAsync(buffer, 0, bytes_read);
  Task.WaitAll(writer1, writer2);
}

我被告知await不允许并发任务运行。。。这使我感到困惑,因为我无法理解以下内容如何/为什么能够运行。。。

private async Task<char> SimpleTask(char x, int sleep_ms) { return await Task.Run(() => { Console.Write(x); Thread.Sleep(sleep_ms); return x; }); }
internal async void DoStuff()
{
  var a_task = SimpleTask('a', 100);
  var b_task = SimpleTask('b', 250);
  var c_task = SimpleTask('c', 333);

  while (true)
  {
    var write_task_occurred = await Task.WhenAny(a_task, b_task, c_task);
    var char_written = write_task_occurred.Result;
    switch (char_written)
    {
      case 'a': a_task = SimpleTask('a', 100); break;
      case 'b': b_task = SimpleTask('b', 250); break;
      case 'c': c_task = SimpleTask('c', 333); break;
    }
  }
}

上面的代码片段确实运行了(正如我所期望的那样,产生了以下多线程的废话:

AABACABAACABAACABAACABAACABAACABACABACABACABACABACABACABACABACABACABACABACABACABACABACABACABACABACABACABACABACABACABACAB

谁能解释一下上述方法哪里/为什么是错误的,如果是,如何改进。

我已经集成了“写入输出流和文件,在进一步read()之前,确保两个输出都在'buffer'中有数据”,并且按照我之前对Q2的更新,还拆分了代码来调用multiwrite():

根据@usr和@pekka的建议,我将代码分成两个方法,如下所示。。。

private void ProcessStreams_Good()
{
  Task t1 = CopyClientToServer(), t2 = CopyServerToClient();

  Trace.WriteLine(string.Format("[{0}] - Data stats: C={1}, S={2}", ID, BytesFromClient, BytesFromServer));
  Trace.WriteLine(string.Format("[{0}] - connection closed from {1}", ID, Incoming.Client.RemoteEndPoint));
}
private async void ProcessStreams_Broken()
{
  await CopyClientToServer(); await CopyServerToClient();

  Trace.WriteLine(string.Format("[{0}] - Data stats: C={1}, S={2}\r\n", ID, BytesFromClient, BytesFromServer));
  Trace.WriteLine(string.Format("[{0}] - connection closed from {1}", ID, Incoming.Client.RemoteEndPoint));
}

private async Task CopyClientToServer()
{
  var bytes_read = await rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
  while (bytes_read > 0)
  {
    BytesFromClient += bytes_read; Trace.WriteLine(string.Format("[{0}] - Client-to-Server: {1}", ID, bytes_read));
    MultiWrite(rx_buffer, bytes_read, tx_stream, FileStream_FromClient);
    bytes_read = await rx_stream.ReadAsync(rx_buffer, 0, ActiveBufferSize);
  }
}
private async Task CopyServerToClient()
{
  var bytes_read = await tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
  while (bytes_read > 0)
  {
    BytesFromClient += bytes_read; Trace.WriteLine(string.Format("[{0}] - Server-to-Client: {1}", ID, bytes_read));
    MultiWrite(tx_buffer, bytes_read, rx_stream, FileStream_FromServer);
    bytes_read = await tx_stream.ReadAsync(tx_buffer, 0, ActiveBufferSize);
  }
}

是的,我知道ProcessStreams_Break()失败和ProcessStreams_Good()按预期工作的原因。

问:这个新代码稍微整洁了一些,但有没有什么“更好”?

问题结束后,我遇到了一个异步/等待链接的最佳实践,它很有帮助。


共2个答案

匿名用户

AwaitWhenany不启动任何操作。它们只是等待正在运行的操作完成。您开始的所有读操作最终都将完成,数据将从流中取出。无论你观察结果与否,都是如此。

我知道您想在客户机到服务器之间以及服务器到客户机之间中继数据。那么,为什么不同时启动两个异步方法,每个方法执行两个中继方向中的一个呢?这样就不需要whenany和所有复杂的逻辑。你得把这个扔掉。

代码中的Q1:如果两个流在完全相同的时间都有新数据,会发生什么?

你不需要这个问题的答案。您必须处理所有开始的读取的完成,无论它们何时完成。否则就会丢失数据。也许你是在假设非完整的未完成的读取(不知何故)被取消了,而实际上只有一个读取是“取”的?!事实并非如此。全部读取完成。没有办法取消一个(不删除数据)。

代码中的第2个问题:writeSync()是否会占用太长时间并丢失存储缓冲区?

不太清楚你的意思。如果出现超时,您需要一个处理该问题的策略。通常,您会记录错误并关闭。

匿名用户

并发性完全是关于非确定性的。通道的两个端点必然具有独立的时钟,并且无法区分您首先接收到的是哪条消息(在时钟抖动内)。如果您(以及整个OS堆栈)公平地对接收到的消息进行操作并转发它们,那么发生这种情况的顺序就不相关了。

如果你想要避免任何偏见,那么发展一种情况,确实将任何偏好引入到任何方向。例如,您的测试task.whenany(read_task_from_client,read_task_from_server);可能偏向于其中一个任务。使用@usr的建议创建单独的方法来避免这种情况。

最后,在拆下会话时要非常小心。不可能完全模拟所有可能的情况,从用户代码中突然拆下一个端点可能做的事情。您的仿真保真度将受到挑战,并可能使结果无效。类似地,当另一方丢弃会话时,您可能接受了一个流上的数据。没有办法正确地从这恢复--你能再次做的最好的事情就是假装伴侣在他们能看到这之前就扔下了他们的末端。

相关问题