Exchange czyli czym jest centrala wiadomości w RabbitMQ

Exchange czyli czym jest centrala wiadomości w RabbitMQ

W pierwszym wpisie z tej serii zrobiłem pełen przekrój przez system kolejkowania RabbitMQ. Teraz zajmę się dokładniejszym omówieniem poszczególnych elementów systemu kolejkowania, a na pierwszy ogień idą centrale wiadomości (ang. exchange).

Otóż niezależnie od języka programowania w jakim piszemy powinniśmy wiedzieć jak wygląda przetwarzanie danych przez Rabbit-a. Pozwoli nam to na bardziej świadome korzystanie z tego narzędzia. Najprostszy schemat mógłby wyglądać następująco:

Wiadomość która trafi do systemu kolejkowania trafia do centrali wiadomości, a następnie jest przypisywana (binding) do kolejki. Gdzie grzecznie czeka (lub nie), aż konsument ją zabierze do przetworzenia. Mechanizm jest prosty i elegancki, dając przy tym ogromne możliwości w zakresie tworzenia central wiadomości oraz reguł przypisujących wiadomości do kolejek.

Komunikacja z RabbitMQ

Zanim przejdziemy do samej idei central wiadomości przydadzą nam się podstawy z budowy pakietu danych. Parametry zawarte w takim pakiecie później mają wpływ na przypisanie wiadomości do odpowiedniej kolejki, ale po kolei. Komunikacja z Rabbit-em odbywa się poprzez protokół AMQP 0-9-1, który jest obsługiwany natywnie. W przypadku innych protokołów np. STOMP czy MQTT konieczne jest użycie dodatkowych pluginów. Implementację protokołu AMQP znajdziecie bez problemu dla większości popularnych języków programowanie. Budowa pojedynczej wiadomości (pakietu danych) w tym protokole wygląda następująco:

Ładunek to treść naszej wiadomości i w żaden sposób nie jest modyfikowany przez system kolejkowania. Więc jeśli chcemy wiedzieć np. z jakiej aplikacji wiadomość pochodzi, to informację tą powinniśmy zawrzeć w treści wiadomości. Powszechną praktyką jest przesyłanie treści w formie zserializowanej np. w formacie JSON.

Atrybuty są to dodatkowe elementy, które pozwalają na zidentyfikowanie odbiorców oraz sposobu ich dostarczenia. Atrybuty są ustawiane w momencie publikacji wiadomości, a następnie usuwane po ich wykorzystaniu na potrzeby routingu. Możemy wyróżnić następujące atrybuty wiadomości:

  • content type - typ danych,
  • content encoding - kodowanie danych,
  • routing key - klucz routing-u,
  • delivery mode - sposób dostarczenia wiadomości określający, czy wiadomość ma być utrwalona (ang. persistent),
  • message priority - priorytet wiadomości,
  • message publishing timestamp - czas publikacji wiadomości,
  • expiration period - inaczej TTL czyli opóźnienie w dostarczeniu wiadomości podawane w milisekundach,
  • publisher application id - identyfikator producenta,

Protokół umożliwia przesyłanie dowolnego ciągu bajtów. Dzięki czemu możemy przesyłać dane w formie tekstowej, oraz dane binarne co umożliwia zapis plików. Jednak należy pamiętać, że wiadomości powinny być jak najmniejsze, gdyż będą one przechowywane w pamięci RAM. Wysyłanie dużych wiadomości np. nieskompresowanych ścieżek dźwiękowych, mających po kilkaset megabajtów spowoduje “zapchanie” systemu. Wielkość wiadomości wpływa także bezpośrednio na wydajność działania Rabbit-a więc warto zwrócić na ten aspekt uwagę.

Przesyłanie wiadomości do RabbitMQ

Przesyłając wiadomość do RabbitMQ w praktyce nie wysyłamy danych bezpośrednio do kolejki. Wiadomości trafiają najpierw do odpowiedniej centrali wiadomości, a dopiero później są przekazywane do odpowiedniej kolejki. Choć w poprzednim wpisie mogło się wydawać, że centrala wiadomości została pominięta i odwołaliśmy się bezpośrednio do kolejki. Spójrzmy jeszcze raz na kod producenta z poprzedniego wpisu:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
const RABBITMQ_HOST = 'localhost';
const RABBITMQ_PORT = '5672';
const RABBITMQ_USERNAME = 'guest';
const RABBITMQ_PASSWORD = 'guest';
const RABBITMQ_QUEUE_NAME = 'moja_kolejka';
$connection = new AMQPStreamConnection(
    RABBITMQ_HOST,
    RABBITMQ_PORT,
    RABBITMQ_USERNAME,
    RABBITMQ_PASSWORD
);
$channel = $connection->channel();
$channel->queue_declare(
    $queue = RABBITMQ_QUEUE_NAME, // nazwa kolejki
    $passive = false,             // passive
    $durable = true,              // durable
    $exclusive = false,           // exclusive
    $auto_delete = false,         // auto deete
    $nowait = false,              // nowait
    $arguments = null,            // arguments
    $ticket = null                // ticket
);
$taskId = 0;
while (true)
{
    $taskId++;
    $messageBody = 'Zadanie #'.$taskId;
    $msg = new AMQPMessage($messageBody);
    $channel->basic_publish($msg, '', RABBITMQ_QUEUE_NAME);
    echo $messageBody . PHP_EOL;
    sleep(1);
}

Linia odpowiedzialna za umieszczenie wiadomości w kolejce wygląda następująco:

$channel->basic_publish($msg, '', RABBITMQ_QUEUE_NAME);

Mamy tutaj funkcję, której prototyp wygląda następująco:

basic_publish($msg, $exchange, $queue)

gdzie:

$msg - to wiadomość, $exchange - nazwa centrali wiadomości, $queue - nazwa kolejki

Porównując prototyp z implementacją widzimy, że w nazwie centrali wiadomości nie podaliśmy żadnej nazwy. Jednak nie jest to błąd i nie oznacza pominięcia centrali wiadomości. Bowiem w RabbitMQ istnieje centrala wiadomości nieposiadająca nazwy (ang. nameless exchange). Przesyłając wiadomości do tej centrali możemy podać nazwę kolejki, a wiadomości zostaną do niej przesłane o ile ta kolejka istnieje.

Tworzenie własnych central wiadomości

Tworzenie własnych central wiadomości jest bardzo proste, możemy to zrobić z poziomu konsoli lub panelu administracyjnego. Ja lubię konsolę więc poniżej polecenie do wykonania z poziomu konsoli.

sudo rabbitmqadmin declare exchange name="moja_centrala" type=direct

Jak łatwo wywnioskować z treści polecenia tworząc nową centralę wiadomości podajemy dwa parametry:

name - nazwa centrali,

type - typ centrali, tutaj zatrzymamy się na chwilę i zobaczymy jakie typy centrali mamy do dyspozycji. Nie martwcie się jeśli od razu nie zrozumiecie wszystkiego, będziemy to zgłębiać później. A teraz lista dostępnych typów:

  • direct - wiadomości przesłane do centrali muszą posiadać identyczną wartość parametru routingKey, aby zostały przesłane do powiązanej kolejki,
  • topic - typ ten rozbudowuje możliwości typu direct poprzez rozszerzenie możliwości parametru routingKey. W tym typie parametr routingKey składa się ze słów rozdzielanych kropkami. Pozwala to na pewnego rodzaju filtrowanie wiadomości, ustawiając routingKey na wartość “female.blond” będziemy w stanie odfirltrować i przekazać do kolejki np. kobiety o określonym kolorze włosów,
  • headers - typ bardzo podobny do typu direct z tą różnicą, że zamiast parametru routingKey wykorzystywane są nagłówki, a routingKey jest ignorowany. Nagłówki mogą przyjmować dowolne typy danych, a nie jak to miało miejsce w typie direct tylko string. Dodatkowo mamy specjalny nagłówek o nazwie x-match, który może przyjąć dwie wartości:
    • any – wystarczy jedno dopasowanie, aby wiadomość trafiła do kolejki,
    • all – wiadomości trafią do kolejki jedynie gdy wszystkie klucze zostaną dopasowane,
  • fanout, w tym przypadku żadne parametry nie są brane pod uwagę. Wszystkie wiadomości przesłane do centrali będą przekazywane do powiązanych kolejek,

Zasady działania poszczególnych typów central wiadomości

Przy tworzeniu central wiadomości mamy do wyboru kilka typów takich central. Najlepszym sposobem na zrozumienie różnić pomiędzy nimi jest obejrzenie jak każdy z nich działa w praktyce. Pominiemy na tym etapie definiowanie powiązań (ang. binding) pomiędzy centralą, a kolejką. Na ten temat pojawi się osobny wpis pokazujący jak tworzyć kolejki oraz takie powiązania. Przejdźmy teraz do naszych central wiadomości.

Centrala wiadomości bezpośrednich (ang. direct exchange)

Bardzo często wykorzystywany typ centrali ze względu na prostotę działania. Centrala wiadomości na podstawie parametru routingKey wysyła wiadomości do określonej kolejki.

Powyższy schemat pokazuje nam producenta, który przesyła wiadomości do centrali wiadomości. Każda z przesłanych przez producenta wiadomości zawiera parametr routingKey. Centrala jest typu direct, więc na podstawie parametru routingKey przesyła wiadomości do określonej kolejki.

Centrala rozgłośni wiadomości (ang. fanout exchange)

Najprostszy typ ze wszystkich dostępnych. Jego działanie sprowadza się do wysyłania wiadomości do wszystkich aktywnych, tymczasowych kolejek stworzonych przez konsumentów.

Producent tworzy wiadomość i wysyła ją do centrali wiadomości typu fanout. Do tego typu centrali w większości przypadków będą podłączone kolejki tymczasowe tworzone przez konsumentów choć oczywiście możemy stworzyć stałe kolejki i powiązać je z centralą. W momencie, gdy wiadomość trafi do centrali tego typu jest przekazywana do wszystkich kolejek. Przy czym jeśli żadnej kolejki nie będzie to wiadomość nie będzie czekała w centrali tylko zostanie usunięta. W zawiązku z czym zastosowanie tej centrali ma sens dla wiadomości, których utrata nie będzie wiązała się z żadnymi konsekwencjami.

Centrala wiadomości z nagłówkami (ang. headers exchange)

Można powiedzieć że jest to rozbudowana wersja typu direct, z tą różnicą, że nie wykorzystywany jest parametr routingKey, a wykorzystywane są nagłówki. Dzięki wykorzystaniu nagłówków możemy stosować liczby, łańcuchy znaków oraz wyniki działania funkcji skrótu (ang. hash). Dodatkowo wykorzystując nagłówek x-match mamy kontrolę nad sposobem weryfikacji nagłówków.

Jeśli w nagłówku x-match przekażemy wartość all to wszystkie ustawione nagłówki będą musiały się zgadzać, aby wiadomość została przekazana do kolejki. Ładnie to widać na poniższym schemacie:

Do centrali trafiły różne wiadomości, jednak tylko wiadomości mające nagłówek kolor ustawiony na czerwony i nagłówek plik ustawiony na pdf trafią do kolejki.

Mniej rygorystyczny schemat działania mamy, gdy ustawimy nagłówek x-match na wartość any:

Ustawienie to powoduje, że jeśli wiadomość ma choć jeden nagłówek pasujący to taka wiadomość zostanie przekazana do kolejki.

Centrala wiadomości tematycznych (ang. topic exchange)

Ostatni typ centrali bazujący na rozwinięciu parametru routingKey. W przypadku tej centrali wiadomości parametr ten musi składać się ze słów oddzielonych kropkami. Pojedyncze słowa mogą zostać zastąpione przez:

  • * (nag. asterix), która zastąpi dokładnie jedno słowo,
  • # (ang. hash), który zastępuję dowolną liczbę słów oraz oddzielające je kropki,

Zobaczmy jak to wygląda w praktyce:

Wiadomość od producenta trafia do centrali i zawiera w sobie parametr routingKey. I w zależności od zawartości wiadomość zostanie przekazana do odpowiedniej kolejki podobnie jak to miało miejsce w typie direct. Tutaj jednak mamy większe możliwości filtrowania co widać po trzech powiązaniach zdefiniowanych na schemacie.

Pierwsze powiązanie elektronika.telefony.apple.iphonex oczekuje dokładnej wartości. Czyli praktycznie tak samo jak w typie direct.

Kolejne powiązanie elektronika.telefony.* zawiera już w sobie * (ang. asterix) zastępujące jedno słowo. Tym samym do schematu będą pasowały wiadomości z wartością parametru routingKey:

  • elektronika.telefony.apple
  • elektronika.telefony.samsung
  • elektronika.telefony.lg

Ostatnie powiązanie elektronika.telefony.apple.# zawiera w sobie znak # (ang. hash) zastępującego dowolną liczbę słów. Czyli wiadomości z poniższymi wartościami będą pasowały:

  • elektronika.telefony.apple.iphonex
  • elektronika.telefony.apple.iphonex.nowe
  • elektronika.telefony.apple.iphonex.uzywane
  • elektronika.telefony.apple.iphonse

Centrale wiadomości tego typu mają bardzo duże możliwości filtrowania. I tylko od nas zależy jak dobrze je wykorzystamy.

Podsumowanie

Mam nadzieję że przybliżyłem wam temat central wiadomości. Oczywiście wpis ten nie jest w stanie wyczerpać tematu, jednak możecie go traktować jako wprowadzenie i zgłębiać na własną rękę wiedzę w tym zakresie :) Ja tymczasem będę pracował nad kolejnym wpisem z tej serii.