Introduction
Canal is a MySQL binlog parsing tool. With Canal, every change made in MySQL can be delivered to Kafka, and Spark Streaming can then subscribe to those Kafka messages to build a real-time data warehouse. What follows is only a demo and is not production-ready.
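Getting the binlog from MySQL into Kafka is a matter of Canal configuration rather than code. Below is a minimal sketch of the relevant settings, assuming a Canal 1.1.x deployment; property names and file layout differ slightly between Canal versions, and the topic name here is only a placeholder:

# canal.properties: deliver parsed binlog events to Kafka instead of serving them over TCP
canal.serverMode = kafka
canal.mq.servers = 127.0.0.1:9092

# instance.properties: target topic and partition for this instance
canal.mq.topic = demo_binlog
# a single partition keeps events strictly ordered (see "To be improved" at the end of this post)
canal.mq.partition = 0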
Architecture
The overall flow is: MySQL binlog → Canal → Kafka → Spark Streaming → Impala → Kudu.
Canal message format
Canal messages are in JSON format; an INSERT message on Kafka looks like the example below. The fields are:
- type: the SQL operation this binlog entry corresponds to; possible values are DELETE, INSERT, UPDATE and ALTER.
- database: the database being operated on.
- table: the table name.
- sqlType: the internal (JDBC) type codes the columns are mapped to from the binlog.
- data: the inserted columns and their values.
- es: the timestamp recorded in the binlog.
- ts: the timestamp at which Canal wrote the parsed message to Kafka.
{
    "data":[
        {
            "id":"1107acd5427642e5a5709a379dd6d26a",
            "create_time":"2018-02-19 17:01:03",
            "command_id":"10061",
            "header":"{ \"auth\": \"558307773617872896\", \"userAgent\": \"Mozilla/5.0 (Linux; Android 8.0.0; PRA-AL00X Build/HONORPRA-AL00X; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/70.0.3538.110 Mobile Safari/537.36 MicroMessenger/7.0.9.1560(0x27000935) Process/appbrand0 NetType/WIFI Language/zh_CN ABI/arm64\"}",
            "user_Id":null,
            "trx_time":"2018-02-19 17:01:02",
            "content":"{\"detail\": {\"errMsg\": \"no advertisement\", \"errCode\": 1005}}",
            "partition_key":"20200219"
        }
    ],
    "database":"demo",
    "es":1582102862000,
    "id":54236666,
    "isDdl":false,
    "mysqlType":{
        "id":"char(32)",
        "create_time":"timestamp",
        "command_id":"int(20)",
        "header":"json",
        "user_Id":"varchar(200)",
        "trx_time":"timestamp",
        "content":"json",
        "partition_key":"int(8)"
    },
    "old":null,
    "pkNames":[
        "id",
        "partition_key"
    ],
    "sql":"",
    "sqlType":{
        "id":1,
        "create_time":93,
        "command_id":4,
        "header":12,
        "user_Id":12,
        "trx_time":93,
        "content":12,
        "partition_key":4
    },
    "table":"report_data",
    "ts":1582102863159,
    "type":"INSERT"
}
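The fields above map one-to-one onto the MsgDto class defined later in this post. As a minimal sketch, assuming the JSON payload above is available in a String named json, deserializing it with Gson looks like this:

// Parse the Canal flat message into the DTO shown further below
MsgDto msgDto = new Gson().fromJson(json, MsgDto.class);
// A few of the parsed fields, using the sample message above
System.out.println(msgDto.getType());                               // INSERT
System.out.println(msgDto.getDatabase() + "." + msgDto.getTable()); // demo.report_data
System.out.println(msgDto.getData().get(0).get("command_id"));      // 10061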
Implementation
The flow is: Spark Streaming consumes the Kafka messages and writes them into the database through an Impala JDBC client.
Spark Streaming consuming from Kafka
SparkConf conf = new SparkConf();
conf.setMaster("local[4]");
conf.setAppName("transfer App");
conf.set("spark.streaming.stopGracefullyOnShutdown", "true");
conf.set("spark.default.parallelism", "1");
// Kafka consumer configuration
Map<String, Object> kafkaParams = new HashMap<>(10);
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVER);
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "canal2kudu");
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Auto-commit can lose messages and should be replaced with manual offset commits (see "To be improved"):
// kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// Kafka topic(s) to subscribe to; the topic name is a placeholder, the original omits its definition
List<String> topicList = Collections.singletonList("demo_binlog");
// Streaming batch interval
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(1));
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topicList, kafkaParams)
);
// Process the records of every micro-batch RDD
stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
    @Override
    public void call(JavaRDD<ConsumerRecord<String, String>> consumerRecordJavaRDD) throws Exception {
        consumerRecordJavaRDD.foreach(new VoidFunction<ConsumerRecord<String, String>>() {
            @Override
            public void call(ConsumerRecord<String, String> record) throws Exception {
                String recordVal = record.value();
                if (StringUtils.isEmpty(recordVal)) {
                    return;
                }
                try {
                    MsgDto msgDto = new Gson().fromJson(recordVal, MsgDto.class);
                    if (null == msgDto) {
                        return;
                    }
                    // For now all real-time tables live in the kudurealtime database
                    String dbName = msgDto.getDatabase();
                    String tableName = dbName.replace("-", "_") + "_" + msgDto.getTable();
                    String fullTableName = "kudurealtime." + tableName;
                    Map<String, Integer> sqlTypeMap = msgDto.getSqlType();
                    List<Map<String, Object>> dataMapList = msgDto.getData();
                    DbOperationType operationType = msgDto.getType();
                    ImpalaSink impalaSink = new ImpalaSink();
                    // Retry failed writes up to LOOP_SIZE times
                    int loopTimes = 1;
                    while (true) {
                        boolean execResult = impalaSink.executeImpala(IMPALA_JDBC_URL, fullTableName, operationType, dataMapList, sqlTypeMap);
                        if (execResult) {
                            break;
                        }
                        loopTimes++;
                        if (loopTimes > LOOP_SIZE) {
                            break;
                        }
                    }
                } catch (Exception ex) {
                    logger.error("process{" + recordVal + "}", ex);
                }
            }
        });
    }
});
// Start the streaming context and block until termination
streamingContext.start();
try {
    streamingContext.awaitTermination();
} catch (Exception e) {
    logger.error(e);
}
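The auto-commit caveat noted in the code (and again under "To be improved") can be handled with the offset-commit API of spark-streaming-kafka-0-10. A minimal sketch, assuming ENABLE_AUTO_COMMIT_CONFIG is set to false in kafkaParams, so offsets are only committed back to Kafka after the batch has been processed:

// Disable auto-commit so offsets only advance after successful processing
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
    @Override
    public void call(JavaRDD<ConsumerRecord<String, String>> rdd) throws Exception {
        // Remember which offsets this batch covers
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        // ... process the records as in the code above ...

        // Commit asynchronously once the batch has been written out;
        // after a crash the uncommitted batch is replayed instead of being lost
        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    }
});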
Parsing the Kafka message into Impala SQL (JDBC is called directly for demonstration purposes)
public boolean executeImpala(String jdbcUrl, String tableName, DbOperationType operationType, List<Map<String, Object>> dataList, Map<String, Integer> sqlTypeMap) {
    boolean processResult = false;
    if (CollectionUtils.isEmpty(dataList)) {
        return true;
    }
    // Build the SQL statements for this operation type
    List<String> execSql = Lists.newArrayList();
    switch (operationType) {
        case INSERT:
            execSql = parseInsertSql(tableName, dataList, sqlTypeMap);
            break;
        case UPDATE:
            execSql = parseUpdateSql(tableName, dataList);
            break;
        case DELETE:
            execSql = parseDeleteSql(tableName, dataList);
            break;
        default:
            break;
    }
    if (CollectionUtils.isEmpty(execSql)) {
        return true;
    }
    //logger.info(execSql);
    // Debug logging for a single table only
    if ("kudurealtime.futures_account".equals(tableName)) {
        logger.info(execSql);
    }
    // Execute the statements over JDBC
    Connection connection = null;
    Statement statement = null;
    try {
        Class.forName("com.cloudera.impala.jdbc41.Driver");
        connection = DriverManager.getConnection(jdbcUrl); // production
        statement = connection.createStatement();
        for (String item : execSql) {
            statement.execute(item);
        }
        processResult = true;
    } catch (Exception ex) {
        logger.error(this, ex);
    } finally {
        if (null != statement) {
            try {
                statement.close();
            } catch (Exception ex) {
                logger.error(this, ex);
            }
        }
        if (null != connection) {
            try {
                connection.close();
            } catch (Exception ex) {
                logger.error(this, ex);
            }
        }
    }
    return processResult;
}
private List<String> parseInsertSql(String tableName, List<Map<String, Object>> dataList, Map<String, Integer> sqlTypeMap) {
    List<String> resultList = Lists.newArrayList();
    for (Map<String, Object> mapItem : dataList) {
        if (MapUtils.isEmpty(mapItem)) {
            continue;
        }
        StringBuilder keySB = new StringBuilder();
        StringBuilder valSB = new StringBuilder();
        // Walk the column/value pairs of this row
        for (String keyItem : mapItem.keySet()) {
            Object valueObj = mapItem.get(keyItem);
            String value = rebuildVal(keyItem, valueObj, sqlTypeMap);
            if (StringUtils.isNotEmpty(value)) {
                valSB.append(value).append(",");
                keySB.append(keyItem).append(",");
            }
        }
        // Skip rows where no column produced a value
        if (keySB.length() == 0) {
            continue;
        }
        // Strip the trailing comma
        keySB.deleteCharAt(keySB.length() - 1);
        valSB.deleteCharAt(valSB.length() - 1);
        resultList.add(String.format("insert into %s(%s) values(%s);", tableName, keySB.toString(), valSB.toString()));
    }
    return resultList;
}
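The helpers rebuildVal, parseUpdateSql and parseDeleteSql are referenced above but not shown in the original. Below is a minimal sketch of two of them under stated assumptions: java.sql.Types constants are used for the type check, and the primary key column is assumed to be id for deletes (in the real message the primary keys come from the pkNames field, which the MsgDto below does not currently carry). parseUpdateSql follows the same pattern with an UPDATE ... SET ... WHERE clause and is omitted here.

// JDBC type codes (java.sql.Types) whose values can be emitted unquoted
private static final Set<Integer> NUMERIC_JDBC_TYPES = new HashSet<>(Arrays.asList(
        Types.TINYINT, Types.SMALLINT, Types.INTEGER, Types.BIGINT,
        Types.FLOAT, Types.DOUBLE, Types.DECIMAL, Types.NUMERIC));

// Turn a column value into a SQL literal according to its JDBC type code from sqlType
private String rebuildVal(String key, Object valueObj, Map<String, Integer> sqlTypeMap) {
    if (valueObj == null) {
        // empty result: the caller skips this column entirely
        return "";
    }
    String value = String.valueOf(valueObj);
    Integer jdbcType = sqlTypeMap == null ? null : sqlTypeMap.get(key);
    if (jdbcType != null && NUMERIC_JDBC_TYPES.contains(jdbcType)) {
        return value;
    }
    // quote everything else (CHAR/VARCHAR/TIMESTAMP/JSON, ...) and escape embedded quotes
    return "'" + value.replace("'", "\\'") + "'";
}

// Build DELETE statements keyed on the primary key ("id" assumed here)
private List<String> parseDeleteSql(String tableName, List<Map<String, Object>> dataList) {
    List<String> resultList = Lists.newArrayList();
    for (Map<String, Object> mapItem : dataList) {
        if (MapUtils.isEmpty(mapItem) || null == mapItem.get("id")) {
            continue;
        }
        resultList.add(String.format("delete from %s where id = '%s';", tableName, mapItem.get("id")));
    }
    return resultList;
}

Note that Impala only supports DELETE and UPDATE on Kudu tables, which is consistent with the kudurealtime database used above.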
The DTO that maps the Kafka message
public class MsgDto {
    private String database;
    private String table;
    private DbOperationType type;
    private String es;
    private String ts;
    private List<Map<String, Object>> data;
    private List<Map<String, Object>> old;
    private Map<String, Integer> sqlType;
    private Map<String, Object> mysqlType;
    private String sql;
    private Long id;
    private Boolean isDdl;

    public String getDatabase() {
        return database;
    }
    public void setDatabase(String database) {
        this.database = database;
    }
    public String getTable() {
        return table;
    }
    public void setTable(String table) {
        this.table = table;
    }
    public DbOperationType getType() {
        return type;
    }
    public void setType(DbOperationType type) {
        this.type = type;
    }
    public String getEs() {
        return es;
    }
    public void setEs(String es) {
        this.es = es;
    }
    public String getTs() {
        return ts;
    }
    public void setTs(String ts) {
        this.ts = ts;
    }
    public List<Map<String, Object>> getData() {
        return data;
    }
    public void setData(List<Map<String, Object>> data) {
        this.data = data;
    }
    public List<Map<String, Object>> getOld() {
        return old;
    }
    public void setOld(List<Map<String, Object>> old) {
        this.old = old;
    }
    public Map<String, Integer> getSqlType() {
        return sqlType;
    }
    public void setSqlType(Map<String, Integer> sqlType) {
        this.sqlType = sqlType;
    }
    public Map<String, Object> getMysqlType() {
        return mysqlType;
    }
    public void setMysqlType(Map<String, Object> mysqlType) {
        this.mysqlType = mysqlType;
    }
    public String getSql() {
        return sql;
    }
    public void setSql(String sql) {
        this.sql = sql;
    }
    public Long getId() {
        return id;
    }
    public void setId(Long id) {
        this.id = id;
    }
    public Boolean getDdl() {
        return isDdl;
    }
    public void setDdl(Boolean ddl) {
        isDdl = ddl;
    }

    @Override
    public String toString() {
        return "MsgDto{" +
                "database='" + database + '\'' +
                ", table='" + table + '\'' +
                ", type=" + type +
                ", es=" + es +
                ", ts=" + ts +
                ", data=" + data +
                ", old=" + old +
                ", sqlType=" + sqlType +
                ", mysqlType=" + mysqlType +
                ", sql='" + sql + '\'' +
                ", id=" + id +
                ", isDdl=" + isDdl +
                '}';
    }
}
public enum DbOperationType {
    DELETE,
    INSERT,
    UPDATE,
    ALTER
}
To be improved
- To keep Canal's messages from being written to Kafka out of order, Canal only sends to a single partition:
canal.mq.partition=0
- Spark Streaming auto-commits offsets while consuming, which can lose messages; offsets should be committed manually after processing (see the commit sketch above).
- In testing, inserting into Kudu through Impala is a performance bottleneck: messages start to back up at around 500 TPS. The fix is to insert through the Kudu client directly instead of going through Impala, as sketched below.
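A minimal sketch of what the direct write path could look like with the Kudu Java client (org.apache.kudu:kudu-client); the master address, table name and columns are illustrative, taken from the sample message above rather than from the original code:

private void writeToKudu() throws KuduException {
    // Connect to the Kudu cluster (master address is a placeholder)
    KuduClient client = new KuduClient.KuduClientBuilder("kudu-master:7051").build();
    try {
        // Adjust to the actual Kudu-side table name (Impala-created Kudu tables are often prefixed with "impala::")
        KuduTable table = client.openTable("kudurealtime.demo_report_data");
        KuduSession session = client.newSession();
        // Buffer operations and flush in the background instead of one round-trip per row
        session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);

        Insert insert = table.newInsert();
        PartialRow row = insert.getRow();
        row.addString("id", "1107acd5427642e5a5709a379dd6d26a");
        row.addInt("partition_key", 20200219);
        row.addString("content", "{\"detail\": {\"errMsg\": \"no advertisement\", \"errCode\": 1005}}");
        session.apply(insert);

        // Make sure buffered operations reach the tablet servers before shutting down
        session.flush();
        session.close();
    } finally {
        client.close();
    }
}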