Background
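StarRocks' audit log records the requests that business applications send to the StarRocks (SR) server. By default, entries fall into two modules (tags): 'slow_query' and 'query'. You can analyze the log files directly. You can also stream the audit records into a custom SR table, so developers can use SQL to analyze and optimize workloads and to pinpoint performance problems.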
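Each audit entry has three parts: a timestamp, a bracketed tag ([query] or [slow_query]), and a list of Key=Value fields separated by |. The sketch below is in JavaScript, the language of the filebeat script used later. It splits one such line by key name instead of by position. `parseAuditLine` is a hypothetical helper. The line layout is an assumption inferred from the character offsets that script relies on, so compare it against your own audit log before relying on it.

```javascript
// Hypothetical helper (not part of StarRocks or filebeat).
// Assumed layout: "<yyyy-MM-dd HH:mm:ss,SSS> [<tag>] |Key=Value|Key=Value|..."
function parseAuditLine(line) {
  // The "s" flag lets "." match newlines, so a multi-line Stmt stays in the last group.
  const m = /^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\S* \[(\w+)\] \|(.*)$/s.exec(line);
  if (m === null) return null; // not an audit entry
  const fields = {};
  for (const part of m[3].split("|")) {
    // Split on the first "=" only, so values such as "where a=1" survive intact.
    const eq = part.indexOf("=");
    if (eq > 0) fields[part.slice(0, eq)] = part.slice(eq + 1);
  }
  return { time: m[1], tag: m[2], fields };
}
```

Keying on names rather than positions keeps working if a version adds or drops fields. A | inside the SQL text would still split the Stmt value, as with any split on |.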
Architecture
FE audit log → filebeat → Kafka → Routine Load → StarRocks table
Software requirements
CentOS 7.x
StarRocks 2.4.x (this matters: the log content differs between versions)
Kafka
filebeat
Implementation
Create a Kafka topic to hold the audit log collected by filebeat.
./bin/kafka-topics.sh --create --bootstrap-server <broker_list> --replication-factor <replication_factor> --partitions <num_partitions> --topic <topic_name>
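Deploy filebeat. Installing from the RPM package is recommended.
Write the filebeat.yml configuration file. It is version-specific: this one only works with SR 2.4.0 to 2.4.2, because the script locates fields by their position in the log line.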
filebeat.inputs:
- type: log
  enabled: true
  ignore_older: 5m
  include_lines: ['slow_query', 'query']
  multiline:
    pattern: '^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}'
    negate: true
    match: after
  paths:
    - <full path to the audit log (AuditLog) file>
  fields:
    log_topics: <Kafka topic name>
processors:
  - script:
      lang: javascript
      id: my_filter
      tag: enable
      # Literal block (|) keeps line breaks, so the // comments below stay safe
      source: |
        // Remove the "Key=" prefix from one audit field (first occurrence only).
        function stripKey(s, key) {
          return s.replace(key + '=', '');
        }
        function process(event) {
          var str = event.Get("message");
          // Line layout: "yyyy-MM-dd HH:mm:ss,SSS [query] |Client=...|User=...|..."
          var slow_time = str.substr(0, 19);   // timestamp without milliseconds
          var query_type = str.substr(25, 5);  // "query", or "slow_" for [slow_query]
          var js_arr = str.substr(38).split("|");
          var len = js_arr.length;
          // In [query] lines offset 38 falls inside "Client=", leaving a "t=" prefix
          var Client = stripKey(js_arr[0], 'Client').replace('t=', '').split(":")[0];
          var User = stripKey(js_arr[1], 'User');
          var AuthorizedUser = stripKey(js_arr[2], 'AuthorizedUser');
          var ResourceGroup = stripKey(js_arr[3], 'ResourceGroup');
          var Catalog = stripKey(js_arr[4], 'Catalog');
          var Db = stripKey(js_arr[5], 'Db');
          var State = stripKey(js_arr[6], 'State');
          var ErrorCode = stripKey(js_arr[7], 'ErrorCode');
          var Time = stripKey(js_arr[8], 'Time');
          var ScanBytes = stripKey(js_arr[9], 'ScanBytes');
          var ScanRows = stripKey(js_arr[10], 'ScanRows');
          var ReturnRows = stripKey(js_arr[11], 'ReturnRows');
          // Only present in 22-field lines; left undefined otherwise
          var CpuCostNs, MemCostBytes, PlanCpuCost, PlanMemCost;
          // 18 fields: StmtId..Digest are the last six fields.
          // 22 fields: CpuCostNs/MemCostBytes follow ReturnRows and
          // PlanCpuCost/PlanMemCost come after Digest.
          // slow_query lines are parsed with the same layouts as query lines.
          var tail = len - 6;
          if (len == 22) {
            CpuCostNs = stripKey(js_arr[12], 'CpuCostNs');
            MemCostBytes = stripKey(js_arr[13], 'MemCostBytes');
            PlanCpuCost = stripKey(js_arr[len - 2], 'PlanCpuCost');
            PlanMemCost = stripKey(js_arr[len - 1], 'PlanMemCost');
            tail = len - 8;
          }
          var StmtId = stripKey(js_arr[tail], 'StmtId');
          var QueryId = stripKey(js_arr[tail + 1], 'QueryId');
          var IsQuery = stripKey(js_arr[tail + 2], 'IsQuery');
          var feIp = stripKey(js_arr[tail + 3], 'feIp');
          // Truncate very long statements for the varchar(65533) stmt column
          var Stmt = stripKey(js_arr[tail + 4], 'Stmt').substring(0, 65530);
          var Digest = stripKey(js_arr[tail + 5], 'Digest');
          event.Put("query_type", query_type);
          event.Put("execution_time", slow_time);
          event.Put("Client", Client);
          event.Put("User", User);
          event.Put("AuthorizedUser", AuthorizedUser);
          event.Put("ResourceGroup", ResourceGroup);
          event.Put("Catalog", Catalog);
          event.Put("Db", Db);
          event.Put("State", State);
          event.Put("ErrorCode", ErrorCode);
          event.Put("Time", Time);
          event.Put("ScanBytes", ScanBytes);
          event.Put("ScanRows", ScanRows);
          event.Put("ReturnRows", ReturnRows);
          event.Put("CpuCostNs", CpuCostNs);
          event.Put("MemCostBytes", MemCostBytes);
          event.Put("StmtId", StmtId);
          event.Put("QueryId", QueryId);
          event.Put("IsQuery", IsQuery);
          event.Put("feIp", feIp);
          event.Put("Stmt", Stmt);
          event.Put("Digest", Digest);
          event.Put("PlanCpuCost", PlanCpuCost);
          event.Put("PlanMemCost", PlanMemCost);
          event.Put("igid", '<custom identifier of this FE node>');
        }
  - drop_fields:
      fields: ["ecs","agent","message","log","host"]
output.kafka:
  enabled: true
  hosts: ["<Kafka broker list>"]
  topic: '%{[fields][log_topics]}'
  worker: 1
  timeout: 30s
  broker_timeout: 10s
  keep_alive: 0
  compression: gzip
  required_acks: 1
  client_id: <custom client_id>
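The multiline block keeps a multi-line SQL statement in a single event. With negate: true and match: after, every line that does not start with a timestamp is appended to the timestamped line before it. Below is a simplified model of that rule, for illustration only. It ignores filebeat's max_lines and timeout limits, and `groupMultiline` is a hypothetical name.

```javascript
// Simplified model of filebeat multiline with negate: true, match: after.
// Uses the same start-of-entry pattern as the config; real filebeat also caps
// events with multiline.max_lines and flushes on multiline.timeout.
const START = /^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}/;

function groupMultiline(lines) {
  const events = [];
  for (const line of lines) {
    if (START.test(line) || events.length === 0) {
      // A timestamped line (or a stray first line) starts a new event.
      events.push(line);
    } else {
      // A continuation line is appended to the previous event, newline-joined.
      events[events.length - 1] += "\n" + line;
    }
  }
  return events;
}
```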
Start the filebeat process with the configuration file filebeat.yml:
filebeat -c filebeat.yml -e >> filebeat.log 2>&1 &
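Check that the log data arriving in Kafka looks correct. In particular, verify that the key/value pairs were split into the right fields.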
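To check, consume a few messages (for example with kafka-console-consumer.sh) and run each JSON payload through a small validator. `checkRecord` in the sketch below is hypothetical. It flags fields that have any of these problems:

- missing;
- still holding an unsplit Key=Value pair, a sign the positional offsets are off;
- containing the literal field name instead of a value;
- not an integer where the table expects bigint.

```javascript
// Hypothetical validator for one JSON message written to Kafka by filebeat.
// Field names follow the event.Put calls in the filebeat script.
const EXPECTED = ["execution_time", "igid", "query_type", "Client", "User", "AuthorizedUser",
  "ResourceGroup", "Catalog", "Db", "State", "ErrorCode", "Time", "ScanBytes", "ScanRows",
  "ReturnRows", "StmtId", "QueryId", "IsQuery", "feIp", "Stmt", "Digest"];
// Fields loaded into bigint columns.
const NUMERIC = ["Time", "ScanBytes", "ScanRows", "ReturnRows", "StmtId"];

function checkRecord(json) {
  const rec = JSON.parse(json);
  const problems = [];
  for (const key of EXPECTED) {
    const v = rec[key];
    if (v === undefined || v === null) {
      problems.push(`${key}: missing`);
    } else if (String(v) === key) {
      problems.push(`${key}: holds the literal field name`);
    } else if (/^[A-Za-z]+=/.test(String(v))) {
      problems.push(`${key}: looks like an unsplit Key=Value pair`);
    }
  }
  for (const key of NUMERIC) {
    const v = rec[key];
    if (v !== undefined && v !== null && !/^-?\d+$/.test(String(v))) {
      problems.push(`${key}: not an integer (${v})`);
    }
  }
  return problems;
}
```

An empty result means the record at least has the right shape. The values themselves still need a spot check against the raw log line.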
Create the table that stores the logs in StarRocks. It can live in the SR cluster the logs are collected from or in a different one.
CREATE TABLE <db_name>.`<table_name>` (
`execution_time` datetime COMMENT "slow time",
`igid` varchar(40) COMMENT "",
`db_name` varchar(300) COMMENT "db name",
`fe_ip` varchar(30) COMMENT "fe ip",
`query_id` varchar(200) COMMENT "QueryId",
`query_type` varchar(10) COMMENT "query_type",
`time` bigint(20) COMMENT "SQL run time",
`client` varchar(30) COMMENT "client ip",
`user` varchar(200) COMMENT "user name",
`authorizedUser` varchar(200) COMMENT "AuthorizedUser",
`resourceGroup` varchar(50) COMMENT "ResourceGroup",
`catalog` varchar(60) COMMENT "Catalog",
`errorCode` varchar(60) COMMENT "ErrorCode",
`state` varchar(200) COMMENT "State",
`scan_bytes` bigint(20) COMMENT "ScanBytes",
`scan_rows` bigint(20) COMMENT "ScanRows",
`return_rows` bigint(20) COMMENT "ReturnRows",
`cpu_cost_ns` bigint(20) COMMENT "CpuCostNs",
`mem_cost_bytes` bigint(20) COMMENT "MemCostBytes",
`PlanCpuCost` decimal(20,6) COMMENT "PlanCpuCost",
`PlanMemCost` decimal(20,6) COMMENT "PlanMemCost",
`stmt_id` bigint(20) COMMENT "StmtId",
`is_query` varchar(50) COMMENT "IsQuery",
`stmt` varchar(65533) COMMENT "Stmt,Query Detail",
`digest` varchar(50) DEFAULT NULL COMMENT "SQL fingerprint (Digest)"
) ENGINE=OLAP
DUPLICATE KEY(`execution_time`, `igid`, `db_name`, `fe_ip`, `query_id`)
COMMENT "StarRocks audit table"
PARTITION BY RANGE(`execution_time`)
(
PARTITION P20230209 VALUES [('2023-02-09 00:00:00'), ('2023-02-10 00:00:00')),
PARTITION P20230210 VALUES [('2023-02-10 00:00:00'), ('2023-02-11 00:00:00')))
DISTRIBUTED BY HASH(`igid`) BUCKETS 24
PROPERTIES (
"replication_num" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "P"
);
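The dynamic_partition properties keep a rolling window of daily partitions. The sketch below lists the partition names the table should hold on a given day. It assumes the window runs from today + start to today + end, inclusive at both ends. The naming (prefix plus yyyyMMdd) matches P20230209 above. `dynamicPartitionNames` is a hypothetical helper. The FE applies its own clock and time zone, so treat the result as an approximation.

```javascript
// Hypothetical helper: partition names expected for time_unit = DAY, assuming the
// retained window is [today + start, today + end] inclusive (here start = -7, end = 3).
function dynamicPartitionNames(today, start, end, prefix) {
  const names = [];
  for (let offset = start; offset <= end; offset++) {
    // Date.UTC normalizes day overflow, so month and year boundaries are handled.
    const d = new Date(Date.UTC(today.getUTCFullYear(), today.getUTCMonth(),
                                today.getUTCDate() + offset));
    const yyyy = d.getUTCFullYear();
    const mm = String(d.getUTCMonth() + 1).padStart(2, "0");
    const dd = String(d.getUTCDate()).padStart(2, "0");
    names.push(`${prefix}${yyyy}${mm}${dd}`);
  }
  return names;
}
```

With start = -7 and end = 3, that is 11 daily partitions around the current day, under the stated assumption.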
Create a Routine Load job to consume the log data from Kafka in real time:
CREATE ROUTINE LOAD <db_name>.<routine_load_name> ON
<table_name> columns(execution_time,igid,db_name,fe_ip,query_id,time,client,user,authorizedUser,resourceGroup,catalog,errorCode,state,scan_bytes,scan_rows,return_rows,cpu_cost_ns,mem_cost_bytes,PlanCpuCost,PlanMemCost,stmt_id,is_query,stmt,query_type,digest)
PROPERTIES ("format"="json","jsonpaths"="[\"$.execution_time\",\"$.igid\",\"$.Db\",\"$.feIp\",\"$.QueryId\",\"$.Time\",\"$.Client\",\"$.User\",\"$.AuthorizedUser\",\"$.ResourceGroup\",\"$.Catalog\",\"$.ErrorCode\",\"$.State\",\"$.ScanBytes\",\"$.ScanRows\",\"$.ReturnRows\",\"$.CpuCostNs\",\"$.MemCostBytes\",\"$.PlanCpuCost\",\"$.PlanMemCost\",\"$.StmtId\",\"$.IsQuery\",\"$.Stmt\",\"$.query_type\",\"$.Digest\"]","desired_concurrent_number"="8","max_error_number" = "9999999999","max_batch_rows"="200000","max_batch_size" = "209715200","max_batch_interval" = "10","strict_mode" = "false" )
FROM KAFKA("kafka_broker_list"= "<Kafka broker list configured above>", "kafka_topic" = "<Kafka topic name defined above>", "property.kafka_default_offsets" = "OFFSET_END", "property.client.id" = "<client_id defined above>", "property.group.id" = "<custom group.id>");
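The columns list and jsonpaths are matched by position, so the two 25-entry lists must stay in the same order. A hypothetical helper such as `buildRoutineLoadParts` can generate both from one ordered mapping of table column to JSON key. The mapping is taken from the statement above, and the inner quotes are escaped the same way as in the jsonpaths property.

```javascript
// Ordered [table column, JSON key written by filebeat] pairs, as in the statement above.
const MAPPING = [
  ["execution_time", "execution_time"], ["igid", "igid"], ["db_name", "Db"],
  ["fe_ip", "feIp"], ["query_id", "QueryId"], ["time", "Time"], ["client", "Client"],
  ["user", "User"], ["authorizedUser", "AuthorizedUser"], ["resourceGroup", "ResourceGroup"],
  ["catalog", "Catalog"], ["errorCode", "ErrorCode"], ["state", "State"],
  ["scan_bytes", "ScanBytes"], ["scan_rows", "ScanRows"], ["return_rows", "ReturnRows"],
  ["cpu_cost_ns", "CpuCostNs"], ["mem_cost_bytes", "MemCostBytes"],
  ["PlanCpuCost", "PlanCpuCost"], ["PlanMemCost", "PlanMemCost"], ["stmt_id", "StmtId"],
  ["is_query", "IsQuery"], ["stmt", "Stmt"], ["query_type", "query_type"], ["digest", "Digest"]
];

// Hypothetical helper: both lists come from one source, so they cannot drift apart.
function buildRoutineLoadParts(mapping) {
  const columns = mapping.map(([col]) => col).join(",");
  const paths = JSON.stringify(mapping.map(([, key]) => "$." + key));
  // Escape inner double quotes for embedding inside "jsonpaths"="..."
  const jsonpaths = paths.replace(/"/g, '\\"');
  return { columns, jsonpaths };
}
```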
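Query the table to check the data. If it does not look right, work back through the pipeline to find where the problem is.
Set up the same filebeat collection on every other FE node, giving each node its own igid value.
Conclusion
Once the logs are flowing into the table, you can build monitoring dashboards and run aggregate statistics on top of it.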