How-To: Integrate Kafka with HDFS using Camus (Twitter Stream Example)
Simple String Example for Setting up Camus for Kafka-HDFS Data Pipeline
I came across Camus recently while building a Lambda Architecture framework. I couldn’t find a good illustration of getting started with a Kafka-HDFS pipeline, so in this post we will see how to use Camus to build a Kafka-HDFS data pipeline using the twitter stream produced by the Kafka producer from the last post.
What is Camus?
Camus is LinkedIn’s Kafka->HDFS pipeline. It is a MapReduce job that does distributed data loads out of Kafka. It includes the following features:
- Automatic discovery of topics
- Avro schema management / In progress
- Date partitioning
More details and an overview of the project are available on the Camus README page on GitHub.
Setting up Camus
Requirements:
- Apache Hadoop 2+
- Apache Kafka 0.8
- Twitter Developer account (for API key, secret, etc.)
- Apache Zookeeper (required for Kafka)
- Oracle JDK 1.7 (64-bit)
Build Environment:
- Eclipse
- Apache Maven 2/3
Building Camus Jar
To build Camus:
- Clone the Git Repo from https://github.com/linkedin/camus or download the complete project.
- Change the version of the hadoop-client library in camus/pom.xml to match your Hadoop version. In our case it’s 2.6.0, so we will change it as follows –
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.6.0</version>
</dependency>
- Build using
mvn clean package -DskipTests
Wait for the BUILD SUCCESS message, and be patient, as the first build may take some time.
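For reference, assuming git and Maven are already on your PATH, the whole build boils down to the following (remember to edit camus/pom.xml for your Hadoop version before packaging):
git clone https://github.com/linkedin/camus.git
cd camus
mvn clean package -DskipTests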
Camus Essentials – Decoder and RecordWriterProvider
Camus needs two main components: one for reading and decoding data from Kafka, and one for writing data to HDFS –
- Decoding messages read from Kafka – Camus has a set of decoders that decode messages coming from Kafka. Decoders extend com.linkedin.camus.coders.MessageDecoder and implement the logic to partition data based on a timestamp. A set of predefined decoders is present in camus/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/ and you can write your own based on these.
- Writing messages to HDFS – Camus needs RecordWriterProvider classes, which extend com.linkedin.camus.etl.RecordWriterProvider and tell Camus what payload should be written to HDFS. A set of predefined RecordWriterProviders is present in camus/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/ and you can write your own based on these (see the snippet after this list for a quick way to browse them).
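Before writing your own, it helps to skim the bundled implementations in the checkout, for example:
ls camus/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/
ls camus/camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common/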
Configuring and Running Camus
Camus needs Hadoop and the JobHistory server to be running, so let’s bring them up –
1. Start the Hadoop and JobHistory server daemons from the sbin folder of your Hadoop installation directory –
$ start-dfs.sh
$ start-yarn.sh
$ mr-jobhistory-daemon.sh start historyserver
2. Also, set up the Kafka and Twitter configuration as mentioned in the previous post, which specifies the topic as “twitter-topic” and the client id as “camus” (we’ll configure both in camus.properties later too), and keep the producer running. A quick sanity check for the daemons and the topic is shown after this list.
3. Configure the Camus properties – we’ll use the camus.properties file as present here and customize it as per our needs and configuration.
- Specify JsonStringMessageDecoder as the decoder for messages on “twitter-topic” and StringRecordWriterProvider for writing the output –
camus.message.decoder.class.twitter-topic=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider
- Configure the Kafka broker list, the etl destination path (where the actual data from Kafka will be written, based on date partitioning), and the base execution and execution history paths –
kafka.client.name=camus
kafka.brokers=localhost:9092
etl.destination.path=/user/hduser/topics/
etl.execution.base.path=/user/hduser/exec/
etl.execution.history.path=/user/hduser/camus/exec/history
- Specify the “created_at” field present in the twitter JSON as the timestamp field, and ISO-8601 as the timestamp format –
camus.message.timestamp.field=created_at
camus.message.timestamp.format=ISO-8601
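Before running anything, it is worth a quick sanity check that the daemons from step 1 and the producer from step 2 are actually up. Something like the following should do, assuming a single-node setup with ZooKeeper on its default port 2181 and the consumer run from the Kafka installation directory:
jps    # should list NameNode, DataNode, ResourceManager, NodeManager and JobHistoryServer among others
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic twitter-topic    # should print incoming tweets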
I am also presenting the complete camus.properties file I used –
# Needed Camus properties, more cleanup to come
#
# Almost all properties have decent default properties. When in doubt, comment out the property.
#

# The job name.
camus.job.name=Camus Job

# final top-level data output directory, sub-directory will be dynamically created for each topic pulled
etl.destination.path=/user/hduser/topics

# HDFS location where you want to keep execution files, i.e. offsets, error logs, and count files
etl.execution.base.path=/user/hduser/exec

# where completed Camus job output directories are kept, usually a sub-dir in the base.path
etl.execution.history.path=/user/hduser/camus/exec/history

# Concrete implementation of the Encoder class to use (used by Kafka Audit, and thus optional for now)
#camus.message.encoder.class=com.linkedin.camus.etl.kafka.coders.DummyKafkaMessageEncoder

# Concrete implementation of the Decoder class to use.
# Out of the box options are:
#  com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder - Reads JSON events, and tries to extract timestamp.
#  com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder - Reads Avro events using a schema from a configured schema repository.
#  com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder - Same, but converts event to latest schema for current topic.
camus.message.decoder.class.twitter-topic=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder

# Decoder class can also be set on a per topic basis.
#camus.message.decoder.class.<topic-name>=com.your.custom.MessageDecoder

# Used by avro-based Decoders (KafkaAvroMessageDecoder and LatestSchemaKafkaAvroMessageDecoder) to use as their schema registry.
# Out of the box options are:
#  com.linkedin.camus.schemaregistry.FileSchemaRegistry
#  com.linkedin.camus.schemaregistry.MemorySchemaRegistry
#  com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
#  com.linkedin.camus.example.schemaregistry.DummySchemaRegistry
#kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry

# Used by JsonStringMessageDecoder when extracting the timestamp
# Choose the field that holds the time stamp (default "timestamp")
camus.message.timestamp.field=created_at

# What format is the timestamp in? Out of the box options are:
#  "unix" or "unix_seconds": The value will be read as a long containing the seconds since epoch
#  "unix_milliseconds": The value will be read as a long containing the milliseconds since epoch
#  "ISO-8601": Timestamps will be fed directly into the org.joda.time.DateTime constructor, which reads ISO-8601
#  All other values will be fed into the java.text.SimpleDateFormat constructor, which will be used to parse the timestamps
#  Default is "[dd/MMM/yyyy:HH:mm:ss Z]"
#camus.message.timestamp.format=yyyy-MM-dd_HH:mm:ss
camus.message.timestamp.format=ISO-8601

# Used by the committer to arrange .avro files into a partitioned scheme. This will be the default partitioner for all
# topics that do not have a partitioner specified.
# Out of the box options are (for all options see the source for configuration options):
#  com.linkedin.camus.etl.kafka.partitioner.HourlyPartitioner, groups files in hourly directories
#  com.linkedin.camus.etl.kafka.partitioner.DailyPartitioner, groups files in daily directories
#  com.linkedin.camus.etl.kafka.partitioner.TimeBasedPartitioner, groups files in very configurable directories
#  com.linkedin.camus.etl.kafka.partitioner.DefaultPartitioner, like HourlyPartitioner but less configurable
#  com.linkedin.camus.etl.kafka.partitioner.TopicGroupingPartitioner
#etl.partitioner.class=com.linkedin.camus.etl.kafka.partitioner.HourlyPartitioner

# Partitioners can also be set on a per-topic basis. (Note though that configuration is currently not per-topic.)
#etl.partitioner.class.<topic-name>=com.your.custom.CustomPartitioner

# all files in this dir will be added to the distributed cache and placed on the classpath for hadoop tasks
# hdfs.default.classpath.dir=

# max hadoop tasks to use, each task can pull multiple topic partitions
mapred.map.tasks=5
# max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=1
# events with a timestamp older than this will be discarded.
kafka.max.historical.days=3
# Max minutes for each mapper to pull messages (-1 means no limit)
kafka.max.pull.minutes.per.task=-1

# if whitelist has values, only whitelisted topics are pulled. Nothing on the blacklist is pulled
kafka.blacklist.topics=
kafka.whitelist.topics=
log4j.configuration=true

# Name of the client as seen by kafka
kafka.client.name=camus
# The Kafka brokers to connect to, format: kafka.brokers=host1:port,host2:port,host3:port
kafka.brokers=localhost:9092

# Fetch request parameters:
#kafka.fetch.buffer.size=
#kafka.fetch.request.correlationid=
#kafka.fetch.request.max.wait=
#kafka.fetch.request.min.bytes=
#kafka.timeout.value=

# Stops the mapper from getting inundated with Decoder exceptions for the same topic
# Default value is set to 10
max.decoder.exceptions.to.print=5

# Controls the submitting of counts to Kafka
# Default value set to true
post.tracking.counts.to.kafka=true
#monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka

# everything below this point can be ignored for the time being, will provide more documentation down the road
##########################
etl.run.tracking.post=false
kafka.monitor.tier=
etl.counts.path=
kafka.monitor.time.granularity=10

etl.hourly=hourly
etl.daily=daily

# Should we ignore events that cannot be decoded (exception thrown by MessageDecoder)?
# `false` will fail the job, `true` will silently drop the event.
etl.ignore.schema.errors=false

# configure output compression for deflate or snappy. Defaults to deflate
mapred.output.compress=false
etl.output.codec=deflate
etl.deflate.level=6
#etl.output.codec=snappy

etl.default.timezone=America/Los_Angeles
etl.output.file.time.partition.mins=60
etl.keep.count.files=false
etl.execution.history.max.of.quota=.8

# Configures a custom reporter which extends BaseReporter to send etl data
#etl.reporter.class

mapred.map.max.attempts=1

kafka.client.buffer.size=20971520
kafka.client.so.timeout=60000
#zookeeper.session.timeout=
#zookeeper.connection.timeout=

etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider
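One optional step I’d also suggest (not part of the Camus docs): make sure the HDFS paths referenced in the properties above exist and are writable by the user running the job. Camus may create some of them on its own, but pre-creating them avoids permission surprises:
hdfs dfs -mkdir -p /user/hduser/topics /user/hduser/exec /user/hduser/camus/exec/history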
One last thing: in my example I have set
mapred.output.compress=false
which in real scenarios will usually be set to true; output compression defaults to DEFLATE.
Running Camus
After building Camus in the first step, you should see a jar named camus-example-0.1.0-SNAPSHOT-shaded.jar in the target folder of the camus-example module.
Put the jar and the camus.properties file in a folder and execute this command –
hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties
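While the job is running you can keep an eye on it with the usual YARN tooling (the URLs below assume default web UI ports):
yarn application -list    # the Camus MapReduce job should show up here while it runs
# or browse the ResourceManager UI at http://localhost:8088 and the JobHistory UI at http://localhost:19888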
That’s It !!
If the above command executes successfully, you should see the records in HDFS at the following path –
/user/hduser/topics/ (or whatever path you have mentioned in your camus.properties file).
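A quick way to confirm the pull worked is to list the destination directory (adjust the path if you changed etl.destination.path):
hdfs dfs -ls -R /user/hduser/topics/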
Please write back to me in the comments if you face any issues while executing this.
Happy Learning !!