赵强老师

赵强老师 查看完整档案

北京编辑清华大学  |  软件工程 编辑京东  |  京东大学大数据学院院长 编辑 itshare.duanshu.com 编辑
编辑

17年以上的IT行业从业经历,清华大学计算机软件工程专业毕业,京东大学大数据学院院长,Oracle中国有限公司高级技术顾问;曾在BEA、甲骨文、摩托罗拉等世界500强公司担任高级软件架构师或咨询顾问等要职,精通大数据、数据库、中间件技术和Java技术。

个人动态

赵强老师 发布了文章 · 10月9日

【赵强老师】Flink的DataSet算子

Flink为了能够处理有边界的数据集和无边界的数据集,提供了对应的DataSet API和DataStream API。我们可以开发对应的Java程序或者Scala程序来完成相应的功能。下面举例了一些DataSet API中的基本的算子。

下面我们通过具体的代码来为大家演示每个算子的作用。

1、Map、FlatMap与MapPartition

//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

ArrayList<String> data = new ArrayList<String>();
data.add("I love Beijing");
data.add("I love China");
data.add("Beijing is the capital of China");
DataSource<String> text = env.fromCollection(data);

DataSet<List<String>> mapData = text.map(new MapFunction<String, List<String>>() {

    public List<String> map(String data) throws Exception {
        String[] words = data.split(" ");
        
        //创建一个List
        List<String> result = new ArrayList<String>();
        for(String w:words){
            result.add(w);
        }
        return result;
    }
});
mapData.print();
System.out.println("*****************************************");

DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {

    public void flatMap(String data, Collector<String> collection) throws Exception {
        String[] words = data.split(" ");
        for(String w:words){
            collection.collect(w);
        }
    }
});
flatMapData.print();

System.out.println("*****************************************");
/*    new MapPartitionFunction<String, String>
    第一个String:表示分区中的数据元素类型
    第二个String:表示处理后的数据元素类型*/
DataSet<String> mapPartitionData = text.mapPartition(new MapPartitionFunction<String, String>() {

    public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
        //针对分区进行操作的好处是:比如要进行数据库的操作,一个分区只需要创建一个Connection
        //values中保存了一个分区的数据
         Iterator<String> it = values.iterator();
        while (it.hasNext()) {
            String next = it.next();
            String[] split = next.split(" ");
            for (String word : split) {
                out.collect(word);
            }
        }
        //关闭链接
    }
});
mapPartitionData.print();

2、Filter与Distinct

//获取运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

ArrayList<String> data = new ArrayList<String>();
data.add("I love Beijing");
data.add("I love China");
data.add("Beijing is the capital of China");
DataSource<String> text = env.fromCollection(data);

DataSet<String> flatMapData = text.flatMap(new FlatMapFunction<String, String>() {

    public void flatMap(String data, Collector<String> collection) throws Exception {
        String[] words = data.split(" ");
        for(String w:words){
            collection.collect(w);
        }
    }
});

//去掉重复的单词
flatMapData.distinct().print();
System.out.println("*********************");

//选出长度大于3的单词
flatMapData.filter(new FilterFunction<String>() {
    
    public boolean filter(String word) throws Exception {
        int length = word.length();
        return length>3?true:false;
    }
}).print();

3、Join操作

//获取运行的环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//创建第一张表:用户ID  姓名
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();
data1.add(new Tuple2(1,"Tom"));
data1.add(new Tuple2(2,"Mike"));
data1.add(new Tuple2(3,"Mary"));
data1.add(new Tuple2(4,"Jone"));
//创建第二张表:用户ID 所在的城市
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();
data2.add(new Tuple2(1,"北京"));
data2.add(new Tuple2(2,"上海"));
data2.add(new Tuple2(3,"广州"));
data2.add(new Tuple2(4,"重庆"));

//实现join的多表查询:用户ID  姓名  所在的程序
DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);

table1.join(table2).where(0).equalTo(0)
/*第一个Tuple2<Integer,String>:表示第一张表
 * 第二个Tuple2<Integer,String>:表示第二张表
 * Tuple3<Integer,String, String>:多表join连接查询后的返回结果   */                           
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String, String>>() {
    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,
            Tuple2<Integer, String> table2) throws Exception {
        return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1);
    } }).print();

4、笛卡尔积

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//创建第一张表:用户ID  姓名
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();
data1.add(new Tuple2(1,"Tom"));
data1.add(new Tuple2(2,"Mike"));
data1.add(new Tuple2(3,"Mary"));
data1.add(new Tuple2(4,"Jone"));

//创建第二张表:用户ID 所在的城市
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();
data2.add(new Tuple2(1,"北京"));
data2.add(new Tuple2(2,"上海"));
data2.add(new Tuple2(3,"广州"));
data2.add(new Tuple2(4,"重庆"));

//实现join的多表查询:用户ID  姓名  所在的程序
DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);

//生成笛卡尔积
table1.cross(table2).print();

5、First-N

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//这里的数据是:员工姓名、薪水、部门号
DataSet<Tuple3<String, Integer,Integer>> grade = 
        env.fromElements(new Tuple3<String, Integer,Integer>("Tom",1000,10),
                         new Tuple3<String, Integer,Integer>("Mary",1500,20),
                         new Tuple3<String, Integer,Integer>("Mike",1200,30),
                         new Tuple3<String, Integer,Integer>("Jerry",2000,10));

//按照插入顺序取前三条记录
grade.first(3).print();
System.out.println("**********************");

//先按照部门号排序,在按照薪水排序
grade.sortPartition(2, Order.ASCENDING).sortPartition(1, Order.ASCENDING).print();
System.out.println("**********************");

//按照部门号分组,求每组的第一条记录
grade.groupBy(2).first(1).print();

6、外链接操作

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

//创建第一张表:用户ID  姓名
ArrayList<Tuple2<Integer, String>> data1 = new ArrayList<Tuple2<Integer,String>>();
data1.add(new Tuple2(1,"Tom"));
data1.add(new Tuple2(3,"Mary"));
data1.add(new Tuple2(4,"Jone"));

//创建第二张表:用户ID 所在的城市
ArrayList<Tuple2<Integer, String>> data2 = new ArrayList<Tuple2<Integer,String>>();
data2.add(new Tuple2(1,"北京"));
data2.add(new Tuple2(2,"上海"));
data2.add(new Tuple2(4,"重庆"));

//实现join的多表查询:用户ID  姓名  所在的程序
DataSet<Tuple2<Integer, String>> table1 = env.fromCollection(data1);
DataSet<Tuple2<Integer, String>> table2 = env.fromCollection(data2);

//左外连接
table1.leftOuterJoin(table2).where(0).equalTo(0)
      .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {

        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,
                Tuple2<Integer, String> table2) throws Exception {
            // 左外连接表示等号左边的信息会被包含
            if(table2 == null){
                return new Tuple3<Integer, String, String>(table1.f0,table1.f1,null);
            }else{
                return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1);
            }
        }
    }).print();

System.out.println("***********************************");
//右外连接
table1.rightOuterJoin(table2).where(0).equalTo(0)
      .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {

        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1,
                Tuple2<Integer, String> table2) throws Exception {
            //右外链接表示等号右边的表的信息会被包含
            if(table1 == null){
                return new Tuple3<Integer, String, String>(table2.f0,null,table2.f1);
            }else{
                return new Tuple3<Integer, String, String>(table2.f0,table1.f1,table2.f1);
            }
        }
    }).print();

System.out.println("***********************************");

//全外连接
table1.fullOuterJoin(table2).where(0).equalTo(0)
.with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,String>, Tuple3<Integer,String,String>>() {

    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> table1, Tuple2<Integer, String> table2)
            throws Exception {
        if(table1 == null){
            return new Tuple3<Integer, String, String>(table2.f0,null,table2.f1);
        }else if(table2 == null){
            return new Tuple3<Integer, String, String>(table1.f0,table1.f1,null);
        }else{
            return new Tuple3<Integer, String, String>(table1.f0,table1.f1,table2.f1);
        }
    }
    
}).print();

查看原文

赞 0 收藏 0 评论 0

赵强老师 发布了文章 · 9月27日

【赵强老师】MySQL的闪回

MySQL DBA或开发人员,有时会误删或者误更新数据,如果是线上环境并且影响较大,就需要能快速回滚。传统恢复方法是利用备份重搭实例,再应用去除错误sql后的binlog来恢复数据。此法费时费力,甚至需要停机维护,并不适合快速回滚。也有团队利用LVM快照来缩短恢复时间,但快照的缺点是会影响mysql的性能。MySQL闪回(flashback)利用binlog直接进行回滚,能快速恢复且不用停机。

闪回的原理

MySQL binlog以event的形式,记录了MySQL server从启用binlog以来所有的变更信息,能够帮助重现这之间的所有变化。MySQL引入binlog主要有两个目的:一是为了主从复制;二是某些备份还原操作后需要重新应用binlog。有三种可选的binlog格式,各有优缺点:

  • statement:基于SQL语句的模式,binlog数据量小,但是某些语句和函数在复制过程可能导致数据不一致甚至出错;
  • row:基于行的模式,记录的是行的完整变化。很安全,但是binlog会比其他两种模式大很多;
  • mixed:混合模式,根据语句来选用是statement还是row模式;

利用binlog闪回,需要将binlog格式设置为row。利用下面的语句可以查看当前binlog的模式。

show global variables like "%binlog_format%";

闪回的实战

真实的闪回场景中,最关键的是能快速筛选出真正需要回滚的SQL。我们使用开源工具binlog2sql来进行实战演练。binlog2sql由美团点评DBA团队(上海)出品,多次在线上环境做快速回滚。

① 安装binlog2sql工具

首先安装Python工具管理表pip
yum -y install epel-release
yum -y install python-pip

安装binlog2sql
git clone https://github.com/danfengcao/binlog2sql.git && cd binlog2sql
pip install -r requirements.txt

② 闪回案例实战

1、我们使用之前的员工表数据,单独建立一个数据库

create database testflashback;
use testflashback;
source /root/tools/scott.sql

2、误操作,执行下面的事务。

start transaction;
delete from emp where sal>3000;
update emp set sal=6000;
delete from emp where job='CLERK';
commit;

3、查看目前的binlog文件

show master logs;

4、最新的binlog文件是mysql-binlog.000001。我们的目标是筛选出需要回滚的SQL,由于误操作人只知道大致的误操作时间,我们首先根据时间做一次过滤。只需要解析testflashback库emp表。(注:如果有多个sql误操作,则生成的binlog可能分布在多个文件,需解析多个文件)

python binlog2sql/binlog2sql.py -uroot -pWelcome_1 \
--start-file='mysql-binlog.000001' > /root/tools/raw.sql

上面的语句将列车emp表的所有binlog日志。如果能够确定大致的时间范围,可以使用参数--start-datetime和--stop-datetime进行过滤。例如:

--start-datetime='2016-12-26 11:44:00' --stop-datetime='2016-12-26 11:50:00'

解析处理的binlog如下:

5、根据位置信息,我们确定了误操作sql来自同一个事务,准确位置在14956-16791之间(binlog2sql对于同一个事务会输出同样的start position)。再根据位置过滤,使用 -B 选项生成回滚sql,检查回滚sql是否正确。(注:真实场景下,生成的回滚SQL经常会需要进一步筛选。结合grep、编辑器等)

python binlog2sql/binlog2sql.py -uroot -pWelcome_1 --start-file='mysql-binlog.000001' \
--start-position=14956 --stop-position=16791 -B > /root/tools/rollback.sql

下面是生成的闪回语句:

6、与业务方确认回滚sql没问题,执行回滚语句。登录mysql,确认回滚成功。

mysql -uroot -pWelcome_1 < /root/tools/rollback.sql

7、检查数据是否恢复

查看原文

赞 0 收藏 0 评论 0

赵强老师 发布了文章 · 9月13日

【赵强老师】在MongoDB中使用MapReduce方式计算聚合

MapReduce 能够计算非常复杂的聚合逻辑,非常灵活,但是,MapReduce非常慢,不应该用于实时的数据分析中。MapReduce能够在多台Server上并行执行,每台Server只负责完成一部分wordload,最后将wordload发送到Master Server上合并,计算出最终的结果集,返回客户端。
MapReduce的基本思想,如下图所示:

在这个例子中,我们以一个求和为例。首先执行Map阶段,把一个大任务拆分成若干个小任务,每个小任务运行在不同的节点上,从而支持分布式计算,这个阶段叫做Map(如蓝框所示);每个小任务输出的结果再进行二次计算,最后得到结果55,这个阶段叫做Reduce(如红框所示)。

使用MapReduce方式计算聚合,主要分为三步:Map,Shuffle(拼凑)和Reduce,Map和Reduce需要显式定义,shuffle由MongoDB来实现。

  • Map:将操作映射到每个doc,产生Key和Value
  • Shuffle:按照Key进行分组,并将key相同的Value组合成数组
  • Reduce:把Value数组化简为单值

我们以下面的测试数据(员工数据)为例,来为大家演示。

db.emp.insert(
[
{_id:7369,ename:'SMITH' ,job:'CLERK'    ,mgr:7902,hiredate:'17-12-80',sal:800,comm:0,deptno:20},
{_id:7499,ename:'ALLEN' ,job:'SALESMAN' ,mgr:7698,hiredate:'20-02-81',sal:1600,comm:300 ,deptno:30},
{_id:7521,ename:'WARD'  ,job:'SALESMAN' ,mgr:7698,hiredate:'22-02-81',sal:1250,comm:500 ,deptno:30},
{_id:7566,ename:'JONES' ,job:'MANAGER'  ,mgr:7839,hiredate:'02-04-81',sal:2975,comm:0,deptno:20},
{_id:7654,ename:'MARTIN',job:'SALESMAN' ,mgr:7698,hiredate:'28-09-81',sal:1250,comm:1400,deptno:30},
{_id:7698,ename:'BLAKE' ,job:'MANAGER'  ,mgr:7839,hiredate:'01-05-81',sal:2850,comm:0,deptno:30},
{_id:7782,ename:'CLARK' ,job:'MANAGER'  ,mgr:7839,hiredate:'09-06-81',sal:2450,comm:0,deptno:10},
{_id:7788,ename:'SCOTT' ,job:'ANALYST'  ,mgr:7566,hiredate:'19-04-87',sal:3000,comm:0,deptno:20},
{_id:7839,ename:'KING'  ,job:'PRESIDENT',mgr:0,hiredate:'17-11-81',sal:5000,comm:0,deptno:10},
{_id:7844,ename:'TURNER',job:'SALESMAN' ,mgr:7698,hiredate:'08-09-81',sal:1500,comm:0,deptno:30},
{_id:7876,ename:'ADAMS' ,job:'CLERK'    ,mgr:7788,hiredate:'23-05-87',sal:1100,comm:0,deptno:20},
{_id:7900,ename:'JAMES' ,job:'CLERK'    ,mgr:7698,hiredate:'03-12-81',sal:950,comm:0,deptno:30},
{_id:7902,ename:'FORD'  ,job:'ANALYST'  ,mgr:7566,hiredate:'03-12-81',sal:3000,comm:0,deptno:20},
{_id:7934,ename:'MILLER',job:'CLERK'    ,mgr:7782,hiredate:'23-01-82',sal:1300,comm:0,deptno:10}
]
);

(案例一)求员工表中,每种职位的人数

var map1=function(){emit(this.job,1)}
var reduce1=function(job,count){return Array.sum(count)}
db.emp.mapReduce(map1,reduce1,{out:"mrdemo1"})

(案例二)求员工表中,每个部门的工资总和

var map2=function(){emit(this.deptno,this.sal)}
var reduce2=function(deptno,sal){return Array.sum(sal)}
db.emp.mapReduce(map2,reduce2,{out:"mrdemo2"})

(案例三)Troubleshoot the Map Function

定义自己的emit函数:
var emit = function(key, value) {
print("emit");
print("key: " + key + "  value: " + tojson(value));
}

测试一条数据:
emp7839=db.emp.findOne({_id:7839})
map2.apply(emp7839)
输出以下结果:
emit
key: 10  value: 5000

测试多条数据:
var myCursor=db.emp.find()
while (myCursor.hasNext()) {
    var doc = myCursor.next();
    print ("document _id= " + tojson(doc._id));
    map2.apply(doc);
    print();
}

(案例四)Troubleshoot the Reduce Function

一个简单的测试案例
var myTestValues = [ 5, 5, 10 ];
var reduce1=function(key,values){return Array.sum(values)}
reduce1("mykey",myTestValues)

测试:Reduce的value包含多个值
测试数据:薪水、奖金:
var myTestObjects = [
                      { sal: 1000, comm: 5 },
                      { sal: 2000, comm: 10 },
                      { sal: 3000, comm: 15 }
                    ];
开发reduce方法:
var reduce2=function(key,values) {
   reducedValue = { sal: 0, comm: 0 };
   for(var i=0;i<values.length;i++) {
     reducedValue.sal += values[i].sal;
     reducedValue.comm += values[i].comm;
   }  
   return reducedValue;
}

测试:
reduce2("aa",myTestObjects)

查看原文

赞 0 收藏 0 评论 0

赵强老师 发布了文章 · 8月27日

【赵强老师】Redis的慢查询日志

Redis慢查询日志帮助开发和运维人员定位系统存在的慢操作。慢查询日志就是系统在命令执行前后计算每条命令的执行时间,当超过预设阀值,就将这条命令的相关信息(慢查询ID,发生时间戳,耗时,命令的详细信息)记录下来。

Redis客户端一条命令分为如下四部分执行:

需要注意的是,慢查询日志只是统计步骤3)执行命令的时间,所以慢查询并不代表客户端没有超时问题。需要注意的是,慢查询日志只是统计步骤3)执行命令的时间,所以慢查询并不代表客户端没有超时问题。

一、慢查询的配置参数

  • 慢查询的预设阀值  slowlog-log-slower-than

slowlog-log-slower-than参数就是预设阀值,单位是微秒,默认值是10000,如果一条命令的执行时间超过10000微妙,那么它将被记录在慢查询日志中。
如果slowlog-log-slower-than的值是0,则会记录所有命令。
如果slowlog-log-slower-than的值小于0,则任何命令都不会记录日志。

  • 慢查询日志的长度slowlog-max-len

slowlog-max-len只是说明了慢查询日志最多存储多少条。Redis使用一个列表来存储慢查询日志,showlog-max-len就是列表的最大长度。当慢查询日志已经到达列表的最大长度时,又有慢查询日志要进入列表,则最早插入列表的日志将会被移出列表,新日志被插入列表的末尾。

二、慢查询日志的组成

慢查询日志由以下四个属性组成:
标识ID,发生时间戳,命令耗时,执行命令和参数

三、慢查询日志的访问和管理

获取慢查询日志slowlog get [n]
命令:slowlog get [N]
选型:N,可选,代表获取的日志条数
例如:showlog get 5

四、慢查询日志最佳实践

  • slowlog-max-len的设置建议

线上环境建议调大慢查询日志的列表,记录慢查询日志时Redis会对长命令做截断操作,并不会占用大量内存。增大慢查询列表可以减缓慢查询被剔除出列表的可能性。例如线上可以设置为1000以上。

  • slowlog-log-lower-than的设置建议

需要根据redis的并发量调整该值。由于redis采用单线程响应名利,对于高流量的场景,如果执行命令的时间在1毫秒以上,那么redis最多可支撑OPS(每秒操作次数)不到1000,因此高OPS场景的REDIS建议设置为1毫秒。

  • 慢查询只记录命令执行时间,并不包括命令排队时间和网络传输时间。

因此客户端命令的执行时间要大于redis服务器实际执行命令的时间。因为命令执行排队极致,慢查询会导致命令级联阻塞,因此当客户端出现请求超时,需要检查该时间点是否有对应的慢查询,从而分析是否因为慢查询导致的命令级联阻塞

  • 慢查询日志是一个先进先出队列

慢查询较多的情况下,可能会丢失部分慢查询命令,可以定期执行slow get命令将慢查询日志持久化到其他存储中。然后制作可视化界面查询。

查看原文

赞 0 收藏 0 评论 0

赵强老师 发布了文章 · 8月10日

【赵强老师】Redis的RDB持久化

Redis 提供了多种不同级别的持久化方式:

  • RDB 持久化可以在指定的时间间隔内生成数据集的时间点快照(point-in-time snapshot)。
  • AOF (Append-only file)持久化记录服务器执行的所有写操作命令,并在服务器启动时,通过重新执行这些命令来还原数据集。 AOF 文件中的命令全部以 Redis 协议的格式来保存,新命令会被追加到文件的末尾。 Redis 还可以在后台对 AOF 文件进行重写(rewrite),使得 AOF 文件的体积不会超出保存数据集状态所需的实际大小。
  • Redis 还可以同时使用 AOF 持久化和 RDB 持久化。 在这种情况下, 当 Redis 重启时, 它会优先使用 AOF 文件来还原数据集, 因为 AOF 文件保存的数据集通常比 RDB 文件所保存的数据集更完整。
  • 你甚至可以关闭持久化功能,让数据只在服务器运行时存在。

一、RDB的持久化

工作原理:每隔一定时间给内存照一个快照,将内存中的数据写入文件(rdb文件)。这是Redis默认的持久化方式。当redis生成dump.rdb文件时,工作过程如下:

  • 当达到RDB生成条件时,redis主进程fork一个子进程
  • fork出来的子进程将内存的数据集dump到临时的RDB中
  • 当子进程对临时的RDB文件写入完毕,redis用新的RDB文件代替旧的RDB文件

配置参数如下:

RDB示例测试:可以使用redis-benchmark进行压力测试,观察RDB文件大小的变化。

bin/redis-benchmark -n 100000  表示执行100000个操作

二、RDB的缺点:

在两次快照之间,如果发生断电,数据会丢失。举例:在生成rdb后,插入新值。突然断电,数据可能会丢失。

三、监控RDB:

Redis监控最直接的方法当然就是使用系统提供的 info 命令来做了,只需要执行下面一条命令,就能获得 Redis 系统的状态报告。

bin/redis-cli info | grep rdb_
  • rdb_changes_since_last_save 表明上次RDB保存以后改变的key次数
  • rdb_bgsave_in_progress 表示当前是否在进行bgsave操作,1表示正在进行;0表示没有进行
  • rdb_last_save_time 上次保存RDB文件的时间戳
  • rdb_last_bgsave_time_sec 上次保存的耗时
  • rdb_last_bgsave_status 上次保存的状态
  • rdb_current_bgsave_time_sec 目前保存RDB文件已花费的时间

查看原文

赞 0 收藏 0 评论 0

赵强老师 发布了文章 · 7月23日

【赵强老师】MySQL高可用架构:MHA

MHA(Master HA)是一款开源的 MySQL 的高可用程序,它为 MySQL 主从复制架构提供了 automating master failover 功能。MHA 在监控到 master 节点故障时,会提升其中拥有最新数据的 slave 节点成为新的master 节点,在此期间,MHA 会通过于其它从节点获取额外信息来避免一致性方面的问题。MHA 还提供了 master 节点的在线切换功能,即按需切换 master/slave 节点。
MHA 是由日本人 yoshinorim(原就职于DeNA现就职于FaceBook)开发的比较成熟的 MySQL 高可用方案。MHA 能够在30秒内实现故障切换,并能在故障切换中,最大可能的保证数据一致性。目前淘宝也正在开发相似产品 TMHA, 目前已支持一主一从。

一、MHA的组成

(一)MHA中的角色

MHA 服务有两种角色, MHA Manager(管理节点)和 MHA Node(数据节点):

  • MHA Manager:通常单独部署在一台独立机器上管理多个 master/slave 集群(组),每个master/slave 集群称作一个application,用来管理统筹整个集群。
  • MHA Node:运行在每台 MySQL 服务器上(master/slave/manager),它通过监控具备解析和清理 logs 功能的脚本来加快故障转移。主要是接收管理节点所发出指令的代理,代理需要运行在每一个 mysql 节点上。简单讲node就是用来收集从节点服务器上所生成的bin-log。对比打算提升为新的主节点之上的从节点的是否拥有并完成操作,如果没有发给新主节点在本地应用后提升为主节点。

(二)MHA提供的工具

MHA会提供诸多工具程序,其常见的如下所示:

  • Manager节点

  • Node节点(这些工具通常由MHA Manager的脚本触发,无需人为操作)

二、MHA的工作原理

  • 从宕机崩溃的master保存二进制日志事件(binlog events);
  • 识别含有最新更新的 slave ;
  • 应用差异的中继日志(relay log) 到其他 slave ;
  • 应用从 master 保存的二进制日志事件(binlog events);
  • 提升一个 slave 为新 master ;
  • 使用其他的 slave 连接新的 master 进行复制。

三、部署MHA

(一)准备实验环境

  • 三台主机:mysql111、mysql112、mysql113
  • 操作系统:CentOS
  • 配置主机名(/etc/hosts)
  • 配置免密码登录
  • 每台主机安装MySQL

(二)搭建主从环境

  • 每台MySQL开启binlog,设置server-id
vi /etc/my.cnf
log-bin=mysql-binlog
server-id=1           (注意:每台MySQL设置不同的server-id)

show variables like '%log_bin%';
  • 在所有机器上,创建主从复制的账号
create user 'repl'@'192.168.79.%' identified by 'Welcome_1';
grant replication slave on *.* to 'repl'@'192.168.79.%';
flush privileges;
  • 在所有机器上,创建管理账号
create user 'myadmin'@'192.168.79.%' identified by 'Welcome_1';
grant all privileges on *.* to 'myadmin'@'192.168.79.%';
flush privileges;
  • 在主库和从库上,启用GTID
set @@GLOBAL.ENFORCE_GTID_CONSISTENCY = ON;
set @@GLOBAL.GTID_MODE = OFF_PERMISSIVE;
set @@GLOBAL.GTID_MODE = ON_PERMISSIVE;
set @@GLOBAL.GTID_MODE = ON;

参数说明:
GTID是MySQL 5.6的新特性,其全称是Global Transaction Identifier,可简化MySQL的主从切换以及Failover。GTID用于在binlog中唯一标识一个事务。当事务提交时,MySQL Server在写binlog的时候,会先写一个特殊的Binlog Event,类型为GTID_Event,指定下一个事务的GTID,然后再写事务的Binlog。主从同步时GTID_Event和事务的Binlog都会传递到从库,从库在执行的时候也是用同样的GTID写binlog,这样主从同步以后,就可通过GTID确定从库同步到的位置了。也就是说,无论是级联情况,还是一主多从情况,都可以通过GTID自动找点儿,而无需像之前那样通过File_name和File_position找点儿了。

  • 在从库上分别配置主从复制命令并开启主从同步
change master to master_host='mysql111',master_user='repl',\
master_password='Welcome_1',master_auto_position=1;

start slave;
  • 在从库上查看主从复制的状态
show slave status\G;

  • 测试主从复制

(三)安装MHA

  • 所有节点安装node软件依赖包和node软件
yum -y install perl-DBD-MySQL 
rpm -ivh mha4mysql-node-0.58-0.el7.centos.noarch.rpm
  • 在mysql111上安装安装manager软件依赖包
yum install -y perl-Config-Tiny 
yum install -y epel-release 
yum install -y perl-Log-Dispatch 
yum install -y perl-Parallel-ForkManager 
yum install -y perl-Time-HiRes
  • 在mysql111上安装安装manager软件
rpm -ivh mha4mysql-manager-0.58-0.el7.centos.noarch.rpm

(四)创建Manager的配置文件

#创建配置文件目录、日志目录
mkdir -p /etc/mha
mkdir -p /var/log/mha/log

#编辑mha配置文件 vi /etc/mha/mha.cnf
[server default]
manager_log=/var/log/mha/log/manager
manager_workdir=/var/log/mha/log
master_binlog_dir=/var/lib/mysql
user=myadmin
password=Welcome_1
ping_interval=2
repl_user=repl
repl_password=Welcome_1
ssh_user=root
[server1]
hostname=mysql111
port=3306
[server2]
hostname=mysql112
port=3306
[server3]
hostname=mysql113
port=3306

(五)检查状态,并开启MHA

  • 检查互信
masterha_check_ssh --conf=/etc/mha/mha.cnf

  • 检查主从复制状态
masterha_check_repl --conf=/etc/mha/mha.cnf

  • 开启MHA-manager
nohup masterha_manager --conf=/etc/mha/mha.cnf > /var/log/mha/log/manager.log < /dev/null 2>&1 &
  • 查看MHA状态
masterha_check_status --conf=/etc/mha/mha.cnf

  • 测试HA高可用的自动切换
在mysql111上,执行shutdown操作
mysqladmin -uroot -pWelcome_1 shutdown

MHA会自动进行主从切换。切换完成后,MHA进程会自动停止运行。

在mysql112和mysql113上观察,执行下面的语句:
show slave status\G;

查看原文

赞 0 收藏 0 评论 0

赵强老师 发布了文章 · 7月19日

【赵强老师】Flink的Watermark机制(基于Flink 1.11.0实现)

在使用eventTime的时候如何处理乱序数据?我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络延迟等原因,导致乱序的产生,特别是使用kafka的话,多个分区的数据无法保证有序。所以在进行window计算的时候,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。Watermark是用于处理乱序事件的,用于衡量Event Time进展的机制。watermark可以翻译为水位线。

一、Watermark的核心原理

Watermark的核心本质可以理解成一个延迟触发机制。
在 Flink 的窗口处理过程中,如果确定全部数据到达,就可以对 Window 的所有数据做 窗口计算操作(如汇总、分组等),如果数据没有全部到达,则继续等待该窗口中的数据全 部到达才开始处理。这种情况下就需要用到水位线(WaterMarks)机制,它能够衡量数据处 理进度(表达数据到达的完整性),保证事件数据(全部)到达 Flink 系统,或者在乱序及 延迟到达时,也能够像预期一样计算出正确并且连续的结果。当任何 Event 进入到 Flink 系统时,会根据当前最大事件时间产生 Watermarks 时间戳。

那么 Flink 是怎么计算 Watermak 的值呢?

Watermark =进入Flink 的最大的事件时间(mxtEventTime)-指定的延迟时间(t)

那么有 Watermark 的 Window 是怎么触发窗口函数的呢?
如果有窗口的停止时间等于或者小于 maxEventTime - t(当时的warkmark),那么这个窗口被触发执行。

其核心处理流程如下图所示。

二、Watermark的三种使用情况

1、本来有序的Stream中的 Watermark

如果数据元素的事件时间是有序的,Watermark 时间戳会随着数据元素的事件时间按顺 序生成,此时水位线的变化和事件时间保持一直(因为既然是有序的时间,就不需要设置延迟了,那么t就是 0。所以 watermark=maxtime-0 = maxtime),也就是理想状态下的水位 线。当 Watermark 时间大于 Windows 结束时间就会触发对 Windows 的数据计算,以此类推, 下一个 Window 也是一样。这种情况其实是乱序数据的一种特殊情况。

2、乱序事件中的Watermark

现实情况下数据元素往往并不是按照其产生顺序接入到 Flink 系统中进行处理,而频繁 出现乱序或迟到的情况,这种情况就需要使用 Watermarks 来应对。比如下图,设置延迟时间t为2。

3、并行数据流中的Watermark

在多并行度的情况下,Watermark 会有一个对齐机制,这个对齐机制会取所有 Channel 中最小的 Watermark。

三、设置Watermark的核心代码

1、首先,正确设置事件处理的时间语义,一般都是采用Event Time。

sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);    

2、其次,指定生成Watermark的机制,包括:延时处理的时间和EventTime对应的字段。如下:

注意:不管是数据是否有序,都可以使用上面的代码。有序的数据只是无序数据的一种特殊情况。

四、Watermark编程案例

测试数据:基站的手机通话数据,如下:

需求:按基站,每5秒统计通话时间最长的记录。

  • StationLog用于封装基站数据
package watermark;

//station1,18688822219,18684812319,10,1595158485855
public class StationLog {
    private String stationID;   //基站ID
    private String from;        //呼叫放
    private String to;            //被叫方
    private long duration;        //通话的持续时间
    private long callTime;        //通话的呼叫时间
    public StationLog(String stationID, String from, 
                      String to, long duration, 
                      long callTime) {
        this.stationID = stationID;
        this.from = from;
        this.to = to;
        this.duration = duration;
        this.callTime = callTime;
    }
    public String getStationID() {
        return stationID;
    }
    public void setStationID(String stationID) {
        this.stationID = stationID;
    }
    public long getCallTime() {
        return callTime;
    }
    public void setCallTime(long callTime) {
        this.callTime = callTime;
    }
    public String getFrom() {
        return from;
    }
    public void setFrom(String from) {
        this.from = from;
    }

    public String getTo() {
        return to;
    }
    public void setTo(String to) {
        this.to = to;
    }
    public long getDuration() {
        return duration;
    }
    public void setDuration(long duration) {
        this.duration = duration;
    }
}
  • 代码实现:WaterMarkDemo用于完成计算(注意:为了方便咱们测试设置任务的并行度为1)
package watermark;

import java.time.Duration;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

//每隔五秒,将过去是10秒内,通话时间最长的通话日志输出。
public class WaterMarkDemo {
    public static void main(String[] args) throws Exception {
        //得到Flink流式处理的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        //设置周期性的产生水位线的时间间隔。当数据流很大的时候,如果每个事件都产生水位线,会影响性能。
        env.getConfig().setAutoWatermarkInterval(100);//默认100毫秒
        
        //得到输入流
        DataStreamSource<String> stream = env.socketTextStream("bigdata111", 1234);
        stream.flatMap(new FlatMapFunction<String, StationLog>() {

            public void flatMap(String data, Collector<StationLog> output) throws Exception {
                String[] words = data.split(",");
                //                           基站ID            from    to        通话时长                                                    callTime
                output.collect(new StationLog(words[0], words[1],words[2], Long.parseLong(words[3]), Long.parseLong(words[4])));
            }
        }).filter(new FilterFunction<StationLog>() {
            
            @Override
            public boolean filter(StationLog value) throws Exception {
                return value.getDuration() > 0?true:false;
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<StationLog>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner<StationLog>() {
                    @Override
                    public long extractTimestamp(StationLog element, long recordTimestamp) {
                        return element.getCallTime(); //指定EventTime对应的字段
                    }
                })
        ).keyBy(new KeySelector<StationLog, String>(){
            @Override
            public String getKey(StationLog value) throws Exception {
                return value.getStationID();  //按照基站分组
            }}
        ).timeWindow(Time.seconds(5)) //设置时间窗口
        .reduce(new MyReduceFunction(),new MyProcessWindows()).print();

        env.execute();
    }
}
//用于如何处理窗口中的数据,即:找到窗口内通话时间最长的记录。
class MyReduceFunction implements ReduceFunction<StationLog> {
    @Override
    public StationLog reduce(StationLog value1, StationLog value2) throws Exception {
        // 找到通话时间最长的通话记录
        return value1.getDuration() >= value2.getDuration() ? value1 : value2;
    }
}
//窗口处理完成后,输出的结果是什么
class MyProcessWindows extends ProcessWindowFunction<StationLog, String, String, TimeWindow> {
    @Override
    public void process(String key, ProcessWindowFunction<StationLog, String, String, TimeWindow>.Context context,
            Iterable<StationLog> elements, Collector<String> out) throws Exception {
        StationLog maxLog = elements.iterator().next();

        StringBuffer sb = new StringBuffer();
        sb.append("窗口范围是:").append(context.window().getStart()).append("----").append(context.window().getEnd()).append("\n");;
        sb.append("基站ID:").append(maxLog.getStationID()).append("\t")
          .append("呼叫时间:").append(maxLog.getCallTime()).append("\t")
          .append("主叫号码:").append(maxLog.getFrom()).append("\t")
          .append("被叫号码:")    .append(maxLog.getTo()).append("\t")
          .append("通话时长:").append(maxLog.getDuration()).append("\n");
        out.collect(sb.toString());
    }
}

查看原文

赞 0 收藏 0 评论 0

赵强老师 发布了文章 · 7月17日

【赵强老师】什么是Oracle的数据字典?

数据字典是oracle存放有关数据库信息的地方,几乎所有的系统信息和对象信息都可在数据字典中进行查询。数据字典是oracle数据库系统的信息核心,它是一组提供有关数据库信息的表和视图的集合,这些表和视图是只读的。它是随着数据库的建立而建立的,当数据库执行特定动作时数据字典也会自动更新。数据一览与数据字典来记录、校验和管理正在进行的操作。

Oracle中,sys用户是数据字典的拥有者,数据字典保证在所有数据库的系统表空间system内,任何用户都无权更改sys模式下的模式对象或数据字典中的行。也就是说数据字典只能查询,不能手动进行修改。

一、数据字典用途

Oracle通过存取数据字典从而比较方便地获取有关用户某事对象和存储结构等信息。当系统执行了DDL语句后,oracle会及时修改数据字典。任何用户只能以读的形式使用数据字典获取数据库信息。

二、数据字典存储的信息

  • 数据用户的名称
  • 为用户授予的权限和角色
  • 模式对象的名。
  • 完整性约束的具体信息;
  • 每个字段的默认值;
  • 数据库空间的使用情况;
  • 存储审计的信息
  • 对象与用户的严格管理(适用于高度机密管理);
  • 其他一般数据库信息

三、四种前缀的数据字典视图

  • user_ :任何用户都可以读取的视图,每个用户读取的都不一样,它只提供当前用户某事下的对象信息。
如查询当前模式下的所有对象:
select object_name, object_type from user_objects;
  • all_ :所有用户都可读取的用户视图,它提供与用户有关的对象信息。
如查询当前用户可访问的所有对象:
select owner, object_name, object_type from all_objects;
  • dba_:提供了只有数据库管理员才可读取的视图,包括所有用户视图中的对象信息。
如:
select owner, object_name, object_type from sys.dba_objects;
  • v$:动态性能视图

动态性能视图用于记录当前例程的活动信息,当启动oracle server时,系统会建立动态性能视图;当停止oracle server时,系统会删除动态性能视图,oracle的所有动态性能视图都是以v_$开始的,并且oracle为每个动态性能视图都提供例了相应的同义词,并且同义词是以v$开始的,例如_$datefile的同义词为v$datefile;动态性能视图的所有者为sys,一般情况下,由DBA或是特权用户来查询动态性能视图。

四、查询数据字典示例

  • 查询用户相关的数据字典
查询用户
select username from dba_users; -- 只有管理员权限的用户才能查询
select username from all_users; -- 当前或任何用户都可使用

-- 查看当前用户的默认表空间
select username, default_tablespace from user_users;

--当前用户角色
select * from user_role_privs;

-- 当前用户的系统权限和表级权限
select * from user_sys_privs;
select * from user_tab_privs;
  • 查询表空间相关的数据字典(拥有DBA权限的用户才能查询)
select * from dba_data_files;
select * from dba_tablespaces; --表空间
select tablespace_name, sum(bytes), sum(blocks) from dba_free_space group by tablespace_name; --空闲表空间
select * from dba_data_files where tablespace_name='USERS'; -- 表空间对于的数据文件
select * from dba_segments where tablespace_name='USERS';

--查询用户模式对象所使用过的正在使用空间大小
select name, type, source_size, code_size from user_object_size;
  • 查询数据库对象(拥有DBA权限的用户才能查询)
select * from dba_objects
select * from dba_objects where object_type = upper('package body');
select * from dba_objects where OBJECT_TYPE='TABLE' and OWNER='SCOTT'

查看原文

赞 0 收藏 0 评论 0

赵强老师 发布了文章 · 7月7日

【赵强老师】第一个Oracle的手工备份和恢复

一、什么是手工管理的备份与恢复?

尽管在Oracle中,已经有了RMAN的备份与恢复。但是作为Oracle备份恢复的一种方式,我们将在本文中通过一个例子来为大家介绍如何使用手工的方式来完成Oracle的备份与恢复。手工方式的本质是通过操作系统的cp命令完成,但是在备份与恢复的时候,需要把数据块置为正确的状态。

手工方式下也存在一些缺点,例如:需要手工管理备份内容,容易丢失,不利于管理

二、第一个手工管理的备份与恢复

这里我们以一个表空间的备份与恢复为例。

  • 准备测试数据
create tablespace mytbs datafile '/home/oracle/mytbs01.dbf' size 50M;
alter user scott quota unlimited on mytbs;
create table scott.test1 tablespace mytbs as select ename,sal from scott.emp;
create table scott.test2 tablespace mytbs as select * from scott.dept;
  • 执行手工备份
alter tablespace mytbs begin backup;
host cp /home/oracle/mytbs01.dbf /home/oracle/backup/demo1/mytbs01.dbf
alter tablespace mytbs end backup;
  • 执行破坏性操作
删除数据文件,模拟数据丢失
rm -rf /home/oracle/mytbs01.dbf

打开数据库:startup

出现以下错误:
ORA-01157: cannot identify/lock data file 5 - see DBWR trace file
ORA-01110: data file 5: '/home/oracle/mytbs01.dbf'

查询错误信息:
SQL> select FILE#,ERROR from V$RECOVER_FILE;
     FILE# ERROR
---------- ---------------------------------------------------
     5 FILE NOT FOUND

查看当前数据库的状态:
SQL> select OPEN_MODE from v$database;
OPEN_MODE
--------------------
MOUNTED
  • 执行手工恢复
转储数据文件:将备份拷贝回原来的位置
cp /home/oracle/backup/demo1/mytbs01.dbf /home/oracle/mytbs01.dbf

恢复:
recover datafile 8; 

打开数据库:
alter database open;
  • 检查数据库是否恢复

查看原文

赞 0 收藏 0 评论 0

赵强老师 发布了文章 · 7月2日

【赵强老师】什么是Spark SQL?

一、Spark SQL简介

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。

为什么要学习Spark SQL?我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!同时Spark SQL也支持从Hive中读取数据。

二、Spark SQL的特点

  • 无缝集成在Spark中,将SQL查询与Spark程序混合。Spark SQL允许您使用SQL或熟悉的DataFrame API在Spark程序中查询结构化数据。适用于Java、Scala、Python和R语言。
  • 提供统一的数据访问,以相同的方式连接到任何数据源。DataFrames和SQL提供了一种访问各种数据源的通用方法,包括Hive、Avro、Parquet、ORC、JSON和JDBC。您甚至可以通过这些源连接数据。
  • 支持Hive集成。在现有仓库上运行SQL或HiveQL查询。Spark SQL支持HiveQL语法以及Hive SerDes和udf,允许您访问现有的Hive仓库。
  • 支持标准的连接,通过JDBC或ODBC连接。服务器模式为业务智能工具提供了行业标准JDBC和ODBC连接。

三、核心概念:DataFrames和Datasets

  • DataFrame

DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表,但在底层具有更丰富的优化。DataFrames可以从各种来源构建,例如:

  • 结构化数据文件
  • hive中的表
  • 外部数据库或现有RDDs

DataFrame API支持的语言有Scala,Java,Python和R。

从上图可以看出,DataFrame多了数据的结构信息,即schema。RDD是分布式的 Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化。

  • Datasets

Dataset是数据的分布式集合。Dataset是在Spark 1.6中添加的一个新接口,是DataFrame之上更高一级的抽象。它提供了RDD的优点(强类型化,使用强大的lambda函数的能力)以及Spark SQL优化后的执行引擎的优点。一个Dataset 可以从JVM对象构造,然后使用函数转换(map, flatMap,filter等)去操作。 Dataset API 支持Scala和Java。 Python不支持Dataset API。

四、创建DataFrames

  • 测试数据如下:员工表

  • 定义case class(相当于表的结构:Schema)
case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)
  • 将HDFS上的数据读入RDD,并将RDD与case Class关联
val lines = sc.textFile("hdfs://bigdata111:9000/input/emp.csv").map(_.split(","))
  • 把每个Array映射成一个Emp的对象
val emp = lines.map(x => Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))
  • 生成DataFrame
val allEmpDF = emp.toDF
  • 通过DataFrames查询数据

  • 将DataFrame注册成表(视图)
allEmpDF.createOrReplaceTempView("emp")
  • 执行SQL查询
spark.sql("select * from emp").show

查看原文

赞 0 收藏 0 评论 0

认证与成就

  • 获得 2 次点赞
  • 获得 3 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 3 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 3月31日
个人主页被 528 人浏览