一 自定义认证
1.1 添加依赖
在项目里添加以下依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>5.15.6</version>
</dependency>
注意:依赖的版本一定要与部署的activemq版本保持一致
1.2 创建认证拦截器
package com.yishuifengxiao.study.activemq_plugin;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerPlugin;
/**
 * Broker plugin that inserts {@link AuthBroker} in front of the broker it is
 * installed on.
 */
public class AuthPlugin implements BrokerPlugin {

    /**
     * Wraps the supplied broker in a new {@link AuthBroker}.
     *
     * @param broker the broker to delegate to
     * @return a new {@code AuthBroker} whose next broker is {@code broker}
     * @throws Exception declared by the overridden plugin method
     */
    @Override
    public Broker installPlugin(Broker broker) throws Exception {
        final Broker authenticatingBroker = new AuthBroker(broker);
        return authenticatingBroker;
    }
}
|
package com.yishuifengxiao.study.activemq_plugin;
import java.security.Principal; import java.security.cert.X509Certificate; import java.util.HashSet; import java.util.Set;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.security.AbstractAuthenticationBroker; import org.apache.activemq.security.SecurityContext;
public class AuthBroker extends AbstractAuthenticationBroker {
public AuthBroker(Broker next) { super(next); }
@Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
System.out.println("-----------------------------------------> ConnectionInfo = " + info);
SecurityContext securityContext = context.getSecurityContext(); if (securityContext == null) { securityContext = authenticate(info.getUserName(), info.getPassword(), null); context.setSecurityContext(securityContext); securityContexts.add(securityContext); }
try { super.addConnection(context, info); } catch (Exception e) { securityContexts.remove(securityContext); context.setSecurityContext(null); throw e; } }
@Override public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException {
System.out.println("-----------------------------------------> authenticate username= " + username + " password= " + password); SecurityContext securityContext = new SecurityContext(username) { @Override public Set<Principal> getPrincipals() { Set<Principal> groups = new HashSet<Principal>();
groups.add(new Principal() {
@Override public String getName() { return username; } }); return groups; } };
return securityContext; }
@Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { super.removeDestination(context, destination, timeout);
System.out.println("removeDestination -----------------------------------------> destination =" + destination); }
@Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { super.removeConnection(context, info, error); System.out.println("removeConnection -----------------------------------------> info =" + info + " error= " + error); }
}
|
1.3 打包部署
将上述代码导出为jar
包,拷贝到ActiveMq
安装目录的lib
文件夹中
1.4 修改activemq.xml
-
在activemq.xml
文件的broker
节点下加入自定义的插件配置
<!-- Registers the custom AuthPlugin class as a broker plugin. Other children of the broker element are not shown in this excerpt. -->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
<plugins> <bean xmlns="http://www.springframework.org/schema/beans" id="AuthPlugin" class="com.yishuifengxiao.study.activemq_plugin.AuthPlugin"></bean> </plugins>
</broker>
|
配置成功后,重启ActiveMq
1.5 观察结果
启动activemq,然后使用 客户端连接activemq,并在连接后执行断开操作,可以看到如下的输出结果。
jvm 1 | -----------------------------------------> ConnectionInfo = ConnectionInfo {commandId = 0, responseRequired = true, connectionId = ID:DESKTOP-J2Q1CEL-24338-1602505390192-3:1, clientId = 9edd1383-3ffb-44da-9beb-137ca80135841602505402257, clientIp = tcp://127.0.0.1:24381, userName = aaaa, password = *****, brokerPath = null, brokerMasterConnector = false, manageable = false, clientMaster = true, faultTolerant = false, failoverReconnect = false} jvm 1 | -----------------------------------------> authenticate username= aaaa password= 12344 jvm 1 | removeConnection -----------------------------------------> info =ConnectionInfo {commandId = 0, responseRequired = true, connectionId = ID:DESKTOP-J2Q1CEL-24338-1602505390192-3:1, clientId = 9edd1383-3ffb-44da-9beb-137ca80135841602505402257, clientIp = tcp://127.0.0.1:24381, userName = aaaa, password = *****, brokerPath = null, brokerMasterConnector = false, manageable = false, clientMaster = true, faultTolerant = false, failoverReconnect = false} error= null
|
二 进阶使用
通过使用 JdbcTemplate
访问数据库。
2.1 添加依赖
在项目依赖里加上 spring-jdbc
依赖
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>4.2.6.RELEASE</version>
</dependency>
注意此spring-jdbc
的版本需要与 activemq里使用的spring
的版本保持一致。
2.2 修改代码
修改后的代码如下:
package com.yishuifengxiao.study.activemq_plugin;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerPlugin; import org.springframework.jdbc.core.JdbcTemplate;
/**
 * Broker plugin that inserts a JDBC-aware {@link AuthBroker} in front of the
 * broker it is installed on.
 */
public class AuthPlugin implements BrokerPlugin {

    /** Template handed to every {@link AuthBroker} this plugin creates. */
    private final JdbcTemplate jdbcTemplate;

    /**
     * @param jdbcTemplate template passed on to the created {@link AuthBroker}
     */
    public AuthPlugin(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * Wraps the supplied broker in a new {@link AuthBroker} that shares this
     * plugin's {@code JdbcTemplate}.
     *
     * @param broker the broker to delegate to
     * @return a new {@code AuthBroker} whose next broker is {@code broker}
     * @throws Exception declared by the overridden plugin method
     */
    @Override
    public Broker installPlugin(Broker broker) throws Exception {
        final Broker authenticatingBroker = new AuthBroker(broker, jdbcTemplate);
        return authenticatingBroker;
    }
}
|
package com.yishuifengxiao.study.activemq_plugin;
import java.security.Principal; import java.security.cert.X509Certificate; import java.util.HashSet; import java.util.Set;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.security.AbstractAuthenticationBroker; import org.apache.activemq.security.SecurityContext; import org.springframework.jdbc.core.JdbcTemplate;
public class AuthBroker extends AbstractAuthenticationBroker { private JdbcTemplate jdbcTemplate;
public AuthBroker(Broker next, JdbcTemplate jdbcTemplate) { super(next); this.jdbcTemplate = jdbcTemplate; System.out.println("-----------------------------------------> constructor = " + jdbcTemplate); }
@Override public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
System.out.println("-----------------------------------------> ConnectionInfo = " + info);
SecurityContext securityContext = context.getSecurityContext(); if (securityContext == null) { securityContext = authenticate(info.getUserName(), info.getPassword(), null); context.setSecurityContext(securityContext); securityContexts.add(securityContext); }
try { super.addConnection(context, info); } catch (Exception e) { securityContexts.remove(securityContext); context.setSecurityContext(null); throw e; } }
@Override public SecurityContext authenticate(String username, String password, X509Certificate[] peerCertificates) throws SecurityException {
System.out.println("-----------------------------------------> authenticate username= " + username + " password= " + password); SecurityContext securityContext = new SecurityContext(username) { @Override public Set<Principal> getPrincipals() { Set<Principal> groups = new HashSet<Principal>();
groups.add(new Principal() {
@Override public String getName() { return username; } }); return groups; } };
return securityContext; }
@Override public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception { super.removeDestination(context, destination, timeout);
System.out.println("removeDestination -----------------------------------------> destination =" + destination); }
@Override public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { super.removeConnection(context, info, error); System.out.println("removeDestination -----------------------------------------> info =" + info + " error= " + error); }
}
|
2.3 添加jar
到activemq中
1.将mysql-connector-java-xxx.jar
复制到activemq\lib
目录下
2.数据库操作采用spring-jdbc
的方式,需要将spring-jdbc-xxx.RELEASE.jar
复制到activemq\lib\optional
目录下(spring-jdbc的版本应与lib\optional其他的spring相同)
2.4 修改activemq.xml
1 先在 beans
节点下增加以下内容 (beans
为根节点)
<!-- mySqlDataSource: DBCP2 pooled data source whose driver, URL, user and password come from the jdbc.* placeholders. -->
<!-- jdbcTemplate: Spring JdbcTemplate bound to mySqlDataSource. -->
<bean id="mySqlDataSource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="${jdbc.driverClassName}" /> <property name="url" value="${jdbc.url}" /> <property name="username" value="${jdbc.username}" /> <property name="password" value="${jdbc.password}" /> </bean> <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate" abstract="false" lazy-init="false" autowire="default" > <property name="dataSource"> <ref bean="mySqlDataSource" /> </property> </bean>
|
2 修改 PropertyPlaceholderConfigurer
配置,修改后的内容如下:
<!-- Resolves ${...} placeholders from credentials.properties and db.properties, both located under ${activemq.conf}. -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>file:${activemq.conf}/credentials.properties</value> <value>file:${activemq.conf}/db.properties</value> </list> </property> </bean>
|
3 将1.4中修改的broker
节点中的plugins
修改为以下内容
<!-- Registers AuthPlugin and passes the jdbcTemplate bean to its constructor. -->
<plugins> <bean xmlns="http://www.springframework.org/schema/beans" id="AuthPlugin" class="com.yishuifengxiao.study.activemq_plugin.AuthPlugin"> <constructor-arg> <ref bean="jdbcTemplate"/> </constructor-arg> </bean> </plugins>
|
4 在activemq\conf\
目录下加入db.properties
文件
该文件的内容如下:
jdbc.driverClassName=com.mysql.jdbc.Driver
jdbc.url=jdbc:mysql://192.168.195.130:3306/demo?autoReconnect=true&useUnicode=true&characterEncoding=utf8
jdbc.username=root
jdbc.password=123456ss
|
完成上述修改后,重新将代码打包成jar
包,部署到activemq中,然后重启即可。
三 连接状态监控
通过订阅ActiveMQ.Advisory.Connection
就能获取到连接建立、断开与超时的通知
3.1 配置activemq.xml
在activemq.xml
的destinationPolicy
节点上配置
<!-- Policy entry for the topic pattern ">" that turns on the advisoryForConsumed and advisoryForDelivery flags. -->
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">" advisoryForConsumed="true" advisoryForDelivery="true"> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>
|
上述配置用于打开消息消费(advisoryForConsumed)与消息投递(advisoryForDelivery)的 Advisory,这两类 Advisory 默认是关闭的(连接类 Advisory 默认即开启)
3.2 编写监听代码
package activemqClient.activemqClient;
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.RemoveInfo;
public class ConnectionTest { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory("manage","123456","tcp://localhost:61616"); Connection connection = factory.createConnection(); connection.start(); final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); ActiveMQTopic test = AdvisorySupport.getConnectionAdvisoryTopic(); MessageConsumer consumer = session.createConsumer(test); consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message instanceof ActiveMQMessage) { ActiveMQMessage msg = (ActiveMQMessage) message; System.out.println(msg); if(msg.getDataStructure() instanceof ConnectionInfo) { ConnectionInfo info = (ConnectionInfo) msg.getDataStructure(); System.out.println(info.getUserName() +"建立连接:"+info.getConnectionId()); }else if(msg.getDataStructure() instanceof RemoveInfo ) { RemoveInfo info = (RemoveInfo) msg.getDataStructure(); System.out.println("断开连接:"+info.getObjectId()); } } } }); }
}
|
客户端一建立连接,首先发出的请求就是 tcp://ActiveMQ.Advisory
,我们建立连接以及订阅或者发布都是以 **tcp://XXX**
的形式进行的,所以 ActiveMQ.Advisory.Connection topic
也是可以订阅的。
参考资料