时间:2021-05-02
**背景**日志系统接入的日志种类多、格式复杂多样,主流的有以下几种日志:- Filebeat采集到的文本日志,格式多样- Winbeat采集到的操作系统日志- 设备上报到Logstash的syslog日志- 接入到Kafka的业务日志以上通过各种渠道接入的日志,存在2个主要的问题:- 格式不统一、不规范、标准化不够- 如何从各类日志中提取出用户关心的指标,挖掘更多的业务价值为了解决上面2个问题,我们基于Flink和Drools规则引擎做了实时的日志处理服务。**系统架构**架构比较简单,架构图如下: 各类日志都是通过Kafka汇总,做日志中转。Flink消费Kafka的数据,同时通过API调用拉取Drools规则引擎,对日志做解析处理后,将解析后的数据存储到Elasticsearch中,用于日志的搜索和分析等业务。为了监控日志解析的实时状态,Flink会将日志处理的统计数据,如每分钟处理的日志量,每种日志从各个机器IP来的日志量写到Redis中,用于监控统计。**模块介绍**系统项目命名为Eagle。eagle-api:基于Spring Boot,作为Drools规则引擎的写入和读取API服务。eagle-common:通用类模块。eagle-log:基于Flink的日志处理服务。重点讲一下eagle-log:**对接kafka、ES和Redis**对接Kafka和ES都比较简单,用的官方的connector(flink-connector-kafka-0.10和flink-connector-elasticsearch6),详见代码。对接Redis,最开始用的是org.apache.bahir提供的redis connector,后来发现灵活度不够,就使用了Jedis。在将统计数据写入redis的时候,最开始用的keyby分组后缓存了分组数据,在sink中做统计处理后写入,参考代码如下:``` String name = "redis-agg-log"; DataStream>> keyedStream = dataSource.keyBy((KeySelector) log -> log.getIndex()) .timeWindow(Time.seconds(windowTime)).trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime)) .process(new ProcessWindowFunction>, String, TimeWindow>() { @Override public void process(String s, Context context, Iterable iterable, Collector>> collector) { ArrayList logs = Lists.newArrayList(iterable); if (logs.size() > 0) { collector.collect(new Tuple2(s, logs)); } } }).setParallelism(redisSinkParallelism).name(name).uid(name);``` 后来发现这样做对内存消耗比较大,其实不需要缓存整个分组的原始数据,只需要一个统计数据就OK了,优化后:``` String name = "redis-agg-log"; DataStream keyedStream = dataSource.keyBy((KeySelector) log -> log.getIndex()) .timeWindow(Time.seconds(windowTime)) .trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime)) .aggregate(new LogStatAggregateFunction(), new LogStatWindowFunction()) .setParallelism(redisSinkParallelism).name(name).uid(name);``` 这里使用了Flink的聚合函数和Accumulator,通过Flink的agg操作做统计,减轻了内存消耗的压力。**使用Broadcast广播Drools规则引擎**1、Drools规则流通过broadcast map state广播出去。2、Kafka的数据流connect规则流处理日志。``` //广播规则流env.addSource(new RuleSourceFunction(ruleUrl)).name(ruleName).uid(ruleName).setParallelism(1) .broadcast(ruleStateDescriptor);//Kafka数据流FlinkKafkaConsumer010 source = new FlinkKafkaConsumer010<>(kafkaTopic, new LogSchema(), properties);env.addSource(source).name(kafkaTopic).uid(kafkaTopic).setParallelism(kafkaParallelism);//数据流connect规则流处理日志BroadcastConnectedStream connectedStreams = dataSource.connect(ruleSource);connectedStreams.process(new LogProcessFunction(ruleStateDescriptor, ruleBase)).setParallelism(processParallelism).name(name).uid(name);``` 具体细节参考开源代码。**小结**本系统提供了一个基于Flink的实时数据处理参考,对接了Kafka、Redis和Elasticsearch,通过可配置的Drools规则引擎,将数据处理逻辑配置化和动态化。对于处理后的数据,也可以对接到其他sink,为其他各类业务平台提供数据的解析、清洗和标准化服务。> 【云栖号在线课堂】每天都有产品技术专家分享!> 课程地址:https://yqh.aliyun.com/live> 立即加入社群,与专家面对面,及时了解课程最新动态!> 【云栖号在线课堂 社群】https://c.tb.cn/F3.Z8gvnK原文发布时间:2020-07-09本文作者: aoxiang本文来自:“”,了解相关信息可以关注“dockone”
声明:本页内容来源网络,仅供用户参考;我单位不保证亦不表示资料全面及准确无误,也不保证亦不表示这些资料为最新信息,如因任何原因,本网内容或者用户因倚赖本网内容造成任何损失或损害,我单位将不会负任何法律责任。如涉及版权问题,请提交至online#300.cn邮箱联系删除。
本文实例为大家分享了python定时提取实时日志的具体代码,供大家参考,具体内容如下这是一个定时读取实时日志文件的程序。目标文件是target_file.它是应
基于邮件通知的服务监控和告警系统主要功能点:配置专用日志格式记录耗时日志格式:'simple':{'format':'%(asctimme)s%(message
大致思路:1.利用tornado提供的websocket功能与浏览器建立长连接,读取实时日志并输出到浏览器2.写一个实时读取日志的脚本,利用saltstack远
Sherlock.IO是eBay现有的监控平台,每天要处理上百亿条日志、事件和指标。FlinkStreamingjob实时处理系统用于处理其中的日志和事件。图片
使用Xshell查看tomcat实时日志时,出现中文乱码,但是日志文件里的中文不是乱码的,可能有两个原因,可能系统的i18n没设置支持中文,或是Xshell的编