activemq自定义认证和状态监控

有两种不联系:一种是忘记了,一种是放在回忆里

Posted by yishuifengxiao on 2020-10-13

一 自定义认证

1.1 添加依赖

在项目里添加以下以下依赖

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-broker</artifactId>
        <version>5.15.6</version>
    </dependency>

注意:依赖的版本一定要与部署的activemq版本保持一致

1.2 创建认证拦截器

1
2
3
4
5
6
7
8
9
10
11
package com.yishuifengxiao.study.activemq_plugin;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;

public class AuthPlugin implements BrokerPlugin {
@Override
public Broker installPlugin(Broker broker) throws Exception {
return new AuthBroker(broker);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.yishuifengxiao.study.activemq_plugin;

import java.security.Principal;
import java.security.cert.X509Certificate;
import java.util.HashSet;
import java.util.Set;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.security.AbstractAuthenticationBroker;
import org.apache.activemq.security.SecurityContext;

public class AuthBroker extends AbstractAuthenticationBroker {

public AuthBroker(Broker next) {
super(next);
}

@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {

System.out.println("-----------------------------------------> ConnectionInfo = " + info);

SecurityContext securityContext = context.getSecurityContext();
if (securityContext == null) {
securityContext = authenticate(info.getUserName(), info.getPassword(), null);
context.setSecurityContext(securityContext);
securityContexts.add(securityContext);
}

try {
super.addConnection(context, info);
} catch (Exception e) {
securityContexts.remove(securityContext);
context.setSecurityContext(null);
throw e;
}
}

/**
* 在这里执行用户认证逻辑
*/
@Override
public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates)
throws SecurityException {

System.out.println("-----------------------------------------> authenticate username= " + username
+ " password= " + password);
SecurityContext securityContext = new SecurityContext(username) {
@Override
public Set<Principal> getPrincipals() {
Set<Principal> groups = new HashSet<Principal>();

groups.add(new Principal() {

@Override
public String getName() {
// TODO Auto-generated method stub
return username;
}
});// 默认加入了users的组
return groups;
}
};

return securityContext;
}

@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
throws Exception {
super.removeDestination(context, destination, timeout);

System.out.println("removeDestination -----------------------------------------> destination =" + destination);
}

@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
super.removeConnection(context, info, error);
System.out.println("removeConnection -----------------------------------------> info =" + info
+ " error= " + error);
}

}

1.3 打包部署

将上述代码导出为jar包,拷贝到ActiveMq安装目录的lib文件夹中

1.4 修改activemq.xml

  1. activemq.xml文件的broker节点下加入自定义的插件配置

    1
    2
    3
    4
    5
    6
    7
    8
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

    <!-- 插件配置 -->
    <plugins>
    <bean xmlns="http://www.springframework.org/schema/beans" id="AuthPlugin" class="com.yishuifengxiao.study.activemq_plugin.AuthPlugin"></bean>
    </plugins>

    </broker>

配置成功后,重启ActiveMq

1.5 观察结果

启动activemq,然后使用 客户端连接activemq,并在连接后执行断开操作,可以看到如下的输出结果。

1
2
3
jvm 1    | -----------------------------------------> ConnectionInfo = ConnectionInfo {commandId = 0, responseRequired = true, connectionId = ID:DESKTOP-J2Q1CEL-24338-1602505390192-3:1, clientId = 9edd1383-3ffb-44da-9beb-137ca80135841602505402257, clientIp = tcp://127.0.0.1:24381, userName = aaaa, password = *****, brokerPath = null, brokerMasterConnector = false, manageable = false, clientMaster = true, faultTolerant = false, failoverReconnect = false}
jvm 1 | -----------------------------------------> authenticate username= aaaa password= 12344
jvm 1 | removeConnection -----------------------------------------> info =ConnectionInfo {commandId = 0, responseRequired = true, connectionId = ID:DESKTOP-J2Q1CEL-24338-1602505390192-3:1, clientId = 9edd1383-3ffb-44da-9beb-137ca80135841602505402257, clientIp = tcp://127.0.0.1:24381, userName = aaaa, password = *****, brokerPath = null, brokerMasterConnector = false, manageable = false, clientMaster = true, faultTolerant = false, failoverReconnect = false} error= null

二 进阶使用

通过使用 JdbcTemplate访问数据库。

2.1 添加依赖

在项目依赖里加上 spring-jdbc 依赖

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jdbc</artifactId>
        <version>4.2.6.RELEASE</version>
    </dependency>

注意此spring-jdbc的版本与 activemq里使用的spring的版本保持一致。

2.2 修改代码

修改后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.yishuifengxiao.study.activemq_plugin;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.springframework.jdbc.core.JdbcTemplate;

public class AuthPlugin implements BrokerPlugin {

private JdbcTemplate jdbcTemplate;

public AuthPlugin(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}

@Override
public Broker installPlugin(Broker broker) throws Exception {
return new AuthBroker(broker, this.jdbcTemplate);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package com.yishuifengxiao.study.activemq_plugin;

import java.security.Principal;
import java.security.cert.X509Certificate;
import java.util.HashSet;
import java.util.Set;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.security.AbstractAuthenticationBroker;
import org.apache.activemq.security.SecurityContext;
import org.springframework.jdbc.core.JdbcTemplate;

public class AuthBroker extends AbstractAuthenticationBroker {
private JdbcTemplate jdbcTemplate;

public AuthBroker(Broker next, JdbcTemplate jdbcTemplate) {
super(next);
this.jdbcTemplate = jdbcTemplate;
System.out.println("-----------------------------------------> constructor = " + jdbcTemplate);
}

@Override
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {

System.out.println("-----------------------------------------> ConnectionInfo = " + info);

SecurityContext securityContext = context.getSecurityContext();
if (securityContext == null) {
securityContext = authenticate(info.getUserName(), info.getPassword(), null);
context.setSecurityContext(securityContext);
securityContexts.add(securityContext);
}

try {
super.addConnection(context, info);
} catch (Exception e) {
securityContexts.remove(securityContext);
context.setSecurityContext(null);
throw e;
}
}

/**
* 在这里执行用户认证逻辑
*/
@Override
public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates)
throws SecurityException {

System.out.println("-----------------------------------------> authenticate username= " + username
+ " password= " + password);
SecurityContext securityContext = new SecurityContext(username) {
@Override
public Set<Principal> getPrincipals() {
Set<Principal> groups = new HashSet<Principal>();

groups.add(new Principal() {

@Override
public String getName() {
// TODO Auto-generated method stub
return username;
}
});// 默认加入了users的组
return groups;
}
};

return securityContext;
}

@Override
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout)
throws Exception {
super.removeDestination(context, destination, timeout);

System.out.println("removeDestination -----------------------------------------> destination =" + destination);
}

@Override
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
super.removeConnection(context, info, error);
System.out.println("removeDestination -----------------------------------------> info =" + info
+ " error= " + error);
}

}

2.3 添加jar到activemq中

1.将mysql-connector-java-xxx.jar复制到activemq\lib目录下

2.数据库操作采用spring-jdbc的方式,需要将spring-jdbc-xxx.RELEASE.jar复制到activemq\lib\optional目录下(spring-jdbc的版本应与lib\optional其他的spring相同)

2.4 修改activemq.xml

1 先在 beans节点下增加以下内容 (beans为根节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<!-- mysql数据库数据源-->  
<bean id="mySqlDataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="${jdbc.driverClassName}" />
<property name="url" value="${jdbc.url}" />
<property name="username" value="${jdbc.username}" />
<property name="password" value="${jdbc.password}" />
</bean>
<!-- 增加jdbcTemplate-->
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate" abstract="false"
lazy-init="false" autowire="default" >
<property name="dataSource">
<ref bean="mySqlDataSource" />
</property>
</bean>

2 修改 PropertyPlaceholderConfigurer配置,修改后的内容如下:

1
2
3
4
5
6
7
8
9
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>file:${activemq.conf}/credentials.properties</value>
<!--这一行是新增的-->
<value>file:${activemq.conf}/db.properties</value>
</list>
</property>
</bean>

3 将2.4中修改的broker节点中的plugins修改为以下内容

1
2
3
4
5
6
7
8
<plugins>
<bean
xmlns="http://www.springframework.org/schema/beans" id="AuthPlugin" class="com.yishuifengxiao.study.activemq_plugin.AuthPlugin">
<constructor-arg>
<ref bean="jdbcTemplate"/>
</constructor-arg>
</bean>
</plugins>

4 在activemq\conf\目录下加入db.properties文件

该文件的内容如下:

1
2
3
4
jdbc.driverClassName=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql://192.168.195.130:3306/demo?autoReconnect=true&useUnicode=true&characterEncoding=utf8
jdbc.username=root
jdbc.password=123456ss

完成上述修改后,重新将代码打包成jar中,部署到activemq中,然后重启即可。

三 连接状态监控

通过订阅ActiveMQ.Advisory.Connection 就能获取到断开与超时

3.1 配置activeMQ.xml

activeMQ.xmldestinationPolicy节点上配置

1
2
3
4
5
6
7
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" advisoryForConsumed="true" advisoryForDelivery="true"> </policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

打开Advisories , 默认Advisory的功能是关闭的

3.2 编写监听代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package activemqClient.activemqClient;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.RemoveInfo;

public class ConnectionTest {

public static void main(String[] args) throws JMSException {
ConnectionFactory factory = new ActiveMQConnectionFactory("manage","123456","tcp://localhost:61616");
Connection connection = factory.createConnection();
connection.start();
final Session session = connection.createSession(false/*支持事务*/, Session.AUTO_ACKNOWLEDGE);
ActiveMQTopic test = AdvisorySupport.getConnectionAdvisoryTopic();
MessageConsumer consumer = session.createConsumer(test);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof ActiveMQMessage) {
ActiveMQMessage msg = (ActiveMQMessage) message;
System.out.println(msg);
if(msg.getDataStructure() instanceof ConnectionInfo) {
ConnectionInfo info = (ConnectionInfo) msg.getDataStructure();
System.out.println(info.getUserName() +"建立连接:"+info.getConnectionId());
}else if(msg.getDataStructure() instanceof RemoveInfo ) {
RemoveInfo info = (RemoveInfo) msg.getDataStructure();
System.out.println("断开连接:"+info.getObjectId());
}

}

}
});
}

}

客户端一建立连接首先的请求就是 tcp://ActiveMQ.Advisory ,所以我们的建立连接以及 订阅或者发布都是tcp://XXX 形式来所以我们的 ActiveMQ.Advisory.Connection topic 也是可以订阅的

参考资料

1
2
3
4
5
6
7
https://blog.csdn.net/SpbDev/article/details/106520573

https://blog.csdn.net/weixin_33971977/article/details/92832351

https://www.itcto.cn/mqtt/connectauth/

https://www.cnblogs.com/huangzhex/p/6339761.html (重要)