Building a Real-Time Data Warehouse with canal + Kafka + Spark Streaming + Kudu

Posted on 2018-11-19 | Category: kafka | Views: 1982

Introduction

canal is a MySQL binlog parsing tool. With canal, every change in MySQL can be forwarded to Kafka, and Spark Streaming can then subscribe to those Kafka messages to build a real-time data warehouse. What follows is only a demo and is not production-ready.
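
For reference, a minimal sketch of the canal-side configuration that enables Kafka delivery. Key names follow canal 1.1.x and may differ slightly between releases; the broker address and topic name here are placeholders:

    # canal.properties: deliver parsed binlog events to Kafka instead of a TCP client
    canal.serverMode = kafka
    canal.mq.servers = 127.0.0.1:9092
    # emit the flat one-message-per-statement JSON format shown below
    canal.mq.flatMessage = true

    # instance.properties: per-instance routing
    canal.mq.topic = canal_demo
    canal.mq.partition = 0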

Architecture

[Architecture diagram: MySQL binlog → canal → Kafka → Spark Streaming → Impala → Kudu]

The canal Message

canal messages are JSON-formatted. Below is a Kafka message for an INSERT; its main fields are:

  1. type is the SQL operation this binlog entry corresponds to: DELETE, INSERT, UPDATE, or ALTER.
  2. database is the database the statement ran against.
  3. table is the table name.
  4. sqlType maps each column to its JDBC type code (java.sql.Types, e.g. 12 = VARCHAR, 93 = TIMESTAMP).
  5. data holds the inserted columns and their values.
  6. es is the execution timestamp recorded in the binlog.
  7. ts is the timestamp at which canal delivered the message to Kafka.
{
    "data":[
        {
            "id":"1107acd5427642e5a5709a379dd6d26a",
            "create_time":"2018-02-19 17:01:03",
            "command_id":"10061",
            "header":"{ \"auth\": \"558307773617872896\", \"userAgent\": \"Mozilla/5.0 (Linux; Android 8.0.0; PRA-AL00X Build/HONORPRA-AL00X; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/70.0.3538.110 Mobile Safari/537.36 MicroMessenger/7.0.9.1560(0x27000935) Process/appbrand0 NetType/WIFI Language/zh_CN ABI/arm64\"}",
            "user_Id":null,
            "trx_time":"2018-02-19 17:01:02",
            "content":"{\"detail\": {\"errMsg\": \"no advertisement\", \"errCode\": 1005}}",
            "partition_key":"20200219"
        }
    ],
    "database":"demo",
    "es":1582102862000,
    "id":54236666,
    "isDdl":false,
    "mysqlType":{
        "id":"char(32)",
        "create_time":"timestamp",
        "command_id":"int(20)",
        "header":"json",
        "user_Id":"varchar(200)",
        "trx_time":"timestamp",
        "content":"json",
        "partition_key":"int(8)"
    },
    "old":null,
    "pkNames":[
        "id",
        "partition_key"
    ],
    "sql":"",
    "sqlType":{
        "id":1,
        "create_time":93,
        "command_id":4,
        "header":12,
        "user_Id":12,
        "trx_time":93,
        "content":12,
        "partition_key":4
    },
    "table":"report_data",
    "ts":1582102863159,
    "type":"INSERT"
}

Code Implementation

The flow: Spark Streaming consumes the Kafka messages and writes them to Kudu through the Impala JDBC client.

Consuming Kafka with Spark Streaming

        SparkConf conf = new SparkConf();
        conf.setMaster("local[4]");
        conf.setAppName("transfer App");
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true");
        conf.set("spark.default.parallelism", "1");

        // Kafka consumer configuration
        Map<String, Object> kafkaParams = new HashMap<>(10);
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "canal2kudu");
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Auto-commit can lose messages; switch to manual offset commits (see the sketch after this block)
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

        // Kafka topic(s) to subscribe to; KAFKA_TOPIC is assumed defined alongside the other constants
        List<String> topicList = Collections.singletonList(KAFKA_TOPIC);

        // micro-batch interval: fire one batch per second
        JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(1));

        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                streamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicList, kafkaParams)
        );

        // process each record of every micro-batch RDD
        stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
            @Override
            public void call(JavaRDD<ConsumerRecord<String, String>> consumerRecordJavaRDD) throws Exception {
                consumerRecordJavaRDD.foreach(new VoidFunction<ConsumerRecord<String, String>>() {
                    @Override
                    public void call(ConsumerRecord<String, String> record) throws Exception {
                        String recordVal = record.value();
                        if (StringUtils.isEmpty(recordVal)) {
                            return;
                        }

                        try {
                            MsgDto msgDto = new Gson().fromJson(recordVal, MsgDto.class);
                            if (null == msgDto) {
                                return;
                            }

                            // real-time tables all currently live under the kudurealtime database
                            String dbName =  msgDto.getDatabase();
                            String tableName = dbName.replace("-","_") + "_" + msgDto.getTable();
                            String fullTableName = "kudurealtime." + tableName;

                            Map<String, Integer> sqlTypeMap = msgDto.getSqlType();

                            List<Map<String, Object>> dataMapList = msgDto.getData();
                            DbOperationType operationType = msgDto.getType();

                            ImpalaSink impalaSink = new ImpalaSink();
                            // retry the Impala write up to LOOP_SIZE times
                            int loopTimes = 1;
                            while (true) {
                                boolean execResult = impalaSink.executeImpala(IMPALA_JDBC_URL, fullTableName, operationType, dataMapList, sqlTypeMap);
                                if(execResult){
                                    break;
                                }
                                loopTimes++;
                                if(loopTimes > LOOP_SIZE){
                                    break;
                                }
                            }

                        } catch (Exception ex) {
                            logger.error("process{" + recordVal + "}", ex);
                        }
                    }
                });
            }
        });

        // start the streaming context and block until termination
        streamingContext.start();
        try {
            streamingContext.awaitTermination();
        } catch (Exception e) {
            logger.error(e);
        }
    }
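
As flagged in the comment on ENABLE_AUTO_COMMIT_CONFIG above, auto-commit can acknowledge offsets before the Impala write has succeeded. A minimal sketch of manual commits with the spark-streaming-kafka-0-10 API, which would replace the foreachRDD body above with auto-commit set to false (HasOffsetRanges, OffsetRange and CanCommitOffsets come from org.apache.spark.streaming.kafka010):

        // requires: kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        stream.foreachRDD(rdd -> {
            // capture the offset ranges of this batch before processing it
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            rdd.foreach(record -> {
                // ... parse the canal message and write to Impala/Kudu as above ...
            });

            // commit only after the batch has been processed, so a crash
            // replays the batch instead of silently dropping it
            ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
        });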

Parsing the Kafka message into Impala SQL (calling JDBC directly for the demo)

 public boolean executeImpala(String jdbcUrl,String tableName, DbOperationType operationType, List<Map<String,Object>> dataList,Map<String, Integer> sqlTypeMap){
        boolean processResult = false;

        // nothing to do without row data, or when Gson could not map the operation type (e.g. DDL)
        if (null == operationType || CollectionUtils.isEmpty(dataList)) {
            return true;
        }

        List<String> execSql = Lists.newArrayList();
        switch (operationType){
            case INSERT:
                execSql = parseInsertSql(tableName,dataList,sqlTypeMap);
                break;
            case UPDATE:
                execSql = parseUpdateSql(tableName,dataList);
                break;
            case DELETE:
                execSql = parseDeleteSql(tableName,dataList);
                break;
            default:
                break;
        }

        if(CollectionUtils.isEmpty(execSql)) {
            return true;
        }

        // debug logging for a single table
        if ("kudurealtime.futures_account".equals(tableName)) {
            logger.info(execSql);
        }

        // execute the statements over JDBC
        Connection connection = null;
        Statement statement = null;
        try {
            Class.forName("com.cloudera.impala.jdbc41.Driver");
            connection = DriverManager.getConnection(jdbcUrl);   // production URL
            statement = connection.createStatement();
            for (String item : execSql) {
                statement.execute(item);
            }
            processResult = true;
        }catch (Exception ex){
            logger.error(this,ex);
        }finally {
            if(null != statement){
                try {
                    statement.close();
                }catch (Exception ex){
                    logger.error(this,ex);
                }
            }
            if(null != connection) {
                try {
                    connection.close();
                }catch (Exception ex){
                    logger.error(this,ex);
                }
            }
        }

        return processResult;
    }


    private List<String> parseInsertSql(String tableName,List<Map<String,Object>> dataList,Map<String, Integer> sqlTypeMap){
        List<String> resultList = Lists.newArrayList();
        for (Map<String,Object> mapItem :dataList){
            if(MapUtils.isEmpty(mapItem)){
                continue;
            }
            StringBuilder keySB = new StringBuilder();
            StringBuilder valSB = new StringBuilder();

            // walk the column/value pairs
            for (String keyItem: mapItem.keySet()) {
                Object valueObj = mapItem.get(keyItem);
                String value = rebuildVal(keyItem,valueObj,sqlTypeMap);
                if(StringUtils.isNotEmpty(value)) {
                    valSB.append(value + ",");
                    keySB.append(keyItem + ",");
                }
            }

            // skip rows where no column produced a value (deleteCharAt would throw on an empty builder)
            if (keySB.length() == 0) {
                continue;
            }
            // strip the trailing comma
            keySB.deleteCharAt(keySB.length() - 1);
            valSB.deleteCharAt(valSB.length() - 1);
            resultList.add(String.format("insert into %s(%s) values(%s);",tableName,keySB.toString(),valSB.toString()));
        }

        return resultList;
    }
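
Applied to the sample message above, and assuming rebuildVal (not shown) single-quotes string and timestamp columns based on sqlTypeMap, parseInsertSql emits one statement per row, roughly like this (column order follows map iteration order; only a few columns shown):

    insert into kudurealtime.demo_report_data(id,create_time,command_id,partition_key) values('1107acd5427642e5a5709a379dd6d26a','2018-02-19 17:01:03',10061,20200219);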

The DTO that maps the Kafka message


public class MsgDto {

    private String database;

    private String table;

    private DbOperationType type;

    private String es;

    private String ts;


    private List<Map<String,Object>> data;

    private List<Map<String,Object>> old;

    private Map<String,Integer> sqlType;

    private Map<String,Object> mysqlType;

    private String sql;

    private Long id;

    private Boolean isDdl;


    public String getDatabase() {
        return database;
    }

    public void setDatabase(String database) {
        this.database = database;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public DbOperationType getType() {
        return type;
    }

    public void setType(DbOperationType type) {
        this.type = type;
    }

    public String getEs() {
        return es;
    }

    public void setEs(String es) {
        this.es = es;
    }

    public String getTs() {
        return ts;
    }

    public void setTs(String ts) {
        this.ts = ts;
    }

    public List<Map<String, Object>> getData() {
        return data;
    }

    public void setData(List<Map<String, Object>> data) {
        this.data = data;
    }

    public List<Map<String, Object>> getOld() {
        return old;
    }

    public void setOld(List<Map<String, Object>> old) {
        this.old = old;
    }

    public Map<String, Integer> getSqlType() {
        return sqlType;
    }

    public void setSqlType(Map<String, Integer> sqlType) {
        this.sqlType = sqlType;
    }

    public Map<String, Object> getMysqlType() {
        return mysqlType;
    }

    public void setMysqlType(Map<String, Object> mysqlType) {
        this.mysqlType = mysqlType;
    }

    public String getSql() {
        return sql;
    }

    public void setSql(String sql) {
        this.sql = sql;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public Boolean getDdl() {
        return isDdl;
    }

    public void setDdl(Boolean ddl) {
        isDdl = ddl;
    }

    @Override
    public String toString() {
        return "MsgDto{" +
                "database='" + database + '\'' +
                ", table='" + table + '\'' +
                ", type=" + type +
                ", es=" + es +
                ", ts=" + ts +
                ", data=" + data +
                ", old=" + old +
                ", sqlType=" + sqlType +
                ", mysqlType=" + mysqlType +
                ", sql='" + sql + '\'' +
                ", id=" + id +
                ", isDdl=" + isDdl +
                '}';
    }
}



public enum DbOperationType {
    DELETE,
    INSERT,
    UPDATE,
    ALTER
}


To Be Improved

  1. To prevent canal messages from arriving out of order in Kafka, canal writes everything to a single partition:

canal.mq.partition=0

  2. Spark Streaming currently commits offsets automatically, which can drop messages; see the manual-commit sketch in the consumption section above.
  3. Testing showed that inserting into Kudu through Impala is a performance bottleneck: at around 500 TPS messages start to back up. The fix is to call the Kudu client directly instead of going through Impala, as sketched below.
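
For item 3, a hedged sketch of what the direct Kudu-client write could look like (the master address is a placeholder, and Impala-created Kudu tables are exposed to the Kudu client under an "impala::" prefix):

    import org.apache.kudu.client.*;

    public class KuduDirectWriter {
        public static void main(String[] args) throws KuduException {
            KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051").build();
            try {
                KuduTable table = client.openTable("impala::kudurealtime.demo_report_data");
                KuduSession session = client.newSession();
                // buffer and flush writes in the background instead of one round trip per row
                session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);

                Insert insert = table.newInsert();
                PartialRow row = insert.getRow();
                row.addString("id", "1107acd5427642e5a5709a379dd6d26a");
                row.addInt("partition_key", 20200219);
                session.apply(insert);

                session.flush();
                session.close();
            } finally {
                client.close();
            }
        }
    }

This removes the SQL string building and the Impala round trip entirely, which is where the ~500 TPS ceiling was observed.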
  • Author: Randy
  • Link: http://www.itrensheng.com/archives/基于canal+kafka+spark streaming+kudu的实时数仓搭建
  • License: Unless otherwise noted, all posts on this blog are licensed under CC BY-NC-SA 3.0. Please credit the source when reposting!