What is Kafka?

The official explanation

Kafka is an open-source stream-processing platform developed by the Apache Software Foundation, written in Scala and Java. It is a high-throughput, distributed publish-subscribe messaging system that can handle all of the user activity stream data on a website. Such activity (page views, searches and other user actions) is a key ingredient of many social features on the modern web. Because of the throughput involved, this data has traditionally been handled with log processing and log aggregation. For systems such as Hadoop that deal with log data and offline analytics but also face real-time processing requirements, Kafka is a viable solution: it aims to unify online and offline message processing via Hadoop's parallel loading mechanisms, and to provide real-time messaging through a cluster.
  • That is the official description; in engineering practice and in job postings we simply call it a message queue. Kafka is the mainstream choice today, whereas a few years ago you might have used any of various MQ products. My own understanding is that Kafka is currently one of the most stable and efficient message-processing platforms.
  • When we build a project, Kafka also helps reduce coupling: the core work and the secondary work can be separated, and message handling goes through one extra step, namely Kafka, so the main program is not disturbed.
  • Another advantage of a Kafka cluster is that it can consist of many brokers: if the so-called leader goes down, a follower takes over the leader's duties and keeps things running, and replication is built in. All in all, as a cluster it is a simple yet stable platform today.

How to download and install

Downloading

Official Kafka link: click here to go to the official website
Click Download in the top-left corner to reach the download page: click here to go straight to the official download page
Kafka-download-1.png

First choose the Kafka version you need; the page then offers three links in two groups. The first (circled) group is the source release (src); the second group contains the binary downloads we actually want, and you would normally take the first of those. Click kafka_2.12-3.2.3.tgz to download it.

After downloading, extract the archive. The contents look like this (demonstrated here on macOS):
Kafka-Study-1.png

Note the bin and config folders: bin holds the various .sh startup scripts, config holds the configuration files, and libs holds the dependency jars; as you can see, Kafka ships with quite a few of them.

Configuration file

Go into the config folder, find server.properties and open it with a text editor. Sublime Text or Notepad++ (lightweight) are good choices here, though VS Code, IntelliJ IDEA and the like work just as well.

server.properties (Kafka's default configuration file)

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

That is the entire Kafka configuration file. Items may be added or removed between versions; the one shown here is from Kafka 3.2.3.
The entries we need to change are the following:

broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181

Practically speaking, these three must be changed.

  • broker.id=0 is the unique id of this Kafka broker. Since we are setting up a cluster and running several Kafka instances, it acts as a unique identifier, a bit like a UUID: when you run multiple brokers, every id must be different, and duplicate ids are never allowed.
  • log.dirs is the directory where Kafka stores its data (log) files. By default it points to /tmp, which will not do: /tmp is for temporary files and may be cleaned out on reboot, so set your own directory. Any location works as long as Kafka's data can live there. (If you run multiple Kafka instances, make sure each one has its own log directory.)
  • zookeeper.connect is the address of the ZooKeeper (zk) ensemble to connect to; installing zk is covered in another post, click here for the Zookeeper post. I set zookeeper.connect=localhost:2181,localhost:2182,localhost:2183/kafka. The pattern is zookeeper.connect=<zk IP>:<zk port>,<second zk IP>:<second zk port>,.../kafka, that is, one IP:PORT per zk node separated by commas, with /kafka appended at the end as a chroot so all Kafka znodes live under one root. For testing we usually run a cluster of 3 Kafka brokers plus 3 ZooKeeper nodes, as sketched below.

With those three changes made, the configuration file is ready; just save it.
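As a concrete illustration (my addition, not from the original post), the changed lines for, say, the second broker of a three-broker test cluster on one machine might look like the sketch below. The directory path is made up, and the sketch adds one assumption the text above does not cover: when several brokers share a host, each also needs its own listeners port (9092 is the default).

# sketch of the changed lines for a second broker (path and port are assumptions)
broker.id=1
log.dirs=/your/path/kafkaStudy/kafka_2/data
listeners=PLAINTEXT://:9093
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183/kafka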

Starting Kafka

Before starting Kafka, make sure ZooKeeper is installed and running. If it is not running, start zk first; if it is not installed yet, click here for the Zookeeper post.
On Windows open cmd; on macOS or Linux open a terminal.
First cd into Kafka's root directory, the one containing bin, config and libs, then run bin/kafka-server-start.sh -daemon config/server.properties. Note that Windows and Linux differ here: on Windows use the .bat equivalents under bin\windows, while on Linux/macOS you run the .sh scripts (prefixing them with ./ if you are inside the bin directory). The -daemon flag starts the broker in the background, and the trailing config/server.properties is required; it points to the configuration file to start with.

td@UncleHangTD-deMacBook-Air ~ % cd Desktop/kafkaStudy/kafka_1
td@UncleHangTD-deMacBook-Air kafka_1 % bin/./kafka-server-start.sh -daemon config/server.properties
td@UncleHangTD-deMacBook-Air kafka_1 % jps
1061 QuorumPeerMain
997 QuorumPeerMain
1320 Kafka
1322 Jps
974 QuorumPeerMain
kafka_1 is the root directory of my first Kafka instance. The second command starts Kafka, and the third command, jps, lists the running JVM processes: the three QuorumPeerMain entries are the ZooKeeper nodes, and PID 1320 is the Kafka process we just started.
With these steps, Kafka is up and running.

Common Kafka commands

Creating a topic

bin/kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183/kafka --create --topic first --partitions 1 --replication-factor 1

  • Note that old versions (below 2.2) use --zookeeper here, while newer versions use --bootstrap-server (pointing at the brokers) instead.
  • Because the company I work at runs Kafka 1.1.0, the create command above uses --zookeeper; on Kafka 2.2+ replace it with --bootstrap-server.
  • The address list just has to match the zookeeper.connect value in the configuration file.
  • first is the name the topic will be created with; you can change it to anything else.
  • --partitions 1 and --replication-factor 1 set the partition count and replication factor (older versions require both to be given explicitly).
  • The command is not set in stone: you can run kafka-topics.sh without arguments to see the available options, or look up the details in other tutorials.
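If you would rather manage topics from code than from the shell scripts, here is a minimal sketch (my addition, not from the original post) using the AdminClient from the kafka-clients library. It assumes a broker reachable at localhost:9092 and a client and broker recent enough for the admin API (roughly 0.11+), so it corresponds to the --bootstrap-server style rather than --zookeeper.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address; use your cluster's actual listener addresses.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Topic "first" with 1 partition and replication factor 1, matching the CLI example.
            NewTopic first = new NewTopic("first", 1, (short) 1);
            admin.createTopics(Collections.singletonList(first)).all().get();

            // Equivalent of --list: print all topic names.
            System.out.println(admin.listTopics().names().get());
        }
    }
}

The same AdminClient also covers the --list and --describe commands shown below, via listTopics() and describeTopics().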

Listing the topics that have been created

bin/kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183/kafka --list

  • Much the same as the create command; again mind the version question, --zookeeper versus --bootstrap-server. The remaining commands will not repeat this version note.
  • --list simply lists the topics

Describing a topic

bin/kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183/kafka --topic first --describe

  • Mind the version question
  • first means: describe the topic named first

Creating a consumer for a topic

bin/kafka-console-consumer.sh --zookeeper localhost:2181,localhost:2182,localhost:2183/kafka --topic first

  • Mind the version question
  • first is the topic to consume from
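For comparison, a plain-Java equivalent of the console consumer might look like the following sketch (my addition, not from the original post). It assumes a broker at localhost:9092, an arbitrary group id, and kafka-clients 2.0+ (older clients use poll(long) instead of poll(Duration)).

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsoleStyleConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed broker address and group id for this sketch.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "console-style-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to the "first" topic and print every value, like the console consumer does.
            consumer.subscribe(Collections.singletonList("first"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}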

Using Kafka with Spring Boot

Create a Spring Boot project with Maven and pick the Web, Kafka and Lombok starters; if the dependencies show up red, refresh Maven.
In application.properties, set the following properties:

# kafka connection: bootstrap-servers must point at the Kafka brokers' own
# listener addresses (9092 by default), not at the ZooKeeper ports;
# adjust these to your actual broker listeners
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094

# key-value producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# key-value consumer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# group-id
spring.kafka.consumer.group-id=domcer
Notes:
The connection goes through spring.kafka.bootstrap-servers and must point at the Kafka brokers themselves (their listener addresses, 9092 by default); Spring Kafka has no ZooKeeper-based connection property, regardless of the broker version, since ZooKeeper is only used by the brokers and the old command-line tools.
The producer and consumer key/value (de)serializers are set as shown above.
The group-id value can be anything; it is simply the consumer group name.

Using the producer

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author unclehang
 */
@RestController
public class ProducerController {

    @Autowired
    KafkaTemplate<String,String> kafka;

    @RequestMapping("/msgTest")
    public String data(String msg) {
        // send the message through the KafkaTemplate
        kafka.send("first", msg);
        return "success";
    }

}
Open localhost:8080/msgTest?msg=xxx in a browser; a response of success means the test passed.
In kafka.send("first", msg), first is the topic the message is sent to.

Using the consumer

Before testing, make sure the console consumer has been created: bin/kafka-console-consumer.sh --zookeeper localhost:2181,localhost:2182,localhost:2183/kafka --topic first

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * @author unclehang
 */
@Configuration
public class KafkaConsumer {

    @KafkaListener(topics="first")
    public void consumerTopic(String msg) {
        System.out.println("getMessage " + msg);
    }

}
@KafkaListener enables Kafka's listener mechanism here, listening on the first topic.
Whenever a msg arrives, it is printed with System.out.println.
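As a small variant (my addition, not from the original post), the listener method can also take the whole ConsumerRecord instead of just the String value, which is handy when you also want the key, partition or offset:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

/**
 * Variant of the consumer above: receives the full record so that
 * partition/offset metadata is available, not just the message value.
 */
@Configuration
public class KafkaRecordConsumer {

    @KafkaListener(topics = "first")
    public void consumeRecord(ConsumerRecord<String, String> record) {
        System.out.println("partition=" + record.partition()
                + " offset=" + record.offset()
                + " value=" + record.value());
    }
}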

That covers installing Kafka and its basic usage.


Believe firmly that as long as you work hard and move forward in earnest, good results are sure to come; keep your spirits up, stay full of confidence, and stride boldly ahead. (Kazuo Inamori)
Last modified: January 9, 2023