Olá pessoal. Tudo certo!?
Tornar nossas aplicações escaláveis é um grande (e importante) desafio. A abordagem mais comum passa pela decomposição de grandes atividades em “tarefas” menores que possam ser executadas com independência de estado.
Um dos patterns arquiteturais mais comuns para esse desafio é o “Pipes and Filters” (que já foi discutido amplamente em outro post). A implementação desse pattern pode ser muito simplificada através da adoção de Message Queues (comuns para MSMQ e Azure), como pipes, e a implementação de “unidades de processamento”, como filters.
Endentendo um pouco mais o cenário
Imagine que você esteja processando solicitações de geração de mídia personalizada (como ocorre no site da Safari Books, por exemplo). A geração de um “pdf personalizado” incorre em diversas etapas de processamento que podem ser fracionadas. Observe:
Essa abordagem permite que essas etapas de processamento possam ser paralelizadas. Veja:
Como o “Process A” não precisa esperar que seus subsequentes concluam suas atividades, pode atender uma nova solicitação. Na próxima vez que concluir, precisa apenas colocar a atividade na “sua” Queue de tarefas concluídas. Ela será processada assim que o processo relacionado estiver pronto.
A capacidade e “velocidade” de processamento tenderá para a do “Filter” mais lento (em atividades em “cadeia” a performance da corrente inteira é determinada pelo elo mais fraco). Sabendo disso, podemos escalar pontos específicos minimizando impactos e maximizando o resultado.
Sou fascinado pela Teoria das Restrições (você é?). Mas esse seria um tema para outro post.
Implementando um “Filter” abstrato usando MSMQ
Interagir com o MSMQ, como indicado no post anterior é muito simples. Entretanto, podemos generalizar (e facilitar) a criação de filters. Observe:
using System;
using System.Messaging;
namespace MSMQ.PipersAndFilters
{
public abstract class Filter
{
readonly MessageQueue inputQueue;
readonly MessageQueue outputQueue;
public Filter(MessageQueue input, MessageQueue output)
{
this.inputQueue = input;
this.outputQueue = output;
}
public void Start()
{
this.inputQueue.ReceiveCompleted += OnReceiveCompleted;
this.inputQueue.BeginReceive();
}
void OnReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
var source = (MessageQueue)sender;
var message = source.EndReceive(e.AsyncResult);
message.Formatter = new XmlMessageFormatter(new [] { typeof(T) });
this.outputQueue.Send(Process(message));
this.inputQueue.BeginReceive();
}
protected abstract Message Process(Message original);
}
}
Basicamente, esse filtro permanece “ouvindo” a Queue “input” indefinidamente. Cada vez que uma mensagem é recebida, é processada e encaminhada para a Queue “output”.
Para criar um exemplo simples, considere que desejemos converter todas as mensagens colocadas na Queue para maiúsculo:
using System.Messaging;
namespace MSMQ.PipersAndFilters
{
public class ToUpperFilter : Filter
{
public ToUpperFilter(MessageQueue input, MessageQueue output) :
base(input, output)
{}
protected override Message Process(Message original)
{
var s = (string) original.Body;
return new Message(s.ToUpper());
}
}
}
Bacana, não. Por fim, veja como é simples colocar esse filtro para funcionar:
using System;
using System.Messaging;
namespace MSMQ.PipersAndFilters
{
class Program
{
static void Main(string[] args)
{
Console.Title = "Filter";
new ToUpperFilter(
GetMessageQueue(@".\private$\sample_queue"),
GetMessageQueue(@".\private$\sample_transformed")
).Start();
Console.WriteLine("Press ENTER to exit");
Console.ReadLine();
}
static MessageQueue GetMessageQueue(string name)
{
if (MessageQueue.Exists(name))
return new MessageQueue(name);
return MessageQueue.Create(name);
}
}
}
Esse “filtro” interage perfeitamente com o exemplo do post anterior. Acho que você pegou a idéia.
Perceba que essa implementação não suporta transações (o que não é bom no mundo real). Mas, esse, é tema para outro post.
Por agora, era isso!






janeiro 27th, 2012 → 19:43
[...] Esse pattern pode ser combinado com “Pipers And Filters” (apresentei implantação no post anterior). [...]
janeiro 28th, 2012 → 13:15
[...] dia, falamos sobre como aumentar a flexibilidade de nossos sistemas usando “Pipes and Filters”. Também exploramos alternativas para aumentar integração e escalabilidade através de [...]
janeiro 28th, 2012 → 19:24
[...] Ganhando Flexibilidade e Escalabilidade com Mensageria e “Pipes and Filters” (usando C#) [...]
fevereiro 21st, 2012 → 23:32
[...] Pipes and Filters [...]
fevereiro 21st, 2012 → 23:35
[...] Pipes and Filters [...]