Techmaster Việt Nam – Học là có việc
01 tháng 05, 2018 – 4157 lượt xem
Bài này hướng dẫn cách khởi động Kafka qua Docker và lập trình Golang kết nối vào Kafka tạo và nhận message. Kafka là một message broker được một nhóm kỹ sư LinkedIn viết bằng Scala để xây dựng hệ thống microservice vận hành theo sự kiện (event driven microservice architect).
Kafka khác biệt rất rõ với RabbitMQ ở điểm khi message trong queue của RabbitMQ được các receiver (consumer) đón nhận hết thì message này sẽ bị gỡ bỏ ra khỏi queue (hàng đợi). Ngược lại Kafka vẫn lưu message điều này có ích khi cần xử lý lại (replay) các sự kiện trong cơ chế event sourcing.
Rabbit MQ có cơ chế định tuyến event theo binding key và topic hết sức mềm dẻo, linh động. Kafka thì lại có KSQL để truy vấn các sự kiện theo cú pháp SQL và có các connector để bắt thay đổi diễn ra trong MySQL, Postgresql, MongoDB… để tạo ra các event message. Chi tiết so sánh giữa Kafka và RabbitMQ các bạn có thể tham khảo RabbitMQ vs Kafka của tác giả Jack Vanlightly
Bây giờ tôi sẽ nói đến các bước nhanh nhất để nghịch Kafka, rồi sau đó lập trình Golang kết nối vào Kafka. Thay vì tải Kafka và Zookeeper về máy rồi chạy, tôi sẽ chọn dùng Docker để máy host sạch sẽ sau nhiều lần thử nghiệm. Bài thực hành đã được kiểm thử trên hệ điều hành MacOSX 10.11, nếu bạn sử dụng Ubuntu, Zorin thì bạn không cần phải thay đổi gì nhiều. Còn Windows thì tôi không rõ, đã lâu rồi tôi không dùng.
1- Cài đặt Kafka và Zookeeper bằng Docker-compose
Tôi học theo hướng dẫn ở đây Confluent Kafka Docker Quickstart . Nội dung file docker-compose.yml
// Lấy mã nguồn ví dụ về
git clone https://github.com/confluentinc/cp-docker-images
//Chuyển vào thư mục examples
cd examples/kafka-single-node
//Khởi động docker-compose ở chế độ detach mode - chạy background
docker-compose up -d
Nên dùng portainer để quản lý docker containers và network cho trực quan. Nếu lệnh docker-compose thành công bạn sẽ nhìn thấy
Dùng Portainer để quan sát Kafka và Zookeeper
Chú ý hai docker container có tên là kafka và zookeeper.
Mục lục bài viết
Sử dụng host networking
Nếu quan sát kỹ file docker-compose.yml chúng ta sẽ thấy cả zookeeper và kafka sử dụng network mode là host. Có nghĩa các container sẽ tham gia vào network của host OS, chúng ta không cần binding port của container vào host nữa. Đó là nguyên nhân tại sao trong cách lệnh tiếp theo dưới đấy, các bạn sẽ thấy các container khác khi cần trỏ tới zookeeper luôn dùng localhost:32181 bởi cổng 32181 của container zookeeper
Cấu hình mạng trong ví dụ này
Tạo topic
Tiếp theo chúng ta tạo một topic vào Kafka tương đương với table trong cơ sở dữ liệu quan hệ. Chúng ta sẽ thực thi lệnh ngay trên container kafka tạo ra ở bước phía trên.
$ docker exec kafka \
kafka-topics --create --topic foo --partitions 1 --replication-factor 1 --if-not-exists --zookeeper localhost:32181
Created topic "foo".
Chạy lệnh này kiểm tra lại topic foo được tạo ra chưa?
$ docker exec kafka \
kafka-topics --describe --topic foo --zookeeper localhost:32181
Topic:foo PartitionCount:1 ReplicationFactor:1 Configs:
Topic: foo Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Gửi thử một số message vào. Chú ý option -it để chúng ta gõ các message thành từng dòng khác nhau
$ docker exec -it kafka \
kafka-console-producer --request-required-acks 1 --broker-list localhost:29092 --topic foo
> Hello
> World
Khởi động kafka consumer để đọc các message thuộc topic foo. Chú ý lựa chọn –from-beginning sẽ lấy tất cả các message từ trước tới giờ
$ docker exec kafka \
kafka-console-consumer --bootstrap-server localhost:29092 --topic foo --from-beginning
Hello
World
Để bỏ qua 10 message đầu tiên, chú ý bộ tham số –partition 0 –offset 10 . Nếu đã dùng tham số offset thì phải có tham số partition đi kèm
docker exec kafka \
kafka-console-consumer --bootstrap-server localhost:29092 --topic foo --partition 0 --offset 10
Giờ hãy thử tạo ra các message đều đặn, liên tục.
$ docker exec -it kafka \
kafka-console-producer --request-required-acks 1 --broker-list localhost:29092 --topic foo
> Lan
> Long
> Linh
Rồi liên tục lấy những message mới nhất
$ docker exec kafka \
kafka-console-consumer --bootstrap-server localhost:29092 --topic foo
Lan
Long
Linh
Lập trình Golang kết nối vào Kafka.
Tạo một Dockerfile kế thừa từ golang:alpine. Trong Dockerfile này tôi cài thêm librdkafka và github.com/confluentinc/confluent-kafka-go/kafka
FROM golang:alpine
ARG LIBRDKAFKA_VERSION=0.11.4-r1
RUN apk update && \
apk add --upgrade apk-tools && \
apk add libressl --update-cache --repository http://nl.alpinelinux.org/alpine/edge/main && \
apk add librdkafka=${LIBRDKAFKA_VERSION} --update-cache --repository http://nl.alpinelinux.org/alpine/edge/community && \
apk add librdkafka-dev=${LIBRDKAFKA_VERSION} --update-cache --repository http://nl.alpinelinux.org/alpine/edge/community && \
apk add git openssh openssl yajl-dev zlib-dev cyrus-sasl-dev openssl-dev build-base coreutils && \
go get github.com/confluentinc/confluent-kafka-go/kafka
Sau đó build docker image có tên là gokafka
docker build -t gokafka .
Tiếp đó khởi động docker container từ docker image vừa mới build. Chú ý tôi tạo volume mapping từ thư mục PathToYourCode vào thư mục app trong container, sau đó đặt app làm thư mục làm việc, working directory. Container này cũng join vào network host cùng với zookeeper và kafka
docker run --net=host -it --name gokafka -v /PathToYourCode:/app -w /app gokafka:latest /bin/sh
Caption
Tạo hai file producer.go và consumer.go ở thư mục PathToYourCode. Chú ý đường dẫn đến kafka là localhost:29092
producer.go
package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:29092"})
if err != nil {
panic(err)
}
// Delivery report handler for produced messages
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// Produce messages to topic (asynchronously)
topic := "foo"
for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
// Wait for message deliveries
p.Flush(15 * 1000)
}
consumer.go
package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:29092",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{"foo", "^aRegex.*[Tt]opic"}, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
} else {
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
break
}
}
c.Close()
}
Ở dấu nhắc của docker run gokafka, chạy file consumer.go. Kết quả trả về có thể khác tuỳ vào nội dung thực tế trong các message đang nằm trong Kafka.
/app # go run consumer.go
Message on foo[0]@62: Lan
Message on foo[0]@63: Long
Message on foo[0]@64: Linh
Một số chú ý thêm
Chúng ta không nhất thiết phải chạy lệnh trên chính container kafka, mà chúng ta có thể dùng container khác ví dụ như binami/kafka
$ docker run -it --rm \
--network host \
-e KAFKA_ZOOKEEPER_CONNECT=localhost:32181 \
bitnami/kafka:latest kafka-topics.sh --list --zookeeper localhost:32181
__confluent.support.metrics
__consumer_offsets
bar
foo
Đơn giản hơn, ví dụ tạo mới topic order
$ docker run --net=host --rm confluentinc/cp-kafka:latest \
kafka-topics --create --topic order --partitions 1 \
--replication-factor 1 --if-not-exists --zookeeper localhost:32181
Created topic "order".
Liệt kê các topic
$ docker run --net=host --rm confluentinc/cp-kafka:latest \
kafka-topics --list --zookeeper localhost:32181
__confluent.support.metrics
__consumer_offsets
bar
foo
order