Dicembre 16, 2019

File Consumer su una cartella con Apache Camel

Apache Camel è un framework scritto in Java per l’integrazione sincrona e asincrona tra sistemi eterogenei. E’ basato sul libro Enterprise Integration Patterns di Gregor Hohpe e Bobby Woolf e implementa molti dei pattern di integrazione definiti nel libro.
In questo articolo utilizzo alcuni oggetti fondamentali di Camel come per esempio Route Builder, Processor e File Component per implementare un Consumer di file su una directory. Lo scenario tipico di utilizzo è quello in cui da un lato vi è un Sender che carica uno o più file all’interno di una cartella condivisa e dall’altro vi è un Receiver che effettua il polling della directory per verificare la presenza di file e fare delle operazioni di lettura e scrittura su di essi.

camel-polling-directory-01

Camel mette a disposizione l’interfaccia PollingConsumer per istanziare oggetti di tipo Consumer direttamente sull’endpoint URI e per impostare opzioni e modalità di ricezione messaggi direttamente sull’istanza:

myCamelContext = new DefaultCamelContext();
myCamelContext.getEndpoint("activemq:my.queue").createPollingConsumer().receive();

Tuttavia in questo esempio non faccio uso dell’interfaccia. Le opzioni di ricezione messaggi vengono definite come parametri direttamente sull’endpoint URI. Ulteriori approfondimenti si possono trovare qui:

Stack

Apache Camel 2.17.3
JDK 1.8
Maven 3.2

SOURCE CODE (/giuseu/activemq)

GIT
git clone https://gitlab.com/giuseppeurso-eu/activemq

Il codice sorgente utilizzato in questo articolo si trova nella directory camel-polling-directory.

ConsumerRouteBuilder.java

Definisce un endpoint di tipo File e fornisce al Consumer l’accesso su file system.

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.RouteDefinition;

public class ConsumerRouteBuilder extends RouteBuilder {
 
   // A number of final static attributes here ...
	
  public void configure() throws Exception {
	System.out.println("Configuring Camel Endpoint with source dir: " + SOURCE_FOLDER);
	RouteDefinition def = from("file://" + SOURCE_FOLDER +
	            "?initialDelay=" + CONSUMER_INITIAL_DELAY +
				"&delay=" + CONSUMER_DELAY +
				"&readLock="+ CONSUMER_LOCK +
				"&readLockCheckInterval=" + CONSUMER_LOCK_INTERVAL);
	def.process(new ConsumerProcessor());
    }
   
// Some other useful methods here ...
}

CamelExecutor.java

Inizializza CamelContext e lancia un Thread.sleep per simulare un runtime di 30 secondi. In questo intervallo di tempo, il Consumer processerà i file dentro la directory in modalità polling.

import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;

public class CamelExecutor {
	
	private int waitForFile = 30000;
	private CamelContext camelCtx;

 public void init() throws Exception {
		camelCtx = new DefaultCamelContext();
		try {
			camelCtx.addRoutes(new ConsumerRouteBuilder());
			camelCtx.start();
			System.out.println(">>>> PLEASE TRY to put a file into the source dir within "+(waitForFile/1000)+" seconds and wait processing... ");
			Thread.sleep(waitForFile);
		} catch (Exception e) {
			System.out.println("Unable to initialize CamelContext: " + e);
			return;
		}
	}
}

ConsumerProcessor.java

Processa i file presenti nella directory di origine.

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;

public class ConsumerProcessor implements Processor {

	@Override
	public void process(Exchange exchange) throws Exception {
		try {
			Message message = exchange.getIn();
			File fileToProcess = message.getBody(File.class);
			System.out.println("Detected file name '" + fileToProcess.getAbsolutePath());

              // Make something with your file here ...
			
		} catch (Exception e) {
			System.err.println("Exception while trying to import invoce: " + e);
		} 
	}
}

 

Related posts

Leave a Reply

Your email address will not be published.