RabbitMQ–PHP应用

应用场景

使用rabbitMQ向不同的公司下的用户发送不同的短信,暂时才用了direct模式,通过不同的routing-key区别发送短信
复制代码

扩展安装

采用php-amqplib插件进行开发
安装步骤:
1、yum install librabbitmq -y
2、pecl install amqp
3、composer require php-amqplib/php-amqplib
复制代码

处理代码

RabbitMQService.php
<?php
namespace common\services\rabbitmq\impl;

use common\services\rabbitmq\RabbitMQService;
use common\services\rabbitmq\vo\RabbitMQConfigVo;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class RabbitMQServiceImpl implements RabbitMQService
{
    /**
     * @var
     */
    private $rabbitConfig;

    /**
     * @var
     */
    public static $connection;

    /**
     * @var AMQPChannel
     */
    public $channel;

    /**
     * @var string
     */
    public $queue;

    /**
     * @var string
     */
    public $type;

    /**
     * @var string
     */
    public $exchange;

    /**
     * @var string company-{$company_id}
     */
    public $routingKey;

    public function __construct()
    {
        $this->rabbitConfig = \Yii::$app->params['rabbitmq_config'];
        self::$connection = new AMQPStreamConnection(
            $this->rabbitConfig['host'],
            $this->rabbitConfig['port'],
            $this->rabbitConfig['user'],
            $this->rabbitConfig['password']
        );
        $this->channel = self::$connection->channel();
    }

    /**
     * @param $callBack
     * @return AMQPChannel
     * @throws \ErrorException
     */
    public function setConsumerChannel($callBack = []): AMQPChannel
    {
        $this->channel->exchange_declare($this->exchange, $this->type, false, false, false);
        list($queueName, , ) =  $this->channel->queue_declare('', false, false, false);
        $this->channel->queue_bind($queueName, $this->exchange, $this->routingKey);
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume($queueName, '', false, false, false, false, $callBack);
        while ($this->channel->is_consuming()) { // 循环获取消息
            $this->channel->wait();
        }
        return $this->channel;
    }

    /**
     * @return AMQPChannel
     */
    public function setProducerChannel(): AMQPChannel
    {
        $this->channel->exchange_declare($this->exchange, $this->type, false, false, false); // 声明一个交换机
        $this->channel->queue_declare($this->queue, false, true, false, false); // 试探性声明一个队列/
        $this->channel->queue_bind($this->queue, $this->exchange, $this->routingKey); // 队列绑定交换器
        return $this->channel;
    }

    /**
     * @param string $type
     * @return $this
     */
    public function setType(string $type): self
    {
        $this->type = $type;
        return $this;
    }

    /**
     * @param string $exchange
     * @return $this
     */
    public function setExchange(string $exchange): self
    {
        $this->exchange = $exchange;
        return $this;
    }

    /**
     * @param string $queue
     * @return $this
     */
    public function setQueue(string $queue): self
    {
        $this->queue = $queue;
        return $this;
    }

    public function setRoutingKey(int $companyId): self
    {
        $this->routingKey = 'company-' . $companyId;
        return $this;
    }

    /**
     * @return mixed|void
     * @throws \Exception
     */
    public function close()
    {
        $this->channel->close();
        self::$connection->close();
    }

    /**
     * @param string $data
     * @param string $routingKey
     * @param array $options
     */
    public function publish(string $data, string $routingKey, array $options = []): void
    {
        if (empty($options)) {
            $options = ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT];
        }
        $message = new AMQPMessage($data, $options);
        $this->channel->basic_publish($message, $this->exchange, $routingKey);
    }
}

复制代码
producer.php

$rabbitMqService = \Yii::createObject(RabbitMQService::class);
$rabbitMqService->setQueue(RabbitMQConstCode::SMS_QUEUE)
    ->setType(AMQPExchangeType::DIRECT)
    ->setExchange(RabbitMQConstCode::SMS_EXCHANGE)
    ->setProducerChannel();
for ($i = 0; $i < 100000; $i++) {
    $routes = ['dota', 'csgo', 'lol'];
    $key = array_rand($routes);
    $arr = [
        'match_id' => $i,
        'status' => rand(0,3)
    ];
    $rabbitMqService->publish(json_encode($arr, JSON_UNESCAPED_UNICODE), $routes[$key]);
    echo '发送 '.$routes[$key].' 消息: ' . $data . PHP_EOL;
}
复制代码
consumer.php
rabbitMqService = \Yii::createObject(RabbitMQService::class);
$callback = function ($msg) { // 回调函数
    $queue = $msg->body;
    Log::info('send sms info ' . $queue);
    echo '接收到消息:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL;
    // 处理代码 $this->handleSms($queue);
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$rabbitMqService->setExchange(RabbitMQConstCode::SMS_EXCHANGE)
    ->setQueue($queue)
    ->setType(AMQPExchangeType::DIRECT)
    ->setRoutingKey($company_id)
    ->setConsumerChannel($callback);

复制代码

函数参数说明

name:         $queue // should be unique in fanout exchange. [队列名称] 
passive:      false // don't check if a queue with the same name exists [是否检测同名队列]
durable:      false // the queue will not survive server restarts [是否开启队列持久化] 
exclusive:    false // the queue might be accessed by other channels [队列是否可以被其他队列访问] 
auto_delete:  true //the queue will be deleted once the channel is closed. [通道关闭后是否删除队列]

name:         $exchange [交换机名称] 
type:         direct [路由类型] 
passive:      false [] 
durable:      true [交换机是否开启持久化] 
auto_delete:  false //the exchange won't be deleted once the channel is closed.
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享