查询数据

DolphinDB支持大多数标准SQL语法,我们可以使用SQL语句查询数据库中的数据。

$ db=database("dfs://rangedb")
$ pt=loadTable(db,`pt)
$ select count(x) from pt;

count_x

1000000

与传统的数据库不同,DolphinDB是集数据库、编程语言和分布式计算于一体的系统。数据库和表在DolphinDB中是一个普通变量,并不存在独立的保留空间。因此,每次访问数据库或表时都要使用database或loadTable函数将它们赋予到变量中。

分布式查询(即查询分布式数据表)和普通查询的语法并无差异。理解分布式查询的工作原理有助于写出高效的查询。系统首先根据where子句确定需要的分区,然后重写查询,并把新的查询发送到相关分区所在的位置,最后整合所有分区的结果。

分区剪枝

绝大多数分布式查询只涉及分布式表的部分分区。若where子句中某个过滤条件仅包含分布式表的原始分区字段、关系运算符(<, <=, =, ==, >, >=, in, between)和逻辑运算符(or, and),以及常量(包括常量与常量的运算),且非链式条件(例如100<x<200),且过滤逻辑可以缩窄相关分区范围,则系统只加载与查询相关的分区,以节省查询耗时。若where子句中的过滤条件无一满足此要求,或过滤逻辑无法缩窄分区范围,则会遍历所有分区进行查询。数据量较大时,过滤条件的不同写法会造成查询耗时的巨大差异。

下面的例子可以帮助理解DolphinDB如何缩窄相关分区范围。

$ n=10000000
$ id=take(1..1000, n).sort()
$ date=1989.12.31+take(1..365, n)
$ announcementDate = date+rand(5, n)
$ x=rand(1.0, n)
$ y=rand(10, n)
$ t=table(id, date, announcementDate, x, y)
$ db=database("dfs://rangedb1", RANGE, [1990.01.01, 1990.03.01, 1990.05.01, 1990.07.01, 1990.09.01, 1990.11.01, 1991.01.01])
$ pt = db.createPartitionedTable(t, `pt, `date)
$ pt.append!(t);

以下类型的查询可以在加载和处理数据前缩小分区范围:

$ x = select max(x) from pt where date>1990.12.01-10;

系统确定,只有一个分区与查询相关:[1990.11.01, 1991.01.01)。

$ select max(x) from pt where date between 1990.08.01:1990.12.01 group by date;

系统确定,只有三个分区与查询相关:[1990.07.01, 1990.09.01)、[1990.09.01, 1990.11.01)和[1990.11.01, 1991.01.01)。

$ select max(x) from pt where y<5 and date between 1990.08.01:1990.08.31;

系统确定,只有一个分区与查询相关:[1990.07.01, 1990.09.01)。注意,系统忽略了y<5的条件。加载了相关分区后,系统会根据y<5的条件进一步筛选数据。

以下查询不能确定相关分区。如果pt是数据量非常大的分区表,查询会耗费大量时间,因此应该尽量避免以下写法。

$ select max(x) from pt where date+30>2019.12.01;
//不可对分区字段进行运算

$ select max(x) from pt where 2019.12.01<date<2019.12.31;
//不可使用链式比较

$ select max(x) from pt where month(date)<=2019.12M;
//不可对分区字段使用函数

$ select max(x) from pt where y<5;
//至少有一个过滤条件需要使用分区字段

$ select max(x) from pt where date<announcementDate-3;
//与分区字段比较时仅可使用常量,不可使用其他列

$ select max(x) from pt where y<5 or date between 1990.08.01:1990.08.31;
//由于必须执行y<5,过滤逻辑无法缩窄相关分区范围

查询优化

DolphinDB在分布式表上执行SQL操作时,会在每个分区中并发执行SQL语句。在分区剪枝场景下,某些分区内的数据完全涵盖在过滤条件筛选的数据范围内;此时在该分区内执行这条过滤语句没有意义,且增加了分区扫描的开销。DolphinDB支持在分区的子查询上删除无意义的过滤条件,从而达到查询性能的优化。

$ select max(x) from pt where date between 1990.08.21:1990.12.25;

由于该数据库按照两个月为一个范围进行分区,因此只需要在1990.07.01-08.31分区和1990.11.01-12.31两个分区内执行该select语句中的between语句筛选符合条件的数据,1990.09.01-10.31这个分区的子查询内可以删除between的筛选操作。

需要使用MapReduce的分布式查询的实现

当发生下述两种情况时,系统将重写分布式查询:一种情况是使用 order by 子句时; 另一种情况是使用聚合函数时,分组列不是分区列。

当分区列是第一个分组列时,系统只需对每个相关分区执行查询,然后合并各个查询结果。

当分区列不是第一个分组列时,系统使用Map-Reduce方法来实现分布式查询。它首先在Map-Reduce定义中搜索聚合函数,然后根据聚合函数的mapr定义将原始查询重写到map查询,并将map查询发送到每个相关的分区执行,最后执行reduce查询以合并结果。

$ select avg(x) from t where id>200 and id<900 group by date;

// the partition column of table t is column id rather than column date

对于上面的示例,map查询将执行以下操作:

$ tempTable = select sum(x) as col1, count(x) as col2 from t where id>200 and id<900 group by date;

reduce查询将进行以下操作:

$ select wavg(col1, col2) as avg_x from tempTable group by date;

并不是所有的分布式查询都可以这样重写,比如分布式表的中值计算。详情请参考分布式计算。

系统会判断内置函数是否为聚合函数。当用户定义自己的聚合函数时,他们必须使用关键字defg来告诉系统它是一个聚合函数,而不是使用def。如果我们使用关键字def定义聚合函数,并将该函数应用于分布式查询,则可能会收到错误的结果或异常。

DolphinDB允许在SQL查询中使用用户定义的函数(UDF)或用户定义的聚合函数(UDAF)。用户可以简单地定义一个函数,然后在查询中即时使用它,而无需编译或部署。然而,分布式查询的实现与普通查询的实现略有不同。系统自动检查UDF或UDAF是否存在分布式查询中。如果系统检测到UDF或UDAF,则它将它们和相关UDF或UDAF序列化,与查询一起发送到远程站点。这种复杂的检查和序列化过程对用户是不可见的。与其他系统相比,这是DolphinDB提供的独特功能之一。

DolphinDB不支持在分布式查询的where子句中使用聚合函数,如sum或count。这是因为在使用Map-Reduce函数执行聚合函数之前,分布式查询要使用where子句来选择相关分区。如果聚合函数出现在where子句中,则分布式查询不能选择相关的分区。如果我们需要在分布式查询的where子句中使用聚合函数,我们可以编写新的分布式查询来计算这些聚合函数的值,将这些值分配给某些变量,并在原始分布式查询中引用这些变量。

DolphinDB的SQL语句支持使用上下文中的变量。使用分布式SQL语句时,系统会自动把本地节点上的变量值复制到需要的远程节点上。这也是DolphinDB相较于别的系统的一个优点。

更多SQL功能请参考 第 8 章:SQL 语句