//From https://github.com/akavache/Akavache
//Copyright (c) 2012 GitHub
//TODO: Remove once netstandard support is added
#pragma warning disable CS0618
using Akavache;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Reactive;
using System.Reactive.Threading.Tasks;
namespace Discord.Net
{
public class HttpMixin : IAkavacheHttpMixin
{
///
/// Download data from an HTTP URL and insert the result into the
/// cache. If the data is already in the cache, this returns
/// a cached value. The URL itself is used as the key.
///
/// The URL to download.
/// An optional Dictionary containing the HTTP
/// request headers.
/// Force a web request to always be issued, skipping the cache.
/// An optional expiration date.
/// The data downloaded from the URL.
public IObservable DownloadUrl(IBlobCache This, string url, IDictionary headers = null, bool fetchAlways = false, DateTimeOffset? absoluteExpiration = null)
{
return This.DownloadUrl(url, url, headers, fetchAlways, absoluteExpiration);
}
///
/// Download data from an HTTP URL and insert the result into the
/// cache. If the data is already in the cache, this returns
/// a cached value. An explicit key is provided rather than the URL itself.
///
/// The key to store with.
/// The URL to download.
/// An optional Dictionary containing the HTTP
/// request headers.
/// Force a web request to always be issued, skipping the cache.
/// An optional expiration date.
/// The data downloaded from the URL.
public IObservable DownloadUrl(IBlobCache This, string key, string url, IDictionary headers = null, bool fetchAlways = false, DateTimeOffset? absoluteExpiration = null)
{
var doFetch = MakeWebRequest(new Uri(url), headers).SelectMany(x => ProcessWebResponse(x, url, absoluteExpiration));
var fetchAndCache = doFetch.SelectMany(x => This.Insert(key, x, absoluteExpiration).Select(_ => x));
var ret = default(IObservable);
if (!fetchAlways)
{
ret = This.Get(key).Catch(fetchAndCache);
}
else
{
ret = fetchAndCache;
}
var conn = ret.PublishLast();
conn.Connect();
return conn;
}
IObservable ProcessWebResponse(WebResponse wr, string url, DateTimeOffset? absoluteExpiration)
{
var hwr = (HttpWebResponse)wr;
Debug.Assert(hwr != null, "The Web Response is somehow null but shouldn't be.");
if ((int)hwr.StatusCode >= 400)
{
return Observable.Throw(new WebException(hwr.StatusDescription));
}
var ms = new MemoryStream();
using (var responseStream = hwr.GetResponseStream())
{
Debug.Assert(responseStream != null, "The response stream is somehow null");
responseStream.CopyTo(ms);
}
var ret = ms.ToArray();
return Observable.Return(ret);
}
static IObservable MakeWebRequest(
Uri uri,
IDictionary headers = null,
string content = null,
int retries = 3,
TimeSpan? timeout = null)
{
IObservable request;
request = Observable.Defer(() =>
{
var hwr = CreateWebRequest(uri, headers);
if (content == null)
return Observable.FromAsyncPattern(hwr.BeginGetResponse, hwr.EndGetResponse)();
var buf = Encoding.UTF8.GetBytes(content);
// NB: You'd think that BeginGetResponse would never block,
// seeing as how it's asynchronous. You'd be wrong :-/
var ret = new AsyncSubject();
Observable.Start(() =>
{
Observable.FromAsyncPattern(hwr.BeginGetRequestStream, hwr.EndGetRequestStream)()
.SelectMany(x => WriteAsyncRx(x, buf, 0, buf.Length))
.SelectMany(_ => Observable.FromAsyncPattern(hwr.BeginGetResponse, hwr.EndGetResponse)())
.Multicast(ret).Connect();
}, BlobCache.TaskpoolScheduler);
return ret;
});
return request.Timeout(timeout ?? TimeSpan.FromSeconds(15), BlobCache.TaskpoolScheduler).Retry(retries);
}
private static WebRequest CreateWebRequest(Uri uri, IDictionary headers)
{
var hwr = WebRequest.Create(uri);
if (headers != null)
{
foreach (var x in headers)
{
hwr.Headers[x.Key] = x.Value;
}
}
return hwr;
}
private static IObservable WriteAsyncRx(Stream stream, byte[] data, int start, int length)
{
return stream.WriteAsync(data, start, length).ToObservable();
}
}
}