쿠버네티스 일반

Strimzi Kafka Operator - Kafka 토픽 생성

Jerry_이정훈 2022. 6. 12. 06:44
728x90

지난 포스팅에 이어 이번에는 카프카 토픽을 만들어 보는 실습을 진행하였다.

실습 내역

  • 쿠버네티스 Strimzi CRD를 이용하여 카프카 토픽 생성 및 조회

그럼, 카프카 토픽을 YAML 파일로 만들어본다. 지난 시간에 말한 Operator가 편리한게 카프카 토픽 같은 쿠버네티스 리소스가 아닌 것도 Custom으로 YAML 파일로 만들 수 있는 것이다. 쿠버네티스에 친숙한 YAML 파일 형식이라 거부감이 없고 명령어가 아닌 소스 코드 기반으로 재 사용성이 훨씬 낫다.

## visualcode 등으로 아래 내용을 kafakatopic-crd.yaml 파일에 저장한다. 
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: mytopic1
  labels:
    strimzi.io/cluster: "my-cluster"
spec:
  ## 토픽을 3개로 만든다.
  partitions: 3
  ## 카프카 클러스터 생성 시 replicas 설정과 동일
  replicas: 3
  config:
  ## 로그 메시지 보관 기간 설정 
    retention.ms: 7200000
    segment.bytes: 1073741824
    min.insync.replicas: 2
  • metadata.name: mytopic1
  • 토픽 이름을 mytopic1로 지정하였다. mytopic1 이름으로 속성등이 조회 가능하다.
  • spec.partitions: 3
  • 하나의 토픽은 3개의 파티션으로 나누어져서 브로커에 저장된다. 3개로 나누어지니 메시지를 받는 consume 입장에서 3개를 병렬로 받을 수 있으니 빠르게 메시지를 받아갈 수 있다.
  • spec.replicas: 3
  • 하나의 토픽이 3개로 복사해서 저장된다. 3벌 복사하니 1,2개의 브로커가 fail 되어도 정상적으로 메시지는 전달 가능하다.
  • spec.config.retention.ms: 7200000
  •   메시지를 로그 파일에 보관하는 시간이다. millisecond 단위라 7,200,000ms는 120분 2시간 저장하도록 하였다. 설정에 따라 변경하면 된다.
  • https://kafka.apache.org/documentation/#brokerconfigs_log.retention.ms

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

해당 YAML로 리소스를 생성한다.

## 일반 쿠버네티스 리소스 생성하듯이 YAML 파일을 apply 한다.
$ (⎈ |ubun01:kafka) k apply -f kafkatopic-crd.yaml
kafkatopic.kafka.strimzi.io/mytopic1 created

## 기본 설치 시 제공하는 kafkatopic과 함께 우리가 생성한 mytopic1 kafkatopic이 만들어졌다.
# jerry @ Jerrys-MacBook-Pro in ~/01.works/01.k8s_switch_helm/kafka on git:master x [6:05:11]
$ (⎈ |ubun01:kafka) k get kafkatopics.kafka.strimzi.io
NAME                                                                                               CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a                                        my-cluster   50           3                    True
mytopic1                                                                                           my-cluster   3            3                    True
strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55                                     my-cluster   1            3                    True
strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b   my-cluster   1            3                    True

카프카 토픽도 리소스로 조회되니 참 편리하다.

$ (⎈ |ubun01:kafka) k describe kafkatopics.kafka.strimzi.io mytopic1
(생략)
Spec:
  Config:
    min.insync.replicas:  2
    retention.ms:         7200000
    segment.bytes:        1073741824
  Partitions:             3
  Replicas:               3

다른 리소스와 동일하게 describe로 중요 정보를 조회할 수 있다.

이제 카프카 클라이언트 파드를 생성해서 해당 토픽을 정상적으로 조회되는지 확인해본다. 카프카 클라이언트 파드는 bitnami 이미지를 사용하였다.

## kafka-client-pod.yaml 이름으로 만든다.
apiVersion: v1
kind: Pod
metadata:
  name: myclient1
  namespace: kafka
spec:
  containers:
  - name: kafka-client
    ## bitnami/kafka 이미지 사용하면 다양한 kafka 명령어를 사용할 수 있다.
    image: bitnami/kafka
    # image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
    command:
      - sh
      - -c
      - "exec tail -f /dev/null"

카프카 클라이언트 파드를 만든다.

$ (⎈ |ubun01:kafka) k apply -f kafka-client-pod.yaml
pod/myclient1 created

$ (⎈ |ubun01:kafka) k get pod
(...)
myclient1                                     1/1     Running   0             47s

카프카 클라이언트에서 사용할 카프카 접속 주소는 서비스 이름으로 확인할 수 있다.

$ (⎈ |ubun01:kafka) k get svc
NAME                          TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                               AGE
my-cluster-kafka-bootstrap    ClusterIP   10.233.63.250   <none>        9091/TCP,9092/TCP,9093/TCP            38h
my-cluster-kafka-brokers      ClusterIP   None            <none>        9090/TCP,9091/TCP,9092/TCP,9093/TCP   38h
my-cluster-zookeeper-client   ClusterIP   10.233.39.51    <none>        2181/TCP                              38h
my-cluster-zookeeper-nodes    ClusterIP   None            <none>        2181/TCP,2888/TCP,3888/TCP            38h

최초 설정 시 사용한 포트번호 9092가 TLS 없이 접속 시 사용 가능한 번호이다. 주소는 'my-cluster-kafka-bootstrap', 'my-cluster-kafka-brokers' 2개 모두 사용 가능하다. 편의 상 환경 변수에 등록한다.

$ (⎈ |ubun01:kafka) SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092

이제 카프카 클라이언트 파드에서 토픽을 확인한다.

$ (⎈ |ubun01:kafka) kubectl exec -it myclient1 -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic1 --describe

Topic: mytopic1	TopicId: T6Yi5oGuQgOC8-yQSDDa1w	PartitionCount: 3	ReplicationFactor: 3	Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=7200000,message.format.version=3.0-IV1
	Topic: mytopic1	Partition: 0	Leader: 0	Replicas: 0,2,1	Isr: 0,2,1
	Topic: mytopic1	Partition: 1	Leader: 2	Replicas: 2,1,0	Isr: 2,1,0
	Topic: mytopic1	Partition: 2	Leader: 1	Replicas: 1,0,2	Isr: 1,0,2

위와 같이 정상적으로 카프카 토픽과 파티션, replicas 설정을 확인할 수 있다. Leader를 보면 0,1,2 인데, 이는 브로커 3대에 분산되어 있고 각각의 메시지는 3벌 복제되어 각 파티션마다 다른 브로커에 저장되어 있는 걸 알 수 있다.

카프카 클라이언트를 하나 더 생성해서 양 쪽 파드에서 메시지가 정상적으로 주고 받는지 확인한다.

apiVersion: v1
kind: Pod
metadata:
  name: myclient2
  namespace: kafka
spec:
  containers:
  - name: kafka-client
    image: bitnami/kafka
    # image: quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
    command:
      - sh
      - -c
      - "exec tail -f /dev/null"
$ (⎈ |ubun01:kafka) k apply -f kafka-client-pod.yaml
pod/myclient2 created

# jerry @ Jerrys-MacBook-Pro in ~/01.works/01.k8s_switch_helm/kafka on git:master x [6:38:46]
$ (⎈ |ubun01:kafka) k get pod
NAME                                          READY   STATUS              RESTARTS      AGE
myclient1                                     1/1     Running   0             19m
myclient2                                     1/1     Running   0             42s
## kafka-console-producer.sh 명령어로 카프카 토픽을 만들 수 있다.
$ (⎈ |ubun01:kafka) kubectl exec -it myclient1 -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1
1
2
3

## kafka-console-consumer.sh 메시지로 생성한 토픽을 확인할 수 있다. 터미널을 2개로 나누고 확인한다.
$ (⎈ |ubun01:kafka) kubectl exec -it myclient2 -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --from-beginning
1
2
3

터미널을 분리하면 아래와 같이 2개의 화면에서 메시지가 주고 받는 걸 확인할 수 있다.

반응형