应用场景
使用 RabbitMQ 向不同公司下的用户发送不同的短信,暂时采用了 direct 模式,通过不同的 routing-key 来区分要发送的短信
复制代码
扩展安装
采用php-amqplib插件进行开发
安装步骤:
1、yum install librabbitmq -y
2、pecl install amqp
3、composer require php-amqplib/php-amqplib
复制代码
处理代码
RabbitMQServiceImpl.php(RabbitMQService 接口的实现类)
<?php
namespace common\services\rabbitmq\impl;
use common\services\rabbitmq\RabbitMQService;
use common\services\rabbitmq\vo\RabbitMQConfigVo;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * php-amqplib based implementation of RabbitMQService.
 *
 * A connection and a channel are opened as soon as the object is constructed.
 * The fluent setters (setType/setExchange/setQueue/setRoutingKey) store the
 * topology that setProducerChannel() and setConsumerChannel() then declare.
 */
class RabbitMQServiceImpl implements RabbitMQService
{
    /**
     * @var array connection settings read from the Yii param 'rabbitmq_config'
     *            (keys used: host, port, user, password)
     */
    private $rabbitConfig;

    /**
     * @var AMQPStreamConnection connection held statically; it is replaced every
     *                           time a new instance is constructed
     */
    public static $connection;

    /**
     * @var AMQPChannel channel opened on the connection in the constructor
     */
    public $channel;

    /**
     * @var string queue name used by setProducerChannel()
     */
    public $queue;

    /**
     * @var string exchange type passed to exchange_declare()
     */
    public $type;

    /**
     * @var string exchange name
     */
    public $exchange;

    /**
     * @var string company-{$company_id}
     */
    public $routingKey;

    /**
     * Reads the connection settings from the Yii application params, then
     * opens the connection and a channel on it.
     */
    public function __construct()
    {
        $this->rabbitConfig = \Yii::$app->params['rabbitmq_config'];
        self::$connection = new AMQPStreamConnection(
            $this->rabbitConfig['host'],
            $this->rabbitConfig['port'],
            $this->rabbitConfig['user'],
            $this->rabbitConfig['password']
        );
        $this->channel = self::$connection->channel();
    }

    /**
     * Declares the exchange, binds a server-named queue to it with the
     * configured routing key and blocks while the channel is consuming.
     *
     * The queue name set through setQueue() is not used here: the queue is
     * declared with an empty name so the broker generates one.
     *
     * @param callable $callBack handler invoked for every delivered message
     * @return AMQPChannel the channel, returned once consuming has stopped
     * @throws \InvalidArgumentException when $callBack is not callable
     * @throws \ErrorException
     */
    public function setConsumerChannel($callBack = []): AMQPChannel
    {
        // Fail fast: a non-callable handler (including the [] default) can never
        // process a delivery, so refuse to start consuming with it.
        if (!is_callable($callBack)) {
            throw new \InvalidArgumentException('Consumer callback must be callable.');
        }

        $this->channel->exchange_declare($this->exchange, $this->type, false, false, false);

        // Only the generated queue name is needed from the declare result.
        [$queueName] = $this->channel->queue_declare('', false, false, false);

        // routingKey is null until setRoutingKey() is called; bind with '' in that case.
        $this->channel->queue_bind($queueName, $this->exchange, $this->routingKey ?? '');

        // Prefetch count of 1: one unacknowledged message at a time.
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume($queueName, '', false, false, false, false, $callBack);

        // Keep fetching messages for as long as the channel has a consumer.
        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }

        return $this->channel;
    }

    /**
     * Declares the exchange and the configured queue, then binds the queue to
     * the exchange with the configured routing key.
     *
     * @return AMQPChannel
     */
    public function setProducerChannel(): AMQPChannel
    {
        // Declare the exchange.
        $this->channel->exchange_declare($this->exchange, $this->type, false, false, false);
        // Declare the queue (third flag, durable, is true).
        $this->channel->queue_declare($this->queue, false, true, false, false);
        // Bind the queue to the exchange; routingKey is null until setRoutingKey() is called.
        $this->channel->queue_bind($this->queue, $this->exchange, $this->routingKey ?? '');

        return $this->channel;
    }

    /**
     * @param string $type exchange type
     * @return $this
     */
    public function setType(string $type): self
    {
        $this->type = $type;

        return $this;
    }

    /**
     * @param string $exchange exchange name
     * @return $this
     */
    public function setExchange(string $exchange): self
    {
        $this->exchange = $exchange;

        return $this;
    }

    /**
     * @param string $queue queue name
     * @return $this
     */
    public function setQueue(string $queue): self
    {
        $this->queue = $queue;

        return $this;
    }

    /**
     * Builds the routing key "company-{id}" from the given company id.
     *
     * @param int $companyId
     * @return $this
     */
    public function setRoutingKey(int $companyId): self
    {
        $this->routingKey = 'company-' . $companyId;

        return $this;
    }

    /**
     * Closes the channel and then the static connection.
     *
     * @return void
     * @throws \Exception
     */
    public function close()
    {
        $this->channel->close();
        self::$connection->close();
    }

    /**
     * Publishes a message to the configured exchange.
     *
     * @param string $data message body
     * @param string $routingKey routing key for this message (the key stored by
     *                           setRoutingKey() is not used here)
     * @param array $options message properties; when empty, a persistent
     *                       text/plain message is sent
     */
    public function publish(string $data, string $routingKey, array $options = []): void
    {
        if (empty($options)) {
            $options = [
                'content_type' => 'text/plain',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ];
        }

        $this->channel->basic_publish(new AMQPMessage($data, $options), $this->exchange, $routingKey);
    }
}
复制代码
producer.php
// Producer example: declares the SMS exchange and queue, then publishes
// 100000 JSON test messages, each with a randomly chosen routing key.
$rabbitMqService = \Yii::createObject(RabbitMQService::class);
$rabbitMqService->setQueue(RabbitMQConstCode::SMS_QUEUE)
    ->setType(AMQPExchangeType::DIRECT)
    ->setExchange(RabbitMQConstCode::SMS_EXCHANGE)
    ->setProducerChannel();

// NOTE(review): these demo routing keys differ from the "company-{id}" key the
// consumer binds with; on a direct exchange they must match for delivery — confirm.
$routes = ['dota', 'csgo', 'lol'];

for ($i = 0; $i < 100000; $i++) {
    $route = $routes[array_rand($routes)];

    // Encode once so the published payload and the echoed payload are the same string.
    $data = json_encode(
        [
            'match_id' => $i,
            'status' => rand(0, 3),
        ],
        JSON_UNESCAPED_UNICODE
    );

    $rabbitMqService->publish($data, $route);
    echo '发送 ' . $route . ' 消息: ' . $data . PHP_EOL;
}

// Release the channel and connection once everything has been published
// (assumes the resolved service exposes close(), as RabbitMQServiceImpl does).
$rabbitMqService->close();
复制代码
consumer.php
// Consumer example: binds to the SMS exchange with the company routing key
// and handles every delivered message with $callback.
$rabbitMqService = \Yii::createObject(RabbitMQService::class);

// Callback invoked for each delivered message.
$callback = function ($msg) {
    $body = $msg->body;
    Log::info('send sms info ' . $body);
    echo '接收到消息:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL;
    // Business handling goes here, e.g. $this->handleSms($body);
    // Acknowledge explicitly: the consumer is registered with no_ack = false.
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

// NOTE(review): $company_id is not defined in this snippet; it must be supplied
// by the surrounding code before this call.
$rabbitMqService->setExchange(RabbitMQConstCode::SMS_EXCHANGE)
    ->setQueue(RabbitMQConstCode::SMS_QUEUE)
    ->setType(AMQPExchangeType::DIRECT)
    ->setRoutingKey($company_id)
    ->setConsumerChannel($callback);
复制代码
函数参数说明
name: $queue // should be unique in fanout exchange. [队列名称]
passive: false // don't check if a queue with the same name exists [是否检测同名队列]
durable: false // the queue will not survive server restarts [是否开启队列持久化]
exclusive: false // the queue might be accessed by other channels [是否为排他队列;为 false 时队列可以被其他通道访问]
auto_delete: true //the queue will be deleted once the channel is closed. [通道关闭后是否删除队列]
name: $exchange [交换机名称]
type: direct [路由类型]
passive: false [是否只检测交换机是否存在而不创建;false 表示不存在时创建]
durable: true [交换机是否开启持久化]
auto_delete: false //the exchange won't be deleted once the channel is closed.
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END