Build a high-throughput, scalable, reliable and fault-tolerant data pipeline capable of fetching event-based data and streaming those events to Apache Spark.
Jillur Quddus • Founder & Chief Data Scientist • 7th June 2017
It was in 2012 that I first heard the terms 'Hadoop' and 'Big Data'. At the time, the two words were almost synonymous with each other - I would frequently attend meetings where clients wanted a 'Big Data' solution simply because it had become the latest buzzword, with little or no consideration as to whether their requirements and data actually warranted one. 'Big Data' is of course more than just Hadoop, and as scalable technologies in both batch and real-time became more mature, so did our knowledge of them. However, for those of you now taking your first footsteps into the world of 'Big Data' technologies, as I was back in 2012, you may be asking yourselves the same question I was - namely, just what on earth is the difference between this plethora of new technologies, and when should I choose one over the other?
To name but a few that you may have heard of before stumbling onto this article (in no particular order, just as I recall them): Apache Hadoop, Apache Hive, Apache Pig, Apache HBase, Apache Spark, Apache Storm, Apache Kafka, Apache Flume, Apache Cassandra, MongoDB, Redis, Oracle Berkeley DB, Akka, Spray (superseded by Akka HTTP), Apache TinkerPop, Apache Giraph, Apache Mahout, Apache ZooKeeper, Couchbase, Apache Flink, Elasticsearch, Elassandra, SOLR, Voldemort (yes, you read that correctly), MemcacheDB, DynamoDB... and so forth.
Over the coming weeks and months, I hope to write about each of these technologies in a series of Knowledge Base articles, focusing on real-world situations in which you may decide to use them and, most importantly, using hands-on examples. As this is the first article in the series, I would like to dedicate it to those of you taking your first steps into the world of big-data technologies by covering a typical real-time data processing scenario whilst introducing some very important streaming technologies, namely Apache Flume, Apache Kafka and Apache Spark.
Our goal in this article will be to create a high-throughput, scalable, reliable and fault-tolerant data pipeline capable of fetching event-based data and streaming those events to Apache Spark, which will parse their content, all of which will be done in near real-time. (In my next article, I will discuss how we can use our real-time data pipeline to perform analytics and build predictive models using this data stream in Apache Spark!)
The examples that I describe below will use tweets as their real-time data source. Twitter is a great example to start with as it is free, it produces an abundant amount of data in real-time, and Twitter users can set up their own applications to access the stream of tweets made available by Twitter. So before continuing with this article, please make sure that you have a Twitter account. Once you have set up your Twitter account, head over to Twitter's Application Management page to create and register your application. Assuming that you have done this correctly, you will be provided with the following authentication information that you will need later on: a Consumer Key, a Consumer Secret, an Access Token and an Access Token Secret.
Apache Flume
So let us begin with one of the relatively older technologies, Apache Flume. As stated on its website, "Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application". So what does this all mean? Well, in practical terms, Flume allows us to effectively collect (ingest), aggregate and move large amounts of streaming data from multiple sources (such as the Twitter stream of tweets or log data) into Hadoop (with which Flume is tightly integrated), where we can store it using Hadoop's Distributed File System (HDFS) and from where we can analyse it. A common use case of Flume is to act as a data pipeline to ingest simple event-based data into Hadoop and HBase, but it also supports other technologies and centralised data stores out-of-the-box. Let us break it down further by studying its core components: an Event is a single unit of data (a byte payload plus optional headers) moved by Flume; a Source consumes events delivered to it by an external system, such as Twitter or a log file; a Channel is a buffer that stores events after they have been received by a Source until they are consumed; a Sink removes events from a Channel and delivers them to their destination, such as HDFS or the console; and an Agent is the JVM process that hosts the Sources, Channels and Sinks through which events flow.
One of the great things about Flume is that Sinks can forward event data to the Flume Source of another Flume Agent i.e. Agents can be chained together to form complex data flows. Sources and Sinks within the same Agent run asynchronously with the events stored in the Channel. Furthermore, Flume guarantees delivery of messages from one Flume Agent to the next by starting separate transactions on both the delivering and receiving Agents respectively. Finally, Flume can be scaled horizontally with ease as there is no central co-ordinator node and Flume Agents run independently of each other with no single point of failure. All this makes Apache Flume a powerful service for high-throughput streams of real-time event-based data to feed your Big Data system.
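To make chaining concrete, the fragment below is a minimal sketch of how one Agent's Avro Sink can forward events to a second Agent's Avro Source. The agent names, channel names, hostname placeholder and port are illustrative only.
# Agent1 - forward events downstream via an Avro Sink
Agent1.sinks = AvroForward
Agent1.sinks.AvroForward.type = avro
Agent1.sinks.AvroForward.channel = MemChannel
Agent1.sinks.AvroForward.hostname = <Downstream Agent Hostname>
Agent1.sinks.AvroForward.port = 4545
# Agent2 - receive those events via an Avro Source
Agent2.sources = AvroIn
Agent2.sources.AvroIn.type = avro
Agent2.sources.AvroIn.channels = MemChannel
Agent2.sources.AvroIn.bind = 0.0.0.0
Agent2.sources.AvroIn.port = 4545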
We are now ready to configure and deploy our first Flume Agent! Our first Flume Agent will fetch tweets from the Twitter Stream via the Twitter Application that you created above, using a demo Flume Twitter Source class that is bundled with Flume out-of-the-box. This demo Twitter Source connects to the Twitter Stream, continuously downloads a sample of tweets, converts them to Avro format and sends these Avro events on into our Flume Channel. We will be using a Memory Channel, from which a Logger Sink will consume the tweets and output them to the Console. Note that I am using a CentOS 7 Minimal Installation Server to perform the commands below, but Flume should work equally well with other Linux distributions.
# Unpack the Flume Binary to a directory of choice
tar -xzf apache-flume-1.7.0-bin.tar.gz
# Create a new Flume Configuration File to configure our Flume Twitter Agent
vi conf/flume-twitter.conf
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'
# Flume Instance - Twitter Agent
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = Logger
# Source Configuration - Inbuilt TwitterSource
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = <Your Twitter App Consumer Key>
TwitterAgent.sources.Twitter.consumerSecret = <Your Twitter App Consumer Secret>
TwitterAgent.sources.Twitter.accessToken = <Your Twitter App Access Token>
TwitterAgent.sources.Twitter.accessTokenSecret = <Your Twitter App Access Token Secret>
TwitterAgent.sources.Twitter.keywords = <Comma Delimited List of Keywords to filter the Tweets>
# Channel Configuration - Memory Channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100
# Sink Configuration - Logger Sink
TwitterAgent.sinks.Logger.type = logger
TwitterAgent.sinks.Logger.channel = MemChannel
# Launch the Flume Agent to sink to the Console
bin/flume-ng agent --name TwitterAgent --conf conf --conf-file conf/flume-twitter.conf -Dflume.root.logger=DEBUG,console
Assuming that all goes well, the Logger Sink should output the tweets to the Console in JSON format in near real-time (since it is election time here in the UK, I used election-themed keywords such as ge2017, election and so forth).
Currently we have set up a Logger Sink and have directed the logger to output to the console in our launch command using -Dflume.root.logger=DEBUG,console. Inside the conf folder, you will find a log4j.properties file in which you can update the logger attributes as required. By default, it will log to the logs directory inside the Flume Home Directory and to a file called flume.log. To write to the HDFS, all we need to do is amend our Sink as follows:
# Update the Sink to write to your HDFS
vi conf/flume-twitter.conf
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'
# Flume Instance - Twitter Agent
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS
...
# Sink Configuration - HDFS Sink
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.channel = MemChannel
# Ensure that the owner of the Flume Agent has permission to write to this HDFS Directory
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:9000/user/flume/twitter/data/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
# Because we have used %Y/%m/%d/%H formatting in our HDFS Path
# we must specify how Flume should get the timestamp of the tweet.
# To use the timestamp of the destination i.e. the HDFS Sink, we can use
# TwitterAgent.sinks.HDFS.hdfs.useLocalTimeStamp = true
# Alternatively, to use the timestamp of the actual event (tweet), we can use Source interceptors
TwitterAgent.sources.Twitter.interceptors = interceptor1
TwitterAgent.sources.Twitter.interceptors.interceptor1.type = timestamp
# Ensure that the Hadoop Libraries are on the Flume Classpath so that Flume knows how to access and write to your HDFS
cp conf/flume-env.sh.template conf/flume-env.sh
vi conf/flume-env.sh
# Update the Flume Classpath to point to the correct directories on your system
# Alternatively, copy the Hadoop Common and HDFS libraries to the the Flume lib folder
FLUME_CLASSPATH="$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/common/lib/*:$HADOOP_HOME/share/hadoop/hdfs/*"
# Launch the Flume Agent to sink to the HDFS
bin/flume-ng agent --name TwitterAgent --conf conf --conf-file conf/flume-twitter.conf
Note that we no longer need to explicitly define the Root Logger in our launch command, as Flume will automatically read log4j.properties and flume-env.sh from the conf directory. If all goes well, tweets will be written in near real-time to your HDFS at the path you specified above.
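As a quick sanity check (assuming the hdfs.path configured above, and the HDFS Sink's default FlumeData file prefix), you can list and inspect the files that the Sink has written:
# List the files written by the HDFS Sink
hdfs dfs -ls -R /user/flume/twitter/data
# Inspect one of the rolled files
hdfs dfs -cat /user/flume/twitter/data/<Year>/<Month>/<Day>/<Hour>/FlumeData.* | head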
To learn more about Apache Flume, including creating complex chained data flows and its other Source, Channel and Sink types, please refer to the Apache Flume Documentation.
Apache Kafka
So far we have built a data pipeline to ingest simple event-based data, i.e. tweets, into our system, writing to either a log file, the console or your HDFS. Our goal is to get them to Apache Spark so that we can parse them in near real-time, and eventually build predictive models in Spark (which I will cover in the next article). The next technology that we will discuss in our real-time data pipeline is Apache Kafka.
As stated on its website, "Apache Kafka is a distributed streaming platform". So why do we need it in our data pipeline? Well, Kafka is very well suited to building real-time streaming data pipelines, like the one in this article, that get data between systems in a reliable, scalable and fault-tolerant way. Let us explore some of the core concepts behind Apache Kafka before trying to integrate it into our data pipeline. The official Apache Kafka Documentation is a great introduction to Apache Kafka, so I strongly recommend that you read that in the first instance. For those of you that are already familiar with publish-subscribe messaging systems, some of the concepts may be familiar, but nevertheless I would recommend reading the official documentation first.
What makes Kafka great is that it is a distributed, reliable and fault-tolerant streaming platform. That is, for Topics with a replication factor of N, up to N-1 server failures can be tolerated without losing any records. Furthermore, as each Partition is an ordered sequence of records to which new records are appended, each message in a Partition is assigned a unique offset. Kafka does not track which messages have been read by which Consumers so as to retain only the unread messages; instead it retains all messages for a configurable amount of time, whether or not they have been consumed. Therefore, Kafka can easily handle large numbers of Consumer instances, as it is up to the Consumers themselves to track their read positions, and it can store large amounts of data very well with low latency, high performance and added replication.
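For example, the retention period is configured on the broker (and can be overridden per Topic); a minimal illustration in config/server.properties:
# Retain records for 7 days (168 hours), whether or not they have been consumed
log.retention.hours=168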
As well as working as a replacement for traditional message brokers, Kafka allows us to construct low-latency data pipelines as we can develop Consumers to subscribe to real-time event-based data in a reliable and fault-tolerant way, which is especially useful for critical real-time data where delivery of data must be guaranteed for integration and onward purposes. Whilst our example of processing tweets in real-time is not exactly critical, deploying Kafka into our real-time data pipeline now can teach us valuable lessons for when we are implementing mission-critical real-time production systems for our clients.
Kafka Channel
So how do we integrate Apache Kafka into our fledgling data pipeline? Well, we will be implementing the following design:
Before we proceed, let's take a moment to discuss the advantages and disadvantages of this approach. As I described earlier, Flume Channels are buffers that sit between Flume Sources and Flume Sinks, allowing the Sources to collect data without worrying about the Sinks, which are potentially operating at different rates anyhow. Furthermore, writing to and reading from Channels is achieved using Transactions - only after the transaction is committed will the batch of events within that Transaction be available to the Sinks. Flume supports a variety of Channels out-of-the-box, including the Memory Channel, where events are held in an in-memory queue (very fast, but events are lost if the Agent process or machine fails), and the File Channel, where events are persisted to local disk (durable across restarts at the cost of some throughput).
Other out-of-the-box Channels include JDBC Channels (events are persisted to a database) and Spillable Memory Channels (events are stored in-memory and on disk). Flume also supports Kafka Channels, where events are stored in a Kafka cluster, providing high availability, reliability and replication. In our case, the advantages of using a Kafka Channel directly from our Flume Twitter Source are that there is no need for additional buffering, it increases the reliability of our data pipeline and, when used with Source Interceptors and no explicit Sink, it allows us to write events into a Kafka Topic for use by other applications, like our eventual Spark Streaming Application. Depending on your requirements, you may design your Flume Agent and data pipeline differently. For example, if high performance is a requirement and the data is not mission-critical, you may decide to write events to a Topic using a Kafka Sink instead, as sketched below.
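A minimal sketch of that alternative design using Flume 1.7's Kafka Sink follows; the property names are those documented for Flume 1.7 and the broker hostname is a placeholder.
# Alternative design - publish events to Kafka via a Kafka Sink rather than a Kafka Channel
TwitterAgent.sinks = KafkaSink
TwitterAgent.sinks.KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
TwitterAgent.sinks.KafkaSink.channel = MemChannel
TwitterAgent.sinks.KafkaSink.kafka.bootstrap.servers = <Kafka Broker Hostname>:9092
TwitterAgent.sinks.KafkaSink.kafka.topic = twitter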
Now that we have covered some of the theory, let's get started implementing the rest of our data pipeline. As before, I will be using a single CentOS 7 Minimal Installation server to perform the commands below. In a production environment you would have a multi-node cluster, but for simple development purposes I will be using a single-node cluster. Note that ZooKeeper is out of scope for this article; however, I will be coming back to it at a later date. For now, just think of ZooKeeper as a centralised service for configuration (e.g. bootstrapping cluster configuration from a central source) and distributed cluster management (e.g. node status in real-time) - that is, Kafka uses ZooKeeper to help form its cluster of producer, consumer and broker nodes.
# Unpack the Kafka Binary to a directory of choice
tar -xzf kafka_2.12-0.10.2.1.tgz
# Basic Configuration of the internal ZooKeeper Service
vi config/zookeeper.properties
# Absolute path to the ZooKeeper Data Directory and Client Port
dataDir=<ZooKeeper Data Directory>
clientPort=2181
# Basic Configuration of the Kafka Server
vi config/server.properties
# Kafka Server Log Directory and ZooKeeper Co-ordinator Service
log.dirs=<Kafka Log Directory>
zookeeper.connect=<Hostname>:2181
# Start the internal ZooKeeper Service
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# Start the Kafka Server
bin/kafka-server-start.sh config/server.properties
# Create the Twitter Topic for our Tweets
bin/kafka-topics.sh --create --zookeeper <ZooKeeper Hostname>:2181 --replication-factor 1 --partitions 2 --topic twitter
Created topic "twitter".
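You can confirm that the Topic has been created, and inspect its Partitions and replica assignment, with the describe command:
# Describe the Twitter Topic
bin/kafka-topics.sh --describe --zookeeper <ZooKeeper Hostname>:2181 --topic twitter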
Now that Kafka is up and running and we have created our Twitter Topic, we need to update our Flume Agent to write to the Kafka Channel. For now, we will keep the Flume Logger Sink outputting to the Console so that we can confirm that our newly designed Flume Agent is still working. In the next section of this article, we will discuss Apache Spark and how to configure it to read from the Kafka Channel.
# Update the Flume Agent to act as the Kafka Producer and Consumer
cd $FLUME_HOME
vi conf/flume-twitter.conf
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'
# Flume Instance - Twitter Agent
TwitterAgent.sources = Twitter
TwitterAgent.channels = Kafka
TwitterAgent.sinks = Logger
# Source Configuration - Inbuilt TwitterSource
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = Kafka
TwitterAgent.sources.Twitter.consumerKey = <Your Twitter App Consumer Key>
TwitterAgent.sources.Twitter.consumerSecret = <Your Twitter App Consumer Secret>
TwitterAgent.sources.Twitter.accessToken = <Your Twitter App Access Token>
TwitterAgent.sources.Twitter.accessTokenSecret = <Your Twitter App Access Token Secret>
TwitterAgent.sources.Twitter.keywords = <Comma Delimited List of Keywords to filter the Tweets>
TwitterAgent.sources.Twitter.interceptors = interceptor1
TwitterAgent.sources.Twitter.interceptors.interceptor1.type = timestamp
# Channel Configuration - Kafka Channel
TwitterAgent.channels.Kafka.type = org.apache.flume.channel.kafka.KafkaChannel
TwitterAgent.channels.Kafka.capacity = 10000
TwitterAgent.channels.Kafka.transactionCapacity = 100
TwitterAgent.channels.Kafka.brokerList = <Kafka Broker Hostname>:9092
TwitterAgent.channels.Kafka.topic = twitter
TwitterAgent.channels.Kafka.zookeeperConnect = <ZooKeeper Hostname>:2181
TwitterAgent.channels.Kafka.parseAsFlumeEvent = true
# Sink Configuration - Logger Sink
TwitterAgent.sinks.Logger.type = logger
TwitterAgent.sinks.Logger.channel = Kafka
# Launch the Flume Agent using the Kafka Channel and sinking to the Console
bin/flume-ng agent --name TwitterAgent --conf conf --conf-file conf/flume-twitter.conf -Dflume.root.logger=DEBUG,console
When you run the final command above, a lot of output will be produced in your console, including the tweets as before. If you would like to explicitly check that Kafka is receiving the messages from the Flume Producer, we can run a Kafka Consumer via the command line to subscribe to the Twitter Topic and display all messages from the beginning with a header.
# Run a Kafka Consumer subscribed to the Twitter Topic
cd $KAFKA_HOME
bin/kafka-console-consumer.sh --zookeeper <ZooKeeper Hostname>:2181 --topic twitter --from-beginning
Apache Spark
We have now set up a Flume Agent to collect tweets using our Twitter Application, publish those tweets to a Kafka Topic and finally consume those tweets from the Kafka Channel and sink them to a Logger or the HDFS. The last section of this article will describe how we can connect Apache Spark to Kafka and consume the tweets using the Spark Streaming API. (My next article will describe how we can then use Apache Spark to perform some analytics and build predictive models using our real-time stream of tweets!)
Apache Spark is a big-data processing engine and cluster-computing framework capable of performing large-scale data processing and analytics, including Machine Learning, in both batch and real-time. Spark Streaming extends the core Spark API by letting us reuse the same code for batch processing on real-time streaming data flows, hence allowing us to perform real-time data processing and analytics. As always, the official Apache Spark Documentation is the best place to start to get up to speed with Apache Spark.
For the remainder of this article, we will be concentrating on the Spark Streaming API and its core components: the Streaming Context, which is the entry point for all streaming functionality, and Discretised Streams (DStreams), which represent a continuous stream of data as a sequence of RDDs processed in micro-batches at a configurable batch interval.
To finish this article, we are going to integrate Apache Spark with Kafka using Spark's Streaming API to consume the tweets from Kafka. Obviously this does not illustrate the full analytical power of Spark, but in my next article we will build upon our real-time data pipeline by performing analytics on the stream of tweets in Spark!
To start, we are going to configure and deploy a very basic single-node Spark cluster using Spark's own cluster manager and our CentOS 7 Minimal Installation Server as before.
# Unpack the Spark Binary to a directory of choice
tar -xzf spark-2.1.0-bin-hadoop2.7.tgz
# Basic Single-Node Spark Deployment Environment Configuration
# Note that Spark properties can be set directly using SparkConf passed to your SparkContext
# You can also specify relevant default configuration values using conf/spark-defaults.conf as follows
vi conf/spark-defaults.conf
# Spark Deployment Environment Properties
# In this example, we will be using Spark's own cluster manager
spark.master spark://<Cluster Manager Hostname>:7077
spark.driver.cores 1
spark.driver.maxResultSize 1g
spark.driver.memory 1g
spark.executor.memory 2g
spark.local.dir <Spark Scratch Directory>
# Networking Properties
spark.driver.host <Driver Hostname>
spark.network.timeout 120s
# Hadoop Properties
spark.hadoop.dfs.replication 1
# Start the Spark Master
sbin/start-master.sh
# Start a Spark Slave
sbin/start-slave.sh spark://<Cluster Manager Hostname>:7077
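To confirm that the standalone Master and Worker daemons are running, you can list the Java processes with jps:
# The Spark standalone daemons should appear as 'Master' and 'Worker'
jps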
You can access the Spark Master UI at http://<Cluster Manager Hostname>:8080, the default Master web UI port, to confirm that your Worker has registered.
# Update the Flume Agent to remove the Logger Sink so that events are written only to the Kafka Channel, from which Spark will consume directly
cd $FLUME_HOME
vi conf/flume-twitter.conf
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'TwitterAgent'
# Flume Instance - Twitter Agent
TwitterAgent.sources = Twitter
TwitterAgent.channels = Kafka
# Source Configuration - Inbuilt TwitterSource
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = Kafka
TwitterAgent.sources.Twitter.consumerKey = <Your Twitter App Consumer Key>
TwitterAgent.sources.Twitter.consumerSecret = <Your Twitter App Consumer Secret>
TwitterAgent.sources.Twitter.accessToken = <Your Twitter App Access Token>
TwitterAgent.sources.Twitter.accessTokenSecret = <Your Twitter App Access Token Secret>
TwitterAgent.sources.Twitter.keywords = <Comma Delimited List of Keywords to filter the Tweets>
# Channel Configuration - Kafka Channel
TwitterAgent.channels.Kafka.type = org.apache.flume.channel.kafka.KafkaChannel
TwitterAgent.channels.Kafka.capacity = 10000
TwitterAgent.channels.Kafka.transactionCapacity = 100
TwitterAgent.channels.Kafka.brokerList = <Kafka Broker Hostname>:9092
TwitterAgent.channels.Kafka.topic = twitter
TwitterAgent.channels.Kafka.zookeeperConnect = <ZooKeeper Hostname>:2181
TwitterAgent.channels.Kafka.parseAsFlumeEvent = true
We are now ready to write our Spark Streaming Application to read tweets from Kafka and perform some simple processing on them in Spark. As I described above, Spark exposes a higher-level API so that developers can write their Spark applications in either Java, Scala or Python. As a Java Developer I will naturally choose Java or Scala where I can!
Spark Streaming supports two approaches to reading data from Kafka: the older Receiver-based Approach, in which messages are consumed by a Kafka high-level consumer running inside a Spark Receiver, and the Direct Approach (no receivers), in which Spark itself periodically queries Kafka for the latest offsets in each Topic and Partition and processes the corresponding ranges of messages.
We will be using the Direct Approach to consume messages from Kafka and process them in Spark. Since I will be writing my Spark Streaming Application in Java, I will first create an Apache Maven project to handle all the build dependencies. I will configure Maven to build a fat JAR, meaning that the final JAR that we will submit to Apache Spark will include not only our Spark Streaming Application but all the required dependencies that it needs to run. A couple of things to note in my POM file below: the Confluent repository is included to provide the Kafka Avro Serializer (and its KafkaAvroDecoder), the Spark Core and Spark Streaming dependencies are marked as provided since they are supplied by the Spark runtime, and the Maven Assembly Plugin is bound to the package phase to build the JAR with all of its dependencies.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Maven Coordinates -->
<groupId>ai.hyperlearning.knowledgebase.spark.streaming</groupId>
<artifactId>hyperlearningai-spark-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<!-- Project Information -->
<name>Spark Kafka Direct Integration</name>
<description>HyperLearning AI Knowledgebase Spark Streaming Project - Kafka Direct Integration</description>
<url>https://hyperlearning.ai</url>
<organization>
<name>HyperLearning AI</name>
<url>https://hyperlearning.ai</url>
</organization>
<developers>
<developer>
<id>jillurquddus</id>
<name>Jillur Quddus</name>
<email>[email protected]</email>
<url>https://hyperlearning.ai</url>
<organization>HyperLearning AI</organization>
<organizationUrl>https://hyperlearning.ai</organizationUrl>
<roles>
<role>Chief Data Scientist</role>
<role>Principal Software Engineer</role>
</roles>
<timezone>Europe/London</timezone>
</developer>
</developers>
<!-- Repositories -->
<repositories>
<!-- Confluent Repository for KafkaAvroDecoder -->
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<!-- Properties -->
<properties>
<apache.avro.version>1.8.1</apache.avro.version>
<apache.spark.core.2.11.version>2.1.0</apache.spark.core.2.11.version>
<apache.spark.streaming.2.11.version>2.1.0</apache.spark.streaming.2.11.version>
<apache.spark.streaming.kafka-0-8_2.11.version>2.1.0</apache.spark.streaming.kafka-0-8_2.11.version>
<confluent.kafka.avro.serializer.version>3.2.1</confluent.kafka.avro.serializer.version>
<jdk.version>1.8</jdk.version>
<maven.plugins.maven-assembly-plugin.version>3.0.0</maven.plugins.maven-assembly-plugin.version>
<maven.plugins.maven-compiler-plugin.version>3.6.1</maven.plugins.maven-compiler-plugin.version>
<output.directory>/hyperlearningai/knowledgebase/spark/jars</output.directory>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<!-- Dependencies -->
<dependencies>
<!-- Apache Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${apache.avro.version}</version>
</dependency>
<!-- Apache Spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${apache.spark.core.2.11.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${apache.spark.streaming.2.11.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>${apache.spark.streaming.kafka-0-8_2.11.version}</version>
</dependency>
<!-- Confluent Kafka Avro Serializer -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.kafka.avro.serializer.version}</version>
</dependency>
</dependencies>
<!-- Build -->
<build>
<!-- Plugins -->
<plugins>
<!-- Maven Compiler: Compile the Sources of the Project -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.plugins.maven-compiler-plugin.version}</version>
<configuration>
<source>${jdk.version}</source>
<target>${jdk.version}</target>
</configuration>
</plugin>
<!-- Maven Assembly: Aggregate project output with its dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven.plugins.maven-assembly-plugin.version}</version>
<configuration>
<!-- Final JAR Filename -->
<finalName>hyperlearningai-spark-kafka-${project.version}</finalName>
<!-- Include all Project Dependencies -->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<!-- JAR with dependencies Output Target Directory -->
<outputDirectory>${output.directory}</outputDirectory>
</configuration>
<executions>
<!-- Bind the assembly to the package phase -->
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
We are now ready to start developing our Spark Streaming Application. To create an input DStream, we need to import KafkaUtils and create a Direct Stream using a Spark Streaming Context, the hostname and port on which the Kafka Broker is listening, and the Kafka Topic from which we want to consume messages.
Plain-Text Processing
In the example below, we simply consume the tweets from Kafka, decode the values as Strings and output them to the Spark Executor Standard Out.
package ai.hyperlearning.knowledgebase.spark.streaming.kafka;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
/**
* Example Spark Streaming Application with Kafka Direct Integration
*
* Periodically query Kafka for the latest offsets in each Topic and Partition.
* Consume the tweets using a String Decoder and display them in the console.
*
* Usage: StreamingKafkaDirectStringDecoder <broker>
* broker: The hostname and port at which the Kafka Broker is listening
*
* @author jillur.quddus
* @since 0.0.1
*
*/
public class StreamingKafkaDirectStringDecoder {

    public static void main(String[] args) throws InterruptedException {

        if ( args.length != 1 ) {
            System.err.println("Usage: StreamingKafkaDirectStringDecoder <broker>");
            System.exit(1);
        }

        // Create a Java Streaming Context with a Batch Interval of 5 seconds
        SparkConf conf = new SparkConf().setAppName("Kafka Direct String Decoder");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Specify the Kafka Broker Options and set of Topics
        String broker = args[0];
        Map<String, String> kafkaParameters = new HashMap<String, String>();
        kafkaParameters.put("metadata.broker.list", broker);
        Set<String> topics = Collections.singleton("twitter");

        // Create an input DStream using KafkaUtils and simple plain-text message processing
        JavaPairInputDStream<String, String> kafkaDirectStream = KafkaUtils.createDirectStream(jssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParameters, topics);
        kafkaDirectStream.foreachRDD(rdd -> {
            rdd.foreach(record -> System.out.println(record._2));
        });

        // Start the computation
        jssc.start();

        // Wait for the computation to terminate
        jssc.awaitTermination();

    }

}
Avro Processing
Note that the Flume Twitter Source that we are using in our data pipeline actually converts the tweets into Avro format and sends these Avro messages downstream. Avro is a schema-based, language-neutral data serialisation system that uses JSON to declare data structures and schemas. Instead of using the StringDecoder, which simply decodes the values as Strings in our Streaming Application above, you could either use the DefaultDecoder, which returns the raw array of bytes, and then decode it using Avro's Binary Decoder, or use Confluent's KafkaAvroDecoder to receive messages with Avro records as their values.
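To give a flavour of the first route, below is a minimal, hypothetical sketch of decoding a raw byte[] payload with Avro's Binary Decoder into a GenericRecord. It assumes the writer schema is available as tweetSchema; the exact schema, and whether the payload is additionally wrapped as a Flume event by the Kafka Channel, will depend on your configuration. In the Direct Stream itself you would then declare byte[].class and DefaultDecoder.class in place of String.class and StringDecoder.class for the message value.
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
/**
 * Hypothetical helper that decodes an Avro-encoded byte array into a GenericRecord
 * using the writer schema with which the payload was serialised.
 */
public class AvroPayloadDecoder {

    public static GenericRecord decode(byte[] payload, Schema tweetSchema) throws IOException {

        // Reader for generic (schema-driven) Avro records
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(tweetSchema);

        // Wrap the raw bytes in a Binary Decoder and materialise the record
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        return reader.read(null, decoder);

    }

}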
Run the Streaming App
We are now ready to run our Spark Streaming Application! Use Maven to build the fat JAR and submit it to your Spark cluster. Since we are only using a single-node Spark cluster for development and debugging purposes, it makes sense to deploy our application in client mode.
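To build the fat JAR, run Maven's package phase from the root of the Maven project; the assembly plugin configured in the POM will place the JAR with its dependencies in the configured output directory.
# Build the fat JAR with all of its dependencies
mvn clean package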
# Submit our Spark Streaming Application
cd $SPARK_HOME
bin/spark-submit --class ai.hyperlearning.knowledgebase.spark.streaming.kafka.StreamingKafkaDirectStringDecoder --deploy-mode client /hyperlearningai/knowledgebase/spark/jars/hyperlearningai-spark-kafka-0.0.1-SNAPSHOT-jar-with-dependencies.jar <Kafka Broker Hostname:Port>
# Launch the Flume Agent using the Twitter Source and Kafka Channel
cd $FLUME_HOME
bin/flume-ng agent --name TwitterAgent --conf conf --conf-file conf/flume-twitter.conf
If you now examine the Spark Executor output, you should be able to see the tweets being streamed from Kafka to Spark in near real-time!
We have now successfully developed a near real-time, high-throughput, reliable and fault-tolerant data pipeline. Apache Flume is used to collect the event-based data (tweets in our example), which is published to Apache Kafka. Apache Spark is then used to consume the data from Apache Kafka and perform near real-time data processing. Obviously, simply ingesting and printing data in no way demonstrates the capabilities of Apache Spark! So in my next article, I will be discussing how to build predictive models and perform data analytics in real-time on our stream of event-based data in Spark.