Contents
  1. 1. 写在前面
  2. 2. Packetbeat项目介绍
    1. 2.1. 安装GO
    2. 2.2. GOPATH
    3. 2.3. 获取项目
    4. 2.4. yml配置文件说明
    5. 2.5. 需要修改哪些文件
  3. 3. 原生Thrift简单分析
    1. 3.1. 通讯协议格式
    2. 3.2. 核心模型
  4. 4. Finagle-thrift协议分析
    1. 4.1. 测试数据IDL
    2. 4.2. 一次RPC调用的差异
      1. 4.2.1. 原生Thrift调用
      2. 4.2.2. Fiangle-thrift调用及分析
  5. 5. 参考

写在前面

最近一年多一直在做服务治理相关的开发工作. 起初服务监控采用了成本比较低的方式来实现(提供者,消费者自己按分钟维度上报健康数据到Redis,但是这种方式只是在Java的服务提供者和消费者做到了很好的实现, 其他语言目前只能上报很少部分的监控数据). 因为公司的开发语言是多样的, 其中包括: Nodejs, Ruby, Golang, Java, Scala等, 那么将来要对监控数据的模型拓展, 需求变更等, 将很难快速推广实现. 随着公司业务的高速发展, 以及将来所有服务部署Docker化, 服务的监控预警已经是服务治理工作中的重中之重. 服务监控最好可以同时监控基础服务(Mysql, Redis等),业务服务. 我们的业务服务是采用的Twitter的Finagle-thrift实现多语言之间的RPC调用. Balabala说了这么多, 就是我们现在要做全链路监控, 做监控首先第一步是需要可以收集到这些网络调用的原始数据, 这个时候ElasticStack中的Beats项目进入了我们的视线, Beats项目中的Packetbeat子项目可以抓取到像Mysql, Redis, Thrift等协议的数据包. 但是,我们业务使用的通信协议是Finagle-thrift, 这里面为了满足一些拓展(比如:用于RPC调用链跟踪的Zipkin),Finagle-thrift在原生Thrift上做了二次封装, 接下来需要让Packetbeat对Finagle-thrift协议支持. 下面我将分析过程整理如下, 方便以后温习回顾.

Packetbeat项目介绍

更详细的请参考 Medcl的一个教程

整个Beats项目都是用的Golang语言开发, Golang这几天也是现学现卖, 我在整个调试过程中没有找到可以比较方便进行Debug的方式, 只能通过fmt.Println进行各种调试信息的输出, 这个过程比较痛苦. 这里我顺便记录一下怎么配置Go的环境, 有几个概念比较懵,在此记录一下.

安装GO

这里 获取对应的操作系统的GO安装bao

GOPATH

  • 安装好Go后需要设置环境变量,如下:

    1
    2
    3
    4
    5
    6
    7
    #这是Go的安装路径
    export GOROOT=/usr/local/go
    export GOBIN=$GOROOT/bin
    #这里可以理解为Go项目的工作空间, 这里允许有多个目录,注意用":"分割
    #当有多个GOPATH时,执行 go get命令的内容默认会放在第一个目录下
    export GOPATH=/work/goworkspace
  • GOPATH的的几个目录约定

    • src 放置Go项目的源码
    • pkg Go项目中使用的第三方包
    • bin 编译后生成的可执行文件, 可以把此目录加入到 PATH 变量中

获取项目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#创建相应目录
mkdir -p $GOPATH/src/github.com/elastic/
cd $GOPATH/src/github.com/elastic
#签出源码
git clone https://github.com/elastic/beats.git
cd beats
#修改官方仓库为upstream源,设置自己的仓库为origin源
git remote rename origin upstream
git remote add origin git@github.com:medcl/packetbeat.git
#获取上游最新的代码,如果是刚fork的话可不用管
git pull upstream master
#签出一个名为finagle的分支,用于开发这个功能
git checkout -b finagle
#切换到packetbeat模块
cd packetbeat
#获取依赖信息
(mkdir -p $GOPATH/src/golang.org/x/&&cd $GOPATH/src/golang.org/x &&git clone https://github.com/golang/tools.git )
go get github.com/tools/godep
#编译
make

yml配置文件说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
interfaces:
#如果提供者消费者在本机,直接写成lo0
device: en0
protocols:
# 自定义协议名
finaglethrift:
ports: [20880, 9090, 9091, 9099, 9098]
# 自定义Thrift的Transport type一定要是frame的方式, 否则解析不出来
transport_type: framed
protocol_type: binary
# idl文件一定要有
idl_files: ["test_cfg/result.thrift","test_cfg/order.thrift","test_cfg/hello.thrift"]
output:
elasticsearch:
hosts: ["192.168.10.235:9200"]
kafka:
hosts: ["192.168.5.159:9092"]
topic: "packetbeat_test_qqq"
shipper:
logging:
files:
path: /tmp/mybeat

需要修改哪些文件

  • 新增协议目录, packetbeat启动时会自动扫描protos目录下的协议包


    因为是要对Thrift协议进行拓展, 所以之前很多代码是可以复用的, 直接将原来的thrift目录在当前目录下复制一份, 直接改名为finaglethrift

  • 文件修改
    为了便于区分, 我们将原来所有文件名中的thrift变更为finaglethrift, 变更之后我们只需要修改finaglethrift.go文件即可.

    • 将包名从thrift变更为finaglethrift

    • 修改协议注册名,这里的名称直接匹配yml配置文件中的协议名

  • 协议解析的具体方法修改, 主要业务抓包分析将在这个方法中完成,我们本次改动也是针对这个方法的修改

原生Thrift简单分析

通讯协议格式

  • TCompactProtocol
  • TBinaryProtocol(我们主要采用这种格式进行通讯)
    TBinaryProtocol下通信方式采用TFramedTransport,即以帧的方式对数据进行传输

    注意: 服务端, 服务端需要采用Framed的方式进行通信, packetbeat采用Framed的方式进行抓包分析, 如果thrift的传输方式不是这种方式, packetbeat将解析不出

  • TJSONProtocol

核心模型

  • TTransport, 这是一个基类,我们使用的传输方式是Framed, 那么直接使用的TFramedTransport将继承TTransport. TFramedTransport会将数据写入到一个buf中, 等全部写完之后会调用flush方法,首先计算出buf中的数据长度,将4个字节的帧长度和数据内容进行封装进行发送. 针对解析方怎么判断是否解析完,都是通过发送的data中头四个字节判断.具体如下图:

    具体封装源代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Override
    public void flush() throws TTransportException {
    byte[] buf = writeBuffer_.get();
    int len = writeBuffer_.len();
    writeBuffer_.reset();
    # 封装成 4个字节 + 帧内容
    encodeFrameSize(len, i32buf);
    transport_.write(i32buf, 0, 4);
    transport_.write(buf, 0, len);
    transport_.flush();
    }
  • TProtocol, 协议接口, 我们主要是采用TBinaryProtocol的协议类进行通信, 其中实现了接口中的操作协议的方法. TBinaryProtocol需要为消息体封装一个Header, 其中还定义了Thrift中的读写模式(这里很重要,如果模式不匹配将无法正常解析),主要分为: 严谨的读写, 普通读写. 因为我们主要针对严谨读写模式进行抓包分析, 下面将重点解析一下在严谨读写模式下的消息体内容都是什么, 具体如下图:

    2016-04-30_11-39-09

在TBinaryProtocol中有对消息体的读取和写入操作, 具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* Reading methods.
*/
public TMessage readMessageBegin() throws TException {
int size = readI32();
if (size < 0) {
int version = size & VERSION_MASK;
if (version != VERSION_1) {
throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin");
}
return new TMessage(readString(), (byte)(size & 0x000000ff), readI32());
} else {
if (strictRead_) {
throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?");
}
return new TMessage(readStringBody(size), readByte(), readI32());
}
}
public void writeMessageBegin(TMessage message) throws TException {
if (strictWrite_) {
int version = VERSION_1 | message.type;
writeI32(version);
writeString(message.name);
writeI32(message.seqid);
} else {
writeString(message.name);
writeByte(message.type);
writeI32(message.seqid);
}
}
/**
* Message type constants in the Thrift protocol.
*
*/
public final class TMessageType {
public static final byte CALL = 1;
public static final byte REPLY = 2;
public static final byte EXCEPTION = 3;
public static final byte ONEWAY = 4;
}
  • TMessage, 服务提供者,消费者在进行RPC通信时都会讲传递的数据封装成TMessage, 主要包含三部分
    • 名称
    • 序号
    • 类型

Finagle-thrift协议分析

因为Finagle-thrift是在Thrift协议之上做了封装, 我们主要对着两个协议中具体的数据进行比对.

测试数据IDL

为了让测试具有代表性, 构建的IDL文件中既有简单的没有入参,返回值的finaglePing方法, 也有有入参,复杂返回值的detail方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
include "result.thrift"
/*订单*/
struct Order {
1:i32 userId
/*买家*/
2:string userName,
/*订单ID*/
3:string orderId,
}
struct OrderResult {
1:result.Result result,
2:optional Order order
}
service OrderServ{
/*订单详情*/
OrderResult detail(1:i32 userId, 2:string userName, 3:string orderId)
void finaglePing()
}
/************************复杂返回值Result的定义***************************/
struct FailDesc {
1:string name,
2:string failCode,
3:string desc
}
struct Result {
1:i32 code,
2:optional list<FailDesc> failDescList
}
struct StringResult {
1:Result result,
2:optional string value,
3:optional string extend
}

一次RPC调用的差异

原生Thrift调用

我们针对finaglePing方法通过原生Thrift进行一次RPC调用,并在Client端TcpDump出产生的数据包
2016-04-30_12-33-13
从图上可以看出,包含了3次握手, 1次Client与Server的业务请求交互, 4次挥手关闭连接.

下面我们看Client发送请求时的具体数据包内容如下图:
2016-04-30_12-39-12
这里包含数据长度, Thrift是否是严谨读写,消息类型, 消息内容等信息.

Fiangle-thrift调用及分析

我们针对finaglePing方法同样通过Fiangle-thrift方式进行一次RPC调用,并在Client端TcpDump出产生的数据包
2016-04-30_12-45-41
从上图看出, 一次RPC调用包含了, 3次握手, 1次fiangle确认协议的请求交互, 1次Client与Server的业务请求交互, 4次挥手关闭连接.

关于Client发送的请求和原生Thrift还不太一样, 在创建完连接之后, 需要发送
一次带有__can__finagle__trace__v3__信息的请求已确认是否是Finagle-thrift协议, 确认成功之后才会进行真正的业务交互, 这次确认是一次标准的Thrift通信,具体如下图:
2016-04-30_12-52-52

下面是在确认Fiangle标识之后进行的真正的业务通信,具体如下图:

2016-04-30_12-57-36

我们上面这张图中可以看出在标准的Thrift协议数据之前Finagle-thrift自己又加了很多自己的数据,具体加了什么, 我们来看一下Fiangle的源码, 具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* ThriftClientFramedCodec implements a framed thrift transport that
* supports upgrading in order to provide TraceContexts across
* requests.
*/
object ThriftClientFramedCodec {
/**
* Create a [[com.twitter.finagle.thrift.ThriftClientFramedCodecFactory]].
* Passing a ClientId will propagate that information to the server iff the server is a finagle
* server.
*/
def apply(clientId: Option[ClientId] = None) =
new ThriftClientFramedCodecFactory(clientId)
def get() = apply()
}
class ThriftClientFramedCodecFactory(
clientId: Option[ClientId],
_useCallerSeqIds: Boolean,
_protocolFactory: TProtocolFactory)
extends CodecFactory[ThriftClientRequest, Array[Byte]]#Client {
def this(clientId: Option[ClientId]) = this(clientId, false, Protocols.binaryFactory())
def this(clientId: ClientId) = this(Some(clientId))
// Fix this after the API/ABI freeze (use case class builder)
def useCallerSeqIds(x: Boolean): ThriftClientFramedCodecFactory =
new ThriftClientFramedCodecFactory(clientId, x, _protocolFactory)
/**
* Use the given protocolFactory in stead of the default `TBinaryProtocol.Factory`
*/
def protocolFactory(pf: TProtocolFactory) =
new ThriftClientFramedCodecFactory(clientId, _useCallerSeqIds, pf)
/**
* Create a [[com.twitter.finagle.thrift.ThriftClientFramedCodec]]
* with a default TBinaryProtocol.
*/
def apply(config: ClientCodecConfig) =
new ThriftClientFramedCodec(_protocolFactory, config, clientId, _useCallerSeqIds)
}
class ThriftClientFramedCodec(
protocolFactory: TProtocolFactory,
config: ClientCodecConfig,
clientId: Option[ClientId] = None,
useCallerSeqIds: Boolean = false
) extends Codec[ThriftClientRequest, Array[Byte]] {
private[this] val preparer = ThriftClientPreparer(
protocolFactory, config.serviceName,
clientId, useCallerSeqIds)
def pipelineFactory: ChannelPipelineFactory =
ThriftClientFramedPipelineFactory
override def prepareConnFactory(
underlying: ServiceFactory[ThriftClientRequest, Array[Byte]]
) = preparer.prepare(underlying)
override val protocolLibraryName: String = "thrift"
}

Scala源码看起来太费劲, 既然知道了原理, 为了可以解析出具体的Fiangle-thrift中的东西, 我只需要设置FrameSize和data的offset的位置, 获取到原生的Thrift协议中的Framed数据即可, 然后复用Packetbeat自带的针对Thrift协议包的抓取与组合逻辑.

通过比对两个业务包我知道中间Fiangle-thrift自己添加的信息字节大小固定为129个字节,这只是Client在发送请求时才会添加这些附加信息, Server端返回值则是在原生Thrift协议中添加了1个字节, 其中我还需要排除创建连接之后发送的Finagle-thrift协议确认请求.

我们完成了普通Finagle-thrift协议的解析,接下来还要针对附带Zipkin信息的Finagle-thrift协议的解析, Zipkin是参考Google的Dapper完成的可以对RPC调用链进行跟踪的框架, 这已经是业内针对分布式系统之间RPC调用链跟踪的通用解决方案. Zipkin无非就是在RPC调用时多传输了TraceId, SpanId, ParentSpanId, IsSample等信息, 通过下面的Zipkin源码可以确定这些信息的大小也是固定字节,并且大小为4个字节. Zipkin关于这块的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* The wire format is (big-endian):
* ''spanId:8 parentId:8 traceId:8 flags:8''
*/
def tryUnmarshal(body: Buf): Try[TraceId] = {
if (body.length != 32)
return Throw(new IllegalArgumentException("Expected 32 bytes"))
val bytes = local.get()
body.write(bytes, 0)
val span64 = ByteArrays.get64be(bytes, 0)
val parent64 = ByteArrays.get64be(bytes, 8)
val trace64 = ByteArrays.get64be(bytes, 16)
val flags64 = ByteArrays.get64be(bytes, 24)
val flags = Flags(flags64)
val sampled = if (flags.isFlagSet(Flags.SamplingKnown)) {
if (flags.isFlagSet(Flags.Sampled)) someTrue else someFalse
} else None
val traceId = TraceId(
if (trace64 == parent64) None else Some(SpanId(trace64)),
if (parent64 == span64) None else Some(SpanId(parent64)),
SpanId(span64),
sampled,
flags)
Return(traceId)
}

下面是Fiangle-thrift针对Zipkin关闭和开启的一个抓包对比图:
2016-04-30_13-46-49

根据上面的分析逻辑,我就可以在Packetbeat中的messageParser方法中通过一些字节特征修正FrameSize和data的offset来把数据包变成原生的Thrift协议数据包, 具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (thrift *Thrift) messageParser(s *ThriftStream) (bool, bool) {
var ok, complete bool
var m = s.message
for s.parseOffset < len(s.data) {
dataStr := string(s.data)
switch s.parseState {
case ThriftStartState:
m.start = s.parseOffset
if thrift.TransportType == ThriftTFramed {
if len(s.data) < 4 {
return true, false
}
frameSize := common.Bytes_Ntohl(s.data[:4])
m.FrameSize = frameSize
s.parseOffset = 4
if (!strings.Contains(dataStr, "__can__finagle__trace__v3__")) {
var thriftFlagIndex1 int = bytes.LastIndex(s.data, thriftFlag1)
if thriftFlagIndex1> -1 {// 如果标识为80010001 那么代表是client->server
// client -> server
m.FrameSize = common.Bytes_Ntohl(s.data[:4]) - uint32(thriftFlagIndex1) - 4 // 从8001位置之后开始
s.parseOffset = thriftFlagIndex1// 从8001位置开始(包括8001位置)
}else{//如果没有标识为80010001, 那么应该有标识位80010002, 那么代表是server->client
// finagle 返回值
if bytes.LastIndex(s.data, thriftFlag2)==5 {
m.FrameSize = frameSize - 1
s.parseOffset = 4 + 1
}
}
}
}
... ...
}

参考

Thrift Tutorial

Packetbeat协议扩展开发教程

由浅入深了解Thrift

(转载本站文章请注明作者和出处 Panda

Contents
  1. 1. 写在前面
  2. 2. Packetbeat项目介绍
    1. 2.1. 安装GO
    2. 2.2. GOPATH
    3. 2.3. 获取项目
    4. 2.4. yml配置文件说明
    5. 2.5. 需要修改哪些文件
  3. 3. 原生Thrift简单分析
    1. 3.1. 通讯协议格式
    2. 3.2. 核心模型
  4. 4. Finagle-thrift协议分析
    1. 4.1. 测试数据IDL
    2. 4.2. 一次RPC调用的差异
      1. 4.2.1. 原生Thrift调用
      2. 4.2.2. Fiangle-thrift调用及分析
  5. 5. 参考