December 13, 2019

Consuming files from folders with Apache Camel

Apache Camel is a an open-source integration framework written in Java that helps the user implement many of the design patterns defined in the book Enterprise Integration Patterns by Gregor Hohpe and Bobby Woolf. The book discusses 65 design patterns for asynchronous messaging systems and Camel implements many of the design patterns in the EIP book.
Aim of Camel is to make easier the integration of different applications which use several protocols and technologies.
In this article I make use of some fundamental Camel objects like Route Builder, Processor and File Component to poll a given directory and consume from a file endpoint. You can find further details here:

camel-polling-directory-01

Stack

Apache Camel 2.17.3
JDK 1.8
Maven 3.2

SOURCE CODE (/giuseu/activemq)

GIT
git clone https://gitlab.com/giuseppeurso-eu/activemq

The source code covered in this post is located under the subfolder camel-polling-directory.

ConsumerRouteBuilder.java

The endpoint URI builder. The File component provides access to file systems, allowing files to be processed by a Consumer.

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.RouteDefinition;

public class ConsumerRouteBuilder extends RouteBuilder {
 
   // A number of final static attributes here ...
	
  public void configure() throws Exception {
	System.out.println("Configuring Camel Endpoint with source dir: " + SOURCE_FOLDER);
	RouteDefinition def = from("file://" + SOURCE_FOLDER +
	            "?initialDelay=" + CONSUMER_INITIAL_DELAY +
				"&delay=" + CONSUMER_DELAY +
				"&readLock="+ CONSUMER_LOCK +
				"&readLockCheckInterval=" + CONSUMER_LOCK_INTERVAL);
	def.process(new ConsumerProcessor());
    }
   
// Some other useful methods here ...
}

CamelExecutor.java

The CamelContext initializer. This class make use of Thread.sleep to simulate a runtime system of 30 seconds. During the poll operation, the ConsumerRouteBuilder is used to monitor the state of the source directory and then start message exchanges.

import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;

public class CamelExecutor {
	
	private int waitForFile = 30000;
	private CamelContext camelCtx;

 public void init() throws Exception {
		camelCtx = new DefaultCamelContext();
		try {
			camelCtx.addRoutes(new ConsumerRouteBuilder());
				camelCtx.start();
				System.out.println(">>>> PLEASE TRY to put a file into the source dir within "+(waitForFile/1000)+" seconds and wait processing... ");
				Thread.sleep(waitForFile);
		} catch (Exception e) {
			System.out.println("Unable to initialize CamelContext: " + e);
			return;
		}
	}
}

ConsumerProcessor.java

The processor of files from the source directory.

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;

public class ConsumerProcessor implements Processor {

	@Override
	public void process(Exchange exchange) throws Exception {
		try {
			Message message = exchange.getIn();
			File fileToProcess = message.getBody(File.class);
			System.out.println("File detected: '" + fileToProcess.getAbsolutePath());

              // Make something with your file here ...
			
		} catch (Exception e) {
			System.err.println("Exception while trying to process file: " + e);
		} 
	}
}

 

Related posts

Leave a Reply

Your email address will not be published.