我正在尝试通过HTTP将数据流式传输到客户端。为了实现这一点,我使用了带有WebHttp绑定
的WCF服务。问题是我的操作返回的System.IO.Stream
在我可以向其写入某些内容之前已经关闭。我想保持流打开,直到需要向其写入数据。通常这不会超过半分钟。
在服务请求方法中,我创建了一个新的System.IO实例。MemoryStream
我将其放入所有流的集合中,并将其作为函数输出返回。稍后,当有可用的音频数据时,我将写入集合中的所有流。但到那时,所有请求都已关闭。当我转到endpointurl时,浏览器标准播放器完全置灰。我还使用REST客户端进行了测试,它显示请求在返回
语句后立即关闭。
问题是我们使用libmp3lame
SDK来检索音乐。这会在每个周期发送8192字节的PCM数据。我们希望用户可以从Chromecast设备播放他们的音乐。Chromecast不支持PCM数据,这就是为什么我们使用libmp3lame
将其转换为MP3,然后通过输出流将其发送到Chromecast。对于这种方法,即使没有实际数据通过Stream
发送,我们也需要保持连接。
可以在此处找到Libspotify音乐传递回调。
这是我设置服务的方式:
/// <summary>
/// The WCF service host.
/// </summary>
private ServiceHost ServiceHost;
/// <summary>
/// Start the HTTP WCF service.
/// </summary>
public void startListening()
{
if (ServiceHost == null)
{
ServiceHost = new ServiceHost(typeof(StreamingService));
var binding = new WebHttpBinding(WebHttpSecurityMode.None);
binding.TransferMode = TransferMode.StreamedResponse;
var endpoint = ServiceHost.AddServiceEndpoint(typeof(StreamingContract), binding, new Uri(streamAddress));
endpoint.EndpointBehaviors.Add(new WebHttpBehavior());
ServiceHost.Open();
}
}
这是服务实现:
[ServiceContract(Name="StreamingContract")]
interface StreamingContract
{
[WebGet(UriTemplate="audio")]
[OperationContract()]
Stream Audio();
}
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single,
IncludeExceptionDetailInFaults = true)]
public class StreamingService : StreamingContract
{
public System.IO.Stream Audio()
{
var stream = new System.IO.MemoryStream();
App.Logic.streaming.streams.Add(stream);
WebOperationContext.Current.OutgoingResponse.ContentType = "audio/mp3";
WebOperationContext.Current.OutgoingResponse.ContentLength = 1000;
return stream;
}
}
我还尝试了在服务合同
中的Audio()
上设置:[OperationContext(AutoDisposeParameter=false)]
。这只是开始抛出System. InvalidOperationException
。我还认为可能是内容长度未知的问题,这也没有帮助。
您的服务正在做它应该做的事情——返回一个空流,然后关闭连接。
听起来你想等待这个流被异步填充。为了做到这一点,你必须实现某种回调。你应该研究Task.Run()
方法,因为这将是实现异步逻辑的标准方法。NET。
希望您可以使用此示例作为答案。在此示例中,您可以将流异步放入服务器。解决方案经过测试和验证。
以下是一个HTTPWCF服务(服务器)的示例,该服务通过异步设置托管:
Uri baseAddress = new Uri("http://localhost:8000/Service1/");
// Step 2 Create a ServiceHost instance to host the service
using (ServiceHost selfHost = new ServiceHost(typeof(Service1), baseAddress)) // type of class that implements service contract, and base address of service.
{
try
{
WebHttpBinding binding = new WebHttpBinding();
//BasicHttpBinding binding = new BasicHttpBinding();
binding.TransferMode = TransferMode.Streamed;
binding.MaxReceivedMessageSize = int.MaxValue; //"1000000000000"
binding.ReceiveTimeout = new TimeSpan(1, 0, 0); //"01:00:00";
binding.SendTimeout = new TimeSpan(1, 0, 0); //"01:00:00";
//binding.ReaderQuotas. = int.MaxValue;
// Step 3 Add a service endpoint to host. Endpoint consist of address, binding and service contract.
// Note this is optional in Framework 4.0 and upward. generate auto default.
selfHost.AddServiceEndpoint(typeof(IService1), binding, "").EndpointBehaviors.Add(new WebHttpBehavior()); // service contract interface, binding, address
// Step 5 Start the service.
// Open host to listen for incoming messages.
selfHost.Open();
Console.WriteLine("The service is ready.");
Console.WriteLine("Press <ENTER> to terminate service.");
Console.WriteLine();
Console.ReadLine();
// Close the ServiceHostBase to shutdown the service.
selfHost.Close();
}
catch (CommunicationException ce)
{
Console.WriteLine("An exception occurred: {0}", ce.Message);
selfHost.Abort();
}
}
}
}
}
这是实际的服务接口impl:
[ServiceContract]
//[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
public interface IService1
{
/// <summary>
/// An asynchronous service side upload operation.
/// </summary>
/// <param name="token">An application arbitrary piece of data. Can be used for request obfuscation.</param>
/// <param name="data">The data being uploaded.</param>
/// <param name="callback">Callback for async pattern, client does not pass this.</param>
/// <param name="asyncState">User state for async pattern, client does not pass this.</param>
/// <remarks>
/// The <paramref name="token"/> parameter is the only parameter passed in the URL by the client. The <paramref name="data"/>
/// parameter is the request body, the file being uploaded.
/// </remarks>
/// <returns></returns>
[OperationContract(AsyncPattern = true)]
[WebInvoke(Method = "PUT", UriTemplate = "asyncupload/")]
IAsyncResult BeginAsyncUpload(Stream data, AsyncCallback callback, object asyncState);
/// <summary>
/// Ends the asynchonous operation initiated by the call to <see cref="BeginAsyncUpload"/>.
/// </summary>
/// <remarks>
/// This is called by the WCF framework service side. NOTE: There is no <see cref="OperationContractAttribute"/> decorating
/// this method.
/// </remarks>
/// <param name="ar"></param>
void EndAsyncUpload(IAsyncResult ar);
}
和实施:
public class Service1 : IService1
{
/// <summary>
/// <see cref="IUpload.Upload"/>
/// </summary>
/// <param name="token">This parameter is ignored.</param>
/// <param name="data">Data being uploaded.</param>
/// <param name="callback">Async callback.</param>
/// <param name="asyncState">Async user state.</param>
public IAsyncResult BeginAsyncUpload(Stream data, AsyncCallback callback, object asyncState)
{
return new CompletedAsyncResult<Stream>(data);
}
/// <summary>
/// <see cref="IUpload.EndAsyncUpload"/>
/// </summary>
public void EndAsyncUpload(IAsyncResult ar)
{
Stream data = ((CompletedAsyncResult<Stream>)ar).Data;
_streamToFile(data);
}
/// <summary>
/// Writes the uploaded stream to a file.
/// </summary>
/// <remarks>
/// This function is just to prove a test. This simple saves the uploaded data into a file named "upload.dat" in a subdirectory
/// whose name is created by a generated guid.
/// </remarks>
private static void _streamToFile(Stream data)
{
// create name of subdirectory
string subDir = Guid.NewGuid().ToString("N");
// get full path to and create the directory to save file in
string uploadDir = Path.Combine(Path.GetDirectoryName(typeof(Service1).Assembly.Location), subDir);
Directory.CreateDirectory(uploadDir);
// 64 KiB buffer
byte[] buff = new byte[0x10000];
// save the file in chunks
using (FileStream fs = new FileStream(Path.Combine(uploadDir, "upload.xml"), FileMode.Create))
{
int bytesRead = data.Read(buff, 0, buff.Length);
while (bytesRead > 0)
{
fs.Write(buff, 0, bytesRead);
bytesRead = data.Read(buff, 0, buff.Length);
}
}
}
此外,在此项目中添加一个具有以下内容的类:
internal class CompletedAsyncResult<T> : IAsyncResult
{
T data;
public CompletedAsyncResult(T data)
{ this.data = data; }
public T Data
{ get { return data; } }
#region IAsyncResult Members
public object AsyncState
{ get { return (object)data; } }
public WaitHandle AsyncWaitHandle
{ get { throw new Exception("The method or operation is not implemented."); } }
public bool CompletedSynchronously
{ get { return true; } }
public bool IsCompleted
{ get { return true; } }
#endregion
}
internal class AsyncResultNoResult : IAsyncResult
{
// Fields set at construction which never change while
// operation is pending
private readonly AsyncCallback m_AsyncCallback;
private readonly Object m_AsyncState;
// Fields set at construction which do change after
// operation completes
private const Int32 c_StatePending = 0;
private const Int32 c_StateCompletedSynchronously = 1;
private const Int32 c_StateCompletedAsynchronously = 2;
private Int32 m_CompletedState = c_StatePending;
// Field that may or may not get set depending on usage
private ManualResetEvent m_AsyncWaitHandle;
// Fields set when operation completes
private Exception m_exception;
public AsyncResultNoResult(AsyncCallback asyncCallback, Object state)
{
m_AsyncCallback = asyncCallback;
m_AsyncState = state;
}
public void SetAsCompleted(
Exception exception, Boolean completedSynchronously)
{
// Passing null for exception means no error occurred.
// This is the common case
m_exception = exception;
// The m_CompletedState field MUST be set prior calling the callback
Int32 prevState = Interlocked.Exchange(ref m_CompletedState,
completedSynchronously ? c_StateCompletedSynchronously :
c_StateCompletedAsynchronously);
if (prevState != c_StatePending)
throw new InvalidOperationException(
"You can set a result only once");
// If the event exists, set it
if (m_AsyncWaitHandle != null) m_AsyncWaitHandle.Set();
// If a callback method was set, call it
if (m_AsyncCallback != null) m_AsyncCallback(this);
}
public void EndInvoke()
{
// This method assumes that only 1 thread calls EndInvoke
// for this object
if (!IsCompleted)
{
// If the operation isn't done, wait for it
AsyncWaitHandle.WaitOne();
AsyncWaitHandle.Close();
m_AsyncWaitHandle = null; // Allow early GC
}
// Operation is done: if an exception occured, throw it
if (m_exception != null) throw m_exception;
}
#region Implementation of IAsyncResult
public Object AsyncState { get { return m_AsyncState; } }
public Boolean CompletedSynchronously
{
get
{
return Thread.VolatileRead(ref m_CompletedState) ==
c_StateCompletedSynchronously;
}
}
public WaitHandle AsyncWaitHandle
{
get
{
if (m_AsyncWaitHandle == null)
{
Boolean done = IsCompleted;
ManualResetEvent mre = new ManualResetEvent(done);
if (Interlocked.CompareExchange(ref m_AsyncWaitHandle,
mre, null) != null)
{
// Another thread created this object's event; dispose
// the event we just created
mre.Close();
}
else
{
if (!done && IsCompleted)
{
// If the operation wasn't done when we created
// the event but now it is done, set the event
m_AsyncWaitHandle.Set();
}
}
}
return m_AsyncWaitHandle;
}
}
public Boolean IsCompleted
{
get
{
return Thread.VolatileRead(ref m_CompletedState) !=
c_StatePending;
}
}
#endregion
}
internal class AsyncResult<TResult> : AsyncResultNoResult
{
// Field set when operation completes
private TResult m_result = default(TResult);
public AsyncResult(AsyncCallback asyncCallback, Object state) :
base(asyncCallback, state) { }
public void SetAsCompleted(TResult result,
Boolean completedSynchronously)
{
// Save the asynchronous operation's result
m_result = result;
// Tell the base class that the operation completed
// sucessfully (no exception)
base.SetAsCompleted(null, completedSynchronously);
}
new public TResult EndInvoke()
{
base.EndInvoke(); // Wait until operation has completed
return m_result; // Return the result (if above didn't throw)
}
}
然后客户端impl:
try
{
//string txtDescription = "Test";
string txtFileName = "Invoice_50000.xml";
//byte[] fileToSend = File.ReadAllBytes(txtFileName)
// Create the REST request.
string url = "http://localhost:8000/Service1/";//ConfigurationManager.AppSettings["serviceUrl"];
//string requestUrl = string.Format("{0}/Upload/{1}/{2}", url, System.IO.Path.GetFileName(txtFileName), txtDescription);
/* Asynchronous */
string requestUrl = string.Format("{0}/asyncupload/", url);
HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(requestUrl);
using (FileStream inputStream = File.Open(txtFileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
{
//new BufferedStream
//new Buffer
request.SendChunked = true;
request.AllowWriteStreamBuffering = false;
request.Method = "PUT";
request.ContentType = "application/octet-stream";
//request.ContentType = MediaTypeNames.Application.Octet
request.ContentLength = inputStream.Length;
/* BEGIN: Solution with chunks */
// 64 KB buffer
byte[] chunkBuffer = new byte[0x10000];
Stream st = request.GetRequestStream();
// as the file is streamed up in chunks, the server will be processing the file
int bytesRead = inputStream.Read(chunkBuffer, 0, chunkBuffer.Length);
while (bytesRead > 0)
{
st.Write(chunkBuffer, 0, bytesRead);
bytesRead = inputStream.Read(chunkBuffer, 0, chunkBuffer.Length);
}
st.Close();
}
try
{
HttpWebResponse resp = (HttpWebResponse)request.GetResponse();
Console.WriteLine("HTTP/{0} {1} {2}", resp.ProtocolVersion, (int)resp.StatusCode, resp.StatusDescription);
resp.Close();
}
catch (System.Exception)
{
//TODO: error handling here.
}
/* END: Solution with chunks */
}