V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
xiangdong1987
V2EX  ›  Go 编程语言

折腾 kafka

  •  
  •   xiangdong1987 · 2019-08-01 18:56:07 +08:00 · 2916 次点击
    这是一个创建于 1966 天前的主题,其中的信息可能已经有所发展或是发生改变。

    Apache Kafka

    Apache Kafka 是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。

    • 特点
      • 分布系统
      • 订阅消息
      • 强大队列
      • 数据落地到硬盘

    安装

    • 前置条件:需要 zookeeper。
    • 安装 zookeeper
      • 官方下载
      • 接下即安装( java 生态就是好)
      • conf 下 sample 文件改成 zoo.cfg (默认单机模式)
      • zookeeper 常用命令
          //开启 zookeeper
          bin/zkServer.sh start
          //使用客户端连接
          bin/zkCli.sh
          //远程连接
          bin/zkCli.sh -server ip:port
          //停止 zookeeper
          bin/zkServer.sh stop
      
      • 其他相关 zookeeper 不细讲用到在学习
    • 安装 kafaka
      • 官方下载
      • 接下即安装( java 生态就是好)
      • kafaka 常用命令
          //开启 kafaka
          bin/kafka-server-start.sh config/server.properties
          //创建 Topic  
          bin/kafka-topics.sh --create --zookeeper 192.168.112.20:2181 --replication-factor 1 --partitions 1 --topic test
          //Topic 展示
          bin/kafka-topics.sh --list --zookeeper 192.168.112.20:2181
      

    遇到的问题:下载的是 kafka 的源码包,执行报错 classpath is empty. please build the project first e.g. by running 'gradlew jarall' 应该下载二进制包 下载地址:https://kafka.apache.org/downloads

    使用 Go 实现生产消费

    • 实现生产者
    package main
    
    import (
    	"bufio"
    	"fmt"
    	"github.com/Shopify/sarama"
    	"os"
    	"strings"
    )
    
    func main() {
        config := sarama.NewConfig()
        config.Producer.Return.Successes = true
        config.Producer.RequiredAcks = sarama.WaitForAll
        config.Producer.Partitioner = sarama.NewRandomPartitioner
        //初始化生产者
        producer, err := sarama.NewSyncProducer([]string{"192.168.112.20:9092"}, config)
    
        if err != nil {
            panic(err)
        }
        defer producer.Close()
        
        msg := &sarama.ProducerMessage{
            Topic:     "testGo",
            Partition: int32(-1),
            Key:       sarama.StringEncoder("key"),
        }
    
        var value string
        for {
            // 生产消息
            inputReader := bufio.NewReader(os.Stdin)
            value, err = inputReader.ReadString('\n')
            if err != nil {
                panic(err)
            }
            value = strings.Replace(value, "\n", "", -1)
            msg.Value = sarama.ByteEncoder(value)
            //发送消息
            paritition, offset, err := producer.SendMessage(msg)
            if err != nil {
                fmt.Println("Send Message Fail")
            }
            //输出结果
            fmt.Printf("Partion = %d, offset = %d\n", paritition, offset)
        }
    }
    
    
    • 实现消费者
    package main
    
    import (
    	"fmt"
    	"github.com/Shopify/sarama"
    	"sync"
    )
    //协成锁 等待所有协成执行完毕退出
    var wg sync.WaitGroup
    func main() {
        //消费者初始化
        consumer, err := sarama.NewConsumer([]string{"192.168.112.20:9092"}, nil)
        if err != nil {
            panic(err)
        }
        partitionList, err := consumer.Partitions("testGo")
        if err != nil {
            panic(err)
        }
        for partition := range partitionList {
            pc, err := consumer.ConsumePartition("testGo", int32(partition), sarama.OffsetNewest)
            if err != nil {
                panic(err)
            }
            defer pc.AsyncClose()
            //上锁
            wg.Add(1)
            go func(sarama.PartitionConsumer) {
                //解锁
                defer wg.Done()
                for msg := range pc.Messages() {
                    //接受消息
                    fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
                }
            }(pc)
            //等待全部解锁
            wg.Wait()
            consumer.Close()
        }
    }
    

    遇到问题 github.com/Shopify/sarama 包在 windows 环境下需要 gcc,懒得搞直接虚拟机。

    总结

    利用 go + kafka 实现生产者消费模型非常简单和快速,在日常业务上使用消息队列可以考虑 kafka+go 快速迭代,后续应用上采坑在来分享。

    2 条回复    2019-08-02 15:17:46 +08:00
    julyclyde
        1
    julyclyde  
       2019-08-02 12:15:55 +08:00
    嗯,所以你写了这么长,到底表达了什么呢?
    lixia625
        2
    lixia625  
       2019-08-02 15:17:46 +08:00
    @julyclyde 这是把 V2 当作自己的笔记本的意思
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3297 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 11:39 · PVG 19:39 · LAX 03:39 · JFK 06:39
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.