Strimzi Kafka Operator - Kafka 토픽 생성
지난 포스팅에 이어 이번에는 카프카 토픽을 만들어 보는 실습을 진행하였다.
실습 내역
- 쿠버네티스 Strimzi CRD를 이용하여 카프카 토픽 생성 및 조회
그럼, 카프카 토픽을 YAML 파일로 만들어본다. 지난 시간에 말한 Operator가 편리한게 카프카 토픽 같은 쿠버네티스 리소스가 아닌 것도 Custom으로 YAML 파일로 만들 수 있는 것이다. 쿠버네티스에 친숙한 YAML 파일 형식이라 거부감이 없고 명령어가 아닌 소스 코드 기반으로 재 사용성이 훨씬 낫다.
## visualcode 등으로 아래 내용을 kafakatopic-crd.yaml 파일에 저장한다.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: mytopic1
labels:
strimzi.io/cluster: "my-cluster"
spec:
## 토픽을 3개로 만든다.
partitions: 3
## 카프카 클러스터 생성 시 replicas 설정과 동일
replicas: 3
config:
## 로그 메시지 보관 기간 설정
retention.ms: 7200000
segment.bytes: 1073741824
min.insync.replicas: 2
- metadata.name: mytopic1
- 토픽 이름을 mytopic1로 지정하였다. mytopic1 이름으로 속성등이 조회 가능하다.
- spec.partitions: 3
- 하나의 토픽은 3개의 파티션으로 나누어져서 브로커에 저장된다. 3개로 나누어지니 메시지를 받는 consume 입장에서 3개를 병렬로 받을 수 있으니 빠르게 메시지를 받아갈 수 있다.
- spec.replicas: 3
- 하나의 토픽이 3개로 복사해서 저장된다. 3벌 복사하니 1,2개의 브로커가 fail 되어도 정상적으로 메시지는 전달 가능하다.
- spec.config.retention.ms: 7200000
- 메시지를 로그 파일에 보관하는 시간이다. millisecond 단위라 7,200,000ms는 120분 2시간 저장하도록 하였다. 설정에 따라 변경하면 된다.
- https://kafka.apache.org/documentation/#brokerconfigs_log.retention.ms
해당 YAML로 리소스를 생성한다.
## 일반 쿠버네티스 리소스 생성하듯이 YAML 파일을 apply 한다.
$ (⎈ |ubun01:kafka) k apply -f kafkatopic-crd.yaml
kafkatopic.kafka.strimzi.io/mytopic1 created
## 기본 설치 시 제공하는 kafkatopic과 함께 우리가 생성한 mytopic1 kafkatopic이 만들어졌다.
# jerry @ Jerrys-MacBook-Pro in ~/01.works/01.k8s_switch_helm/kafka on git:master x [6:05:11]
$ (⎈ |ubun01:kafka) k get kafkatopics.kafka.strimzi.io
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a my-cluster 50 3 True
mytopic1 my-cluster 3 3 True
strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55 my-cluster 1 3 True
strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b my-cluster 1 3 True
카프카 토픽도 리소스로 조회되니 참 편리하다.
$ (⎈ |ubun01:kafka) k describe kafkatopics.kafka.strimzi.io mytopic1
(생략)
Spec:
Config:
min.insync.replicas: 2
retention.ms: 7200000
segment.bytes: 1073741824
Partitions: 3
Replicas: 3
다른 리소스와 동일하게 describe로 중요 정보를 조회할 수 있다.
이제 카프카 클라이언트 파드를 생성해서 해당 토픽을 정상적으로 조회되는지 확인해본다. 카프카 클라이언트 파드는 bitnami 이미지를 사용하였다.
## kafka-client-pod.yaml 이름으로 만든다.
apiVersion: v1
kind: Pod
metadata:
name: myclient1
namespace: kafka
spec:
containers:
- name: kafka-client
## bitnami/kafka 이미지 사용하면 다양한 kafka 명령어를 사용할 수 있다.
image: bitnami/kafka
# image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
command:
- sh
- -c
- "exec tail -f /dev/null"
카프카 클라이언트 파드를 만든다.
$ (⎈ |ubun01:kafka) k apply -f kafka-client-pod.yaml
pod/myclient1 created
$ (⎈ |ubun01:kafka) k get pod
(...)
myclient1 1/1 Running 0 47s
카프카 클라이언트에서 사용할 카프카 접속 주소는 서비스 이름으로 확인할 수 있다.
$ (⎈ |ubun01:kafka) k get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
my-cluster-kafka-bootstrap ClusterIP 10.233.63.250 <none> 9091/TCP,9092/TCP,9093/TCP 38h
my-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP,9093/TCP 38h
my-cluster-zookeeper-client ClusterIP 10.233.39.51 <none> 2181/TCP 38h
my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 38h
최초 설정 시 사용한 포트번호 9092가 TLS 없이 접속 시 사용 가능한 번호이다. 주소는 'my-cluster-kafka-bootstrap', 'my-cluster-kafka-brokers' 2개 모두 사용 가능하다. 편의 상 환경 변수에 등록한다.
$ (⎈ |ubun01:kafka) SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092
이제 카프카 클라이언트 파드에서 토픽을 확인한다.
$ (⎈ |ubun01:kafka) kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic1 --describe
Topic: mytopic1 TopicId: T6Yi5oGuQgOC8-yQSDDa1w PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=7200000,message.format.version=3.0-IV1
Topic: mytopic1 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: mytopic1 Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: mytopic1 Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
위와 같이 정상적으로 카프카 토픽과 파티션, replicas 설정을 확인할 수 있다. Leader를 보면 0,1,2 인데, 이는 브로커 3대에 분산되어 있고 각각의 메시지는 3벌 복제되어 각 파티션마다 다른 브로커에 저장되어 있는 걸 알 수 있다.
카프카 클라이언트를 하나 더 생성해서 양 쪽 파드에서 메시지가 정상적으로 주고 받는지 확인한다.
apiVersion: v1
kind: Pod
metadata:
name: myclient2
namespace: kafka
spec:
containers:
- name: kafka-client
image: bitnami/kafka
# image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
command:
- sh
- -c
- "exec tail -f /dev/null"
$ (⎈ |ubun01:kafka) k apply -f kafka-client-pod.yaml
pod/myclient2 created
# jerry @ Jerrys-MacBook-Pro in ~/01.works/01.k8s_switch_helm/kafka on git:master x [6:38:46]
$ (⎈ |ubun01:kafka) k get pod
NAME READY STATUS RESTARTS AGE
myclient1 1/1 Running 0 19m
myclient2 1/1 Running 0 42s
## kafka-console-producer.sh 명령어로 카프카 토픽을 만들 수 있다.
$ (⎈ |ubun01:kafka) kubectl exec -it myclient1 -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1
1
2
3
## kafka-console-consumer.sh 메시지로 생성한 토픽을 확인할 수 있다. 터미널을 2개로 나누고 확인한다.
$ (⎈ |ubun01:kafka) kubectl exec -it myclient2 -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --from-beginning
1
2
3
터미널을 분리하면 아래와 같이 2개의 화면에서 메시지가 주고 받는 걸 확인할 수 있다.