İçeriğe geç
Muhammet Şafak
Araçlar & Teknolojiler 3 dk okuma

RabbitMQ (PHP ile)

PHP ile RabbitMQ kullanımı: mesaj kuyruğu oluşturma, mesaj gönderme (publish) ve tüketme (consume).


Bir HTTP isteği bitmeden kullanıcıyı bekleten her işlem — toplu e-posta gönderimi, görsel dönüştürme, dış API çağrısı — aslında kuyruğa alınmayı bekleyen bir adaydır. RabbitMQ, bu iş yükünü üreten ve işleyen tarafları birbirinden ayıran, AMQP protokolü üzerine inşa edilmiş açık kaynaklı bir message broker’dır. Erlang ile yazılmıştır; bu seçim tesadüf değil: Erlang’ın eşzamanlılık modeli, binlerce bağlantıyı düşük kaynak tüketimiyle yönetmek için biçilmiş kaftan.

Bu yazıda PHP tarafındaki temel akışı — bir producer ve bir consumer — elle kuracağız. Amacım soyutlamanın arkasındaki mekanik görünür hale getirmek; production’da Laravel Queue veya Symfony Messenger arkasına gizlenecek olsa bile temel kavramların oturması önemli.

Ortamı Ayağa Kaldırmak

Geliştirme ortamı için RabbitMQ’yu Docker ile başlatmak en hızlı yol:

docker run -d --name test_rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:latest

5672 AMQP trafiği için, 15672 ise yönetim arayüzü için. http://localhost:15672 adresine guest/guest ile bağlandığınızda kuyruk durumunu, mesaj akışını ve bağlı consumer’ları gerçek zamanlı izleyebilirsiniz — ileride hata ayıklarken bu panel çok işe yarar.

PHP Tarafı: php-amqplib

PHP’nin RabbitMQ ile konuşması için php-amqplib/php-amqplib paketi de facto standart:

composer require php-amqplib/php-amqplib

Paketin MBString ve BCMath PHP eklentilerine bağımlılığı var; modern bir PHP kurulumunda bunlar zaten etkin gelir.

Producer: Mesajı Kuyruğa Göndermek

Producer, işi tanımlayan taraftır. Aşağıdaki örnek her saniye kuyruğa yeni bir iş mesajı gönderir:

<?php
require_once(__DIR__ . '/vendor/autoload.php');

const RABBITMQ_HOST = "localhost";
const RABBITMQ_PORT = 5672;
const RABBITMQ_USERNAME = "guest";
const RABBITMQ_PASSWORD = "guest";
const RABBITMQ_QUEUE = "taskQueue";

$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD);

$channel = $connection->channel();

// Kuyruk yoksa oluştur!
$channel->queue_declare(RABBITMQ_QUEUE, false, true, false, false, false, null, null);

$jobId = 0;

while (true) {
    $jobData = [
        "id"        => ++$jobId,
        "task"      => "sleep",
        "period"    => rand(0, 3),
    ];
    $job = json_encode($jobData, JSON_UNESCAPED_SLASHES);

    $message = new PhpAmqpLib\Message\AMQPMessage($job, [
        'delivery_mode' => 2, // Mesajı kalıcı yap
    ]);

    $channel->basic_publish($message, '', RABBITMQ_QUEUE);

    echo '[' . date("Y-m-d H:i:s") . '] Job created!' . PHP_EOL;
    sleep(1);
}

queue_declare çağrısındaki true parametresi kuyruğun durable olduğunu belirtir — RabbitMQ yeniden başlasa bile kuyruk varlığını korur. delivery_mode: 2 ise mesajın diske yazılmasını sağlar; hem broker hem kuyruk dayanıklı olmadan mesaj güvencesi olmaz. İkisini birlikte kullanmak önemli.

Producer’ı başlatmak için:

php producer.php

Consumer: Kuyruğu Dinlemek ve İşlemek

Consumer tarafı kuyruğu dinler ve her mesaj geldiğinde tanımladığınız callback’i çağırır:

<?php
require_once(__DIR__ . '/vendor/autoload.php');

const RABBITMQ_HOST = "localhost";
const RABBITMQ_PORT = 5672;
const RABBITMQ_USERNAME = "guest";
const RABBITMQ_PASSWORD = "guest";
const RABBITMQ_QUEUE = "taskQueue";

$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD);

$channel = $connection->channel();

// Kuyruk yoksa oluştur!
$channel->queue_declare(RABBITMQ_QUEUE, false, true, false, false, false, null, null);

echo 'Mesajlar bekleniyor. Çıkmak için CTRL+C' . PHP_EOL;

$worker = function (object $message) {
    try {
        $job = json_decode($message->body, true);
        echo '[-] İşleniyor... #' . $job['id'] . PHP_EOL;
        sleep($job['period']);
        echo '[+] Bitti #' . $job['id'] . PHP_EOL;

        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    } catch (\Throwable $e) {
        echo '[x] Failed #' . $job['id'] . PHP_EOL;
        $message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag']);
    }
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume(RABBITMQ_QUEUE, '', false, false, false, false, $worker);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

basic_qos(null, 1, null) satırı kritik: consumer’a “bir önceki mesajı onaylamadan yeni mesaj alma” der. Bu olmadan RabbitMQ tüm bekleyen mesajları tek bir consumer’a yığar; birden fazla consumer çalışıyorsa yük dağılımı bozulur.

basic_ack ve basic_nack ise mesajın yazgısını belirler. ack ile “aldım ve işledim” dersiz, mesaj kuyruktan düşmez. nack ile geri iade edersiniz; yeniden kuyruğa alınıp alınmayacağı ikinci parametreyle (requeue) kontrol edilir.

Consumer’ı başlatmak için:

php consumer.php

Neye Dikkat Etmek Gerekiyor

Bu örnekte kasıtlı olarak basit tuttuğum birkaç nokta production’da önem kazanır:

Bağlantı yönetimi. AMQPStreamConnection her zaman açık bir TCP bağlantısı tutar. Uzun soluklu consumer süreçlerinde heartbeat ayarlarını AMQPStreamConnection’ın dördüncü parametresinden yapılandırmak gerekir; aksi hâlde broker zaman aşımına düşebilir.

Hata ve yeniden deneme. basic_nack ile mesajı geri göndermek sonsuz döngü yaratabilir — işlenemez bir mesaj sürekli kuyruğa döner. Dead letter exchange (DLX) kurarak belirli sayıda başarısız denemeden sonra mesajı ayrı bir kuyruğa yönlendirmek daha sağlam bir yaklaşım.

Birden fazla consumer. Aynı consumer.php’yi farklı terminal pencerelerinde çalıştırırsanız RabbitMQ iş yükünü aralarında dengeler. basic_qos ayarı yerindeyken her consumer aynı anda yalnızca bir mesaj tutar; ölçeklendirme bu kadar doğrusal.

Temel akışın PHP tarafındaki yansıması bundan ibaret. Gerçek projelerde bu kodu doğrudan yazmak yerine Laravel Queue veya Symfony Messenger’ın RabbitMQ driver’ını kullanırsınız; ama altta dönen mekanik aynı.

Etiketler: #PHP#RabbitMQ
Paylaş:

İlgili Yazılar

Sitede Ara

Yazı, proje ve sayfalarda arama yapmak için yazmaya başlayın.

Esc ile kapat Pagefind ile güçlendirildi