Kafka Notes

Kafka

Data streaming in real-time:

  • Metrics - collect and aggregate monitoring data
  • Log aggregation
  • Stream processing
  • Commit log - acts as an external commit log for a distributed system; the saved state enables disaster recovery
  • Website activity tracking
  • Click information for a page
  • Product suggestions
  • Credit card transactions

Functional Requirements

  • Data Streaming - the system should be able to collect and deliver data efficiently
  • Low latency
  • High throughput
  • Batch processing

Non-Functional Requirements

  • Distributed / Parallel architecture
  • Scalability - support horizontal scaling
  • Data Retention - should retain data durably so that it is not lost even if consumers have not yet consumed it

Message

  • Basic unit of data
  • Ex: a row in a CSV file or a record in a DB
  • Contains a payload of bytes (and may also carry metadata, such as a key)
  • Key is used to write messages to a specific partition in a topic

NOTE: A topic can have multiple partitions
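
A minimal sketch of keyed writes with the Java producer client (the topic name, key, and broker address below are illustrative assumptions):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Messages with the same key hash to the same partition of the topic,
            // so every event for "user-42" lands in one partition, in order
            producer.send(new ProducerRecord<>("page-views", "user-42", "clicked /home"));
        }
    }
}
```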

Batch

  • Messages are written to Kafka in batches to increase throughput
  • A batch is a set of messages belonging to the same partition of a topic
  • Batching eliminates the need for a full TCP/IP round trip per message

Topic

  • Messages are categorized by type; a collection of messages of a particular type is called a topic
  • We can think of a topic as a table in a DB or a folder in a filesystem
  • Topics are composed of partitions.
  • Messages can only be appended to partitions of a topic, which are read from beginning to end.
  • Messages within a single partition are read in order.
  • However, because there are multiple partitions in a topic, there is no guarantee of ordered messages across the whole topic
  • A topic provides scalability to Kafka because its partitions can be saved in different servers, making it horizontally scalable across several servers
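
Partition count and replication factor are fixed when a topic is created. A sketch using the Java AdminClient (the topic name and the counts are illustrative):

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions let the topic spread across up to 6 brokers;
            // replication factor 3 keeps a copy of each partition on 3 brokers
            NewTopic topic = new NewTopic("page-views", 6, (short) 3);
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```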

Architecture

Three main components

  • Producer
    • Publish messages to topics on the brokers
    • Can publish multiple messages (a batch) in a single request
    • Any producer can interact with multiple brokers
  • Broker
    • They are a set of servers
    • They store the messages published by the producers
  • Consumer
    • They subscribe to a set of topics on the brokers
    • They consume the messages of the subscribed topics from the brokers
    • Any consumer can interact with multiple brokers
    • Consumer API
      • Unlike conventional iterators, the consumer's iterator does not terminate when there are no more messages to consume; it blocks until new messages are published.
      • Kafka supports both the point-to-point (any consumer can fetch data) and publish-subscribe delivery methods.
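
A rough sketch of a consumer built on this API (topic, group ID, and broker address are illustrative). Note how the poll loop never terminates; it just keeps asking for new messages:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "page-view-counters");      // consumers in a group split the partitions
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("page-views"));
            while (true) {
                // poll() returns whatever is available (possibly nothing); the
                // stream never "ends" the way a conventional iterator does
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```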

Simple Storage

Implementation of a partition

  • A partition can be implemented as a large file
  • A partition of a topic is implemented as a logical log comprising a set of segment files of approximately the same size
  • Segment files are eventually flushed to the disk. To achieve better performance, the system waits until a segment file has gathered a certain amount of data or a certain amount of time has passed, whichever happens first, before writing it to disk
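
The thresholds are configurable. A hedged sketch of the relevant broker settings with illustrative values (names follow recent Kafka releases; defaults differ across versions):

```properties
# Roll over to a new segment file once the current one reaches ~1 GB
log.segment.bytes=1073741824
# Flush a segment to disk after this many accumulated messages...
log.flush.interval.messages=10000
# ...or after this much time has passed, whichever happens first
log.flush.interval.ms=1000
```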

How does Kafka clear up space for more data?

  • Two configurable parameters determine when messages are deleted from the brokers: the quantity of data gathered (1 GB by default) or the time it has been retained (7 days by default).
  • If data exceeds 1 GB in a topic, or a topic’s data is retained for a week, Kafka starts deleting messages in the oldest segment file of that topic. Moreover, Kafka cannot delete messages from an active segment.
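
These limits map roughly onto the following broker settings (illustrative values; per-topic overrides retention.bytes and retention.ms also exist):

```properties
# Delete old segments once a partition's log grows beyond ~1 GB...
log.retention.bytes=1073741824
# ...or once messages are older than 7 days (168 hours), whichever is hit first
log.retention.hours=168
```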

What offset will the consumer use for the first pull request on a partition?

  • A partition's offsets start at 0, so the consumer uses offset 0 in its first pull request.

Where is the consumer offset stored?

  • In ZooKeeper - so the position survives if a consumer fails
  • Consumers maintain their own consumption state and track the offset of the last consumed message in each partition
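
Modern Kafka stores committed offsets in an internal topic (__consumer_offsets) rather than ZooKeeper, but the consumer-side pattern is the same. A sketch with illustrative names, also showing auto.offset.reset, which governs the "first pull" case above:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OffsetTracking {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "page-view-counters");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // With no stored offset yet, start from offset 0 (the beginning)
        props.put("auto.offset.reset", "earliest");
        // Commit offsets manually so a crash can't silently lose our position
        props.put("enable.auto.commit", "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("page-views"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            // ... process records ...
            consumer.commitSync(); // persist the offsets of the messages just consumed
        }
    }
}
```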

Efficient Transfer

  • Data flows in batches

  • Caching

The benefits that page cache and caching heuristics provide are as follows:

  • We avoid double buffering by not writing messages explicitly into the memory of a user-land Kafka process. (It is called double buffering because the data would be buffered first by the Kafka application and second by the implicit cache managed by the file system's page cache.)
  • Kafka takes advantage of the OS's implicit page cache and does not maintain its own in-process cache

  • Optimized network access
    • sendfile() API - sends the bytes of a file directly to a remote socket, avoiding copies through user-space buffers
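
In the Java implementation this shows up as FileChannel.transferTo(), which maps to sendfile() on Linux. A sketch of the idea (the method and usage are illustrative, not Kafka's actual internals):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ZeroCopySend {
    // Sends a log segment file straight to a socket. transferTo() lets the
    // kernel move the bytes directly, so they are never copied into (and
    // back out of) this process's user-space buffers.
    static void sendSegment(Path segment, SocketChannel socket) throws IOException {
        try (FileChannel file = FileChannel.open(segment, StandardOpenOption.READ)) {
            long position = 0;
            long size = file.size();
            while (position < size) {
                position += file.transferTo(position, size - position, socket);
            }
        }
    }
}
```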

Stateless broker

Information at the broker

  • The consumers keep track of their offsets in ZooKeeper rather than at the broker, so the broker stays stateless and can be scaled horizontally

Deletion of messages

  • Because consumers maintain their own consumption state, the broker lacks that information, which makes it difficult for the broker to know when a message can safely be deleted
  • For this reason, Kafka employs a time-based service-level agreement (SLA) to retain messages: a log segment is deleted once it is seven days old or has accumulated 1 GB of data, whichever condition is met first, and is replaced by a new segment

Distributed Coordination in Kafka

Consumption of messages

  • A partition is the smallest unit of parallelism
  • Each partition is consumed by only one consumer within a consumer group
  • Otherwise we would need locking and state maintenance, which would add overhead
  • To balance the load across fewer consumers, a topic can be over-partitioned so that multiple partitions are assigned to each consumer

Zookeeper

  • A distributed key-value store used for coordination and for storing configuration
  • Highly optimized for reads
  • Coordinates between Kafka brokers (which are deployed across multiple servers)

  • Topics → can be split into partitions for scalability; multiple subscribers can read from the same topic
  • Replay → because the broker retains messages, consumers can rewind their offset and re-read them

Consumer registry: A consumer saves its information in the consumer registry whenever it starts. The information includes the consumer group it belongs to and its subscribed topics.

Broker registry: A broker saves its information in the broker registry whenever it starts. The information includes the hostname, the port of the broker, and the set of partitions stored in it.

Ownership registry: It holds information about each subscribed partition in a broker, including the ID of the consumer currently consuming from that partition. In other words, it tells which partition is being consumed by which consumer.

Offset registry: The offset registry also stores information related to each subscribed partition. The information includes just the offset of the last message consumed in a partition.
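
In the ZooKeeper-based design these registries are plain znodes. The layout below is an approximate sketch of that legacy scheme (newer Kafka keeps group metadata in internal topics instead):

```
/brokers/ids/[broker_id]                        → broker registry: hostname, port, stored partitions
/consumers/[group_id]/ids/[consumer_id]         → consumer registry: group, subscribed topics
/consumers/[group_id]/owners/[topic]/[part_id]  → ownership registry: consumer that owns the partition
/consumers/[group_id]/offsets/[topic]/[part_id] → offset registry: offset of the last consumed message
```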

ZooKeeper watcher

Each consumer and each broker can register a watcher on the broker and consumer registries. The watcher notifies them about changes that occur in those registries.

  • A change in the broker registry occurs when a topic that a consumer is consuming is modified.
  • A change in the consumer registry occurs whenever a consumer fails, or a new consumer is added.

The watcher can also trigger a load re-balance whenever a new consumer is initialized or a change occurs in the broker or consumer registry.

Re-balancing process

  • The consumers divide the un-owned partitions evenly among themselves

Delivery Guarantees of Kafka

  • At least once delivery
    • Default delivery guarantee
    • Happens when a consumer fails to persist the offset after reading data: the replacement consumer consumes again from the last persisted offset, so some messages may be delivered more than once
  • Exactly once delivery
    • We can use unique keys in the messages and write the results to a system that supports these keys
    • Idempotent writes (re-applying the same message has no further effect)
  • In order delivery
    • Broker writes all the messages to a single partition in the given order
    • The order cannot be guaranteed if those messages are written to multiple partitions
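
A sketch of the producer settings behind idempotent writes (topic, key, and broker address are illustrative):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class IdempotentProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // The broker de-duplicates retried sends per partition, so a retry
        // after a lost acknowledgement cannot write the message twice
        props.put("enable.idempotence", "true");
        // Idempotence requires acknowledgement from all in-sync replicas
        props.put("acks", "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("payments", "txn-981", "charge 10 USD"));
        }
    }
}
```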

Replication

  • Leader partition - consumers consume from and producers produce to this partition
  • In-sync replicas (ISR)
    • Replica partitions that sent a heartbeat to ZooKeeper in the last 6 seconds (can be configured by the user).
    • Replica partitions that fetched messages from the leader in the last 10 seconds (can be configured by the user).
  • In-sync replicas that lag slow down the production and consumption of messages.
  • If the producers and consumers do not wait for the lagging in-sync replicas and keep producing and consuming, those replicas eventually fall out of sync; if all replicas become out-of-sync and we then lose the broker holding the leader partition, the data is lost
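
A hedged sketch of the knobs involved (illustrative values; defaults vary by Kafka version):

```properties
# A follower drops out of the ISR if it has not fetched from the leader
# within this window (compare the 10-second rule above)
replica.lag.time.max.ms=10000
# For producers using acks=all, reject writes unless at least 2 replicas
# are in sync, trading availability for durability
min.insync.replicas=2
```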

Batching

  • Makes Kafka more efficient
  • In general, batching saves RPC overhead by sending many messages in one call.
    • Two parameters control it:
      • Batch size
      • Linger time (how long the producer waits before sending the collected messages)
      • Whichever limit is reached first triggers the send
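
A sketch of the two parameters on a Java producer (values are illustrative):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BatchingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Send a batch once it reaches 32 KB *or* has waited 10 ms,
        // whichever limit is reached first
        props.put("batch.size", "32768");
        props.put("linger.ms", "10");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() calls now accumulate into per-partition batches
            producer.send(new ProducerRecord<>("page-views", "user-42", "clicked /home"));
        }
    }
}
```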
