Marcin Lewandowski
Marcin Lewandowski
Programista PHP ( Symfony ), blogger, trener oraz miłośnik kawy. Na co dzień pracuję z Symfony, RabbitMQ, ElasticSearch, Node.js, Redis, Docker, MySQL.

Opóźnienie powrotu wiadomości do kolejki (Dead Letter Exchange)

Opóźnienie powrotu wiadomości do kolejki (Dead Letter Exchange)

W poprzednim wpisie z serii o RabbitMQ dowiedzieliśmy się jak poprawnie obsłużyć sytuację w której konsument z jakiegoś powodu nie poradził sobie z obsłużeniem wiadomości. Powodów może być wiele, ale nie to jest najważniejsze. Najważniejsze jest to, że wiadomość bezpiecznie wróciła do kolejki i może być obsłużona ponownie. Tylko, że w praktyce oznacza to często zapętlenie się jednej operacji, bo wiadomość, która wróciła do kolejki prawdopodobnie za chwilę znów nie zostanie obsłużona poprawnie. W tym wpisie postaram się wyjaśnić co zrobić w takim przypadku i jak rozbudować nasze kolejki o mechanizm Dead Letter Exchange.

Tak może wyglądać działanie konsumenta, gdy nie można obsłużyć wiadomości, a ta wraca ciągle na kolejkę. Nieskończona pętla, tu tylko z jednym zadaniem. Pomyślmy co by się stało gdyby tych zadań było dużo więcej. Zapewne nasz serwer pracował by na najwyższych obrotach. Rozwiązaniem tej sytuacji będzie dodanie opóźnienia w powrocie wiadomości do kolejki. I tym się właśnie zajmiemy.

Środowisko testowe

Zanim przejdziemy do właściwej części potrzebujemy RabbitMQ oraz konfiguracji, która pozwoli nam na testowanie nowego rozwiązania. Do tego typu testów uważam, że Docker sprawdzi się idealnie. Dlatego jeśli go jeszcze nie macie to proszę zainstalujcie go, a następnie pobierzcie sobie repozytorium udostępnione na GitHub-ie.

Kiedy wszystko jest już zainstalowane, a repozytorium ściągnięte. Możemy przejść dalej.

Uruchamiamy konsolę i przechodzimy do katalogu projektu, a następnie uruchamiamy polecenie docker-compose up -d. Po chwili powinniśmy zobaczyć komunikat podobny do tego poniżej.

Teraz w przeglądarce przechodzimy pod adres http://localhost:15672 i powinniśmy zobaczyć okno logowania.

Logujemy się za pomocą poniższych danych.

Login: guest Hasło: guest

Po zalogowaniu naszym oczą ukaże się panel w którym możemy zarządzać kolejkami, centralami wiadomości oraz wieloma innymi elementami RabbitMQ.

Nas najbardziej interesuję zakładka Queues (kolejki) oraz Exchanges ( centrale wiadomości ). W zakładce Queues znajdziemy listę kolejek. Aktualnie powinna znajdować się tam tylko jedna kolejka reports.

Tę kolejkę wykorzystuje producent oraz konsument znajdujący się w repozytorium. Aby przetestować konsumenta i producenta potrzebujemy pobrać pakiety wymagane przez aplikację. Do tego celu wykorzystujemy Composer-a, jeśli nie macie go zainstalowanego to możecie zrobić to trzema poleceniami.

Pobranie instalatora

php -r "copy('https://getcomposer.org/installer', 'composer-setup.php');"

Instalacja

php composer-setup.php

Usunięcie instalatora

php -r "unlink('composer-setup.php');"

W katalogu powinien pojawić się plik composer.phar, który umożliwi nam instalację wymaganych pakietów. Wywołujemy polecenie:

php composer.phar install

W wyniku działania Composer-a powinniśmy zobaczyć mniej więcej takie komunikaty.

Dodatkowo w katalogu projektu pojawi się katalog vendor oraz plik composer.lock. Jeśli wszystko przebiegło prawidłowo to możemy pokusić się o uruchomienie producenta i sprawdzenie, czy wyprodukowana wiadomość trafi do kolejki.

php producer.php

Po wywołaniu powinniśmy zobaczyć informację o wyprodukowaniu pierwszego zadania.

Chcąc się upewnić, że rzeczywiście wiadomość trafiła do kolejki. Logujemy się do panelu administracyjnego i przechodzimy do zakładki Queues.

Producent wyprodukował wiadomość i przekazał ją do kolejki poprawnie. Teraz czas na test konsumenta.

php consumer.php

Uruchamiając konsumenta dostajemy ciągłe błędy przy próbie obsługi jednej wiadomości.

Błąd ten jest spowodowany nie wpisaniem poprawnego adresu URL w konfiguracji.

$config = [
    'queue' => 'reports',
    'connection' => [
        'host' => 'localhost',
        'port' => '5672',
        'username' => 'guest',
        'password' => 'guest',
    ],
    'url' => '',
];

Zmieniając adres URL na jakiś istniejący np. https://google.com spowodujemy poprawne obsłużenie wiadomości przez konsumenta.

A tym samym wiadomość znika z kolejki.

Standardowa obsługa kolejki

Widzimy tutaj bardzo prosty przepływa wiadomości.

Producent, tworzy wiadomość i dodaje ją do kolejki.

Poniżej kod producenta, który dodaje tylko jedną wiadomość z informacją o numerze zadania do kolejki reports.

<?php declare(strict_types=1);
 
require_once __DIR__ . '/vendor/autoload.php';
 
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
 
$config = [
    'queue' => 'reports',
    'connection' => [
        'host' => 'localhost',
        'port' => '5672',
        'username' => 'guest',
        'password' => 'guest',
    ]
];
 
$connection = new AMQPStreamConnection(
    $config['connection']['host'],
    $config['connection']['port'],
    $config['connection']['username'],
    $config['connection']['password']
);
 
$channel = $connection->channel();
 
$messageBody = 'Zadanie #1';
 
$channel->basic_publish(
    new AMQPMessage($messageBody),
    '',
    $config['queue']
);
 
echo $messageBody . PHP_EOL;

Konsument przetwarza wiadomości trafiające do kolejki

W tym przypadku, konsument będzie pobierał jakieś informacje z zewnętrznego systemu. Aby go maksymalnie uprościć, nie ma tutaj żadnej logiki oprócz sprawdzenia kodu odpowiedzi. W przypadku kodu 200 uznajemy, że wszystko się powiodło. A gdy kod jest inny to taka wiadomość powinna wrócić do kolejki.

<?php declare(strict_types=1);
 
require_once __DIR__ . '/vendor/autoload.php';
 
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
 
$config = [
    'queue' => 'reports',
    'connection' => [
        'host' => 'localhost',
        'port' => '5672',
        'username' => 'guest',
        'password' => 'guest',
    ],
    'url' => '',
];
 
$connection = new AMQPStreamConnection(
    $config['connection']['host'],
    $config['connection']['port'],
    $config['connection']['username'],
    $config['connection']['password']
);
 
$channel = $connection->channel();
 
function getUrl($url)
{
    $ch = curl_init();
 
    $optArray = [
        CURLOPT_FOLLOWLOCATION => true,
        CURLOPT_URL => $url,
        CURLOPT_RETURNTRANSFER => true
    ];
 
    curl_setopt_array($ch, $optArray);
    curl_exec($ch);
 
    $response = curl_getinfo($ch, CURLINFO_HTTP_CODE);
 
    curl_close($ch);
 
    return $response;
}
 
$callback = function(AMQPMessage $msg) use ($config)
{
    if (200 === getUrl($config['url']))
    {
        echo sprintf(
            "[%s][OK] %s \n",
            date('Y-m-d H:i:s'),
            $msg->body
        );
 
        $msg->delivery_info['channel']->basic_ack(
            $msg->delivery_info['delivery_tag']
        );
    }
    else
    {
        echo sprintf(
            "[%s][ERROR] %s \n",
            date('Y-m-d H:i:s'),
            $msg->body
        );
 
        $msg->delivery_info['channel']->basic_nack(
            $msg->delivery_info['delivery_tag'],
            false,
            false
        );
    }
 
    sleep(1);
};
 
$channel->basic_consume(
    $config['queue'],
    '',
    false,
    false,
    false,
    false,
    $callback
);
 
while (count($channel->callbacks))
{
    $channel->wait();
}

Operacja przetwarzania wiadomości może zakończyć się na dwa sposoby:

  • powodzeniem – wysyłana jest informacja do kolejki o powodzeniu przetwarzania co powoduje usunięcie wiadomości z kolejki,
  • niepowodzeniem – wysyłana jest informacja o niepowodzeniu, a wiadomość wraca do kolejki

Na potrzeby testów w konfiguracji mamy klucz url.

$config = [
    'queue' => 'reports',
    'connection' => [
        'host' => 'localhost',
        'port' => '5672',
        'username' => 'guest',
        'password' => 'guest',
    ],
    'url' => '',
];

Na podstawie wywołania adresu URL znajdującego się pod kluczem url będziemy określali czy operacja przetwarzania wiadomości się powiodła czy nie. Odpowiada za to poniższy fragment kodu.

if (200 === getUrl($config['url']))

Domyślna konfiguracja, czyli brak adresu URL powoduje generowanie błędów. Dopiero wpisanie poprawny adres URL spowoduje, że wiadomość zostanie obsłużona poprawnie. Jednak do naszych celów ta konfiguracja jest idealna. Z dwóch powodów, pierwszy to ciągłe generowanie błędu w konsumencie. Dzięki takiemu podejściu nie będziemy musieli uruchamiać wiele razy producenta czy też konsumenta, aby śledzić obsługę wiadomości. Drugi to możliwość śledzenia czasu po jakim wiadomość została obsłużona. I powód drugi jest chyba nawet ważniejszy od pierwszego. Przecież chcemy zobaczyć, że opóźnienie rzeczywiście nastąpiło.

Dead Letter Exchanges

Mamy odpowiednie środowisko testowe i wiemy jak wygląda standardowy przepływ wiadomości. Czas na wdrożenie mechanizmu, który pozwoli na dodanie opóźnień w przesyłaniu wiadomości do kolejki. Mechanizm o którym mowa nazywa się Dead Letter Exchanges. Sam w sobie nie ma on możliwości dodania żadnego opóźnienia, ale pozwala na przesłanie wiadomości, która np. nie została przetworzona poprawnie do jakiejś centrali wiadomości (ang. exchange). W dalszych krokach centrala może umieścić wiadomość w kolejce, która po określonym czasie automatycznie usuwa wiadomość (tu mamy nasze opóźnienie). Kolejka opóźniająca także ma ustawiony parametr Dead Letter Exchanges wskazując z kolei na centralę wiadomości, która przesyła ponownie wiadomość do głównej kolejki.

Wydaje się skomplikowane ?? Nie martw się, jest banalnie proste ;)

Zacznijmy od małej wizualizacji.

Część wygląda znajomo, prawda ?? Mamy producenta, który dodaje wiadomość do kolejki reports. Kolejka ma ustawiony parametr x-dead-letter-exchange wskazujący na centralę wiadomości ex_reports_ttl. To do tej centrali trafi wiadomość, gdy nasz konsument wyśle informację do kolejki o niepowodzeniu w przetwarzaniu wiadomości.

Kolejka już wie do jakiej centrali wiadomości ( ang. exchange ) skierować błędnie przetworzoną wiadomość. Teraz potrzebujemy powiedzieć centrali co ma z takimi wiadomościami robić bo sama centrala służy jedynie przekazywaniu wiadomości dalej. W naszym przypadku chcemy, aby wiadomość została opóźniona np. o 30 sek., a następnie ponownie wróciła do kolejki. Do realizacji tego zadania będzie potrzebna dodatkowa kolejka. Nową kolejkę nazwę reports_ttl i ustawię w niej automatyczne usuwanie wiadomości po określonym czasie.

Gdy zdefiniowany czas w parametrze x-message-ttl dla wiadomości się skończy. Możemy wykorzystać znany już nam parametr x-dead-letter-exchange, aby taką wiadomość przesłać dalej.

Tak ustawioną kolejkę łączymy z centralą wiadomości ex_reports_ttl. W rezultacie da nam to taki bardzo ładny łańcuszek połączeń.

W łańcuszku tym brakuje jedynie ostatniego połączenia, czyli centrali wiadomości, która po przekroczeniu czasu w kolejce reports_ttl wyśle wiadomość ponownie do początkowej kolejki.

Dodajemy centralę wiadomości ex_reports typu fanout, którą w kolejce reports_ttl ustawiamy jako x-dead-letter-exchange co gwarantuje nam przekazanie do niej wiadomości po 30 sekundach. A z drugiej strony w samej centrali ustawiamy przekazywanie wszystkich wiadomości do kolejki reports.

Widać, że czasy tutaj zwiększają się co 30s. czyli dokładnie o tyle ile wynosi opóźnienie na zdefiniowane w parametrze x-message-ttl.

Konfiguracja kolejek z wykorzystaniem Dead Letter Exchanges

W poprzednim rozdziale omówiliśmy jak działa mechanizm Dead Letter Exchanges i jak możemy go wykorzystać do opóźnienia powrotu wiadomości do kolejki. Teraz czas pokazać jak możemy skonfigurować to wszystko sami.

Automatyczna konfiguracja z wykorzystaniem pliku definitions.json

Jako że jestem leniwym człowiekiem i lubię automatyzować procesy, to powstawanie takiej struktury także zautomatyzowałem. Automatyzacja ta jest możliwa dzięki plugin-owi management, który pozwala na zdefiniowanie listy kolejek, central wiadomości oraz połączeń pomiędzy nimi w pliku definitions.json.

Plik z definicją kolejki głównej opóźniającej oraz central wiadomości dla nich.

{
  "users": [
    {
      "name": "guest",
      "password": "guest",
      "tags": "administrator"
    }
  ],
  "vhosts":[
    {
      "name":"/"
    }
  ],
  "permissions": [
    {
      "user": "guest",
      "vhost": "/",
      "configure": ".*",
      "write": ".*",
      "read": ".*"
    }
  ],
  "parameters": [],
  "policies": [],
  "queues":[
    {
      "name":"reports",
      "vhost":"/",
      "durable":true,
      "auto_delete":false,
      "arguments":{
        "x-dead-letter-exchange": "ex_reports_ttl"
      }
    },
    {
      "name":"reports_ttl",
      "vhost":"/",
      "durable":true,
      "auto_delete":false,
      "arguments":{
        "x-dead-letter-exchange": "ex_reports",
        "x-message-ttl": 30000
      }
    }
  ],
  "exchanges": [
    {
      "name": "ex_reports_ttl",
      "type": "fanout",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    },
    {
      "name": "ex_reports",
      "type": "fanout",
      "vhost": "/",
      "durable": true,
      "auto_delete": false,
      "internal": false,
      "arguments": {}
    }
  ],
  "bindings": [
    {
      "source": "ex_reports_ttl",
      "vhost": "/",
      "destination": "reports_ttl",
      "destination_type": "queue",
      "routing_key": "*",
      "arguments": {}
    },
    {
      "source": "ex_reports",
      "vhost": "/",
      "destination": "reports",
      "destination_type": "queue",
      "routing_key": "*",
      "arguments": {}
    }
  ]
}

I taki plik oraz odpowiednie zapisy zawarłem w gałęzi o nazwie v2 w repozytorium, które udostępniłem.

Na gałąź możemy przełączyć się wykonując poniższe polecenie:

git checkout v2

Po przełączeniu konieczne będzie zatrzymanie kontenera Docker-a.

docker-compose down

A następnie uruchamiamy wszystko poleceniem.

docker-compose up -d

Po uruchomieniu w zakładce Queues powinniśmy zobaczyć dwie kolejki.

Zakładka Exchanges także powinna zawierać dwie centrale wiadomości.

Ręczne tworzenie kolejek, central wiadomości oraz relacji pomiędzy nimi

Ręczne tworzenie wszystkiego ma pewne zalety, a mianowicie pozwala na lepsze zrozumienie tego co tworzymy. Kiedy już mamy odpowiednią wiedzę i doświadczenie, to można korzystać z automatyzacji. Dlatego przejdziemy teraz krok po kroku definiowanie wszystkiego ręcznie.

Zacznijmy od podstaw czyli co będzie nam potrzebne.

Bedą nam potrzebne dwie centrale wiadomości i dwie kolejki. Centrale będą przekierowywały do określonych kolejek. I od tego zacznijmy. Zdefiniujemy na początek centrale wiadomości.

Zaczynamy od centrali ex_reports_ttl

Druga centrala ex_reports

Po dodaniu obu central na liście powinniśmy znaleźć nasze nowo dodane centrale.

Mając centralę potrzebujemy kolejki, aby móc później je połączyć.

Dodajemy kolejkę reports

Dodajemy kolejkę reports_ttl

Po dodaniu kolejek nasza lista powinna wyglądać jak poniżej.

Teraz pozostaje jedynie połączyć kolejki z centralami i konfiguracja jest gotowa. W tym celu musimy wejść w szczegóły kolejki, robimy to klikając w nazwę kolejki reports.

Po dodaniu powiązania powinniśmy je zobaczyć w szczegółach.

Identyczną operację wykonujemy w kolejce reports_ttl.

Po dodaniu powiązania powinniśmy je zobaczyć w szczegółach.

Tak zdefiniowane centrale wiadomości, kolejki i relacje pozwolą opóźniać powrót do kolejki wiadomości, która nie została obsłużona poprawnie. Wystarczy teraz uruchomić producenta.

php producer.php

A następnie konsumenta.

php consumer.php

Aby zobaczyć taki efekt.

Podsumowanie

We wpisie wykorzystaliśmy mechanizm Dead Letter Exchanges, aby stworzyć mechanizm opóźniający potwór wiadomości do kolejki po braku możliwości obsłużenia wiadomości przez serwis zewnętrzny. Rozwiązanie to jest bardzo proste i nie wymaga od nas praktycznie żadnych zmian w kodzie naszej aplikacji. Wadą tego rozwiązania w takiej formie jest brak limitu prób, co przy większej ilości wiadomości może powodować problemy. Ten problem rozwiążemy w kolejnym wpisie :)