Introduction

This blog series focuses on how Kafka can be used to create fast and fault-tolerant systems for processing data streams. The overall goal of this multi-part guide is to take a practical learning approach where we build a simple web application that produces data on click events and a Kafka cluster that can process this data. Part 1 of the series serves as an introduction to Kafka.

Goals

The goals of this post center around implementing a Kafka cluster for click streaming using pykafka. By the end of this post you will…

  • gain familiarity with the pykafka client.
  • build a Kafka producer that processes click events.

Why pykafka?

For me, getting up and running is always quicker in a less verbose language, like Python. For this reason, I chose to use Parse.ly’s pykafka package to create my cluster. Kafka’s API documentation and the kafka-storm-starter repo could assist those looking to use Java. Confluent also keeps a list of Kafka clients implemented in a variety of languages.

Getting Set Up

To get our Kafka cluster to intake click events from a website, we will eventually need to have a dedicated producer that receives click events and forwards them to the Kafka broker. For now, we will start with two very simple implementations largely taken from the pykafka documentation. To follow along below, go to the Kafka Clickstreamer repo and follow the “Setup” instructions.

Note: Make sure you check out Kafka Clickstreamer’s part2 branch so that you can follow along with the code below.

Once you have followed the installation instructions and activated the kafka environment using conda activate kafka, you can run the pykafka producer and consumer.

Simple Producers and Consumers

1. Spin up ZooKeeper and the Kafka Broker

As we did earlier, make sure that ZooKeeper and the Kafka broker are running. The commands are copied below. You can either issue them in separate terminal sessions or use the setup.sh script included in the repository, which runs both with one command.

cd kafka_<my-kafka-version>
bin/zookeeper-server-start.sh config/zookeeper.properties  # run zookeeper
bin/kafka-server-start.sh config/server.properties  # run kafka broker

2. Run producer.py

In another terminal window, run python producer.py. You should see a long stream of numbers being produced to the topic test. Feel free to stop this producer soon after you begin running it. You’ll see why in a moment.

Let’s take a second to inspect the most important aspects of the producer’s code. Once we instantiate a KafkaClient with the desired IP address and port, we can inspect that broker’s topics using the .topics attribute. In Part 1, we created a topic called ‘test’, and this producer will produce to that same topic. We use the .get_sync_producer() method to create our producer. A synchronous producer will wait to receive confirmation from the Kafka broker that its message was delivered before sending the next message. In most production systems this is not desired behavior because Kafka can achieve greater throughput by sending batches of messages together if they are allowed to queue up.

from pykafka import KafkaClient

# Connect to the local Kafka broker.
client = KafkaClient(hosts="127.0.0.1:9092")

# Inspect the broker's topics, then attach a synchronous producer to the
# 'test' topic created in Part 1.
print client.topics
topic = client.topics['test']
producer = topic.get_sync_producer()

# Produce messages indefinitely, keying each one with an incrementing counter.
count = 0
while True:
    count += 1
    producer.produce('test msg', partition_key='{}'.format(count))

3. Run consumer.py

In yet another terminal window, run python consumer.py. You should see all of the messages that our producer created, from the beginning. As we learned in the previous post, the Kafka broker stores all produced messages to disk. When the consumer subscribes to a topic, it can opt to receive all messages from the beginning or just the new messages. Once your consumer has consumed all of the messages, feel free to keep it running and issue the python producer.py command again in another terminal to verify that we can produce and consume streams at the same time.

Our consumer attaches to the topic “test” in a similar fashion to our producer. We use .get_balanced_consumer() to create a consumer that will balance consumption of messages with any other consumers that are part of the consumer group testgroup.

from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['test']

# Join the 'testgroup' consumer group; managed=True asks the Kafka broker,
# rather than ZooKeeper, to coordinate partition assignment within the group.
balanced_consumer = topic.get_balanced_consumer(
    consumer_group='testgroup',
    managed=True
)

# Print each message's offset and value as it arrives.
for message in balanced_consumer:
    if message is not None:
        print message.offset, message.value
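
Whether the consumer replays the topic from the beginning or only sees new messages is controlled by its offset settings. The sketch below is not part of the repo; it shows how a pykafka consumer could be told to start from the earliest retained offset, using parameter names from pykafka’s consumer API.

from pykafka import KafkaClient
from pykafka.common import OffsetType

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['test']

# Start reading from the earliest retained offset rather than only new messages.
consumer = topic.get_simple_consumer(
    consumer_group='testgroup',
    auto_offset_reset=OffsetType.EARLIEST,
    reset_offset_on_start=True
)

for message in consumer:
    if message is not None:
        print('{}: {}'.format(message.offset, message.value))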

Feel free to shut down ZooKeeper, the Kafka broker, producer, and consumer at this point.

Turning Clicks into Streams

To accomplish our goal of transforming website clicks into a stream of messages, we need to turn our producer into a web server that can handle POST requests. We will run a Tornado web server that uses the pykafka producer API to send a message to the Kafka broker every time it receives a POST request. The Kafka Clickstreamer repo includes an HTML file with the web page that we will use to issue POST requests.

1. Inspecting clickstreamer.html

Opening clickstreamer.html reveals a very simple UI that will be used to generate click events that are sent to our Kafka cluster. The page is set up as a sort of “User Dashboard” where we will be able to simulate the click events of two separate users. The two events we will simulate, which will each become a Kafka topic, are clicks on a banner ad and clicks on a link to share an article.

Clickstreamer UI

The only notable aspects of the HTML/JavaScript are the following:

  • Button ids: Each button has an id attribute that identifies both the user and the type of button. This contrived example includes user identification in the id for the sake of simplicity.
  • POST Request’s Data: The POST request sends the id of the button that was clicked.
  • POST Request’s Target: The POST request is set to deliver its message to localhost:8000, so that’s where our Tornado server should be set up. (A sketch of simulating such a request from Python follows this list.)
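
If you want to exercise the pipeline without opening the page in a browser, the same kind of request can be simulated from Python. This is only a sketch: the requests library is not part of the repo, and the payload shape and button id here are assumptions about what clickstreamer.html actually sends.

import json
import requests

# Hypothetical payload mimicking what clickstreamer.html sends: the clicked
# button's id.
payload = {'id': 'user1_banner_ad'}

# POST the JSON body to the Tornado server that clickstreamer.html targets.
response = requests.post('http://localhost:8000', data=json.dumps(payload))
print(response.status_code)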

2. Inspecting stream_producer.py

The Tornado server that we will use to produce messages for our Kafka cluster is specified in stream_producer.py. The majority of this code handles standard web server setup, such as setting the port to run on (8000), the default headers, and which paths to serve.
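
The exact contents of stream_producer.py may differ, but a generic outline of that kind of setup looks something like the sketch below; the handler name and the CORS header are assumptions, not a verbatim copy of the repo.

import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    def set_default_headers(self):
        # Let the locally opened clickstreamer.html page POST across origins
        # (an assumption about what the default headers are used for).
        self.set_header("Access-Control-Allow-Origin", "*")

    def post(self):
        # Kafka-producing logic goes here (shown in the next snippets).
        self.finish()

# Serve POST requests at the root path on port 8000, the address that
# clickstreamer.html targets.
application = tornado.web.Application([(r"/", MainHandler)])

if __name__ == "__main__":
    application.listen(8000)
    tornado.ioloop.IOLoop.instance().start()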

First, we instantiate the KafkaClient as we did in previous examples, and then we make a producer for the topic test. Note that instead of calling get_sync_producer() on our topic, we call get_producer(). This asynchronous producer will not wait for confirmation from the Kafka broker before producing a message. This makes producing messages inexpensive enough to run within a request handler method.

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['test']
producer = topic.get_producer()

The post() method within MainHandler defines the logic for actually handling incoming POST requests at localhost:8000. Note that data in this example is a byte string. All messages produced for Kafka should be sent as bytes. Decoding and re-encoding these byte arrays is one of the more delicate aspects of handling messages headed to or from a Kafka broker, and the conversion to and from bytes differs between Python 2 and Python 3.

@tornado.web.asynchronous
def post(self):
    """Forward contents of POST requests to Kafka."""
    data = self.request.body
    print json.loads(self.request.body.decode('utf-8'))
    producer.produce(data)
    self.write(json.dumps('{}'))
    self.finish()
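
As a standalone illustration of that byte handling (not code from the repo), the snippet below shows one way a click event could be encoded before producing and decoded again on the consuming side; the event shape and button id are hypothetical.

import json

# A hypothetical click event as a Python dict.
event = {'id': 'user1_banner_ad'}

# Serialize to a JSON string, then encode to UTF-8 bytes before producing;
# pykafka expects the message value to be bytes.
message_bytes = json.dumps(event).encode('utf-8')
# producer.produce(message_bytes)

# On the consuming side, decode the raw bytes back into a dict.
decoded_event = json.loads(message_bytes.decode('utf-8'))
print(decoded_event['id'])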

Running Our Clickstreamer

At this point, we can fire up ZooKeeper, our Kafka broker, and our producer and consumer scripts. Run each of the commands in the list below in a different terminal session, and ensure that the kafka conda environment is activated in each session (conda activate kafka) before running each command:

  1. Start up ZooKeeper and the Kafka broker: ./setup.sh
  2. Run the clickstream producer: python stream_producer.py
  3. Run the consumer: python consumer.py

Once your consumer has consumed all of the messages created for the topic ‘test’, open up clickstreamer.html, click the buttons, and watch the button ids populate consumer.py’s terminal window.

It may appear that it takes a long time for messages to propagate from a click to the Kafka consumer. This delay comes from the asynchronous producer, which queues messages and sends them to the broker in batches rather than waiting for the broker to acknowledge each one. To see why this is advantageous, click one of the buttons in clickstreamer.html rapidly: the whole block of messages should appear at once in the consumer window, because the producer flushed them to the broker as a single batch and the consumer pulled them down together. Batching like this is what lets Kafka sustain high throughput.
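
That batching behaviour, and the latency that comes with it, is tunable. The sketch below shows how pykafka’s asynchronous producer could be configured to favour low latency over throughput; the specific values are illustrative and may not match what stream_producer.py uses.

from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['test']

# Flush after a single queued message and linger at most 50 ms before sending,
# so each click reaches the broker almost immediately (at the cost of throughput).
producer = topic.get_producer(min_queued_messages=1, linger_ms=50)
producer.produce(b'low latency msg')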

Up Next: Extending Our Clickstreamer

We now have a dedicated producer listening to our client application and a balanced consumer that has subscribed to our Kafka cluster. In the next post we will:

  1. Add multiple message brokers to our cluster for fault tolerance.
  2. Make multiple topics, one for each user, to partition our data streams.