createPartitionedTable

语法

createPartitionedTable(dbHandle, table, tableName, [partitionColumns], [compressMethods], [sortColumns], [keepDuplicates=ALL], [sortKeyMappingFunction])

参数

参数 sortColumns, keepDuplicatessortKeyMappingFunction 仅在 database 的 engine 参数指定为 TSDB 时才有效。

dbHandle database 函数返回的数据库句柄。它可以是本地磁盘数据库,也可以是分布式数据库。dbHandle 为空字符串或没有指定时,表示内存数据库的句柄。

table 一个表或包含多个表的元组。系统将会根据该表的结构创建新的分区表。

tableName 一个字符串,表示的分区表的名称。

partitionColumns 一个字符串或字符串向量,表示分区列。对于组合分区,partitionColumns 是一个字符串向量。对非顺序分区,此参数为必选参数。

compressMethods 一个字典,指定某些列使用 lz4 或者 delta 压缩算法存储。key 为字段名,value 为压缩算法(”lz4”或”delta”)。若未指定,默认采用 lz4 压缩算法。

New in version 2.00.9: 支持 DECIMAL 类型数据进行 delta 压缩

  • 对于SHORT, INT, LONG 与时间或日期类型数据,建议采用 delta 算法压缩。

  • 将字符串存储为 SYMBOL 类型数据,实现对字符串类型的压缩。

sortColumns 字符串标量或向量,用于指定表的排序列,写入的数据将按 sortColumns 进行排序。系统默认 sortColumns (指定多列时) 最后一列为时间列,其余列字段作为排序的索引列,称作 sort key。同一个 sort key 组合值(sort key entry)对应的数据将按时间列顺序连续存放在一起。查询时,若查询条件包含索引列,可以快速定位数据所在的数据块位置,提高查询性能。

  • sortColumns 只能是 integer, temporal, string, 或 symbol 类型。

    • sortColumns 指定为多列,则 sortColumns 的最后一列必须为时间列,其余列为索引列,且索引列不能为为 TIME, TIMESTAMP, NANOTIME, NANOTIMESTAMP 类型。

    • sortColumns 仅指定一列,则该列作为 sort key,其类型不能为TIME, TIMESTAMP, NANOTIME, NANOTIMESTAMP。若 sortColumns 指定为一列时间列 (非分区列),且同时指定了 sortKeyMappingFunction,则查询的过滤条件中 sortColumns 只能与相同时间类型的值进行比较。

  • 频繁查询的字段适合设置为 sortColumns,且建议优先把查询频率高的字段作为 sortColumns 中位置靠前的列。

  • 为保证性能最优,建议每个分区内索引列的组合数(sort key)不超过1000个。

  • sortColumns 是每个分区内部 level file 内数据的排序依据,与其是否为分区字段无关。

keepDuplicates 指定在每个分区内如何处理所有 sortColumns 之值皆相同的数据。提供以下选项:

New in version 2.00.1: 支持 keepDuplicates=FIRST

  • ALL: 保留所有数据,为默认值。

  • LAST:仅保留最新数据

  • FIRST:仅保留第一条数据

New in version 2.00.6: 参数 sortKeyMappingFunction

在 TSDB 引擎单个分区 sort key 组合数过多,但每个 sort key 组合值对应的记录数较少的场景下,建议配置 sortKeyMappingFunction 参数以对 sort key 组合数进行降维。降维后单个 TSDB levelFile 内的数据块可以存储更多的数据,查询时既减少了读数据块的次数(降低了 I/O 开销),又提升了数据的压缩率。

sortKeyMappingFunction 由一元函数对象组成的向量,其长度与索引列一致,即 sortColumns 的长度 - 1,若只指定一个映射函数 mapfunc,必须写为 sortKeyMappingFunction=[mapfunc]。用于指定应用在索引列中各列的映射函数,以减少 sort key 的组合数,该过程称为 sort key 降维。

索引列中的各列被对应的映射函数降维后,原本的多个 sort key 组合值会被重新映射到一个新的 sort key 组合值上。而每个新 sort key 组合值对应的数据仍将根据 sortColumns 的最后一列(时间列)进行排序。降维在写入磁盘时进行,因此指定该参数一定程度上将影响写入性能。

使用 sortKeyMappingFunction 时需要注意以下几点:

  • sortKeyMappingFunction 指定的函数对象与索引列中的各列一一对应,若其中某列无需降维,则函数对象置为空。

  • sortKeyMappingFunction 中的函数对象为 hashBucket,且需要对采用 Hash 分区的分区字段进行降维时,应确保 Hash 分区的数量和 hashBucket 中的 buckets 之间不存在整除关系,否则会导致同一分区内的所有 Hash 值得到的 key 都相同。

详情

根据 table 的结构创建一个空的分区表。

  • 对于分布式数据库和本地磁盘数据库,table 参数只能是一个表。

  • 对于内存数据库,table 参数可以是一个表或包含多个表的元组。如果 table 是一个元组,每个 table 表示一个分区。

如果参数 table 是一个表,则根据该表的结构创建一个分区表。通过 append!tableInsert 给新创建的分区表插入数据。它不能用于创建顺序分区的分区表。

如果参数 table 是一系列表,则创建一个分区的内存表。参数 table 中表的数量与数据库中分区的数量相同。

注意:

  • 创建分区表时只会使用参数 table 的结构,并不会把 table 中的数据插入到新的分区表中。

  • OLAP 引擎允许集群每个节点创建的不同的分布式分区表句柄(包含临时句柄)上限为 8192;TSDB 引擎没有上限。临时句柄的说明请参考注释。

注释:调用函数 createPartitionedTable 创建分布式分区表时,若用户没有创建一个句柄变量来接收函数的返回值,则每个数据库会创建一个临时句柄。若在同一数据库下多次创表,则该数据库的临时句柄会被覆盖。

例子

例 1. 在分布式数据库中创建一个分区表

例 1.1 创建一张 OLAP 引擎下的分区表。

$ n=1000000;
$ t=table(2020.01.01T00:00:00 + 0..(n-1) as timestamp, rand(`IBM`MS`APPL`AMZN,n) as symbol, rand(10.0, n) as value)
$ db = database("dfs://rangedb_tradedata", RANGE, `A`F`M`S`ZZZZ)
$ Trades = db.createPartitionedTable(table=t, tableName="Trades", partitionColumns="symbol", compressMethods={timestamp:"delta"});

createPartitionedTable 只是建立一张空的表格 Trades,该表复制了表 t 的字段。接着用 append! 函数将数据追加到 Trades 表里。

$ Trades.append!(t);

查询分区表:

$ Trades=loadTable(db,`Trades);
$ select min(value) from Trades;
0

在分布式数据库中,初次创建表后,可以跳过 loadTable 把表载入内存的步骤,因为分布式文件系统会动态刷新表的内容。系统重启后,需要再次执行 loadTable 函数加载表。

例 1.2 创建一张 TSDB 引擎下的分区表。

$ n = 10000
$ SecurityID = rand(`st0001`st0002`st0003`st0004`st0005, n)
$ sym = rand(`A`B, n)
$ TradeDate = 2022.01.01 + rand(100,n)
$ TotalVolumeTrade = rand(1000..3000, n)
$ TotalValueTrade = rand(100.0, n)
$ schemaTable_snap = table(SecurityID, TradeDate, TotalVolumeTrade, TotalValueTrade).sortBy!(`SecurityID`TradeDate)

$ dbPath = "dfs://TSDB_STOCK"
$ if(existsDatabase(dbPath)){dropDatabase(dbPath)}
$ db_snap = database(dbPath, VALUE, 2022.01.01..2022.01.05, engine='TSDB')
$ snap=createPartitionedTable(dbHandle=db_snap, table=schemaTable_snap, tableName="snap", partitionColumns=`TradeDate, sortColumns=`SecurityID`TradeDate, keepDuplicates=ALL, sortKeyMappingFunction=[hashBucket{,5}])
$ snap.append!(schemaTable_snap)
$ flushTSDBCache()
$ snap = loadTable(dbPath, `snap)
$ select * from snap

例 2. 在内存数据库中创建一个分区表

例 2.1 创建分区常规内存表

$ n = 200000
$ colNames = `time`sym`qty`price
$ colTypes = [TIME,SYMBOL,INT,DOUBLE]
$ t = table(n:0, colNames, colTypes)
$ db = database(, RANGE, `A`D`F)
$ pt = db.createPartitionedTable(table=t, tableName=`pt, partitionColumns=`sym)

$ insert into pt values(09:30:00.001,`AAPL,100,56.5)
$ insert into pt values(09:30:01.001,`DELL,100,15.5)

例 2.2 创建分区键值内存表

$ n = 200000
$ colNames = `time`sym`qty`price
$ colTypes = [TIME,SYMBOL,INT,DOUBLE]
$ t = keyedTable(`time`sym, n:0, colNames, colTypes)
$ db = database(, RANGE, `A`D`F)
$ pt = db.createPartitionedTable(table=t, tableName=`pt, partitionColumns=`sym)

$ insert into pt values(09:30:00.001,`AAPL,100,56.5)
$ insert into pt values(09:30:01.001,`DELL,100,15.5)

例 2.3 创建分区流数据表

注意,创建分区流数据表时 createPartitionedTable 的第二个参数必须是元组,并且其长度必须与分区数量相等,每个表对应一个分区。下例中,trades_stream1 和 trades_stream2 组成一个分区流数据表 trades。写入数据时,只能分别往 trades_stream1 和 trades_stream2 写入,不能直接写入到 trades。查询 trades 可以获取到两个表的数据。

$ n=200000
$ colNames = `time`sym`qty`price
$ colTypes = [TIME,SYMBOL,INT,DOUBLE]
$ trades_stream1 = streamTable(n:0, colNames, colTypes)
$ trades_stream2 = streamTable(n:0, colNames, colTypes)
$ db=database(, RANGE, `A`D`F)
$ trades = createPartitionedTable(db,table=[trades_stream1, trades_stream2], tableName="", partitionColumns=`sym)

$ insert into trades_stream1 values(09:30:00.001,`AAPL,100,56.5)
$ insert into trades_stream2 values(09:30:01.001,`DELL,100,15.5)

$ select * from trades;

time

sym

qty

price

09:30:00.001

AAPL

100

56.5

09:30:01.001

DELL

100

15.5

例 2.4 创建分区 MVCC 内存表

创建分区 MVCC 内存表的方式与创建分区流数据表的方式相同。

$ n=200000
$ colNames = `time`sym`qty`price
$ colTypes = [TIME,SYMBOL,INT,DOUBLE]
$ trades_mvcc1 = mvccTable(n:0, colNames, colTypes)
$ trades_mvcc2 = mvccTable(n:0, colNames, colTypes)
$ db=database(, RANGE, `A`D`F)
$ trades = createPartitionedTable(db,table=[trades_mvcc1, trades_mvcc2], tableName="", partitionColumns=`sym)

$ insert into trades_mvcc1 values(09:30:00.001,`AAPL,100,56.5)
$ insert into trades_mvcc2 values(09:30:01.001,`DELL,100,15.5)

$ select * from trades;

time

sym

qty

price

09:30:00.001

AAPL

100

56.5

09:30:01.001

DELL

100

15.5