rabbitmq客户端重构问题汇总

工作项目XX云中,由于涉及各模块间消息通信逻辑,放弃内部协议的情况下,已实现部分是将rabbitmq作为消息中间件,完成消息的生产和消费。项目编程语言是C++, rabbitmq官方客户端对于C语言的支持这一块,是由一个rabbitmq-c的开源库来完成的。阅读其源码,并结合实际项目中的使用情况,发现现有库有以下不足

  1. 心跳模块:客户端和server端需要进行一定的数据发送来实时感知链路状态。其中对于服务端来说,达到心跳超时时间之后,会向客户端发送一个心跳包,然后在下一个心跳超时时间之内,只要客户端有任何类型的数据发送至服务端,服务端就可以判断出链接状态的正常,否则服务端在没有收到任何数据时,会认为链路异常,从而主动关闭链路。现有开源库中对于收到服务端心跳包之后,并不会发送任何数据,因此在业务不频繁时,很有可能导致服务端关闭链路。
  2. 底层网络IO交互使用同步模式,阻塞用户线程且性能表现不佳。

基于以上原因,老夫和另外两名同事近期的工作重点就是对rabbitmq-c端进行重构。
貌似有点偏题,本文不是记录重构工作(PS.重构工作基本已经完成,第一个版本已具备基本功能),而是为了记录在开发测试过程中遇到并且解决的问题,以便帮助记忆。

1.64位整型序列化问题

现象:调用生产者接口向服务端发送消息失败。

定位过程:

  • 代码走读,无明显逻辑错误。
  • tcpdump&wireshark抓包分析,发现在发送HEADER帧中,body_size字段异常,真实值为10(十进制),抓包结果显示为0x0000000a00000000(十六进制)
  • 具体分析该字段在赋值以及序列化阶段的代码逻辑发现,该字段类型为64位整型,对于该字段的序列化处理存在问题。原有代码中对64位整型的序列化和反序列化调用的是Linux系统接口htonl, ntohl,这样就会导致对这字段的前32位数据进行序列化,后32位不变,因此出现了前述的异常值。

修复过程:

  • 参考原rabbitmq-c开源库中逻辑,对于64位的整型处理,由于没有现成的系统接口可供调用,开源库作者通过将64位整型分成2个32位整型分别处理并且合并的方式来实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#define DECLARE_XTOXLL(func) \
static inline uint64_t func##ll(uint64_t val) \
{ \
union { \
uint64_t whole; \
uint32_t halves[2]; \
} u; \
uint32_t t; \
u.whole = val; \
t = u.halves[0]; \
u.halves[0] = func##l(u.halves[1]); \
u.halves[1] = func##l(t); \
return u.whole; \
}
#elif defined(AMQP_BIG_ENDIAN)
#define DECLARE_XTOXLL(func) \
static inline uint64_t func##ll(uint64_t val) \
{ \
union { \
uint64_t whole; \
uint32_t halves[2]; \
} u; \
u.whole = val; \
u.halves[0] = func##l(u.halves[0]); \
u.halves[1] = func##l(u.halves[1]); \
return u.whole; \
}
DECLARE_XTOXLL(hton)
DECLARE_XTOXLL(ntoh)

总结:

  • 该问题知识点涵盖了大小端,序列化和反序列化
  • C语言union特性
  • 在序列化和反序列化,通信协议这块,除了自己实现之外,还可以使用一些第三方优秀开源库,例如Google公司的ProtoBuf

2.malloc&free

现象:消费者收到的消息体中,某些字段(char*)内容异常。消息体中有两个字段分别为char* consume_tag, char* queue。正常显示应为consume_tag = “my message tag”; queue=”helloworld”。而异常情况下,有概率出现consume_tag = “my message tag”;queue=”hellowrold tag”.

定位过程:

  • 首先分析这两个字段的赋值和内存操作。查看代码核实,每次接收到新的消息时,对于消息体中char*变量指向的内存均是重新申请的,即调用malloc函数,之后在完成赋值,消息处理完成之后进行free内存释放。
  • 通过gdb运行程序,在接收消息之后的步骤中打断点,进行单步调试,在每次收到消息之后,将消息体内部各字段值print出来,重点分析异常字段的长度,内存首地址
  • 发现先前分配给consume_tag的内存,有可能在后续中被分配给queue,从而导致在print queue变量时,内容异常。
  • 分析glibc中malloc和free,glibc中会根据大小将内存分为不同的bin,根据用户申请的大小在不同的bin中分配内存,对于已经free的内存,glibc中类似会将该内存置为可用状态,以供下次用户申请时分配。因此上述问题产生的原因是不可避免的,但是由于消息体中对于消息的长度会有一个len字段来标记,因此用户仍可以取到正确的值。

总结:

  • gdb工具s(进入函数内部),b(断点),c(继续执行),watch(观察某变量,当其值发生变化时报告)等命令使用
  • 编译时加上-g符号,便于调试定位
  • glibc内存管理(这一块后续将单独整理一份文档来说明)

上面整理的两个问题都是比较基础,有些知识点也不够深入,后面再专门写一份博客来补充好啦。本文就权当笔记之用,以免时间一长便忘得一干二净。