如何从零开始搭建Kafka+SpringBoot分布式消息系统

这期内容当中小编将会给大家带来有关如何从零开始搭建Kafka+SpringBoot分布式消息系统,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

创新互联基于成都重庆香港及美国等地区分布式IDC机房数据中心构建的电信大带宽,联通大带宽,移动大带宽,多线BGP大带宽租用,是为众多客户提供专业服务器托管报价,主机托管价格性价比高,为金融证券行业联通服务器托管,ai人工智能服务器托管提供bgp线路100M独享,G口带宽及机柜租用的专业成都idc公司。

前言

由于kafka强依赖于zookeeper,所以需先搭建好zookeeper集群。由于zookeeper是由java编写的,需运行在jvm上,所以首先应具备java环境。 (ps:默认您的centos系统可联网,本教程就不教配置ip什么的了) (ps2:没有wget的先装一下:yum install wget) (ps3:人啊,就是要条理。东边放一点,西边放一点,过段时间就不知道自己装在哪里了。本教程所有下载均放在/usr/local目录下) (ps4:kafka可能有内置zookeeper,感觉可以越过zookeeper教程,但是这里也配置出来了。我没试过)

一、配置jdk

因为oracle 公司不允许直接通过wget 下载官网上的jdk包。所以你直接wget以下地址下载下来的是一个只有5k的网页文件而已,并不是需要的jdk包。(垄断地位就是任性)。 (请通过java -version判断是否自带jdk,我的没带)

1、官网下载

下面是jdk8的官方下载地址:

https://www.oracle.com/technetwork/java/javase/downloads/java-archive-javase8u211-later-5573849.html

如何从零开始搭建Kafka+SpringBoot分布式消息系统

2、上传解压

这里通过xftp上传到服务器指定位置:/usr/local

如何从零开始搭建Kafka+SpringBoot分布式消息系统

运行命令使环境生效

source /etc/profile

如何从零开始搭建Kafka+SpringBoot分布式消息系统

等待下载完成之后解压:

tar -zxvf zookeeper-3.4.6.tar.gz

从零开始搭建Kafka+SpringBoot分布式消息系统

重命名为zookeeper1

mv zookeeper-3.4.6 zookeeper1
cp -r zookeeper1 zookeeper2
cp -r zookeeper1 zookeeper3

2、创建data、logs文件夹

在zookeeper1目录下创建

如何从零开始搭建Kafka+SpringBoot分布式消息系统

在data目录下新建myid文件。内容为1

如何从零开始搭建Kafka+SpringBoot分布式消息系统

3、修改zoo.cfg文件

cd /usr/local/zookeeper/zookeeper1/conf/
cp zoo_sample.cfg zoo.cfg

进行过上面两步之后,有zoo.cfg文件了,现在修改内容为:

从零开始搭建Kafka+SpringBoot分布式消息系统

dataDir=/usr/local/zookeeper/zookeeper1/data
dataLogDir=/usr/local/zookeeper/zookeeper1/logs
server.1=192.168.233.11:2888:3888
server.2=192.168.233.11:2889:3889
server.3=192.168.233.11:2890:3890

4、搭建zookeeper2

首先,复制改名。

cd /usr/local/zookeeper/
cp -r zookeeper1 zookeeper2

然后修改具体的某些配置:

vim zookeeper2/conf/zoo.cfg

将下图三个地方1改成2

如何从零开始搭建Kafka+SpringBoot分布式消息系统

vim zookeeper2/data/myid

同时将myid中的值改成2

如何从零开始搭建Kafka+SpringBoot分布式消息系统

vim zookeeper3/conf/zoo.cfg

修改为3

如何从零开始搭建Kafka+SpringBoot分布式消息系统

6、测试zookeeper集群

cd /usr/local/zookeeper/zookeeper1/bin/

由于启动所需代码比较多,这里简单写了一个启动脚本:

vim start

start的内容如下

cd /usr/local/zookeeper/zookeeper1/bin/
./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper2/bin/
./zkServer.sh start ../conf/zoo.cfg
cd /usr/local/zookeeper/zookeeper3/bin/
./zkServer.sh start ../conf/zoo.cfg

下面是连接脚本:

vim login

login内容如下:

./zkCli.sh -server 192.168.233.11:2181,192.168.233.11:2182,192.168.233.11:2183

脚本编写完成,接下来启动:

sh start
sh login

启动集群成功,如下图:

如何从零开始搭建Kafka+SpringBoot分布式消息系统

三、搭建kafka集群

1、下载kafka

首先创建kafka目录:

mkdir /usr/local/kafka

然后在该目录下载

cd /usr/local/kafka/
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz

下载成功之后解压:

tar -zxvf kafka_2.11-1.1.0.tgz

2、修改集群配置

首先进入conf目录下:

cd /usr/local/kafka/kafka_2.11-1.1.0/config

修改server.properties 修改内容:

broker.id=0
log.dirs=/tmp/kafka-logs
listeners=PLAINTEXT://192.168.233.11:9092

复制两份server.properties

cp server.properties server2.properties
cp server.properties server3.properties

修改server2.properties

vim server2.properties

修改主要内容为:

broker.id=1
log.dirs=/tmp/kafka-logs1
listeners=PLAINTEXT://192.168.233.11:9093

如上,修改server3.properties 修改内容为:

broker.id=2
log.dirs=/tmp/kafka-logs2
listeners=PLAINTEXT://192.168.233.11:9094

3、启动kafka

这里还是在bin目录编写一个脚本:

cd ../bin/
vim start

脚本内容为:

./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties &

通过jps命令可以查看到,共启动了3个kafka。

从零开始搭建Kafka+SpringBoot分布式消息系统

4、创建Topic

cd /usr/local/kafka/kafka_2.11-1.1.0
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

如何从零开始搭建Kafka+SpringBoot分布式消息系统

kafka打印了几条日志

如何从零开始搭建Kafka+SpringBoot分布式消息系统

查看kafka状态

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

如何从零开始搭建Kafka+SpringBoot分布式消息系统

6、启动消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

可以看出,启动消费者之后就会自动消费。

如何从零开始搭建Kafka+SpringBoot分布式消息系统

消费者自动捕获成功!

如何从零开始搭建Kafka+SpringBoot分布式消息系统

不满足的话启动springboot的时候会抛异常的!!!ps:该走的岔路我都走了o(╥﹏╥)o (我的kafka-clients是1.1.0,spring-kafka是2.2.2,中间那列暂时不用管)

从零开始搭建Kafka+SpringBoot分布式消息系统

回归正题,搞了两个小时,终于搞好了,想哭… 遇到的问题基本就是jar版本不匹配。 上面的步骤我也都会相应的去修改,争取大家按照本教程一遍过!!!

1、pom文件



    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.1.1.RELEASE
         
    
    com.gzky
    study
    0.0.1-SNAPSHOT
    study
    Demo project for Spring Boot

    
        1.8
    

    
        
            org.springframework.boot
            spring-boot-starter-web
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
            
                
                    org.junit.vintage
                    junit-vintage-engine
                
            
        

        
            org.springframework.boot
            spring-boot-starter-redis
            1.3.8.RELEASE
        

        
            redis.clients
            jedis
        

        
        
            org.springframework.kafka
            spring-kafka
            2.2.0.RELEASE
        
        
        
            org.apache.kafka
            kafka-clients
        

    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    

pom文件中,重点是下面这两个版本。


       org.springframework.boot
       spring-boot-starter-parent
       2.1.1.RELEASE
        
   

      org.springframework.kafka
      spring-kafka
      2.2.0.RELEASE

2、application.yml

spring:
  redis:
    cluster:
      #设置key的生存时间,当key过期时,它会被自动删除;
      expire-seconds: 120
      #设置命令的执行时间,如果超过这个时间,则报错;
      command-timeout: 5000
      #设置redis集群的节点信息,其中namenode为域名解析,通过解析域名来获取相应的地址;
      nodes: 192.168.233.11:9001,192.168.233.11:9002,192.168.233.11:9003,192.168.233.11:9004,192.168.233.11:9005,192.168.233.11:9006
  kafka:
    # 指定kafka 代理地址,可以多个
    bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094
    producer:
      retries: 0
      # 每次批量发送消息的数量
      batch-size: 16384
      buffer-memory: 33554432
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 指定默认消费者group id
      group-id: test-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

server:
  port: 8085
  servlet:
    #context-path: /redis
    context-path: /kafka

没有配置Redis的可以把Redis部分删掉,也就是下图: 想学习配置Redis集群的可以参考:《Redis集群redis-cluster的搭建及集成springboot》

如何从零开始搭建Kafka+SpringBoot分布式消息系统

3、生产者

package com.gzky.study.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * kafka生产者工具类
 *
 * @author biws
 * @date 2019/12/17
 **/
@Component
public class KfkaProducer {

    private static Logger logger = LoggerFactory.getLogger(KfkaProducer.class);

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 生产数据
     * @param str 具体数据
     */
    public void send(String str) {
        logger.info("生产数据:"> 
 4、消费者package com.gzky.study.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * kafka消费者监听消息
 *
 * @author biws
 * @date 2019/12/17
 **/
@Component
public class KafkaConsumerListener {

    private static Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);

    @KafkaListener(topics = "testTopic")
    public void onMessage(String str){
        //insert(str);//这里为插入数据库代码
        logger.info("监听到:" + str);
        System.out.println("监听到:" + str);
    }

}  5、对外接口package com.gzky.study.controller;

import com.gzky.study.utils.KfkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

/**
 * kafka对外接口
 *
 * @author biws
 * @date 2019/12/17
 **/
@RestController
public class KafkaController {

    @Autowired
    KfkaProducer kfkaProducer;

    /**
     * 生产消息
     * @param str
     * @return
     */
    @RequestMapping(value = "/sendKafkaWithTestTopic",method = RequestMethod.GET)
    @ResponseBody
    public boolean sendTopic(@RequestParam String str){
        kfkaProducer.send(str);
        return true;
    }
}  6、postman测试这里首先应该在服务器启动监听器(kafka根目录),下面命令必须是具体的服务器ip,不能是localhost,是我踩过的坑:推荐此处重启一下集群 关闭kafka命令:cd /usr/local/kafka/kafka_2.11-1.1.0/bin
./kafka-server-stop.sh ../config/server.properties &
./kafka-server-stop.sh ../config/server2.properties &
./kafka-server-stop.sh ../config/server3.properties & 此处应该jps看一下,等待所有的kafka都关闭(关不掉的kill掉),再重新启动kafka:./kafka-server-start.sh ../config/server.properties &
./kafka-server-start.sh ../config/server2.properties &
./kafka-server-start.sh ../config/server3.properties & 等待kafka启动成功后,启动消费者监听端口:cd /usr/local/kafka/kafka_2.11-1.1.0
bin/kafka-console-consumer.sh --bootstrap-server 192.168.233.11:9092 --from-beginning --topic testTopic  曾经我乱输的测试信息全部被监听过来了!启动springboot服务 然后用postman生产消息: 然后享受成果,服务器端监听成功。 项目中也监听成功!

上述就是小编为大家分享的如何从零开始搭建Kafka+SpringBoot分布式消息系统了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注创新互联行业资讯频道。


分享文章:如何从零开始搭建Kafka+SpringBoot分布式消息系统
分享地址:http://ybzwz.com/article/jiegoh.html