IT人生

  • 首页
  • 归档
  • kafka
  • Java
  • Spring
  • Golang
  • SQL
  • Spark
  • ElasticSearch
  • 关于

  • 搜索
Phoenix HBase Kudu ElasticSearch Spring 数据结构 操作系统 Kettle Azkaban Sqoop Hive Yarn Redis Mybatis Impala Cloudera 大数据 HDFS mycat shell Linux 架构 并发 mysql sql golang java 工具 spark kafka 人生

Spark系列-SparkSQL实战

发表于 2019-04-10 | 分类于 spark | 0 | 阅读次数 1380
  • Spark系列-核心概念
  • Spark系列-SparkSQL实战
  • Spark性能优化指南——基础篇
  • Spark性能优化指南——高级篇

之前系统的计算大部分都是基于Kettle + Hive的方式,但是因为最近数据暴涨,很多Job的执行时间超过了1个小时,即使是在优化了HiveQL的情况下也有超过30分钟,所以近期把计算引擎从Hive变更为Spark。

普通的简单Job就使用SparkSQL来计算,数据流是经过spark计算,把结果插入到Mysql中

在项目中新建三个类,第一个Logger类用于日志的输出

# coding=utf-8
import logging
from logging import handlers

class Logger(object):
    leven_relations = {
        'debug':logging.DEBUG,
        'info':logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR
    }

    def __init__(self, fileName, level='info', when='D', backCount=3, fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'):
        self.logger = logging.getLogger(fileName)
        format_str = logging.Formatter(fmt)
        self.logger.setLevel(self.leven_relations.get(level))
        #屏幕日志
        sh = logging.StreamHandler()
        sh.setFormatter(format_str)
        #文件日志
        th = handlers.TimedRotatingFileHandler(filename=fileName, when=when, backupCount=backCount, encoding='utf-8')
        th.setFormatter(format_str)
        self.logger.addHandler(th)
        self.logger.addHandler(sh)

第二个是SparkSQL公共类,引用的是pyspark

# coding=utf-8

from pyspark import SparkConf,SparkContext
from pyspark.sql import HiveContext

class SparkSqlCommon(object):
    sql_str = ''
    app_name = ''

    def __init__(self, sql, app_name):
        if sql is None:
            raise Exception('sql cannot be empty')
        self.sql_str = sql

        if app_name is None:
            raise Exception('app_name cannot be empty')
        self.app_name = app_name

    def execute(self):
        spark_conf = SparkConf().setAppName(self.app_name)
        spark_context = SparkContext(conf=spark_conf)
        spark_context.setLogLevel("INFO")
        hive_context = HiveContext(spark_context)
        result_rdd = hive_context.sql(self.sql_str)
        result = result_rdd.collect()
        return result

第三个是Mysql公共类,用于把计算结果落地到mysql

# coding=utf-8

import pymysql
from com.randy.common.Logger import Logger

class DatacenterCommon(object):
    sql_str = ''
    jdbcHost = ''
    jdbcPort = ''
    jdbcSchema = ''
    jdbcUserName = ''
    jdbcPassword = ''

    def __init__(self, sql_str,log, jdbcHost = '127.0.0.1', jdbcPort = 3306, jdbcSchema = 'demo', jdbcUserName = 'root', jdbcPassword = '123456'):
        if sql_str is None:
            raise Exception('sql_str cannot be empty')

        self.sql_str = sql_str
        self.jdbcHost = jdbcHost
        self.jdbcPort = jdbcPort
        self.jdbcSchema = jdbcSchema
        self.jdbcUserName = jdbcUserName
        self.jdbcPassword = jdbcPassword
        self.log = log


    def execute(self):
        db = pymysql.connect(host=self.jdbcHost,
                             port=self.jdbcPort,
                             user=self.jdbcUserName,
                             passwd=self.jdbcPassword,
                             db=self.jdbcSchema,
                             charset='utf8')
        try:
            db_cursor = db.cursor()
            db_cursor.execute(self.sql_str)
            db.commit()
        except Exception, e:
            self.log.logger.error('str(e):\t\t', str(e))
            db.rollback()

调用的客户端代码如下

# coding=utf-8
# !/usr/bin/python2.7

import datetime
from com.randy.spark.Logger import Logger
from com.randy.spark.SparkSqlCommon import SparkSqlCommon
from com.randy.spark.DatacenterCommon import DatacenterCommon




#需要修改,每个应用都不一样
app_name = 'demo1'

# SparkSql(不能以分号结尾)
select_sql = '''
                  SELECT count(*) from futures.account
'''

# Mysql
insert_sql = '''
            insert into demo.demo1(id) values({0});
'''


if __name__ == '__main__':
    currentDay = datetime.datetime.now().strftime('%Y%m%d')
    log = Logger('/home/python-big-data-job/log/' + app_name + "_" + str(currentDay) + '.log')
    log.logger.info("**************************start invoke {0},{1} *****************".format(app_name,currentDay))

    sparkSqlCommon = SparkSqlCommon(sql=select_sql,app_name=app_name)
    selectResult = sparkSqlCommon.execute()
    log.logger.info("sparkSqlCommon result:{0}".format(selectResult))
    if selectResult is None:
        log.logger.error("taojin_1 selectResult while is empty")
    else:
        insert_sql = insert_sql.format(selectResult[0][0])
        log.logger.info(insert_sql)
        datacenterCommon = DatacenterCommon(sql_str=insert_sql, log=log)
        datacenterCommon.execute()

        log.logger.info("**************************end invoke {0},{1} *****************".format(app_name, currentDay))

其中spark-submit提交代码如下:

sudo -u hdfs spark-submit --master local[*] --py-files='/home/python-big-data-job/com.zip,/home/python-big-data-job/pymysql.zip' /home/python-big-data-job/taojin/demo1.py

因为项目中使用到了本地文件,所有把三个公共类打包到了com.zip中作为依赖文件

其中pymysql.zip是pymysql的源码文件,因为我在过程中发现了ImportError: No module named pymysql

但是集群已经使用pip安装了pymysql,没有找到有效解决办法,按照https://zhuanlan.zhihu.com/p/43434216和https://www.cnblogs.com/piperck/p/10121097.html都无效,最终只能把pymysql以依赖文件的方式打包

其中使用yarn cluster部署也还存在问题

  • 本文作者: Randy
  • 本文链接: http://www.itrensheng.com/archives/spark_sql_first_try
  • 版权声明: 本博客所有文章除特别声明外,均采用CC BY-NC-SA 3.0 许可协议。转载请注明出处!
# Phoenix # HBase # Kudu # ElasticSearch # Spring # 数据结构 # 操作系统 # Kettle # Azkaban # Sqoop # Hive # Yarn # Redis # Mybatis # Impala # Cloudera # 大数据 # HDFS # mycat # shell # Linux # 架构 # 并发 # mysql # sql # golang # java # 工具 # spark # kafka # 人生
Spark系列-核心概念
Spark性能优化指南——基础篇
  • 文章目录
  • 站点概览
Randy

Randy

技术可以暂时落后,但任何时候都要有上进的信念

80 日志
27 分类
31 标签
RSS
Github E-mail
Creative Commons
© 2021 备案号:沪ICP备19020689号-1
Randy的个人网站