Introduction

Kafka is an open-source distributed system built to use ZooKeeper. The basic responsibility of ZooKeeper is to coordinate the different nodes in a cluster. Because offsets are periodically committed to ZooKeeper (in older Kafka versions), a node that fails can recover from the previously committed offset.

ZooKeeper is also responsible for configuration management, leader election, detecting when a node leaves or joins the cluster, synchronization, etc.

Kafka's main components are:

  • Topic (a stream of messages belonging to the same type)
  • Producer (publishes messages to a topic)
  • Brokers (a set of servers where published messages are stored)
  • Consumer (subscribes to one or more topics and pulls data from the brokers)
  • Consumer Group (one or more consumers that jointly consume a set of subscribed topics)

Every partition in Kafka has one server that plays the role of Leader and zero or more servers that act as Followers. The Leader handles all read and write requests for the partition, while the Followers passively replicate the Leader. If the Leader fails, one of the Followers takes over as Leader. This spreads load across the servers.

Replicas are essentially the list of nodes that replicate the log for a particular partition, irrespective of whether they play the role of Leader. ISR stands for In-Sync Replicas: the set of replicas that are in sync with the Leader.

If a replica stays out of the ISR for a long time, it means the Follower is unable to fetch data as fast as it accumulates at the Leader.

Partitions are used for fail-over and parallel processing.
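
For example, the partition count and replication factor are chosen when a topic is created. A minimal sketch with the kafka-python admin client (broker address and topic name are placeholders, not from these notes):

from kafka.admin import KafkaAdminClient, NewTopic

# Hypothetical example: create a topic with 3 partitions, each replicated to 2 brokers.
admin = KafkaAdminClient(bootstrap_servers='1.2.3.4')
admin.create_topics([NewTopic(name='some-topic',
                              num_partitions=3,        # parallel processing / sharding
                              replication_factor=2)])  # fail-over copies
admin.close()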

Recommendations

  • Limit the number of clusters
    • Simplify the system architecture
      • fewer integration points for data consumers
      • fewer things to operate
      • lower incremental cost for adding new applications
  • Pick A Single Data Format
    • Having a single, company-wide data format for events is critical.
  • The Mathematics of Simplicity
    • data-flow coupling between systems can be removed by putting a streaming platform in the middle
  • Choose Your Clients Carefully
    • librdkafka (C) performance/battle-tested
  • Model Events Not Commands
    • the best data model tends to structure messages as events rather than commands.
  • Use Kafka Connect for Connecting Existing Systems and Applications
    • provides a simple plug-in API for reading from source systems or writing to destination systems
    • the connectors have a built-in scale-out model and can easily connect to very large-scale systems like Hadoop
    • the connectors are fault-tolerant
      • if one instance fails, the others will detect this and pick up its work
    • Connect lets you manage many such connections through an easy-to-use REST API (see the sketch after this list)
    • Connect helps you capture whatever metadata is present about the data format
    • connectors scale over a collection of connect processes for fault-tolerance and scalability
  • Grokking the Stream Processing Landscape
    • streams api brings native stream processing capabilities to Kafka
    • brings state-of-the-art stream processing capabilities to normal Java applications
    • using this library, you can do simple transformations on data streams that are automatically made fault-tolerant and are transparently and elastically distributed over the instances of the application.
    • a Streams app is just an instance of a normal Java application
    • can be deployed and run just like any application would
    • the Streams library maintains the state and processing logic in a way that scales dynamically
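
The Connect REST API mentioned above is just HTTP plus JSON: a connector is registered by POSTing its configuration to a Connect worker. A minimal Python sketch (the worker URL, connector name, topic, and file path are placeholder assumptions; FileStreamSourceConnector is the simple file connector that ships with Kafka):

import requests

# Hypothetical example: register a file source connector with a Connect worker.
connector = {
    'name': 'demo-file-source',  # placeholder connector name
    'config': {
        'connector.class': 'org.apache.kafka.connect.file.FileStreamSourceConnector',
        'tasks.max': '1',
        'file': '/tmp/input.txt',  # placeholder source file
        'topic': 'demo-topic',     # placeholder destination topic
    },
}

resp = requests.post('http://localhost:8083/connectors', json=connector)
resp.raise_for_status()
print(resp.json())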

History

Prior to Kafka 0.8.1.1,

consumers committed their offsets to ZooKeeper. ZooKeeper does not scale extremely well (especially for writes) when there are a large number of offsets (i.e., consumer-count * partition-count).

Now,

Consumers can commit their offsets in Kafka by writing them to a durable (replicated) and highly available topic. Consumers can fetch offsets by reading from this topic (although an in-memory offsets cache is provided for faster access), i.e., offset commits are regular producer requests (which are inexpensive) and offset fetches are fast memory lookups.
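
From the client's point of view, a committed offset is simply a value that can be read back per partition. A minimal sketch with kafka-python (the topic name is a placeholder; the group id and broker address reuse the values from the consumer example below):

import kafka

# Hypothetical example: read back the last committed offset for one partition.
consumer = kafka.KafkaConsumer(group_id='consumer-x', bootstrap_servers='1.2.3.4')
tp = kafka.TopicPartition('some-topic', 0)
print(consumer.committed(tp))  # last committed offset for this group, or None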

Python

import logging
import traceback

import kafka
from google.protobuf.message import DecodeError

import message_pb2  # generated protobuf module providing the Message type

logger = logging.getLogger(__name__)

def pbmsg_parser(value):
    # Deserialize a raw record value into a protobuf Message.
    msg = message_pb2.Message()
    try:
        msg.ParseFromString(value)
    except DecodeError:
        logger.error('invalid message')
    return msg

topic = 'some-topic'  # placeholder topic name
_run = True           # set to False to stop the consume loop

consumer = kafka.KafkaConsumer(topic,
                               group_id='consumer-x',
                               bootstrap_servers='1.2.3.4',
                               auto_offset_reset='earliest',
                               enable_auto_commit=False,  # commit manually after processing
                               value_deserializer=pbmsg_parser,
                               reconnect_backoff_ms=100)

while _run:
    try:
        # poll() returns a dict of {TopicPartition: [records]}
        batch = consumer.poll(timeout_ms=10000, max_records=10000)
        cnt = sum(map(len, batch.values()))
        if cnt == 0:
            continue
        process(batch)     # application-specific handler, defined elsewhere
        consumer.commit()  # commit offsets only after successful processing
    except Exception:
        traceback.print_exc()

Arch

  • Kafka Connect
    • a tool for scalably and reliably streaming data between Apache Kafka and other data systems.
  • Kafka Streams
    • a lightweight library for creating stream processing applications.
  • ZooKeeper
    • Kafka uses Zookeeper to do leadership election of Kafka Broker and Topic Partition pairs.
    • Kafka uses Zookeeper to manage service discovery for Kafka Brokers that form the cluster
    • Zookeeper sends changes of the topology to Kafka, so each node in the cluster knows when a new broker joined, a Broker died, a topic was removed or a topic was added, etc.
    • Zookeeper provides an in-sync view of Kafka Cluster configuration.
  • Topic
    • A topic is a named stream of records
    • Kafka stores topics in logs
    • Topics are broken up into partitions for speed, scalability and size
    • A record is stored on a partition, usually by record key, or round-robin if the key is missing (the default behavior).
    • The record key determines which partition the producer sends the record to
    • Consumers consume records in parallel up to the number of partitions
    • Order is guaranteed per partition
    • If partitioning by key, all records for that key land on the same partition, which preserves per-key ordering
    • Kafka spreads log’s partitions across multiple servers and disks
    • A topic is associated with a log, which is a data structure on disk. Kafka appends records from producer(s) to the end of a topic log. A topic log consists of many partitions that are spread over multiple files, which can be spread over multiple Kafka cluster nodes. Consumers read from Kafka topics at their own cadence and can pick where they are (the offset) in the topic log.
    • Each consumer group tracks offset from where they left off reading.
    • Kafka distributes topic log partitions on different nodes in a cluster for high performance with horizontal scalability. Spreading partitions aids in writing data quickly.
    • Topic log partitions are Kafka way to shard reads and writes to the topic log. Also, partitions are needed to have multiple consumers in a consumer group work at the same time.
    • Kafka replicates partitions to many nodes to provide failover.
  • Scale
    • Kafka writes to the filesystem sequentially, which is fast. On a modern fast drive, Kafka can easily write 700 MB of data or more per second.
    • Kafka scales writes and reads by sharding topic logs into partitions.
    • Multiple producers can write to different partitions of the same topic. Multiple consumers from multiple consumer groups can read from different partitions efficiently.
  • Kafka Brokers
    • A Kafka cluster is made up of multiple Kafka Brokers. Each Kafka Broker has a unique ID (number). Kafka Brokers contain topic log partitions. Connecting to one broker bootstraps a client to the entire Kafka cluster. For failover, you want to start with at least three to five brokers. A Kafka cluster can have 10, 100, or 1,000 brokers if needed.
  • Kafka Cluster, Failover, ISRs
    • Kafka supports replication to support failover. Recall that Kafka uses ZooKeeper to form Kafka Brokers into a cluster, and each node in a Kafka cluster is called a Kafka Broker. Topic partitions can be replicated across multiple nodes for failover. The topic should have a replication factor greater than 1 (2 or 3). For example, if you are running in AWS, you would want to be able to survive a single availability zone outage. If one Kafka Broker goes down, then another Kafka Broker that is an ISR (in-sync replica) can serve the data.
  • Kafka Failover vs. Kafka Disaster Recovery
    • Kafka uses replication for failover. Replication of Kafka topic log partitions allows for failure of a rack or AWS availability zone (AZ). You need a replication factor of at least 3 to survive a single AZ failure. You need to use Mirror Maker, a Kafka utility that ships with Kafka core, for disaster recovery. Mirror Maker replicates a Kafka cluster to another data center or AWS region. What Mirror Maker does is called mirroring, so as not to be confused with replication.
    • Note there is no hard and fast rule on how you have to set up the Kafka cluster per se. You could, for example, set up the whole cluster in a single AZ so you can use AWS enhanced networking and placement groups for higher throughput, and then use Mirror Maker to mirror the cluster to another AZ in the same region as a hot-standby.
  • Partition
    • A partition is an ordered, immutable record sequence
    • Kafka maintains record order only in a single partition.
    • Kafka continually appends to partitions, using each partition as a structured commit log.
    • Records in a partition are assigned a sequential ID number called the offset.
    • The offset identifies each record location within the partition.
    • topic partitions are a unit of parallelism - a partition can only be worked on by one consumer in a consumer group at a time
    • If a consumer stops, Kafka spreads its partitions across the remaining consumers in the same consumer group
    • Each partition has a leader server and zero or more follower servers. Leaders handle all read and write requests for a partition.
    • Followers replicate leaders and take over if the leader dies.
    • Kafka also uses partitions for parallel consumer handling within a group.
    • Kafka distributes topic log partitions over servers in the Kafka cluster. Each server handles its share of data and requests by sharing partition leadership.
  • Replication
    • Kafka chooses one broker's replica of a partition as the leader, using ZooKeeper.
    • The broker that has the partition leader handles all reads and writes of records for the partition.
    • Kafka replicates writes to the leader partition to followers (node/partition pair).
    • A follower that is in-sync is called an ISR (in-sync replica).
    • If a partition leader fails, Kafka chooses a new ISR as the new leader.
    • A record is considered “committed” when all ISRs for the partition have written it to their log.
    • Only committed records are readable by consumers.
    • Another partition can be owned by another leader on another Kafka Broker.
    • A consumer can have more than one partition, but a partition can only be used by one consumer in a consumer group at a time. If you only have one partition, then you can only have one consumer.
    • If a consumer in a consumer group dies, the partitions assigned to that consumer are divided up amongst the remaining consumers in that group.
  • Producer
    • Kafka producers send records to topics. The records are sometimes referred to as messages.
    • The producer picks which partition to send a record to per topic.
    • The producer can send records round-robin.
    • The producer could implement priority systems based on sending records to certain partitions based on the priority of the record.
    • Producers write at their own cadence, so the order of records cannot be guaranteed across partitions.
    • The producers get to configure their consistency/durability level (acks=0, acks=1, acks=all); see the producer sketch after this list.
  • Kafka Consumer Groups
    • Consumer groups have names to distinguish them from other consumer groups.
    • One consumer group might be responsible for delivering records to high-speed, in-memory microservices while another consumer group is streaming those same records to Hadoop.
    • A consumer group has a unique id.
    • Each consumer group is a subscriber to one or more Kafka topics.
    • Each consumer group maintains its offset per topic partition.
    • If you need multiple subscribers, then you have multiple consumer groups.
    • A record gets delivered to only one consumer in a consumer group.
    • Consumers in a consumer group load balance record processing.
    • Consumer membership within a consumer group is handled by the Kafka protocol dynamically.
    • Each consumer in the consumer group is an exclusive consumer of a “fair share” of partitions. This is how Kafka does load balancing of consumers in a consumer group.
    • If a new consumer joins a consumer group, it gets a share of the partitions. If a consumer dies, its partitions are split among the remaining live consumers in the consumer group. This is how Kafka does failover of consumers in a consumer group.
    • Consumers notify the Kafka broker when they have successfully processed a record, which advances the offset.
    • If a consumer fails before sending commit offset to Kafka broker, then a different consumer can continue from the last committed offset.
    • Kafka implements the at least once behavior, and you should make sure the messages (record deliveries ) are idempotent.
    • Kafka stores offset data in a topic called __consumer_offsets. This topic uses log compaction, which means it only saves the most recent value per key.
    • When a consumer has processed data, it should commit offsets.
    • If a consumer process dies, it will be able to start up and resume reading from where it left off based on the offsets stored in __consumer_offsets, or, as discussed, another consumer in the consumer group can take over.
    • Consumers can’t read un-replicated data. Kafka consumers can only consume messages beyond the High Watermark offset of the partition.
    • The High Watermark is the offset of the last record that was successfully replicated to all of the partition's followers. Consumers only read up to the “High Watermark”.
    • A consumer can see a record only after the record has been fully replicated to all followers.
    • The log end offset is the offset of the last record written to a log partition, and where producers will write next.
    • Only a single consumer from the same consumer group can access a single partition. If the number of consumers in a group exceeds the partition count, the extra consumers remain idle; Kafka can use the idle consumers for failover. If there are more partitions than consumers in a group, then some consumers will read from more than one partition.
    • A consumer with many threads makes it hard to guarantee record delivery order without complex thread coordination
    • A thread-per-consumer model does not have to worry about offset ordering and is simpler for managing failover
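
The producer-side knobs above (key-based partitioning, acks) map directly onto client configuration. A minimal kafka-python sketch (broker address, topic name, and key are placeholders, not from these notes):

import kafka

# Hypothetical example: a keyed producer with explicit durability settings.
producer = kafka.KafkaProducer(bootstrap_servers='1.2.3.4',
                               acks='all',  # wait for all in-sync replicas to acknowledge
                               key_serializer=str.encode,
                               value_serializer=str.encode)

# Records with the same key always land on the same partition,
# so per-key ordering is preserved.
for i in range(10):
    producer.send('some-topic', key='user-42', value='event-%d' % i)

producer.flush()  # block until all buffered records are acknowledged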

OP

Install

  1. Set up ZooKeeper and point Kafka at it in config/server.properties:
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
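
A broker needs a few other basic settings in server.properties alongside the zookeeper.connect line above. A minimal sketch (the broker id, listener hostname, and log directory are placeholder assumptions):

broker.id=1
listeners=PLAINTEXT://kafka-node:9092
log.dirs=/var/lib/kafka/logs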

Service

sudo vi /lib/systemd/system/kafka.service
[Unit]
Description=Kafka
After=network.target

[Service]
User=azuer_user
WorkingDirectory=/opt/kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
Restart=on-abort

[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo service kafka start|stop|restart

Commands

cd /opt/kafka/bin
# create topic
./kafka-topics.sh --create --zookeeper zk:2181 --replication-factor 3 --partitions 1 --topic test
# describe topic
./kafka-topics.sh --describe --zookeeper zk:2181 --topic test
# produce messages
./kafka-console-producer.sh --broker-list kafka-node:9092 --topic test
# consume messages
./kafka-console-consumer.sh --zookeeper zk:2181 --from-beginning --topic test

References