在PHP语言中如何使用RabbitMQ来实现消息的订阅和发布?
小标 2018-08-02 来源 : 阅读 2147 评论 0

摘要:本文主要向大家介绍了在PHP语言中如何使用RabbitMQ来实现消息的订阅和发布?通过具体的内容向大家展示,希望对大家学习php语言有所帮助。

本文主要向大家介绍了在PHP语言中如何使用RabbitMQ来实现消息的订阅和发布?通过具体的内容向大家展示,希望对大家学习php语言有所帮助。

本文将介绍在PHP中如何使用RabbitMQ来实现消息的订阅和发布。我使用的系统依然是Centos7,为了方便,应用服务器我使用Docker进行部署,容器环境:centos7+nginx+php5.6。

运行环境,安装AMQP扩展:

如何安装Docker我就不说了,网上很多教程非常简单,如果有现成的php环境可以直接使用。Docker中我使用的镜像名为webdevops/php-nginx,tag为:centos-7-php56。下载镜像:

(国际带宽出口不稳定,可能会下载失败,重试记次就好了)

docker pull webdevops/php-nginx:centos-7-php56  //下载镜像

docker run -d -p 80:80 --name rabbitmq webdevops/php-nginx:centos-7-php56  //运行容器

docker exec -ti rabbitmq /bin/bash  //进入容器

进入到容器后检测下环境是否有相应扩展

cd app

vi index.php

    

刚刚我们在运行容器的时候使用80端口,在浏览器中输入//ip

 

搜索下没有amqp相关的信息。下面开始安装amqp扩展。

yum install gcc librabbitmq-devel.x86_64 php56w-devel

 -y

wget //pecl.php.net/get/amqp-1.4.0.tgz

tar -zxvf amqp-1.4.0.tgz

cd amqp-1.4.0

phpize

./configure --with-amqp

make && make install

在php.ini中开启extension=amqp.so 接着重启php-fpm 或 Web服务器

vi /etc/php.ini

    extension=amqp.so

我这里就直接重启容器了,如果是宿主机直接安装php环境直接重启环境。

exit  //退出容器

docker restart rabbitmq  //重启容器

再查看phpinfo,amqp扩展已经安装好了:

 

publish发布消息

在/app路径下新建一个publish.php的文件

touch publish.php

vi publish.php

以下是PHP代码,我们先定义好用来发消息的交换机、队列、RoutingKey、消息等变量。

$queueName = 'superrd';

$exchangeName = 'superrd';

$routeKey = 'superrd';

$message = 'Hello World!';

按照我们第二章讲到的首先建立一个连接。

$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));

$connection->connect() or die("Cannot connect to the broker!\n");

新建一个信道。

$channel = new AMQPChannel($connection);

新建一个交换机Exchange,并定义属性,第二章我们讲过有四种类型的交换机,这里使用直连型DIRECT。AMQP_DURABLE代表这是一个持久化的交换机,不会以为服务器异常等因素丢失。

$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);

$exchange->setType(AMQP_EX_TYPE_DIRECT);

$exchange->setFlags(AMQP_DURABLE);

$exchange->declareExchange();

新建一个队列Queue,前面也讲过生产者将消息发送到Exchange中,Exchange会根据绑定关系投递到队列,也就是如果生产者在生产消息时没有队列与之绑定消息就会丢失。为了保证系统更加健硕,一般无论是消息的生产者还是消费者都会新建一遍Exchange和Queue,新建后属性不会改变。同样AMQP_DURABLE代表这是一个持久化的队列,队列会被写入磁盘。需要注意的是虽然消息是缓存在队列中,但是并不是队列是持久化的队列队列中的消息就是持久化的,消息的持久化需要单独设置。

$queue = new AMQPQueue($channel);

$queue->setName($queueName);

$queue->setFlags(AMQP_DURABLE);

$queue->declareQueue();

通过routeKey绑定交换机和队列。

$queue->bind($exchangeName, $routeKey);

好了,下面可以发送消息了

$exchange->publish($message,$routeKey);

如果你希望消息也是持久化的可以使用如下的代码,实际测试结果在持久化消息后消息发布的性能下降一倍,我的磁盘是pcie的固态硬盘,如果你是机械磁盘这个性能下降估计会更明显,24核心CPU,48GB内存,pcie固态硬盘,单线程的情况下每秒可以发布2.5万左右的非持久化消息,持久化之后变为变为1.2万左右。

$exchange->publish($message,$routeKey,AMQP_NOPARAM, array('delivery_mode'=>2));

断开连接。

$connection->disconnect();

同样在发布消息之后可以通过WEB工具来查看是否发布成功,

查看交换机多了一个superid交换机。

 

查看交换机已经有superrd队列。

 

点击队列查看队列详情。Bindings标签可以看到交换机和队列的绑定关系。

 

点击Get messages标签Get message(s)按钮可以看到队列中的消息。

 

到此说明我们已经将一个消息发布到了消息队列中。完整的PHP代码如下。

 '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));

$connection->connect() or die("Cannot connect to the broker!\n");

try {

    $channel = new AMQPChannel($connection);

    $exchange = new AMQPExchange($channel);

    $exchange->setName($exchangeName);

    $exchange->setType(AMQP_EX_TYPE_DIRECT);

    $exchange->setFlags(AMQP_DURABLE);

    $exchange->declareExchange();

 

    $queue = new AMQPQueue($channel);

    $queue->setName($queueName);

    $queue->setFlags(AMQP_DURABLE);

    $queue->declareQueue();

 

    $queue->bind($exchangeName, $routeKey);

 

    $exchange->publish($message,$routeKey);

 

    var_dump("[x] Sent 'Hello World!'");

 

} catch (AMQPConnectionException $e) {

    var_dump($e);

    exit();

}

 $connection->disconnect();

Subscribe订阅消息

在/app路径下新建一个subscribe.php的文件

touch subscribe.php

vi subscribe.php

以下是PHP代码,和发布消息一样我们先定义好用交换机、队列、RoutingKey等变量。

$queueName = 'superrd';

$exchangeName = 'superrd';

$routeKey = 'superrd';

按照我们第二章讲到的首先建立一个连接。

$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));

$connection->connect() or die("Cannot connect to the broker!\n");

新建一个信道。

$channel = new AMQPChannel($connection);

与发布消息一样新建交换机。

$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);

$exchange->setType(AMQP_EX_TYPE_DIRECT);

$exchange->setFlags(AMQP_DURABLE);

$exchange->declareExchange();

新建一个队列Queue。

$queue = new AMQPQueue($channel);

$queue->setName($queueName);

$queue->setFlags(AMQP_DURABLE);

$queue->declareQueue();

通过routeKey绑定交换机和队列。

$queue->bind($exchangeName, $routeKey);

重点来了,阻塞订阅消息。

//阻塞模式接收消息

 

echo "Message:\n";

while(True){

    $queue->consume('processMessage');

//自动ACK应答

    //$queue->consume('processMessage', AMQP_AUTOACK);

}

 

$conn->disconnect();

/*

* 消费回调函数

* 处理消息

*/

function processMessage($envelope, $q) {

    $msg = $envelope->getBody();

    echo $msg."\n"; //处理消息

    $q->ack($envelope->getDeliveryTag()); //手动发送ACK应答

}

注意因为是阻塞监听,因为输出缓冲区的原因用浏览器访问该文件是看不到输出的。使用脚本访问。

php /app/subscribe.php

 

通过WEB工具查看队列。superrd队列中的消息数已经为0。

 

完整的PHP代码如下。

 '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));

$connection->connect() or die("Cannot connect to the broker!\n");

 

$channel = new AMQPChannel($connection);

$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);

$exchange->setType(AMQP_EX_TYPE_DIRECT);

$exchange->setFlags(AMQP_DURABLE);

$exchange->declareExchange();

 

$queue = new AMQPQueue($channel);

$queue->setName($queueName);

$queue->setFlags(AMQP_DURABLE);

$queue->declareQueue();

 

$queue->bind($exchangeName, $routeKey);

 

//阻塞模式接收消息

 

echo "Message:\n";

while(True){

        $queue->consume('processMessage');

//自动ACK应答

        //$queue->consume('processMessage', AMQP_AUTOACK);

}

 

$conn->disconnect();

 

/*

* 消费回调函数

* 处理消息

*/

function processMessage($envelope, $q) {

    $msg = $envelope->getBody();

    echo $msg."\n"; //处理消息

    $q->ack($envelope->getDeliveryTag()); //手动发送ACK应答

}

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标编程语言PHP频道!


本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 2 不喜欢 | 0
看完这篇文章有何感觉?已经有2人表态,100%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程