Techmaster Việt Nam – Học là có việc

01 tháng 05, 2018 – 4157 lượt xem

Bài này hướng dẫn cách khởi động Kafka qua Docker và lập trình Golang kết nối vào Kafka tạo và nhận message. Kafka là một message broker được một nhóm kỹ sư LinkedIn viết bằng Scala để xây dựng hệ thống microservice vận hành theo sự kiện (event driven microservice architect).

Kafka khác biệt rất rõ với RabbitMQ ở điểm khi message trong queue của RabbitMQ được các receiver (consumer) đón nhận hết thì message này sẽ bị gỡ bỏ ra khỏi queue (hàng đợi). Ngược lại Kafka vẫn lưu message điều này có ích khi cần xử lý lại (replay)  các sự kiện trong cơ chế event sourcing.
Rabbit MQ có cơ chế định tuyến event theo binding key và topic hết sức mềm dẻo, linh động. Kafka thì lại có KSQL để truy vấn các sự kiện theo cú pháp SQL và có các connector để bắt thay đổi diễn ra trong MySQL, Postgresql, MongoDB… để tạo ra các event message. Chi tiết so sánh giữa Kafka và RabbitMQ các bạn có thể tham khảo RabbitMQ vs Kafka của tác giả Jack Vanlightly

Bây giờ tôi sẽ nói đến các bước nhanh nhất để nghịch Kafka, rồi sau đó lập trình Golang kết nối vào Kafka. Thay vì tải Kafka và Zookeeper về máy rồi chạy, tôi sẽ chọn dùng Docker để máy host sạch sẽ sau nhiều lần thử nghiệm. Bài thực hành đã được kiểm thử trên hệ điều hành MacOSX 10.11, nếu bạn sử dụng Ubuntu, Zorin thì bạn không cần phải thay đổi gì nhiều. Còn Windows thì tôi không rõ, đã lâu rồi tôi không dùng.

1- Cài đặt Kafka và Zookeeper bằng Docker-compose

Tôi học theo hướng dẫn ở đây Confluent Kafka Docker Quickstart  . Nội dung file docker-compose.yml

// Lấy mã nguồn ví dụ về
git clone https://github.com/confluentinc/cp-docker-images

//Chuyển vào thư mục examples
cd examples/kafka-single-node

//Khởi động docker-compose ở chế độ detach mode - chạy background
docker-compose up -d

Nên dùng portainer để quản lý docker containers và network cho trực quan. Nếu lệnh docker-compose thành công bạn sẽ nhìn thấy
 Portainer xem container Zookeeper và KafkaDùng Portainer để quan sát Kafka và Zookeeper

Chú ý hai docker container có tên là kafka và zookeeper.

Sử dụng host networking

Nếu quan sát kỹ file docker-compose.yml chúng ta sẽ thấy cả zookeeper và kafka sử dụng network mode là host. Có nghĩa các container sẽ tham gia vào network của host OS, chúng ta không cần binding port của container vào host nữa. Đó là nguyên nhân tại sao trong cách lệnh tiếp theo dưới đấy, các bạn sẽ thấy các container khác khi cần trỏ tới zookeeper luôn dùng localhost:32181 bởi cổng 32181 của container zookeeper 
 kafka zookeeper dockerCấu hình mạng trong ví dụ này

Tạo topic

Tiếp theo chúng ta tạo một topic vào Kafka tương đương với table trong cơ sở dữ liệu quan hệ. Chúng ta sẽ thực thi lệnh ngay trên container kafka tạo ra ở bước phía trên.

$ docker exec kafka  \                                                                                                   
kafka-topics --create --topic foo --partitions 1 --replication-factor 1 --if-not-exists --zookeeper localhost:32181

Created topic "foo".

Chạy lệnh này kiểm tra lại topic foo được tạo ra chưa?

$ docker exec kafka  \                                                                                                  
     kafka-topics --describe --topic foo --zookeeper localhost:32181

Topic:foo	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: foo	Partition: 0	Leader: 1001	Replicas: 1001	Isr: 1001

Gửi thử một số message vào. Chú ý option -it để chúng ta gõ các message thành từng dòng khác nhau

$ docker exec -it kafka  \
  kafka-console-producer --request-required-acks 1 --broker-list localhost:29092 --topic foo
> Hello
> World

Khởi động kafka consumer để đọc các message thuộc topic foo. Chú ý lựa chọn –from-beginning sẽ lấy tất cả các message từ trước tới giờ 

$ docker exec kafka  \
  kafka-console-consumer --bootstrap-server localhost:29092 --topic foo --from-beginning

Hello
World

Để bỏ qua 10 message đầu tiên, chú ý bộ tham số –partition 0 –offset 10 . Nếu đã dùng tham số offset thì phải có tham số partition đi kèm

docker exec kafka  \
  kafka-console-consumer --bootstrap-server localhost:29092 --topic foo --partition 0 --offset 10

Giờ hãy thử tạo ra các message đều đặn, liên tục.

$ docker exec -it kafka  \
  kafka-console-producer --request-required-acks 1 --broker-list localhost:29092 --topic foo
> Lan
> Long
> Linh

Rồi liên tục lấy những message mới nhất 

$ docker exec kafka  \
  kafka-console-consumer --bootstrap-server localhost:29092 --topic foo
Lan
Long
Linh

Lập trình Golang kết nối vào Kafka.

Tạo một Dockerfile kế thừa từ golang:alpine. Trong Dockerfile này tôi cài thêm librdkafka và github.com/confluentinc/confluent-kafka-go/kafka

FROM golang:alpine

ARG LIBRDKAFKA_VERSION=0.11.4-r1

RUN apk update && \
	apk add --upgrade apk-tools && \
    apk add libressl --update-cache --repository http://nl.alpinelinux.org/alpine/edge/main && \
    apk add librdkafka=${LIBRDKAFKA_VERSION} --update-cache --repository http://nl.alpinelinux.org/alpine/edge/community && \
    apk add librdkafka-dev=${LIBRDKAFKA_VERSION} --update-cache --repository http://nl.alpinelinux.org/alpine/edge/community && \
    apk add git openssh openssl yajl-dev zlib-dev cyrus-sasl-dev openssl-dev build-base coreutils && \
    go get github.com/confluentinc/confluent-kafka-go/kafka

Sau đó build docker image có tên là gokafka

docker build -t gokafka .

Tiếp đó khởi động docker container từ docker image vừa mới build. Chú ý tôi tạo volume mapping từ thư mục PathToYourCode vào thư mục app trong container, sau đó đặt app làm thư mục làm việc, working directory. Container này cũng join vào network host cùng với zookeeper và kafka
 

docker run --net=host -it --name gokafka -v /PathToYourCode:/app -w /app gokafka:latest /bin/sh

Caption

Tạo hai file producer.go và consumer.go ở thư mục PathToYourCode. Chú ý đường dẫn đến kafka là localhost:29092

producer.go

package main

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:29092"})
	if err != nil {
		panic(err)
	}

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	topic := "foo"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// Wait for message deliveries
	p.Flush(15 * 1000)
}

consumer.go

package main
import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:29092",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})

	if err != nil {
		panic(err)
	}

	c.SubscribeTopics([]string{"foo", "^aRegex.*[Tt]opic"}, nil)

	for {
		msg, err := c.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else {
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
			break
		}
	}

	c.Close()
}

Ở dấu nhắc của docker run gokafka, chạy file consumer.go. Kết quả trả về có thể khác tuỳ vào nội dung thực tế trong các message đang nằm trong Kafka.

/app # go run consumer.go

Message on foo[0]@62: Lan
Message on foo[0]@63: Long
Message on foo[0]@64: Linh

Một số chú ý thêm

Chúng ta không nhất thiết phải chạy lệnh trên chính container kafka, mà chúng ta có thể dùng container khác ví dụ như binami/kafka

$ docker run -it --rm \
--network host \
-e KAFKA_ZOOKEEPER_CONNECT=localhost:32181 \
bitnami/kafka:latest kafka-topics.sh --list  --zookeeper localhost:32181

__confluent.support.metrics
__consumer_offsets
bar
foo

Đơn giản hơn, ví dụ tạo mới topic order

$ docker run --net=host --rm confluentinc/cp-kafka:latest \
kafka-topics --create --topic order --partitions 1 \
--replication-factor 1 --if-not-exists --zookeeper localhost:32181

Created topic "order".

Liệt kê các topic

$ docker run --net=host --rm confluentinc/cp-kafka:latest \
           kafka-topics --list --zookeeper localhost:32181

__confluent.support.metrics
__consumer_offsets
bar
foo
order