Marcin Lewandowski
Marcin Lewandowski
Programista PHP ( Symfony ), blogger, trener oraz miłośnik kawy. Na co dzień pracuję z Symfony, RabbitMQ, ElasticSearch, Node.js, Redis, Docker, MySQL.

Potwierdzanie wiadomości w RabbitMQ

Potwierdzanie wiadomości w RabbitMQ

W ostatnim wpisie stworzyliśmy prostego producenta oraz konsumenta wiadomości. Wszystko działało prawidłowo, jednak nasze aplikacje były bardzo proste oraz ich działanie nie było uzależnione od żadnych aplikacji zewnętrznych, co nie zawsze będzie prawdą. Dlatego warto przygotować się na najgorsze scenariusze. A takim scenariuszem może być sytuacja, w której podczas przetwarzania wiadomości przez konsumenta nastąpi jego niespodziewane zakończenie i nie zdąży on poprawnie przetworzy otrzymanej wiadomości. Co zrobić w takim przypadku ??

Odpowiedzią na to pytanie jest mechanizm potwierdzeń. Dzięki niemu możemy powiedzieć RabbitMQ, że przetworzyliśmy wiadomość i można ją usunąć z kolejki. W tym celu będziemy musieli delikatnie zmodyfikować nasz kod konsumenta.

Zaczynamy od pobrania kodu z GitHub-a.

Dodajemy błąd do konsumenta

Zacznijmy od końca, dodamy błąd do funkcji, która jest odpowiedzialna za przetwarzanie wiadomości w kolejce.

W tym celu modyfikujemy funkcję anonimową, która jest wywoływana dla każdej wiadomości w kolejce.

$callback = function($msg) {
    echo " [x] Received: ", $msg->body, "\n";
    throw new Exception('Błąd przetwarzania wiadomości');
    sleep(1);
};

Do funkcji dodaliśmy rzucenie wyjątku throw new Exception('Błąd przetwarzania wiadomości'); co ma symulować błąd np. zewnętrznej aplikacji lub jakiś błąd w kodzie.

Zobaczmy jak zadziała nasz zmodyfikowany kod. Dla ułatwienia dodałem już do kolejki jedną wiadomość wykorzystując naszego producenta.

Mając wiadomości w kolejce możemy uruchomić konsumenta z błędem i zobaczyć co się wydarzy.

php consumer.php

W wyniku działania powyższego polecenia otrzymamy błąd w konsoli. Co nie jest dla nas żadnym zaskoczeniem, przecież sami go tam umieściliśmy ;)

Oczywiście Stack trace jest długi i pokazuje dokładnie co się wydarzyło, ale pokazywanie go w tym przypadku nie ma żadnego sensu. Wiemy gdzie jest błąd, ciekawsze jest to jak wygląda kolejka po wywołaniu konsumenta.

Widać wyraźnie moment pobrania wiadomości z kolejki i pomimo wystąpienia błędu wiadomość ta niestety do kolejki nie wróciła :( I tu jest problem, co gdy będziemy musieli odpytać jakiś serwis zewnętrzny o informację, a on będzie niedostępny ?? Przecież nie powiemy klientowi, że ma jeszcze raz złożyć zamówienie bo nie przetworzyliśmy jego płatności bo strona z płatnościami była niedostępna. Mam nadzieję, że czujesz już problem. Jeśli nie, to zobacz jeszcze jeden przykład.

Tym razem mamy w kolejce 10 wiadomości, wyprodukowanych przez naszego producenta.

Jak myślisz, co się stanie po uruchomieniu konsumenta z błędem ??

php consumer.php

Wynik w pierwszym momencie może być zaskakujący, jak to wszystkie wiadomości zostały pobrane z kolejki i przetworzone ?? Przecież już podczas przetwarzania pierwszej wiadomości wystąpił błąd !!! Zgadza się, ale to w cale nie oznacza, że konsument nie będzie przetwarzał nowych wiadomości. W ten sposób można stracić wszystkie dane przesyłane przez producentów, a to było by niewybaczalne.

Włączamy potwierdzanie wiadomości

Potwierdzanie wiadomości włącza się przy wywołaniu metody basic_consume. Obecnie wygląda to następująco:

$channel->basic_consume(
    RABBITMQ_QUEUE_NAME, // queue
    '',                  // consumer_tag
    false,               // no_local
    true,                // no_ack
    false,               // exclusive
    false,               // nowait
    $callback
);

My potrzebujemy zmienić parametr no_ack z wartości true na false.

$channel->basic_consume(
    RABBITMQ_QUEUE_NAME, // queue
    '',                  // consumer_tag
    false,               // no_local
    false,               // no_ack
    false,               // exclusive
    false,               // nowait
    $callback
);

Już taka zmiana zabezpiecza nas przed usunięciem wiadomości, która nie została potwierdzona. Czyli nasz konsument już teraz nie będzie usuwał wiadomości z kolejki. Zobaczmy, czy tak będzie w rzeczywistości. Dodajemy wiadomość do kolejki.

Wywołujemy naszego konsumenta, który posiada nadal błąd, ale teraz wymaga potwierdzania wiadomości.

php consumer.php

I co się wydarzyło ?? Widać, że konsument został wywołany, ale wiadomość z kolejki nie zniknęła. Czyli dokładnie o to nam chodziło. Teraz pozostaje tylko dodać potwierdzanie w naszej funkcji przetwarzającej wiadomości. Niestety w PHP nie jest to zbyt eleganckie, ale co możemy zrobić :(

Dodajemy poniższy fragment kodu:

$msg->delivery_info['channel']->basic_ack(
    $msg->delivery_info['delivery_tag']
);

W zależności od miejsca jej dodania potwierdzanie będzie działało, lub nie ;) My chcemy, aby potwierdzanie zaczęło działać więc dodamy ją przed wystąpieniem błędu.

$callback = function($msg) {
    echo " [x] Received: ", $msg->body, ", counter: ", $counter, "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    throw new Exception('Błąd podczas przetwarzania wiadomości');
    sleep(1);
};

To znowu powoduje, że pomimo wystąpienia błędu wszystkie wiadomości zostaną przetworzone. Więc, aby lepiej oddać losowość wystąpienia błędu i w zależności od jego wystąpienia lub nie potwierdzać wiadomość. Dodamy sobie zmienną $counter, która będzie liczyła ilość wywołań funkcji i co drugie wywołanie będzie rzucała wyjątek.

Dodajemy zmienną $counter i przekazujemy ją przez referencję do funkcji.

$counter = 0;
 
$callback = function($msg) use (&$counter) {
    // zawartość funkcji
};

Teraz wypadało by uzależnić wystąpienie wyjątku od tego czy podzielenie wartości licznika daje nam resztę.

$callback = function($msg) use (&$counter) {
     
    $counter++;
 
    if ($counter % 2)
    {
        throw new Exception('Błąd podczas przetwarzania wiadomości');
    }
     
    echo " [x] Received: ", $msg->body, ", counter: ", $counter, "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
 
    sleep(1);
};

Ostatni szlif i gotowe. Przechwytujemy wyjątek, zasadniczo nie musimy tego robić, ale dzięki temu czytelność z poziomu konsoli wzrośnie ;)

$callback = function($msg) use (&$counter) {
 
    try
    {
        if ($counter % 2)
        {
            throw new Exception('Błąd podczas przetwarzania wiadomości');
        }
 
        echo " [x] Received: ", $msg->body, ", counter: ", $counter, ", Error: false", "\n";
 
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
    catch (Exception $e)
    {
        echo " [x] Received: ", $msg->body, ", counter: ", $counter, ", Error: true", "\n";
    }
 
    $counter++;
 
    sleep(1);
};

Od teraz nasz w konsoli będziemy widzieli czy wystąpił błąd podczas przetwarzania danych przez konsumenta. Jeśli taki błąd wystąpił to nie potwierdzamy przetworzenia wiadomości. Co skutkuje jej pozostaniem w kolejce.

Wprawne oko zauważy, że wiadomości które zostały pobrane przez konsumenta, ale nie zostały potwierdzone nie mają już statusu Ready, ale Unacked. Co w praktyce oznacza, że te wiadomości nie zostaną przetworzone dopóki konsument nie zakończy połączenia z kanałem. Jest to o tyle istotne, że startując dodatkowego konsumenta, on także nie będzie mógł przetworzyć tych wiadomości. Co w takim razie zrobić ??

Informowanie o błędzie przetwarzania

Chcąc jak najbardziej poprawnie obsłużyć przetwarzanie wiadomości, musimy dodać informowanie kolejki o wystąpieniu błędu przetwarzania. Robimy to po to, aby wiadomość mogła zostać przetworzona ponownie przez konsumenta.

W naszym przypadku wywołujemy metodę basic_nack. Całe wywołanie będzie wyglądało następująco.

$msg->delivery_info['channel']->basic_nack(
    $msg->delivery_info['delivery_tag']
);

I zapewne wiesz już gdzie dodajemy tę linijkę :) Tak, dokładnie w sekcji catch.

catch (Exception $e)
{
    echo "[x] Received: ", $msg->body, ", counter: ", $counter, ", Error: true", "\n";
    $msg->delivery_info['channel']->basic_nack(
        $msg->delivery_info['delivery_tag']
    );
}

I teraz zobaczmy różnicę pomiędzy sytuacją w której nie informujemy kolejki o błędnym przetworzeniu wiadomości. A sytuacją w kto®ej ta informacja jest wysyłana do kolejki.

Nie informujemy kolejki o niepowodzeniu.

Po zatrzymaniu producenta, konsument nie wraca do nieobsłużonych wiadomości. Będą one tam czekały na zakończenie połączenia przez konsumenta.

Informujemy kolejkę o niepowodzeniu.

Tutaj konsument obsłużył wszystkie wiadomości. Nic nie zostało w kolejce.

Podsumowanie

Ten wpis pokazał Ci, że brak potwierdzania obsłużenia wiadomości może spowodować bardzo niemiłe konsekwencje utraty danych. Oczywiście znajdziemy sytuację, w których potwierdzanie obsłużenia wiadomości może nie być potrzebne. Mimo to uważam, że warto wiedzieć jak poprawnie obsługiwać wiadomości. W większości przypadków będziemy chcieli mieć pewność, że wiadomość została obsłużona poprawnie i możemy przejść do kolejnej ;)

W razie, gdyby coś było niejasne to piszcie w komentarzach, a postaram się rozwić wszelkie wątpliwości.