arrow_upward
본문 바로가기
DevOps

아파치 카프카 기본 설정

by dawncode 2025. 1. 7.

 

아파치 카프카를 실행하기 위해서는 주키퍼의 설정 파일인 zookeeper.properties와, 카프카 브로커 설정 파일인 server.properties 파일을 설정해야 한다. 세부적인 프로퍼티까진 설명하지는 않지만 설치했을 때 기본적으로 설정된 프로퍼티들을 확인할 것이다.

 

아파치 카프카를 설정하기 전에, 카프카의 설치는 아파치 카프카 공식 페이지에서 다운로드 할 수 있으며 Binary downloads의 tgz 파일의 링크를 wget으로 다운로드하면 된다. 포스팅에서 사용한 버전은 아파치 카프카 3.1.2 버전을 사용했다.

 

다운로드 시 Source download로 소스 파일을 다운로드한다면, 실행 가능한 바이너리 파일로 컴파일 되지 않은 상태이므로 소스 코드를 직접 빌드해서 사용해야 한다. 소스 코드를 수정해서 사용할 것이 아니라면 바이너리 파일을 다운로드하면 된다.

 

아파치 카프카는 카프카를 압축 해제하고 bin 디렉토리의 쉘 파일을 통해서 실행할 수 있으며 각각의 실행 쉘파일은 다음과 같다.

  • bin/zookeeper-server-start.sh conflg/zookeeper.properties: 주키퍼 서버를 실행하기 위한 쉘 스크립트 파일이다. zookeeper.properties 설정 파일을 사용한다.
  • bin/kafka-server-start.sh conflg/server.properties: 카프카 브로커를 시작하기 위한 스크립트로 server.properties 설정 파일을 사용한다.

쉘을 실행할 때 주키퍼를 먼저 실행한 뒤 카프카를 실행해야 한다. 백그라운드로 실행하기 위해서는 쉘 스크립트 파일을 실행할 때 -daemon 옵션을 사용한다. vim으로 쉘 파일을 확인하면 스크립트가 어떻게 동작하는지 간단하게 확인해볼 수 있다.

 

포스팅에서 사용한 버전은 아파치 카프카 3.1.2 버전을 사용했다. 카프카 공식문서에서는 3.5 버전 이상에서는 주키퍼를 Deprecated 표시한다. 아직 카프카 3.5 이하의 버전이 많이 사용되며 여기서는 KRaft가 아닌 주키퍼를 사용하기 위해서는 3.5 이하의 버전을 사용하는 것이 좋다.

 

 

Zookeeper

주키퍼는 분산 시스템에서 여러 서버 간의 코디네이션(coordination)을 위한 중앙 집중형 서비스이다. 주키퍼는 카프카 클러스터의 메타데이터를 관리하고, 클러스터의 상태를 모니터링하며, 분산 환경에서 여러 서버 간의 협업을 조정, 클러스터의 리더 선출, 파티션의 할당과 재조정, 메시지의 오프셋 등을 관리하는 역할을 한다.

 

주키퍼는 일반적으로 최소 3개의 인스턴스를 사용한다. 주키퍼가 과반수 투표를 통해 리더를 선출하고, 과반수 이상의 인스턴스가 살아있어야 정상 동작하기 때문이다. 주키퍼는 보통 홀수 개의 인스턴스를 권장한다.

 

주키퍼를 한 대만 실행하는 주키퍼를 ‘Quick-and-dirty-single-node’라고 부르며, 주키커를 한 대만 실행하여 사용하는 것은 비정상적인 운영임을 뜻하므로 실제 서비스 운영환경에서는 1대만 실행하여 사용하면 안되고 테스트 목적으로만 사용해야 한다.

 

Zookeeper 설정 파일

zookeeper.properties 파일은 주키퍼 서버의 설정 파일이다. 커프카는 클러스터 간의 메타데이터와 상태를 관리하기 위해 주키퍼를 사용한다. 이 파일은 주키퍼 인스턴스를 설정하는 데 필요한 다양한 속성을 포함한다.

 

다음은 zookeeper.properties 파일의 내용을 부분 생략하고 출력한 것이다.

# ... 생략

dataDir=/home/ubuntu/log/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
  • dataDir: 주키퍼가 데이터를 저장하는 디렉토리를 지정한다. 이 경로에 주키퍼의 스냅샷 파일과 로그 파일이 저장된다. 기본은 /tmp/zookeeper로 설정되어 있으며, 이 디렉토리는 주키퍼 서버가 작동하는 동안 생성되는 데이터 파일을 포함한다. /tmp 디렉토리는 서버 재시작 시 초기화 되므로 디렉토리 경로를 커스터마이징 하는 것이 좋다.
  • clientPort: 클라이언트가 주키퍼 서버에 연결하는 데 사용하는 포트를 설정한다. 기본값은 2181이며, 클라이언트는 이 포트를 통해 주키퍼 서버와 통신한다.
  • maxClientCnxns: 특정 IP 주소에서 허용되는 최대 동시 클라이언트 연결 수를 정의한다. 값이 0으로 설정되면 IP 기반 연결 제한이 비활성화되어 모든 클라이언트가 무제한으로 연결할 수 있다. 이는 비생산 환경 구성에서 유용하다.
  • admin.enableServer: 주키퍼의 관리 서버를 활성화할지 여부를 설정한다. 기본값은 false로 설정되어 있으며, 이는 관리 서버가 비활성화되어 포트 충돌을 피하게 한다. 만약 관리 서버를 활성화하고자 한다면, admin.serverPort 프로퍼티를 사용하여 관리 서버의 포트를 설정해야 한다.
  • admin.serverPort: 관리 서버가 사용할 포트를 정의하는 프로퍼티이다. 기본적으로 주석 처리되어 있어 사용되지 않지만, 필요에 따라 설정할 수 있다.

 

 

Kafka 브로커

카프카 브로커는 카프카 클러스터 내에서 메시지를 저장하고 관리하는 서버다. 브로커는 클러스터 내에서 데이터를 분산 저장하고, 프로듀서로부터 데이터를 받아서 토픽에 저장하며, 컨슈머에게 데이터를 제공한다.

 

카프카 클러스터는 여러 브로커로 구성되며, 각 브로커는 고유한 ID를 가진다. 브로커는 주키퍼를 통해 상호 통신하고 메타데이터 정보를 공유한다. 각 브로커는 특정 파티션의 리더 역할을 하며, 다른 브로커들은 해당 파티션의 팔로워 역할을 한다.

 

카프카의 브로커는 복제(Replication)을 통해 데이터 손실을 방지하기 위해서 최소 3개 이상을 사용하는 것을 권장한다.

 

Kafka 브로커 설정 파일

server.properties 파일은 Kafka 서버의 주요 설정 파일로, Kafka 브로커의 동작을 정의한다. 카프카 브로커는 카프카 클러스터 내에서 메시지를 주고받고 저장하는 서버 역할을 하며, 이 파일을 통해 브로커의 동작을 세밀하게 조정할 수 있다.

 

다음은 server.properties 파일의 내용을 부분 생략하고 출력한 것이다. 브로커 설정 파일에 있는 프로퍼티 외에도 많은 프로퍼티들이 있는데, 여기서는 server.properties에서 기본적으로 작성된 부분들에 대해서만 알아볼 것이다.

# ... 생략

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # (4)

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/home/ubuntu/log/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
# ... 생략

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
# ... 생략

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000 

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
  • broker.id: 카프카 브로커의 고유 ID를 설정한다. 카프카 클러스터 내에서 각 브로커는 고유한 ID를 가져야 한다.
  • listeners: 카프카 브로커가 수신할 네트워크 포트를 설정한다. 기본값은 모든 네트워크 인터페이스에서 9092 포트로 들어오는 비암호화된 연결을 수신한다. PLAINTEXT://hostname:9092, SSL://hostname:9092 등, PLAINTEXT는 암호화되지 않은 통신을, SSL은 보안 통신을 설정한다.
  • advertised.listeners: 클라이언트에게 카프카 브로커에 접근 가능한 주소를 지정한다. 클라이언트는 이 주소로 브로커에 연겲한다. 예를 들어, EC2 서버를 사용중이라면 EC2 서버의 advertised.listeners=PLAINTEXT://EC서버IP:포트로 작성하면 된다.
  • listener.security.protocol.map: 각 리스너에 대한 보안 프로토콜을 지정하는데 사용한다. 콤마로 구분해서 [listener_name]:[security_protocol] 형식으로 설정할 수 있다.
    • PLAINTEXT: 암호화나 인증 없이 평문으로 데이터를 전송
    • SSL: SSL/TLS를 사용하여 암호화된 통신을 지원
    • SASL_PLAINTEXT: SASL 인증을 사용하지만 데이터는 암호화되지 않음
    • SASL_SSL: SASL 인증과 SSL/TLS 암호화를 모두 사용
  • num.network.threads: 클라이언트와의 네트워크 I/O 작업을 처리할 스레드 수를 설정한다.
  • num.io.threads: 카프카 브로커 내부에서 디스크 I/O 요청을 처리하기 위한 스레드 수를 설정한다.
  • socket.send.buffer.bytes: 클라이언트와 브로커 간의 소켓에서 송신하는 버퍼 크기를 설정한다.
  • socket.receive.buffer.bytes: 클라이언트와 브로커 간의 소켓에서 수신하는 버퍼 크기를 설정한다.
  • socket.request.max.bytes: 카프카 서버가 한 번에 수락할 수 있는 요청의 최대 크기를 설정한다.
  • log.dirs: 카프카의 로그 파일이 저장될 디렉토리를 설정한다. 기본은 /tmp/kafka-logs 로 되어있으며 이는 서버 재시작 시 초기화 되므로 커스터마이징하는 것이 좋다.
  • num.partitions: 토픽을 생성할 때 기본적으로 생성되는 파티션 수를 설정한다. 파티션 개수가 많아지면 병렬처리 데이터양이 늘어난다.
  • num.recovery.threads.per.data.dir: 각 로그 디렉토리 당 복구 작업에 사용할 스레드 수를 설정한다.
  • offsets.topic.replication.factor: 카프카의 오프셋 정보를 저장하는 __consumer_offsets 토픽의 복제 개수를 설정한다.
  • transaction.state.log.replication.factor: 카프카의 트랜잭션 상태를 기록하기 위한 로그의 복제 개수를 설정한다.
  • transaction.state.log.min.isr: 트랜잭션 커밋을 위해 필요한 최소 복제본 수를 설정한다. ISR은 "In-Sync Replica"를 말한다.
  • log.retention.hours: 카프카 브로커가 저장한 로그가 유지되는 시간을 설정한다. 시간(h) 단위로 기본값은 일주일 간 로그를 유지하고, 지나면 삭제한다.
  • log.retention.bytes: 세그먼트를 삭제하기 위한 로그 파일의 최대 크기를 설정한다. 이 크기를 초과하면 오래된 로그부터 삭제된다. 기본값은 -1로 무한대다.
  • log.segment.bytes: 세그먼트(로그) 파일의 최대 크기를 설정한다. 이 크기를 초과하면 새로운 세그먼트가 생성된다.
  • log.retention.check.interval.ms: 카프카 브로커가 세그먼트(로그)를 삭제하기 위해서 체크하는 주기를 설정한다. ms 단위로 기본값은 5분이다.
  • zookeeper.connect: 카프카 브로커가 사용할 주키퍼 서버의 주소를 설정한다. 여러 주키퍼 서버는 콤마로 구분해서 지정할 수 있다.
  • zookeeper.connection.timeout.ms: 주키퍼를 연결할 때 세션의 타임아웃 시간을 설정한다.
  • group.initial.rebalance.delay.ms: 컨슈머 그룹을 리밸런스하기 전 초기 지연 시간을 설정한다.

 

여기서는 카프카를 설치했을 때 기본적으로 server.properties 파일에서 제공되는 프로퍼티들만 설명했지만 많은 설정이 존재한다.

 

properties 파일에서 설정한 것은 브로커에서 생성하는 모든 토픽에 적용되므로 전역적으로 적용할 내용만 프로퍼티에서 수정하고, 각 토픽마다 적용해야할 옵션들은 kafka-configs.sh 스크립트 파일을 사용해서 동적으로 설정을 변경하는 것이 좋다.

 

카프카 브로커의 모든 설정은 공식문서의 브로커 설정 부분을 참조한다.