JasonT

JasonT 查看完整档案

嘉兴编辑南华大学  |  核技术 编辑  |  填写所在公司/组织填写个人主网站
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 个人简介什么都没有

个人动态

JasonT 提出了问题 · 4月6日

请问DolphinDB中如何把time类型(09:34:07.001)显示成second类型(09:34:07)?

我的建表语句如下:

securityID = `0001`0002`0002`0002`0003`0003`0001`0001`0001$SYMBOL
price = 49.6 29.46 29.52 30.02 174.97 175.23 50.76 50.32 51.29
qty = 200 100 120 200 680 740 230 250 380
timestamp = [09:34:07.001, 09:34:07.020, 09:34:07.751, 09:34:09.000, 09:34:10.000, 09:34:11.001, 09:34:12.001, 09:34:13.001, 09:34:14.001]
t = table(timestamp, securityID, price, qty);

image

现在timestamp列的数据是time类型,我想在SQL查询的时候显示成second类型,请问可以直接转换吗?

关注 1 回答 0

JasonT 赞了文章 · 4月1日

DolphinDB定时作业教程

DolphinDB提供的定时作业(scheduled job)功能,可以让系统在指定的时间以指定的频率自动执行作业。当我们需要数据库定时自动执行一些脚本进行计算分析(譬如每日休市后分钟级的K线计算、每月统计报表生成)、数据库管理(譬如数据库备份、数据同步)、操作系统管理(譬如过期的日志文件删除)等工作时,可以用这个功能来实现。

定时作业用一个函数来表示,这给了作业定义极大的灵活性。凡是能用函数来表示的工作,都可以作为定时任务来运行。定时作业通过scheduleJob函数提交,并按设定时间在后台运行。作业创建后,作业相关定义信息序列化保存到数据节点的磁盘文件。节点重启后,系统会反序列化并加载定时作业。定时作业每次运行的结果也会保存到节点磁盘上,我们可以使用getJobMessagegetJobReturn来查看每个作业的运行日志和返回值。

1.功能介绍

1.1 创建定时作业

创建定时作业使用函数scheduleJob。作业创建后,系统会序列化作业定义信息并保存到文件<homeDir>/sysmgmt/jobEditlog.meta。函数语法如下:

scheduleJob(jobId, jobDesc, jobFunc, scheduledTime, startDate, endDate, frequency, [days])

其中要注意的是:

  • 参数jobFunc(作业函数)是一个不带参数的函数。
  • 参数scheduledTime(预定时间)可以是minute类型的标量或向量。当它为向量时,注意相邻2个时间点的间隔不能小于30分钟。
  • 函数返回值是定时作业的作业ID。如果输入的jobId与已有定时作业的作业ID不重复,系统返回输入的jobId。否则在jobId后面添加当前日期,"000",“001”等作为后缀,直到产生唯一的作业ID。

众所周知,执行一个函数必须提供函数需要的所有参数。在函数化编程中,一个提供了所有参数的函数,实际上就是原函数的一个特殊的部分应用(Partial Application),也即一个不带参数的函数。在DolphinDB中,我们用花括号{}来表示部分应用。

自定义函数、内置函数、插件函数、函数视图(Function View)和模块中的函数等各类函数都可以作为作业函数。因此,定时作业几乎能做任何事情。比如用自定义函数、插件函数等做计算分析,用内置函数run运行一个脚本文件,用shell函数执行操作系统管理等等。下面例子中的作业调用了一个自定义函数getMaxTemperature,用于计算前一天某个设备温度指标的最大值,参数是设备编号,创建作业时,用getMaxTemperature{1}给设备编号赋值1,定时作业在每天0点执行。

def getMaxTemperature(deviceID){
    maxTemp=exec max(temperature) from loadTable("dfs://dolphindb","sensor")
            where ID=deviceID ,ts between (today()-1).datetime():(today().datetime()-1)
    return  maxTemp
}
scheduleJob(`testJob, "getMaxTemperature", getMaxTemperature{1}, 00:00m, today(), today()+30, 'D');

下面的例子执行了一个脚本文件。作业函数用了run函数,并指定脚本文件monthlyJob.dos的完整路径作为参数,作业在2020年的每月1号0点执行。

scheduleJob(`monthlyJob, "Monthly Job 1", run{"/home/DolphinDB/script/monthlyJob.dos"}, 00:00m, 2020.01.01, 2020.12.31, 'M', 1);

下面的例子执行了一个删除日志文件的操作系统命令。作业函数用了shell函数,并指定具体的命令“rm /home/DolphinDB/server/dolphindb.log”作为参数。作业在每周的周日1点执行。

scheduleJob(`weeklyjob, "rm log", shell{"rm /home/DolphinDB/server/dolphindb.log"}, 1:00m, 2020.01.01, 2021.12.31, 'W', 6);

在实际应用中,用函数参数、函数返回值进行输入输出有点不太方便,我们更常用的做法是从数据库中取出数据,计算后把结果再存到数据库中。下面的例子是在每日休市后,计算分钟级的K线。自定义函数computeK中,行情数据从分布式数据库表trades中取出,计算后存入分布式数据库表OHLC中。作业的frequency为"W"、days为[1,2,3,4,5],scheduledTime为15:00m,表示作业在每周一到周五的15点执行。

def computeK(){
    barMinutes = 7
    sessionsStart=09:30:00.000 13:00:00.000
    OHLC =  select first(price) as open, max(price) as high, min(price) as low,last(price) as close, sum(volume) as volume 
        from loadTable("dfs://stock","trades")
        where time >today() and time < now()
        group by symbol, dailyAlignedBar(timestamp, sessionsStart, barMinutes*60*1000) as barStart
    append!(loadTable("dfs://stock","OHLC"),OHLC)
}
scheduleJob(`kJob, "7 Minutes", computeK, 15:00m, 2020.01.01, 2021.12.31, 'W', [1,2,3,4,5]);

1.2 查询定时作业

查询节点中的定时作业定义信息可以用getScheduledJobs。函数语法如下:

getScheduledJobs([jobIdPattern])

其中参数jobIdPattern是表示作业ID或作业ID模式的字符串。它支持通配符“%”和“?”。函数的返回值是表格形式的定时作业信息。若jobId没有指定,则返回所有作业。

系统会对每次作业的执行情况进行保存,包括定时作业的运行日志和返回值。运行日志保存在jodId.msg 文件中,定时作业的返回值保存在jobId.object文件中。这些文件都保存在目录<homeDir>/batchJobs下。我们可以分别使用getJobMessagegetJobReturn来查看每个作业的运行日志和返回值。但要注意jobID的取值,一是创建作业时,如前所述,若jobId与已有定时作业的作业ID重复,系统返回的不是输入的jobId;二是对会多次执行的作业,每次执行定时作业时,作业ID是不一样的。因此我们需要用getRecentJobs来查看已完成的定时作业。比如我们定义如下定时作业:

def foo(){
    print "test scheduled job at"+ now()
    return now()
}
scheduleJob(`testJob, "foo", foo, 17:00m+0..2*30, today(), today(), 'D');

运行getRecentJobs()后得到如下信息:

jobId                jobDesc    startTime                endTime
------              ------- ----------------------- ----------------------
testJob                foo1    2020.02.14T17:00:23.636    2020.02.14T17:00:23.639
testJob20200214        foo1    2020.02.14T17:30:23.908    2020.02.14T17:30:23.910
testJob20200214000  foo1    2020.02.14T18:00:23.148    2020.02.14T18:00:26.749

从中我们看到,第一次执行的作业ID是“testJob”,第二次是“testJob20200214”...每次都有变化。如下所示,我们可用getJobMessagegetJobReturn查看了第3次的执行情况:

>getJobMessage(`testJob20200214000);
2020-02-14 18:00:23.148629 Start the job [testJob20200214000]: foo
2020-02-14 18:00:23.148721 test the scheduled job at 2020.02.14T18:00:23.148
2020-02-14 18:00:26.749111 The job is done.

>getJobReturn(`testJob20200214000);
2020.02.14T18:00:23.148

1.3 删除定时作业

删除定时作业用函数deleteScheduledJob。语法如下:

deleteScheduledJob(jobId)

参数jobId是作业ID。删除前可用getScheduledJobs得到想要删除作业的作业ID。

2.定时作业运行时的权限

用户创建定时作业时以什么身份登录,执行定时作业时就以这个身份运行。因此用户创建定时作业时,需要确保用户有权限访问用到的资源。比如登录用户不是授权用户,就不能访问集群的分布式功能,若用到了集群的分布式功能,执行时就会出错。以下例子中用户guestUser1没有访问DFS权限:

def foo1(){
    print "Test scheduled job "+ now()
    cnt=exec count(*) from loadTable("dfs://FuturesContract","tb")
    print "The count of table is "+cnt
    return cnt
}
login("guestUser1","123456")
scheduleJob(`guestGetDfsjob, "dfs read", foo1, [12:00m, 21:03m, 21:45m], 2020.01.01, 2021.12.31, "D");

作业执行后,用getJobMessage(`guestGetDfsjob)查询,如下所示,定时作业没有权限去读取分布式数据库:

2020-02-14 21:03:23.193039 Start the job [guestGetDfsjob]: dfs read
2020-02-14 21:03:23.193092 Test the scheduled job at 2020.02.14T21:03:23.193
2020-02-14 21:03:23.194914 Not granted to read table dfs://FuturesContract/tb

因此,若要远程执行控制节点的某些功能,访问集群中的某个分布式表,需要先以管理员(admin)或其他授权用户身份登录。具体可以通过login函数来完成。

从所示日志中也可以发现,访问分布式表后的语句没有执行,也就是说作业执行过程中若遇到错误,执行会中断。为了防止出现异常而停止执行后续的脚本,可使用try-catch语句俘获异常。运行过程中需要输出运行信息,可以用print打印,输出都会记录在jodId.msg日志文件中。

3.定时作业的序列化

定时作业在创建后,系统会把创建用户(userID)、作业的ID、描述信息、起始时间、作业频率、作业的定义等持久化保存。存储路径为<homeDir>/sysmgmt/jobEditlog.meta。作业用一个DolphinDB的函数来表示。函数的定义包括了一系列语句,这些语句又会调用其他函数和一些全局类对象,譬如共享变量(shared variable)。共享变量序列化时用名称来表示。反序列化时,共享变量必须存在,否则会失败。作业函数或其依赖的函数根据是否经过编译可以分两类:经过编译的函数包括内置函数和插件函数和脚本函数包括自定义函数、函数视图和模块中的函数等。这两类函数的序列化方法有所不同,下面分别进行说明。

3.1 经过编译的函数的序列化

对经过编译的函数的序列化,只序列化函数名称和模块名称。反序列化的时候,会在系统中搜索这些模块及函数,若搜索不到,就会失败。所以定时作业中若用到了插件函数,就需要在反序列化之前预先加载。系统与定时作业相关组件资源的初始化顺序依次是:系统级初始化脚本(dolphindb.dos),函数视图(function view)、用户级启动脚本(startup.dos)和定时作业。定时作业在启动脚本执行后加载。如下例所示,在作业函数jobDemo中用到了odbc插件:

use odbc
def jobDemo(){
    conn = odbc::connect("dsn=mysql_factorDBURL");
}
scheduleJob("job demo","example of init",jobDemo,15:48m, 2019.01.01, 2020.12.31, 'D')

但odbc插件在系统启动时没有加载,所以读取定时作业的时候,因无法识别这个函数,输出下列日志后退出系统。

<ERROR>:Failed to unmarshall the job [job demo]. Failed to deserialize assign statement.. Invalid message format

在启动脚本中加入下列代码加载odbc插件后,系统即启动成功。

loadPlugin("plugins/odbc/odbc.cfg")

3.2 脚本函数的序列化

脚本函数会序列化函数参数以及函数定义的每一个语句。语句中若又包含了依赖的脚本函数,也会序列化这些依赖函数的定义。

创建定时作业后,若这些脚本函数被删除或被修改了,或它依赖的脚本函数被修改,不影响定时作业运行。若希望定时作业按新的函数执行,就需要先删除定时作业、然后重新创建定时作业,否则会运行旧的序列化的函数。其中要注意关联的函数也需要重新定义。下面举例说明:

  • 例子1,作业函数在创建定时作业后被修改,如下所示,作业函数f在创建scheduleJob后被重新定义:
def f(){
    print "The old function is called " 
}
scheduleJob(`test, "f", f, 11:05m, today(), today(), 'D');
go
def f(){
    print "The new function is called " 
}

定时作业执行后,用getJobMessage(`test)得到如下信息,从中看到定时作业执行的还是旧的自定义函数。

2020-02-14 11:05:53.382225 Start the job [test]: f
2020-02-14 11:05:53.382267 The old function is called 
2020-02-14 11:05:53.382277 The job is done.
  • 例子2,作业函数在创建定时作业后依赖的函数被修改,如下所示,作业函数是函数视图fv,fv调用了函数foo,在scheduleJob后,函数foo重新被定义,函数视图也重新生成:
def foo(){
    print "The old function is called " 
}
def fv(){
    foo()
}
addFunctionView(fv)  

scheduleJob(`testFvJob, "fv", fv, 11:36m, today(), today(), 'D');
go
def foo(){
    print "The new function is called " 
}
dropFunctionView(`fv)
addFunctionView(fv) 

定时作业执行后,然后getJobMessage(`testFvJob)得到如下信息,从中看到定时作业执行的还是旧的函数。

2020-02-14 11:36:23.069892 Start the job [testFvJob]: fv
2020-02-14 11:36:23.069939 The old function is called 
2020-02-14 11:36:23.069951 The job is done.

用模块函数也是如此。我们创建一个模块printLog.dos,其内容如下:

module printLog
def printLogs(logText){
    writeLog(string(now()) + " : " + logText)
    print "The old function is called"
}

然后创建一个定时作业调用这个printLog::printLogs函数:

use printLog
def f5(){
    printLogs("test my log")
}
scheduleJob(`testModule, "f5", f5, 13:32m, today(), today(), 'D');

在运行定时作业之前修改模块如下:

module printLog
def printLogs(logText){
    writeLog(string(now()) + " : " + logText)
    print "The new function is called"
}

定时作业执行后,然后getJobMessage(`testModule)得到如下信息,从中看到定时作业执行的还是旧的函数。

2020-02-14 13:32:22.870855 Start the job [testModule]: f5
2020-02-14 13:32:22.871097 The old function is called
2020-02-14 13:32:22.871106 The job is done.

4.定时运行脚本文件

在创建定时作业时,若作业函数是run一个脚本文件,因为序列化时只保存了文件名,没有保存文件内容,所以需要把依赖的自定义函数都放到脚本文件中,否则会因为找不到自定义的函数而执行失败。比如创建一个脚本文件testjob.dos,文件内容如下:

foo()

然后在DolphinDB GUI中执行下列脚本:

def foo(){
    print ("Hello world!")
}
run "/home/xjqian/testjob.dos"

结果显示能正常执行:

2020.02.14 13:47:00.992: executing code (line 104-108)...
Hello world!

再创建定时作业run这个脚本文件,代码如下所示:

scheduleJob(`dailyfoofile1, "Daily Job 1", run {"/home/xjqian/testjob.dos"}, 16:14m, 2020.01.01, 2020.12.31, 'D');

但运行这个作业时却发生了如下异常:

Exception was raised when running the script [/home/xjqian/testjob.dos]:Syntax Error: [line #3] Cannot recognize the token foo

这是foo函数定义和定时作业执行不在同一个会话(session)中,作业执行时找不到函数定义的缘故。把foo()的定义放到脚本文件中,修改testjob.dos文件内容如下:

def foo(){
    print ("Hello world!")
}
foo()

再重新创建定时作业运行这个脚本文件,就能顺利完成。

5.小结和展望

常见故障及排除

  • 作业函数引用了共享变量,但是作业加载前没有定义该共享变量。一般建议在用户的启动脚本中定义该共享变量。
  • 作业函数引用了插件中的函数,但是作业加载前没有加载该插件。一般建议在用户的启动脚本中定义加载该插件。
  • 定时运行一个脚本文件,找不到依赖的函数。脚本文件必须包含依赖的自定义函数。
  • 创建定时作业的用户没有访问分布式数据库表的权限。授权该用户访问相应数据库的权限。
  • 在启动脚本中使用函数scheduleJobgetScheduledJobsdeleteScheduledJob时抛出异常。节点启动时,定时作业的初始化在启动脚本之后,因此不能在启动脚本中使用跟定时作业相关的功能

在某些罕见的情况下,可能出现在系统重启时,发生定时作业加载失败,甚至导致系统无法启动的情况。尤其是版本升级的时候,内置函数、插件函数等函数接口可能会有变化从而导致作业无法加载,或者出现一些兼容性bug导致系统重启失败。因此,我们开发时需要保留定义定时作业的脚本。若因定时任务导致系统无法启动,可以先删除定时作业的序列化文件<homeDir>/sysmgmt/jobEditlog.meta,在系统重启后再重新创建这些定时作业。

后续功能开发

  • 增加浏览作业函数以及依赖的函数的定义的功能。
  • 定义和实现定时作业之间的依赖关系。
查看原文

赞 2 收藏 0 评论 0

JasonT 赞了问题 · 4月1日

dolphindb分区不一致的问题

如何去理解chunkNode,
当其版本与master上版本不一致应该如何处理呢

关注 1 回答 0

JasonT 赞了文章 · 4月1日

时序数据库DolphinDB文本数据加载教程

DolphinDB提供以下4个函数,将文本数据导入内存或数据库:

loadText: 将文本文件导入为内存表。

ploadText: 将文本文件并行导入为分区内存表。与loadText函数相比,速度更快。

loadTextEx: 将文本文件导入数据库中,包括分布式数据库,本地磁盘数据库或内存数据库。

textChunkDS:将文本文件划分为多个小数据源,再通过mr函数进行灵活的数据处理。

DolphinDB的文本数据导入不仅灵活,功能丰富,而且速度非常快。DolphinDB与Clickhouse, MemSQL, Druid, Pandas等业界流行的系统相比,单线程导入的速度更快,最多可达一个数量级的优势;多线程并行导入的情况下,速度优势更加明显。

本教程介绍文本数据导入时的常见问题,相应的解决方案以及注意事项。

  1. 自动识别数据格式

大多数其它系统中,导入文本数据时,需要由用户指定数据的格式。为了方便用户,DolphinDB在导入数据时,能够自动识别数据格式。

自动识别数据格式包括两部分:字段名称识别和数据类型识别。如果文件的第一行没有任何一列以数字开头,那么系统认为第一行是文件头,包含了字段名称。DolphinDB会抽取少量部分数据作为样本,并自动推断各列的数据类型。因为是基于部分数据,某些列的数据类型的识别可能有误。但是对于大多数文本文件,无须手动指定各列的字段名称和数据类型,就能正确地导入到DolphinDB中。

请注意:DolphinDB支持自动识别大部分DolphinDB提供的数据类型,但是目前暂不支持识别UUID和IPADDR类型,在后续版本中会支持。

loadText函数用于将数据导入DolphinDB内存表。下例调用loadText函数导入数据,并查看生成的数据表的结构。例子中涉及到的数据文件请参考附录。

dataFilePath="/home/data/candle_201801.csv"
tmpTB=loadText(filename=dataFilePath);

查看数据表前5行数据:

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

调用schema函数查看表结构(字段名称、数据类型等信息):

tmpTB.schema().colDefs;

name       typeString typeInt comment
---------- ---------- ------- -------
symbol     SYMBOL     17
exchange   SYMBOL     17
cycle      INT        4
tradingDay DATE       6
date       DATE       6
time       INT        4
open       DOUBLE     16
high       DOUBLE     16
low        DOUBLE     16
close      DOUBLE     16
volume     INT        4
turnover   DOUBLE     16
unixTime   LONG       5
  1. 指定数据导入格式

本教程讲述的4个数据加载函数中,均可用schema参数指定一个表,内含各字段的名称、类型、格式、需要导入的列等信息。该表可包含以下4列:

  • name:字符串,表示列名
  • type:字符串,表示每列的数据类型
  • format:字符串,表示日期或时间列的格式
  • col:整型,表示要加载的列的下标。该列的值必须是升序。

其中,name和type这两列是必需的,而且必须是前两列。format和col这两列是可选的,且没有先后关系的要求。

例如,我们可以使用下面的数据表作为schema参数:

name         type
----------   -------
timestamp    SECOND
ID           INT
qty          INT
price        DOUBLE

2.1 提取文本文件的schema

extractTextSchema函数用于获取文本文件的schema,包括字段名称和数据类型等信息。

例如,使用extractTextSchema函数得到本教程中示例文件的表结构:

dataFilePath="/home/data/candle_201801.csv"
schemaTB=extractTextSchema(dataFilePath)
schemaTB;

name       type
---------- ------
symbol     SYMBOL
exchange   SYMBOL
cycle      INT
tradingDay DATE
date       DATE
time       INT
open       DOUBLE
high       DOUBLE
low        DOUBLE
close      DOUBLE
volume     INT
turnover   DOUBLE
unixTime   LONG

通过extractTextSchema函数得到数据文件的表结构schemaTB以后,若表中自动解析的数据类型不符合预期,可以使用SQL语句对该表进行修改,从而得到满足要求的表结构。

2.2 指定字段名称和类型

当系统自动识别的字段名称或者数据类型不符合预期或需求时,可以通过设置schema参数为文本文件中的每列指定字段名称和数据类型。

例如,若导入数据的volume列被自动识别为INT类型,而需要的volume类型是LONG类型,就需要通过schema参数指定volumne列类型为LONG。下面的例子中,首先调用extractTextSchema函数得到文本文件的表结构,再根据需求修改表中列的数据类型。

dataFilePath="/home/data/candle_201801.csv"
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="LONG" where name="volume";

使用loadText函数导入文本文件,将数据按照schemaTB所规定的字段数据类型导入到数据库中。

tmpTB=loadText(filename=dataFilePath,schema=schemaTB);

查看表中前五行的数据,volume列数据以长整型的形式正常显示:

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

上例介绍了修改数据类型的情况,若要修改表中的字段名称,也可以通过同样的方法实现。

请注意,若DolphinDB对日期和时间相关数据类型的解析不符合预期,需要通过本教程第2.3小节的方式解决。

2.3 指定日期和时间类型的格式

对于日期列或时间列的数据,如果DolphinDB识别的数据类型不符合预期,不仅需要在schema的type列指定数据类型,还需要在format列中指定格式(用字符串表示),如"MM/dd/yyyy"。如何表示日期和时间格式请参考日期和时间的调整及格式

下面结合例子具体说明对日期和时间列指定数据类型的方法。

在DolphinDB中执行以下脚本,生成本例所需的数据文件。

dataFilePath="/home/data/timeData.csv"
t=table(["20190623 14:54:57","20190623 15:54:23","20190623 16:30:25"] as time,`AAPL`MS`IBM as sym,2200 5400 8670 as qty,54.78 59.64 65.23 as price)
saveText(t,dataFilePath);

加载数据前,使用extractTextSchema函数获取该数据文件的schema:

schemaTB=extractTextSchema(dataFilePath)
schemaTB;

name  type
----- ------
time  SECOND
sym   SYMBOL
qty   INT
price DOUBLE

显然,系统识别time列的数据类型不符合预期。如果直接加载该文件,time列的数据将为空。为了能够正确加载该文件time列的数据,需要指定time列的数据类型为DATETIME,并且指定该列的格式为"yyyyMMdd HH:mm:ss"。

update schemaTB set type="DATETIME" where name="time"
schemaTB[`format]=["yyyyMMdd HH:mm:ss",,,];

导入数据并查看,数据显示正确:

tmpTB=loadText(dataFilePath,,schemaTB)
tmpTB;

time                sym  qty  price
------------------- ---- ---- -----
2019.06.23T14:54:57 AAPL 2200 54.78
2019.06.23T15:54:23 MS   5400 59.64
2019.06.23T16:30:25 IBM  8670 65.23

2.4 导入指定列

在导入数据时,可以通过schema参数指定只导入文本文件中的某几列。

下例中,只需加载文本文件中symbol, date, open, high, close, volume, turnover这7列。

首先,调用extractTextSchema函数得到目标文本文件的表结构。

dataFilePath="/home/data/candle_201801.csv"
schemaTB=extractTextSchema(dataFilePath);

使用rowNo函数为各列生成列号,赋值给schema表中的col列,然后修改schema表,仅保留表示需要导入的字段的行。

update schemaTB set col = rowNo(name)
schemaTB=select * from schemaTB where name in `symbol`date`open`high`close`volume`turnover;
请注意:
1.列号从0开始。上例中第一列symbol列对应的列号是0。
2.导入数据时不能改变各列的先后顺序。如果需要调整列的顺序,可以将数据文件加载后,再使用reorderColumns!函数。

最后,使用loadText函数,并配置schema参数,导入文本文件中指定的列。

tmpTB=loadText(filename=dataFilePath,schema=schemaTB);

查看表中前5行,只导入了所需的列:

select top 5 * from tmpTB

symbol date       open   high  close volume turnover
------ ---------- ------ ----- ----- ------ ----------
000001 2018.01.02 9.31E7 13.35 13.35 13     2.003635E6
000001 2018.01.02 9.32E7 13.37 13.33 13     867181
000001 2018.01.02 9.33E7 13.32 13.32 13     903894
000001 2018.01.02 9.34E7 13.35 13.35 13     1.012E6
000001 2018.01.02 9.35E7 13.35 13.35 13     1.601939E6

2.5 跳过文本数据的前若干行

在数据导入时,若需跳过文件前n行(可能为文件说明),可指定skipRows参数为n。由于描述文件的说明通常不会非常冗长,因此这个参数的取值最大为1024。本教程讲述的4个数据加载函数均支持skipRows参数。

下例中,通过loadText函数导入数据文件,并且查看该文件导入以后表的总行数,以及前5行的内容。

dataFilePath="/home/data/candle_201801.csv"
tmpTB=loadText(filename=dataFilePath)
select count(*) from tmpTB;

count
-----
5040

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

指定skipRows参数取值为1000,跳过文本文件的前1000行导入文件:

tmpTB=loadText(filename=dataFilePath,skipRows=1000)
select count(*) from tmpTB;

count
-----
4041

select top 5 * from tmpTB;

col0   col1 col2 col3       col4       col5      col6  col7  col8  col9  col10  col11      col12
------ ---- ---- ---------- ---------- --------- ----- ----- ----- ----- ------ ---------- -------------
000001 SZSE 1    2018.01.08 2018.01.08 101000000 13.13 13.14 13.12 13.14 646912 8.48962E6  1515377400000
000001 SZSE 1    2018.01.08 2018.01.08 101100000 13.13 13.14 13.13 13.14 453647 5.958462E6 1515377460000
000001 SZSE 1    2018.01.08 2018.01.08 101200000 13.13 13.14 13.12 13.13 700853 9.200605E6 1515377520000
000001 SZSE 1    2018.01.08 2018.01.08 101300000 13.13 13.14 13.12 13.12 738920 9.697166E6 1515377580000
000001 SZSE 1    2018.01.08 2018.01.08 101400000 13.13 13.14 13.12 13.13 469800 6.168286E6 1515377640000
请注意:如上例所示,在跳过前n行进行导入时,若数据文件的第一行是列名,改行会作为第一行被略过。

在上面的例子中,文本文件指定skipRows参数导入以后,由于表示列名的第一行被跳过,列名变成了默认列名:col1,col2等等。若需要保留列名而又指定跳过前n行,可先通过extractTextSchema函数得到文本文件的schema,在导入时指定schema参数:

schema=extractTextSchema(dataFilePath)
tmpTB=loadText(filename=dataFilePath,schema=schema,skipRows=1000)
select count(*) from tmpTB;

count
-----
4041

select top 5 * from tmpTB;

symbol exchange cycle tradingDay date       time      open  high  low   close volume turnover   unixTime
------ -------- ----- ---------- ---------- --------- ----- ----- ----- ----- ------ ---------- -------------
000001 SZSE     1     2018.01.08 2018.01.08 101000000 13.13 13.14 13.12 13.14 646912 8.48962E6  1515377400000
000001 SZSE     1     2018.01.08 2018.01.08 101100000 13.13 13.14 13.13 13.14 453647 5.958462E6 1515377460000
000001 SZSE     1     2018.01.08 2018.01.08 101200000 13.13 13.14 13.12 13.13 700853 9.200605E6 1515377520000
000001 SZSE     1     2018.01.08 2018.01.08 101300000 13.13 13.14 13.12 13.12 738920 9.697166E6 1515377580000
000001 SZSE     1     2018.01.08 2018.01.08 101400000 13.13 13.14 13.12 13.13 469800 6.168286E6 1515377640000
  1. 并行导入数据

3.1 单个文件多线程载入内存

ploadText函数可将一个文本文件以多线程的方式载入内存。该函数与loadText函数的语法是一致的,区别在于,ploadText函数可以快速载入大型文件,并且生成内存分区表。它充分利用了多核CPU来并行载入文件,并行程度取决于服务器本身CPU核数量和节点的localExecutors配置。

下面比较loadText函数与ploadText函数导入同一个文件的性能。

首先通过脚本生成一个4GB左右的文本文件:

filePath="/home/data/testFile.csv"
appendRows=100000000
t=table(rand(100,appendRows) as int,take(string('A'..'Z'),appendRows) as symbol,take(2010.01.01..2018.12.30,appendRows) as date,rand(float(100),appendRows) as float,00:00:00.000 + rand(86400000,appendRows) as time)
t.saveText(filePath);

分别通过loadTextploadText来载入文件。本例所用节点是6核12超线程的CPU。

timer loadText(filePath);
Time elapsed: 12629.492 ms

timer ploadText(filePath);
Time elapsed: 2669.702 ms

结果显示在此配置下,ploadText的性能是loadText的4.5倍左右。

3.2 多文件并行导入

在大数据应用领域,数据导入往往不只是一个或两个文件的导入,而是数十个甚至数百个大型文件的批量导入。为了达到更好的导入性能,建议尽量以并行方式导入批量的数据文件。

loadTextEx函数可将文本文件导入指定的数据库中,包括分布式数据库,本地磁盘数据库或内存数据库。由于DolphinDB的分区表支持并发读写,因此可以支持多线程导入数据。使用loadTextEx将文本数据导入到分布式数据库,具体实现为将数据先导入到内存,再由内存写入到数据库,这两个步骤由同一个函数完成,以保证高效率。

下例展示如何将磁盘上的多个文件批量写入到DolphinDB分区表中。首先,在DolphinDB中执行以下脚本,生成100个文件,共约778MB,包括1千万条记录。

n=100000
dataFilePath="/home/data/multi/multiImport_"+string(1..100)+".csv"
for (i in 0..99){
    trades=table(sort(take(100*i+1..100,n)) as id,rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,take(2000.01.01..2000.06.30,n) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5)
    trades.saveText(dataFilePath[i])
};

创建数据库和表:

login(`admin,`123456)
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,1..10000)
tb=db.createPartitionedTable(trades,`tb,`id);

DolphinDB的cut函数可将一个向量中的元素分组。下面调用cut函数将待导入的文件路径进行分组,再调用submitJob函数,为每个线程分配写入任务,批量导入数据。

def writeData(db,file){
   loop(loadTextEx{db,`tb,`id,},file)
}
parallelLevel=10
for(x in dataFilePath.cut(100/parallelLevel)){
    submitJob("loadData"+parallelLevel,"loadData",writeData{db,x})
};
请注意:DolphinDB的分区表不允许多个线程同时向一个分区写数据。上例中,每个文件中的分区列(id列)取值不同,因此不会造成多个线程写入同一个分区的情况。在设计分区表的并发读写时,请确保不会有多个线程同时写入同一分区。

通过getRecentJobs函数可以取得当前本地节点上最近n个批处理作业的状态。使用select语句计算并行导入批量文件所需时间,得到在6核12超线程的CPU上耗时约1.59秒。

select max(endTime) - min(startTime) from getRecentJobs() where jobId like "loadData"+string(parallelLevel)+"%";

max_endTime_sub
---------------
1590

执行以下脚本,将100个文件单线程顺序导入数据库,记录所需时间,耗时约8.65秒。

timer writeData(db, dataFilePath);
Time elapsed: 8647.645 ms

结果显示在此配置下,并行开启10个线程导入速度是单线程导入的5.5倍左右。

查看数据表中的记录条数:

select count(*) from loadTable("dfs://DolphinDBdatabase", `tb);

count
------
10000000
  1. 导入数据库前的预处理

在将数据导入数据库之前,若需要对数据进行复杂的处理,例如日期和时间数据类型的强制转换,填充空值等,可以在调用loadTextEx函数时指定transform参数。tansform参数接受一个函数作为参数,并且要求该函数只能接受一个参数。函数的输入是一个未分区的内存表,输出也是一个未分区的内存表。需要注意的是,只有loadTextEx函数提供transform参数。

4.1 指定日期和时间数据的数据类型

4.1.1 将数值类型表示的日期和时间转化为指定类型

数据文件中表示时间的数据可能是整型或者长整型,而在进行数据分析时,往往又需要将这类数据强制转化为时间类型的格式导入并存储到数据库中。针对这种场景,可通过loadTextEx函数的transform参数为文本文件中的日期和时间列指定相应的数据类型。

首先,创建分布式数据库和表。

login(`admin,`123456)
dataFilePath="/home/data/candle_201801.csv"
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="TIME" where name="time"
tb=table(1:0,schemaTB.name,schemaTB.type)
tb=db.createPartitionedTable(tb,`tb1,`date);

自定义函数foo,用于对数据进行预处理,并返回处理过后的数据表。

def foo(mutable t){
    return t.replaceColumn!(`time,time(t.time/10))
}
请注意:在自定义函数体内对数据进行处理时,请尽量使用本地的修改(带有!的函数)来提升性能。

调用loadTextEx函数,并且指定transform参数,系统会对文本文件中的数据执行transform参数指定的函数,即foo函数,再将得到的结果保存到数据库中。

tmpTB=loadTextEx(dbHandle=db,tableName=`tb1,partitionColumns=`date,filename=dataFilePath,transform=foo);

查看表内前5行数据。可见time列是以TIME类型存储,而不是文本文件中的INT类型:

select top 5* from loadTable(dbPath,`tb1);

symbol exchange cycle tradingDay date       time               open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- ------------------ ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01.02 2018.01.02 02:35:10.000000000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:20.000000000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:30.000000000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:40.000000000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01.02 2018.01.02 02:35:50.000000000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

4.1.2 为文本文件中的日期和时间相关列指定数据类型

另一种与日期和时间列相关的处理是,文本文件中日期以DATE类型存储,在导入数据库时希望以MONTH的形式存储。这种情况也可通过loadTextEx函数的transform参数转换该日期列的数据类型,步骤与上述过程一致。

login(`admin,`123456)
dbPath="dfs://DolphinDBdatabase"
db=database(dbPath,VALUE,2018.01.02..2018.01.30)
schemaTB=extractTextSchema(dataFilePath)
update schemaTB set type="MONTH" where name="tradingDay"
tb=table(1:0,schemaTB.name,schemaTB.type)
tb=db.createPartitionedTable(tb,`tb1,`date)
def fee(mutable t){
    return t.replaceColumn!(`tradingDay,month(t.tradingDay))
}
tmpTB=loadTextEx(dbHandle=db,tableName=`tb1,partitionColumns=`date,filename=dataFilePath,transform=fee);

查看表内前5行数据。可见tradingDay列是以MONTH类型存储,而不是文本文件中的DATE类型:

select top 5* from loadTable(dbPath,`tb1);

symbol exchange cycle tradingDay date       time     open  high  low   close volume  turnover   unixTime
------ -------- ----- ---------- ---------- -------- ----- ----- ----- ----- ------- ---------- -------------
000001 SZSE     1     2018.01M   2018.01.02 93100000 13.35 13.39 13.35 13.38 2003635 2.678558E7 1514856660000
000001 SZSE     1     2018.01M   2018.01.02 93200000 13.37 13.38 13.33 13.33 867181  1.158757E7 1514856720000
000001 SZSE     1     2018.01M   2018.01.02 93300000 13.32 13.35 13.32 13.35 903894  1.204971E7 1514856780000
000001 SZSE     1     2018.01M   2018.01.02 93400000 13.35 13.38 13.35 13.35 1012000 1.352286E7 1514856840000
000001 SZSE     1     2018.01M   2018.01.02 93500000 13.35 13.37 13.35 13.37 1601939 2.140652E7 1514856900000

4.2 对表内数据填充空值

transform参数支持调用DolphinDB的内置函数,当内置函数要求多个参数时,我们可以使用部分应用将多参数函数转换为一个参数的函数。例如,调用nullFill!函数对文本文件中的空值进行填充。

db=database(dbPath,VALUE,2018.01.02..2018.01.30)
tb=db.createPartitionedTable(tb,`tb1,`date)
tmpTB=loadTextEx(dbHandle=db,tableName=`pt,partitionColumns=`date,filename=dataFilePath,transform=nullFill!{,0});
  1. 使用Map-Reduce自定义数据导入

DolphinDB支持使用Map-Reduce自定义数据导入,将数据按行进行划分,并将划分后的数据通过Map-Reduce导入到DolphinDB。

可使用textChunkDS函数将文件划分为多个小文件数据源,再通过mr函数写入到数据库中。在调用mr将数据存入数据库前,用户还可进行灵活的数据处理,从而实现更复杂的导入需求。

5.1 将文件中的股票和期货数据存储到两个不同的数据表

在DolphinDB中执行以下脚本,生成一个大小约为1GB的数据文件,其中包括股票数据和期货数据。

n=10000000
dataFilePath="/home/data/chunkText.csv"
trades=table(rand(`stock`futures,n) as type, rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,take(2000.01.01..2000.06.30,n) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4,rand(10000,n) as qty5,rand(10000,n) as qty6)
trades.saveText(dataFilePath);

分别创建用于存放股票数据和期货数据的分布式数据库和表:

login(`admin,`123456)
dbPath1="dfs://DolphinDBTickDatabase"
dbPath2="dfs://DolphinDBFuturesDatabase"
db1=database(dbPath1,VALUE,`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S)
db2=database(dbPath2,VALUE,2000.01.01..2000.06.30)
tb1=db1.createPartitionedTable(trades,`stock,`sym)
tb2=db2.createPartitionedTable(trades,`futures,`date);

定义函数,用于划分数据,并将数据写入到不同的数据库。

def divideImport(tb, mutable stockTB, mutable futuresTB)
{
    tdata1=select * from tb where type="stock"
    tdata2=select * from tb where type="futures"
    append!(stockTB, tdata1)
    append!(futuresTB, tdata2)
}

再通过textChunkDS函数划分文本文件,以300MB为单位进行划分,文件被划分成了4部分。

ds=textChunkDS(dataFilePath,300)
ds;

(DataSource<readTableFromFileSegment, DataSource<readTableFromFileSegment, DataSource<readTableFromFileSegment, DataSource<readTableFromFileSegment)

调用mr函数,指定数据源将文件导入到数据库中。由于map函数(由mapFunc参数指定)只接受一个表作为参数,这里我们使用部分应用将多参数函数转换为一个参数的函数。

mr(ds=ds, mapFunc=divideImport{,tb1,tb2}, parallel=false);
请注意,这里每个小文件数据源可能包含相同分区的数据。DolphinDB不允许多个线程同时对相同分区进行写入,因此要将mr函数的parallel参数设置为false,否则会抛出异常。

查看2个数据库中表的前5行,股票数据库中均为股票数据,期货数据库中均为期货数据。

stock表:

select top 5 * from loadTable("dfs://DolphinDBTickDatabase", `stock);

type  sym  date       price1    price2     price3      price4       price5       price6       qty1 qty2 qty3 qty4 qty5 qty6
----- ---- ---------- --------- ---------- ----------- ------------ ------------ ------------ ---- ---- ---- ---- ---- ----
stock AMZN 2000.02.14 11.224234 112.26763  1160.926836 11661.418403 11902.403305 11636.093467 4    53   450  2072 9116 12
stock AMZN 2000.03.29 10.119057 111.132165 1031.171855 10655.048121 12682.656303 11182.317321 6    21   651  2078 7971 6207
stock AMZN 2000.06.16 11.61637  101.943971 1019.122963 10768.996906 11091.395164 11239.242307 0    91   857  3129 3829 811
stock AMZN 2000.02.20 11.69517  114.607763 1005.724332 10548.273754 12548.185724 12750.524002 1    39   270  4216 8607 6578
stock AMZN 2000.02.23 11.534805 106.040664 1085.913295 11461.783565 12496.932604 12995.461331 4    35   488  4042 6500 4826

futures表:

select top 5 * from loadTable("dfs://DolphinDBFuturesDatabase", `futures);

type    sym  date       price1    price2     price3      price4       price5       price6       qty1 qty2 qty3 qty4 qty5 ...
------- ---- ---------- --------- ---------- ----------- ------------ ------------ ------------ ---- ---- ---- ---- ---- ---
futures MSFT 2000.01.01 11.894442 106.494131 1000.600933 10927.639217 10648.298313 11680.875797 9    10   241  524  8325 ...
futures S    2000.01.01 10.13728  115.907379 1140.10161  11222.057315 10909.352983 13535.931446 3    69   461  4560 2583 ...
futures GM   2000.01.01 10.339581 112.602729 1097.198543 10938.208083 10761.688725 11121.888288 1    1    714  6701 9203 ...
futures IBM  2000.01.01 10.45422  112.229537 1087.366764 10356.28124  11829.206165 11724.680443 0    47   741  7794 5529 ...
futures TSLA 2000.01.01 11.901426 106.127109 1144.022732 10465.529256 12831.721586 10621.111858 4    43   136  9858 8487 ...
n=10000000
dataFilePath="/home/data/chunkText.csv"
trades=table(rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,sort(take(2000.01.01..2000.06.30,n)) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4, rand(10000,n) as qty5, rand(1000,n) as qty6)
trades.saveText(dataFilePath);

5.2 快速加载大文件首尾部分数据

可使用textChunkDS将大文件划分成多个小的数据源(chunk),然后加载首尾两个数据源。在DolphinDB中执行以下脚本生成数据文件:

n=10000000
dataFilePath="/home/data/chunkText.csv"
trades=table(rand(`IBM`MSFT`GM`C`FB`GOOG`V`F`XOM`AMZN`TSLA`PG`S,n) as sym,sort(take(2000.01.01..2000.06.30,n)) as date,10.0+rand(2.0,n) as price1,100.0+rand(20.0,n) as price2,1000.0+rand(200.0,n) as price3,10000.0+rand(2000.0,n) as price4,10000.0+rand(3000.0,n) as price5,10000.0+rand(4000.0,n) as price6,rand(10,n) as qty1,rand(100,n) as qty2,rand(1000,n) as qty3,rand(10000,n) as qty4, rand(10000,n) as qty5, rand(1000,n) as qty6)
trades.saveText(dataFilePath);

再通过textChunkDS函数划分文本文件,以10MB为单位进行划分。

ds=textChunkDS(dataFilePath, 10);

调用mr函数,加载首尾两个chunk的数据。因为这两个chunk的数据非常小,加载速度非常快。

head_tail_tb = mr(ds=[ds.head(), ds.tail()], mapFunc=x->x, finalFunc=unionAll{,false});

查看head_tail_tb表中的记录数以及前5条记录。因为数据是随机生成,记录数可能每次会略有不同,前5行的数据也会跟下面显示的不同。

select count(*) from head_tail_tb;

count
------
192262

查看表的前5行数据:

select top 5 * from head_tail_tb;

sym  date       price1    price2     price3      price4       price5       price6       qty1 qty2 qty3 qty4 qty5 qty6
---- ---------- --------- ---------- ----------- ------------ ------------ ------------ ---- ---- ---- ---- ---- ----
IBM  2000.01.01 10.978551 114.535418 1163.425635 11827.976468 11028.01038  10810.987825 2    51   396  6636 9403 937
MSFT 2000.01.01 11.776656 106.472172 1138.718459 10720.778545 10164.638399 11348.744314 9    79   691  533  5669 72
FB   2000.01.01 11.515097 118.674854 1153.305462 10478.6335   12160.662041 13874.09572  3    29   592  2097 4103 113
MSFT 2000.01.01 11.72034  105.760547 1139.238066 10669.293733 11314.226676 12560.093619 1    99   166  2282 9167 483
TSLA 2000.01.01 10.272615 114.748639 1043.019437 11508.695323 11825.865846 10495.364306 6    43   95   9433 6641 490
  1. 其它注意事项

6.1 不同编码的数据的处理

由于DolphinDB的字符串采用UTF-8编码,加载的文件必须是UTF-8编码。若为其它形式的编码,可以在导入以后进行转化。DolphinDB提供了convertEncodefromUTF8toUTF8函数,用于导入数据后对字符串编码进行转换。

例如,使用convertEncode函数转换表tmpTB中的exchange列的编码:

dataFilePath="/home/data/candle_201801.csv"
tmpTB=loadText(filename=dataFilePath, skipRows=0)
tmpTB.replaceColumn!(`exchange, convertEncode(tmpTB.exchange,"gbk","utf-8"));

6.2 数值类型的解析

本教程第1节介绍了DolphinDB在导入数据时的数据类型自动解析机制,本节讲解数值类型数据的解析。在数据导入时,若指定数据类型为数值类型(包括CHAR,SHORT,INT,LONG,FLOAT和DOUBLE),则系统能够识别以下几种形式的数据:

  • 数字表示的数值,例如:123
  • 以逗号分隔的数字表示的数值,例如:100,000
  • 带有小数点的数字表示的数值,即浮点数,例如:1.231
  • 科学计数法表示的数值,例如:1.23E5

DolphinDB在导入时会会自动忽略数字前后的字母及其他符号,如果没有出现任何数字,则解析为NULL值。下面结合例子具体说明。

首先,执行以下脚本,创建一个文本文件。

dataFilePath="/home/data/testSym.csv"
prices1=["2131","$2,131", "N/A"]
prices2=["213.1","$213.1", "N/A"]
totals=["2.658E7","-2.658e7","2.658e-7"]
tt=table(1..3 as id, prices1 as price1, prices2 as price2, totals as total)
saveText(tt,dataFilePath);

创建的文本文件中,price1和price2列中既有数字,又有字符。若不指定schema参数导入数据,DolphinDB会将price1和price2列均识别为SYMBOL类型:

tmpTB=loadText(dataFilePath)
tmpTB;

id price1 price2 total
-- ------ ------ --------
1  2131   213.1  2.658E7
2  $2,131 $213.1 -2.658E7
3  N/A    N/A    2.658E-7

tmpTB.schema().colDefs;

name   typeString typeInt comment
------ ---------- ------- -------
id     INT        4
price1 SYMBOL     17
price2 SYMBOL     17
total  DOUBLE     16

若分别指定price1和price2列为INT和FLOAT类型,DolphinDB在导入时会会自动忽略数字前后的字母及其他符号。如果没有出现任何数字,则解析为NULL值。

schemaTB=table(`id`price1`price2`total as name, `INT`INT`FLOAT`DOUBLE as type)
tmpTB=loadText(dataFilePath,,schemaTB)
tmpTB;

id price1 price2     total
-- ------ ---------- --------
1  2131   213.100006 2.658E7
2  2131   213.100006 -2.658E7
3                    2.658E-7

6.3 自动脱去文本外的双引号

在CSV文件中,有时候会用双引号来处理文本和数值中含有的特殊字符(譬如分隔符)的字段。DolphinDB处理这样的数据时,会自动脱去文本外的双引号。下面结合例子具体说明。

首先生成示例数据。生成的文件中,num列数据为使用三位分节法表示的数值。

dataFilePath="/home/data/testSym.csv"
tt=table(1..3 as id,  [""500"",""3,500"",""9,000,000""] as num)
saveText(tt,dataFilePath);

导入数据并查看表内数据,DolphinDB自动脱去了文本外的双引号。

tmpTB=loadText(dataFilePath,,schemaTB)
tmpTB;

id num
-- -------
1  500
2  3500
3  9000000

附录

本教程的例子中使用的数据文件: candle_201801.csv

查看原文

赞 2 收藏 0 评论 0

JasonT 提出了问题 · 3月29日

解决想取一天内time最早和最晚时间的volume差值,group by SecurityID

建表语句:

time = take(10:10:00.300+1..30*300, 200)
tradeMoney = double(take(1..100, 200))
SecurityId = take(`A, 100) join take(`B, 100)
volume = take(1..300, 200)
t = table(time,tradeMoney, SecurityId,volume)

表结构如下:

image.png

想取一天内time最早和最晚时间的volume差值,group by SecurityID。请问在dolphindb中怎么写好呢?

关注 2 回答 1

JasonT 赞了问题 · 3月22日

金融物联网领域dolphinDb火热,其中相比较传统Hadoop和spark等分布式数据库的优势在哪里了

金融物联网领域dolphinDb火热,其中相比较传统Hadoop和spark等分布式数据库的优势在哪里了

关注 1 回答 0

JasonT 赞了回答 · 3月19日

解决请问在DolphinDB中如何判断一个sharetable是否存在?

可以执行下面的代码判断一个共享表table1是否存在:

objs(true).name.find(`table1)

如果存在返回1,如果不存在返回-1.

关注 2 回答 1

JasonT 提出了问题 · 3月19日

解决请问在DolphinDB中如何判断一个sharetable是否存在?

我想创建一个名为table1的共享表,但是我不确定这个表是否存在,请问在DolphinDB里面有判断的函数吗?

关注 2 回答 1

JasonT 赞了回答 · 3月18日

解决为什么在一个会话里,定义过的自定义函数会报错Unrecognized的错误

有一种情况是在这个长连接没有关闭的这几个小时里出现网络波动,造成连接实际上已经断开。

解决方法1:重新执行以下函数的定义声明。

解决方法2:重新创建一个新的连接,重新执行一遍所有脚本。

关注 2 回答 1

JasonT 提出了问题 · 3月18日

解决为什么在一个会话里,定义过的自定义函数会报错Unrecognized的错误

对一个数据库使用以下的查询语句:

login("admin", "123456")

def my_func(x){
    return mean(x) + std(x);
}

select count(qty) from loadTable("dfs://test", "test") where price < contextby(my_func, price, sym) group by sym

在同一个会话里,会话没有关闭过,差不多过了几个小时,我再次执行:

select count(qty) from loadTable("dfs://test", "test") where price < contextby(my_func, price, sym) group by sym

出现下面的错误:

Unrecognized column name my_func

我明明已经在这个会话里面申明过这个函数了,请问怎么再次执行就无法识别这个函数了?

关注 2 回答 1

认证与成就

  • 获得 192 次点赞
  • 获得 9 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 9 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2020-10-18
个人主页被 1.7k 人浏览