07.Kafka环境搭建

3y 2021-11-24 08:37:58
Categories: Tags:

大家好,我是3y

今天更新下austin的番外篇#02

前几天在朋友圈和群吐槽了下:“我搭Kafka一个晚上都还没成功”。有好多同学都发出了自己亲生经历的教程给我(甚至在发送之前还主动验证了下该内容有没有问题),我依靠着这么多教程也顺利搭出来了。(十分感谢这些大佬

之前刚建项目的时候有很多同学都说要跟着我一起做austin,我嘴上没答应。但是真正直接过来提pull request的,我是没办法拒绝的(只要你的代码是合理的,对项目整个结构或流程是有帮助,我是非常欢迎的)

这个项目的初衷一方面是能够成为Java小白的项目,另一方面是我也能从中学到东西。每个人都有自己熟悉的领域,我对很多东西可能都是不了解或者说是片面的(不要以为我写文章多了,我就啥都懂,这是不可能的

上周有个在评论区讨论得挺多的问题:MySQL数据库中创建时间/更新时间字段为什么用int类型?

当我被问到,我在建表的时候为什么用int,而不是datetime或者timestamp这种MySQL提供的日期字段类型呢?我当时的第一反应是:“存储时间戳进去不是很正常吗?我一直都这样干的”

于是我抱着好奇的心态问了一遍相关留言的大佬,用datetime有什么好处,总结大概两点吧:

  1. 在数据库上看时间直观(不需要转义)
  2. 可以自动更新(updated字段可以不由程序员控制,当更新记录时,框架自动更新该自动)

至于int和datetime的存储大小和处理速度其实并不在我的考虑范围内,但最终还是没能让我将int改成datetime。

一方面,我在以前的公司几乎所有的表都是int类型(我用习惯了,我承认了,我是懒惰的),另一方面我认为int是最通用的(跨平台,跨数据库,跨语言,也不需要管时区什么的)。

每个应用程序下都会有对应的TimeUtils工具类,获取一个时间戳和格式化一个时间戳对于程序而言就只是调用一个方法,用得也不糟心。而提出最主要的直观,这个确实会带来好处,但按照我以往的工作经历上貌似也好像不会提高很多效率。

如果有更好的观点,欢迎在评论区继续留言探讨,并且我希望:提出疑问或者反对的同时,最好能说明理由(这样会提高我们交流的效率)。

00、搭建Kafka环境

下周要在austin项目下引入kafka组件了,今天也来记录下搭建kafka的过程,好让跟着做项目的同学可以快速搭建起来。了解过Kafka的同学可能就知道,Kafka还得依赖Zookeeper组件,这要是在原生的环境下搭建Kafka和Zookeeper还是比较麻烦的。

我个人是不喜欢把时间耗费在搭建环境上的,所以就直接上Docker/Docker Compose吧。

环境:CentOS 7.6 64bit

01、Docker环境

首先我们需要安装GCC相关的环境:

yum -y install gcc

yum -y install gcc-c++

安装Docker需要的依赖软件包:

yum install -y yum-utils device-mapper-persistent-data lvm2

设置国内的镜像(提高速度)

yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

更新yum软件包索引:

yum makecache fast

安装DOCKER CE(注意:Docker分为CE版和EE版,一般我们用CE版就够用了)

yum -y install docker-ce

启动Docker:

systemctl start docker

下载回来的Docker版本::

docker version

来一发HelloWorld:

docker run hello-world

02、Docker compose环境

Compose 是用于定义和运行多容器 Docker 应用程序的工具。通过 Compose,您可以使用 YML 文件来配置应用程序需要的所有服务。然后,使用一个命令,就可以从 YML 文件配置中创建并启动所有服务

运行以下命令以下载 Docker Compose 的当前稳定版本:

sudo curl -L "https://github.com/docker/compose/releases/download/1.24.1/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose

将可执行权限应用于二进制文件:

sudo chmod +x /usr/local/bin/docker-compose

创建软链:

sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose

测试是否安装成功:

docker-compose --version

03、compose 文件

新建搭建kafka环境的docker-compose.yml文件,内容如下:

version: '3'services:  zookepper:    image: wurstmeister/zookeeper                    # 原镜像`wurstmeister/zookeeper`    container_name: zookeeper                        # 容器名为'zookeeper'    volumes:                                         # 数据卷挂载路径设置,将本机目录映射到容器目录      - "/etc/localtime:/etc/localtime"    ports:                                           # 映射端口      - "2181:2181"  kafka:    image: wurstmeister/kafka                                # 原镜像`wurstmeister/kafka`    container_name: kafka                                    # 容器名为'kafka'    volumes:                                                 # 数据卷挂载路径设置,将本机目录映射到容器目录      - "/etc/localtime:/etc/localtime"    environment:                                                       # 设置环境变量,相当于docker run命令中的-e      KAFKA_BROKER_ID: 0                                               # 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://ip:9092 # TODO 将kafka的地址端口注册给zookeeper      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092                        # 配置kafka的监听端口      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181                # zookeeper地址      KAFKA_CREATE_TOPICS: "hello_world"    ports:                              # 映射端口      - "9092:9092"    depends_on:                         # 解决容器依赖启动先后问题      - zookepper  kafka-manager:    image: sheepkiller/kafka-manager                         # 原镜像`sheepkiller/kafka-manager`    container_name: kafka-manager                            # 容器名为'kafka-manager'    environment:                        # 设置环境变量,相当于docker run命令中的-e      ZK_HOSTS: zookeeper:2181  #  zookeeper地址      APPLICATION_SECRET: xxxxx      KAFKA_MANAGER_AUTH_ENABLED: "true"  # 开启kafka-manager权限校验      KAFKA_MANAGER_USERNAME: admin       # 登陆账户      KAFKA_MANAGER_PASSWORD: 123456      # 登陆密码    ports:                              # 映射端口      - "9000:9000"    depends_on:                         # 解决容器依赖启动先后问题      - kafka

文件内**// TODO 中的ip**需要改成自己的,并且如果你用的是云服务器,那需要把端口给打开。

04、启动kafka

在存放docker-compose.yml的目录下执行启动命令:

docker-compose up -d

可以查看下docker镜像运行的情况:

docker ps 

进入kafka 的容器:

docker exec -it kafka sh

创建一个topic(这里我的topicName就叫austin,你们可以改成自己的)

$KAFKA_HOME/bin/kafka-topics.sh --create --topic austin --partitions 4 --zookeeper zookeeper:2181 --replication-factor 1 

查看刚创建的topic信息:

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic austin

启动一个消费者:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic austin

新增一个窗口,启动一个生产者:

docker exec -it kafka sh$KAFKA_HOME/bin/kafka-console-producer.sh --topic=austin --broker-list kafka:9092

05、Java程序验证Kafka

引入Kafka依赖(SpringBoot有默认的版本,不需要写version)

<dependency>  <groupId>org.springframework.kafka</groupId>  <artifactId>spring-kafka</artifactId></dependency>

定义一个实体类:

@Data@Accessors(chain = true)public class UserLog {    private String username;    private String userid;    private String state;}

定义生产者(austin是topicName):

@Componentpublic class UserLogProducer {    @Autowired    private KafkaTemplate kafkaTemplate;    /**     * 发送数据        * @param userid     */    public void sendLog(String userid){        UserLog userLog = new UserLog();        userLog.setUsername("jhp").setUserid(userid).setState("0");        System.err.println("发送用户日志数据:"+userLog);        kafkaTemplate.send("austin", JSON.toJSONString(userLog));    }}

定义消费者:

@Component@Slf4jpublic class UserLogConsumer {    @KafkaListener(topics = {"austin"},groupId = "austinGroup1")    public void consumer(ConsumerRecord<?,?> consumerRecord){        //判断是否为null        Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());        log.info(">>>>>>>>>> record =" + kafkaMessage);        if(kafkaMessage.isPresent()){            //得到Optional实例中的值            Object message = kafkaMessage.get();            System.err.println("消费消息:"+message);        }    }}

定义接口:

@RestControllerpublic class KafkaTestController {    @Autowired    private UserLogProducer userLogProducer;    /**     * test insert     */    @GetMapping("/kafka/insert")    public String insert(String userId) {        userLogProducer.sendLog(userId);        return null;    }}

测试:http://localhost:8080/kafka/insert?userId=3y

06、总结

搭建环境什么的没什么需要总结的,按着教程一步一步执行应该就完事了?下周会具体实现austin-apiaustin-api-impl模块的内容,可以继续关注下。

今天又是被白嫖的一天,哎😌。

Gitee链接:https://gitee.com/zhongfucheng/austin

GitHub链接:https://github.com/ZhongFuCheng3y/austin