Apache Camel è un framework scritto in Java per l’integrazione sincrona e asincrona tra sistemi eterogenei. E’ basato sul libro Enterprise Integration Patterns di Gregor Hohpe e Bobby Woolf e implementa molti dei pattern di integrazione definiti nel libro.
In questo articolo utilizzo alcuni oggetti fondamentali di Camel come per esempio Route Builder, Processor e File Component per implementare un Consumer di file su una directory. Lo scenario tipico di utilizzo è quello in cui da un lato vi è un Sender che carica uno o più file all’interno di una cartella condivisa e dall’altro vi è un Receiver che effettua il polling della directory per verificare la presenza di file e fare delle operazioni di lettura e scrittura su di essi.

Camel mette a disposizione l’interfaccia PollingConsumer per istanziare oggetti di tipo Consumer direttamente sull’endpoint URI e per impostare opzioni e modalità di ricezione messaggi direttamente sull’istanza:
myCamelContext = new DefaultCamelContext();
myCamelContext.getEndpoint("activemq:my.queue").createPollingConsumer().receive();
Tuttavia in questo esempio non faccio uso dell’interfaccia. Le opzioni di ricezione messaggi vengono definite come parametri direttamente sull’endpoint URI. Ulteriori approfondimenti si possono trovare qui:
Stack
Apache Camel 2.17.3
JDK 1.8
Maven 3.2
Il codice sorgente utilizzato in questo articolo si trova nella directory camel-polling-directory.
ConsumerRouteBuilder.java
Definisce un endpoint di tipo File e fornisce al Consumer l’accesso su file system.
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.RouteDefinition;
public class ConsumerRouteBuilder extends RouteBuilder {
// A number of final static attributes here ...
public void configure() throws Exception {
System.out.println("Configuring Camel Endpoint with source dir: " + SOURCE_FOLDER);
RouteDefinition def = from("file://" + SOURCE_FOLDER +
"?initialDelay=" + CONSUMER_INITIAL_DELAY +
"&delay=" + CONSUMER_DELAY +
"&readLock="+ CONSUMER_LOCK +
"&readLockCheckInterval=" + CONSUMER_LOCK_INTERVAL);
def.process(new ConsumerProcessor());
}
// Some other useful methods here ...
}
CamelExecutor.java
Inizializza CamelContext e lancia un Thread.sleep per simulare un runtime di 30 secondi. In questo intervallo di tempo, il Consumer processerà i file dentro la directory in modalità polling.
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
public class CamelExecutor {
private int waitForFile = 30000;
private CamelContext camelCtx;
public void init() throws Exception {
camelCtx = new DefaultCamelContext();
try {
camelCtx.addRoutes(new ConsumerRouteBuilder());
camelCtx.start();
System.out.println(">>>> PLEASE TRY to put a file into the source dir within "+(waitForFile/1000)+" seconds and wait processing... ");
Thread.sleep(waitForFile);
} catch (Exception e) {
System.out.println("Unable to initialize CamelContext: " + e);
return;
}
}
}
ConsumerProcessor.java
Processa i file presenti nella directory di origine.
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
public class ConsumerProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
try {
Message message = exchange.getIn();
File fileToProcess = message.getBody(File.class);
System.out.println("Detected file name '" + fileToProcess.getAbsolutePath());
// Make something with your file here ...
} catch (Exception e) {
System.err.println("Exception while trying to import invoce: " + e);
}
}
}
Italiano
Inglese
