了解一下active mq - jms consumer

前言

公司的网管系统经常会发告警信息到active mq,然后就想整个web界面借助jms consumer和webSocket滚动显示告警信息。

百度了一下,发现大部分文章的业务场景都是只在程序初始化的时候和ActiveMQ建立一次连接,然后开一个监听线程。我的业务场景稍微复杂一点:用户可以在web界面输入不同的broker url,查看不同服务器上的producer发来的信息。

所以流程必须是:用户提交表单 -> 建立连接 -> 根据用户ID缓存并取用对应的consumer

依赖(springboot中添加如下依赖)

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

Java代码实现(只有consumer实现,因为只用到了consumer)

Jms对象(保存用户提交的连接参数)
package com.lucent.demo.domain;

/**
 * Mutable bean carrying the parameters of one user's message-broker request.
 *
 * <p>Every property starts out {@code null} and is stored and returned as-is;
 * no validation or copying is performed.
 *
 * @author zhangxingrui (created 2018-11-01)
 */
public class Jms {

    /** Identifier of the user the request belongs to. */
    private String userId;

    /** URL of the broker to connect to. */
    private String brokerUrl;

    /** Login name presented to the broker. */
    private String username;

    /** Password presented to the broker. */
    private String password;

    /** Topic names attached to the request (presumably the topics to subscribe to). */
    private String[] topics;

    // ---- user id ----------------------------------------------------------

    /** @return the user id, or {@code null} if none was set */
    public String getUserId() {
        return this.userId;
    }

    /** @param userId the user id to store */
    public void setUserId(String userId) {
        this.userId = userId;
    }

    // ---- broker url -------------------------------------------------------

    /** @return the broker URL, or {@code null} if none was set */
    public String getBrokerUrl() {
        return this.brokerUrl;
    }

    /** @param brokerUrl the broker URL to store */
    public void setBrokerUrl(String brokerUrl) {
        this.brokerUrl = brokerUrl;
    }

    // ---- credentials ------------------------------------------------------

    /** @return the broker login name, or {@code null} if none was set */
    public String getUsername() {
        return this.username;
    }

    /** @param username the broker login name to store */
    public void setUsername(String username) {
        this.username = username;
    }

    /** @return the broker password, or {@code null} if none was set */
    public String getPassword() {
        return this.password;
    }

    /** @param password the broker password to store */
    public void setPassword(String password) {
        this.password = password;
    }

    // ---- topics -----------------------------------------------------------

    /** @return the very array that was passed to {@link #setTopics(String[])} (not a copy) */
    public String[] getTopics() {
        return this.topics;
    }

    /** @param topics the topic-name array to store; the reference is kept, not copied */
    public void setTopics(String[] topics) {
        this.topics = topics;
    }

}

JmsUtil(按用户ID缓存JmsConsumer)
package com.lucent.demo.utils;

import com.lucent.demo.component.JmsConsumer;
import com.lucent.demo.domain.Jms;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Process-wide cache of {@link JmsConsumer} instances, one per user id.
 *
 * @author zhangxingrui (created 2018-11-08)
 */
@Component
public class JmsUtil {

    private static final Logger logger = LoggerFactory.getLogger(JmsUtil.class);

    /** Consumers created so far, keyed by {@link Jms#getUserId()}. */
    private static final Map<String, JmsConsumer> map = new ConcurrentHashMap<>();

    /**
     * Returns the consumer cached for {@code jms.getUserId()}, creating and
     * initialising it on first use.
     *
     * <p>Lookup and creation happen in a single atomic
     * {@code computeIfAbsent} step, so two concurrent requests for the same
     * user can no longer each build a consumer (and each open a broker
     * connection) with one of them being silently overwritten and leaked.
     * The trade-off is that {@link JmsConsumer#init(String)} runs while the
     * map entry is being computed, so concurrent calls for the same user wait
     * for it to finish.
     *
     * <p>NOTE(review): the cache key is the user id only. If the same user
     * later submits a different broker URL or credentials, the consumer
     * created for the first request is returned unchanged. A consumer whose
     * {@code init} failed to connect is cached as well. Confirm whether that
     * is intended.
     *
     * @param jms connection parameters; must not be {@code null} and must
     *            carry a non-null user id (a {@link NullPointerException} is
     *            thrown otherwise)
     * @return the consumer associated with the user id, never {@code null}
     */
    public static JmsConsumer getJmsCustomer(Jms jms) {
        return map.computeIfAbsent(jms.getUserId(), userId -> {
            JmsConsumer consumer = new JmsConsumer(jms.getUsername(),
                    jms.getPassword(), jms.getBrokerUrl());
            consumer.init(userId);
            return consumer;
        });
    }

}

JmsConsumer(建立连接并监听topic)
package com.lucent.demo.component;

import com.alibaba.fastjson.JSON;
import com.lucent.demo.config.GeneralConstant;
import com.lucent.demo.config.MessageTypeEnum;
import com.lucent.demo.exception.MyException;
import com.lucent.demo.server.WebSocketServer;
import com.lucent.demo.utils.HandleUtil;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;
import java.util.HashMap;
import java.util.Map;

/**
 * Holds one ActiveMQ connection and session and forwards the text messages
 * received on subscribed topics to {@link WebSocketServer#sendMessageToOne}.
 *
 * <p>Call {@link #init(String)} once before subscribing. If {@code init}
 * could not connect, later subscribe calls log a warning and do nothing.
 *
 * @author zhangxingrui (created 2018-11-07)
 */
public class JmsConsumer {

    private static final Logger logger = LoggerFactory.getLogger(JmsConsumer.class);

    private final String username;

    private final String password;

    private final String brokerUrl;

    private ConnectionFactory connectionFactory;

    private Connection connection;

    /** Stays {@code null} until {@link #init(String)} has connected successfully. */
    private Session session;

    /**
     * Stores the connection parameters; nothing is opened until {@link #init(String)}.
     *
     * @param username  broker login name
     * @param password  broker password
     * @param brokerUrl broker URL handed to {@link ActiveMQConnectionFactory}
     */
    public JmsConsumer(String username, String password, String brokerUrl) {
        this.username = username;
        this.password = password;
        this.brokerUrl = brokerUrl;
    }

    /**
     * Opens and starts the connection and creates a non-transacted,
     * auto-acknowledge session.
     *
     * <p>On a {@link JMSException} the failure is logged, any half-opened
     * connection is closed, and an error payload is sent to {@code userId}
     * through the WebSocket server. The exception is not rethrown.
     *
     * @param userId recipient of the error notification if connecting fails
     */
    public void init(String userId) {
        try {
            connectionFactory = new ActiveMQConnectionFactory(username, password, brokerUrl);
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (JMSException e) {
            // Pass the throwable itself so the stack trace and cause chain are logged.
            logger.error("Failed to connect to ActiveMQ for user {}", userId, e);
            // start()/createSession() may fail after the connection was created.
            releaseConnection();

            Map<String, String> map = new HashMap<>();
            map.put("messageType", MessageTypeEnum.ERROR.getIndex());
            map.put("error", "连接active mq失败,请检查url | username | password等");
            try {
                WebSocketServer.sendMessageToOne(JSON.toJSONString(map), userId);
            } catch (MyException e1) {
                logger.error("Failed to report the connection error to user {}", userId, e1);
            }
        }
    }

    /**
     * Subscribes to one topic and forwards every non-empty, non-heartbeat
     * text message to {@code userId}.
     *
     * <p>Does nothing (apart from logging) when no session is available or
     * when the subscription cannot be created.
     *
     * @param topicName name of the topic to listen on
     * @param userId    recipient passed to {@link WebSocketServer#sendMessageToOne}
     */
    public void getMessageWithTopicListener(String topicName, String userId) {
        if (session == null) {
            // init() failed or was never called; it has already reported a failure to the user.
            logger.warn("Cannot subscribe to topic {} for user {}: no active JMS session",
                    topicName, userId);
            return;
        }

        try {
            Topic topic = session.createTopic(topicName);
            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(
                    message -> forwardToUser(message, consumer, topicName, userId));
        } catch (JMSException e) {
            logger.error("Failed to subscribe to topic {} for user {}", topicName, userId, e);
        }
    }

    /**
     * Subscribes to each of the given topics via
     * {@link #getMessageWithTopicListener(String, String)}.
     *
     * @param topicNames topic names; {@code null} or empty means nothing to do
     * @param userId     recipient of the forwarded messages
     */
    public void getMessageWithTopicsListener(String[] topicNames, String userId) {
        if (topicNames == null || topicNames.length == 0) {
            return;
        }

        for (String topicName : topicNames) {
            getMessageWithTopicListener(topicName, userId);
        }
    }

    /** Handles one delivered message: filters it and pushes it to the user's WebSocket. */
    private void forwardToUser(Message message, MessageConsumer consumer,
                               String topicName, String userId) {
        // instanceof also rejects null; a blind cast would throw ClassCastException
        // inside the listener for any non-text message published on the topic.
        if (!(message instanceof TextMessage)) {
            logger.debug("Ignoring non-text message on topic {}", topicName);
            return;
        }

        try {
            String text = ((TextMessage) message).getText();
            if (HandleUtil.isEmpty(text)) {
                return;
            }
            // Messages containing the heartbeat marker are not forwarded.
            if (text.contains(GeneralConstant.NT_HEARTBEAT)) {
                return;
            }

            logger.info(text);
            Map<String, String> map = new HashMap<>();
            map.put("messageType", MessageTypeEnum.CUSTOMER.getIndex());
            map.put("xml", text);
            map.put("topicName", topicName);
            WebSocketServer.sendMessageToOne(JSON.toJSONString(map), userId);
        } catch (JMSException | MyException e) {
            logger.error("Failed to forward message from topic {} to user {}; closing consumer",
                    topicName, userId, e);
            // As before: a read or send failure ends this topic's subscription.
            try {
                consumer.close();
            } catch (JMSException e1) {
                logger.error("Failed to close consumer for topic {}", topicName, e1);
            }
        }
    }

    /** Closes the connection, if any, and clears connection and session state. */
    private void releaseConnection() {
        session = null;
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            logger.warn("Failed to close ActiveMQ connection after a failed init", e);
        } finally {
            connection = null;
        }
    }

}

参考文章

Java消息队列–ActiveMq 初体验

springboot与ActiveMQ整合

zhangxingrui wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!