Simple String Example for Setting up Camus for Kafka-HDFS Data Pipeline

I came across Camus while building a Lambda Architecture framework recently. I couldn't find a good illustration of getting started with a Kafka-HDFS pipeline, so in this post we will see how we can use Camus to build a Kafka-HDFS data pipeline using the Twitter stream produced by the Kafka producer from the last post.

What is Camus?

Camus is LinkedIn's Kafka->HDFS pipeline. It is a MapReduce job that does distributed data loads out of Kafka. It includes the following features:

  • Automatic discovery of topics
  • Avro schema management / In progress
  • Date partitioning

More details and an overview of the project are available on the Camus README page on GitHub.


Setting up Camus

Requirements:

  • Apache Hadoop 2+
  • Apache Kafka 0.8
  • Twitter Developer account (for API key, secret, etc.)
  • Apache Zookeeper (required for Kafka)
  • Oracle JDK 1.7 (64-bit)

Build Environment:

  • Eclipse
  • Apache Maven 2/3

Building Camus Jar

To build Camus:

  • Clone the Git Repo from https://github.com/linkedin/camus or download the complete project.
  • Change the version of the hadoop-client library in camus/pom.xml to match your Hadoop version. In our case it's 2.6.0, so we change it as shown in the snippet after this list.

  • Build using Maven (the command is shown after this list) and wait for the BUILD SUCCESS message; be patient, as the first build may take some time.
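After the change, the hadoop-client dependency entry in camus/pom.xml looks roughly like this (depending on the Camus revision, the version may instead be managed in the parent pom's dependencyManagement section, so treat this as a sketch rather than an exact diff):

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <!-- match this to your cluster; 2.6.0 in our case -->
      <version>2.6.0</version>
    </dependency>

Then build from the top-level camus directory:

    cd camus
    mvn clean package -DskipTests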

Camus Essentials – Decoder and RecordWriterProvider

Camus needs two main components for reading and decoding data from Kafka and writing data to HDFS –

  1. Decoding messages read from Kafka – Camus has a set of Decoders that decode messages coming from Kafka. A Decoder extends com.linkedin.camus.coders.MessageDecoder, which implements the logic to partition data based on a timestamp. A set of predefined Decoders is available in camus/camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/, and you can write your own based on these.
  2. Writing messages to HDFS – Camus needs a RecordWriterProvider class, extending com.linkedin.camus.etl.RecordWriterProvider, that tells Camus what payload should be written to HDFS. A set of predefined RecordWriterProviders is available in camus-etl-kafka/src/main/java/com/linkedin/camus/etl/kafka/common, and you can write your own based on these.
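Both components are wired in through camus.properties. A minimal example using the classes we will pick for the Twitter stream (the commented-out line shows the per-topic override syntax, filled in with our twitter-topic name for illustration):

    # Decoder used for messages pulled from Kafka
    camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
    # Decoders can also be set per topic, e.g. only for twitter-topic
    #camus.message.decoder.class.twitter-topic=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
    # RecordWriterProvider used when writing the payload to HDFS
    etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider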

Configuring and Running Camus

Camus needs Hadoop and the JobHistory server to be running, so let's get them up –

  1. Start the Hadoop daemons and the JobHistory server from the sbin folder of your Hadoop installation directory –

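For example, assuming a standard Hadoop 2.x layout with HADOOP_HOME pointing at your installation:

    cd $HADOOP_HOME
    # HDFS and YARN daemons
    sbin/start-dfs.sh
    sbin/start-yarn.sh
    # JobHistory server, needed by Camus' MapReduce job
    sbin/mr-jobhistory-daemon.sh start historyserver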
2. Also, set up the Kafka and Twitter configuration as mentioned in the previous post, which specifies the topic as "twitter-topic" and the client id as "camus" (which we'll configure in camus.properties too, later), and keep the producer running.

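If you want to confirm that the topic exists before running Camus, Kafka 0.8.1+ ships a small helper script (this assumes ZooKeeper is running on localhost:2181; adjust to your setup):

    bin/kafka-topics.sh --list --zookeeper localhost:2181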
3. Configure the Camus properties – we'll use camus.properties as present here and customize it to our needs and configuration:

  • Specify JsonStringMessageDecoder as the decoder for messages on "twitter-topic" and StringRecordWriterProvider for writing output (see the complete camus.properties below).

  • Configure the Kafka broker list, the etl destination path (the destination where the actual data from Kafka will be written, based on date partitioning), and the base execution and execution history paths.

  • Specify the "created_at" field present in the Twitter JSON as the timestamp field, and ISO-8601 as the timestamp format.

I am also presenting the complete camus.properties file –

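Hostnames and HDFS paths below are placeholders for a local single-node setup (broker on localhost:9092, data under /user/hduser), and the tuning values follow the stock camus-example template, so adjust them to your environment:

    # The job name
    camus.job.name=Camus Job
    # placeholder - point this at your NameNode
    fs.defaultFS=hdfs://localhost:9000

    # how the payload is written to HDFS
    etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider

    # final top-level data output directory; a sub-directory is created per topic
    etl.destination.path=/user/hduser/topics
    # HDFS location for execution files (offsets, error logs, count files)
    etl.execution.base.path=/user/hduser/exec
    # where completed Camus job output directories are kept
    etl.execution.history.path=/user/hduser/camus/exec/history

    # how messages pulled from Kafka are decoded
    camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder

    # timestamp extraction from the twitter JSON
    camus.message.timestamp.field=created_at
    camus.message.timestamp.format=ISO-8601

    # Kafka connection details
    kafka.brokers=localhost:9092
    kafka.client.name=camus
    kafka.whitelist.topics=twitter-topic

    # pull limits (template defaults)
    mapred.map.tasks=30
    kafka.max.pull.hrs=1
    kafka.max.historical.days=3
    kafka.max.pull.minutes.per.task=-1

    # error handling and compression
    etl.ignore.schema.errors=true
    max.decoder.exceptions.to.print=5
    mapred.output.compress=false

    # tracking / misc (template defaults)
    etl.run.tracking.post=false
    post.tracking.counts.to.kafka=true
    kafka.monitor.time.granularity=10
    etl.hourly=hourly
    etl.daily=daily
    etl.output.file.time.partition.mins=60
    etl.keep.count.files=false
    etl.execution.history.max.of.quota=.8
    mapred.map.max.attempts=1
    kafka.client.buffer.size=20971520
    kafka.client.so.timeout=60000
    log4j.configuration=true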
One last thing: in my example I have set mapred.output.compress=false, which in real scenarios will be set to true; output compression defaults to DEFLATE.

Running Camus

After building Camus in the first step, you should see, in the target folder of the camus-example module, a jar named camus-example-0.1.0-SNAPSHOT-shaded.jar.

Put the jar and the camus.properties file in a folder and execute the command shown below.

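Assuming both files are in the current directory (otherwise prefix the paths accordingly):

    hadoop jar camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties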
That’s It !!

If the above command executes successfully, you should see the records in HDFS at /user/hduser/topics/, or whatever etl.destination.path you have mentioned in your camus.properties file.
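A quick way to check is a recursive listing (a sub-directory is created per topic, with date-partitioned folders underneath, so the exact leaf path depends on when the job ran):

    hdfs dfs -ls -R /user/hduser/topics/twitter-topic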


Please write back to me in the comments if you face any issues while executing this.

Happy Learning !!

 


  • Ashish Dutt

    Thanks for this elaborate writeup. I have configured the camus.properties file as

    # Needed Camus properties, more cleanup to come
    # Almost all properties have decent default properties. When in doubt, comment out the property.

    # The job name.
    camus.job.name=Camus Job
    fs.defaultFS=hdfs://BDA01:8020

    etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.StringRecordWriterProvider
    #etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider

    # final top-level data output directory, sub-directory will be dynamically created for each topic pulled
    etl.destination.path=/user/kafka/topics
    # HDFS location where you want to keep execution files, i.e. offsets, error logs, and count files
    etl.execution.base.path=/user/kafka/exec
    # where completed Camus job output directories are kept, usually a sub-dir in the base.path
    etl.execution.history.path=/user/kafka/camus/exec/history

    # Concrete implementation of the Encoder class to use (used by Kafka Audit, and thus optional for now)
    #camus.message.encoder.class=com.linkedin.camus.etl.kafka.coders.DummyKafkaMessageEncoder

    # Concrete implementation of the Decoder class to use.
    # Out of the box options are:
    # com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder - Reads JSON events, and tries to extract timestamp.
    # com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder - Reads Avro events using a schema from a configured schema repository.
    # com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder - Same, but converts event to latest schema for current topic.
    camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.JsonStringMessageDecoder
    #camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.LatestSchemaKafkaAvroMessageDecoder

    # Decoder class can also be set on a per topic basis.
    #camus.message.decoder.class.=com.your.custom.MessageDecoder

    # Used by avro-based Decoders (KafkaAvroMessageDecoder and LatestSchemaKafkaAvroMessageDecoder) to use as their schema registry.
    # Out of the box options are:
    # com.linkedin.camus.schemaregistry.FileSchemaRegistry
    # com.linkedin.camus.schemaregistry.MemorySchemaRegistry
    # com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
    # com.linkedin.camus.example.schemaregistry.DummySchemaRegistry
    kafka.message.coder.schema.registry.class=com.linkedin.camus.example.schemaregistry.DummySchemaRegistry

    # Used by JsonStringMessageDecoder when extracting the timestamp
    # Choose the field that holds the time stamp (default "timestamp")
    camus.message.timestamp.field=created_at

    # What format is the timestamp in? Out of the box options are:
    # "unix" or "unix_seconds": The value will be read as a long containing the seconds since epoc
    # "unix_milliseconds": The value will be read as a long containing the milliseconds since epoc
    # "ISO-8601": Timestamps will be fed directly into org.joda.time.DateTime constructor, which reads ISO-8601
    # All other values will be fed into the java.text.SimpleDateFormat constructor, which will be used to parse the timestamps
    # Default is "[dd/MMM/yyyy:HH:mm:ss Z]"
    #camus.message.timestamp.format= "unix"
    #yyyy-MM-dd_HH:mm:ss
    camus.message.timestamp.format=ISO-8601

    # Used by the committer to arrange .avro files into a partitioned scheme. This will be the default partitioner for all
    # topic that do not have a partitioner specified.
    # Out of the box options are (for all options see the source for configuration options):
    # com.linkedin.camus.etl.kafka.partitioner.HourlyPartitioner, groups files in hourly directories
    # com.linkedin.camus.etl.kafka.partitioner.DailyPartitioner, groups files in daily directories
    # com.linkedin.camus.etl.kafka.partitioner.TimeBasedPartitioner, groups files in very configurable directories
    # com.linkedin.camus.etl.kafka.partitioner.DefaultPartitioner, like HourlyPartitioner but less configurable
    # com.linkedin.camus.etl.kafka.partitioner.TopicGroupingPartitioner
    #etl.partitioner.class=com.linkedin.camus.etl.kafka.partitioner.HourlyPartitioner

    # Partitioners can also be set on a per-topic basis. (Note though that configuration is currently not per-topic.)
    #etl.partitioner.class.=com.your.custom.CustomPartitioner

    # all files in this dir will be added to the distributed cache and placed on the classpath for hadoop tasks
    # hdfs.default.classpath.dir=

    # max hadoop tasks to use, each task can pull multiple topic partitions
    mapred.map.tasks=30
    # max historical time that will be pulled from each partition based on event timestamp
    kafka.max.pull.hrs=1
    # events with a timestamp older than this will be discarded.
    kafka.max.historical.days=3
    # Max minutes for each mapper to pull messages (-1 means no limit)
    kafka.max.pull.minutes.per.task=-1

    # if whitelist has values, only whitelisted topic are pulled. Nothing on the blacklist is pulled
    #kafka.blacklist.topics=
    kafka.whitelist.topics=test
    log4j.configuration=true

    # Name of the client as seen by kafka
    kafka.client.name=camus
    # The Kafka brokers to connect to, format: kafka.brokers=host1:port,host2:port,host3:port
    kafka.brokers=BDA01:9092,BDA02:9092, BDA03:9092, BDA04:9092

    # Fetch request parameters:
    #kafka.fetch.buffer.size=
    #kafka.fetch.request.correlationid=
    #kafka.fetch.request.max.wait=
    #kafka.fetch.request.min.bytes=
    #kafka.timeout.value=

    #Stops the mapper from getting inundated with Decoder exceptions for the same topic
    #Default value is set to 10
    max.decoder.exceptions.to.print=5

    #Controls the submitting of counts to Kafka
    #Default value set to true
    post.tracking.counts.to.kafka=true
    #monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka

    # everything below this point can be ignored for the time being, will provide more documentation down the road
    ##########################
    etl.run.tracking.post=false
    #kafka.monitor.tier=
    #etl.counts.path=
    kafka.monitor.time.granularity=10

    etl.hourly=hourly
    etl.daily=daily

    # Should we ignore events that cannot be decoded (exception thrown by MessageDecoder)?
    # false will fail the job, true will silently drop the event.
    etl.ignore.schema.errors=true

    # configure output compression for deflate or snappy. Defaults to deflate
    mapred.output.compress=false
    #etl.output.codec=deflate
    #etl.deflate.level=6
    #etl.output.codec=snappy

    #etl.default.timezone=America/Los_Angeles
    etl.default.timezone=Singapore
    etl.output.file.time.partition.mins=60
    etl.keep.count.files=false
    etl.execution.history.max.of.quota=.8

    # Configures a customer reporter which extends BaseReporter to send etl data
    #etl.reporter.class

    mapred.map.max.attempts=1

    kafka.client.buffer.size=20971520
    kafka.client.so.timeout=60000
    #zookeeper.session.timeout=
    #zookeeper.connection.timeout=

    My problem is that when I execute the command hadoop jar /opt/camus/camus-example/target/camus-example-0.1.0-SNAPSHOT-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P /opt/camus/camus.properties, the command succeeds. I then check in Hue and find that the topic is not written at the specified location. Please suggest what I should do?

    Thanks for your help

    • Did you check in this location that you have specified – etl.destination.path=/user/kafka/topics ? Also, can you try setting your decoder on your topic once like this – camus.message.decoder.class.twitter-topic=com.linkedin.camus.etl.kafka.coders.StringMessageDecoder

  • Harshwardhan Kulkarni

    Nice article…

    I am getting the Class not found exception for StringMessageDecoder class…

    I have set the property as,
    camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.StringMessageDecoder

    Can you please help me on this?

    • Can you share how you are trying to build it?

      • Harshwardhan Kulkarni

        mvn clean package -DskipTests

        I also tried to give topic name as per your prev comment, but no luck
        camus.message.decoder.class.topic-test=com.linkedin.camus.etl.kafka.coders.StringMessageDecoder

  • Alex McLintock

    I used Camus also in 2015. Would you say it is still the way to go or has anything overtaken it for Kafka/Hadoop integration?

  • Mahesh Kolla

    For me Log4j.xml not found error