W ostatnim wpisie stworzyliśmy prostego producenta oraz konsumenta wiadomości. Wszystko działało prawidłowo, jednak nasze aplikacje były bardzo proste oraz ich działanie nie było uzależnione od żadnych aplikacji zewnętrznych, co nie zawsze będzie prawdą. Dlatego warto przygotować się na najgorsze scenariusze. A takim scenariuszem może być sytuacja, w której podczas przetwarzania wiadomości przez konsumenta nastąpi jego niespodziewane zakończenie i nie zdąży on poprawnie przetworzy otrzymanej wiadomości. Co zrobić w takim przypadku ??

Odpowiedzią na to pytanie jest mechanizm potwierdzeń. Dzięki niemu możemy powiedzieć RabbitMQ, że przetworzyliśmy wiadomość i można ją usunąć z kolejki. W tym celu będziemy musieli delikatnie zmodyfikować nasz kod konsumenta.

Zaczynamy od pobrania kodu z GitHub-a.

https://github.com/elem84/rabbitmq.git

Dodajemy błąd do konsumenta

Zacznijmy od końca, dodamy błąd do funkcji, która jest odpowiedzialna za przetwarzanie wiadomości w kolejce.

W tym celu modyfikujemy funkcję anonimową, która jest wywoływana dla każdej wiadomości w kolejce.

$callback = function($msg) {
    echo " [x] Received: ", $msg->body, "\n";
    throw new Exception('Błąd przetwarzania wiadomości');
    sleep(1);
};

Do funkcji dodaliśmy rzucenie wyjątku throw new Exception('Błąd przetwarzania wiadomości'); co ma symulować błąd np. zewnętrznej aplikacji lub jakiś błąd w kodzie.

Zobaczmy jak zadziała nasz zmodyfikowany kod. Dla ułatwienia dodałem już do kolejki jedną wiadomość wykorzystując naszego producenta.

Mając wiadomości w kolejce możemy uruchomić konsumenta z błędem i zobaczyć co się wydarzy.

php consumer.php

W wyniku działania powyższego polecenia otrzymamy błąd w konsoli. Co nie jest dla nas żadnym zaskoczeniem, przecież sami go tam umieściliśmy 😉

Oczywiście Stack trace jest długi i pokazuje dokładnie co się wydarzyło, ale pokazywanie go w tym przypadku nie ma żadnego sensu. Wiemy gdzie jest błąd, ciekawsze jest to jak wygląda kolejka po wywołaniu konsumenta.

Widać wyraźnie moment pobrania wiadomości z kolejki i pomimo wystąpienia błędu wiadomość ta niestety do kolejki nie wróciła 🙁 I tu jest problem, co gdy będziemy musieli odpytać jakiś serwis zewnętrzny o informację, a on będzie niedostępny ?? Przecież nie powiemy klientowi, że ma jeszcze raz złożyć zamówienie bo nie przetworzyliśmy jego płatności bo strona z płatnościami była niedostępna. Mam nadzieję, że czujesz już problem. Jeśli nie, to zobacz jeszcze jeden przykład.

Tym razem mamy w kolejce 10 wiadomości, wyprodukowanych przez naszego producenta.

Jak myślisz, co się stanie po uruchomieniu konsumenta z błędem ??

php consumer.php

Wynik w pierwszym momencie może być zaskakujący, jak to wszystkie wiadomości zostały pobrane z kolejki i przetworzone ?? Przecież już podczas przetwarzania pierwszej wiadomości wystąpił błąd !!! Zgadza się, ale to w cale nie oznacza, że konsument nie będzie przetwarzał nowych wiadomości. W ten sposób można stracić wszystkie dane przesyłane przez producentów, a to było by niewybaczalne.

Włączamy potwierdzanie wiadomości

Potwierdzanie wiadomości włącza się przy wywołaniu metody basic_consume. Obecnie wygląda to następująco:

$channel->basic_consume(
    RABBITMQ_QUEUE_NAME, // queue
    '',                  // consumer_tag
    false,               // no_local
    true,                // no_ack
    false,               // exclusive
    false,               // nowait
    $callback
);

My potrzebujemy zmienić parametr no_ack z wartości true na false.

$channel->basic_consume(
    RABBITMQ_QUEUE_NAME, // queue
    '',                  // consumer_tag
    false,               // no_local
    false,               // no_ack
    false,               // exclusive
    false,               // nowait
    $callback
);

Już taka zmiana zabezpiecza nas przed usunięciem wiadomości, która nie została potwierdzona. Czyli nasz konsument już teraz nie będzie usuwał wiadomości z kolejki. Zobaczmy, czy tak będzie w rzeczywistości. Dodajemy wiadomość do kolejki.

Wywołujemy naszego konsumenta, który posiada nadal błąd, ale teraz wymaga potwierdzania wiadomości.

php consumer.php

I co się wydarzyło ?? Widać, że konsument został wywołany, ale wiadomość z kolejki nie zniknęła. Czyli dokładnie o to nam chodziło. Teraz pozostaje tylko dodać potwierdzanie w naszej funkcji przetwarzającej wiadomości. Niestety w PHP nie jest to zbyt eleganckie, ale co możemy zrobić 🙁

Dodajemy poniższy fragment kodu:

$msg->delivery_info['channel']->basic_ack(
    $msg->delivery_info['delivery_tag']
);

W zależności od miejsca jej dodania potwierdzanie będzie działało, lub nie 😉 My chcemy, aby potwierdzanie zaczęło działać więc dodamy ją przed wystąpieniem błędu.

$callback = function($msg) {
    echo " [x] Received: ", $msg->body, ", counter: ", $counter, "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    throw new Exception('Błąd podczas przetwarzania wiadomości');
    sleep(1);
};

To znowu powoduje, że pomimo wystąpienia błędu wszystkie wiadomości zostaną przetworzone. Więc, aby lepiej oddać losowość wystąpienia błędu i w zależności od jego wystąpienia lub nie potwierdzać wiadomość. Dodamy sobie zmienną $counter, która będzie liczyła ilość wywołań funkcji i co drugie wywołanie będzie rzucała wyjątek.

Dodajemy zmienną $counter i przekazujemy ją przez referencję do funkcji.

$counter = 0;

$callback = function($msg) use (&$counter) {
    // zawartość funkcji
};

Teraz wypadało by uzależnić wystąpienie wyjątku od tego czy podzielenie wartości licznika daje nam resztę.

$callback = function($msg) use (&$counter) {
    
    $counter++;

    if ($counter % 2)
    {
        throw new Exception('Błąd podczas przetwarzania wiadomości');
    }
    
    echo " [x] Received: ", $msg->body, ", counter: ", $counter, "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

    sleep(1);
};

Ostatni szlif i gotowe. Przechwytujemy wyjątek, zasadniczo nie musimy tego robić, ale dzięki temu czytelność z poziomu konsoli wzrośnie 😉

$callback = function($msg) use (&$counter) {

    try 
    {
        if ($counter % 2)
        {
            throw new Exception('Błąd podczas przetwarzania wiadomości');
        }

        echo " [x] Received: ", $msg->body, ", counter: ", $counter, ", Error: false", "\n";

        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
    catch (Exception $e)
    {
        echo " [x] Received: ", $msg->body, ", counter: ", $counter, ", Error: true", "\n";
    }

    $counter++;

    sleep(1);
};

Od teraz nasz w konsoli będziemy widzieli czy wystąpił błąd podczas przetwarzania danych przez konsumenta. Jeśli taki błąd wystąpił to nie potwierdzamy przetworzenia wiadomości. Co skutkuje jej pozostaniem w kolejce.

Wprawne oko zauważy, że wiadomości które zostały pobrane przez konsumenta, ale nie zostały potwierdzone nie mają już statusu Ready, ale Unacked. Co w praktyce oznacza, że te wiadomości nie zostaną przetworzone dopóki konsument nie zakończy połączenia z kanałem. Jest to o tyle istotne, że startując dodatkowego konsumenta, on także nie będzie mógł przetworzyć tych wiadomości. Co w takim razie zrobić ??

Informowanie o błędzie przetwarzania

Chcąc jak najbardziej poprawnie obsłużyć przetwarzanie wiadomości, musimy dodać informowanie kolejki o wystąpieniu błędu przetwarzania. Robimy to po to, aby wiadomość mogła zostać przetworzona ponownie przez konsumenta.

W naszym przypadku wywołujemy metodę basic_nack. Całe wywołanie będzie wyglądało następująco.

$msg->delivery_info['channel']->basic_nack(
    $msg->delivery_info['delivery_tag']
);

I zapewne wiesz już gdzie dodajemy tę linijkę 🙂 Tak, dokładnie w sekcji catch.

catch (Exception $e)
{
    echo "[x] Received: ", $msg->body, ", counter: ", $counter, ", Error: true", "\n";
    $msg->delivery_info['channel']->basic_nack(
        $msg->delivery_info['delivery_tag']
    );
}

I teraz zobaczmy różnicę pomiędzy sytuacją w której nie informujemy kolejki o błędnym przetworzeniu wiadomości. A sytuacją w kto®ej ta informacja jest wysyłana do kolejki.

Nie informujemy kolejki o niepowodzeniu.

Po zatrzymaniu producenta, konsument nie wraca do nieobsłużonych wiadomości. Będą one tam czekały na zakończenie połączenia przez konsumenta.

Informujemy kolejkę o niepowodzeniu.

Tutaj konsument obsłużył wszystkie wiadomości. Nic nie zostało w kolejce.

Podsumowanie

Ten wpis pokazał Ci, że brak potwierdzania obsłużenia wiadomości może spowodować bardzo niemiłe konsekwencje utraty danych. Oczywiście znajdziemy sytuację, w których potwierdzanie obsłużenia wiadomości może nie być potrzebne. Mimo to uważam, że warto wiedzieć jak poprawnie obsługiwać wiadomości. W większości przypadków będziemy chcieli mieć pewność, że wiadomość została obsłużona poprawnie i możemy przejść do kolejnej 😉

W razie, gdyby coś było niejasne to piszcie w komentarzach, a postaram się rozwić wszelkie wątpliwości.

9 Comments

  1. Dzięki za artykuł, jest bardzo pomocny!
    Rabbit ma wiele możliwości, ale osiągnięcie niektórych scenariuszy bez użycia frameworków jest dość ciężkie.
    Ostatnio próbuję zastosować wzorzec z przekazaniem niepotwierdzonych wiadomości do deadletta i niestety nie specjalnie mi to wychodzi.
    Może mógłbyś opisać taki scenariusz?:)

    • Oto chodzi, żeby pomagał 🙂 Więc cieszę się, że był pomocny.

      Masz rację, że przy pomocy frameworków można wycisnąć więcej z RabbitMQ. Ale dotyczy to praktycznie wszystkich rozwiązań 😉 Zaproponowany przez Ciebie temat jest bardzo fajny i kolejny wpis z tej serii będzie opisywał taki scenariusz. Niestety nie wiem jak szybko będę w stanie taki wpis przygotować. Ale postaram się jak najszybciej.

  2. Cześć,
    jak obsłużyć wiadomości wysłane przez producera do Rabbita w momencie, gdy consumer jest offline? Mi po 60 sec znikają z kolejki…

    • Wszystko zależy od ustawień kolejki lub samej wiadomości. Jeśli masz ustawiony TTL na 60 sek. to wiadomości będą usuwane z kolejki po tym czasie. Przyjrzyj się opisowi w dokumentacji https://www.rabbitmq.com/ttl.html

      • Mam kolejkę work.queue oraz retry.queue. Do kolejki retry trafiają zadania, które zostały w błędny sposób przetworzone w kolejce work (oznacza je jako neck). Kolejka retry co 60sec ponownie kolejkuje zadania do work. Dzieje się to tylko w przypadku, gdy zadanie oznaczone jest jako rejected. Gdy consumer prsetwarzajacy kolejkę work jest wyłączony, zadanie trafia do kolejki retry że statusem expired, jednak zadanie to zamiast zostać na nowo dodane do kolejki work, po 60sek usuwa się z retry i ślad po nim zaginął.
        Masz jakiś pomysł? Chyba, że niezbyt uważnie czytam dok.

        • Masz szczęście, właśnie kończę wpis opisujący cały proces, który chcesz zrobić. Wpis pojawi się w poniedziałek na blogu 🙂
          A w skrócie powinno to wyglądać tak, masz kolejkę work.queue, która ma ustawiony parametr x-dead-letter-exchange na exchane, który przekierowuje wiadomości do kolejki retry.queue W kolejce retry.queue masz ustawiony parametr x-message-ttl: 60000 oraz x-dead-letter-exchange skierowany na exchange wskazujący na work.queue. I teraz w zależności od tego w czym piszesz w głównej kolejce w razie błędu obsługi powiadamiasz o tym kolejkę nack lub reject. Przy czym parametr requeue musi być ustawiony na false.

          • Czy jest możliwość sprawdzenia przed publishem wiadomości przez producera, czy consumer obsługujący kolejkę przetwarza kolejkę? Chcę Rabbitem obsłużyć akcje użytkownika na froncie, gdzie przy zapisie leci zadanie do Rabbita, które za zadanie ma zaktualizować wpis w zewnętrznym systemie. Aktualnie dodaje zadanie do kolejki i…czeka sobie na przetworzenie. Jednak użytkownik na froncie chciałby mieć informacje w max 30 sekund. Jakieś porady/pomysł?

          • Marcin Lewandowski

            Cześć, rozumiem do czego dążysz i możliwe jest osiągnięcie tego poprzez odpowiednie mechanizmy RabbitMQ – Consumer Acknowledgements and Publisher Confirms. Jest to mechanizm potwierdzania przetworzenia wiadomości dla producenta, opisanie tego mechanizmu w osobnym wpisie świetnie uzupełni serię 🙂 Niestety najszybciej ukaże się za miesiąc, ale na pewno się ukaże !!!

Leave a Reply

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>

Close