Skip to content

Commit

Permalink
Improved error handling around message queue initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
sweetlandj committed Oct 31, 2017
1 parent f64c89a commit cfda0eb
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 13 deletions.
6 changes: 6 additions & 0 deletions Source/Platibus.MongoDB/MongoDBEventType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,11 @@ public class MongoDBEventType
/// Thrown when an index creation operation fails
/// </summary>
public static readonly DiagnosticEventType IndexCreationFailed = new DiagnosticEventType("IndexCreationFailed", DiagnosticEventLevel.Warn);

/// <summary>
/// Emitted when message headers or content cannot be read from the database
/// </summary>
public static readonly DiagnosticEventType MessageDocumentFormatError = new DiagnosticEventType("MessageDocumentFormatError", DiagnosticEventLevel.Error);

}
}
22 changes: 18 additions & 4 deletions Source/Platibus.MongoDB/MongoDBMessageQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ protected virtual Task UpdateQueuedMessage(QueuedMessage queuedMessage, DateTime
return _queuedMessages.UpdateOneAsync(filter, update, cancellationToken: cancellationToken);
}

/// <inheritdoc />
/// <summary>
/// Returns messages in the queue that are pending
/// </summary>
Expand All @@ -216,10 +217,23 @@ protected override async Task<IEnumerable<QueuedMessage>> GetPendingMessages(Can
var queuedMessages = new List<QueuedMessage>();
foreach (var queuedMessage in existingMessages)
{
var messageHeaders = new MessageHeaders(queuedMessage.Headers);
var principal = await _securityTokenService.NullSafeValidate(messageHeaders.SecurityToken);
var message = new Message(messageHeaders, queuedMessage.Content).WithoutSecurityToken();
queuedMessages.Add(new QueuedMessage(message, principal, queuedMessage.Attempts));
try
{
var messageHeaders = new MessageHeaders(queuedMessage.Headers);
var principal = await _securityTokenService.NullSafeValidate(messageHeaders.SecurityToken);
var message = new Message(messageHeaders, queuedMessage.Content).WithoutSecurityToken();
queuedMessages.Add(new QueuedMessage(message, principal, queuedMessage.Attempts));
}
catch (Exception ex)
{
DiagnosticService.Emit(new MongoDBEventBuilder(this, MongoDBEventType.MessageDocumentFormatError)
{
Detail = "Error reading previously queued message document ID " + queuedMessage.Id + "; skipping",
CollectionName = _queuedMessages.CollectionNamespace.CollectionName,
DatabaseName = _queuedMessages.Database.DatabaseNamespace.DatabaseName,
Exception = ex
}.Build());
}
}
return queuedMessages;
}
Expand Down
6 changes: 6 additions & 0 deletions Source/Platibus/Diagnostics/DiagnosticEventType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ public class DiagnosticEventType
/// </summary>
public static readonly DiagnosticEventType ComponentInitialization = new DiagnosticEventType("ComponentInitialization", DiagnosticEventLevel.Info);

/// <summary>
/// Emitted when an error occurs during component or service initialization
/// </summary>
public static readonly DiagnosticEventType ComponentInitializationError = new DiagnosticEventType("ComponentInitializationError", DiagnosticEventLevel.Error);


/// <summary>
/// Emitted when a default configuration is used
/// </summary>
Expand Down
15 changes: 14 additions & 1 deletion Source/Platibus/Queueing/AbstractMessageQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

namespace Platibus.Queueing
{
/// <inheritdoc />
/// <summary>
/// An abstract base class for implementing message queues
/// </summary>
Expand Down Expand Up @@ -123,7 +124,19 @@ public virtual async Task Init(CancellationToken cancellationToken = default(Can
{
if (Interlocked.Exchange(ref _initialized, 1) == 0)
{
await EnqueueExistingMessages(cancellationToken);
try
{
await EnqueueExistingMessages(cancellationToken);
}
catch (Exception ex)
{
DiagnosticService.Emit(
new DiagnosticEventBuilder(this, DiagnosticEventType.ComponentInitializationError)
{
Detail = "Error enqueueing previously queued message(s)",
Exception = ex
}.Build());
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions Source/Platibus/SQL/SQLEventType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,11 @@ public static class SQLEventType
/// Emitted when there is an error opening or closing a connection
/// </summary>
public static readonly DiagnosticEventType CommandError = new DiagnosticEventType("CommandError", DiagnosticEventLevel.Error);

/// <summary>
/// Emitted when message headers or content cannot be read from the database
/// </summary>
public static readonly DiagnosticEventType MessageRecordFormatError = new DiagnosticEventType("MessageFormatError", DiagnosticEventLevel.Error);

}
}
27 changes: 19 additions & 8 deletions Source/Platibus/SQL/SQLMessageQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,27 @@ protected override async Task<IEnumerable<QueuedMessage>> GetPendingMessages(Can
{
while (await reader.ReadAsync(cancellationToken))
{
var record = commandBuilder.BuildQueuedMessageRecord(reader);
var messageContent = record.Content;
var headers = DeserializeHeaders(record.Headers);
try
{
var record = commandBuilder.BuildQueuedMessageRecord(reader);
var messageContent = record.Content;
var headers = DeserializeHeaders(record.Headers);
#pragma warning disable 612
var principal = await ResolvePrincipal(headers, record.SenderPrincipal);
var principal = await ResolvePrincipal(headers, record.SenderPrincipal);
#pragma warning restore 612
var message = new Message(headers, messageContent).WithoutSecurityToken();
var attempts = record.Attempts;
var queuedMessage = new QueuedMessage(message, principal, attempts);
queuedMessages.Add(queuedMessage);
var message = new Message(headers, messageContent).WithoutSecurityToken();
var attempts = record.Attempts;
var queuedMessage = new QueuedMessage(message, principal, attempts);
queuedMessages.Add(queuedMessage);
}
catch (Exception ex)
{
DiagnosticService.Emit(new SQLEventBuilder(this, SQLEventType.MessageRecordFormatError)
{
Detail = "Error reading previously queued message record; skipping",
Exception = ex
}.Build());
}
}
}
}
Expand Down

0 comments on commit cfda0eb

Please sign in to comment.