- golang系列(一)在Beego中存取Redis
- golang系列(二)基于Redis的分布式锁
- golang系列(三)在golang中使用kafka
- golang系列(四)在golang中使用gorm
- golang系列(五)常见的陷阱和错误
生产这Producer
package producer
import (
"encoding/json"
"github.com/Shopify/sarama"
"github.com/satori/go.uuid"
"sync"
"wiresdata-consumer/configer"
)
var asyncProducerOnce sync.Once
var asyncProducer sarama.AsyncProducer
var syncProducerOnce sync.Once
var syncProducer sarama.SyncProducer
var (
brokers = ""
)
func init() {
initKafkaProducerPara()
initAsyncProducer()
}
func initKafkaProducerPara() {
brokers = configer.Configer.Kafka.Brokers
}
func initAsyncProducer() {
var err error
config := sarama.NewConfig()
//等待服务器所有副本都保存成功后的响应
config.Producer.RequiredAcks = sarama.WaitForLocal
//Hash向partition发送消息
config.Producer.Partitioner = sarama.NewHashPartitioner
//是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
asyncProducerOnce.Do(func() {
//使用配置,新建一个异步生产者
asyncProducer, err = sarama.NewAsyncProducer([]string{brokers}, config)
if err != nil {
//beego.Error("NewAsyncProducer error",err)
panic("NewAsyncProducer init error")
}
})
syncProducerOnce.Do(func() {
//使用配置,新建一个异步生产者
syncProducer, err = sarama.NewSyncProducer([]string{brokers}, config)
if err != nil {
//beego.Error("NewSyncProducer error",err)
panic("NewSyncProducer init error")
}
})
}
func AsyncProduce(topic string, payload interface{}) {
payloadJson,error := json.Marshal(payload)
if error != nil{
//beego.Error("json.Marshal error",payload,error)
}
asyncProducer.Input() <- &sarama.ProducerMessage{
Topic:topic,
Value:sarama.StringEncoder(payloadJson),
}
}
func SyncProduce(topic string, payload interface{}) (result bool) {
uuid,_ := uuid.NewV4()
key := uuid.String()
payloadJson,error := json.Marshal(payload)
if error != nil{
//beego.Error("json.Marshal error",payload,error)
}
msg := &sarama.ProducerMessage{
Topic:topic,
Key: sarama.StringEncoder(key),
Value:sarama.StringEncoder(payloadJson),
}
//重试两次
tryTimes := 2
for i := 0;i< tryTimes;i++ {
_, _, err := syncProducer.SendMessage(msg)
if err != nil{
result = false
//beego.Error("SendMessage error",err)
}else {
result = true
break
}
}
return
}
消费者Consumer
package consumer
import (
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
"github.com/satori/go.uuid"
"os"
"os/signal"
"strings"
"syscall"
"wiresdata-consumer/configer"
. "wiresdata-consumer/logger"
)
var (
brokerList = strings.Split(configer.Configer.Kafka.Brokers,",")
)
type Consumer struct {
consumer *cluster.Consumer
callbacks ConsumerCallbacks
}
func NewConsumer(callbacks ConsumerCallbacks, groupId string, topics[] string) *Consumer {
clientId,_ := uuid.NewV4()
consumer := Consumer{callbacks:callbacks}
config := cluster.NewConfig()
config.ClientID = clientId.String()
//从最早未消费开始
config.Consumer.Offsets.Initial = sarama.OffsetOldest
saramaConsumer, err := cluster.NewConsumer(brokerList, groupId, topics, config)
if err != nil {
panic(err)
}
consumer.consumer = saramaConsumer
return &consumer
}
func (c *Consumer) Consume() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
go func(){
for {
select {
case msg, more := <-c.consumer.Messages():
if more {
if c.callbacks.OnDataReceived!=nil {
c.callbacks.OnDataReceived(msg.Value)
}
//Logger.Info(msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
c.consumer.MarkOffset(msg, "")
}
case ntf, more := <-c.consumer.Notifications():
if more {
Logger.Info("Rebalanced: %+v\n", ntf)
}
case err, more := <-c.consumer.Errors():
if more {
if c.callbacks.OnError!=nil {
c.callbacks.OnError(err)
}
Logger.Info("Error: %s\n", err.Error())
}
case <-sigchan:
return
}
}
}()
}
func (c *Consumer) Close(){
c.consumer.Close()
}
type ConsumerCallbacks struct {
OnDataReceived func(msg []byte)
OnError func(err error)
}
func CheckAppName(appName string)(checkResult bool) {
appNames := configer.GetAppNameList()
for _,v := range appNames{
if v == appName{
return true
}
}
return false
}
消费数据到Mysql的Demo:
package consumer
import (
"encoding/json"
"fmt"
"wiresdata-consumer/db/mysql"
"wiresdata-consumer/kafka/consumer_group"
"wiresdata-consumer/kafka/topic"
. "wiresdata-consumer/logger"
"wiresdata-consumer/models"
)
//消费kafka消息并通过impala插入
const (
data_track_mysql_insert_sql = `insert into data_track(partition_key,id,app_name,ip,create_time) values (?,?,?,?,?)`
)
/**
初始化方法
*/
func init() {
fmt.Println("beginit init DataTrackMysqlConsumer")
callBack := ConsumerCallbacks{
OnDataReceived:onMysqlDataReceived,
OnError:onMysqlError,
}
topics := []string{topic.KAFKA_TOPIC_DATA_TRACK}
consumerGroup := consumer_group.CONSUMER_DATA_MYSQL
consumer := NewConsumer(callBack, consumerGroup, topics)
consumer.Consume()
}
func onMysqlDataReceived(msg []byte) {
var model models.DataTrackModel
json.Unmarshal(msg,&model)
if !CheckAppName( model.AppName){
Logger.Error("CheckAppName:",model.AppName," while not config")
return
}
//Logger.Info("CreateTime:",strconv.FormatFloat(float64(model.CreateTime )/1000,'f',-1,64))
customJson := string("")
valMap,ok := model.EventProperty.Custom.(map[string]interface{})
if ok{
customJsonMap,err := json.Marshal(valMap)
if err == nil{
customJson = string(customJsonMap)
}
}
valStr,ok := model.EventProperty.Custom.(string)
if ok{
customJson = string(json.RawMessage(valStr))
}
if customJson == ""{
customJson = `{}`
}
//Mysql的from_unixtime格式为from_unixtime(1566821249.123)
_,err := mysql.ExecMySql(data_track_mysql_insert_sql,
model.PartitionKey,
model.Id,
model.AppName,
model.IP,
model.CreateTime
)
if err != nil{
Logger.Error("onMysqlDataReceived error",err)
}
}
func onMysqlError(err error){
Logger.Error("onMysqlError",err)
}