当前位置:首页 > MQ > 正文内容

RabbitMQ的PHP实现代码

phpmianshi6年前 (2015-04-26)MQ332
接下来看一下创建队列及接收消息的TP5.1示例
需要提前启动消费者,绑定交换机与队列,并指定路由键
1.测试多消费者监听同一个事件,打开2个消费者,启动生产者,发现2个消费者可以同时收到消息
2.测试消费者挂掉时,消息是否丢失,停掉一个消费者1,启动生产者,再启动消费者1,发现还可以读取到数据,说明数据没有丢失
/**
* 生产者
* @param
* @return true
* @throws \Exception
* @use php think call CrazyBoxService_send
*/
public static function send(){
Log::record("send start");
Log::save();
$ex_name = 'amq.direct';
$routing_key = 'rk';
$queue = 'q'; //队列需要提前建好
$message = array();
$message['order_id'] = 12345;

$host = '172.16.0.133';
$port = 5672;
$vhost = "vtest";
$login = "test";
$pass = "test123";
$body = json_encode($message);
try {
$connection = new \AMQPConnection([
'host' => $host,
'port' => $port,
'vhost' => $vhost,
'login' => $login,
'password' => $pass,
]);
$connection->connect();
$channel = new \AMQPChannel($connection);
$exchange = new \AMQPExchange($channel);
$exchange->setName($ex_name);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE); //持久化
$exchange->declareExchange();
//发送消息可以不指定队列,不指定队列时,如果没有绑定的队列,消息会丢失,所以需要提前有绑定的队列(队列创建以后,消费者提前启动起来,否则消息会丢失),这种情况所有绑定的队列的消费者都能接收到消息
$rs = $exchange->publish($body, $routing_key);
Log::record("send params=".$body." result=".$rs);
Log::save();
} catch (\AMQPConnectionException $e) {
Log::record($e->getMessage);
Log::save();
}
$connection->disconnect();
}
/**
* 消费者测试1
* @return true
* @throws \Exception
* @use php think call CrazyBoxService_rev1
*/
public static function rev1(){
self::rev("q1");
}
/**
* 消费者测试2
* @return true
* @throws \Exception
* @use php think call CrazyBoxService_rev2
*/
public static function rev2(){
self::rev("q2");
}
public static function rev3(){
self::rev("q3");
}
/**
* 消费者 提前建好q1 q2 2个队列
* @param $queue 队列名字
* @return true
* @throws \Exception
* @use php think call CrazyBoxService_send
*/
private static function rev($queue){
Log::record("rev start");
Log::save();
$ex_name = 'amq.direct';
$routing_key = 'rk';

$host = '172.16.0.133';
$port = 5672;
$vhost = "vtest";
$login = "test";
$pass = "test123";

try {
$connection = new \AMQPConnection([
'host' => $host,
'port' => $port,
'vhost' => $vhost,
'login' => $login,
'password' => $pass,
]);
$connection->connect();
$channel = new \AMQPChannel($connection);
$exchange = new \AMQPExchange($channel);
$exchange->setName($ex_name);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE); //持久化
$exchange->declareExchange();
$q = new \AMQPQueue($channel);
//设置队列名字 如果不存在则添加
$q->setName($queue);
// AMQP_DURABLE | AMQP_AUTODELETE
$q->setFlags(AMQP_DURABLE);
//            echo 'Message Total: ' . $q->declareQueue() . "\r\n";
//绑定交换机与队列,并指定路由键
$q->bind($ex_name, $routing_key);

Log::record($queue." waiting for msg...");
Log::save();

//阻塞模式接收消息 AMQP_NOPARAM
$q->consume(function($envelope, $queue){
Log::record("processMessage start");
Log::save();
$msg = $envelope->getBody();
Log::record("processMessage body=".$msg);
Log::save();
//        $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
return true;
}, AMQP_AUTOACK); //自动ACK应答
} catch (\AMQPConnectionException $e) {
Log::record($e->getMessage);
Log::save();
}
$connection->disconnect();
}
接下来看一下创建队列及接收消息的Laravel示例
创建队列发送消息:
public function mq_send(){
$ex_name = 'amq.direct';
$routing_key = 'EOSETH';
$queue = 'tEOSETH';
$message = array();
$message['order_id'] = 12345;
$body = json_encode($message);
try {
$connection = new \AMQPConnection([
'host' => env("RABBIT_HOST"),
'port' => env("RABBIT_PORT"),
'vhost' => env("RABBIT_VHOST"),
'login' => env("RABBIT_LOGIN"),
'password' => env("RABBIT_PASS"),
]);
$connection->connect();
$channel = new \AMQPChannel($connection);
$exchange = new \AMQPExchange($channel);
            $exchange->setName($ex_name);
            $exchange->setType(AMQP_EX_TYPE_DIRECT);
            $exchange->setFlags(AMQP_DURABLE); //持久化
$exchange->declareExchange();
//发送消息可以不指定队列,但是指定队列可以防止队列不存在时创建一个
  $q = new \AMQPQueue($channel);
            //设置队列名字 如果不存在则添加
            $q->setName($queue);
            // AMQP_DURABLE | AMQP_AUTODELETE
            $q->setFlags(AMQP_DURABLE);
//            echo 'Message Total: ' . $q->declareQueue() . "\r\n";
            //绑定交换机与队列,并指定路由键
            $q->bind($ex_name, $routing_key);
            $rs = $exchange->publish($body, $routing_key);
logger("put_order params=".$body." result=".$rs);
} catch (\AMQPConnectionException $e) {
logger($e->getMessage());
}
$connection->disconnect();
}
创建队列消费者:
public function mq_rev(){
$ex_name = 'amq.direct';
$routing_key = 'EOSETH';
$queue = 'tEOSETH';
try {
$connection = new \AMQPConnection([
'host' => env("RABBIT_HOST"),
'port' => env("RABBIT_PORT"),
'vhost' => env("RABBIT_VHOST"),
'login' => env("RABBIT_LOGIN"),
'password' => env("RABBIT_PASS"),
]);
$connection->connect();
$channel = new \AMQPChannel($connection);
$exchange = new \AMQPExchange($channel);
$exchange->setName($ex_name);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE); //持久化
$exchange->declareExchange();
$q = new \AMQPQueue($channel);
//设置队列名字 如果不存在则添加
$q->setName($queue);
// AMQP_DURABLE | AMQP_AUTODELETE
$q->setFlags(AMQP_DURABLE);
//            echo 'Message Total: ' . $q->declareQueue() . "\r\n";
//绑定交换机与队列,并指定路由键
$q->bind($ex_name, $routing_key);

var_dump("Waiting for message…");

//            while(TRUE) {
//阻塞模式接收消息 AMQP_NOPARAM
//                $q->consume(array($this,'processMessage'));
$q->consume(array($this,'processMessage'), AMQP_AUTOACK); //自动ACK应答
//            }
} catch (\AMQPConnectionException $e) {
logger($e->getMessage());
}
$connection->disconnect();
}

function processMessage($envelope, $queue) {
logger("processMessage start");
$msg = $envelope->getBody();
logger("processMessage body=".$msg);
//        $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
return true;
}
交换机既可以由消息发送端创建,也可以由消息消费者创建。
创建一个队列后,需要将队列绑定到交换机上,队列才能工作,routingkey也是在这里指定的。有的资料上写成bindingkey,其实一回事儿,弄两个名词反倒容易混淆。
消息的处理,是有两种方式:
A,一次性。用 $q->get([...]),不管取到取不到消息都会立即返回,一般情况下使用轮询处理消息队列就要用这种方式;
B,阻塞。用 $q->consum( callback, [...] ) 程序会进入持续侦听状态,每收到一个消息就会调用callback指定的函数一次,直到某个callback函数返回FALSE才结束。
关于callback,这里多说几句: PHP的call_back是支持使用数组的,比如: $c = new MyClass(); $c->counter = 100; $q->consume( array($c,'myfunc') ) 这样就可以调用自己写的处理类。MyClass中myfunc的参数定义,与上例中processMessage一样就行。
在上述示例中,使用的$routingkey = '', 意味着接收全部的消息。我们可以将其改为 $routingkey = 'key_1',可以看到结果中仅有设置routingkey为key_1的内容了。
注意: routingkey = 'key_1' 与 routingkey = 'key_2' 是两个不同的队列。假设: client1 与 client2 都连接到 key_1 的队列上,一个消息被client1处理之后,就不会被client2处理。而 routingkey = '' 是另类,client_all绑定到 '' 上,将消息全都处理后,client1和client2上也就没有消息了。
在程序设计上,需要规划好exchange的名称,以及如何使用key区分开不同类型的标记,在消息产生的地方插入发送消息代码。后端处理,可以针对每一个key启动一个或多个client,以提高消息处理的实时性。


版权声明:本文由PHP面试资料网发布,如需转载请注明出处。
分享给朋友:

相关文章

RabbitMQ - 架构及工作原理

RabbitMQ - 架构及工作原理

1. 系统架构从示意图可以看出消息生产者并没有直接将消息发送给消息队列,而是通过建立与Exchange的Channel,将消息发送给Exchange,Exchange根据规则,将消息转发给指定的消息队...

Rabbitmq官方的六种工作模式

Rabbitmq官方的六种工作模式

RabbitMQ的六种工作模式:官网介绍:https://www.rabbitmq.com/getstarted.html这里简单介绍下六种工作模式的主要特点:简单模式:一个生产者,一个消费者work...

kafka和rabbitmq什么区别,各自适合什么场景?

RabbitMQ1.RabbitMQ遵循AMQP协议,由内在高并发的erlanng语言开发,用在实时的对可靠性要求比较高的消息传递上,适合企业级的消息发送订阅。2.RabbitMQ的broker由Ex...

 中间件RabbitMQ在生产环境占用CPU过多的情况

中间件RabbitMQ在生产环境占用CPU过多的情况

我们项目中用的消息中间件是RabbitMQ,这个消息中间件在使用起来停方便的,也比较健壮,但是使用不当,会对服务器造成很大的压力,会把CPU占用比占到70%左右,今天就来分析一下造成这个结果的原因。要...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。