AMQP是类似于http的一种二进制协议,AMQP是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。本文介绍的是基于php的客户端。
1. RabbitMQ的安装
RabbitMQ 是一个AMQP的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。官方首页参考:http://www.rabbitmq.com/。
1. RabbitMQ Server安装
系统:ubuntu, 32位。本文介绍的基于网络安装。参考:http://www.rabbitmq.com/install-debian.html。安装命令如下
sudo deb http://www.rabbitmq.com/debian/ testing main
wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
sudo apt-key add rabbitmq-signing-key-public.asc
sudo apt-get update
sudo apt-get install rabbitmq-server
2. RabbitMQ启动
命令:invoke-rc.d rabbitmq-server stop/start/etc.
可以使用rabbitmqctl命令来管理RabbitMQ Server。
例如,查看server状态:sudo rabbitmqctl status
停止server:rabbitmqctl stop
更多命令可参考:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html。
2. PHP AMQP扩展的安装
PHP AMQP扩展依赖于librabbitmq库,因此首先安装librabbitmq,安装方法参考https://github.com/alanxz/rabbitmq-c,具体步骤如下:
git clone git://github.com/alanxz/rabbitmq-c.gitcd rabbitmq-cgit submodule initgit submodule update
autoreconf –i
./configure
make
sudo make install
注:安装过程碰到的问题是,在使用autoreconf时,报错:Can’t exec “libtoolize”: No such file or directory at /usr/bin/autoreconf line 195。
解决方法:sudo apt-get install autotools-dev libltdl-dev libtool
此时,编译成功后,会生成amqp.so并安装到了合适的位置。此时我们修改php.ini, 加入extension=amqp.so就可以了。注意在ubuntu系统下,cli和apache的php.ini的位置是不同的,分别是:/etc/php5/apache2/php.ini和/etc/php5/cli/php.ini。
3. PHP AMQP扩展的使用举例
在RabbitMQ的官方网站上列出了5种消息处理的防止,分别是:Work queues, Publish/Subscribe, Routing, Topics,RPC。在PHP官方网站上的事例由于版本的更新,并不是最新的,并不准确,下面简单的举例说明PHP AMQP的使用。
1. 简介
send.php 代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
<?php if (!$argv[1]) { return ; } $queue = 'work_queue'; $exchange = 'exchange'; // 新建连接 $cnn = new AMQPConnection(); $cnn->connect(); // Create a channel // 可以创建多个 $ch = new AMQPChannel($cnn); // Declare a new exchange // 相当于投递员,投递消息 $ex = new AMQPExchange($ch); $ex->setName($exchange); // $ex->setType(AMQP_EX_TYPE_FANOUT); $ex->setType(AMQP_EX_TYPE_DIRECT); //$ex->setFlags(AMQP_DURABLE); $ex->declare(); $q = new AMQPQueue($ch); $q->setName($queue); $q->declare(); $q->bind($exchange, $queue); // 投递消息都一个队列中,队列标识为routing.key // 把命令行中的参数作为消息投递 $res = $ex->publish($argv[1], $queue); if ($res) { echo 'message sent' . PHP_EOL; } else { echo 'message not sent' . PHP_EOL; } $cnn->close(); |
recive.php代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
<?php $queue = 'work_queue'; // Create a connection $cnn = new AMQPConnection(); $cnn->connect(); //Create a channel $ch = new AMQPChannel($cnn); // Create a new queue $q = new AMQPQueue($ch); $q->setName($queue); $q->declare(); // 死循环获取消息 while(true) { // msg 类型为 AMQPEnvelope $msg = $q->get(AMQP_NOPARAM); if ($msg) { echo 'recive msg: ' . $msg->getBody() . PHP_EOL; // 通知server已收到消息,server会认为消息投递成功,把消息 // 从消息队列中删除 // 用sudo rabbitmqctl list_queues 命令可查看队列中消息个数 // $q->ack($msg->getDeliveryTag()); } } ?> |
2. Work queues
send.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
<?php if (!$argv[1]) { return ; } $queue = 'work_queue2'; $exchange = 'exchange2'; // 新建连接 $cnn = new AMQPConnection(); $cnn->connect(); // Create a channel // 可以创建多个 $ch = new AMQPChannel($cnn); // Declare a new exchange // 相当于投递员,投递消息 $ex = new AMQPExchange($ch); $ex->setName($exchange); //$ex->setType(AMQP_EX_TYPE_FANOUT); $ex->setType(AMQP_EX_TYPE_DIRECT); //此选项防止rabbitmq退出时,丢失消息 $ex->setFlags(AMQP_DURABLE); $ex->declare(); $q = new AMQPQueue($ch); $q->setName($queue); $q->declare(); $q->bind($exchange, $queue); // 投递消息都一个队列中,队列标识为routing.key // 把命令行中的参数作为消息投递 $res = $ex->publish($argv[1], $queue); if ($res) { echo 'message sent' . PHP_EOL; } else { echo 'message not sent' . PHP_EOL; } |
recive.php:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
<?php $queue = 'work_queue2'; // Create a connection $cnn = new AMQPConnection(); $cnn->connect(); //Create a channel $ch = new AMQPChannel($cnn); //$ch->qos(5, 1); // Create a new queue $q = new AMQPQueue($ch); $q->setName($queue); $q->declare(); // 死循环获取消息 while(true) { // msg 类型为 AMQPEnvelope $msg = $q->get(AMQP_NOPARAM); if ($msg) { echo 'recive msg: ' . $msg->getBody() . PHP_EOL; // 通知server已收到消息,server会认为消息投递成功,把消息 // 从消息队列中删除 // 用sudo rabbitmqctl list_queues 命令可查看队列中消息个数 $q->ack($msg->getDeliveryTag()); } sleep(3); } ?> |