关于FM功能模块及相关实现介绍

前言


本博文仅供个人总结工作,没有任何参考价值,无需多看。

项目需求


大概三周之前,leader给了我一个模块去实现,该模块主要是采集公司网管系统上的告警(alarm)以及监听告警,从而对外提供接口。

项目背景


  • 网管提供Restful接口用于身份认证(getToken)、刷新认证(refreshToken)、查询alarm list以及单个alarm的详细信息。
  • 网管通过kafka producer向kafka server中指定topic发送message,要求我们在项目中建立kafka consumer来接收指定topic的message。

需求分析


告警采集流程

相关技术


1
Kakfa、Restful、HttpClient、Multithreading、LinkedBlockingQueue

实际应用


connect kafka server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Calendar calendar = Calendar.getInstance();
TimeZone timeZone = TimeZone.getTimeZone("GMT+08:00");
calendar.setTimeZone(timeZone);

Properties props = new Properties();
props.put("bootstrap.servers", otnIp + ":9092");
props.put("group.id", calendar.getTimeInMillis()/1000 + "");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections
.singletonList(RestfulConstant.CORE.getTopicId()));

new Thread(new Runnable() {
@Override
public void run() {
pollMessage();
}
}).start();

poll message from kafka server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void pollMessage(){
log.debug("poll message from kafka server");
NbiQueue queue = new NbiQueue<>("handleMessage", new IGEventProcessor(){
@Override
public void process(Object event) {
handleMessage((ConsumerRecords<String, String>)event);
}
});

while (true){
ConsumerRecords<String, String> records = consumer.poll(100);
if(records.isEmpty() || records.count() == 0)
continue;
queue.push(records);
}
}

注意事项


由于每次启动项目都会创建一个新的Topic,可能造成服务器端不必要的内存开销,所以如果项目开启了FM模块,每次启动项目想要关闭的时候,需要在console按回车键关闭,Jvm会在关闭之前执行钩子函数做一些清理工作。

亟待优化


  1. kafka consumer的容错机制:当kafka的网络中断,kafka consumer连接的topic被删除掉之后如何recover。
  2. 多线程下的并发处理待检验。
zhangxingrui wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!