使用 AspectJ 和 JMX 进行消息流跟踪

工程 | Ben Hale | 2006年4月25日 | ...

在一个我曾经参与的项目中,我们有一个系统可以接收来自设备的消息,并决定是否将这些信息传递给用户。有多个决策级别,我们总是发现自己遇到的问题之一是,消息在系统中传递的过程中是否丢失了。

在我们转向 Spring 之前,几乎不可能回答这个问题。曾尝试使用日志记录,但由于决策所涉及的消息量巨大,使得这种方法最多只能算作乏味。还尝试过使用调试器,但由于消息量和时序变化的组合,导致仅能间歇性地取得成功。

不幸的是,在我能够实施更合适的解决方案之前离开了项目,但如果我当时做了,它可能看起来像这样。最后,我将讨论在这种工作中可能有用的一些扩展。

首先,我们有一组接口及其实现


package flowtracingexample;

public interface Component1 {

	void forwardCall();

}

package flowtracingexample;

import java.util.Random;

public class DefaultComponent1 implements Component1 {
	
	private Component2 child;

	private Random r = new Random();
	
	public DefaultComponent1(Component2 child) {
		this.child = child;
	}

	public void forwardCall() {
		if (r.nextBoolean()) {
			child.forwardCall();
		}
	}

}

package flowtracingexample;

public interface Component2 {

	void forwardCall();

}

package flowtracingexample;

import java.util.Random;

public class DefaultComponent2 implements Component2 {
	
	private Component3 child;

	private Random r = new Random();
	
	public DefaultComponent2(Component3 child) {
		this.child = child;
	}

	public void forwardCall() {
		if (r.nextBoolean()) {
			child.forwardCall();
		}
	}

}

package flowtracingexample;

public interface Component3 {

	void forwardCall();

}

package flowtracingexample;

public class DefaultComponent3 implements Component3 {

	public void forwardCall() {
	}

}

这是一个非常简单的例子,但要点是,使用fowardCall()方法,消息有 50% 的概率传递到下一个子组件(在本例中按数字顺序递增)。请注意,这些 POJO 中没有涉及跟踪的逻辑。

为了实现我们的跟踪行为,我们希望有一组计数器;每个组件一个。此外,我们希望能够重置计数器、启动和停止监控,以及确定是否正在进行监控。为此,我们实现了一个包含计数器的类。


package flowtracingexample;

import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;

@ManagedResource
public class FlowTracer {

	private long component1Count = 0;

	private long component2Count = 0;

	private long component3Count = 0;

	private boolean tracing = false;

	@ManagedAttribute
	public long getComponent1Count() {
		return this.component1Count;
	}

	@ManagedAttribute
	public long getComponent2Count() {
		return this.component2Count;
	}

	@ManagedAttribute
	public long getComponent3Count() {
		return this.component3Count;
	}

	@ManagedAttribute
	public boolean getTracing() {
		return this.tracing;
	}

	public void incrementComponent1Count() {
		if (this.tracing) {
			component1Count++;
		}
	}

	public void incrementComponent2Count() {
		if (this.tracing) {
			component2Count++;
		}
	}

	public void incrementComponent3Count() {
		if (tracing) {
			component3Count++;
		}
	}

	@ManagedOperation
	public void resetAllComponentCount() {
		resetComponent1Count();
		resetComponent2Count();
		resetComponent3Count();
	}

	@ManagedOperation
	public void resetComponent1Count() {
		this.component1Count = 0;
	}

	@ManagedOperation
	public void resetComponent2Count() {
		this.component2Count = 0;
	}

	@ManagedOperation
	public void resetComponent3Count() {
		this.component3Count = 0;
	}

	@ManagedOperation
	public void startTracing() {
		tracing = true;
	}

	@ManagedOperation
	public void stopTracing() {
		tracing = false;
	}
}

此类的方法及其内容非常简单明了。您可能不熟悉的是此类上的注释。这些注释由 Spring 的 JMX 支持使用,以便在每个 Bean 部署到 JMX MBeanServer 时自动构建 MBean 管理接口。

  • ManagedResource:声明此类应作为 JMX MBean 公开。
  • ManagedAttribute:声明此 getter/setter 表示的 JavaBean 属性应为 MBean 属性。如果您希望对该属性进行读写访问,则需要同时注释 getter 和 setter。
  • ManagedOperation:声明此方法应作为 MBean 操作公开。

最后,将所有内容连接起来。首先,我们将构成流的组件连接在一起。接下来,我们声明将在每个组件上放置跟踪器的方面。在本例中,我们使用非常棒的 AspectJ 切点语言。最后,我们设置 JMX 导出器以自动检测具有@ManagedResource注释的类的实例。


<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:aop="http://www.springframework.org/schema/aop"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/aop 
       http://www.springframework.org/schema/aop/spring-aop.xsd">

	<!-- Components -->
	<bean id="component3" class="flowtracingexample.DefaultComponent3" />

	<bean id="component2"
		class="flowtracingexample.DefaultComponent2">
		<constructor-arg ref="component3" />
	</bean>

	<bean id="component1"
		class="flowtracingexample.DefaultComponent1">
		<constructor-arg ref="component2" />
	</bean>

	<!-- Aspect -->
	<bean id="flowTracer" class="flowtracingexample.FlowTracer" />

	<aop:config>
		<aop:aspect id="component1Aspect" ref="flowTracer">
			<aop:before method="incrementComponent1Count"
				pointcut="execution(public void flowtracingexample.Component1.forwardCall())" />
		</aop:aspect>

		<aop:aspect id="component2Aspect" ref="flowTracer">
			<aop:before method="incrementComponent2Count"
				pointcut="execution(public void flowtracingexample.Component2.forwardCall())" />
		</aop:aspect>

		<aop:aspect id="component3Aspect" ref="flowTracer">
			<aop:before method="incrementComponent3Count"
				pointcut="execution(public void flowtracingexample.Component3.forwardCall())" />
		</aop:aspect>
	</aop:config>

	<!-- JMX -->
	<bean class="org.springframework.jmx.export.MBeanExporter">
		<property name="autodetectModeName" value="AUTODETECT_ALL" />
		<property name="assembler">
			<bean
				class="org.springframework.jmx.export.assembler.MetadataMBeanInfoAssembler">
				<property name="attributeSource">
					<bean
						class="org.springframework.jmx.export.annotation.AnnotationJmxAttributeSource" />
				</property>
			</bean>
		</property>
		<property name="namingStrategy">
			<bean
				class="org.springframework.jmx.export.naming.IdentityNamingStrategy" />
		</property>
	</bean>

</beans>

接下来我们需要做的是有一个驱动程序类。在本例中,驱动程序类只是在 750 毫秒以下的某个随机延迟下发送消息。


package flowtracingexample;

import java.io.IOException;
import java.util.Random;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class FlowTracingExample {

	public static void main(String[] args) throws InterruptedException,
			IOException {
		ApplicationContext ctx = new ClassPathXmlApplicationContext(
				"classpath:flowtracingexample/applicationContext.xml");

		Component1 comp = (Component1) ctx.getBean("component1");
		Random r = new Random();

		System.out.print("Ready...");
		 System.in.read();

		for (;;) {
			comp.forwardCall();
			Thread.sleep(r.nextInt(750));
		}
	}
}

在我的情况下,我将使用 Java VM Management 运行此应用程序,因为它为我提供了一个免费的 MBean 服务器(而且我喜欢漂亮的内存图表)。如果您没有听说过它,它是 Java 5 VM 中的一个系统属性,它导致 VM 使用 JMX 自我管理。它具有内存消耗、线程和数百万其他事物的 Bean。您可以通过在运行应用程序的命令行上添加-Dcom.sun.management.jmxremote来简单地启动它。在另一个巧妙的 Java 5 新增功能中,我将使用jconsole来显示我的结果。

根据我生疏的数学技能,从长远来看,我希望看到组件 1 被调用 100% 的时间,组件 2 被调用 50% 的时间,组件 3 被调用 25% 的时间。让我们看看

Tracing Screen Shot

很高兴看到我还记得我的概率知识。最棒的是,这仍然符合良好的设计原则。例如,没有任何组件知道任何关于跟踪的信息,因为这不是它们的功能。同样,此子系统的所有跟踪需求都包含在一个类中,并且只有一个实现满足 AOP 的 1:1 需求到实现的目标。最后,通过能够关闭跟踪,任何性能影响或多或少都被抵消了。我知道,我知道递增一个整数并不那么昂贵,但如果您的跟踪做了昂贵的事情,那么拥有它是很好的,而且您不必担心是否要将其发送到生产环境中;您只需禁用监控,直到您的客户致电寻求支持即可。

因此,图表当然很漂亮,而且如果您知道预期的百分比,甚至可能会告诉您一些信息,但您还可以做什么?如何查看最近 100 条消息及其决策?如何记录消息被丢弃的原因?如何关联丢弃决策和管道末端消息的缺失?如果知道消息丢失了(可能是由于线程问题),因为您从未故意丢弃它,但它在进入 500 毫秒内没有到达末端,那不是很好吗?沿着同一思路,如果从管道的一端到另一端所需的时间超过 250 毫秒,如何向管理员发送电子邮件?

跟踪/监控的可能性是无限的(并且是可插拔的!)。您将如何使用它?

当然,源代码

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部