28
loading...
This website collects cookies to deliver better user experience
public interface IInQueuePipeModel
{
string InMessage { get; set; }
}
QueueName
property in IInQueuePipeModel
interface:public interface IInQueuePipeModel
{
string QueueName { get; set; }
string InMessage { get; set; }
}
public interface IInQueuePipeStep<TPipeModel> : IPipeStep<TPipeModel> where
TPipeModel : IInQueuePipeModel
{
string InQueueName { get; }
}
TPipeModel
, meaning that the type parameter TPipeModel
implements the IInQueuePipeModel
interface.public interface IOutQueuePipeStep<TPipeModel> : IPipeStep<TPipeModel> where TPipeModel : IInQueuePipeModel
{
}
public interface IOutQueuePipeModel
{
string OutMessage { get; set; }
}
IOutQueuePipeModel
interface is not required. Since there are different ways to implement sending out messages, such property is not always needed in pipe model. Add it when you are sure you will make use of it. IInQueuePipeStep
and IOutQueuePipeStep
. Additionally, new QueuePipeService
will be introduced and it's going to be responsible for queue related pipe execution.QueuePipeService
. The only difference comparing to the previous implementation is inside ExecuteAsync
method:public class QueuePipeService<TPipeModel> : IPipeService<TPipeModel>
{
protected readonly IList<Func<IPipeStep<TPipeModel>>> _pipeSteps;
protected QueuePipeService()
{
_pipeSteps = new List<Func<IPipeStep<TPipeModel>>>();
}
public IPipeService<TPipeModel> Add(Func<IPipeStep<TPipeModel>> pipeStep)
{
_pipeSteps.Add(pipeStep);
return this;
}
public async Task ExecuteAsync(TPipeModel pipeModel)
{
var steps = GetInitializedSteps().SkipUntil(s => !(s is IInQueuePipeStep<TPipeModel> && (s as IInQueuePipeStep<TPipeModel>).InQueueName == pipeModel.QueueName))
.TakeUntil(s => !(s is IOutQueuePipeStep<TPipeModel>));
foreach (var pipeStep in steps)
{
await pipeStep.ExecuteAsync(pipeModel);
}
}
private IEnumerable<IPipeStep<TPipeModel>> GetInitializedSteps()
{
return _pipeSteps.Select(s => s.Invoke());
}
}
ExecuteAsync
works in a way that until it finds step which implements IInQueuePipeStep
interface and queue name for such step is equal to queue name passed in pipe model it will be skipping steps. From the matching step it gets all next steps but if one of them implements IOutQueuePipeStep
, it is going to be an indication that after this step some request message is going to be sent out and pipeline will be terminated until receiving response. To achieve this functionality I created two IEnumerable
extension methods: SkipUntil
and TakeUntil
.QueuePipeService
and PipeService
. To follow DRY (don’t repeat yourself) principle, under this link, you can find final implementation with extracted PipeServiceBase
abstract class which contains common implementation for both services.