Skip to content

Commit

Permalink
Added missing diagnostic events and factored message marshalling logi…
Browse files Browse the repository at this point in the history
…c into a separate class.
  • Loading branch information
sweetlandj committed Apr 25, 2018
1 parent 9984f49 commit 1604c16
Show file tree
Hide file tree
Showing 24 changed files with 311 additions and 236 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ internal class LegacyMessageFileWriter : IDisposable

public LegacyMessageFileWriter(TextWriter writer, bool leaveOpen = false)
{
_writer = writer ?? throw new ArgumentNullException("writer");
_writer = writer ?? throw new ArgumentNullException(nameof(writer));
_leaveOpen = leaveOpen;
}

public LegacyMessageFileWriter(Stream stream, Encoding encoding = null, bool leaveOpen = false)
{
if (stream == null) throw new ArgumentNullException("stream");
if (stream == null) throw new ArgumentNullException(nameof(stream));
_writer = new StreamWriter(stream, encoding ?? Encoding.UTF8);
_leaveOpen = leaveOpen;
}
Expand All @@ -52,7 +52,7 @@ public async Task WritePrincipal(IPrincipal principal)

public async Task WriteMessage(Message message)
{
if (message == null) throw new ArgumentNullException("message");
if (message == null) throw new ArgumentNullException(nameof(message));

var headers = message.Headers;
if (headers != null)
Expand Down
4 changes: 2 additions & 2 deletions Source/Platibus.UnitTests/MessageQueueingServiceTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ public QueueListenerStub(Action<Message, IQueuedMessageContext> callback = null,
_taskCompletionSource = new TaskCompletionSource<Message>();
_cancellationTokenSource = new CancellationTokenSource();
var cancelAfter = timeout <= TimeSpan.Zero
? TimeSpan.FromSeconds(3)
? TimeSpan.FromSeconds(5)
: timeout;

_cancellationTokenSource.Token.Register(() => _taskCompletionSource.TrySetCanceled());
Expand Down Expand Up @@ -445,7 +445,7 @@ public CountdownListenerStub(int target, TimeSpan timeout = default(TimeSpan))
_taskCompletionSource = new TaskCompletionSource<Message>();
_cancellationTokenSource = new CancellationTokenSource();
var cancelAfter = timeout <= TimeSpan.Zero
? TimeSpan.FromSeconds(3)
? TimeSpan.FromSeconds(5)
: timeout;

_cancellationTokenSource.Token.Register(() => _taskCompletionSource.TrySetCanceled());
Expand Down
66 changes: 54 additions & 12 deletions Source/Platibus.UnitTests/SentMessageExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

using Platibus.Diagnostics;
using Platibus.Serialization;
using System;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

Expand All @@ -29,23 +32,62 @@ namespace Platibus.UnitTests
[Trait("Category", "UnitTests")]
public class SentMessageExtensionsTests
{
protected CancellationTokenSource CancellationTokenSource;
protected MemoryCacheReplyHub ReplyHub;
protected Message Message;
protected ISentMessage SentMessage;
protected Message Reply;

public SentMessageExtensionsTests()
{
var timeout = TimeSpan.FromSeconds(3);
CancellationTokenSource = new CancellationTokenSource(timeout);

var messageSerializationService = new MessageMarshaller();
var diagnosticService = DiagnosticService.DefaultInstance;

ReplyHub = new MemoryCacheReplyHub(
messageSerializationService,
diagnosticService,
timeout);
}

[Fact]
public async Task FirstReplyIsReturned()
public async Task GetReplyReturnsFirstReply()
{
var memoryCacheReplyHub = new MemoryCacheReplyHub(TimeSpan.FromSeconds(3));
var messageId = MessageId.Generate();
var message = new Message(new MessageHeaders { MessageId = messageId }, "Hello, world!");
var sentMessage = memoryCacheReplyHub.CreateSentMessage(message);
GivenSentMessage();
GivenReply();
WhenReplyReceived();

const string reply = "Hello yourself!";
var replyTask = Task.Delay(TimeSpan.FromMilliseconds(500))
.ContinueWith(t => memoryCacheReplyHub.ReplyReceived(reply, messageId));
var cancellationToken = CancellationTokenSource.Token;
var awaitedReplyContent = await SentMessage.GetReply(cancellationToken);

var awaitedReply = await sentMessage.GetReply(TimeSpan.FromSeconds(3));
await replyTask;
Assert.NotNull(awaitedReplyContent);
Assert.Equal(Reply.Content, awaitedReplyContent);
}

Assert.NotNull(awaitedReply);
Assert.Equal(reply, awaitedReply);
protected void GivenSentMessage()
{
Message = new Message(new MessageHeaders
{
MessageId = MessageId.Generate()
}, "Hello, world!");

SentMessage = ReplyHub.CreateSentMessage(Message);
}

protected void GivenReply()
{
Reply = new Message(new MessageHeaders
{
MessageId = MessageId.Generate(),
RelatedTo = Message?.Headers.MessageId ?? default(MessageId)
}, "Hello, back!");
}

protected void WhenReplyReceived()
{
ReplyHub.NotifyReplyReceived(Reply);
}
}
}
2 changes: 1 addition & 1 deletion Source/Platibus.UnitTests/TestOutputHelperSink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class TestOutputHelperSink : IDiagnosticEventSink
/// <param name="outputHelper">The text writer to target</param>
public TestOutputHelperSink(ITestOutputHelper outputHelper)
{
_outputHelper = outputHelper ?? throw new ArgumentNullException("outputHelper");
_outputHelper = outputHelper ?? throw new ArgumentNullException(nameof(outputHelper));
}

/// <inheritdoc />
Expand Down
40 changes: 19 additions & 21 deletions Source/Platibus/Bus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,14 @@ public class Bus : IBus, IDisposable
private readonly IEndpointCollection _endpoints;
private readonly IList<Task> _subscriptionTasks = new List<Task>();
private readonly IList<IHandlingRule> _handlingRules;
private readonly IMessageNamingService _messageNamingService;
private readonly IMessageQueueingService _messageQueueingService;
private readonly string _defaultContentType;
private readonly SendOptions _defaultSendOptions;
private readonly IDiagnosticService _diagnosticService;

private readonly MemoryCacheReplyHub _replyHub = new MemoryCacheReplyHub(TimeSpan.FromMinutes(5));
private readonly MessageMarshaller _messageMarshaller;
private readonly MemoryCacheReplyHub _replyHub;
private readonly IList<ISendRule> _sendRules;
private readonly ISerializationService _serializationService;
private readonly IList<ISubscription> _subscriptions;
private readonly IList<TopicName> _topics;
private readonly Uri _baseUri;
Expand Down Expand Up @@ -86,8 +85,10 @@ public Bus(IPlatibusConfiguration configuration, Uri baseUri, ITransportService

_defaultSendOptions = configuration.DefaultSendOptions ?? new SendOptions();

_messageNamingService = configuration.MessageNamingService;
_serializationService = configuration.SerializationService;
_messageMarshaller = new MessageMarshaller(
configuration.MessageNamingService,
configuration.SerializationService,
configuration.DefaultContentType);

_endpoints = new ReadOnlyEndpointCollection(configuration.Endpoints);
_topics = configuration.Topics.ToList();
Expand All @@ -96,9 +97,11 @@ public Bus(IPlatibusConfiguration configuration, Uri baseUri, ITransportService
_subscriptions = configuration.Subscriptions.ToList();

_diagnosticService = configuration.DiagnosticService;
_messageHandler = new MessageHandler(_messageNamingService, _serializationService, _diagnosticService);
_messageHandler = new MessageHandler(_messageMarshaller, _diagnosticService);

_transportService.MessageReceived += OnMessageReceived;

_replyHub = new MemoryCacheReplyHub(_messageMarshaller, _diagnosticService, TimeSpan.FromMinutes(5));
}

/// <summary>
Expand Down Expand Up @@ -128,8 +131,7 @@ public async Task Init(CancellationToken cancellationToken = default(Cancellatio
.Select(r => r.QueueOptions)
.FirstOrDefault();

var queueListener = new MessageHandlingListener(this, _messageNamingService,
_serializationService, queueName, handlers, _diagnosticService);
var queueListener = new MessageHandlingListener(this, _messageHandler, queueName, handlers, _diagnosticService);

await _messageQueueingService.CreateQueue(queueName, queueListener, queueOptions, cancellationToken);
}
Expand Down Expand Up @@ -350,12 +352,9 @@ private static int QueueOptionPrecedence(QueueOptions queueOptions)
private Message BuildMessage(object content, IMessageHeaders suppliedHeaders, SendOptions options)
{
if (content == null) throw new ArgumentNullException(nameof(content));
var messageName = _messageNamingService.GetNameForType(content.GetType());

var headers = new MessageHeaders(suppliedHeaders)
{
MessageId = MessageId.Generate(),
MessageName = messageName,
Origination = _baseUri,
Synchronous = options != null && options.Synchronous
};
Expand All @@ -372,9 +371,7 @@ private Message BuildMessage(object content, IMessageHeaders suppliedHeaders, Se
}
headers.ContentType = contentType;

var serializer = _serializationService.GetSerializer(headers.ContentType);
var serializedContent = serializer.Serialize(content);
return new Message(headers, serializedContent);
return _messageMarshaller.Marshal(content, headers);
}

private IEnumerable<KeyValuePair<EndpointName, IEndpoint>> GetEndpointsForSend(Message message)
Expand All @@ -385,6 +382,12 @@ private Message BuildMessage(object content, IMessageHeaders suppliedHeaders, Se

if (!matchingSendRules.Any())
{
_diagnosticService.Emit(
new DiagnosticEventBuilder(this, DiagnosticEventType.NoMatchingSendRules)
{
Message = message
}.Build());

throw new NoMatchingSendRulesException();
}

Expand Down Expand Up @@ -547,13 +550,8 @@ private async Task NotifyReplyReceived(Message message)
// number of replies received is less than the number expected, then the OnComplete
// event can be deferred.

var relatedToMessageId = message.Headers.RelatedTo;
var messageType = _messageNamingService.GetTypeForName(message.Headers.MessageName);
var serializer = _serializationService.GetSerializer(message.Headers.ContentType);
var messageContent = serializer.Deserialize(message.Content, messageType);

await _replyHub.ReplyReceived(messageContent, relatedToMessageId);
await _replyHub.NotifyLastReplyReceived(relatedToMessageId);
await _replyHub.NotifyReplyReceived(message);
await _replyHub.NotifyLastReplyReceived(message);
}

private void CheckDisposed()
Expand Down
25 changes: 14 additions & 11 deletions Source/Platibus/Config/ExtensibleConfigurationElement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,18 +169,20 @@ public Uri GetUri(string name)
/// </returns>
/// <exception cref="FormatException">If the property value
/// cannot be cast or parsed to <typeparamref name="TEnum"/></exception>
public TEnum GetEnum<TEnum>(string name) where TEnum : struct
public TEnum? GetEnum<TEnum>(string name) where TEnum : struct
{
var val = GetObject(name);
if (val == null) return default(TEnum);

if (val is TEnum)
switch (val)
{
return (TEnum) val;
case null:
return null;
case TEnum @enum:
return @enum;
}

return (TEnum) Enum.Parse(typeof (TEnum), val.ToString(), false);
}

/// <summary>
/// Returns the property with the specified <paramref name="name"/>
/// as a <see cref="TimeSpan"/>.
Expand All @@ -200,14 +202,15 @@ public Uri GetUri(string name)
/// cannot be cast or converted to an int</exception>
/// <exception cref="FormatException">If the property value
/// cannot be cast or converted to an int</exception>
public TimeSpan GetTimeSpan(string name)
public TimeSpan? GetTimeSpan(string name)
{
var val = GetObject(name);
if (val == null) return default(TimeSpan);

if (val is TimeSpan)
switch (val)
{
return (TimeSpan)val;
case null:
return null;
case TimeSpan span:
return span;
}

var str = val.ToString();
Expand Down
5 changes: 2 additions & 3 deletions Source/Platibus/Diagnostics/DiagnosticEventType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ public class DiagnosticEventType
/// Emitted when an error occurs during component or service initialization
/// </summary>
public static readonly DiagnosticEventType ComponentInitializationError = new DiagnosticEventType("ComponentInitializationError", DiagnosticEventLevel.Error);



/// <summary>
/// Emitted when a default configuration is used
/// </summary>
Expand Down Expand Up @@ -132,7 +131,7 @@ public class DiagnosticEventType
/// <summary>
/// Emitted whenever a message is not acknowledged
/// </summary>
public static readonly DiagnosticEventType MessageNotAcknowledged = new DiagnosticEventType("MessageNotAcknowledged", DiagnosticEventLevel.Trace);
public static readonly DiagnosticEventType MessageNotAcknowledged = new DiagnosticEventType("MessageNotAcknowledged", DiagnosticEventLevel.Warn);

/// <summary>
/// Emitted whenever an attempt is made to handle a message that has expired
Expand Down
4 changes: 2 additions & 2 deletions Source/Platibus/IO/MessageReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class MessageReader : IDisposable
/// <see cref="reader"/> will be left to the caller. Defaults to <c>false</c>.</param>
public MessageReader(TextReader reader, bool leaveOpen = false)
{
_reader = reader ?? throw new ArgumentNullException("reader");
_reader = reader ?? throw new ArgumentNullException(nameof(reader));
_leaveOpen = leaveOpen;
}

Expand All @@ -64,7 +64,7 @@ public MessageReader(TextReader reader, bool leaveOpen = false)
/// <see cref="stream"/> will be left to the caller. Defaults to <c>false</c>.</param>
public MessageReader(Stream stream, Encoding encoding = null, bool leaveOpen = false)
{
if (stream == null) throw new ArgumentNullException("stream");
if (stream == null) throw new ArgumentNullException(nameof(stream));
_reader = new StreamReader(stream, encoding ?? Encoding.UTF8);
_leaveOpen = leaveOpen;
}
Expand Down
1 change: 1 addition & 0 deletions Source/Platibus/IO/MessageWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

namespace Platibus.IO
{
/// <inheritdoc />
/// <summary>
/// An object that writes messages to streams
/// </summary>
Expand Down
5 changes: 3 additions & 2 deletions Source/Platibus/Journaling/MessageJournalConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ public MessageJournalConsumer(IPlatibusConfiguration configuration) : this(confi
{
}

/// <inheritdoc />
/// <summary>
/// Initializes a new <see cref="MessageJournalConsumer"/> with the specified configuration
/// Initializes a new <see cref="T:Platibus.Journaling.MessageJournalConsumer" /> with the specified configuration
/// and services
/// </summary>
/// <param name="bus">A configured and initialized bus instance</param>
/// <param name="messageJournal">The message journal from which entries will be consumed</param>
/// <exception cref="ArgumentNullException">Thrown if any of the parameters are <c>null</c></exception>
/// <exception cref="T:System.ArgumentNullException">Thrown if any of the parameters are <c>null</c></exception>
public MessageJournalConsumer(IBus bus, IMessageJournal messageJournal) : this(bus, messageJournal, null)
{
}
Expand Down

0 comments on commit 1604c16

Please sign in to comment.