diff --git a/Chronological.Tests/GenericFluentAggregateQueryTests.cs b/Chronological.Tests/GenericFluentAggregateQueryTests.cs index 8104035..ca527c9 100644 --- a/Chronological.Tests/GenericFluentAggregateQueryTests.cs +++ b/Chronological.Tests/GenericFluentAggregateQueryTests.cs @@ -84,7 +84,7 @@ public async void Test2() var from = new DateTime(2017, 12, 23, 12, 0, 0, DateTimeKind.Utc); var to = new DateTime(2017, 12, 30, 12, 0, 0, DateTimeKind.Utc); - var result = await new GenericFluentAggregateQuery("Test", Search.Span(from, to), environment, new AggregateWebSocketRepository(new MockWebSocketRepository(_webSocketResult))) + var result = await new GenericFluentAggregateQuery("Test", Search.Span(from, to), environment, new AggregateApiRepository(new MockWebSocketRepository(_webSocketResult))) .Select(builder => builder.UniqueValues(x => x.DataType, 10, builder.DateHistogram(x => x.Date, Breaks.InDays(1), new diff --git a/Chronological.Tests/HttpRepositoryTests.cs b/Chronological.Tests/HttpRepositoryTests.cs new file mode 100644 index 0000000..2e1dad9 --- /dev/null +++ b/Chronological.Tests/HttpRepositoryTests.cs @@ -0,0 +1,130 @@ +using System; +using System.Linq; +using Newtonsoft.Json.Linq; +using Xunit; + +namespace Chronological.Tests +{ + public class HttpRepositoryTests + { + string applicationClientId = "testApplicationId"; + string applicationClientSecret = "testApplicationSecret"; + string tenant = "testTenant"; + + [Fact] + public async void GenericFluentEventQueryTest() + { + var environment = new Environment("TestFqdn", "TestAccessToken"); + + var from = new DateTime(2017, 12, 23, 12, 0, 0, DateTimeKind.Utc); + var to = new DateTime(2018, 10, 23, 12, 0, 0, DateTimeKind.Utc); + + var result = await new GenericFluentEventQuery("Test", Search.Span(from, to), Limit.CreateLimit(Limit.Take, 10), environment, new EventApiRepository(new MockHttpRepository(_eventsResult))) + .Where(x => x.ip == "127.0.0.1") + .ExecuteAsync(); + + Assert.NotNull(result); + } + + [Fact] + public async void AggregateQueryTest() + { + var environment = new Environment("TestFqdn", "TestAccessToken"); + + var from = new DateTime(2017, 12, 23, 12, 0, 0, DateTimeKind.Utc); + var to = new DateTime(2018, 10, 23, 12, 0, 0, DateTimeKind.Utc); + + var result = await new GenericFluentAggregateQuery("Test", Search.Span(from, to), environment, new AggregateApiRepository(new MockHttpRepository(_aggregateResults))) + .Select(builder => builder.UniqueValues(x => x.ip, 10, new { Count = builder.Count() })) + .Where(x => x.ip == "127.0.0.1") + .ExecuteAsync(); + + Assert.NotNull(result); + } + + private string _eventsResult = @"{ + ""warnings"": [], + ""events"": [ + { + ""schema"": { + ""rid"": 0, + ""$esn"": ""source1"", + ""properties"": [ + { + ""name"": ""email"", + ""type"": ""String"" + }, + { + ""name"": ""ip"", + ""type"": ""String"" + }, + { + ""name"": ""useragent"", + ""type"": ""String"" + } + ] + }, + ""$ts"": ""2018-10-18T11:30:36Z"", + ""values"": [ + ""test1@mail.com"", + ""127.0.0.1"", + ""IE"" + ] + }, + { + ""schemaRid"": 0, + ""$ts"": ""2018-10-18T11:30:36Z"", + ""values"": [ + ""test2@mail.com"", + ""127.0.0.1"", + ""FF"" + ] + }, + { + ""schemaRid"": 0, + ""$ts"": ""2018-10-18T11:30:36Z"", + ""values"": [ + ""test3@mail.com"", + ""127.0.0.1"", + ""IE"" + ] + } + ] +}"; + + private string _aggregateResults = @"{ + ""aggregates"": [ + { + ""dimension"": [ + null, + ""EmailOpened"", + ""EmailSent"" + ], + ""measures"": [ + [ + 10.0 + ], + [ + 20.0 + ], + [ + 200.0 + ] + ] + } + ], + ""warnings"": [] +}"; + + } + + public class EmailTest + { + [ChronologicalEventField("email")] + public string email { get; set; } + [ChronologicalEventField("ip")] + public string ip { get; set; } + [ChronologicalEventField("useragent")] + public string useragent { get; set; } + } +} diff --git a/Chronological.Tests/MockHttpRepository.cs b/Chronological.Tests/MockHttpRepository.cs new file mode 100644 index 0000000..db29dcf --- /dev/null +++ b/Chronological.Tests/MockHttpRepository.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Newtonsoft.Json.Linq; + +namespace Chronological.Tests +{ + internal class MockHttpRepository : IWebRequestRepository + { + private readonly List _results; + + public MockHttpRepository(string result) : this(new List { result }) + { + } + + public MockHttpRepository(List results) + { + _results = results; + } + + async Task> IWebRequestRepository.ExecuteRequestAsync(string query, string resourcePath, CancellationToken cancellationToken) + { + if ("aggregates".Equals(resourcePath, StringComparison.OrdinalIgnoreCase)) + { + return new List() { JToken.Parse(_results.First())["aggregates"] }; + } + + return new List { JToken.Parse(_results.First()) }; + } + } +} diff --git a/Chronological.Tests/MockWebSocketRepository.cs b/Chronological.Tests/MockWebSocketRepository.cs index 74c930c..52615fa 100644 --- a/Chronological.Tests/MockWebSocketRepository.cs +++ b/Chronological.Tests/MockWebSocketRepository.cs @@ -7,7 +7,7 @@ namespace Chronological.Tests { - internal class MockWebSocketRepository : IWebSocketRepository + internal class MockWebSocketRepository : IWebRequestRepository { private readonly List _results; @@ -20,7 +20,7 @@ public MockWebSocketRepository(List results) _results = results; } - async Task> IWebSocketRepository.ReadWebSocketResponseAsync(string query, string resourcePath, CancellationToken cancellationToken) + async Task> IWebRequestRepository.ExecuteRequestAsync(string query, string resourcePath, CancellationToken cancellationToken) { return new List { JToken.Parse(_results.First())["content"] }; } diff --git a/Chronological/AggregateWebSocketRepository.cs b/Chronological/AggregateApiRepository.cs similarity index 61% rename from Chronological/AggregateWebSocketRepository.cs rename to Chronological/AggregateApiRepository.cs index eae3db3..f3db4a5 100644 --- a/Chronological/AggregateWebSocketRepository.cs +++ b/Chronological/AggregateApiRepository.cs @@ -6,20 +6,25 @@ namespace Chronological { - internal class AggregateWebSocketRepository : IAggregateWebSocketRepository + internal interface IAggregateApiRepository { - private readonly IWebSocketRepository _webSocketRepository; + Task> Execute(string query, IEnumerable aggregates, CancellationToken cancellationToken = default); + } + + internal class AggregateApiRepository : IAggregateApiRepository + { + private readonly IWebRequestRepository _webSocketRepository; - internal AggregateWebSocketRepository(IWebSocketRepository webSocketRepository) + internal AggregateApiRepository(IWebRequestRepository webSocketRepository) { _webSocketRepository = webSocketRepository; } - async Task> IAggregateWebSocketRepository.Execute(string query, IEnumerable aggregates, CancellationToken cancellationToken) + async Task> IAggregateApiRepository.Execute(string query, IEnumerable aggregates, CancellationToken cancellationToken) { var executionResults = new List(); - var results = await _webSocketRepository.ReadWebSocketResponseAsync(query, "aggregates", cancellationToken); + var results = await _webSocketRepository.ExecuteRequestAsync(query, "aggregates", cancellationToken); // According to samples here: https://github.com/Azure-Samples/Azure-Time-Series-Insights/blob/master/C-%20Hello%20World%20App%20Sample/Program.cs // Aggregates should only use the final result set diff --git a/Chronological/Aggregate[TX,TY,TZ].cs b/Chronological/Aggregate[TX,TY,TZ].cs index edecd77..f30aca2 100644 --- a/Chronological/Aggregate[TX,TY,TZ].cs +++ b/Chronological/Aggregate[TX,TY,TZ].cs @@ -6,7 +6,6 @@ namespace Chronological { - public abstract class Aggregate : Dictionary, IAggregate, IInternalAggregate { internal abstract string AggregateType { get; } @@ -17,8 +16,8 @@ public abstract class Aggregate : Dictionary, IAggregate, IIn IAggregate IInternalAggregate.GetPopulatedAggregate(JObject jObject, Func measureAccessFunc) { var aggregate = Clone(); - - foreach (var dimension in jObject["dimension"].Select((x, y) => new { x, y })) + + foreach (var dimension in jObject["dimension"].Select((x, y) => new { x = x.IsNullOrEmpty() ? "null" : x, y })) { JArray MeasureAccessFunc(JArray x) { diff --git a/Chronological/Environment.cs b/Chronological/Environment.cs index 89d0d49..9d2ea22 100644 --- a/Chronological/Environment.cs +++ b/Chronological/Environment.cs @@ -15,11 +15,13 @@ public class Environment public string EnvironmentId { get; } public string ResourceId { get; } public string AccessToken { get; } + public WebRequestChannel WebRequestChannel { get; set; } - public Environment(string fqdn, string accessToken) + public Environment(string fqdn, string accessToken, WebRequestChannel webRequestChannel = WebRequestChannel.WebSocket) { EnvironmentFqdn = fqdn; AccessToken = accessToken; + WebRequestChannel = webRequestChannel; } internal Environment(string displayName, string environmentId, string resourceId, string environmentFqdn, string accessToken) @@ -95,6 +97,9 @@ public async Task GetMetadataAsync(DateTime from, DateTime public GenericFluentAggregateQuery AggregateQuery(DateTime fromDate, DateTime toDate, string queryName = "ChronologicalQuery") where T: new() { + if (WebRequestChannel == WebRequestChannel.Http) + return new GenericFluentAggregateQuery(queryName, Search.Span(fromDate, toDate), this, new AggregateApiRepository(new HttpRepository(this))); + return new GenericFluentAggregateQuery(queryName, Search.Span(fromDate,toDate), this); } @@ -106,6 +111,10 @@ public StringAggregateQuery AggregateQuery(string query, string queryName = "Chr public GenericFluentEventQuery EventQuery(DateTime fromDate, DateTime toDate, INonSortableLimit limit, int limitCount, string queryName = "ChronologicalQuery") where T:new() { var populatedLimit = Limit.CreateLimit(limit, limitCount); + + if(WebRequestChannel == WebRequestChannel.Http) + return new GenericFluentEventQuery(queryName, Search.Span(fromDate, toDate), populatedLimit, this, new EventApiRepository(new HttpRepository(this))); + return new GenericFluentEventQuery(queryName, Search.Span(fromDate, toDate), populatedLimit, this); } @@ -129,4 +138,10 @@ public StringEventQuery EventQuery(string query, string queryName = "Chronologic } } + + public enum WebRequestChannel + { + WebSocket = 1, + Http = 2 + } } diff --git a/Chronological/EventWebSocketRepository.cs b/Chronological/EventApiRepository.cs similarity index 63% rename from Chronological/EventWebSocketRepository.cs rename to Chronological/EventApiRepository.cs index f527825..de8c494 100644 --- a/Chronological/EventWebSocketRepository.cs +++ b/Chronological/EventApiRepository.cs @@ -8,25 +8,23 @@ namespace Chronological { - - internal interface IEventWebSocketRepository + internal interface IEventApiRepository { - Task> Execute(string query, CancellationToken cancellationToken = default); } -; internal class EventWebSocketRepository : IEventWebSocketRepository +; internal class EventApiRepository : IEventApiRepository { - private readonly IWebSocketRepository _webSocketRepository; + private readonly IWebRequestRepository _webRequestRepository; - internal EventWebSocketRepository(IWebSocketRepository webSocketRepository) + internal EventApiRepository(IWebRequestRepository webRequestRepository) { - _webSocketRepository = webSocketRepository; + _webRequestRepository = webRequestRepository; } - async Task> IEventWebSocketRepository.Execute(string query, CancellationToken cancellationToken) + async Task> IEventApiRepository.Execute(string query, CancellationToken cancellationToken) { - var results = await _webSocketRepository.ReadWebSocketResponseAsync(query, "events", cancellationToken); + var results = await _webRequestRepository.ExecuteRequestAsync(query, "events", cancellationToken); // According to samples here: https://github.com/Azure-Samples/Azure-Time-Series-Insights/blob/master/C-%20Hello%20World%20App%20Sample/Program.cs // Events should combine all results recevied diff --git a/Chronological/EventQuery.cs b/Chronological/EventQuery.cs index 4af6d0b..536b980 100644 --- a/Chronological/EventQuery.cs +++ b/Chronological/EventQuery.cs @@ -6,17 +6,17 @@ public abstract class EventQuery { private readonly string _queryName; private readonly Environment _environment; - private readonly IEventWebSocketRepository _eventWebSocketRepository; + private readonly IEventApiRepository _eventApiRepository; internal EventQuery(string queryName, Environment environment, - IEventWebSocketRepository eventWebSocketRepository) + IEventApiRepository eventApiRepository) { _queryName = queryName; _environment = environment; - _eventWebSocketRepository = eventWebSocketRepository; + _eventApiRepository = eventApiRepository; } - internal EventQuery(string queryName, Environment environment) : this(queryName, environment, new EventWebSocketRepository(new WebSocketRepository(environment))) + internal EventQuery(string queryName, Environment environment) : this(queryName, environment, new EventApiRepository(new WebSocketRepository(environment))) { } diff --git a/Chronological/GenericFluentAggregateQuery.cs b/Chronological/GenericFluentAggregateQuery.cs index 0418767..98944b0 100644 --- a/Chronological/GenericFluentAggregateQuery.cs +++ b/Chronological/GenericFluentAggregateQuery.cs @@ -13,14 +13,14 @@ namespace Chronological private readonly string _queryName; private readonly Search _search; private readonly Environment _environment; - private readonly IAggregateWebSocketRepository _webSocketRepository; + private readonly IAggregateApiRepository _webSocketRepository; internal GenericFluentAggregateQuery(string queryName, Search search, Environment environment) : this(queryName, - search, environment, new AggregateWebSocketRepository(new WebSocketRepository(environment))) + search, environment, new AggregateApiRepository(new WebSocketRepository(environment))) { } - internal GenericFluentAggregateQuery(string queryName, Search search, Environment environment, IAggregateWebSocketRepository webSocketRepository) + internal GenericFluentAggregateQuery(string queryName, Search search, Environment environment, IAggregateApiRepository webSocketRepository) { _queryName = queryName; _search = search; @@ -39,7 +39,7 @@ public GenericFluentAggregatesQuery> Select(IEnu { private GenericFluentAggregatesQuery _multiQuery; - internal GenericFluentAggregateQuery(string queryName, Search search, TY aggregate, Environment environment, IAggregateWebSocketRepository webSocketRepository) + internal GenericFluentAggregateQuery(string queryName, Search search, TY aggregate, Environment environment, IAggregateApiRepository webSocketRepository) { _multiQuery = new GenericFluentAggregatesQuery(queryName, search, new List {aggregate}, environment, webSocketRepository); @@ -77,11 +77,11 @@ public async Task ExecuteAsync(CancellationToken cancellationToken = default private readonly string _queryName; private readonly Search _search; private readonly Environment _environment; - private readonly IAggregateWebSocketRepository _webSocketRepository; + private readonly IAggregateApiRepository _webSocketRepository; - internal GenericFluentAggregatesQuery(string queryName, Search search, IEnumerable aggregates, Environment environment, IAggregateWebSocketRepository webSocketRepository) + internal GenericFluentAggregatesQuery(string queryName, Search search, IEnumerable aggregates, Environment environment, IAggregateApiRepository webSocketRepository) { _queryName = queryName; _search = search; diff --git a/Chronological/GenericFluentEventQuery.cs b/Chronological/GenericFluentEventQuery.cs index 0888488..f2c0adc 100644 --- a/Chronological/GenericFluentEventQuery.cs +++ b/Chronological/GenericFluentEventQuery.cs @@ -14,21 +14,21 @@ namespace Chronological private Filter _filter; private readonly Limit _limit; private readonly Environment _environment; - private readonly IEventWebSocketRepository _eventWebSocketRepository; + private readonly IEventApiRepository _eventApiRepository; internal GenericFluentEventQuery(string queryName, Search search, Limit limit, Environment environment) - : this(queryName,search,limit,environment,new EventWebSocketRepository(new WebSocketRepository(environment))) + : this(queryName,search,limit,environment,new EventApiRepository(new WebSocketRepository(environment))) { } internal GenericFluentEventQuery(string queryName, Search search, Limit limit, Environment environment, - IEventWebSocketRepository eventWebSocketRepository) + IEventApiRepository eventApiRepository) { _queryName = queryName; _search = search; _limit = limit; _environment = environment; - _eventWebSocketRepository = eventWebSocketRepository; + _eventApiRepository = eventApiRepository; } public GenericFluentEventQuery Where(Expression> predicate) @@ -87,7 +87,7 @@ private JObject ToJObject(string accessToken) public async Task> ExecuteAsync(CancellationToken cancellationToken = default) { - return await _eventWebSocketRepository.Execute(ToString(), cancellationToken); + return await _eventApiRepository.Execute(ToString(), cancellationToken); } } } diff --git a/Chronological/HttpRepository.cs b/Chronological/HttpRepository.cs new file mode 100644 index 0000000..d4fb169 --- /dev/null +++ b/Chronological/HttpRepository.cs @@ -0,0 +1,105 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using Chronological.Exceptions; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Chronological +{ + public class HttpRepository : IWebRequestRepository + { + private readonly Environment _environment; + private readonly IErrorToExceptionConverter _errorToExceptionConverter; + + public HttpRepository(Environment environment) : this(environment, new ErrorToExceptionConverter()) + { + } + + internal HttpRepository(Environment environment, IErrorToExceptionConverter errorToExceptionConverter) + { + _environment = environment; + _errorToExceptionConverter = errorToExceptionConverter; + } + + async Task> IWebRequestRepository.ExecuteRequestAsync(string query, string resourcePath, CancellationToken cancellationToken) + { + var httpQuery = JToken.Parse(query)["content"].ToString(); + + var request = CreateHttpsWebRequest(_environment.EnvironmentFqdn, "POST", resourcePath, _environment.AccessToken, new[] { "timeout=PT20S" }); + await WriteRequestStreamAsync(request, httpQuery); + var responseContent = await GetResponseAsync(request); + + if ("aggregates".Equals(resourcePath, StringComparison.OrdinalIgnoreCase)) + { + var results = new List() { responseContent }; + return new List() { results.First()["aggregates"] }; + } + + return new List() { responseContent }; + } + + private static HttpWebRequest CreateHttpsWebRequest(string host, string method, string path, string accessToken, string[] queryArgs = null) + { + string query = "api-version=2016-12-12"; + if (queryArgs != null && queryArgs.Any()) + { + query += "&" + String.Join("&", queryArgs); + } + + Uri uri = new UriBuilder("https", host) + { + Path = path, + Query = query + }.Uri; + HttpWebRequest request = WebRequest.CreateHttp(uri); + request.Method = method; + request.Headers["Authorization"] = $"Bearer {accessToken}"; + return request; + } + + private static async Task WriteRequestStreamAsync(HttpWebRequest request, string inputPayload) + { + using (var stream = await request.GetRequestStreamAsync()) + using (var streamWriter = new StreamWriter(stream)) + { + await streamWriter.WriteAsync(inputPayload); + await streamWriter.FlushAsync(); + //streamWriter.Close(); + } + } + + private static async Task GetResponseAsync(HttpWebRequest request) + { + try + { + using (WebResponse webResponse = await request.GetResponseAsync()) + using (var sr = new StreamReader(webResponse.GetResponseStream())) + { + string result = await sr.ReadToEndAsync(); + return JsonConvert.DeserializeObject(result); + } + } + catch (WebException e) + { + if (e.Response == null) + throw e; + + using (WebResponse response = e.Response) + { + HttpWebResponse httpResponse = (HttpWebResponse)response; + Console.WriteLine("Error code: {0}", httpResponse.StatusCode); + using (Stream data = response.GetResponseStream()) + using (var reader = new StreamReader(data)) + { + throw new Exception(reader.ReadToEnd(), e); + } + } + } + } + } +} diff --git a/Chronological/IAggregateWebSocketRepository.cs b/Chronological/IAggregateWebSocketRepository.cs deleted file mode 100644 index 4598c22..0000000 --- a/Chronological/IAggregateWebSocketRepository.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -namespace Chronological -{ - internal interface IAggregateWebSocketRepository - { - Task> Execute(string query, IEnumerable aggregates, CancellationToken cancellationToken = default); - } -} diff --git a/Chronological/IWebRequestRepository.cs b/Chronological/IWebRequestRepository.cs new file mode 100644 index 0000000..8afdd7b --- /dev/null +++ b/Chronological/IWebRequestRepository.cs @@ -0,0 +1,12 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Newtonsoft.Json.Linq; + +namespace Chronological +{ + internal interface IWebRequestRepository + { + Task> ExecuteRequestAsync(string query, string resourcePath, CancellationToken cancellationToken = default); + } +} diff --git a/Chronological/IWebSocketRepository.cs b/Chronological/IWebSocketRepository.cs deleted file mode 100644 index 5f1983f..0000000 --- a/Chronological/IWebSocketRepository.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using Newtonsoft.Json.Linq; - -namespace Chronological -{ - internal interface IWebSocketRepository - { - Task> ReadWebSocketResponseAsync(string query, string resourcePath, CancellationToken cancellationToken = default); - } -} diff --git a/Chronological/JsonExtensions.cs b/Chronological/JsonExtensions.cs new file mode 100644 index 0000000..d9683c6 --- /dev/null +++ b/Chronological/JsonExtensions.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Newtonsoft.Json.Linq; + +namespace Chronological +{ + public static class JsonExtensions + { + public static bool IsNullOrEmpty(this JToken token) + { + return (token == null) || + (token.Type == JTokenType.Array && !token.HasValues) || + (token.Type == JTokenType.Object && !token.HasValues) || + (token.Type == JTokenType.String && token.ToString() == String.Empty) || + (token.Type == JTokenType.Null); + } + } +} diff --git a/Chronological/WebSocketRepository.cs b/Chronological/WebSocketRepository.cs index 3bcdc94..fbc0eb4 100644 --- a/Chronological/WebSocketRepository.cs +++ b/Chronological/WebSocketRepository.cs @@ -12,7 +12,7 @@ namespace Chronological { - internal class WebSocketRepository : IWebSocketRepository + internal class WebSocketRepository : IWebRequestRepository { private readonly Environment _environment; private readonly IErrorToExceptionConverter _errorToExceptionConverter; @@ -27,7 +27,7 @@ internal WebSocketRepository(Environment environment, IErrorToExceptionConverter _errorToExceptionConverter = errorToExceptionConverter; } - async Task> IWebSocketRepository.ReadWebSocketResponseAsync(string query, string resourcePath, CancellationToken cancellationToken) + async Task> IWebRequestRepository.ExecuteRequestAsync(string query, string resourcePath, CancellationToken cancellationToken) { var webSocket = new ClientWebSocket();