Build a custom Kafka producer in Java using the Twitter Hosebird client and Apache Avro.
Jillur Quddus • Founder & Chief Data Scientist • 16th June 2017
In my last article, Real-Time Data Pipeline with Apache Kafka and Spark, I used Apache Flume to fetch tweets from the Twitter Stream using the demo Flume Twitter Source that is bundled with Flume out of the box. The demo Twitter Source connects to the Twitter Stream and continuously downloads a sample of tweets, which were then published to a Topic in the Kafka Channel that we set up. In this article, we will write a custom Kafka Producer in Java using Twitter's Hosebird Client to collect tweets from Twitter's Streaming API, parse them and convert them to Avro format before publishing them to a Topic in Apache Kafka. The high-level real-time data pipeline will therefore be as follows:
First, let us set up our Apache Maven project to handle the build dependencies for our Kafka Producer, including Twitter's Hosebird Client.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Maven Coordinates -->
<groupId>ai.hyperlearning.knowledgebase.kafka.producers</groupId>
<artifactId>hyperlearningai-kafka-producers</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<!-- Project Information -->
<name>Example Kafka Producers</name>
<description>HyperLearning AI Knowledgebase Kafka Producers Project - Example Kafka Producers</description>
<url>https://hyperlearning.ai</url>
<organization>
<name>HyperLearning AI</name>
<url>https://hyperlearning.ai</url>
</organization>
<developers>
<developer>
<id>jillurquddus</id>
<name>Jillur Quddus</name>
<email>[email protected]</email>
<url>https://hyperlearning.ai</url>
<organization>HyperLearning AI</organization>
<organizationUrl>https://hyperlearning.ai</organizationUrl>
<roles>
<role>Chief Data Scientist</role>
<role>Principal Software Engineer</role>
</roles>
<timezone>Europe/London</timezone>
</developer>
</developers>
<!-- Properties -->
<properties>
<apache.avro.version>1.8.1</apache.avro.version>
<apache.kafka.2.11.version>0.8.2.2</apache.kafka.2.11.version>
<bijection.avro.2.11.version>0.9.5</bijection.avro.2.11.version>
<jdk.version>1.8</jdk.version>
<maven.plugins.maven-assembly-plugin.version>3.0.0</maven.plugins.maven-assembly-plugin.version>
<maven.plugins.maven-compiler-plugin.version>3.6.1</maven.plugins.maven-compiler-plugin.version>
<output.directory>/hyperlearningai/knowledgebase/kafka/jars</output.directory>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<twitter.hbc.core.version>2.2.0</twitter.hbc.core.version>
<twitter.hbc.twitter4j.version>2.2.0</twitter.hbc.twitter4j.version>
</properties>
<!-- Dependencies -->
<dependencies>
<!-- Apache Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${apache.avro.version}</version>
</dependency>
<!-- Apache Kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${apache.kafka.2.11.version}</version>
</dependency>
<!-- Bijection -->
<dependency>
<groupId>com.twitter</groupId>
<artifactId>bijection-avro_2.11</artifactId>
<version>${bijection.avro.2.11.version}</version>
</dependency>
<!-- Twitter Hosebird Client -->
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId>
<version>${twitter.hbc.core.version}</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hbc-twitter4j</artifactId>
<version>${twitter.hbc.twitter4j.version}</version>
</dependency>
</dependencies>
<!-- Build -->
<build>
<!-- Plugins -->
<plugins>
<!-- Maven Compiler: Compile the Sources of the Project -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.plugins.maven-compiler-plugin.version}</version>
<configuration>
<source>${jdk.version}</source>
<target>${jdk.version}</target>
</configuration>
</plugin>
<!-- Maven Assembly: Aggregate project output with its dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven.plugins.maven-assembly-plugin.version}</version>
<configuration>
<!-- Final JAR Filename -->
<finalName>hyperlearningai-kafka-producers-${project.version}</finalName>
<!-- Include all Project Dependencies -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<!-- Main Class -->
<archive>
<manifest>
<mainClass>ai.hyperlearning.knowledgebase.kafka.producers.TwitterProducer</mainClass>
</manifest>
</archive>
<!-- JAR with dependencies Output Target Directory -->
<outputDirectory>${output.directory}</outputDirectory>
</configuration>
<executions>
<!-- Bind the assembly to the package phase -->
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
We are now ready to create our Twitter Producer Java Class. I will start by defining the Avro schema that will be used to serialise the messages published to our Kafka Topic. For the full list of available Tweet fields, please refer to https://dev.twitter.com/overview/api/tweets.
// Avro Schema to use to serialise messages to the Kafka Topic
// For the full list of Tweet fields, please refer to
// https://dev.twitter.com/overview/api/tweets
private static Schema schema;
private static Injection<GenericRecord, byte[]> recordInjection;
public static final String TWEET_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"tweet\","
+ "\"fields\":["
+ " { \"name\":\"id\", \"type\":\"string\" },"
+ " { \"name\":\"user_name\", \"type\":\"string\" },"
+ " { \"name\":\"text\", \"type\":\"string\" }"
+ "]}";
The next method generates an Avro Record from a Twitter4j Status object representing a single Tweet, using our newly defined Avro schema.
/**
* Parse and convert the Tweet Status into an Avro Record for serialising
* and publishing to the Kafka Topic.
* @param status the Twitter4j Status representing a single Tweet
* @return the Avro Record built from the Tweet ID, user name and text
*/
private static GenericData.Record createRecord(Status status) {
User user = status.getUser();
GenericData.Record doc = new GenericData.Record(schema);
doc.put("id", String.valueOf(status.getId()));
doc.put("user_name", user.getName());
doc.put("text", status.getText());
return doc;
}
We can now move on to establishing a connection to the Twitter Streaming API and creating our Kafka Producer. Creating a basic Kafka Producer is very simple:
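The following is a minimal sketch, based on the Producer properties used in the run method further down (and relying on the imports shown in the full listing at the end of this article): we only need a bootstrap server, serialiser classes for the message keys and values, and the Producer itself.
// Minimal Kafka Producer configuration (a sketch of the properties used in the run method below)
Properties producerProperties = new Properties();
producerProperties.put("bootstrap.servers", "<Kafka Broker Hostname>:9092");
producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProperties);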
Note that we will be using Twitter's Bijection Library to serialise and deserialise our Avro messages as it has a more user-friendly API than Avro's native API.
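As a quick standalone illustration of the Bijection API (using the TWEET_SCHEMA constant and the imports from the full listing further down), an Injection built from the Avro schema serialises a GenericRecord to a byte array with apply, and the same Injection recovers the record from those bytes with invert, which is what a downstream consumer of our Kafka Topic would do:
// Standalone sketch of serialising and deserialising an Avro record with Bijection
Schema schema = new Schema.Parser().parse(TWEET_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
// Serialise a GenericRecord to a byte array
GenericData.Record tweet = new GenericData.Record(schema);
tweet.put("id", "123456789");
tweet.put("user_name", "HyperLearning AI");
tweet.put("text", "Hello, Kafka!");
byte[] tweetBytes = recordInjection.apply(tweet);
// Deserialise the byte array back into a GenericRecord
GenericRecord decodedTweet = recordInjection.invert(tweetBytes).get();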
As I mentioned earlier, we will be using Twitter's Hosebird Client to collect tweets from Twitter's Streaming API. To create a Hosebird Client, we define a First In, First Out (FIFO) queue to collect messages from the stream, and a Filtered Endpoint that allows us to filter the tweets based on a defined list of terms. Finally, we need the Twitter Application authentication credentials that we generated in the last article to grant us access to the Twitter Stream.
We will then use the Twitter4j Library to create a Twitter4j Client that wraps around the Hosebird Client and exposes a custom Status Listener. It is within this custom Status Listener that we will parse the Status and generate the Avro Record to publish to the Kafka Topic via our Producer. Finally, we will create an Executor Service to spawn threads to parse the actual messages that we collect: when we call the process method on our Twitter4j Client wrapper, Runnables are submitted to the Executor Service to process the messages in the FIFO message queue that we defined earlier. This method looks as follows:
/**
* Wrap a Twitter4j Client around a Hosebird Client using a custom Status Listener
* and an Executor Service to spawn threads to parse the messages received
* @param kafkaBroker
* @throws InterruptedException
*/
public static void run(String kafkaBroker) throws InterruptedException {
// Kafka Producer Properties
Properties producerProperties = new Properties();
// Bootstrapping
producerProperties.put("bootstrap.servers", kafkaBroker);
// Serializer Class for Keys
producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Serializer Class for Values
producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// Number of acknowledgements required before a Produce Request is considered complete
// (the new Producer API uses the "acks" setting rather than the legacy "request.required.acks")
producerProperties.put("acks", "1");
// Create the Kafka Producer
producer = new KafkaProducer<>(producerProperties);
// Twitter Connection and Filtering Properties
BlockingQueue<String> messageQueue = new LinkedBlockingQueue<String>(100000);
StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
endpoint.stallWarnings(false);
endpoint.trackTerms(Lists.newArrayList("brexit", "#vote2017"));
Authentication authentication = new OAuth1(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET);
// Build a Twitter Hosebird Client
ClientBuilder hosebirdClientBuilder = new ClientBuilder()
.name("HyperLearning AI Knowledgebase Twitter Hosebird Client")
.hosts(Constants.STREAM_HOST)
.authentication(authentication)
.endpoint(endpoint)
.processor(new StringDelimitedProcessor(messageQueue));
BasicClient hosebirdClient = hosebirdClientBuilder.build();
// Create an Executor Service to spawn threads to parse the messages
// Runnables are submitted to the Executor Service to process the Message Queue
int numberProcessingThreads = 2;
ExecutorService service = Executors.newFixedThreadPool(numberProcessingThreads);
// Wrap a Twitter4j Client around the Hosebird Client using a custom Status Listener
Twitter4jStatusClient twitter4jClient = new Twitter4jStatusClient(
hosebirdClient, messageQueue, Lists.newArrayList(statusListener), service);
// Connect to the Twitter Streaming API
twitter4jClient.connect();
// Twitter4jStatusClient.process must be called for every Message Processing Thread to be spawned
for (int threads = 0; threads < numberProcessingThreads; threads++) {
twitter4jClient.process();
}
// Run the Producer for 60 seconds for DEV purposes
// Note that this is NOT a graceful exit
Thread.sleep(60000);
producer.close();
hosebirdClient.stop();
}
Finally, we need to define our custom Status Listener, whose onStatus method will be called for every new Tweet. Our custom Status Listener will parse the Status and generate an Avro Record from it using the createRecord method that we defined above. It will then use the Bijection Library to convert this Avro Record into a byte array, which our Kafka Producer will send to Kafka for publishing to our Kafka Topic.
/**
* Custom Status Listener
* The onStatus method gets called for every new Tweet. It is here where we
* will parse the incoming messages and generate the Avro Record which will be
* serialised and sent using our Kafka Producer.
*/
private static StatusListener statusListener = new StatusStreamHandler() {
@Override
public void onStatus(Status status) {
// Convert the Status object into an Avro Record for serialising and publishing to the Kafka Topic
GenericData.Record avroRecord = createRecord(status);
byte[] avroRecordBytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>(KAFKA_TOPIC, avroRecordBytes);
// Send the Message to Kafka
producer.send(record);
}
@Override
public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
@Override
public void onTrackLimitationNotice(int limit) {}
@Override
public void onScrubGeo(long user, long upToStatus) {}
@Override
public void onStallWarning(StallWarning warning) {}
@Override
public void onException(Exception e) {}
@Override
public void onDisconnectMessage(DisconnectMessage message) {}
@Override
public void onStallWarningMessage(StallWarningMessage warning) {}
@Override
public void onUnknownMessageType(String s) {}
};
Putting it all together, our final Kafka Twitter Producer looks as follows:
package ai.hyperlearning.knowledgebase.kafka.producers;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.BasicClient;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import com.twitter.hbc.twitter4j.Twitter4jStatusClient;
import com.twitter.hbc.twitter4j.handler.StatusStreamHandler;
import com.twitter.hbc.twitter4j.message.DisconnectMessage;
import com.twitter.hbc.twitter4j.message.StallWarningMessage;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.User;
/**
* Example Kafka Twitter Producer
*
* Wrap a Twitter4j Client around a Hosebird Client using a custom Status Listener
* to connect to the Twitter Streaming API. Parse and convert the messages (tweets)
* to Avro format and publish them to a Kafka Topic.
*
* Usage: TwitterProducer
* broker: The hostname and port at which the Kafka Broker is listening
*
* @author jillurquddus
* @since 0.0.1
*
*/
public class TwitterProducer {
// Kafka Producer - note that Kafka Producers are Thread Safe and that sharing a Producer instance
// across threads is generally faster than having multiple Producer instances
private static KafkaProducer<String, byte[]> producer;
// Note that for a Production Deployment, do not hard-code your Twitter Application Authentication Keys
// Instead, derive from a Configuration File or Context
private static final String CONSUMER_KEY = "<Your Twitter Application Consumer Key>";
private static final String CONSUMER_SECRET = "<Your Twitter Application Consumer Secret>";
private static final String ACCESS_TOKEN = "<Your Twitter Application Access Token>";
private static final String ACCESS_TOKEN_SECRET = "<Your Twitter Application Access Token Secret>";
private static final String KAFKA_TOPIC = "twitter";
// Avro Schema to use to serialise messages to the Kafka Topic
// For the full list of Tweet fields, please refer to
// https://dev.twitter.com/overview/api/tweets
private static Schema schema;
private static Injection<GenericRecord, byte[]> recordInjection;
public static final String TWEET_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"tweet\","
+ "\"fields\":["
+ " { \"name\":\"id\", \"type\":\"string\" },"
+ " { \"name\":\"user_name\", \"type\":\"string\" },"
+ " { \"name\":\"text\", \"type\":\"string\" }"
+ "]}";
/**
* Wrap a Twitter4j Client around a Hosebird Client using a custom Status Listener
* and an Executor Service to spawn threads to parse the messages received
* @param kafkaBroker
* @throws InterruptedException
*/
public static void run(String kafkaBroker) throws InterruptedException {
// Kafka Producer Properties
Properties producerProperties = new Properties();
// Bootstrapping
producerProperties.put("bootstrap.servers", kafkaBroker);
// Serializer Class for Keys
producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Serializer Class for Values
producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// Number of acknowledgements required before a Produce Request is considered complete
// (the new Producer API uses the "acks" setting rather than the legacy "request.required.acks")
producerProperties.put("acks", "1");
// Create the Kafka Producer
producer = new KafkaProducer<>(producerProperties);
// Twitter Connection and Filtering Properties
BlockingQueue<String> messageQueue = new LinkedBlockingQueue<String>(100000);
StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
endpoint.stallWarnings(false);
endpoint.trackTerms(Lists.newArrayList("brexit", "#vote2017"));
Authentication authentication = new OAuth1(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET);
// Build a Twitter Hosebird Client
ClientBuilder hosebirdClientBuilder = new ClientBuilder()
.name("HyperLearning AI Knowledgebase Twitter Hosebird Client")
.hosts(Constants.STREAM_HOST)
.authentication(authentication)
.endpoint(endpoint)
.processor(new StringDelimitedProcessor(messageQueue));
BasicClient hosebirdClient = hosebirdClientBuilder.build();
// Create an Executor Service to spawn threads to parse the messages
// Runnables are submitted to the Executor Service to process the Message Queue
int numberProcessingThreads = 2;
ExecutorService service = Executors.newFixedThreadPool(numberProcessingThreads);
// Wrap a Twitter4j Client around the Hosebird Client using a custom Status Listener
Twitter4jStatusClient twitter4jClient = new Twitter4jStatusClient(
hosebirdClient, messageQueue, Lists.newArrayList(statusListener), service);
// Connect to the Twitter Streaming API
twitter4jClient.connect();
// Twitter4jStatusClient.process must be called for every Message Processing Thread to be spawned
for (int threads = 0; threads < numberProcessingThreads; threads++) {
twitter4jClient.process();
}
// Run the Producer for 60 seconds for DEV purposes
// Note that this is NOT a graceful exit
Thread.sleep(60000);
producer.close();
hosebirdClient.stop();
}
/**
* Custom Status Listener
* The onStatus method gets called for every new Tweet. It is here where we
* will parse the incoming messages and generate the Avro Record which will be
* serialised and sent using our Kafka Producer.
*/
private static StatusListener statusListener = new StatusStreamHandler() {
@Override
public void onStatus(Status status) {
// Convert the Status object into an Avro Record for serialising and publishing to the Kafka Topic
GenericData.Record avroRecord = createRecord(status);
byte[] avroRecordBytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>(KAFKA_TOPIC, avroRecordBytes);
// Send the Message to Kafka
producer.send(record);
}
@Override
public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
@Override
public void onTrackLimitationNotice(int limit) {}
@Override
public void onScrubGeo(long user, long upToStatus) {}
@Override
public void onStallWarning(StallWarning warning) {}
@Override
public void onException(Exception e) {}
@Override
public void onDisconnectMessage(DisconnectMessage message) {}
@Override
public void onStallWarningMessage(StallWarningMessage warning) {}
@Override
public void onUnknownMessageType(String s) {}
};
/**
* Parse and convert the Tweet Status into an Avro Record for serialising
* and publishing to the Kafka Topic.
* @param status the Twitter4j Status representing a single Tweet
* @return the Avro Record built from the Tweet ID, user name and text
*/
private static GenericData.Record createRecord(Status status) {
User user = status.getUser();
GenericData.Record doc = new GenericData.Record(schema);
doc.put("id", String.valueOf(status.getId()));
doc.put("user_name", user.getName());
doc.put("text", status.getText());
return doc;
}
public static void main(String[] args) {
if ( args.length != 1 ) {
System.err.println("Usage: TwitterProducer <broker>");
System.exit(1);
}
try {
// Create the Avro Schema
Schema.Parser parser = new Schema.Parser();
schema = parser.parse(TWEET_SCHEMA);
recordInjection = GenericAvroCodecs.toBinary(schema);
// Connect to the Twitter Streaming API and start the Producer
TwitterProducer.run(args[0]);
} catch (Exception e) {
System.out.println(e);
}
}
}
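Before we can run the Producer, we need to build it. The maven-assembly-plugin that we configured in the POM is bound to the package phase and will assemble a single JAR containing all of the project dependencies, with the TwitterProducer main class declared in its manifest, written to the output directory defined in the POM properties. A minimal build-and-run sketch follows; note that the exact JAR filename depends on your assembly configuration (by default the assembly ID is appended to the final name), so adjust the path to match your build output.
# Build the Kafka Producer JAR together with all of its dependencies
mvn clean package
# Run the Producer, passing the Kafka Broker hostname and port as its single argument
# (adjust the JAR filename to match your build output)
java -jar /hyperlearningai/knowledgebase/kafka/jars/hyperlearningai-kafka-producers-0.0.1-SNAPSHOT-jar-with-dependencies.jar <Kafka Broker Hostname>:9092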
As per my last article, you can check that Kafka is receiving messages from our Kafka Producer by running a Kafka Consumer via the command line to subscribe to the Twitter Topic and consume messages as they arrive. Note that, since the messages are serialised in the Avro binary format, the console output will not be fully human-readable:
# Run a Kafka Consumer subscribed to the Twitter Topic
cd $KAFKA_HOME
bin/kafka-console-consumer.sh --zookeeper <ZooKeeper Hostname>:2181 --topic twitter
In my next article, we will be using this updated real-time event-based streaming data pipeline to build predictive models using Apache Spark's Streaming API and Machine Learning Library.