首页 > MQ > RabbitMQ的PHP实现代码
2015
04-26

RabbitMQ的PHP实现代码

接下来看一下创建队列及接收消息的TP5.1示例
需要提前启动消费者,绑定交换机与队列,并指定路由键
1.测试多消费者监听同一个事件,打开2个消费者,启动生产者,发现2个消费者可以同时收到消息
2.测试消费者挂掉时,消息是否丢失,停掉一个消费者1,启动生产者,再启动消费者1,发现还可以读取到数据,说明数据没有丢失
/**
* 生产者
* @param
* @return true
* @throws \Exception
* @use php think call CrazyBoxService_send
*/
public static function send(){
Log::record("send start");
Log::save();
$ex_name = 'amq.direct';
$routing_key = 'rk';
$queue = 'q'; //队列需要提前建好
$message = array();
$message['order_id'] = 12345;

$host = '172.16.0.133';
$port = 5672;
$vhost = "vtest";
$login = "test";
$pass = "test123";
$body = json_encode($message);
try {
$connection = new \AMQPConnection([
'host' => $host,
'port' => $port,
'vhost' => $vhost,
'login' => $login,
'password' => $pass,
]);
$connection->connect();
$channel = new \AMQPChannel($connection);
$exchange = new \AMQPExchange($channel);
$exchange->setName($ex_name);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE); //持久化
$exchange->declareExchange();
//发送消息可以不指定队列,不指定队列时,如果没有绑定的队列,消息会丢失,所以需要提前有绑定的队列(队列创建以后,消费者提前启动起来,否则消息会丢失),这种情况所有绑定的队列的消费者都能接收到消息
$rs = $exchange->publish($body, $routing_key);
Log::record("send params=".$body." result=".$rs);
Log::save();
} catch (\AMQPConnectionException $e) {
Log::record($e->getMessage);
Log::save();
}
$connection->disconnect();
}
/**
* 消费者测试1
* @return true
* @throws \Exception
* @use php think call CrazyBoxService_rev1
*/
public static function rev1(){
self::rev("q1");
}
/**
* 消费者测试2
* @return true
* @throws \Exception
* @use php think call CrazyBoxService_rev2
*/
public static function rev2(){
self::rev("q2");
}
public static function rev3(){
self::rev("q3");
}
/**
* 消费者 提前建好q1 q2 2个队列
* @param $queue 队列名字
* @return true
* @throws \Exception
* @use php think call CrazyBoxService_send
*/
private static function rev($queue){
Log::record("rev start");
Log::save();
$ex_name = 'amq.direct';
$routing_key = 'rk';

$host = '172.16.0.133';
$port = 5672;
$vhost = "vtest";
$login = "test";
$pass = "test123";

try {
$connection = new \AMQPConnection([
'host' => $host,
'port' => $port,
'vhost' => $vhost,
'login' => $login,
'password' => $pass,
]);
$connection->connect();
$channel = new \AMQPChannel($connection);
$exchange = new \AMQPExchange($channel);
$exchange->setName($ex_name);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE); //持久化
$exchange->declareExchange();
$q = new \AMQPQueue($channel);
//设置队列名字 如果不存在则添加
$q->setName($queue);
// AMQP_DURABLE | AMQP_AUTODELETE
$q->setFlags(AMQP_DURABLE);
//            echo 'Message Total: ' . $q->declareQueue() . "\r\n";
//绑定交换机与队列,并指定路由键
$q->bind($ex_name, $routing_key);

Log::record($queue." waiting for msg...");
Log::save();

//阻塞模式接收消息 AMQP_NOPARAM
$q->consume(function($envelope, $queue){
Log::record("processMessage start");
Log::save();
$msg = $envelope->getBody();
Log::record("processMessage body=".$msg);
Log::save();
//        $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
return true;
}, AMQP_AUTOACK); //自动ACK应答
} catch (\AMQPConnectionException $e) {
Log::record($e->getMessage);
Log::save();
}
$connection->disconnect();
}
接下来看一下创建队列及接收消息的Laravel示例
创建队列发送消息:
public function mq_send(){
$ex_name = 'amq.direct';
$routing_key = 'EOSETH';
$queue = 'tEOSETH';
$message = array();
$message['order_id'] = 12345;
$body = json_encode($message);
try {
$connection = new \AMQPConnection([
'host' => env("RABBIT_HOST"),
'port' => env("RABBIT_PORT"),
'vhost' => env("RABBIT_VHOST"),
'login' => env("RABBIT_LOGIN"),
'password' => env("RABBIT_PASS"),
]);
$connection->connect();
$channel = new \AMQPChannel($connection);
$exchange = new \AMQPExchange($channel);
            $exchange->setName($ex_name);
            $exchange->setType(AMQP_EX_TYPE_DIRECT);
            $exchange->setFlags(AMQP_DURABLE); //持久化
$exchange->declareExchange();
//发送消息可以不指定队列,但是指定队列可以防止队列不存在时创建一个
  $q = new \AMQPQueue($channel);
            //设置队列名字 如果不存在则添加
            $q->setName($queue);
            // AMQP_DURABLE | AMQP_AUTODELETE
            $q->setFlags(AMQP_DURABLE);
//            echo 'Message Total: ' . $q->declareQueue() . "\r\n";
            //绑定交换机与队列,并指定路由键
            $q->bind($ex_name, $routing_key);
            $rs = $exchange->publish($body, $routing_key);
logger("put_order params=".$body." result=".$rs);
} catch (\AMQPConnectionException $e) {
logger($e->getMessage());
}
$connection->disconnect();
}
创建队列消费者:
public function mq_rev(){
$ex_name = 'amq.direct';
$routing_key = 'EOSETH';
$queue = 'tEOSETH';
try {
$connection = new \AMQPConnection([
'host' => env("RABBIT_HOST"),
'port' => env("RABBIT_PORT"),
'vhost' => env("RABBIT_VHOST"),
'login' => env("RABBIT_LOGIN"),
'password' => env("RABBIT_PASS"),
]);
$connection->connect();
$channel = new \AMQPChannel($connection);
$exchange = new \AMQPExchange($channel);
$exchange->setName($ex_name);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE); //持久化
$exchange->declareExchange();
$q = new \AMQPQueue($channel);
//设置队列名字 如果不存在则添加
$q->setName($queue);
// AMQP_DURABLE | AMQP_AUTODELETE
$q->setFlags(AMQP_DURABLE);
//            echo 'Message Total: ' . $q->declareQueue() . "\r\n";
//绑定交换机与队列,并指定路由键
$q->bind($ex_name, $routing_key);

var_dump("Waiting for message…");

//            while(TRUE) {
//阻塞模式接收消息 AMQP_NOPARAM
//                $q->consume(array($this,'processMessage'));
$q->consume(array($this,'processMessage'), AMQP_AUTOACK); //自动ACK应答
//            }
} catch (\AMQPConnectionException $e) {
logger($e->getMessage());
}
$connection->disconnect();
}

function processMessage($envelope, $queue) {
logger("processMessage start");
$msg = $envelope->getBody();
logger("processMessage body=".$msg);
//        $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
return true;
}
交换机既可以由消息发送端创建,也可以由消息消费者创建。
创建一个队列后,需要将队列绑定到交换机上,队列才能工作,routingkey也是在这里指定的。有的资料上写成bindingkey,其实一回事儿,弄两个名词反倒容易混淆。
消息的处理,是有两种方式:
A,一次性。用 $q->get([...]),不管取到取不到消息都会立即返回,一般情况下使用轮询处理消息队列就要用这种方式;
B,阻塞。用 $q->consum( callback, [...] ) 程序会进入持续侦听状态,每收到一个消息就会调用callback指定的函数一次,直到某个callback函数返回FALSE才结束。
关于callback,这里多说几句: PHP的call_back是支持使用数组的,比如: $c = new MyClass(); $c->counter = 100; $q->consume( array($c,'myfunc') ) 这样就可以调用自己写的处理类。MyClass中myfunc的参数定义,与上例中processMessage一样就行。
在上述示例中,使用的$routingkey = '', 意味着接收全部的消息。我们可以将其改为 $routingkey = 'key_1',可以看到结果中仅有设置routingkey为key_1的内容了。
注意: routingkey = 'key_1' 与 routingkey = 'key_2' 是两个不同的队列。假设: client1 与 client2 都连接到 key_1 的队列上,一个消息被client1处理之后,就不会被client2处理。而 routingkey = '' 是另类,client_all绑定到 '' 上,将消息全都处理后,client1和client2上也就没有消息了。
在程序设计上,需要规划好exchange的名称,以及如何使用key区分开不同类型的标记,在消息产生的地方插入发送消息代码。后端处理,可以针对每一个key启动一个或多个client,以提高消息处理的实时性。


本文》有 0 条评论

留下一个回复