你好,我们有一个程序会实时的同步RDBMS的数据到HBase,为了保证数据操作的先后顺序,我们使用了该条数据的日志解析时间作为HBase的版本时间戳。并且在写入之前对日志的偏移量做了排序。也就是使用put提交的时候已经保证了数据的顺序性。此为背景
最近在业务人员发现有一条数据的状态不对,本来应该是存在的,但是显示被删除了。于是我查看了该条数据的版本信息。发现只有一条初始化的版本和删除的版本,但是日志中记录了有两条操作,也就是数据先Delete然后再Insert的。这两条数据的操作时间分别是是2017-06-20 11:02:59.241000,2017-06-20 11:02:59.241009,我们的数据存储的版本时间戳就是使用的CURRENT_TS的值,但是只能精确到毫秒(超过毫秒的精度虽然也能存储,但是Phoenix就用不了了,况且本条数据的OP_TS是相同的),也就是对应HBase的时间戳都是1497927779241。正好对应了该条记录的最新版本号。
这里还需要说明一下,我们只有再正确写入没有错误的情况下才会将日志记录下来,也就是其实两条数据都写入了,只是落地顺序和我的数据录入顺序不一致。数据写入是使用的HBase 的JAVA API的列表put方法。
在《HBase权威指南》这本书的 第三章 客户端API操作的3.2.1中,有提到过:用户无法控制服务端执行put的顺序,这意味着服务器被调用的顺序也不受用户控制。
但是我在自己的测试环境中测试的结果并非如此,写入的数据最终仍旧是以put的顺序为准。以下是我的测试Scala代码
object HBaseApiTest {
def main(args: Array[String]) {
val conf = new Configuration()
conf.addResource("hbase-site.xml")
val connect = HConnectionManager.createConnection(conf)
try{
val tableName = "HBASE_PUT_LIST_TEST"
val table = connect.getTable(TableName.valueOf(tableName))
table.setWriteBufferSize(2 * 1024 * 1024)
table.setAutoFlush(false, false)
val columns = (1 to 10000).flatMap(i=> Array((i,1,"D"),(i,1,"I"),(i,2,"D"),(i,2,"I"),(i,3,"D"),(i,3,"I")))
val puts = new util.LinkedList[Put]()
columns.foreach(d =>{
val put = new Put(Bytes.toBytes(d._1.toString))
put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("SQLTYPE"), d._2, Bytes.toBytes(d._3.toString))
puts.add(put)
} )
table.put(puts)
table.flushCommits()
}finally {
connect.close()
}
}
}
请问我应该如何做到顺序插入且兼顾效率呢