一. 什么是Phoenix
Phoenix,中文译为“凤凰”,官网地址:http://phoenix.apache.org/ Phoenix是由saleforce.com开源的一个项目,后又捐给了Apache基金会。它相当于一个Java中间件,提供jdbc连接,操作hbase数据表。Phoenix是一个HBase的开源SQL引擎。你可以使用标准的JDBC API代替HBase客户端API来创建表,插入数据,查询你的HBase数据。Phoenix的团队用了一句话概括Phoenix:'We put the SQL back in NoSQL' 意思是:我们把SQL又放回NoSQL去了!这边说的NoSQL专指HBase,意思是可以用SQL语句来查询Hbase,你可能会说:“Hive和Impala也可以啊!”。但是Hive和Impala还可以查询文本文件,Phoenix的特点就是,它只能查Hbase,别的类型都不支持!但是也因为这种专一的态度,让Phoenix在Hbase上查询的性能超过了Hive和Impala。
以下是Phoenix和Hive在Hadoop(HDFS和HBase)上查询性能对比:
以下是Phoenix和Impala的对比图
在四个场景中,Hive On HBase最差,phoenix(key filter)最快,究其原因就是phoenix(key filter)走了index,查询时间基本在1s以内。另外,Hive on HDFS跟phoenix on HDFS对比,在5000w条数据量左右是分界线,时间在20s左右,其实hive启动时间都在5s左右,再跑一个MR/TEZ作业都在10s左右(后续有一定的优化)
二. 为什么使用Phoneix
首先要从HBase说起。Hbase是运行在Hadoop上的NoSQL数据库,它是一个分布式的和可扩展的大数据仓库,也就是说HBase能够利用HDFS的分布式处理模式,并从Hadoop的MapReduce程序模型中获益。
HBase的优点:
- 半结构化或非结构化数据:
对于数据结构字段不够确定或杂乱无章非常难按一个概念去进行抽取的数据适合用HBase,因为HBase支持动态添加列。 - 记录很稀疏:
RDBMS的行有多少列是固定的。为null的列浪费了存储空间。HBase为null的Column不会被存储,这样既节省了空间又提高了读性能。 - 多版本号数据:
依据Row key和Column key定位到的Value能够有随意数量的版本号值,因此对于须要存储变动历史记录的数据,用HBase是很方便的。比方某个用户的Address变更,用户的Address变更记录也许也是具有研究意义的。 - 仅要求最终一致性:
对于数据存储事务的要求不像金融行业和财务系统这么高,只要保证最终一致性就行。(比如HBase+elasticsearch时,可能出现数据不一致) - 高可用和海量数据以及很大的瞬间写入量:
WAL解决高可用,支持PB级数据,put性能高,适用于插入比查询操作更频繁的情况。比如,对于历史记录表和日志文件。(HBase的写操作更加高效) - 业务场景简单:
不需要太多的关系型数据库特性,列入交叉列,交叉表,事务,连接等
HBase的缺点:
- 单一RowKey固有的局限性决定了它不可能有效地支持多条件查询[2]
- 不适合于大范围扫描查询
- 不直接支持 SQL 的语句查询
Phoenix在HBase基础上实现了更多实用性的功能,正好弥补了HBase的不足:
- 实现了二级索引来提升非主键字段查询的性能。
- 统计相关数据来提高并行化水平,并帮助选择最佳优化方案。
- 跳过扫描过滤器来优化IN,LIKE,OR查询。
- 优化主键的来均匀分布写压力。
三. Phoenix架构
Phoenix是构建在HBase之上的SQL引擎。你也许会存在“Phoenix是否会降低HBase的效率?”或者“Phoenix效率是否很低?”这样的疑虑,事实上并不会,Phoenix通过以下方式实现了比你自己手写的方式相同或者可能是更好的性能:
- 编译你的SQL查询为原生HBase的scan语句。
- 检测scan语句最佳的开始和结束的key。
- 精心编排你的scan语句让他们并行执行。
- 推送你的WHERE子句的谓词到服务端过滤器处理。
- 执行聚合查询通过服务端钩子(称为协同处理器)
四. Phoenix常用功能
1. 建表
在Phoenix中建表,底层会在HBase中创建一张表
create table "DATA_REPORT"(
"row" varchar primary key,
"id" varchar,
"app_id" varchar,
"command_id" integer,
"create_time" varchar,
"ip" varchar,
"user_id" varchar,
"device_id" varchar,
"partition_key" integer
);
2. 创建视图
Phoenix中的视图是需要基于HBase表或者Phoenix表的,如下在HBase中有以下的表:
hbase(main):003:0> describe 'DATA_REPORT'
Table DATA_REPORT is ENABLED
DATA_REPORT
COLUMN FAMILIES DESCRIPTION
{NAME => 'F', VERSIONS => '1', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETED_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => '3600 SECONDS (1 HOUR)', MIN_VERSIONS =
> '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'false', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE => 'true', BLOCKSIZE =>
'65536'}
1 row(s)
则在Phoenix中可以创建对应的view,其中的F是HBase的列族:
create view "DATA_REPORT"(
"row" varchar primary key,
"F"."id" varchar,
"F"."app_id" varchar,
"F"."command_id" integer,
"F"."create_time" varchar,
"F"."ip" varchar,
"F"."user_id" varchar,
"F"."device_id" varchar,
"F"."partition_key" integer
);
3. 创建索引
create index "idx_drh_ctime_uid" on "DATA_REPORT"("F"."create_time","F"."user_id");
create index "IDX_DRH_CTIME_APP_USER" on "DATA_REPORT"("F"."create_time","F"."app_id","F"."user_id") ASYNC SALT_BUCKETS=15 ;
create index "IDX_DRH_APP_CTIME_ADID" on "DATA_REPORT"("F"."app_id", "F"."create_time", "F"."i") split on ('a','b','ba0','ba11','ba12','ba13','ba14','ba15','ba16','ba17','ba18','ba2','ba3','ba4','ba5','ba6','ba7','ba8','ba9','c','d','e','f','g','h','i','j','k','l','m','n','o','p','q','r','s','t','u','v','w','x','y','z');
4. SQL On Phoenix
SELECT /*+ index(data_report_hbase,idx_drh_ctime_uid_hc)*/
"user_id",
"create_time",
"header",
"content"
from "data_report_hbase"
WHERE 1=1 and "create_time" >= TO_CHAR( TO_NUMBER(TO_TIMESTAMP('2020-06-09 12:27:54', 'yyyy-MM-dd HH:mm:ss','GMT+8')) ,'#############')
and "create_time" <= TO_CHAR( TO_NUMBER(TO_TIMESTAMP('2020-06-09 14:27:54', 'yyyy-MM-dd HH:mm:ss','GMT+8')) ,'#############')
LIMIT 1000
五. Phoenix UDF开发(以GET_JSON_OBJECT为例)
详细的文档请参考官网:https://phoenix.apache.org/udf.html
1. 配置 allowUserDefinedFunctions
首先需要修改phoenix.functions.allowUserDefinedFunctions为true否则会报以下错误:
Error: ERROR 6003 (42F03): User defined functions are configured to not be allowed. To allow configure phoenix.functions.allowUserDefinedFunctions to true. (state=42F03,code=6003)
java.sql.SQLException: ERROR 6003 (42F03): User defined functions are configured to not be allowed. To allow configure phoenix.functions.allowUserDefinedFunctions to true.
如果在CDH所创建的Phoenix实例中可以做如下修改:
2. 开发
大家都知道GET_JSON_OBJECT是Hive和Impala中很常用的一个UDF,但是Phoenix中目前不提供这个方法,需要我们自己开发这个UDF
定义个类继承自org.apache.phoenix.expression.function.ScalarFunction,并通过BuiltInFunction注解来定义这个UDF的名称。同时需要复写:evaluate,getDataType和getName三个方法,实例如下:
@BuiltInFunction(name = GetJsonObjectFunction.NAME, args = {
@Argument(allowedTypes = {PVarchar.class}),
@Argument(allowedTypes = {PVarchar.class})})
public class GetJsonObjectFunction extends ScalarFunction {
private static final Pattern PATTERNKEY = Pattern.compile("([^\\[\\]]+)(\\[([0-9]+|\\*)\\])*?");
private static final Pattern PATTERNINDEX = Pattern.compile("\\[([0-9]+|\\*)\\]");
public static final String NAME = "GET_JSON_OBJECT";
public GetJsonObjectFunction() {}
public GetJsonObjectFunction(List<Expression> children) throws SQLException {
super(children);
}
@Override
public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
Expression jsonStringExpression = getChildren().get(0);
String jsonPath = getJsonPath();
if (!jsonStringExpression.evaluate(tuple, ptr)) {
return false;
}
if (ptr.getLength() == 0) {
ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
return true;
}
if (jsonPath == null || jsonPath.isEmpty() || !jsonPath.startsWith("$")) {
ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
return true;
}
String jsonString = (String) PVarchar.INSTANCE.toObject(ptr, jsonStringExpression.getSortOrder());
Gson gson = new Gson();
JsonElement extractJson = null;
try {
extractJson = gson.fromJson(jsonString, JsonElement.class);
} catch (JsonParseException e) {
ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
return true;
}
jsonPath = jsonPath.substring(1);
String[] pathExpr = jsonPath.split("\\.", -1);
for (String path : pathExpr) {
if (extractJson == null) {
ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
return true;
}
extractJson = extract(extractJson, path);
}
String resultJsonString = null;
if (extractJson != null) {
try {
resultJsonString = gson.toJson(extractJson);
} catch (Exception e) {
ptr.set(ByteUtil.EMPTY_BYTE_ARRAY);
return true;
}
}
ptr.set(PVarchar.INSTANCE.toBytes(resultJsonString));
return true;
}
@Override
public PDataType getDataType() {
return PVarchar.INSTANCE;
}
@Override
public String getName() {
return NAME;
}
public String getJsonPath() {
Expression jsonPathExpression = getChildren().get(1);
if (jsonPathExpression instanceof LiteralExpression) {
Object jsonPathValue = ((LiteralExpression) jsonPathExpression).getValue();
if (jsonPathValue != null) {
return jsonPathValue.toString();
}
}
return null;
}
private JsonElement extract(JsonElement jsonElement, String path) {
if (jsonElement == null) {
return null;
}
Matcher mKey = PATTERNKEY.matcher(path);
if (mKey.matches() == Boolean.TRUE) {
jsonElement = extractKey(jsonElement, mKey.group(1));
if (jsonElement == null) {
return null;
}
}
Matcher mIndex = PATTERNINDEX.matcher(path);
while (mIndex.find()) {
jsonElement = extractIndex(jsonElement, mIndex.group(1));
if (jsonElement == null) {
return null;
}
}
return jsonElement;
}
private JsonElement extractKey(JsonElement json, String key) {
if (json.isJsonObject()) {
return ((JsonObject) json).get(key);
} else if (json.isJsonArray()) {
JsonArray jsonArray = new JsonArray();
for (int i = 0; i < ((JsonArray) json).size(); i++) {
JsonElement jsonElement = ((JsonArray) json).get(i);
if (jsonElement.isJsonObject()) {
jsonElement = ((JsonObject) jsonElement).get(key);
} else {
continue;
}
if (jsonElement == null) {
continue;
}
if (jsonElement.isJsonArray()) {
for (int j = 0; j < ((JsonArray) jsonElement).size(); j++) {
jsonArray.add(((JsonArray) jsonElement).get(i));
}
} else {
jsonArray.add(jsonElement);
}
}
if (jsonArray.size() == 0) {
return null;
}
return jsonArray;
} else {
return null;
}
}
private JsonElement extractIndex(JsonElement jsonArray, String index) {
if (jsonArray.isJsonObject()) {
return null;
}
if (index.equals("*")) {
return jsonArray;
} else {
try{
return ((JsonArray) jsonArray).get(Integer.parseInt(index));
} catch(IndexOutOfBoundsException e) {
return null;
}
}
}
}
3. 打包并激活
打为jar包后,上传到hdfs上并在phoenix中绑定这个UDF
CREATE FUNCTION GET_JSON_OBJECT(varchar,varchar) returns varchar as 'com.itrensheng.phoenix.udf.GetJsonObjectFunction' using jar 'hdfs://data1:8020/hbase/lib/phoenix_udf-1.0.0.jar';