March 19, 2024

Scheduled Message Consumer on ActiveMQ with Spring DMLC and Quartz

In this post I show how to schedule a task to start and stop a JMS MessageListener (Consumer) on ActiveMQ on a predefine date and time with Quartz Scheduler and Spring DefaultMessageListenerContainer (DMLC). From the other side (Producer) if you want to see how the delay and schedule message delivery works, I suggest the ActiveMQ scheduler built-in.

Stack

ActiveMQ 5.11.1
JDK 1.8
Maven 3.2
Spring 3.0.5.RELEASE
Quartz Scheduler 1.5.2

SOURCE CODE (/giuseu/activemq)

GIT
git clone https://gitlab.com/giuseppeurso-eu/activemq

 

Consumer

This is the MessageListener implementation. The onMessage() method is responsible for the consumption of messages. I create two methods startConsumption() and stopConsumption() to run start/stop on the DMLC Spring object. The instance of DMLC will be defined later via dependency-injection in the beans XML file.

public class Consumer implements MessageListener {
	private Object sampleListener;
	// setter injection via Spring...

	public void onMessage(Message message)
    {
	_log.info("Consuming message...");
       //something on message received here...
    }
    
	public void startConsumption(){
		_log.info("Starting the Consumer...");
		((DefaultMessageListenerContainer) getSampleListener()).start();
		_log.info("JOB STARTED!");
	}
	
	public void stopConsumption(){
		_log.debug("Stopping the Consumer... ");
		((DefaultMessageListenerContainer) getSampleListener()).stop();
		_log.info("JOB FINISHED!");
	}	
}

 

Producer

A simple class which make use of Spring JmsTemplate to publish a number of messages. Destinations[ ] could be useful in case of sending on multiple queues.

public class Producer {
	
	private static JmsTemplate template;
	private Destination[] destinations;
		
	public void sendMessage(Message message) {
                try {
			Destination destination = destinations[0];
			template.convertAndSend(destination, message);
			_log.debug("Published message with JMS Correlation ID: "+message.getJMSCorrelationID()+" on queue: "+destination.toString());
		} catch (JMSException e) {
			_log.error(e);
		}
		
	}

}

 

Spring XML configuration

Here is the Spring beans definition.Pay attention to the autoStartup property which is false in the sampleDMLC bean. I will use the Quartz Scheduer to define two jobs which will be responsible for starting and stopping of the message consumer. In order to define the job details I use MethodInvokingJobDetailFactoryBean and SchedulerFactoryBean. The two jobs have a different targetMethod to start/stop message consumption but the same targetObject which refers to the same consumer. The consumer bean includes a property sampleListener injected via Spring which creates an instance of DMLC.

In this example, I use a simple cron expression to start message consumption in a given time and stop it after 15 seconds.

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

	<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="tcp://localhost:61616" />
		<property name="userName" value="amqsystem"/>
        <property name="password" value="12345"/>
        <!-- If you're using the embedded Broker uncomment this -->
        <!-- <property name="brokerURL" value="vm://localhost?broker.persistent=false" />-->         									
	</bean>

	<bean id="sampleDest" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg value="QUEUE.SAMPLE" />
	</bean>
	
	<!-- PRODUCER -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
	</bean>
	<bean id="producer" class="eu.giuseppeurso.activemq.scheduledconsumer.Producer">
		<property name="template" ref="jmsTemplate" />
		<property name="destinations">
			<list>
				<ref local="sampleDest" />
			</list>
		</property>
	</bean>
	
	<!-- CONSUMER -->
	<bean id="consumer" class="eu.giuseppeurso.activemq.scheduledconsumer.Consumer">
		<property name="sampleListener" ref="sampleDMLC" />
	</bean>
	<!-- Spring DMLC -->
	<bean id="sampleDMLC"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="jmsConnectionFactory" />
		<property name="destination" ref="sampleDest" />
		<property name="messageListener" ref="consumer" />
		<property name="autoStartup" value="false"/>
	</bean>
	
	<!-- QUARTZ SCHEDULER (Start Job) -->
	<bean id="jobConsumerStart" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
		<property name="targetObject" ref="consumer" />
		<property name="targetMethod" value="startConsumption" />
	</bean>
	<bean id="cronConsumerStart" class="org.springframework.scheduling.quartz.CronTriggerBean">
		<property name="jobDetail" ref="jobConsumerStart" />
		<property name="cronExpression" value="10 49 19 * * ?" />
	</bean>
	<!-- QUARTZ SCHEDULER (Stop Job) -->
	<bean id="jobConsumerStop" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
		<property name="targetObject" ref="consumer" />
		<property name="targetMethod" value="stopConsumption" />
	</bean>
	<bean id="cronConsumerStop" class="org.springframework.scheduling.quartz.CronTriggerBean">
		<property name="jobDetail" ref="jobConsumerStop" />
		<property name="cronExpression" value="15 49 19 * * ?" />
	</bean>
	
	<!-- TRIGGERs -->
	<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
		<property name="triggers">
			<list>
				<ref bean="cronConsumerStart" />
				<ref bean="cronConsumerStop" />
			</list>
		</property>
	</bean>
	
</beans>

 

Run Test

Finally a simple JUnit test class ScheduledConsumerTest, to simulate a use case. This class sends a message on a sample queue and waits until 20 seconds with a Thread.sleep for the job executions.

public class ScheduledConsumerTest extends TestCase {
//something here ...
public void testScheduledConsumer() throws JMSException, InterruptedException {	
		Session session = producer.getTemplate().getConnectionFactory().createConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
		TextMessage txtMessage = session.createTextMessage();
		
		txtMessage.setText("This is a text message!");
		producer.sendMessage(txtMessage);
		//Waiting a few seconds until the scheduler start/stop
		Thread.sleep(20000);
	}
}
### Run Maven to compile source and execute test
###
$ cd activemq-samples/spring-scheduled-consumer
$ mvn clean install

giuseppe-urso-activemq-scheduled-message-consumer-01

 

giuseppe-urso-activemq-scheduled-message-consumer-02

Related posts

2 Comments

    1. Giuseppe

      The Quartz scheduler performs asynchronous actions and knows nothing of what happens while the messages are consumed. If you need to do something when consumption is finished, you should make programmatically in the Consumer class.
      Otherwise you can play with the cronExpression attribute which is responsible for starting and stopping of the message consuming:

        http://www.quartz-scheduler.org/api/2.1.7/org/quartz/CronTrigger.html?is-external=true#setCronExpression-org.quartz.CronExpression-

Leave a Reply

Your email address will not be published.