最适合入门的RabbitMQ+PHP教程(七)之RPC!

技术分享 2019年04月07日 阅读 450 豆浆大叔
上一章我们讲解了最适合入门的RabbitMQ+PHP教程(六)Topic exchange(主题交换机)使用,我们通过实际的执行代码,可以很清晰的了解整体的匹配规则以及操作流程。那么这一章节讲解的是rabbitmq的RPC模式。

前提条件

RabbitMQ 已在标准端口(5672)上的localhost上安装并运行。如果您使用不同的主机,端口或凭据,则需要调整连接设置。如果您再本教程中遇到问题,可以随时通过邮件来联系我们。

RPC 定义

要使用RPC那么我们就要搞清楚这个RPC到底是个什么东西,原理是什么?(RPC) Remote Procedure Call Protocol 远程过程调用协议,一般公稍微大一些的公司都是一个项目有多个系统构成,比如电商中的库存系统,商品系统,订单系统等等,不同的项目开发组维护不同的系统,每个系统有运行在不同的机器上。但是往往机器之间需要互相调用一些数据,由于不在同一台机器可以直接调用,所以需要通过网络来表达调用的语义和传达调用的数据。那么现在RPC的协议很多,Java RMI , WebApi等等。

官方对于PRC的说明

尽管RPC在计算中是一种非常常见的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是慢的RPC时,会出现问题。这样的混淆导致系统不可预测,并增加了调试的不必要的复杂性。错误使用RPC可以导致不可维护的意大利面条代码,而不是简化软件。
考虑到这一点,请考虑以下建议:
1.确保明显哪个函数调用是本地的,哪个是远程的。
2.记录您的系统。使组件之间的依赖关系变得清晰。
3.处理错误案例。当RPC服务器长时间停机时,客户端应该如何反应?
4.如有疑问,请避免使用RPC。如果可以,您应该使用异步管道 - 而不是类似RPC的阻塞,将结果异步推送到下一个计算阶段。

功能说明

本文使用RabbitMQ实现RPC的调用方式,我们需要使用回调队列(Callback queue)。
(1)Callback queue,通过Rabbitmq进行rpc很简单,客户端发送消息,服务端响应消息,但是为了接受响应,我们需要发送带有请求的回调队列地址。

(2) Message attribute

AMQP 0-9-1协议预定义了一组带有消息的14个属性。大多数属性很少使用,但以下情况除外:
1.delivery_mode:标记消息传递模式,2-消息持久化,其他值-瞬态。
2.content_type:内容类型,用于描述编码的mime-type. 例如经常为该属性设置JSON编码。
3.reply_to:应答,通用的回调队列名称,
4.correlation_id:关联ID,方便RPC相应与请求关联。

(3) RPC处理流程

  1. RPC客户端启动后,创建一个匿名、独占的、回调的队列
  2. RPC客户端设置消息的2个属性:replyTo(回调队列名字)和correlationId(标记请求ID),然后将消息发送到队列rpc_queue

3.请求被发送到rpc_queue队列中

  1. RPC服务监听队列rpc_queue队列中的消息请求。rpc服务器端处理之后将结果封装成消息发送到replyTo指定的回调队列中,并且此消息带上correlationId
  2. RPC客户端在队列replyTo上监听消息,当收到消息后,它会判断收到消息的correlationId。如果值和自己之前发送的一样,则这个值就是RPC的处理结果

(4) Correlation Id ???
若果按照正常的来说我们为每一个RPC创建一个回调队列的话,这个效率是非常低效的。那么我们可以选择为每一个客户端创建一个回调队列。但是如果队列收到一条回复消息,那么却不不清楚响应属于哪个请求来源,这是就需要使用correlationId属性了。我们要为每个请求设置唯一的值。然后,在回调队列中获取消息,查看这个属性,关联response和request就是基于这个属性值的。如果我们看到一个未知的correlationId属性值的消息,可以放心的无视它——它不是我们发送的请求。你可能问道,为什么要忽略回调队列中未知的信息,而不是当作一个失败?这是由于在服务器端竞争条件的导致的。虽然不太可能,但是如果RPC服务器在发送给我们结果后,发送请求反馈前就挂掉了,这有可能会发送未知correlationId属性值的消息。如果发生了这种情况,重启RPC服务器将会重新处理该请求。这就是为什么在客户端必须很好的处理重复响应,RPC应该是幂等的。

RPC client code (客户端代码)

主要业务逻辑如下:

  1. 配置连接工厂
  2. 建立TCP连接
  3. 在TCP连接的基础上创建通道
  4. 定义临时队列replyQueueName,声明唯一标志本次请求corrId,并将replyQueueName和corrId配置要发送的消息队列中
  5. 使用默认的交换机发送消息到队列rpc_queue中
  6. 使用阻塞队列BlockingQueue阻塞当前进程
  7. 收到请求后,将请求放入BlockingQueue中,主线程被唤醒,打印返回内容
 public function call($body = null,$name = null)
{
    if (!$body || !$name) return false;
    $this->response = null;
    // 随机一个correlation_id码
    $this->corr_id = uniqid();
    //replyTo指定队列callback_queue并携带correlationId校验码(姑且这么说)
    $params = ['correlation_id' => $this->corr_id,'reply_to' => $this->callback_queue];
    $message = new AMQPMessage((string) $body,$params);
    $this->amqp_channel->basic_publish($message, '', $name);
    while (!$this->response) {
        //等待处理
        $this->amqp_channel->wait();
    }
    return intval($this->response);
}

完整代码
RPC客户端代码:https://www.phpassn.com/article/108.html

RPC Service code (服务端代码)

  1. 配置连接工厂
  2. 建立TCP连接
  3. 在TCP连接的基础上创建通道
  4. 声明一个rpc_queue队列
  5. 设置同时最多只能获取一个消息
  6. 在rpc_queue队列在等待消息
  7. 收到消息后,调用回调对象对消息进行处理,向此消息的replyTo队列中发送处理并带上correlationId
  8. 使用wait-notify实现主线程和消息处理回调对象进行同步
 public function basic_consume($queue_name = null)
{
    if(!$queue_name) return false;
    //回调函数
    echo " [x] Awaiting RPC requests\n";
    $callback = function ($response) {
        $n = intval($response->body);

        echo ' [.] 我是回调的数据哦:(', $response->body, ")\n";

        $params = ['correlation_id' => $response->get('correlation_id')];

        $msg = new AMQPMessage((string) $this->fib($n), $params);

        $response->delivery_info['channel']->basic_publish( $msg,'',$response->get('reply_to'));

        $response->delivery_info['channel']->basic_ack($response->delivery_info['delivery_tag']);
    };
    // $prefetch_size 所取大小  $prefetch_count 获取数量  $a_global全局
    $this->amqp_channel->basic_qos(null, 1, null);

    $this->amqp_channel->basic_consume($queue_name, '', false, false, false, false, $callback);
    while (count($this->amqp_channel->callbacks)) {
        $this->amqp_channel->wait();
    }
}

完整代码
RPC服务端代码: https://www.phpassn.com/article/108.html

测试结果

[RpcServer] Awaiting RPC requests
[RpcClient] Requesting55
[RpcServer receive] [.] 我是回调的数据哦:(10)

总结:RPC的远程过程调用到这里已经结束,应该算是比较清晰一点了,说明白了其实就是客户端与服务端之间的异步交互,通过识别码回调给客户端时,创建一个匿名独占队列,通过这个队列把数据传给客户端。下一章节rabbitmq延时队列实现

github链接:https://github.com/orchid-lyy/rabbitmq-study

豆浆大叔 豆浆大叔 资深PHP工程师@某一线大厂

写了 264486 字,被 3 人关注,共写了 75 篇笔记

(已更名,豆浆大叔)有理想的码农,不应该只探究人性的懒惰面,而是积极的去探索人生道路上的荆棘坎坷,努力提升自己完善自己!
推荐文章:
  • 大数据领域Flink 与 Spark之间的区别?

    学而不思则罔 思而不学则殆,2020年砥砺前行!前言大家都知道已经2020年了,也到了新的一年。作为一个主营电商的公司,年底都会很忙。所以最近的更新进度也停滞不前,本来准备大侃PHP设计模式的,但是因...

    豆浆大叔 21天前 3 吐槽 83 围观 技术分享
  • php如何实现钩子与实践案例

    前言学而不思则罔,思而不学则殆。30则而立,头顶正则脱光!昨天晚上,突然想起了PHP中的钩子如何使用?说实话,像dz,wordpress,TP,CI框架都已经集成了Hook钩子,尽管我不怎么使用框架以...

    豆浆大叔 1个月前 0 吐槽 43 围观 技术分享
  • Linux无法显示ip地址的解决办法

    今天想趁着有时间,用虚拟机调试一下lua脚本和其他的功能,结果启动虚拟机使用xshell连接不上,然后使用终端查看IP地址无法查看到,记录一下排查错误流程。查看IP地址使用ip addr 或者 ifc...

    豆浆大叔 1个月前 0 吐槽 89 围观 技术分享
  • 高并发性能指标QPS,TPS,RT,并发数,吞吐量是指什么?

    QPS,每秒查询QPS:Queries Per Second意思是“每秒查询率”,是一台服务器每秒能够相应的查询次数,是对一个特定的查询服务器在规定时间内所处理流量多少的衡量标准。互联网中,作为域名系...

    豆浆大叔 1个月前 0 吐槽 177 围观 技术分享
  • 分享一些PHP常用的小算法

    下面分享一些最常见的算法,用PHP如何实现,拓展下知识面。冒泡排序function bubble_sort($arr) { $n=count($arr); for($i=0;$i<$n-1;$...

    我也庸俗 2个月前 0 吐槽 86 围观 技术分享
表情
  • [:821l1001:]
  • [:821l1002:]
  • [:821l1003:]
  • [:821l1004:]
  • [:821l1005:]
  • [:821l1006:]
  • [:821l1007:]
  • [:821l1008:]
  • [:821l1009:]
  • [:821l1010:]
  • [:821l1011:]
  • [:821l1012:]
  • [:821l1013:]
  • [:821l1014:]
  • [:821l1015:]
  • [:821l1016:]
  • [:821l1017:]
  • [:821l1018:]
  • [:821l1019:]
  • [:821l1020:]
  • [:821l1021:]
  • [:821l1022:]
  • [:821l1023:]
  • [:821l1024:]
  • [:821l1025:]
  • [:821l1026:]
  • [:821l1027:]
  • [:821l1028:]
  • [:821l1029:]
  • [:821l1030:]
  • [:821l1031:]
  • [:821l1032:]
  • [:821l1033:]
  • [:821l1034:]
  • [:821l1035:]
  • [:821l1036:]
  • [:821l1037:]
  • [:821l1038:]
  • [:821l1039:]
  • [:821l1040:]
  • [:821l1041:]
  • [:821l1042:]
  • [:821l1043:]
  • [:821l1044:]
  • [:821l1045:]
  • [:821l1046:]
  • [:821l1047:]
  • [:821l1048:]
  • [:821l1049:]
  • [:anger:]
  • [:applause:]
  • [:awkward:]
  • [:brokenheart:]
  • [:clown:]
  • [:confused:]
  • [:decline:]
  • [:diggingmouth:]
  • [:eyebrows:]
  • [:grinning:]
  • [:haha:]
  • [:ill:]
  • [:kiss:]
  • [:lascivious:]
  • [:laugh:]
  • [:love:]
  • [:lovely:]
  • [:rhinorrhea:]
  • [:smile:]
  • [:solid:]
  • [:strong:]
  • [:sweat:]
  • [:tearcollapse:]
  • [:tongue:]
  • [:uncomfortable:]
  • [:weak:]
  • [:worry:]
Tips:支持Markdown语法

8 个评论

资深PHP工程师 @ 某一线大厂

登录

第三方账号登录:
GitHub
微信
微博