Sharing Our Passion for Technology
& Continuous Learning
Getting Started With Camel: Marshalling
In my last post on Camel, I spent some time introducing you two of its biggest players, the Context and the Route, by means of a simple problem - processing data coming in via file. If all we had to do was move the file, we could call our job done and move on, but what if we need to do something with that information?
Here's a quick recap of our problem from last time.
User puts file in directory.System reads file from directory.- System breaks each line into a separate record.
- System processes each record.
System archives the file.
I've marked off steps 1, 2, and 5 since we already took care of them with our simple implementation of the File Transfer pattern. Steps 3 and 4 offer us an opportunity to explore the Message Transformation Enterprise Integration Pattern (EIP), which covers interactions between systems that don't share a common API (e.g, our file and a Java-based API that takes a plain-old-java-object (POJO)).
The Exchange
EIPs run on messages. They are read, sent, manipulated, and stored based on the individual needs of the system, and Camel's API follows suit nicely. Remember our FileMover, the simple route we defined to grab a file out of a directory and park it somewhere else?@Component
public class FileMover extends SpringRouteBuilder {
@Value("${camel.ride.app.output.directory}")
private String outputDirectory;
@Value("${camel.ride.app.input.directory}")
private String inputDirectory;
@Override
public void configure() throws Exception {
from("file:" + inputDirectory).to("file:" + outputDirectory);
}
}
We already covered that this little route is building a connection between two of Camel's EndPoint classes, but what happens in the middle? To start, Camel builds an Exchange, which will facilitate the transfer of a Message from one endpoint to the other. The exchange contains all the interesting bits and pieces of your message. You can get at this content several ways, but the simplest by far is with a Processor.
The Processor Interface
Attaching a Processor to an Endpoint is an easy way to hook into a route and see what is going on because a Processor has one method, process, which takes an Exchange.@Component
public class SimpleProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println("There is an exchange going on.");
System.out.println(exchange.getIn().getHeader("CamelFileName"));
System.out.println(exchange.getIn().getBody());
System.out.println(exchange.getIn().getBody().getClass());
}
}
All that's left now is to inject this processor into our File Mover route.
@Component
public class FileMover extends SpringRouteBuilder {
@Value("${camel.ride.app.output.directory}")
private String outputDirectory;
@Value("${camel.ride.app.input.directory}") S
private String inputDirectory;
@Autowired
private SimpleProcessor simpleProcessor;
@Override
public void configure() throws Exception {
from("file:" + inputDirectory).process(simpleProcessor).to("file:" + outputDirectory);
}
}
The above class gives one example of how we can solve our remaining steps, by grabbing the Body of the In message and breaking it into different objects, but there are better options, namely letting Camel marshal (or transform) that object from it's incoming format into something we can use.
Marshalling to Spring Managed Beans
Camel's marshalling functionality allows us to transform the data coming from our file in a variety of ways. For this example, we'll look specifically at comma-separate-value (CSV) file processing, but there are others available, most notably the ability to marshal to and from XML.In a simple world, we might be handed a CSV file containing the following data:
John,Smith
Sue,Anderson
As you might guess, the values in this file represent the first and last name of a person. If we were to use a processor, like our example above, we would need stream the contents of the file and build Strings for each of the fields in our CSV, and then set them on the appropriate POJO. Instead, we'll let Camel do the work for us, so all we have to worry about is getting a list of Strings for each row.
CSV processing is a separate module in Camel. To use it, add the following to your Maven POM file.
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-csv</artifactId>
<version>${camel.version}</version>
</dependency>
Before we hook CSV processing into our route, we'll need a bean to do something with the results. The Person class used here is just a POJO with a first and last name field.
public class CsvToPersonProcessor {
public void process(List<List> csvRows) {
for (List csvRow : csvRows) {
Person person = new Person();
person.setFirstName(csvRow.get(0));
person.setLastName(csvRow.get(1));
doSomethingTo(person);
}
}
}
One thing that's a little odd here is the process method, which takes a list of a list of Strings. Why that particular method signature? The answer lies within Camel's CSV library. Each row in our csv file is going to be broken into a list of Strings, and that list is going to be added into another list, so what you have at the end of the marshalling process is a list of all the rows from our CSV file. Yes, this means that your whole file will be in memory at once, which is bad, but hold on a second while we look at our new route definition, because we'll get to that problem in a minute.
@Component
public class FileMover extends SpringRouteBuilder {
@Value("${camel.ride.app.output.directory}")
private String outputDirectory;
@Value("${camel.ride.app.input.directory}")
private String inputDirectory;
@Override
public void configure() throws Exception {
from("file:" + inputDirectory).to("file:" + outputDirectory)
.unmarshal().csv().beanRef("csvToPersonProcessor", "process");
}
}
Like before, we are picking up a file from one directory and moving it to another, but now, at the end, we are unmarshalling the CSV and sending the results to the process method of the bean we defined above. And like I said before, it's going to do the entire file by default. So now, we need to stop doing that, before we run out of memory.
@Component
public class FileMover extends SpringRouteBuilder {
@Value("${camel.ride.app.output.directory}")
private String outputDirectory;
@Value("${camel.ride.app.input.directory}")
private String inputDirectory;
@Override
public void configure() throws Exception {
from("file:" + inputDirectory)
.to("file:" + outputDirectory)
.split(body().tokenize("\n")).streaming()
.unmarshal().csv()
.beanRef("csvToPersonProcessor", "process");
}
}
In the class above, we are splitting the body of the message on each new line, and then streaming the result to the CSV unmarshalling process we had before. The big change here is that now our CsvToPersonProcessor is going to be invoked one time for each row in the CSV, which means our looming memory issue is gone.
Before I wrap everything up, you probably want to know what to do about the pesky column headers that come along with many CSV files. These are dealt with through an instance of the DataFormat interface injected into the marshalling/unmarshalling portion of your route. You are welcome to roll your own DataFormat, but the camel-csv module also includes a CsvDataFormat class which, as it turns out, you were using the whole time with your call to the csv() method. All you need to do is break it out as a variable, flip the "skipFirstLine" property, and inject the DataFormat into your unmarshal call.
@Component
public class FileMover extends SpringRouteBuilder {
@Value("${camel.ride.app.output.directory}")
private String outputDirectory;
@Value("${camel.ride.app.input.directory}")
private String inputDirectory;
@Override
public void configure() throws Exception {
CsvDataFormat format = new CsvDataFormat();
csv.setSkipFirstLine(true);
from("file:" + inputDirectory)
.to("file:" + outputDirectory)
.split(body().tokenize("\n")).streaming()
.unmarshal(csv)
.beanRef("csvToPersonProcessor", "process");
}
}
It's probably worth noting here that because we were using Camel's Java DSL for all of the above examples, we didn't need to make any additional changes to the XML snippet we defined last time.