go语言kafka,Go语言之父

golang的回调和接口

最近写了个kafka的接收消息的功能,需要使用回调处理收到的消息。

让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:主机域名雅安服务器托管、营销软件、网站建设、沁县网站维护、网站推广。

一个是基本的回调,一个是使用接口功能实现回调,对接口是个很好的学习。

1.正常回调

kafka的接收消息处。收到消息后,使用传入的Onmessage进行处理。

调用kafka接收消息的单元,并在调用方写好回调

在调用方实现回调需要执行的方法

感觉还是使用基本回调相对简单点,接口就当学习了。

另外跨包的接口的方法要大写!定位了好久发现个入门的问题。

不要再苦没有合适的kafka管理平台,给你分享10款kafka管理工具

这10款工具如下:

AKHQ

Kowl

Kafdrop

UI for Apache Kafka

Lenses

CMAK

Confluent CC

Conduktor

LogiKM

kafka-console-ui

如果上面这个地址可以打开,可以直接去看介绍,下文也不再重复说明。

关于前8款的对比,可以看下面这张图片,图片也是于上面,我直接copy过来了(可能有好多同学打不开上面这个链接,就直接看这张图片了解了下吧)

关于这8款工具的介绍,人家说的很清晰了,这里就不再重复说明了,并且这些工具,大部分我也没用过,也没资格评价太多。

考虑到很多同学可能打开github太慢,我下面会把相关基本信息整理一下,供大家快速了解,方便选型。

概览

AKHQ (previously known as KafkaHQ)

开发语言:后端是java为主

Kowl - A Web UI for Apache Kafka

p.s. github上完整的动图这里上传失败,就只放一个静态的截图了,如果可以打开github,建议打开下面的地址直接看吧。

但是这个并不是所有功能都是免费,有部分功能是商业版才有:

开发语言:后端是go为主

Kafdrop – Kafka Web UI

开发语言:后端以java为主

要求jdk11或更高版本

UI for Apache Kafka – Free Web UI for Apache Kafka

开发语言:后端以java为主

要求jdk13或更高版本

Lenses.io

Apache Kafka 和 Kubernetes 的实时应用程序和数据操作 #DataOps 门户。

CMAK (Cluster Manager for Apache Kafka, previously known as Kafka Manager)

这个想必很多同学都知道,原来的名字就是kafka manager。

开发语言:后端以scala为主

Confluent Inc.

Apache

Conduktor

一个商业版本的桌面客户端

官网找到一个这样的图片,凑合看吧:

LogiKM

滴滴开源的一站式Apache Kafka集群指标监控与运维管控平台。

也是分社区版和商业版的。

这个建议直接看github说明吧,都是中文,内容清晰,相关的资料也都有。

我也简单的了解了下,有个逻辑集群的概念,对于规模比较大的kafka集群管理还是挺好的,不过,这里比较高端的特性都是不开源的,必须商业版才能用。

开发语言:后端以java为主

kafka-console-ui(kafka可视化管理平台)

一款轻量级的kafka可视化管理平台,安装配置快捷、简单易用。界面风格有点类似rocketmq-console。

这款权当是“王婆卖瓜,自卖自夸”吧,一个小工具,如果刚接触kafka的同学或者是中小型集群,想找个简单易用的,可以考虑一下。

开发语言:后端以java和scala为主

参考链接:

Golang kafka简述和操作(sarama同步异步和消费组)

一、Kafka简述

1. 为什么需要用到消息队列

异步:对比以前的串行同步方式来说,可以在同一时间做更多的事情,提高效率;

解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂。

缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体,系统主题始终以一个平稳的速率去消费这些消息。

2.为什么选择kafka呢?

这没有绝对的好坏,看个人需求来选择,我这里就抄了一段他人总结的的优缺点,可见原文

kafka的优点:

1.支持多个生产者和消费者2.支持broker的横向拓展3.副本集机制,实现数据冗余,保证数据不丢失4.通过topic将数据进行分类5.通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量6.支持多种模式的消息7.基于磁盘实现数据的持久化8.高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟9.一个消费者可以支持多种topic的消息10.对CPU和内存的消耗比较小11.对网络开销也比较小12.支持跨数据中心的数据复制13.支持镜像集群

kafka的缺点:

1.由于是批量发送,所以数据达不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据,并且不支持事务8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高

3. Golang 操作kafka

3.1. kafka的环境

网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境,在kubernetes上进行的搭建,有需要的私我,可以发yaml文件

3.2. 第三方库

github.com/Shopify/sarama // kafka主要的库*github.com/bsm/sarama-cluster // kafka消费组

3.3. 消费者

单个消费者

funcconsumer(){varwg sync.WaitGroup  consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{      fmt.Println("Failed to start consumer: %s", err)return}  partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区iferr !=nil{      fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList {      pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{        fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}      wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg :=rangepc.Messages() {//阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))        }deferpc.AsyncClose()        wg.Done()      }(pc)  }  wg.Wait()}funcmain(){  consumer()}

消费组

funcconsumerCluster(){  groupID :="group-1"config := cluster.NewConfig()  config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second  config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{      glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){      errors := c.Errors()      noti := c.Notifications()for{select{caseerr := -errors:            glog.Errorln(err)case-noti:        }      }  }(c)formsg :=rangec.Messages() {      fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))      c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}

3.4. 生产者

同步生产者

packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){  config := sarama.NewConfig()  config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{}  msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!")  client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{      fmt.Println("producer close err, ", err)return}deferclient.Close()  pid, offset, err := client.SendMessage(msg)iferr !=nil{      fmt.Println("send message failed, ", err)return}  fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}

异步生产者

funcasyncProducer(){  config := sarama.NewConfig()  config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Second  p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写,不然通道会被堵塞gofunc(p sarama.AsyncProducer){      errors := p.Errors()      success := p.Successes()for{select{caseerr := -errors:iferr !=nil{              glog.Errorln(err)            }case-success:        }      }  }(p)for{      v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))      fmt.Fprintln(os.Stdout, v)      msg := sarama.ProducerMessage{        Topic: topics,        Value: sarama.ByteEncoder(v),      }      p.Input() - msg      time.Sleep(time.Second *1)  }}funcmain(){goasyncProducer()select{      }}

3.5. 结果展示-

同步生产打印:

分区ID:0,offset:90

消费打印:

Partition:0,Offset:90,key:,value:Hello World!

异步生产打印:

async:7272async:7616async:998

消费打印:

Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998

开源数据统计平台 -- GoAnalytics

本项目用于移动端的数据统计,项目地址: 。开源的数据统计countly做的很好,但是基础免费版的功能实在不够看,因此我就决定用go语言来写了这个项目,一来可以在实践中学习go语言,二来也可以开发功能完整的开源平台。该项目正在开发中,欢迎有兴趣的gopher一起参与。

数据存储方面使用的是mongodb。由于数据统计业务几乎不涉及到事务以及严格的一致性场景,而且mongodb的自动分片功能可以支撑较大的数据量。使用大数据的存储组件的话就太过于重了。因此选用mongodb。

业务逻辑整体基于事件的发布订阅。当收到客户端请求, frontend 会对请求数据进行处理,然后发布响应的事件。 backend 收到事件后进行统计处理。

后台展示基于Vue-Admin-Template开发,本人前端能力基本就是依葫芦画瓢,希望有前端大神来开发后台页面,项目地址:

目前客户端API仅有2个。一个是上报 openApp 打开APP时间,一个是上报 usageTime 一次启动使用时长事件。SDK方面也需要移动端的大神开发,感兴趣的大佬可以一起开发。

下面放一点后台页面的效果图:

GoAnalytics是基于go实现的一个数据统计平台,用于统计移动端的数据指标,比如启动次数、用户增长、活跃用户、留存等指标分析。前端数据展示项目是 goanalytics-web 。目前正在积极开发中,欢迎提交新的需求和pull request。

Go版本需要支持module,本地开发测试

cmd/goanalytics_kafka 和 goanalytics_rmq 是分别基于 kafka 和 rocketmq 的发布订阅功能做的数据发布

和订阅处理,横向扩展能力比 local 高。另外由于 rocketmq 还没有原生基于 go 的客户端(原生客户端正在开发中

2.0.0 road map ),可能会存在问题。

项目结构

├── README.md

├── api

│ ├── authentication 用户认证、管理API

│ ├── middlewares GIN 中间件

│ └── router API route

├── cmd

│ ├── account 生成admin账号命令

│ ├── analytic_local 不依赖消息系统的goanalytics

│ ├── goanalytics_kafka 基于kafak的goanalytics

│ ├── goanalytics_rmq 基于rocketmq的goanalytics

│ └── test_data 生成测试数据命令

├── common

│ └── data.go

├── conf 配置

│ └── conf.go

├── event

│ ├── codec 数据编解码

│ └── pubsub 消息发布订阅

├── go.mod

├── go.sum

├── metric 所有的统计指标在这里实现

│ ├── init.go

│ └── user 用户相关指标的实现

├── schedule

│ └── schedule.go 定时任务调度

├── storage 存储模块

│ ├── counter.go 计数器接口

│ ├── data.go

│ └── mongodb 基于mongodb实现的存储及计数器

└── utils

├── date.go

├── date_test.go

├── errors.go

└── key.go

filebeat采集日志到kafka配置及使用

Filebeat是elastic公司beats系列工具中的一个,主要用于收集本地日志。

在服务器上安装后,filebeat会监控日志目录或者指定的日志文件,追踪读取这些文件(追踪文件的变化,不停的读),并且转发这些信息到配置文件中指定的输出端(例如:elasticsearch,logstarsh或kafka)。

Filebeat使用go语言开发,使用时没有其他依赖,比logstash-forworder轻量,不会占用部署服务器太多的资源。

filebeat的工作流程:当你开启filebeat程序的时候,它会启动一个或多个探测器(prospectors)去检测你指定的日志目录或文件,对于探测器找出的每一个日志文件,filebeat启动收割进程(harvester),每一个收割进程读取一个日志文件的新内容,并发送这些新的日志数据到处理程序(spooler),处理程序会集合这些事件,最后filebeat会发送集合的数据到你指定的地点。

2.配置filebeat

配置filebeat需要编辑filebeat的配置文件,不同安装方式,配置文件的存放路径有一些不同, 对于 rpm 和 deb的方式, 配置文件路径的是 /etc/filebeat/filebeat.yml,对于压缩包的方式,配置文件存在在解压目录下(例如:我是在home目录下进行的解压,那么配置文件的路径就应该是~/filebeat-6.2.4-linux-x86_64/filebeat.yml)。

由于我的预期目标是将filebeat收集的日志发送到kafka,所以配置output就选择了kafka。读者可根据自己的使用场景,配置output。

例子中的配置将对/var/log目录下所有以.log结尾的文件进行采集。

3.启动

本文中只是为满足需求对filebeat进行了最基本的配置。filebeat的很多重要的配置和特性并没有体现(例如:模块,多行消息),读者如果需要更深入的了解请参考: 。

欢迎大家在评论区讨论使用过程的心得和疑惑。

聊聊golang的zap的ZapKafkaWriter

本文主要研究一下golang的zap的ZapKafkaWriter

WriteSyncer内嵌了io.Writer接口,定义了Sync方法;Sink接口内嵌了zapcore.WriteSyncer及io.Closer接口;ZapKafkaWriter实现Sink接口及zapcore.WriteSyncer接口,其Write方法直接将data通过kafka发送出去。


网站栏目:go语言kafka,Go语言之父
URL分享:http://ybzwz.com/article/hescoo.html