Ganhando Flexibilidade e Escalabilidade com Mensageria e “Pipes and Filters” (usando C#)

Publicado em 26/01/2012

5


Olá pessoal. Tudo certo!?

Tornar nossas aplicações escaláveis é um grande (e importante) desafio. A abordagem mais comum passa pela decomposição de grandes atividades em “tarefas” menores que possam ser executadas com independência de estado.

Um dos patterns arquiteturais mais comuns para esse desafio é o “Pipes and Filters” (que já foi discutido amplamente em outro post). A implementação desse pattern pode ser muito simplificada através da adoção de Message Queues (comuns para MSMQ e Azure), como pipes, e a implementação de “unidades de processamento”, como filters.

Endentendo um pouco mais o cenário

Imagine que você esteja processando solicitações de geração de mídia personalizada (como ocorre no site da Safari Books, por exemplo). A geração de um “pdf personalizado” incorre em diversas etapas de processamento que podem ser fracionadas. Observe:

image

 

Essa abordagem permite que essas etapas de processamento possam ser paralelizadas. Veja:

image

Como o “Process A” não precisa esperar que seus subsequentes concluam suas atividades, pode atender uma nova solicitação. Na próxima vez que concluir, precisa apenas colocar a atividade na  “sua” Queue de tarefas concluídas. Ela será processada assim que o processo relacionado estiver pronto.

A capacidade e “velocidade” de processamento tenderá para a do “Filter” mais lento (em atividades em “cadeia” a performance da corrente inteira é determinada pelo elo mais fraco). Sabendo disso, podemos escalar pontos específicos minimizando impactos e maximizando o resultado.

image

Sou fascinado pela Teoria das Restrições (você é?). Mas esse seria um tema para outro post.

Implementando um “Filter” abstrato usando MSMQ

Interagir com o MSMQ, como indicado no post anterior é muito simples. Entretanto, podemos generalizar (e facilitar) a criação de filters. Observe:

using System;
using System.Messaging;

namespace MSMQ.PipersAndFilters
{
    public abstract class Filter
    {
        readonly MessageQueue inputQueue;
        readonly MessageQueue outputQueue;

        public Filter(MessageQueue input, MessageQueue output)
        {
            this.inputQueue = input;
            this.outputQueue = output;
        }

        public void Start()
        {
            this.inputQueue.ReceiveCompleted += OnReceiveCompleted;
            this.inputQueue.BeginReceive();
        }

        void OnReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
        {
            var source = (MessageQueue)sender;
            var message = source.EndReceive(e.AsyncResult);
            message.Formatter = new XmlMessageFormatter(new [] { typeof(T) });
            this.outputQueue.Send(Process(message));
            this.inputQueue.BeginReceive();
        }

        protected abstract Message Process(Message original);
    }
}

Basicamente, esse filtro permanece “ouvindo” a Queue “input” indefinidamente. Cada vez que uma mensagem é recebida, é processada e encaminhada para a Queue “output”.

Para criar um exemplo simples, considere que desejemos converter todas as mensagens colocadas na Queue para maiúsculo:

using System.Messaging;

namespace MSMQ.PipersAndFilters
{
    public class ToUpperFilter : Filter
    {
        public ToUpperFilter(MessageQueue input, MessageQueue output) :
            base(input, output)
        {}

        protected override Message Process(Message original)
        {
            var s = (string) original.Body;
            return new Message(s.ToUpper());
        }
    }
}

Bacana, não. Por fim, veja como é simples colocar esse filtro para funcionar:

using System;
using System.Messaging;

namespace MSMQ.PipersAndFilters
{
    class Program
    {
         static void Main(string[] args)
        {
            Console.Title = "Filter";

            new ToUpperFilter(
                GetMessageQueue(@".\private$\sample_queue"),
                GetMessageQueue(@".\private$\sample_transformed")
                ).Start();

            Console.WriteLine("Press ENTER to exit");
            Console.ReadLine();
        }

        static MessageQueue GetMessageQueue(string name)
        {
            if (MessageQueue.Exists(name))
                return new MessageQueue(name);
           
            return MessageQueue.Create(name);
        }
    }
}

Esse “filtro” interage perfeitamente com o exemplo do post anterior. Acho que você pegou a idéia.

Perceba que essa implementação não suporta transações (o que não é bom no mundo real). Mas, esse, é tema para outro post.

Por agora, era isso!

Publicado em: Post