RabbitMQ简单介绍和使用

最近项目中使用到了消息队列,在此总结一下学习和使用过程中的心得,同时针对过程中踩过的坑和需要注意的地方做一下记录

AMQP

在说RabbitMQ之前,需要先了解一下AMQP

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。


同时PHP也有对应的扩展 来帮助我们快速的使用 PHP AMQP扩展官方文档
扩展提供的方法可以分为五大类

  • AMQPConnection
  • AMQPChannel
  • AMQPExchange
  • AMQPQueue
  • AMQPEnvelope

贴一下这几个名词的概念:

  • Broker: 接收和分发消息的应用,RabbitMQ Server就是Message Broker。
  • Connection: publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。
  • Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
  • Exchange: message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
  • Queue: 消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。
  • Binding: exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。

RabbitMQ 使用

通常,我们像这样来建立连接
<?php 

// 建立连接
$connection = new AMQPConnection();
$connection->setHost($config['host']);
$connection->setVhost($config['vhost']);
$connection->setPort($config['port']);
$connection->setLogin($config['login']);
$connection->setPassword($config['password']);
$connection->setReadTimeout($config['read_timeout']);
$connection->connect();

$channel = new AMQPChannel($connection);

// 生成交换机实例
$exchange_instance = new AMQPExchange($channel);
$exchange_instance->setType($option['exchange_type']);
$exchange_instance->setName($option['exchange_name']);
if(!empty($option['flags'])) {
    $exchange_instance->setFlags($option['flags']);
}

// 生成queue实例
$queue_instance = new AMQPQueue($channel);
$queue_instance->setName($queue_name);
if(!empty($option['flags'])) {
    $queue_instance->setFlags($option['flags']);
}

// 绑定
foreach ($routing_keys as $k => $routing_key) {
    $queue_instance->bind($exchange_instance->getName(), $routing_key);
}

然后就可以去发布\消费消息了
// 发布一条消息
// $msg 消息内容 一般使用json字符串
// $routing_key 路由信息 与刚才绑定的对应
// $attributes 其他属性 可以类比为http的头部信息
$exchange_instance->publish($msg, $routing_key, AMQP_NOPARAM, $attributes);

// 消费一条消息
// $callback_func 回调方法名 拉取到消息后 会交给这个方法处理
// 这个方法必须发送应答 队列才会认为这条消息被正确的消费
$queue_instance->consume($callback_func);

// 假设 $callback_func=consume 则大致实现如下
function consume(AMQPEnvelope $message, AMQPQueue $q) {
    $encode_msg = $message->getBody();
    $data = json_decode($encode_msg, true);
    // do something..
    // 如果是定时脚本拉取队列的消息 最好不要在这里写特别复杂\耗时的逻辑
    // 否则脚本会占用大量的内存 而且一旦消息过多短时间内很难处理完成
    // 最好的方案是写入handle表 由另一个脚本负责处理拉回来的消息

    // 发送应答 这条消息将从队列中删除
    $q->ack($message->getDeliveryTag());

    // 另外还有一个方法是nack 即not ack
    // 关于他们的区别 请参考 https://stackoverflow.com/questions/28794123/ack-or-nack-in-rabbitmq
    // $q->nack($message->getDeliveryTag());
}



关于RabbitMQ集群以及高可用,参考我司大佬的文章
https://yuerblog.cc/2018/04/27/rabbitmq3-7-ha-cluster/

发表评论

邮箱地址不会被公开。 必填项已用*标注