createPartitionedTable
语法
createPartitionedTable(dbHandle, table, tableName, [partitionColumns], [compressMethods], [sortColumns], [keepDuplicates=ALL], [sortKeyMappingFunction])
参数
参数 sortColumns, keepDuplicates 及 sortKeyMappingFunction 仅在 database 的 engine 参数指定为 TSDB 时才有效。
dbHandle database 函数返回的数据库句柄。它可以是本地磁盘数据库,也可以是分布式数据库。dbHandle 为空字符串或没有指定时,表示内存数据库的句柄。
table 一个表或包含多个表的元组。系统将会根据该表的结构创建新的分区表。
tableName 一个字符串,表示的分区表的名称。
partitionColumns 一个字符串或字符串向量,表示分区列。对于组合分区,partitionColumns 是一个字符串向量。对非顺序分区,此参数为必选参数。
compressMethods 一个字典,指定某些列使用 lz4 或者 delta 压缩算法存储。key 为字段名,value 为压缩算法(”lz4”或”delta”)。若未指定,默认采用 lz4 压缩算法。
New in version 2.00.9: 支持 DECIMAL 类型数据进行 delta 压缩
对于SHORT, INT, LONG 与时间或日期类型数据,建议采用 delta 算法压缩。
将字符串存储为 SYMBOL 类型数据,实现对字符串类型的压缩。
sortColumns 字符串标量或向量,用于指定表的排序列,写入的数据将按 sortColumns 进行排序。系统默认 sortColumns (指定多列时) 最后一列为时间列,其余列字段作为排序的索引列,称作 sort key。同一个 sort key 组合值(sort key entry)对应的数据将按时间列顺序连续存放在一起。查询时,若查询条件包含索引列,可以快速定位数据所在的数据块位置,提高查询性能。
sortColumns 只能是 integer, temporal, string, 或 symbol 类型。
若 sortColumns 指定为多列,则 sortColumns 的最后一列必须为时间列,其余列为索引列,且索引列不能为为 TIME, TIMESTAMP, NANOTIME, NANOTIMESTAMP 类型。
若 sortColumns 仅指定一列,则该列作为 sort key,其类型不能为TIME, TIMESTAMP, NANOTIME, NANOTIMESTAMP。若 sortColumns 指定为一列时间列 (非分区列),且同时指定了 sortKeyMappingFunction,则查询的过滤条件中 sortColumns 只能与相同时间类型的值进行比较。
频繁查询的字段适合设置为 sortColumns,且建议优先把查询频率高的字段作为 sortColumns 中位置靠前的列。
为保证性能最优,建议每个分区内索引列的组合数(sort key)不超过1000个。
sortColumns 是每个分区内部 level file 内数据的排序依据,与其是否为分区字段无关。
keepDuplicates 指定在每个分区内如何处理所有 sortColumns 之值皆相同的数据。提供以下选项:
New in version 2.00.1: 支持 keepDuplicates=FIRST
ALL: 保留所有数据,为默认值。
LAST:仅保留最新数据
FIRST:仅保留第一条数据
New in version 2.00.6: 参数 sortKeyMappingFunction
在 TSDB 引擎单个分区 sort key 组合数过多,但每个 sort key 组合值对应的记录数较少的场景下,建议配置 sortKeyMappingFunction 参数以对 sort key 组合数进行降维。降维后单个 TSDB levelFile 内的数据块可以存储更多的数据,查询时既减少了读数据块的次数(降低了 I/O 开销),又提升了数据的压缩率。
sortKeyMappingFunction 由一元函数对象组成的向量,其长度与索引列一致,即 sortColumns 的长度 - 1,若只指定一个映射函数 mapfunc,必须写为 sortKeyMappingFunction=[mapfunc]。用于指定应用在索引列中各列的映射函数,以减少 sort key 的组合数,该过程称为 sort key 降维。
索引列中的各列被对应的映射函数降维后,原本的多个 sort key 组合值会被重新映射到一个新的 sort key 组合值上。而每个新 sort key 组合值对应的数据仍将根据 sortColumns 的最后一列(时间列)进行排序。降维在写入磁盘时进行,因此指定该参数一定程度上将影响写入性能。
使用 sortKeyMappingFunction 时需要注意以下几点:
sortKeyMappingFunction 指定的函数对象与索引列中的各列一一对应,若其中某列无需降维,则函数对象置为空。
当 sortKeyMappingFunction 中的函数对象为 hashBucket,且需要对采用 Hash 分区的分区字段进行降维时,应确保 Hash 分区的数量和 hashBucket 中的 buckets 之间不存在整除关系,否则会导致同一分区内的所有 Hash 值得到的 key 都相同。
详情
根据 table 的结构创建一个空的分区表。
对于分布式数据库和本地磁盘数据库,table 参数只能是一个表。
对于内存数据库,table 参数可以是一个表或包含多个表的元组。如果 table 是一个元组,每个 table 表示一个分区。
如果参数 table 是一个表,则根据该表的结构创建一个分区表。通过 append! 或 tableInsert 给新创建的分区表插入数据。它不能用于创建顺序分区的分区表。
如果参数 table 是一系列表,则创建一个分区的内存表。参数 table 中表的数量与数据库中分区的数量相同。
注意:
创建分区表时只会使用参数 table 的结构,并不会把 table 中的数据插入到新的分区表中。
OLAP 引擎允许集群每个节点创建的不同的分布式分区表句柄(包含临时句柄)上限为 8192;TSDB 引擎没有上限。临时句柄的说明请参考注释。
注释:调用函数 createPartitionedTable 创建分布式分区表时,若用户没有创建一个句柄变量来接收函数的返回值,则每个数据库会创建一个临时句柄。若在同一数据库下多次创表,则该数据库的临时句柄会被覆盖。
例子
例 1. 在分布式数据库中创建一个分区表
例 1.1 创建一张 OLAP 引擎下的分区表。
$ n=1000000;
$ t=table(2020.01.01T00:00:00 + 0..(n-1) as timestamp, rand(`IBM`MS`APPL`AMZN,n) as symbol, rand(10.0, n) as value)
$ db = database("dfs://rangedb_tradedata", RANGE, `A`F`M`S`ZZZZ)
$ Trades = db.createPartitionedTable(table=t, tableName="Trades", partitionColumns="symbol", compressMethods={timestamp:"delta"});
createPartitionedTable 只是建立一张空的表格 Trades,该表复制了表 t 的字段。接着用 append! 函数将数据追加到 Trades 表里。
$ Trades.append!(t);
查询分区表:
$ Trades=loadTable(db,`Trades);
$ select min(value) from Trades;
0
在分布式数据库中,初次创建表后,可以跳过 loadTable 把表载入内存的步骤,因为分布式文件系统会动态刷新表的内容。系统重启后,需要再次执行 loadTable
函数加载表。
例 1.2 创建一张 TSDB 引擎下的分区表。
$ n = 10000
$ SecurityID = rand(`st0001`st0002`st0003`st0004`st0005, n)
$ sym = rand(`A`B, n)
$ TradeDate = 2022.01.01 + rand(100,n)
$ TotalVolumeTrade = rand(1000..3000, n)
$ TotalValueTrade = rand(100.0, n)
$ schemaTable_snap = table(SecurityID, TradeDate, TotalVolumeTrade, TotalValueTrade).sortBy!(`SecurityID`TradeDate)
$ dbPath = "dfs://TSDB_STOCK"
$ if(existsDatabase(dbPath)){dropDatabase(dbPath)}
$ db_snap = database(dbPath, VALUE, 2022.01.01..2022.01.05, engine='TSDB')
$ snap=createPartitionedTable(dbHandle=db_snap, table=schemaTable_snap, tableName="snap", partitionColumns=`TradeDate, sortColumns=`SecurityID`TradeDate, keepDuplicates=ALL, sortKeyMappingFunction=[hashBucket{,5}])
$ snap.append!(schemaTable_snap)
$ flushTSDBCache()
$ snap = loadTable(dbPath, `snap)
$ select * from snap
例 2. 在内存数据库中创建一个分区表
例 2.1 创建分区常规内存表
$ n = 200000
$ colNames = `time`sym`qty`price
$ colTypes = [TIME,SYMBOL,INT,DOUBLE]
$ t = table(n:0, colNames, colTypes)
$ db = database(, RANGE, `A`D`F)
$ pt = db.createPartitionedTable(table=t, tableName=`pt, partitionColumns=`sym)
$ insert into pt values(09:30:00.001,`AAPL,100,56.5)
$ insert into pt values(09:30:01.001,`DELL,100,15.5)
例 2.2 创建分区键值内存表
$ n = 200000
$ colNames = `time`sym`qty`price
$ colTypes = [TIME,SYMBOL,INT,DOUBLE]
$ t = keyedTable(`time`sym, n:0, colNames, colTypes)
$ db = database(, RANGE, `A`D`F)
$ pt = db.createPartitionedTable(table=t, tableName=`pt, partitionColumns=`sym)
$ insert into pt values(09:30:00.001,`AAPL,100,56.5)
$ insert into pt values(09:30:01.001,`DELL,100,15.5)
例 2.3 创建分区流数据表
注意,创建分区流数据表时 createPartitionedTable
的第二个参数必须是元组,并且其长度必须与分区数量相等,每个表对应一个分区。下例中,trades_stream1 和 trades_stream2 组成一个分区流数据表 trades。写入数据时,只能分别往 trades_stream1 和 trades_stream2 写入,不能直接写入到 trades。查询 trades 可以获取到两个表的数据。
$ n=200000
$ colNames = `time`sym`qty`price
$ colTypes = [TIME,SYMBOL,INT,DOUBLE]
$ trades_stream1 = streamTable(n:0, colNames, colTypes)
$ trades_stream2 = streamTable(n:0, colNames, colTypes)
$ db=database(, RANGE, `A`D`F)
$ trades = createPartitionedTable(db,table=[trades_stream1, trades_stream2], tableName="", partitionColumns=`sym)
$ insert into trades_stream1 values(09:30:00.001,`AAPL,100,56.5)
$ insert into trades_stream2 values(09:30:01.001,`DELL,100,15.5)
$ select * from trades;
time |
sym |
qty |
price |
---|---|---|---|
09:30:00.001 |
AAPL |
100 |
56.5 |
09:30:01.001 |
DELL |
100 |
15.5 |
例 2.4 创建分区 MVCC 内存表
创建分区 MVCC 内存表的方式与创建分区流数据表的方式相同。
$ n=200000
$ colNames = `time`sym`qty`price
$ colTypes = [TIME,SYMBOL,INT,DOUBLE]
$ trades_mvcc1 = mvccTable(n:0, colNames, colTypes)
$ trades_mvcc2 = mvccTable(n:0, colNames, colTypes)
$ db=database(, RANGE, `A`D`F)
$ trades = createPartitionedTable(db,table=[trades_mvcc1, trades_mvcc2], tableName="", partitionColumns=`sym)
$ insert into trades_mvcc1 values(09:30:00.001,`AAPL,100,56.5)
$ insert into trades_mvcc2 values(09:30:01.001,`DELL,100,15.5)
$ select * from trades;
time |
sym |
qty |
price |
---|---|---|---|
09:30:00.001 |
AAPL |
100 |
56.5 |
09:30:01.001 |
DELL |
100 |
15.5 |