How-To: Write a Kafka Producer using Twitter Stream (Twitter HBC Client)
A step-by-step guide to building a Kafka producer that streams live tweets using Twitter’s Hosebird Client (HBC) and publishes them to a Kafka topic. This is a practical, developer-focused walkthrough with code, configuration, and troubleshooting tips.
Introduction
Twitter’s Hosebird Client (HBC) is a robust Java HTTP library for consuming Twitter’s Streaming API. In this guide, you’ll learn how to use HBC to create a Kafka producer that streams tweets matching specific terms and publishes them to a Kafka topic. This data can then be used for analytics, real-time processing (e.g., with Storm), or further pipelined to HDFS.
You can find a complete sample project here.
Requirements
- Apache Kafka 2.6.0
- Twitter Developer Account (for API Key, Secret, etc.)
- Apache Zookeeper (required for Kafka)
- Oracle JDK 1.8 (64-bit)
Build Environment
- Eclipse (or your preferred IDE)
- Apache Maven 2/3
Generating Twitter API Keys
To access the Twitter Streaming API, you need API keys and tokens:
- Go to https://dev.twitter.com/apps/new and log in.
- Enter your Application Name, Description, and website address (callback URL can be left empty).
- Accept the Terms of Service and create your application.
- Copy the Consumer Key (API key) and Consumer Secret.
- Click Create my Access Token to generate your Access Token and Access Token Secret.
- You now have all four credentials needed for OAuth authentication.
Kafka & Zookeeper Setup
Start the Zookeeper and Kafka servers. Replace $KAFKA_HOME with your Kafka installation directory.
Start Zookeeper
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties &
Verify Zookeeper is running (default port 2181):
netstat -anlp | grep 2181
Start Kafka
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &
Verify Kafka is running (default port 9092):
netstat -anlp | grep 9092
On macOS with Homebrew
brew install kafka # Installs Zookeeper too
brew services start zookeeper
kafka-server-start /usr/local/etc/kafka/server.properties
Creating a Kafka Topic
Create a topic named twitter-topic:
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic twitter-topic
Validate the topic:
$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic twitter-topic
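Once the topic exists, you can sanity-check it end to end with the console tools that ship with Kafka (this assumes the broker from the previous step is running on localhost:9092):

```shell
# Terminal 1: print every message on the topic as it arrives
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic twitter-topic --from-beginning

# Terminal 2: type a test message and press Enter; it should
# appear in the consumer terminal almost immediately
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 \
  --topic twitter-topic
```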
Kafka Producer with Twitter HBC
Now, let’s build a Kafka producer that streams tweets using HBC and publishes them to twitter-topic.
Maven Dependencies
Add these dependencies to your pom.xml:
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId> <!-- or hbc-twitter4j -->
<version>2.2.0</version> <!-- or latest -->
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
Kafka Producer Properties
Configure your Kafka producer:
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TwitterKafkaConfig.SERVERS);
properties.put(ProducerConfig.ACKS_CONFIG, "1");
properties.put(ProducerConfig.LINGER_MS_CONFIG, 500);
properties.put(ProducerConfig.RETRIES_CONFIG, 0);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
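The ProducerConfig constants above resolve to plain string keys. As a reference, here is a small sketch (the class name ProducerProps is ours, not from the sample project) that builds the same configuration with the literal key names, which is useful when reading broker logs or writing non-Java configuration:

```java
import java.util.Properties;

public class ProducerProps {

    // Builds the same configuration as above, using the literal string keys
    // that the ProducerConfig constants expand to.
    public static Properties build(String servers) {
        Properties p = new Properties();
        p.put("bootstrap.servers", servers); // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        p.put("acks", "1");                  // leader acknowledgement only
        p.put("linger.ms", 500);             // batch records for up to 500 ms
        p.put("retries", 0);                 // fail fast instead of retrying sends
        p.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return p;
    }
}
```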
Twitter HBC Client Setup
Set up the HBC client to track terms and authenticate:
// Buffers raw tweet JSON strings handed over by HBC
BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);

StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
endpoint.trackTerms(Lists.newArrayList(term));

Authentication auth = new OAuth1(consumerKey, consumerSecret, token, secret);

Client client = new ClientBuilder()
        .hosts(Constants.STREAM_HOST)
        .endpoint(endpoint)
        .authentication(auth)
        .processor(new StringDelimitedProcessor(queue))
        .build();
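The queue is the hand-off point between HBC and Kafka: StringDelimitedProcessor writes each raw tweet (one JSON string per message) into the BlockingQueue, and the producer loop in the next section takes from it. A minimal stand-alone sketch of that pattern, with a plain thread standing in for the HBC client (class and method names here are ours):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueHandoffDemo {

    // Takes n messages off the queue, blocking on each one just like the
    // Kafka producer loop does with queue.take().
    static List<String> drain(BlockingQueue<String> queue, int n) throws InterruptedException {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(queue.take());
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        // In the real pipeline, HBC's StringDelimitedProcessor fills this queue;
        // here a plain thread stands in for the HBC client.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);

        Thread hbcStandIn = new Thread(() -> {
            try {
                queue.put("{\"text\":\"hello kafka\"}");
                queue.put("{\"text\":\"hello hbc\"}");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        hbcStandIn.start();

        System.out.println(drain(queue, 2));
        hbcStandIn.join();
    }
}
```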
Producing Tweets to Kafka
Connect to the Twitter stream, fetch messages from the queue, and send them to Kafka:
client.connect();
try (Producer<Long, String> producer = getProducer()) {
    while (true) {
        ProducerRecord<Long, String> message = new ProducerRecord<>(TwitterKafkaConfig.TOPIC, queue.take());
        producer.send(message);
    }
} catch (InterruptedException e) {
    // Restore the interrupt flag so callers can see the thread was interrupted
    Thread.currentThread().interrupt();
} finally {
    client.stop();
}
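One caveat with the loop above: queue.take() blocks forever if the stream goes quiet, so the only way out is an interrupt or killing the process. A hedged alternative (method names here are ours, not from the sample project) uses poll with a timeout so the loop can wind down on its own when no tweets arrive for a while:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollLoopDemo {

    // Counts messages drained until the queue stays empty for timeoutMs,
    // i.e. the stream has gone quiet. The real loop would call
    // producer.send(new ProducerRecord<>(topic, msg)) for each message.
    static int drainUntilQuiet(BlockingQueue<String> queue, long timeoutMs) throws InterruptedException {
        int count = 0;
        while (queue.poll(timeoutMs, TimeUnit.MILLISECONDS) != null) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.put("{\"text\":\"only message\"}");
        System.out.println("drained " + drainUntilQuiet(queue, 100));
    }
}
```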
Running the Example
- Run the TwitterKafkaProducer.java class as a Java application in your IDE.
- Pass your Twitter API keys and search terms as arguments (VM arguments or program arguments).
- For a complete runnable example and detailed instructions, see the GitHub repository.
References & Further Reading
- Kafka Quickstart
- Twitter HBC
- How to generate Twitter API keys
- Integrate Kafka with HDFS using Camus (blog)
Happy Learning!