Golang Series (3): Using Kafka in Golang

Published 2019-03-11 | Category: golang
  1. Golang Series (1): Reading and Writing Redis in Beego
  2. Golang Series (2): Distributed Locks Based on Redis
  3. Golang Series (3): Using Kafka in Golang
  4. Golang Series (4): Using gorm in Golang
  5. Golang Series (5): Common Pitfalls and Mistakes

Producer

The producer package below wraps sarama: init() builds one async producer and one sync producer from the brokers in the config, and AsyncProduce/SyncProduce serialize a payload to JSON and publish it. A usage sketch follows the listing.

package producer

import (
	"encoding/json"
	"github.com/Shopify/sarama"
	"github.com/satori/go.uuid"
	"sync"
	"wiresdata-consumer/configer"
)

var asyncProducerOnce sync.Once
var asyncProducer sarama.AsyncProducer

var syncProducerOnce sync.Once
var syncProducer sarama.SyncProducer

var (
	brokers = ""
)


func init() {
	initKafkaProducerPara()
	initAsyncProducer()
}

func initKafkaProducerPara() {
	brokers = configer.Configer.Kafka.Brokers
}

func initAsyncProducer() {
	var err error
	config := sarama.NewConfig()
	// WaitForLocal: respond once the leader has committed the message locally
	// (use sarama.WaitForAll to wait for all in-sync replicas)
	config.Producer.RequiredAcks = sarama.WaitForLocal
	// route messages to partitions by hashing the message key
	config.Producer.Partitioner = sarama.NewHashPartitioner
	// report successes and failures on the producer channels; this only applies
	// when RequiredAcks is not NoResponse, and the sync producer requires
	// Return.Successes = true
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true

	asyncProducerOnce.Do(func() {
		// create an async producer from the config
		asyncProducer, err = sarama.NewAsyncProducer([]string{brokers}, config)
		if err != nil {
			panic("NewAsyncProducer init error")
		}
		// with Return.Successes/Errors enabled, both channels must be drained
		// or Input() will eventually block
		go func() {
			for {
				select {
				case <-asyncProducer.Successes():
				case <-asyncProducer.Errors():
					// log the delivery failure here if needed
				}
			}
		}()
	})

	syncProducerOnce.Do(func() {
		// create a sync producer from the same config
		syncProducer, err = sarama.NewSyncProducer([]string{brokers}, config)
		if err != nil {
			panic("NewSyncProducer init error")
		}
	})
}

func AsyncProduce(topic string, payload interface{}) {
	payloadJson, err := json.Marshal(payload)
	if err != nil {
		// skip payloads that cannot be serialized
		return
	}
	asyncProducer.Input() <- &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.StringEncoder(payloadJson),
	}
}

func SyncProduce(topic string, payload interface{}) (result bool) {
	id, _ := uuid.NewV4()
	key := id.String()
	payloadJson, err := json.Marshal(payload)
	if err != nil {
		// skip payloads that cannot be serialized
		return false
	}
	msg := &sarama.ProducerMessage{
		Topic: topic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.StringEncoder(payloadJson),
	}
	// retry up to two times
	tryTimes := 2
	for i := 0; i < tryTimes; i++ {
		_, _, err := syncProducer.SendMessage(msg)
		if err != nil {
			result = false
		} else {
			result = true
			break
		}
	}
	return
}
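
A minimal usage sketch of the two entry points. The clickEvent type, the data_track topic, and the import path are assumptions for illustration, not part of the original project:

package main

import "wiresdata-consumer/kafka/producer"

// clickEvent is a hypothetical payload; any JSON-serializable value works
type clickEvent struct {
	AppName string `json:"app_name"`
	IP      string `json:"ip"`
}

func main() {
	event := clickEvent{AppName: "demo", IP: "10.0.0.1"}
	// fire-and-forget: delivery results surface on the producer's internal channels
	producer.AsyncProduce("data_track", event)
	// blocks until the broker acknowledges, or returns false after both attempts fail
	if ok := producer.SyncProduce("data_track", event); !ok {
		// e.g. write the payload to a local fallback log
	}
}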

Consumer

The consumer package wraps bsm/sarama-cluster: each Consumer joins a consumer group, reads messages in a background goroutine, and hands them to user-supplied callbacks, marking offsets as it goes.

package consumer

import (
	"github.com/Shopify/sarama"
	"github.com/bsm/sarama-cluster"
	"github.com/satori/go.uuid"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"wiresdata-consumer/configer"
	. "wiresdata-consumer/logger"
)

var (
	brokerList = strings.Split(configer.Configer.Kafka.Brokers,",")
)

type Consumer struct {
	consumer *cluster.Consumer
	callbacks ConsumerCallbacks
}

func NewConsumer(callbacks ConsumerCallbacks, groupId string, topics []string) *Consumer {
	clientId, _ := uuid.NewV4()
	consumer := Consumer{callbacks: callbacks}
	config := cluster.NewConfig()
	config.ClientID = clientId.String()
	// start from the oldest unconsumed offset
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	// enable the error and rebalance-notification channels read in Consume()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true
	saramaConsumer, err := cluster.NewConsumer(brokerList, groupId, topics, config)
	if err != nil {
		panic(err)
	}
	consumer.consumer = saramaConsumer
	return &consumer
}

func (c *Consumer) Consume() {
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

	go func(){
		for {
			select {
			case msg, more := <-c.consumer.Messages():
				if more {
					if c.callbacks.OnDataReceived != nil {
						c.callbacks.OnDataReceived(msg.Value)
					}
					//Logger.Info(msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
					c.consumer.MarkOffset(msg, "")
				}
			case ntf, more := <-c.consumer.Notifications():
				if more {
					Logger.Info("Rebalanced: %+v\n", ntf)
				}
			case err, more := <-c.consumer.Errors():
				if more {
					if c.callbacks.OnError != nil {
						c.callbacks.OnError(err)
					}
					Logger.Info("Error: %s\n", err.Error())
				}
			case <-sigchan:
				return
			}
		}
	}()
}


func (c *Consumer) Close(){
	c.consumer.Close()
}

type ConsumerCallbacks struct {
	OnDataReceived func(msg []byte)
	OnError func(err error)
}

func CheckAppName(appName string) (checkResult bool) {
	appNames := configer.GetAppNameList()
	for _, v := range appNames {
		if v == appName {
			return true
		}
	}
	return false
}
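
A minimal wiring sketch for the package above. The group id, topic, and import path are hypothetical; the real demo below registers itself from an init function instead:

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"wiresdata-consumer/kafka/consumer"
)

func main() {
	callbacks := consumer.ConsumerCallbacks{
		OnDataReceived: func(msg []byte) { fmt.Println(string(msg)) },
		OnError:        func(err error) { fmt.Println("consume error:", err) },
	}
	c := consumer.NewConsumer(callbacks, "demo-group", []string{"data_track"})
	defer c.Close() // flush marked offsets and leave the group on shutdown
	c.Consume()     // non-blocking: the read loop runs in its own goroutine

	// block until interrupted, mirroring the signal handling inside Consume()
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
	<-sigchan
}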

Demo: consuming data into MySQL

The demo below registers a consumer at package load and inserts each received message into a data_track table:

package consumer

import (
	"encoding/json"
	"fmt"
	"wiresdata-consumer/db/mysql"
	"wiresdata-consumer/kafka/consumer_group"
	"wiresdata-consumer/kafka/topic"
	. "wiresdata-consumer/logger"
	"wiresdata-consumer/models"
)

// consume Kafka messages and insert them into MySQL
const (
	data_track_mysql_insert_sql = `insert into data_track(partition_key,id,app_name,ip,create_time) values (?,?,?,?,?)`
)

// init registers the MySQL consumer when the package is loaded
func init() {
	fmt.Println("begin init DataTrackMysqlConsumer")
	callBack := ConsumerCallbacks{
		OnDataReceived: onMysqlDataReceived,
		OnError:        onMysqlError,
	}
	topics := []string{topic.KAFKA_TOPIC_DATA_TRACK}
	consumerGroup := consumer_group.CONSUMER_DATA_MYSQL
	consumer := NewConsumer(callBack, consumerGroup, topics)
	consumer.Consume()
}

func onMysqlDataReceived(msg []byte) {
	var model models.DataTrackModel
	if err := json.Unmarshal(msg, &model); err != nil {
		Logger.Error("json.Unmarshal error", err)
		return
	}
	if !CheckAppName(model.AppName) {
		Logger.Error("CheckAppName:", model.AppName, " is not configured")
		return
	}

	// Custom may be a decoded JSON object or a pre-encoded JSON string
	customJson := ""
	if valMap, ok := model.EventProperty.Custom.(map[string]interface{}); ok {
		if customJsonBytes, err := json.Marshal(valMap); err == nil {
			customJson = string(customJsonBytes)
		}
	}
	if valStr, ok := model.EventProperty.Custom.(string); ok {
		customJson = valStr
	}
	if customJson == "" {
		customJson = `{}`
	}

	// MySQL's from_unixtime expects seconds, e.g. from_unixtime(1566821249.123)
	_, err := mysql.ExecMySql(data_track_mysql_insert_sql,
		model.PartitionKey,
		model.Id,
		model.AppName,
		model.IP,
		model.CreateTime,
	)
	if err != nil {
		Logger.Error("onMysqlDataReceived error", err)
	}
}

func onMysqlError(err error) {
	Logger.Error("onMysqlError", err)
}
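
The demo unmarshals into models.DataTrackModel, which is not shown in the post. A hypothetical reconstruction from the fields the demo reads might look like this (the JSON tags and the millisecond CreateTime are assumptions):

package models

// EventProperty carries the event payload; Custom may arrive as a decoded
// JSON object or as a pre-encoded JSON string, hence the type assertions
// in onMysqlDataReceived
type EventProperty struct {
	Custom interface{} `json:"custom"`
}

// DataTrackModel is a sketch of the message schema consumed from Kafka
type DataTrackModel struct {
	PartitionKey  string        `json:"partition_key"`
	Id            string        `json:"id"`
	AppName       string        `json:"app_name"`
	IP            string        `json:"ip"`
	CreateTime    int64         `json:"create_time"` // epoch milliseconds
	EventProperty EventProperty `json:"event_property"`
}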

  • Author: Randy
  • Link: http://www.itrensheng.com/archives/golanggolangkafka
  • Copyright: Unless otherwise noted, all posts on this blog are licensed under CC BY-NC-SA 3.0. Please credit the source when reposting!