mapr

在分布式环境中,使用 mapr 语句可以定义 用户自定义聚合函数(UDAF) 的 map reduce 实现。此外,若要在 cgroup by 中使用用户自定义聚合函数,亦需要在 mapr 语句中进行声明。

语法

mapr udaf(args..){mapF1, mapF2, ..., mapFn -> reduceFunc[; cumF1, cumF2, ..., cumFn -> runningReduceFunc]}

参数

udaf(args…) 是用户自定义函数的函数名和参数。

mapF1~mapFn 是 map 函数,该部分计算将被 map 到各个数据节点进行。

reduceFunc 是 reduce 函数,map 的计算结果将被传入 reduceFunc 用于进一步计算。

[] 部分为可选参数,用于自定义聚合函数的 cgroup by 实现。cgroup(cumulative group)即累积分组,常用于按时间维度分组后累积计算每个组的统计值。与 group by 不同的是,将数据分组后,group by 每组的数据单独进行计算,而 cgroup by 每组参与计算的数据包含当前组和其之前所有组的数据,详见 cgroup by。

cumF1~cumFn 与 mapF1~mapFn 一一对应,用于对 map 的结果进行进一步计算。cumF1~cumFn 通常指定为内置的累积函数,如 cumsum;或者指定为内置的 copy 函数,将 map 的结果直接传给 runningReduceFunc,然后在 runningReduceFunc 中定义累积聚合的逻辑。

注意:mapr 语句及相关的函数需要添加到 init 配置项指定的系统初始化脚本(默认 dolphindb.dos)中,重启 server 后才会生效。

例子

定义一个计算调和平均数的自定义聚合函数,调和平均数对应公式如下:

$$ H = \frac{n}{\frac{1}{x_1}+\frac{1}{x_1}+…+\frac{1}{x_1}} $$

上述公式对 \(x_n\) 的倒数求平均后再取倒数。基于 mapr,将求倒数平均的部分 map 到各个分区并行计算,最后在 reduce 阶段,用加权平均汇总结果,然后再求一次倒数即可。

// 先声明普通的聚合函数
$ defg harmonicMean(x){
$     return x.reciprocal().avg().reciprocal()
$ }

// 分布式实现
// 先定义一个 map 函数 reciprocalAvg,用于求倒数的平均
$ def reciprocalAvg(x) : reciprocal(x).avg()

// 定义一个 reduce 函数 harmonicMeanReduce 用于进行 reduce 部分汇总和倒数计算
$ defg harmonicMeanReduce(reciprocalAvgs, counts) : wavg(reciprocalAvgs, counts).reciprocal()

// mapr 语句,声明 harmonicMean 的 map-reduce 实现
$ mapr harmonicMean(x){reciprocalAvg(x), count(x) -> harmonicMeanReduce}

将上述脚本追加到 init 指定的初始化脚本中。重启 server,然后在客户端运行以下脚本:

// 定义一个普通聚合函数,用于和 mapr 实现进行对比
$ defg harmonicMean_norm(x){
$     return x.reciprocal().avg().reciprocal()
$ }

// 创建分区内存表
$ n = 100000
$ t = table((2020.09.01 + rand(9, n)).sort!() as date, take(`IBM`MSFT, n) as sym, rand(10.0, n) as value)
$ db = database("", RANGE, [2020.09.01, 2020.09.03, 2020.09.10])
$ stock = db.createPartitionedTable(t, "stock", "date").append!(t)

// 对比查询分区内存表的耗时
$ timer(1000) select harmonicMean_norm(value) as gval from stock group by sym // Time elapsed: 4932.661 ms
$ timer(1000) select harmonicMean(value) as gval from stock group by sym // Time elapsed: 2490.347 ms

可以看出 map-reduce 的并行计算提升了查询的性能。

在上例的基础上,对 mapr 语句进行改写,定义调和平均数的 cgroup by 实现。

// 定义一个累积聚合函数 harmonicMeanRunning
$ defg harmonicMeanRunning(reciprocalAvgs, counts) : cumwavg(reciprocalAvgs, counts).reciprocal()

// 在上述 mapr 语句的基础上,声明 harmonicMean 的 cgroup by 实现
$ mapr harmonicMean(x){reciprocalAvg(x), count(x) -> harmonicMeanReduce; copy, copy -> harmonicMeanRunning}

将上述脚本继续更新到 init 指定的初始化脚本中,重启 server,然后在客户端运行以下脚本(复用上例创建的分区内存表):

// 在 cgroup by 语句中调用自定义聚合函数 harmonicMean
$ select harmonicMean(value) as gval from stock cgroup by date order by date

更多例子和说明请参考教程:自定义聚合函数