微服务架构中的链路追踪
Logging、Metrics、Tracing
从Monitoring到Observability
Tracing
基于zipkin和starrocks构建链路追踪分析系统
数据采集
数据存储
分析计算
实践效果
总结与展望
参考文档
❝本文主要介绍搜狐智能媒体,在微服务体系架构下,使用
zipkin
进行服务链路追踪(Tracing
)的埋点采集,将采集的Trace
信息存储到starrocks
中,通过starrocks
强大的SQL计算能力,对Tracing
信息进行多维度的统计、分析等操作,提升微服务监控能力,从简单统计的Monitoring
上升到更多维度探索分析的Observability
。
本文主要分为三个部分,第一节主要介绍微服务下的常用监控方式,其中链路追踪技术,可以串联整个服务调用链路,获得整体服务的关键信息,对微服务的监控有非常重要的意义;第二节主要介绍搜狐智能媒体是如何构建链路追踪分析体系的,主要包括Zipkin
的数据采集,starrocks
的数据存储,以及根据应用场景对starrocks
进行分析计算等三个部分;第三节主要介绍搜狐智能媒体通过引入zipkin
和starrocks
进行链路追踪分析取得的一些实践效果。
近年来,企业IT应用架构逐步向微服务、云原生等分布式应用架构演进,在搜狐智能媒体内部,应用服务按照微服务
、Docker
、Kubernetes
、Spring Cloud
等架构思想和技术方案进行研发运维,提升部门整体工程效率。
微服务架构提升工程效率的同时,也带来了一些新的问题。微服务是一个分布式架构,它按业务划分服务单元,用户的每次请求不再是由某一个服务独立完成了,而是变成了多个服务一起配合完成。这些服务可能是由不同的团队、使用不同的编程语言实现,可能布在了不同的服务器、甚至不同的数据中心。如果用户请求出现了错误和异常,微服务分布式调用的特性决定了这些故障难以定位,相对于传统的单体架构,微服务监控面临着新的难题。
微服务监控可以包含很多方式,按照监测的数据类型主要划分为Logging
、Metrics
和Tracing
三大领域:
❝用户主动记录的离散事件,记录的信息一般是非结构化的文本内容,在用户进行问题分析判断时可以提供更为详尽的线索。
❝具有聚合属性的采集数据,旨在为用户展示某个指标在某个时段的运行状态,用于查看一些指标和趋势。
❝记录一次请求调用的生命周期全过程,其中包括服务调用和处理时长等信息,含有请求上下文环境,由一个全局唯一的
Trace ID
来进行标识和串联整个调用链路,非常适合微服务架构的监控场景。
三者的关系如上图所示,这三者之间也是有重叠的,比如Logging
可以聚合相关字段生成Metrics
信息,关联相关字段生成Tracing
信息;Tracing
可以聚合查询次数生成Metrics
信息,可以记录业务日志生成Logging
信息。一般情况下要在Metrics
和Logging
中增加字段串联微服务请求调用生命周期比较困难,通过Tracing
获取Metrics
和Logging
则相对容易很多。
另外,这三者对存储资源有着不同的需求,Metrics
是天然的压缩数据,最节省资源;Logging
倾向于无限增加的,甚至会超出预期的容量;Tracing
的存储容量,一般介于Metrics
和Logging
两者之间,另外还可通过采样率进一步控制容量需求。
❝Monitoring tells you whether the system works. Observability lets you ask why it's not working.
– Baron Schwarz
微服务监控从数据分析层次,可以简单分为Monitoring
和Observability
。
❝告诉你系统是否在工作,对已知场景的预定义计算,对各种监控问题的事前假设,对应上图
Known Knowns
和Known Unknowns
,都是事先假设可能会发生的事件,包括已经明白和不明白的事件。
❝可以让你询问系统为什么不工作,对未知场景的探索式分析,对任意监控问题的事后分析,对应上图
Unknown Knowns
和Unknown Unknowns
,都是事未察觉可能会发生的事件,包括已经明白和不明白的事件。
很显然,通过预先假设所有可能发生事件进行Monitoring
的方式,已经不能满足微服务复杂的监控场景,我们需要能够提供探索式分析的Observability
监控方式。在Logging
、Metrics
和Tracing
,Tracing是目前能提供多维度监控分析能力的最有效方式。
链路追踪Tracing Analysis
为分布式应用的开发者提供了完整的调用链路还原、调用请求量统计、链路拓扑、应用依赖分析等工具,可以帮助开发者快速分析和诊断分布式应用架构下的性能瓶颈,提高微服务时代下的开发诊断效率。
Tracing
可以串联微服务中分布式请求的调用链路,在微服务监控体系中有着重要的作用。另外,Tracing
介于Metrics
和Logging
之间,既可以完成Monitoring
的工作,也可以进行Observability
的分析,提升监控体系建设效率。
链路追踪(Tracing
)系统,需要记录一次特定请求经过的上下游服务调用链路,以及各服务所完成的相关工作信息。如下图所示的微服务系统,用户向服务A
发起一个请求,服务A会生成一个全局唯一的Trace ID
,服务A
内部Messaging
方式调用相关处理模块(比如跨线程异步调用等),服务A
模块再通过RPC
方式并行调用服务B
和服务C
,服务B
会即刻返回响应,但服务C
会采用串行方式,先用RPC
调用服务D
,再用RPC
调用服务E
,然后再响应服务A
的调用请求,服务A
在内部两个模块调用处理完后,会响应最初的用户请求。最开始生成的Trace ID
会在这一系列的服务内部或服务之间的请求调用中传递,从而将这些请求调用连接起来。另外,Tracing
系统还会记录每一个请求调用处理的timestamp
、服务名等等相关信息。
注:服务内部串行调用对系统性能有影响,一般采用并行调用方式,后续章节将只考虑并行调用场景。
在Tracing
系统中,主要包含Trace
和Span
两个基础概念,下图展示了一个由Span
构成的Trace
。
Trace
树中的节点,如下图所示的由Span
构成的Trace
树,树中的Span
节点之间存在父子关系。Span
主要包含Span名称
、Span ID
、父ID
,以及timestamp
、dration
(包含子节点调用处理的duration
)、业务数据
等其他log
信息。Span
根据调用方式可以分为RPC Span
和Messaging Span
:
❝由
RPC Tracing
生成,分为Client
和Server
两类Span
,分别由RPC
服务调用的Client
节点和Server
节点记录生成,两者共享Span ID
、Parent Span ID
等信息,但要注意,这两个Span
记录的时间是有偏差,这个偏差是服务间的调用开销,一般是由网络传输开销、代理服务或服务接口消息排队等情况引起的。
❝由
Messaging Tracing
生成,一般用于Tracing
服务内部调用,不同于RPC Span
,Messaging Span
之间不会共享Span ID
等信息。
根据Tracing
的系统模型,可获得服务响应等各类Metric
信息,用于Alerting
、DashBoard
查询等;也可根据Span
组成的链路,分析单个或整体服务情况,发现服务性能瓶颈、网络传输开销、服务内异步调用设计等各种问题。如下图所示,相比于Metrics
和Logging
,Tracing
可以同时涵盖监控的Monitoring
和Observability
场景,在监控体系中占据重要位置,Opentracing
、Opencensus
、Opentelemetry
等协会和组织都包含对Tracing
的支持。
从微服务的角度,Tracing
记录的Span
信息可以进行各种维度的统计和分析。下图基于HTTP API
设计的微服务系统为例,用户查询Service1的 /1/api 接口,Service1再请求Service2的 /2/api,Service2内部异步并发调用msg2.1和msg2.2,msg2.1请求Service3的 /3/api接口,msg2.2请求Service4的 /4/api接口,Service3内部调用msg3,Service4再请求Service5的 /5/api,其中Service5没有进行Tracing
埋点,无法采集Service5的信息。
针对上图的微服务系统,可以进行如下两大类的统计分析操作:
关注单个服务运行情况,比如对外服务接口和上游接口查询的性能指标等,分析场景主要有:
❝如Service1提供的 /1/api ,Service4提供的 /4/api等,统计获得次数、QPS、耗时百分位数、出错率、超时率等等
metric
信息。
❝如Service1请求的 /2/api ,Service4请求的 /5/api等,统计查询次数、QPS、耗时百分位数、出错率、超时率等等
metric
信息。
❝服务对外接口在内部可能会被分拆为多个
span
,可以按照span name
进行分组聚合统计,发现耗时最长的span
等,如Service2接口 /2/api ,接口服务内部Span
包括 /2/api 的Server Span
,call2.1对应的Span
和call2.2对应的Span
,通过Span
之间的依赖关系可以算出这些Span
自身的耗时duraion
,进行各类统计分析。
在进行微服务整体分析时,我们将单个服务看作黑盒,关注服务间的依赖、调用链路上的服务热点等,分析场景主要有:
❝可以根据服务间调用的
Client Span
和Server Span
,获得整个服务系统的拓扑结构,以及服务之间调用请求次数
、duration
等统计信息。
❝分析某个对外请求接口的调用链路上的性能瓶颈,这个瓶颈可能是某个服务内部处理开销造成的,也可能是某两个服务间的网络调用开销等等原因造成的。
❝对于一次调用涉及到数十个以上微服务的复杂调用请求,每次出现的性能瓶颈很可能都会不一样,此时就需要进行聚合统计,算出性能瓶颈出现频次的排名,分析出针对性能瓶颈热点的服务或服务间调用。
以上仅仅是列举的部分分析场景,Tracing
提供的信息其实可以支持更多的metric
统计和探索式分析场景,本文不再一一例举。
链路追踪系统主要分为数据采集、数据存储和分析计算三大部分,目前使用最广泛的开源链路追踪系统是Zipkin
,它主要包括数据采集和分析计算两大部分,底层的存储依赖其他存储系统。搜狐智能媒体在构建链路追踪系统时,最初采用Zipkin
+ElasticSearch
得方式进行构建,后增加starrocks
作为底层存储系统,并基于starrocks
进行分析统计,系统总体架构如下图。
Zipkin
支持客户端全自动埋点,只需将相关库引入应用程序中并简单配置,就可以实现Span
信息自动生成,Span
信息通过HTTP
或Kafka
等方式自动进行上传。Zipkin
目前提供了绝大部分语言的埋点采集库,如Java
语言的Spring Cloud
提供了Sleuth
与Zipkin
进行深度绑定,对开发人员基本做到透明使用。为了解决存储空间,在使用时一般要设置1/100左右的采样率,Dapper
的论文中提到即便是1/1000的采样率,对于跟踪数据的通用使用层面上,也可以提供足够多的信息。
对应图6,下面给出了Zipkin Span
埋点采集示意图(图8),具体流程如下:
Request
中,不含有Trace
和Span
信息,Service1会创建一个Server Span
,随机生成全局唯一的TraceID
(如图中的X)和SpanId
(如图中的A,此处的X和A会使用相同的值),记录Timestamp
等信息;Service1在给用户返回Response
时,Service1会统计Server Span
的处理耗时duration
,会将包含TraceID
、SpanID
、Timestamp
、duration
等信息的Server Span
完整信息进行上报。Client Span
,使用X作为Trace ID
,随机生成全局唯一的SpanID
(如图中的B),记录Timestamp
等信息,同时Service1
会将Trace ID
(X)和SpanID
(B)传递给Service2
(如在HTTP
协议的HEADER
中添加TraceID
和SpanID
等相关字段);Service1在收到Service2的响应后,Service1会处理Client Span
相关信息,并将Client Span
进行上报Request
中,包含Trace
(X)和Span
(B)等信息,Service2会创建一个Server Span
,使用X作为Trace ID
,B作为SpanID
,内部调用msg2.1和msg2.2同时,将Trace ID
(X)和SpanID
(B)传递给它们;Service2在收到msg2.1和msg2.2的返回后,Service1会处理Server Span
相关信息,并将此Server Span
进行上报Messaging Span
,使用X作为Trace ID
,随机生成全局唯一的SpanID
(如图中的C和F),记录Timestamp
等信息,分别向Service3和Service4发送请求;msg2.1和msg2.2收到响应后,会分别处理Messaging Span
相关信息,并将两个Messaging Span
进行上报Client Span
,使用X作为Trace ID
,随机生成全局唯一的SpanID
(如图中的D和G),记录Timestamp
等信息,同时Service2
会将Trace ID
(X)和SpanID
(D或G)传递给Service3和Service4;Service12在收到Service3和Service3的响应后,Service2会分别处理Client Span
相关信息,并将两个Client Span
进行上报Request
中,包含Trace
(X)和Span
(D)等信息,Service3会创建一个Server Span
,使用X作为Trace ID
,D作为SpanID
,内部调用msg3;Service3在收到msg3的返回后,Service3会处理此Server Span
相关信息,并将此Server Span
进行上报Messaging Span
,使用X作为Trace ID
,随机生成全局唯一的SpanID
(如图中的E),记录Timestamp
等信息,msg3处理完成后,处理此Messaging Span
相关信息,并将此Messaging Span
进行上报Request
中,包含Trace
(X)和Span
(G)等信息,Service4会创建一个Server Span
,使用X作为Trace ID
,G作为SpanID
,再向Service5发送请求;Service4在收到Service5的响应后,Service4会处理此Server Span
相关信息,并将此Server Span
进行上报Client Span
,使用X作为Trace ID
,随机生成全局唯一的SpanID
(如图中的H),记录Timestamp
等信息,同时Service4会将Trace ID
(X)和SpanID
(H)传递给Service5;Service4在收到Service5的响应后,Service4会处理Client Span
相关信息,并将此Client Span
进行上报上面整个Trace X
调用链路会生成的Span
记录如下图,每个Span
主要会记录Span Id
、Parent Id
、Kind
(CLIENT表示RPC CLIENT端Span,SERVER表示RPC SERVER端SPAN,NULL表示Messaging SPAN),SN
(Service Name),还会包含Trace ID
,时间戳
、duration
等信息。Service5没有进行Zipkin
埋点采集,因此不会有Service5的Span
记录。
设置了Zipkin
埋点的应用服务,默认会使用Json
格式向Kafka
上报Span
信息,上报的信息主要有如下几个注意点:
Span
,组成一个Json
数组上报Json
数组里包含不同Trace
的Span
,即不是所有的Trace ID
都相同Http
、Grpc
、Dubbo
等),除了主要字段相同外,在tags
中会各自记录一些不同的字段[
{
"traceId": "3112dd04c3112036",
"id": "3112dd04c3112036",
"kind": "SERVER",
"name": "get /2/api",
"timestamp": 1618480662355011,
"duration": 12769,
"localEndpoint": {
"serviceName": "SERVICE2",
"ipv4": "172.24.132.32"
},
"remoteEndpoint": {
"ipv4": "111.25.140.166",
"port": 50214
},
"tags": {
"http.method": "GET",
"http.path": "/2/api",
"mvc.controller.class": "Controller",
"mvc.controller.method": "get2Api"
}
},
{
"traceId": "3112dd04c3112036",
"parentId": "3112dd04c3112036",
"id": "b4bd9859c690160a",
"name": "msg2.1",
"timestamp": 1618480662357211,
"duration": 11069,
"localEndpoint": {
"serviceName": "SERVICE2"
},
"tags": {
"class": "MSG",
"method": "msg2.1"
}
},
{
"traceId": "3112dd04c3112036",
"parentId": "3112dd04c3112036",
"id": "c31d9859c69a2b21",
"name": "msg2.2",
"timestamp": 1618480662357201,
"duration": 10768,
"localEndpoint": {
"serviceName": "SERVICE2"
},
"tags": {
"class": "MSG",
"method": "msg2.2"
}
},
{
"traceId": "3112dd04c3112036",
"parentId": "b4bd9859c690160a",
"id": "f1659c981c0f4744",
"kind": "CLIENT",
"name": "get /3/api",
"timestamp": 1618480662358201,
"duration": 9206,
"localEndpoint": {
"serviceName": "SERVICE2",
"ipv4": "172.24.132.32"
},
"tags": {
"http.method": "GET",
"http.path": "/3/api"
}
},
{
"traceId": "3112dd04c3112036",
"parentId": "c31d9859c69a2b21",
"id": "73cd1cab1d72a971",
"kind": "CLIENT",
"name": "get /4/api",
"timestamp": 1618480662358211,
"duration": 9349,
"localEndpoint": {
"serviceName": "SERVICE2",
"ipv4": "172.24.132.32"
},
"tags": {
"http.method": "GET",
"http.path": "/4/api"
}
}
]
zipkin
支持MySQL
、Cassandra
和ElasticSearch
三种数据存储,这三者都存在各自的缺点:
Tracing
信息基本都在每天上亿行甚至百亿行以上,MySQL
无法支撑这么大数据量。Trace
的Span
信息分析,但对聚合查询等数据统计分析场景支持不好Trace
的分析和简单的聚合查询分析,但对于一些较复杂的数据分析计算不能很好的支持,比如涉及到join
、窗口函数
等等的计算需求,尤其是任务间依赖计算,zipkin
目前还不能实时计算,需要通过离线跑spark
任务计算任务间依赖信息。我们在实践中也是首先使用ElasticSearch,发现了上面提到的问题,比如zipkin
的服务依赖拓扑必须使用离线方式计算,便新增了starrocks
作为底层数据存储。将zipkin
的trace
数据导入到starrocks
很方便,基本步骤只需要两步,CREATE TABLE
+CREATE ROUTINE LOAD
。另外,在调用链路性能瓶颈分析场景中,要将单个服务看作黑盒,只关注RPC SPAN
,屏蔽掉服务内部的Messaging Span
,使用了Flink
对服务内部span
进行ParentID
溯源,即从RPC Client SPAN
,一直追溯到同一服务同一Trace ID
的RPC Server SPAN
,用RPC Server SPAN
的ID替换RPC Client SPAN
的parentId
,最后通过flink-connector-starrocks
将转换后的数据实时写入starrocks。
基于starrocks
的数据存储架构流程如下图所示。
建表语句示例参考如下,有如下几点注意点:
zipkin
和zipkin_trace_perf
两张表,zipkin_trace_perf
表只用于调用链路性能瓶颈分析场景,其他统计分析都适用zipkin
表timestamp
字段,生成dt
、hr
、min
时间字段,便于后续统计分析DUPLICATE
模型、bitmap
索引等设置,加快查询速度zipkin
表使用id
作为分桶字段,在查询服务拓扑时,查询计划会优化为colocate join
,提升查询性能。CREATE TABLE `zipkin` (
`traceId` varchar(24) NULL COMMENT "",
`id` varchar(24) NULL COMMENT "Span ID",
`localEndpoint_serviceName` varchar(512) NULL COMMENT "",
`dt` int(11) NULL COMMENT "",
`parentId` varchar(24) NULL COMMENT "",
`timestamp` bigint(20) NULL COMMENT "",
`hr` int(11) NULL COMMENT "",
`min` bigint(20) NULL COMMENT "",
`kind` varchar(16) NULL COMMENT "",
`duration` int(11) NULL COMMENT "",
`name` varchar(300) NULL COMMENT "",
`localEndpoint_ipv4` varchar(16) NULL COMMENT "",
`remoteEndpoint_ipv4` varchar(16) NULL COMMENT "",
`remoteEndpoint_port` varchar(16) NULL COMMENT "",
`shared` int(11) NULL COMMENT "",
`tag_error` int(11) NULL DEFAULT "0" COMMENT "",
`error_msg` varchar(1024) NULL COMMENT "",
`tags_http_path` varchar(2048) NULL COMMENT "",
`tags_http_method` varchar(1024) NULL COMMENT "",
`tags_controller_class` varchar(100) NULL COMMENT "",
`tags_controller_method` varchar(1024) NULL COMMENT "",
INDEX service_name_idx (`localEndpoint_serviceName`) USING BITMAP COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`traceId`, `parentId`, `id`, `timestamp`, `localEndpoint_serviceName`, `dt`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(PARTITION p20220104 VALUES [("20220104"), ("20220105")),
PARTITION p20220105 VALUES [("20220105"), ("20220106")))
DISTRIBUTED BY HASH(`id`) BUCKETS 100
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "100",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
CREATE TABLE `zipkin_trace_perf` (
`traceId` varchar(24) NULL COMMENT "",
`id` varchar(24) NULL COMMENT "",
`dt` int(11) NULL COMMENT "",
`parentId` varchar(24) NULL COMMENT "",
`localEndpoint_serviceName` varchar(512) NULL COMMENT "",
`timestamp` bigint(20) NULL COMMENT "",
`hr` int(11) NULL COMMENT "",
`min` bigint(20) NULL COMMENT "",
`kind` varchar(16) NULL COMMENT "",
`duration` int(11) NULL COMMENT "",
`name` varchar(300) NULL COMMENT "",
`tag_error` int(11) NULL DEFAULT "0" COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`traceId`, `id`, `dt`, `parentId`, `localEndpoint_serviceName`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(PARTITION p20220104 VALUES [("20220104"), ("20220105")),
PARTITION p20220105 VALUES [("20220105"), ("20220106")))
DISTRIBUTED BY HASH(`traceId`) BUCKETS 32
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-60",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "12",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
ROUTINE LOAD
创建语句示例如下:
CREATE ROUTINE LOAD zipkin_routine_load ON zipkin COLUMNS(
id,
kind,
localEndpoint_serviceName,
traceId,
`name`,
`timestamp`,
`duration`,
`localEndpoint_ipv4`,
`remoteEndpoint_ipv4`,
`remoteEndpoint_port`,
`shared`,
`parentId`,
`tags_http_path`,
`tags_http_method`,
`tags_controller_class`,
`tags_controller_method`,
tmp_tag_error,
tag_error = if(`tmp_tag_error` IS NULL, 0, 1),
error_msg = tmp_tag_error,
dt = from_unixtime(`timestamp` / 1000000, '%Y%m%d'),
hr = from_unixtime(`timestamp` / 1000000, '%H'),
`min` = from_unixtime(`timestamp` / 1000000, '%i')
) PROPERTIES (
"desired_concurrent_number" = "3",
"max_batch_interval" = "50",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"max_error_number" = "1000000",
"strict_mode" = "false",
"format" = "json",
"strip_outer_array" = "true",
"jsonpaths" = "[\"$.id\",\"$.kind\",\"$.localEndpoint.serviceName\",\"$.traceId\",\"$.name\",\"$.timestamp\",\"$.duration\",\"$.localEndpoint.ipv4\",\"$.remoteEndpoint.ipv4\",\"$.remoteEndpoint.port\",\"$.shared\",\"$.parentId\",\"$.tags.\\\"http.path\\\"\",\"$.tags.\\\"http.method\\\"\",\"$.tags.\\\"mvc.controller.class\\\"\",\"$.tags.\\\"mvc.controller.method\\\"\",\"$.tags.error\"]"
)
FROM
KAFKA (
"kafka_broker_list" = "IP1:PORT1,IP2:PORT2,IP3:PORT3",
"kafka_topic" = "XXXXXXXXX"
);
针对调用链路性能瓶颈分析场景中,使用Flink
进行ParentID
溯源,代码示例如下:
env
// 添加kafka数据源
.addSource(getKafkaSource())
// 将采集到的Json字符串转换为JSONArray,
// 这个JSONArray是从单个服务采集的信息,里面会包含多个Trace的Span信息
.map(JSON.parseArray(_))
// 将JSONArray转换为JSONObject,每个JSONObejct就是一个Span
.flatMap(_.asScala.map(_.asInstanceOf[JSONObject]))
// 将Span的JSONObject对象转换为Bean对象
.map(jsonToBean(_))
// 以traceID+localEndpoint_serviceName作为key对span进行分区生成keyed stream
.keyBy(span => keyOfTrace(span))
// 使用会话窗口,将同一个Trace的不同服务上的所有Span,分发到同一个固定间隔的processing-time窗口
// 这里为了实现简单,使用了processing-time session窗口,后续我们会使用starrocks的UDAF函数进行优化,去掉对Flink的依赖
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
// 使用Aggregate窗口函数
.aggregate(new TraceAggregateFunction)
// 将经过溯源的span集合展开,便于调用flink-connector-starrocks
.flatMap(spans => spans)
// 使用flink-connector-starrocks sink,将数据写入starrocks中
.addSink(
StarRocksSink.sink(
StarRocksSinkOptions.builder().withProperty("XXX", "XXX").build()))
以图6作为一个微服务系统用例,给出各个统计分析场景对应的Starrocks SQL
语句。
下面的sql
使用zipkin
表数据,计算服务Service2请求上游服务Service3和上游服务Service4的查询统计信息,按小时和接口分组统计查询指标
select
hr,
name,
req_count,
timeout / req_count * 100 as timeout_rate,
error_count / req_count * 100 as error_rate,
avg_duration,
tp95,
tp99
from
(
select
hr,
name,
count(1) as req_count,
AVG(duration) / 1000 as avg_duration,
sum(if(duration > 200000, 1, 0)) as timeout,
sum(tag_error) as error_count,
percentile_approx(duration, 0.95) / 1000 AS tp95,
percentile_approx(duration, 0.99) / 1000 AS tp99
from
zipkin
where
localEndpoint_serviceName = 'Service2'
and kind = 'CLIENT'
and dt = 20220105
group by
hr,
name
) tmp
order by
hr
下面的sql
使用zipkin
表数据,计算服务Service2响应下游服务Service1的查询统计信息,按小时和接口分组统计查询指标
select
hr,
name,
req_count,
timeout / req_count * 100 as timeout_rate,
error_count / req_count * 100 as error_rate,
avg_duration,
tp95,
tp99
from
(
select
hr,
name,
count(1) as req_count,
AVG(duration) / 1000 as avg_duration,
sum(if(duration > 200000, 1, 0)) as timeout,
sum(tag_error) as error_count,
percentile_approx(duration, 0.95) / 1000 AS tp95,
percentile_approx(duration, 0.99) / 1000 AS tp99
from
zipkin
where
localEndpoint_serviceName = 'Service2'
and kind = 'SERVER'
and dt = 20220105
group by
hr,
name
) tmp
order by
hr
下面的sql
使用zipkin
表数据,查询服务Service2
的接口 /2/api,按span name
分组统计duration
等信息
with
spans as (
select * from zipkin where dt = 20220105 and localEndpoint_serviceName = "Service2"
),
api_spans as (
select
spans.id as id,
spans.parentId as parentId,
spans.name as name,
spans.duration as duration
from
spans
inner JOIN
(select * from spans where kind = "SERVER" and name = "/2/api") tmp
on spans.traceId = tmp.traceId
)
SELECT
name,
AVG(inner_duration) / 1000 as avg_duration,
percentile_approx(inner_duration, 0.95) / 1000 AS tp95,
percentile_approx(inner_duration, 0.99) / 1000 AS tp99
from
(
select
l.name as name,
(l.duration - ifnull(r.duration, 0)) as inner_duration
from
api_spans l
left JOIN
api_spans r
on l.parentId = r.id
) tmp
GROUP BY
name
下面的sql
使用zipkin
表数据,计算服务间的拓扑关系,以及服务间接口duration
的统计信息
with tbl as (select * from zipkin where dt = 20220105)
select
client,
server,
name,
AVG(duration) / 1000 as avg_duration,
percentile_approx(duration, 0.95) / 1000 AS tp95,
percentile_approx(duration, 0.99) / 1000 AS tp99
from
(
select
c.localEndpoint_serviceName as client,
s.localEndpoint_serviceName as server,
c.name as name,
c.duration as duration
from
(select * from tbl where kind = "CLIENT") c
left JOIN
(select * from tbl where kind = "SERVER") s
on c.id = s.id and c.traceId = s.traceId
) as tmp
group by
client,
server,
name
下面的sql
使用zipkin_trace_perf
表数据,针对某个服务接口响应超时的查询请求,统计出每次请求的调用链路中处理耗时最长的服务或服务间调用,进而分析出性能热点是在某个服务或服务间调用。
select
service,
ROUND(count(1) * 100 / sum(count(1)) over(), 2) as percent
from
(
select
traceId,
service,
duration,
ROW_NUMBER() over(partition by traceId order by duration desc) as rank4
from
(
with tbl as (
SELECT
l.traceId as traceId,
l.id as id,
l.parentId as parentId,
l.kind as kind,
l.duration as duration,
l.localEndpoint_serviceName as localEndpoint_serviceName
FROM
zipkin_trace_perf l
INNER JOIN
zipkin_trace_perf r
on l.traceId = r.traceId
and l.dt = 20220105
and r.dt = 20220105
and r.tag_error = 0 -- 过滤掉出错的trace
and r.localEndpoint_serviceName = "Service1"
and r.name = "/1/api"
and r.kind = "SERVER"
and r.duration > 200000 -- 过滤掉未超时的trace
)
select
traceId,
id,
service,
duration
from
(
select
traceId,
id,
service,
(c_duration - s_duration) as duration,
ROW_NUMBER() over(partition by traceId order by (c_duration - s_duration) desc) as rank2
from
(
select
c.traceId as traceId,
c.id as id,
concat(c.localEndpoint_serviceName, "=>", ifnull(s.localEndpoint_serviceName, "?")) as service,
c.duration as c_duration,
ifnull(s.duration, 0) as s_duration
from
(select * from tbl where kind = "CLIENT") c
left JOIN
(select * from tbl where kind = "SERVER") s
on c.id = s.id and c.traceId = s.traceId
) tmp1
) tmp2
where
rank2 = 1
union ALL
select
traceId,
id,
service,
duration
from
(
select
traceId,
id,
service,
(s_duration - c_duration) as duration,
ROW_NUMBER() over(partition by traceId order by (s_duration - c_duration) desc) as rank2
from
(
select
s.traceId as traceId,
s.id as id,
s.localEndpoint_serviceName as service,
s.duration as s_duration,
ifnull(c.duration, 0) as c_duration,
ROW_NUMBER() over(partition by s.traceId, s.id order by ifnull(c.duration, 0) desc) as rank
from
(select * from tbl where kind = "SERVER") s
left JOIN
(select * from tbl where kind = "CLIENT") c
on s.id = c.parentId and s.traceId = c.traceId
) tmp1
where
rank = 1
) tmp2
where
rank2 = 1
) tmp3
) tmp4
where
rank4 = 1
GROUP BY
service
order by
percent desc
sql
查询的结果如下图所示,在超时的Trace
请求中,性能瓶颈服务或服务间调用的比例分布。
目前搜狐智能媒体已在30+ 个服务中接入Zipkin
,涵盖上百个线上服务实例,1% 的采样率每天产生近10亿多行的日志。
通过zipkin server
查询starrocks
,获取的Trace
信息如下图所示:
通过zipkin server
查询starrocks
,获取的服务拓扑信息如下图所示:
基于Zipkin
+starrocks
的链路追踪体系实践过程中,明显提升了微服务监控分析能力和工程效率:
提升微服务监控分析能力
starrocks
查询统计线上服务当前时刻的响应延迟百分位数、错误率等指标,根据这些指标及时产生各类告警;starrocks
按天、小时、分钟等粒度统计服务响应延迟的各项指标,更好的了解服务运行状况;starrocks
强大的SQL
计算能力,可以进行服务、时间、接口等多个维度的探索式分析查询,定位故障原因。提升微服务监控工程效率
Metric
和Logging
数据采集,很多需要用户手动埋点和安装各种采集器Agent
,数据采集后存储到ElasticSearch
等存储系统,每上一个业务,这些流程都要操作一遍,非常繁琐,且资源分散不易管理。
而使用zipkin
+starrocks
的方式,只需在代码中引入对应库SDK
,设置上报的Kafka
地址和采样率等少量配置信息,Tracing
便可自动埋点采集,通过zikpin server
界面进行查询分析,非常简便。
基于zipkin
+starrocks
构建链路追踪系统,能够提供微服务监控的Monitoring
和Observability
能力,提升微服务监控的分析能力和工程效率。
后续有几个优化点,可以进一步提升链路追踪系统的分析能力和易用性:
starrocks
的UDAF
、窗口函数等功能,将parentID
溯源下沉到starrocks
计算,通过计算后置的方式,取消对Flink
的依赖,进一步简化整个系统架构。tags
等字段,并没有完全采集,starrocks
正在实现json
数据类型,能够更好的支持tags
等嵌套数据类型。Zipkin Server
目前的界面还稍显简陋,我们已经打通了Zipkin Server
查询starrokcs
,后续会对Zipkin Server
进行UI
等优化,通过starrocks
强大的计算能力实现更多的指标查询,进一步提升用户体验。Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8