IT人生

  • 首页
  • 归档
  • kafka
  • Java
  • Spring
  • Golang
  • SQL
  • Spark
  • ElasticSearch
  • 关于

  • 搜索
Phoenix HBase Kudu ElasticSearch Spring 数据结构 操作系统 Kettle Azkaban Sqoop Hive Yarn Redis Mybatis Impala Cloudera 大数据 HDFS mycat shell Linux 架构 并发 mysql sql golang java 工具 spark kafka 人生

记一次千万级DAU产品的Mysql插优化入性能提升10倍

发表于 2019-11-24 | 分类于 数据库 | 0 | 阅读次数 1835

系统简介

参与创业的第二年,产品的DAU终于突破了1000万,对于一个后端&大数据开发来说,这是一件多么刺激的事情。新上线的一个功能是客户端的埋点,用于产品和用户行为分析,数据丢失不敏感。 目前客户端接入层的大概实现如下:(大数据计算层的先省略掉,以后再介绍) 3.png 其中客户端接入层api是部署了6台服务器,kafka topic是12个partition,MySQL是3个Master

上线后的问题

下午4点10分打开了客户端的自动升级以后,到下午7点之前系统并未见任何异常。但是7点之后线上kafka出现告警,当天kafka consumer group消费的埋点topic出现lag大于50万,且lag值在继续扩大。也就是说线上的kafka消费性能出现了瓶颈,导致线上kafka数据的消费存在积压。

1.我随即查看了阿里云slb对应的流量走势图如下:

2.png 从上面的走势图可以看出来,流量从4点10分打开客户端自动升级至7点半的时间段之内,QPS达到了2000,甚至一度将要冲破4000。

2.我接下来查看了应用服务器,监控走势图如下:

5.png 我浏览了6台应用服务器,监控走势都差不多如上图,也就是瓶颈不在应用服务器

3.我再看了一下线上mysql所在物理机的监控图如下:

4.png 可以看出mysql服务器的cpu有所增长,但是在可控范围之内,而内存占用也就差不多20%。很奇怪的两张图是磁盘的读写字节数和读写请求数,从右下角的两张图可以看出来,流量刚开始增长的时候,磁盘是只有写入,没有读取的(而这是服务埋点系统的,因为所有的mysql操作都是insert,并没有select)。但是接下来从7点半开始,磁盘的读请求书激增,同事读字节数甚至超过了写入字节数。(而这点也恰恰是我忽略的一点)

4.kafka consumer消费代码同步改为异步

目前线上kafka是同步消费,且手动提交offset的方式,具体代码如下:

@KafkaListener(topics = KafkaTopicConstant.DATA_REPORT_KAFKA_TOPIC)
public void receive(ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment){
    try {
        String key = consumerRecord.key().toString();
        String message = consumerRecord.value().toString();
        int partition = consumerRecord.partition();
        long offset = consumerRecord.offset();
        int runTimes = SystemConstant.CONSUMER_ITERATE_TIMES;
        boolean commitOffsets = false;
        while (!commitOffsets) {
            runTimes--;
            log.info(String.format("consum partition{%s}, offset{%s}, key{%s}",partition,offset,key));
            commitOffsets = invokeLand(message);
            if(!commitOffsets && runTimes <=0){
                commitOffsets = true;
            }
        }
        if (commitOffsets) {
            //处理成功则提交offset
            acknowledgment.acknowledge();
        }
    }catch (Exception ex){
        log.error("receive throw exception",ex);
    }
}

以上代码是同步消费kafka的方式,我先改动为异步消费如下:

    static class NameTreadFactory implements ThreadFactory {
        private final AtomicInteger mThreadNum = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "data-consumer-" + mThreadNum.getAndIncrement());
            return t;
        }
    }
    private ExecutorService executorService =new ThreadPoolExecutor(10, 200,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(1024), new NameTreadFactory(), new ThreadPoolExecutor.AbortPolicy());

  @KafkaListener(topics = KafkaTopicConstant.DATA_REPORT_KAFKA_TOPIC)
    public void receive(ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment){
        try {
            String message = consumerRecord.value().toString();
            executorService.execute(() ->{
                invokeLand(message);
            });

            acknowledgment.acknowledge();

        }catch (Exception ex){
            log.error("receive throw exception",ex);
        }
    }

5.Kafka lag消息减少

监控了一下系统,发现kafka lag的消息在20分钟作用从70万降到了2000,我分别观察了应用服务器和数据库服务器的情况,此时监控系统并未发现线程池未出现被拒绝的情况各项指标正常,数据库服务器各项指标也正常,也就是说系统负载目前可以支撑当前的QPS。

6.MySQL服务器告警

第二天早上7点半左右收到mysql服务器告警,CPU使用率接近90%。看了一下实时QPS稳定在3000左右。3台数据库服务器的CPU和IO都持续攀升,CPU都将近90%,写入速度每天大概是55M/s,读取速度大概是110M/s。(我第二次忽略了关键问题,为何只有insert的系统,读取速度是写入熟读的2倍) 6.png

7.改进kafka为批量消费

从4中可以看到,此时insert到数据库依然是单笔插入的,现在改为批量插入

 @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 3000);
        propsMap.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 100 * 1024);
        propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50 * 1024);
        propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 3000);
        return propsMap;
    }

 @KafkaListener(topics = KafkaTopicConstant.DATA_REPORT_KAFKA_TOPIC,containerFactory = "kafkaListenerContainerFactory")
    public void receive(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment){
        try {
             ...
            acknowledgment.acknowledge();
        }catch (Exception ex){
            log.error("receive throw exception",ex);
        }
    }

其中的mapper改为了
    @Insert({
            "<script>",
            "INSERT INTO data_report(id, app_id, create_time, command_id, header, user_id, trx_time, content, partition_key) VALUES ",
            "<foreach collection='reportDataDtos' item='item' index='index' separator=','>",
            "(#{item.id}, #{item.appId}, #{item.createTime}, #{item.commandId}, #{item.header,typeHandler=com.datareport.mapper.handler.MybatisJsonTypeHandler}, #{item.userId}, #{item.trxTime}, #{item.content,typeHandler=com.datareport.mapper.handler.MybatisJsonTypeHandler}, #{item.partitionKey})",
            "</foreach>",
            "</script>"
    })
    void batchInsert(@Param(value = "reportDataDtos") List<ReportDataDto> reportDataDtos);

做了如上改动之后,实时监控入库操作,发现批次并不是我所设设定的FETCH_MIN_BYTES_CONFIG和FETCH_MAX_WAIT_MS_CONFIG(线上消息大概每条400Bytes,所以我设定每个批次至少是100条消息)。实际情况是每个批次1到20不等,而数据库服务器的CPU和IO负载有所降低

8.应用服务设定一个队列

根据7中所要设定的批次,虽然也会多笔同时插入数据库,但是数据库IO的请求次数并未达到理性状况。此时在应用服务器做了一个双端阻塞队列,当队列长度达到100的时候,提交一个异步线程到数据库,做批量插入操作。

BlockingDeque<ReportDataDto> blockingDeque = new LinkedBlockingDeque<ReportDataDto>();
  @KafkaListener(topics = KafkaTopicConstant.DATA_REPORT_KAFKA_TOPIC,containerFactory = "kafkaListenerContainerFactory")
    public void receive(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment){
        try {
            for(ConsumerRecord<String, String> consumerRecord : consumerRecords){
                String message = consumerRecord.value();
                if(StringUtils.isNotEmpty(message)){
                    ReportDataDto reportDataDto = GsonUtil.build().fromJson(message,ReportDataDto.class);
                    if(blockingDeque.size() >= 100){
                        
                        final List<ReportDataDto> pendingList = new LinkedList<ReportDataDto>();
                        while (!blockingDeque.isEmpty()){
                            pendingList.add(blockingDeque.takeFirst());
                        }
                        executorService.execute(() -> {
                            invokeLand(pendingList);
                        });
                        blockingDeque.putLast(reportDataDto);
                    }else {
                        blockingDeque.putLast(reportDataDto);
                    }
                }
            }
            acknowledgment.acknowledge();
        }catch (Exception ex){
            log.error("receive throw exception",ex);
        }
    }

监控数据库服务器,各项指标有所降低,CPU使用率从80%降到了60%,读写请求次数降低了一半。 7.png 此时,从读写字节数的走势图来看,读请求的字节数依然是写请求的两倍(第三次,终于引起了我的注意)。从3中的截图看出来,读请求并非是随着写入请求同步持续走高的,中间是存在一个差值的时间段,大概是从7点到9点这1个小时的时间 8.png

9.上面的8中数据库读字节数为何在一个小时的时间之内激增,甚至超过了写字节数?

查看系统的写入,其中主键使用的是一个char类型的UUID(至于为什么用一个char类型作为主键,是因为消费者写入的不仅仅是mysql,还要写入到mongodb和HBase中供其他系统使用),代码如下:

            String snowId = String.valueOf(UUID.randomUUID()).replace("-", "");

InnoDB采用的是B+ 树的数据结构,而B+ 树为了维护索引有序性,在插入新值的时候需要做必要的维护。如果表ID采用自增主键的插入数据模式,每次插入一条新记录,都是追加操作,都不涉及到挪动其他记录,也不会触发叶子节点的分裂。但是现在使用的是UUID,不仅会使B+数页分裂,还影响数据页的利用率。原本放在一个页的数据,现在分到两个页中,整体空间利用率降低大约 50%。 那改造的目标就是使ID保持一个自增长的方式,综合考虑,使用雪花算法来替换UUID的方案,代码改动如下:

            String snowId = SnowflakeIdUtil.getInstance().nextId();

监控系统截图如下: 9.png 从监控可以看出,CPU再次从70%降到了10%,而读写字节数也从47M/s降到了4M/s。综合来看,雪花算法替换UUID的方案,在高并发场景下,性能有10倍提升

  • 本文作者: Randy
  • 本文链接: http://www.itrensheng.com/archives/mysql_optimize_id_index
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# Phoenix # HBase # Kudu # ElasticSearch # Spring # 数据结构 # 操作系统 # Kettle # Azkaban # Sqoop # Hive # Yarn # Redis # Mybatis # Impala # Cloudera # 大数据 # HDFS # mycat # shell # Linux # 架构 # 并发 # mysql # sql # golang # java # 工具 # spark # kafka # 人生
redis zset内部实现
pyspark查询基于hbase的hive external表异常:Class org.apache.hadoop.hive.hbase.HBaseSerDe not found
  • 文章目录
  • 站点概览
Randy

Randy

技术可以暂时落后,但任何时候都要有上进的信念

80 日志
27 分类
31 标签
RSS
Github E-mail
Creative Commons
© 2021 备案号:沪ICP备19020689号-1
Randy的个人网站