labmem

labmem 查看完整档案

填写现居城市  |  填写毕业院校  |  填写所在公司/组织填写个人主网站
编辑

el psy congroo

个人动态

labmem 赞了文章 · 2017-01-05

RxJava系列番外篇:一个RxJava解决复杂业务逻辑的案例

之前写过一系列RxJava1的文章,也承诺过会尽快有RxJava2的介绍。无奈实际项目中还未真正的使用RxJava2,不敢妄动笔墨。所以这次还是给大家分享一个使用RxJava1解决问题的案例,希望对大家在使用RxJava的时候有一点点启发。对RxJava还不了解的同学可以先去看看我之前的RxJava系列文章:

业务场景

MinimalistWeather这个开源的天气App来举例:

进入App首页后,首先我们需要从数据库中获取当前城市的天气数据,如果数据库中存在天气数据则在UI页面上展示天气数据;如果数据库中未存储当前城市的天气数据,或者已存储的天气数据的发布时间相比现在已经超过了一小时,并且网络属于连接状态则调用API从服务端获取天气数据。如果获取到到的天气数据发布时间和当前数据库中的天气数据发布时间一致则丢弃掉从服务端获取到的天气数据,如果不一致则更新数据库并且在页面上展示最新的天气信息。(同时天气数据源是可配置的,可选择是小米天气数据源还是Know天气数据源)

解决方案

首先我们需要创建一个从数据库获取天气数据的Observable observableForGetWeatherFromDB,同时我们也需要创建一个从API获取天气数据的Observable observableForGetWeatherFromNetWork;为了在无网络状态下免于创建observableForGetWeatherFromNetWork我们在这之前需要首先判断下网络状态。最后使用contact操作符将两个Observable合并,同时使用distincttakeUntil操作符来过滤筛选数据以符合业务需求,然后结合subscribeOnobserveOn做线程切换。上述这一套复杂的业务逻辑如果使用传统编码方式将是极其复杂的。下面我们来看看使用RxJava如何清晰简洁的来实现这个复杂的业务:

Observable<Weather> observableForGetWeatherData;
//首先创建一个从数据库获取天气数据的Observable
Observable<Weather> observableForGetWeatherFromDB = Observable.create(new Observable.OnSubscribe<Weather>() {
    @Override
    public void call(Subscriber<? super Weather> subscriber) {
        try {
            Weather weather = weatherDao.queryWeather(cityId);
            subscriber.onNext(weather);
            subscriber.onCompleted();
        } catch (SQLException e) {
            throw Exceptions.propagate(e);
        }
    }
});

if (!NetworkUtils.isNetworkConnected(context)) {
    observableForGetWeatherData = observableForGetWeatherFromDB;
} else {
    //接着创建一个从网络获取天气数据的Observable
    Observable<Weather> observableForGetWeatherFromNetWork = null;
    switch (configuration.getDataSourceType()) {
        case ApiConstants.WEATHER_DATA_SOURCE_TYPE_KNOW:
            observableForGetWeatherFromNetWork = ApiClient.weatherService.getKnowWeather(cityId)
                    .map(new Func1<KnowWeather, Weather>() {
                        @Override
                        public Weather call(KnowWeather knowWeather) {
                            return new KnowWeatherAdapter(knowWeather).getWeather();
                        }
                    });
            break;
        case ApiConstants.WEATHER_DATA_SOURCE_TYPE_MI:
            observableForGetWeatherFromNetWork = ApiClient.weatherService.getMiWeather(cityId)
                    .map(new Func1<MiWeather, Weather>() {
                        @Override
                        public Weather call(MiWeather miWeather) {
                            return new MiWeatherAdapter(miWeather).getWeather();
                        }
                    });
            break;
    }
    assert observableForGetWeatherFromNetWork != null;
    observableForGetWeatherFromNetWork = observableForGetWeatherFromNetWork
            .doOnNext(new Action1<Weather>() {
                @Override
                public void call(Weather weather) {
                    Schedulers.io().createWorker().schedule(() -> {
                        try {
                            weatherDao.insertOrUpdateWeather(weather);
                        } catch (SQLException e) {
                            throw Exceptions.propagate(e);
                        }
                    });
                }
            });

    //使用concat操作符将两个Observable合并
    observableForGetWeatherData = Observable.concat(observableForGetWeatherFromDB, observableForGetWeatherFromNetWork)
            .filter(new Func1<Weather, Boolean>() {
                @Override
                public Boolean call(Weather weather) {
                    return weather != null && !TextUtils.isEmpty(weather.getCityId());
                }
            })
            .distinct(new Func1<Weather, Long>() {
                @Override
                public Long call(Weather weather) {
                    return weather.getRealTime().getTime();//如果天气数据发布时间一致,我们再认为是相同的数据从丢弃掉
                }
            })
            .takeUntil(new Func1<Weather, Boolean>() {
                @Override
                public Boolean call(Weather weather) {
                    return System.currentTimeMillis() - weather.getRealTime().getTime() <= 60 * 60 * 1000;//如果天气数据发布的时间和当前时间差在一小时以内则终止事件流
                }
            });
}

observableForGetWeatherData.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<Weather>() {
            @Override
            public void call(Weather weather) {
                displayWeatherInformation();
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                Toast.makeText(context, throwable.getMessage(), Toast.LENGTH_LONG).show();
            }
        });

上面的代码看起来比较复杂,我们采用Lambda表达式简化下代码:

Observable<Weather> observableForGetWeatherData;
//首先创建一个从数据库获取天气数据的Observable
Observable<Weather> observableForGetWeatherFromDB = Observable.create(new Observable.OnSubscribe<Weather>() {
    @Override
    public void call(Subscriber<? super Weather> subscriber) {
        try {
            Weather weather = weatherDao.queryWeather(cityId);
            subscriber.onNext(weather);
            subscriber.onCompleted();
        } catch (SQLException e) {
            throw Exceptions.propagate(e);
        }
    }
});

if (!NetworkUtils.isNetworkConnected(context)) {
    observableForGetWeatherData = observableForGetWeatherFromDB;
} else {
    //接着创建一个从网络获取天气数据的Observable
    Observable<Weather> observableForGetWeatherFromNetWork = null;
    switch (configuration.getDataSourceType()) {
        case ApiConstants.WEATHER_DATA_SOURCE_TYPE_KNOW:
            observableForGetWeatherFromNetWork = ApiClient.weatherService.getKnowWeather(cityId)
                    .map(knowWeather -> new KnowWeatherAdapter(knowWeather).getWeather());
            break;
        case ApiConstants.WEATHER_DATA_SOURCE_TYPE_MI:
            observableForGetWeatherFromNetWork = ApiClient.weatherService.getMiWeather(cityId)
                    .map(miWeather -> new MiWeatherAdapter(miWeather).getWeather());
            break;
    }
    assert observableForGetWeatherFromNetWork != null;
    observableForGetWeatherFromNetWork = observableForGetWeatherFromNetWork
            .doOnNext(weather -> Schedulers.io().createWorker().schedule(() -> {
                try {
                    weatherDao.insertOrUpdateWeather(weather);
                } catch (SQLException e) {
                    throw Exceptions.propagate(e);
                }
            }));

    //使用concat操作符将两个Observable合并
    observableForGetWeatherData = Observable.concat(observableForGetWeatherFromDB, observableForGetWeatherFromNetWork)
            .filter(weather -> weather != null && !TextUtils.isEmpty(weather.getCityId()))
            .distinct(weather -> weather.getRealTime().getTime())//如果天气数据发布时间一致,我们再认为是相同的数据从丢弃掉
            .takeUntil(weather -> System.currentTimeMillis() - weather.getRealTime().getTime() <= 60 * 60 * 1000);//如果天气数据发布的时间和当前时间差在一小时以内则终止事件流
}

observableForGetWeatherData.subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(weather -> displayWeatherInformation(),
                throwable -> Toast.makeText(context, throwable.getMessage(), Toast.LENGTH_LONG).show());

小技巧

在上述的实现中有几点是我们需要注意的:

  1. 为什么我需要在判断网络那块整个if else?这样看起来很不优雅,我们通过RxJava符完全可以实现同样的操作啊!之所以这样做是为了在无网络状况下去创建不必要的Observable observableForGetWeatherFromNetWork;

  2. 更新数据库的操作不应该阻塞更新UI,因此我们在observableForGetWeatherFromNetWorkdoOnNext中需要通过Schedulers.io().createWorker()去另起一条线程,以此保证更新数据库不会阻塞更新UI的操作。

有同学可能会问为什么不在doOnNext之后再调用一次observeOn把更新数据库的操作切换到一条新的子线程去操作呢?其实一开始我也是这样做的,后来想想不对。整个Observable的事件传递处理就像是在一条流水线上完成的,虽然我们可以通过observeOn来指定子线程去处理更新数据库的操作,但是只有等这条子线程完成了更新数据库的任务后事件才会继续往后传递,这样就阻塞了更新UI的操作。对此有疑问的同学可以去看看我之前关于RxJava源码分析的文章或者自己动手debug看看。

问题

最后给大家留个两个问题:

  1. 上述代码是最佳实现方案吗?还有什么更加合理的做法?

  2. 我们在observableForGetWeatherData中使用distincttakeUntil过滤筛选天气数据的时候网络请求会不会已经发出去了?这样做还有意义吗?

欢迎大家留言讨论。

本文中的代码在MinimalistWeather中的WeatherDataRepository类中有同样的实现,文章中为了更完整的将整个实现过程呈现出来,对代码做了部分改动。

如果大家喜欢这一系列的文章,欢迎关注我的知乎专栏、Github以及简书。

查看原文

赞 2 收藏 12 评论 0

labmem 赞了文章 · 2016-11-09

源码之下无秘密 ── 做最好的 Netty 源码分析教程

背景

在工作中, 虽然我经常使用到 Netty 库, 但是很多时候对 Netty 的一些概念还是处于知其然, 不知其所以然的状态, 因此就萌生了学习 Netty 源码的想法.
刚开始看源码的时候, 自然是比较痛苦的, 主要原因有两个: 第一, 网上没有找到让我满意的详尽的 Netty 源码分析的教程; 第二, 我也是第一次系统地学习这么大代码量的源码. 由于这两个原因, 最开始时, 看代码的进度很慢, 甚至一度想放弃了, 不过最后很庆幸自己能够坚持下去, 并因此从 Netty 源码中学到了很多宝贵的知识.

下面我将自己在 Netty 源码学习过程记录下来, 整理成博客, 与大家分享交流, 共同学习. 由于本人才疏学浅, 文章中难免有不少错误之处, 期待能得到大家的建议和斧正.

最后, 忘了提了, 我使用的 Netty 版本: 4.0.33.Final

PS. 不小心做了一次标题党, 不过正如标题所言, 即使不是最好的, 那也要尽力 做到最好的!

此系列文章已发布到我的 github

目录

本文由 yongshun 发表于个人博客, 采用 署名-相同方式共享 3.0 中国大陆许可协议.
Email: yongshun1228@gmail .com
本文标题为: 源码之下无秘密 ── 做最好的 Netty 源码分析教程
本文链接为: https://segmentfault.com/a/1190000007282628

查看原文

赞 158 收藏 317 评论 24

labmem 赞了文章 · 2016-11-09

记一次有趣的 Netty 源码问题

背景

起因是一个朋友问我的一个关于 ServerBootstrap 启动的问题.
相关 issue
他的问题我复述一下:
ServerBootstrap 的绑定流程如下:

ServerBootstrap.bind ->
    AbstractBootstrap.bind ->
        AbstractBootstrap.doBind ->
            AbstractBootstrap.initAndRegister ->
                AbstractChannel#AbstractUnsafe.register ->
                    eventLoop.execute( () -> AbstractUnsafe.register0)
            doBind0() ->
                channel.eventLoop().execute( () -> channel.bind) ->
                    AbstractUnsafe.bind

AbstractUnsafe.register0 中可能会调用 pipeline.fireChannelActive(), 即:

private void register0(ChannelPromise promise) {
    try {
        ...
        boolean firstRegistration = neverRegistered;
        doRegister();
        ...
        if (firstRegistration && isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        ...
    }
}

并且在 AbstractUnsafe.bind 中也会有 pipeline.fireChannelActive() 的调用, 即:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        ...
    }

    if (!wasActive && isActive()) {
        invokeLater(new OneTimeTask() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    ...
}

那么有没有可能造成了两次的 pipeline.fireChannelActive() 调用?

我的回答是不会. 为什么呢? 对于直接想知道答案的朋友可以直接阅读到最后面的 回答总结 两节..

下面我们就来根据代码详细分析一下.

分析

首先, 根据我们上面所列出的调用流程, 会有 AbstractBootstrap.doBind 的调用, 它的代码如下:

private ChannelFuture doBind(final SocketAddress localAddress) {
        // 步骤1
        final ChannelFuture regFuture = initAndRegister();
        ...
        // 步骤2
        if (regFuture.isDone()) {
            ...
            doBind0(regFuture, channel, localAddress, promise);
            ...
        } else {
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    ...
                    doBind0(regFuture, channel, localAddress, promise);
                }
            });
        }
}

首先在 doBind 中, 执行步骤1, 即调用 initAndRegister 方法, 这个方法会最终调用到AbstractChannel#AbstractUnsafe.register. 而在 AbstractChannel#AbstractUnsafe.register 中, 会通过 eventLoop.execute 的形式将 AbstractUnsafe.register0 的调用提交到任务队列中(即提交到 eventLoop 线程中, 而当前代码所在的线程是 main 线程), 即:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 当前线程是主线程, 因此这个判断是 false
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new OneTimeTask() {
                @Override
                public void run() {
                    // register0 在 eventLoop 线程中执行.
                    register0(promise);
                }
            });
        } catch (Throwable t) {
           ...
        }
    }
}

接着 AbstractBootstrap.initAndRegister 返回, 回到 AbstractBootstrap.doBind 中, 于是执行到步骤2. 注意, 因为 AbstractUnsafe.register0 是在 eventLoop 中执行的, 因此有可能主线程执行到步骤2 时, AbstractUnsafe.register0 已经执行完毕了, 此时必然有 regFuture.isDone() == true; 但也有可能 AbstractUnsafe.register0 没有来得及执行, 因此此时 regFuture.isDone() == false. 所以上面的步骤2 考虑到了这两种情况, 因此分别针对这两种情况做了区分, 即:

// 步骤2
if (regFuture.isDone()) {
    ...
    doBind0(regFuture, channel, localAddress, promise);
    ...
} else {
    regFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            ...
            doBind0(regFuture, channel, localAddress, promise);
        }
    });
}

一般情况下, regFuture.isDone() 为 false, 因为绑定操作是比较费时的, 因此很大几率会执行到 else 分支, 并且 if 分支和 else 分支从结果上说没有不同, 而且 if 分支逻辑还更简单一些, 因此我们以 else 分支来分析吧. 在 else 分支中, 会为 regFuture 设置一个回调监听器. regFuture 是一个 ChannelFuture, 而 ChannelFuture 代表了一个 Channel 的异步 IO 的操作结果, 因此这里 regFuture 代表了 Channel 注册(register) 的这个异步 IO 的操作结果.
Netty 这里之所以要为 regFuture 设置一个回调监听器, 是为了保证 register 和 bind 的时序上的正确性: Channel 的注册必须要发生在 Channel 的绑定之前.
(关于时序的正确性的问题, 我们在后面有证明)

接下来我们来看一下 AbstractUnsafe.register0 方法:

private void register0(ChannelPromise promise) {
    try {
        ....
        // neverRegistered 一开始是 true, 因此 firstRegistration == true
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;
        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        // firstRegistration == true, 而 isActive() == false, 
        // 因此不会执行到 pipeline.fireChannelActive()
        if (firstRegistration && isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

注意, 我需要再强调一下, 这里 AbstractUnsafe.register0 是在 eventLoop 中执行的.
AbstractUnsafe.register0 中会调用 doRegister() 注册 NioServerSocketChannel, 然后调用 safeSetSuccess() 设置 promise 的状态为成功. 而这个 promise 变量是什么呢? 我将 AbstractBootstrap.doBind 的调用链写详细一些:

AbstractBootstrap.doBind ->
    AbstractBootstrap.initAndRegister ->
        MultithreadEventLoopGroup.register ->
            SingleThreadEventLoop.register -> 
                AbstractChannel#AbstractUnsafe.register ->
                    eventLoop.execute( () -> AbstractUnsafe.register0)

在 SingleThreadEventLoop.register 中会实例化一个 DefaultChannelPromise, 即:

@Override
public ChannelFuture register(Channel channel) {
    return register(channel, new DefaultChannelPromise(channel, this));
}

接着调用重载的 SingleThreadEventLoop.register 方法:

@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    channel.unsafe().register(this, promise);
    return promise;
}

我们看到, 实例化的 DefaultChannelPromise 最终会以方法返回值的方式返回到调用方, 即返回到 AbstractBootstrap.doBind 中:

final ChannelFuture regFuture = initAndRegister();

因此我们这里有一个共识: regFuture 是一个在 SingleThreadEventLoop.register 中实例化的 DefaultChannelPromise 对象.

再回到 SingleThreadEventLoop.register 中, 在这里会调用 channel.unsafe().register(this, promise), 将 promise 对象传递到 AbstractChannel#AbstractUnsafe.register 中, 因此在 AbstractUnsafe.register0 中的 promise 就是 AbstractBootstrap.doBind 中的 regFuture.
promise == regFuture 很关键.

既然我们已经确定了 promise 的身份, 那么调用的 safeSetSuccess(promise); 我们也知道是干嘛的了. safeSetSuccess 方法设置一个 Promise 的状态为成功态, 而 Promise 的 成功态 是最终状态, 即此时 promise.isDone() == true. 那么 设置 promise 为成功态后, 会发生什么呢?
还记得不 promise == regFuture, 而我们在 AbstractBootstrap.doBind 的 else 分支中设置了一个回调监听器:

final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        Throwable cause = future.cause();
        if (cause != null) {
            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
            // IllegalStateException once we try to access the EventLoop of the Channel.
            promise.setFailure(cause);
        } else {
            // Registration was successful, so set the correct executor to use.
            // See https://github.com/netty/netty/issues/2586
            promise.executor = channel.eventLoop();
        }
        doBind0(regFuture, channel, localAddress, promise);
    }
});

因此当 safeSetSuccess(promise); 调用时, 根据 Netty 的 Promise/Future 机制, 会触发上面的 operationComplete 回调, 在回调中调用 doBind0 方法:

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

注意到, 有一个关键的地方, 代码中将 **channel.bind** 的调用放到了 eventLoop 中执行. doBind0 返回后, 代码继续执行 AbstractUnsafe.register0 方法的剩余部分代码, 即:

private void register0(ChannelPromise promise) {
    try {
        ....
        safeSetSuccess(promise);
        // safeSetSuccess 返回后, 继续执行如下代码
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        // firstRegistration == true, 而 isActive() == false, 
        // 因此不会执行到 pipeline.fireChannelActive()
        if (firstRegistration && isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

AbstractUnsafe.register0 方法执行完毕后, 才执行到 channel.bind 方法.

channel.bind 方法最终会调用到 AbstractChannel#AbstractUnsafe.bind 方法, 源码如下:

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    logger.info("---wasActive: {}---", wasActive);

    try {
        // 调用 NioServerSocketChannel.bind 方法, 
        // 将底层的 Java NIO SocketChannel 绑定到指定的端口.
        // 当 SocketChannel 绑定到端口后, isActive() 才为真.
        doBind(localAddress);
    } catch (Throwable t) {
        ...
    }

    boolean activeNow = isActive();
    logger.info("---activeNow: {}---", activeNow);

    // 这里 wasActive == false
    // isActive() == true
    if (!wasActive && isActive()) {
        invokeLater(new OneTimeTask() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

上面的代码中, 调用了 doBind(localAddress) 将底层的 Java NIO SocketChannel 绑定到指定的端口. 并且当 SocketChannel 绑定到端口后, isActive() 才为真.
因此我们知道, 如果 SocketChannel 第一次绑定时, 在调用 doBind 前, wasActive == false == isActive(), 而当调用了 doBind 后, isActive() == true, 因此第一次绑定端口时, if 判断成立, 会调用 pipeline.fireChannelActive().

关于 Channel 注册与绑定的时序问题

我们在前的分析中, 直接认定了 Channel 注册Channel 的绑定 之前完成, 那么依据是什么呢?
其实所有的关键在于 EventLoop 的任务队列机制.
不要闲我啰嗦哦. 我们需要继续回到 AbstractUnsafe.register0 的调用中(再次强调一下, 在 eventLoop 线程中执行AbstractUnsafe.register0), 这个方法我们已经分析了, 它会调用 safeSetSuccess(promise), 并由 Netty 的 Promise/Future 机制, 导致了AbstractBootstrap.doBind 中的 regFuture 所设置的回调监听器的 operationComplete 方法调用, 而 operationComplete 中调用了 AbstractBootstrap.doBind0:

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

doBind0 中, 根据 EventLoop 的任务队列机制, 会使用 eventLoop().execute 将 channel.bind 封装为一个 Task, 放到 eventLoop 的 taskQueue 中.
如下用一幅图表示上面的过程:
Alt text

原图在此

而当 channel.bind 被调度时, AbstractUnsafe.register0 早就已经调用结束了.

因此由于 EventLoop 的任务队列机制, 我们知道, 在执行 AbstractUnsafe.register0 时, 是在 EventLoop 线程中的, 而 channel.bind 的调用是以 task 的形式添加到 taskQueue 队列的末尾, 因此必然是有 EventLoop 线程先执行完 AbstractUnsafe.register0 方法后, 才有机会从 taskQueue 中取出一个 task 来执行, 因此这个机制从根本上保证了 Channel 注册发生在绑定 之前.

回答

你的疑惑是, AbstractChannel#AbstractUnsafe.register0 中, 可能会调用 pipeline.fireChannelActive(), 即:

private void register0(ChannelPromise promise) {
    try {
        ...
        boolean firstRegistration = neverRegistered;
        doRegister();
        ...
        if (firstRegistration && isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        ...
    }
}

并且在 AbstractChannel#AbstractUnsafe.bind 中也可能会调用到pipeline.fireChannelActive(), 即:

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        ...
    }

    if (!wasActive && isActive()) {
        invokeLater(new OneTimeTask() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    ...
}

我觉得是 不会. 因为根据上面我们分析的结果可知, Netty 的 Promise/Future 与 EventLoop 的任务队列机制保证了 NioServerSocketChannel 的注册和 NioServerSocketChannel 的绑定的时序: Channel 的注册必须要发生在 Channel 的绑定之前, 而当一个 NioServerSocketChannel 没有绑定到具体的端口前, 它是不活跃的(Inactive), 因此在 register0 中, if (firstRegistration && isActive()) 就不成立, 进而就不会执行到 pipeline.fireChannelActive() 了.
而执行完注册操作后, 在 AbstractChannel#AbstractUnsafe.bind 才会调用pipeline.fireChannelActive(), 因此最终只有一次 fireChannelActive 调用.

总结

有两点需要注意的:

  • isActive() == true 成立的关键是此 NioServerSocketChannel 已经绑定到端口上了.

  • 由 Promise/Future 与 EventLoop 机制, 导致了 Channel 的注册 发生在 Channel 的绑定 之前, 因此在 AbstractChannel#AbstractUnsafe.register0 中的 isActive() == false, if 判断不成立, 最终就是 register0 中的 pipeline.fireChannelActive() 不会被调用.

查看原文

赞 4 收藏 6 评论 1

labmem 关注了问题 · 2016-10-19

java web开发,报表工具的选择

项目(Java开发)现在要做报表统计,网上搜了一下,貌似jasperreport用的比较多,内部也封装了各种导出功能。
同时我页看到百度前端的报表工具echart也不错,但是只能保存为图片,如果要其他导出形式,还需自己将数据提交到后台,在进一步处理才行。

不知大家在做报表这块时 是如何考虑的? 有其他推荐吗?

关注 15 回答 10

labmem 赞了文章 · 2016-08-29

HTTP结构讲解——《HTTP权威指南》系列

WilsonLiu's blog 首发地址

HTTP结构

第二部分的5章主要介绍了HTTP服务器,代理,缓存,网关和机器人应用程序,这些都是Web系统架构的构造模块。

Web服务器 第五章

Web服务器会对HTTP请求进行处理并提供响应。术语"web服务器"可以用来表示Web服务器的软件,也可以用来表示提供Web页面的特定设备或计算机。

实际的Web服务器会做些什么

  1. 建立连接----接受一个客户端连接,或者如果不希望与这个客户端建立连接,就将其关闭

  2. 接收请求----从网络中读取一条HTTP请求报文

  3. 处理请求----对请求报文进行解释,并采取行动

  4. 访问资源----访问报文中指定的资源

  5. 构建响应----创建带有正确首部的HTTP响应报文

  6. 发送响应----将响应回送给客户端

  7. 记录事务处理过程----将与已完成事务有关的内容记录在一个日志文件中

第一步——接受客户端连接

如果客户端已经打开了一条到服务器的持久连接,可以使用那条连接来发送它的请求。否则,客户端需要打开一条新的到服务器的连接。
处理新连接
客户端请求一条道web服务器的TCP连接时,Web服务器会建立连接,判断连接的另一端是哪个客户端,从TCP连接中将IP地址解析出来。一旦新连接建立起来并被接受,服务器就会将新连接添加到其现存Web服务器连接列表中,并做好监视连接上数据传输的准备。

客户端主机名识别
可以用"反向DNS"对大部分Web服务器进行配置,以便将客户端IP地址转换成客户端主机名。但需要注意的是,主机名的查找可能会花费很长时间,这样会降低Web事务处理的速度。因此,很多大容量Web服务器要么会禁止主机名解析,要么只允许对特定内容进行解析。

第二步——接收请求报文

连接上有数据到达时,web服务器会从网络连接中读取数据,并将请求报文中的内容解析出来。
解析请求报文时,web服务器会不定期地从网络上接收输入数据。网络连接可能随时都会出现延迟。web服务器需要从网络中读取数据,将部分报文数据临时存储在内存中,直到收到足以进行解析的数据并理解其意义为止。
报文的内部表示法
有些Web服务器还会用便于进行报文操作的内部数据结构来存储请求报文,这样就可以将这些报文的数据存放在一个快速查询表中,以便快速访问特定首部的具体值了。

连接的输入/输出处理结构
不同的Web服务器结构以不同的方式为请求服务,如下。

  • 单线程Web服务器

  • 多进程及多线程Web服务器

  • 复用I/O的服务器

  • 复用的多线程Web服务器

第三步——处理请求

一旦web服务器收到了请求,就可以根据方法,资源,首部和可选的主体部分对请求进行处理了。

第四步——对资源的映射以及访问

Web服务器是资源服务器。他们负责发送预先创建好的内容,比如HTML页面或者JPEG图片,以及运行在服务器上的资源生成程序所产生的动态内容。在web服务器将内容传送给客户端之前,要将请求报文中的URI映射为Web服务器上适当的内容或内容生成器,以识别出内容的源头。

docroot
通常,web服务器的文件系统会有一个特殊的文件夹专门用于存放web内容。这个文件夹被称为文档的根目录(document root)。web服务器从请求报文中获取URI,并将其附加在文档根目录的后面。

目录列表
Web服务器可以接收对目录URL的请求,其路径可以解析为一个目录,而不是文件。我们可以对大多数Web服务器进行配置,使其在客户端请求目录URL时采取不同的动作。

  • 返回一个错误

  • 不返回目录,返回一个特殊的默认"索引文件" (DirectoryIndex index.html home.html)

  • 扫描目录,返回一个包含目录内容的HTML界面 (在Aapche中可以通过指令Options -Indexes禁止)

第五步——构建响应

一旦web服务器识别出了资源,就执行请求方法中描述的动作,并返回响应报文。响应报文中包含了响应状态码,响应首部,如果生成了响应主体的话,还包括响应主体。

第六步——发送响应

Web服务器通过连接发送数据时也会面临与接收数据一样的问题。服务器要记录连接的状态,还要特别注意对持久连接的处理。对非持久连接而言,服务器应该在发送了整条报文之后,关闭自己这一端的连接。
对持久连接来说,连接可能仍保持打开状态,在这种情况下,服务器要特别小心,要正确的计算Content-Length首部,不然客户端就无法知道响应什么时候结束了。

第七步——记录日志

当事务结束之后,web服务器会在日志文件中添加一个条目,来描述已执行的事务。

代理 第六章

web上的代理服务器是代表客户端完成事务处理的中间人。如果没有Web代理,HTTP客户端就要直接与HTTP服务器进行对话,有了Web代理,客户端就可以与代理进行对话,然后由代理代表客户端与服务器进行交流,客户端仍然会完成事务的处理,但它是通过代理服务器提供的优质服务来实现的。

代理与网关的区别
严格的来说,代理连接的是两个或多个使用相同协议的应用程序,而网关连接的则是两个或多个使用不同协议的端点。

代理的应用
代理服务器可以实现各种有用的功能,他们可以改善安全性,提高性能,节省费用。代理服务器可以看到并接触所有流过的HTTP流量,所有代理可以监视流量并对其进行修改,以实现很多有用的增值Web服务。

  • 儿童过滤器

  • 文档访问控制

  • 安全防火墙

  • web缓存

  • 反向代理

  • 内容路由器

  • 转码器

  • 匿名者

代理服务器的部署

可以根据其目标用途,将代理放在任意位置。

  • 出口代理

  • 访问(入口)代理

  • 反向代理

  • 网络交换代理

层次化的代理
可以通过代理层次结构将代理级联起来。在代理的层次结构中,会将报文从一个代理传给另外一个代理,直到最终抵达原始服务器为止(然后通过代理传回客户端)。

如何使用代理

  • 修改客户端的代理配置

  • 修改网络,对流量进行拦截并导入一个代理

  • 修改DNS的命名空间,假扮web服务器的名字和IP地址。

  • 修改Web服务器,服务器发送重定向命令

客户端的代理设置

  • 手工配置

  • 预先配置浏览器

  • 代理的自动配置 PAC

  • WPAD的代理发行

缓存 第七章

web缓存是可以自动保存常见文档副本的HTTP设备。当Web请求抵达缓存时,如果本地有"已缓存的"副本,就可以从本地存储设备而不是原始服务器中提取这个文档。

缓存可以优化一下问题

  • 冗余的数据传输

  • 带宽瓶颈

  • 瞬间拥塞

  • 距离时延

命中和未命中的

缓存无法保存世界上的每一份文档。可以用已有的副本为某些到达缓存的请求提供服务,这被称为缓存命中 (cache hit),其他一些请求可能因为没有副本可用,而被转发给原始服务器,这被称为缓存未命中(cache miss)。

  • 文档命中率 (说明了阻止了多个通往外部网络的Web事务,有效降低整体时延)

  • 字节命中率 (说明了阻止了多少字节传向因特网,有利于节省带宽)

再验证

缓存可以在任意时刻,以任意频率对副本进行再验证。如果验证过没有更新则将副本提供给客户端,这被称为再验证命中或缓慢命中,这种方式确实要与原始服务器进行核对,所以会比单纯的缓存命中要慢,但它没有从服务器中获取对象数据,所以要比缓存未命中快一些。

缓存的处理步骤

  1. 接收——缓存从网络中读取抵达的请求报文

  2. 解析——缓存对报文进行解析,提取出URL和各种首部

  3. 查询——缓存查看是否有本地副本可用,如果没有,就获取一份副本(并将其保存在本地)

  4. 新鲜度检测——缓存查看已缓存副本是否足够新鲜,如果不是,就询问服务器是否有任何更新

  5. 创建响应——缓存会用新的首部和已缓存的主体来构建一条响应报文

  6. 发送——缓存通过网络将响应发回给客户端

  7. 日志——缓存可选地创建一个日志文件条目来描述这个事务

保持副本的新鲜

文档过期 (document expiration)

通过特殊的HTTP Cache-Control: max-age = 484200首部和Expires: Fri, 05,2016, 17:20:30 GMT首部,HTTP让原始服务器向每个文档附加了一个过期日期。在缓存文档过期之前,可以以任意频率使用这些副本,而无需与服务器联系。
HTTP/1.0+的Expires首部使用的是绝对日期而不是相对时间,所以我们更倾向于使用比较新的HTTP/1.1的Cache-Control,绝对日期依赖于计算机时钟的正确设置。

服务器再验证 (server revalidation)

文档过期并不意味着它和服务器上目前活跃的文档有实际的区别,这只是意味着到了要进行核对的时间了。

  1. 如果再验证显示内容发生了变化,缓存会获取一份新的文档副本,并将其缓存在旧文档的位置上,然后将文档发送给客户端。

  2. 如果再验证显示内容没有发送变化,缓存只需要获取新的首部,包括一个新的过期时间,并对缓存中的首部进行更新就行了。

用条件方法进行再验证

HTTP定义了5个条件请求首部,对缓存再验证来说最有用的2个首部是If-Modified-Since:dateIf-None-Match:tag(只有两个条件都满足时,才能返回304响应)。

另外3个条件首部包括If-Unmodified-Since(在进行部分文件的传输时,获取文件的其余部分之前要确保文件未发生变化,此时这个首部是非常有用的),If-Range(支持对不完整文档的缓存)和If-Match(用于与web服务器打交道时的并发控制)

If-Modified-Since:Date 再验证

如果从指定日期之后文档被修改过了,就执行请求的方法。可以与Last-Modified服务器响应首部配合使用,只有在内容被修改后与已缓存版本有所不同时才去获取内容。

If-None-Match:实体标签再验证
有些情况下仅使用最后修改日期进行再严重是不够的

  • 有些文档可能会被周期性地重写(比如,从一个后台进程中写入),但实际包含的数据常常是一样的。经内容没有变化,但修改日期会发生变化。

  • 有些文档可能被修改了,但所做修改并不重要,不需要让世界范围内的缓存都重装数据(比如对拼写或注释的修改)

  • 有些服务器无法准确地判定其页面的最后修改日期

  • 有些服务器提供的文档会在亚秒间隙发生变化(比如,实时监视器),对这些服务器来说,以一秒为粒度的修改日期可能就不够用了

为了解决这些问题,HTTP允许用户对被称为实体标签(ETag)的“版本标识符”进行比较。实体标签是附加到文档上的任意标签(引用字符串)。
当发布者对文档进行修改时,可以修改文档的实体标签来说明这个新的版本,这样,如果实体标签被修改了,缓存就可以用If-None-Match条件首部来GET文档的新副本了。

控制缓存的能力

服务器可以通过HTTP定义的几种方式来指定在文档过期前可以将其缓存多长时间。按照优先级递减的顺序,服务器可以:

  • Cache-Control: no-store

  • Cache-Control: no-cache

  • Cache-Control: must-revalidate

  • Cache-Control: max-age

  • 附加一个Expires日期首部到响应中去

  • 不附加过期信息,让缓存确定自己的过期日期

集成点:网关,隧道及中继 第八章

网关 gateway

HTTP扩展和接口的发展是由用户需求驱动的。要在web上发布更复杂的资源的需求出现时,单个应用程序无法处理所有这些能想到的资源。

为了解决这个问题,开发者提出了网关的概念,网关可以作为某种翻译器使用,他可以自动将HTTP流量转换为其他协议,这样HTTP客户端无需了解其他协议,就可以与其他应用层序进行交互了。

可以用一个斜杠来分割客户端和服务器端协议,并以此对网关进行描述
<客户端协议>/<服务器端协议>

CGI Common Gateway Interface

CGI是一个标准接口集,web服务器可以用它来装载程序以响应对特定URL的HTTP请求,并收集程序的输出数据,将其放在HTTP响应中回送。

隧道

web隧道允许用户通过http连接发送非http流量,这样就可以在http上捎带其他协议数据了。使用web隧道最常见的原因就是要在http连接中嵌入非http流量,这样,这类流量就可以穿过只允许web流量通过的防火墙了。

中继 relay

中继是没有完全遵循http规范的简单http代理。中继负责处理http中建立连接的部分,然后对字节进行盲转发。

Web机器人 第九章

Web爬虫是一种机器人,它们会递归地对各种信息性Web站点进行遍历,获取第一个Web页面,然后获取那个页面指向的所有web页面,然后是那些页面指向的所有页面,以此类推。递归地跟踪这些web链接的机器人会沿着HTML超链接创建的网络"爬行",所有称其为爬虫(crawler)或蜘蛛(spider)。

爬虫

根集

在把爬虫放出去之前,需要给他一个起始点。爬虫开始访问的URL初始集合被称作根集(root set)。

避免环路

机器人必须知道他们到过何处,以避免环路(cycle)的出现。

面包屑留下的痕迹

管理大规模web爬虫对其访问过的地址进行管理时使用的一些有用的技术

  • 树和散列表

  • 有损的存在位图

  • 检查点

  • 分类

别名

由于URL“别名”的存在,即使使用了正确的数据结构,有时也很难分辨出以前是否访问过某个页面,如果两个URL看起来不一样,但实际指向的是同一资源,就称这两个URL互为"别名"。

避免循环和重复的一些方法

  • 规范化URL

  • 广度优先的爬行

  • 节流

  • 限制URL大小

  • URL/站点黑名单

  • 模式检测

  • 内容指纹

  • 人工监视

机器人的HTTP

虚拟主机

机器人实现者要支持Host首部,随着虚拟主机的流行,请求中不包含Host首部的话,可能会使机器人将错误的内容与一个特定的URL关联起来。

条件请求

对时间戳或实体标签进行比较,看看它们最近获取的版本是否已经升级以减少获取未更新的内容。

查看原文

赞 5 收藏 8 评论 0

labmem 关注了问题 · 2016-08-20

解决java PrintStream 和 PrintWriter 体现了什么设计模式 ?

我是个初学者,不知道说得对不对,我感觉 这两个打印流 体现的是一种 适配器 的 设计模式 ,对不对呢? 他们都是把一些需要打印的流作为构造器参数 , 把他们呢封装起来 , 可以说为 封装吗 ? 然后实现一些本身流不能实现的功能 . 想法对不对呢 , 望指明

关注 1 回答 1

labmem 回答了问题 · 2016-08-20

解决java PrintStream 和 PrintWriter 体现了什么设计模式 ?

Java I/O这种封装方式是装饰者模式,不是适配器模式。
装饰者模式和适配器模式确实比较像,但还是有些区别的。
适配器模式强调的是将一个接口(程序设计上的接口,非特指Java中的接口)转换为另一个接口;
而装饰者模式则是强调在被装饰者的行为前后加上自己的行为,甚至将被装饰者的行为整个取代掉。

关注 1 回答 1

labmem 回答了问题 · 2016-08-07

解决关于虚引用PhantomReference

这里看到了PhantomReference的用法:

import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.reflect.Field;

public class Test {
    public static boolean isRun = true;

    @SuppressWarnings("static-access")
    public static void main(String[] args) throws Exception {
        String abc = new String("abc");
        System.out.println(abc.getClass() + "@" + abc.hashCode());
        final ReferenceQueue<String> referenceQueue = new ReferenceQueue<String>();
        new Thread() {
            public void run() {
                while (isRun) {
                    Object obj = referenceQueue.poll();
                    if (obj != null) {
                        try {
                            Field rereferent = Reference.class
                                    .getDeclaredField("referent");
                            rereferent.setAccessible(true);
                            Object result = rereferent.get(obj);
                            System.out.println("gc will collect:"
                                    + result.getClass() + "@"
                                    + result.hashCode() + "\t"
                                    + (String) result);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }.start();
        PhantomReference<String> abcWeakRef = new PhantomReference<String>(abc,
                referenceQueue);
        abc = null;
        Thread.currentThread().sleep(3000);
        System.gc();
        Thread.currentThread().sleep(3000);
        isRun = false;
    }
}

原来要和ReferenceQueue搭配使用。

关注 2 回答 1

labmem 赞了文章 · 2016-08-04

Android 中线程间通信原理分析:Looper, MessageQueue, Handler

转载一篇我之前发在csdn上的博客。

自问自答的两个问题

在我们去讨论Handler,Looper,MessageQueue的关系之前,我们需要先问两个问题:

  1. 这一套东西搞出来是为了解决什么问题呢?

  2. 如果让我们来解决这个问题该怎么做?

以上者两个问题,是我最近总结出来的,在我们学习了解一个新的技术之前,最好是先能回答这两个问题,这样你才能对你正在学习的东西有更深刻的认识。

第一个问题:google的程序员们搞出这一套东西是为了解决什么问题的?这个问题很显而易见,为了解决线程间通信的问题。我们都知道,Android的UI/View这一套系统是运行在主线程的,并且这个主线程是死循环的,来看看具体的证据吧。

public final class ActivityThread {
    public static void main(String[] args) {
        
        //...
        
        Looper.loop();

        throw new RuntimeException("Main thread loop unexpectedly exited");
    }
}

如上面的代码示例所示,ActivityThread.main()方法作为Android程序的入口,里面我省略了一些初始化的操作,然后就执行了一句Looper.loop()方法,就没了,再下一行就抛异常了。

loop()方法里面实际上就是一个死循环,一直在执行着,不断的从一个MQ(MessageQueue,后面我都缩写成MQ了)去取消息,如果有的话,那么就执行它或者让它的发送者去处理它。

一般来说,主线程循环中都是执行着一些快速的UI操作,当你有手touch屏幕的时候,系统会产生事件,UI会处理这些事件,这些事件都会在主线程中执行,并快速的响应着UI的变化。如果主线程上发生一些比较耗时的操作,那么它后面的方法就无法得到执行了,那么就会出现卡顿,不流畅。

因此,Android并不希望你在主线程去做一些耗时的操作,这里对“耗时”二字进行朴素的理解就行了,就是执行起来需要消耗的时间比较多的操作。比如读写文件,小的文件也许很快,但你无法预料文件的大小,再比如访问网络,再比如你需要做一些复杂的计算等等。

为了不阻碍主线程流畅的执行,我们就必须在需要的时候把耗时的操作放到其他线程上去,当其他线程完成了工作,再给一个通知(或许还带着数据)给到主线程,让主线程去更新UI什么的,当然了,如果你要的耗时操作只是默默无闻的完成就行了,并不需要通知UI,那么你完全不需要给通知给到UI线程。这就是线程间的通信,其他线程做耗时操作,完成了告诉UI线程,让它进行更新。为了解决这个问题,Android系统给我们提供了这样一套方案来解决。

第二个问题:如果让我们来想一套方案来解决这个线程间通信的问题,该怎么做呢?

先看看我们现在已经有的东西,我们有一个一直在循环的主线程,它实现起来大概是这个样子:

public class OurSystem {
    public static void main(String [] args) {
        for (;;) {
            //do something...
        }
    }
}

为什么主线程要一直死循环的执行呢?

关于这一点,我个人并没有特别透彻的认知,但我猜测,对于有GUI的系统/程序,应该都有一个不断循环的主线程,因为这个GUI程序肯定是要跟人进行交互的,也就是说,需要等待用户的输入,比如触碰屏幕,动动鼠标,敲敲键盘什么的,这些事件肯定是硬件层先获得一个响应/信号,然后会不断的向上封装传递。

如果说我们一碰屏幕,一碰鼠标,就开启一个新线程去处理UI上的变化,首先,这当然是可以的!UI在什么线程上更新其实都是可以的嘛,并不是说一定要在主线程上更新,这是系统给我设的一个套子。然后,问题也会复杂的多,如果我们快速的点击2下鼠标,那么一瞬间就开启了两个新线程去执行,那么这两个线程的执行顺序呢?两个独立的线程,我们是无法保证说先启动的先执行。

所以第一个问题就是执行顺序的问题。

第二个问题就是同步,几个相互独立的线程如果要处理同一个资源,那么造成的结果都是令人困惑,不受控制的。另一方面强行给所有的操作加上同步锁,在效率上也会有问题。

为了解决顺序执行的问题,非常容易就想到的一种方案是事件队列,各种各样的事件先进入到一个队列中,然后有个东西会不断的从队列中获取,这样第一个事件一定在第二个事件之前被执行,这样就保证了顺序,如果我们把这个“取事件”的步骤放在一个线程中去做,那么也顺便解决了资源同步的问题。

因此,对于GUI程序会有一个一直循环的(主)线程,可能就是这样来的吧。

这是一个非常纯净的死循环,我们想要做一些事情的话,就得让它从一个队列里面获取一些事情来做,就像打印机一样。因此我们再编写一个消息队列类,来存放消息。消息队列看起来应该是这样:

public class OurMessageQueue() {
    private LinkedList<Message> mQueue = new LinkedList<Message>();
    
    // 放进去一条消息
    public void enQueue() {
        //...
    }
    
    // 取出一条消息
    public Message deQueue() {
        //...
    }
    
    // 判断是否为空队列
    public boolean isEmpty() {
        //...
    }
}

接下来我们的循环就需要改造成能从消息队列里获取消息,并能够根据消息来做些事情了:

public class OurSystem {
    public static void main(String [] args) {
        
        // 初始化消息队列
        OurMessageQueue mq = ...
    
        for (;;) {
            if (!mq.isEmpty()) {
                Message msg = mq.deQueue();
                //do something...
            }
        }
    }
}

现在我们假象一下,我们需要点击一下按钮,然后去下载一个超级大的文件,下载完成后,我们再让主线程显示文件的大小。

首先,按一下按钮,这个事件应该会被触发到主线程来(具体怎么来的我还尚不清楚,但应该是先从硬件开始,然后插入到消息队列中,主线程的循环就能获取到了),然后主线程开启一个新的异步线程来进行下载,下载完成后再通知主线程来更新,代码看上去是这样的:

// 脑补的硬件设备……
public class OurDevice {
    
    // 硬件设备可能有一个回调
    public void onClick() {
    
        // 先拿到同一个消息队列,并把我们要做的事情插入队列中
        OurMessageQueue mq = ...
        Message msg = Message.newInstance("download a big file");
        mq.enQueue(msg);
    }
}

然后,我们的主线程循环获取到了消息:

public class OurSystem {
    public static void main(String [] args) {
        
        // 初始化消息队列
        OurMessageQueue mq = ...
    
        for (;;) {
            if (!mq.isEmpty()) {
                Message msg = mq.deQueue();
                
                // 是一条通知我们下载文件的消息
                if (msg.isDownloadBigFile()) {
                
                    // 开启新线程去下载文件
                    new Thread(new Runnable() {
                        void run() {
                            // download a big file, may cast 1 min...
                            // ...
                            // ok, we finished download task.
                            
                            // 获取到同一个消息队列
                            OurMessageQueue mq = ...
                            
                            // 消息入队
                            mq.enQueue(Message.newInstance("finished download"));
                        }
                    }).start();
                }
                
                // 是一条通知我们下载完成的消息
                if (msg.isFilishedDownload()) {
                    // update UI!
                }
            }
        }
    }
}

注意,主线程循环获取到消息的时候,显示对消息进行的判断分类,不同的消息应该有不同的处理。在我们获取到一个下载文件的消息时,开启了一个新的线程去执行,耗时操作与主线程就被隔离到不同的执行流中,当完成后,新线程中用同一个消息队列发送了一个通知下载完成的消息,主线程循环获取到后,里面就可以更新UI。

这样就是一个我随意脑补的,简单的跨线程通信的方案。

有如下几点是值得注意的:

  • 主线程是死循环的从消息队列中获取消息。

  • 我们要将消息发送到主线程的消息队列,我们需要通过某种方法能获取到主线程的消息队列对象

  • 消息(Message)的结构应该如何设计呢?

Android 中的线程间通信方案

Looper

android.os.Looper from Grepcode

Android中有一个Looper对象,顾名思义,直译过来就是循环的意思,Looper也确实干了维持循环的事情。

Looper的代码是非常简单的,去掉注释也就300多行。在官方文档的注释中,它推荐我们这样来使用它:

class LooperThread extends Thread {
    public Handler mHandler;

    public void run() {
        Looper.prepare();

        mHandler = new Handler() {
            public void handleMessage(Message msg) {
              // process incoming messages here
            }
        };

        Looper.loop();
    }
}

先来看看prepare方法干了什么。

Looper.prepare()

public static void prepare() {
    prepare(true);
}

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}

注意prepare(boolean)方法中,有一个sThreadLocal变量,这个变量有点像一个哈希表,它的key是当前的线程,也就是说,它可以存储一些数据/引用,这些数据/引用是与当前线程是一一对应的,在这里的作用是,它判断一下当前线程是否有Looper这个对象,如果有,那么就报错了,"Only one Looper may be created per thread",一个线程只允许创建一个Looper,如果没有,就new一个新的塞进这个哈希表中。然后它调用了Looper的构造方法。

Looper 的构造方法

private Looper(boolean quitAllowed) {
    mQueue = new MessageQueue(quitAllowed);
    mThread = Thread.currentThread();
}

Looper的构造方法中,很关键的一句,它new了一个MessageQueue对象,并自己维持了这个MQ的引用。

此时prepare()方法的工作就结束了,接下来需要调用静态方法loop()来启动循环。

Looper.loop()

public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue;

    for (;;) {
        Message msg = queue.next(); // might block
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }

        msg.target.dispatchMessage(msg);

        //...
    }
}

loop()方法,我做了省略,省去了一些不关心的部分。剩下的部分非常的清楚了,首先调用了静态方法myLooper()获取一个Looper对象。

public static Looper myLooper() {
    return sThreadLocal.get();
}

myLooper()同样是静态方法,它是直接从这个ThreadLocal中去获取,这个刚刚说过了,它就类似于一个哈希表,key是当前线程,因为刚刚prepare()的时候,已经往里面set了一个Looper,那么此时应该是可以get到的。拿到当前线程的Looper后,接下来,final MessageQueue queue = me.mQueue;拿到与这个Looper对应的MQ,拿到了MQ后,就开启了死循环,对消息队列进行不停的获取,当获取到一个消息后,它调用了Message.target.dispatchMessage()方法来对消息进行处理。

Looper的代码看完了,我们得到了几个信息:

  • Looper调用静态方法prepare()来进行初始化,一个线程只能创建一个与之对应的LooperLooper初始化的时候会创建一个MQ,因此,有了这样的对应关系,一个线程对应一个Looper,一个Looper对应一个MQ。可以说,它们三个是在一条线上的。

  • Looper调用静态方法loop()开始无限循环的取消息,MQ调用next()方法来获取消息

MessageQueue

android.os.MessageQueue from Grepcode

对于MQ的源码,简单的看一下,构造函数与next()方法就好了。

MQ的构造方法

MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();
}

MQ的构造方法简单的调用了nativeInit()来进行初始化,这是一个jni方法,也就是说,可能是在JNI层维持了它这个消息队列的对象。

MessageQueue.next()

Message next() {
    
    final long ptr = mPtr;
    if (ptr == 0) {
        return null;
    }

    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }

        nativePollOnce(ptr, nextPollTimeoutMillis);

        synchronized (this) {
            // Try to retrieve the next message.  Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                if (now < msg.when) {
                    // Next message is not ready.  Set a timeout to wake up when it is ready.
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message.
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (false) Log.v("MessageQueue", "Returning message: " + msg);
                    return msg;
                }
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1;
            }
        }

    }
}

next()方法的代码有些长,我作了一些省略,请注意到,这个方法也有一个死循环,这样做的效果就是,在Looper的死循环中,调用了next(),而next()这里也在死循环,表面上看起来,方法就阻塞在Looper的死循环中的那一行了,知道next()方法能返回一个Message对象出来。

简单浏览MQ的代码,我们得到了这些信息:

  • MQ的初始化是交给JNI去做的

  • MQ的next()方法是个死循环,在不停的访问MQ,从中获取消息出来返回给Looper去处理。

Message

android.os.Message from Grepcode

Message对象是MQ中队列的element,也是Handler发送,接收处理的一个对象。对于它,我们需要了解它的几个成员属性即可。

Message的成员变量可以分为三个部分:

  • 数据部分:它包括what(int), arg1(int), arg2(int), obj(Object), data(Bundle)等,一般用这些来传递数据。

  • 发送者(target):它有一个成员变量叫target,它的类型是Handler的,这个成员变量很重要,它标记了这个Message对象本身是谁发送的,最终也会交给谁去处理。

  • callback:它有一个成员变量叫callback,它的类型是Runnable,可以理解为一个可以被执行的代码片段。

Handler

android.os.Handler from Grepcode

Handler对象是在API层面供给开发者使用最多的一个类,我们主要通过这个类来进行发送消息与处理消息。

Handler的构造方法(初始化)

通常我们调用没有参数的构造方法来进行初始化,使用起来大概是这样的:

Handler mHandler = new Handler() {
    handleMessage(Message msg) {
        //...
    }
}

没有参数的构造方法最终调用了一个两个参数的构造方法,它的部分源码如下:

public Handler(Callback callback, boolean async) {
    //...
    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}

注意到,它对mLooper成员变量进行了赋值,通过Looper.myLooper()方法获取到当前线程对应的Looper对象。上面已经提到过,如果Looper调用过prepare()方法,那么这个线程对应了一个Looper实例,这个Looper实例也对应了一个MQ,它们三者之间是一一对应的关系。

然后它通过mLooper对象,获取了一个MQ,存在自己的mQueue成员变量中。

Handler的初始化代码说明了一点,Handler所初始化的地方(所在的线程),就是从将这个线程对应的Looper的引用赋值给Handler,让Handler也持有

对于主线程来说,我们在主线程的执行流中,new一个Handler对象,Handler对象都是持有主线程的Looper(也就是Main Looper)对象的。

同样的,如果我们在一个新线程,不调用Looper.prepare()方法去启动一个Looper,直接new一个Handler对象,那么它就会报错。像这样

new Thread(new Runnable() {
        @Override
        public void run() {
            //Looper.prepare(); 

            //因为Looper没有初始化,所以Looper.myLooper()不能获取到一个Looper对象
            Handler h = new Handler();
            h.sendEmptyMessage(112);

        }
     }).start();

以上代码运行后会报错:

java.lang.RuntimeException: Can't create handler inside thread that has not called Looper.prepare()

小结Handler的初始化会获取到当前线程的Looper对象,并通过Looper拿到对应的MQ对象,如果当前线程的执行流并没有执行过Looper.prepare(),则无法创建Handler对象

Handler.sendMessage()

sendMessage消息有各种各样的形式或重载,最终会调用到这个方法:

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}

它又调用了enqueueMessage方法:

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}

注意到它对Messagetarget属性进行了赋值,这样这条消息就知道自己是被谁发送的了。然后将消息加入到队列中。

Handler.dispatchMessage()

Message对象进入了MQ后,很快的会被MQ的next()方法获取到,这样Looper的死循环中就能得到一个Message对象,回顾一下,接下来,就调用了Message.target.dispatchMessage()方法对这条消息进行了处理。

public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg);
    }
}

private static void handleCallback(Message message) {
    message.callback.run();
}

public void handleMessage(Message msg) {
    //这个方法是空实现,让客户端程序员去覆写实现自己的逻辑
}

dispatchMessage方法有两个分支,如果callbackRunnable)不是null,则直接执行callback.run()方法,如果callbacknull,则将msg作为参数传给handleMessage()方法去处理,这样就是我们常见的处理方法了。

Message.target与Handler

特别需要注意Message中的target成员变量,它是指向自己的发送者,这一点意味着什么呢?

意味着:一个有Looper的线程可以有很多个Handler,这些Handler都是不同的对象,但是它们都可以将Message对象发送到同一个MQ中,Looper不断的从MQ中获取这些消息,并将消息交给它们的发送者去处理。一个MQ是可以对应多个Handler的(多个Handler都可以往同一个MQ中消息入队)

下图可以简要的概括下它们之间的关系。

Looper,MessageQueue,Handler,Message

总结

  • Looper调用prepare()进行初始化,创建了一个与当前线程对应的Looper对象(通过ThreadLocal实现),并且初始化了一个与当前Looper对应的MessageQueue对象。

  • Looper调用静态方法loop()开始消息循环,通过MessageQueue.next()方法获取Message对象。

  • 当获取到一个Message对象时,让Message的发送者(target)去处理它。

  • Message对象包括数据,发送者(Handler),可执行代码段(Runnable)三个部分组成。

  • Handler可以在一个已经Looper.prepare()的线程中初始化,如果线程没有初始化Looper,创建Handler对象会失败

  • 一个线程的执行流中可以构造多个Handler对象,它们都往同一个MQ中发消息,消息也只会分发给对应的Handler处理。

  • Handler将消息发送到MQ中,Messagetarget域会引用自己的发送者,Looper从MQ中取出来后,再交给发送这个MessageHandler去处理。

  • Message可以直接添加一个Runnable对象,当这条消息被处理的时候,直接执行Runnable.run()方法。

查看原文

赞 3 收藏 9 评论 3

labmem 赞了回答 · 2016-07-29

解决@Resource作用

/**
 * The Resource annotation marks a resource that is needed
 * by the application.  This annotation may be applied to an
 * application component class, or to fields or methods of the
 * component class.  When the annotation is applied to a
 * field or method, the container will inject an instance
 * of the requested resource into the application component
 * when the component is initialized.  If the annotation is
 * applied to the component class, the annotation declares a
 * resource that the application will look up at runtime. <p>
 *
 * Even though this annotation is not marked Inherited, deployment
 * tools are required to examine all superclasses of any component
 * class to discover all uses of this annotation in all superclasses.
 * All such annotation instances specify resources that are needed
 * by the application component.  Note that this annotation may
 * appear on private fields and methods of superclasses; the container
 * is required to perform injection in these cases as well.
 *
 * @since Common Annotations 1.0
 */
@Target({TYPE, FIELD, METHOD})
@Retention(RUNTIME)

这是@Resource注解的介绍,@Resource用在字段或者方法上的时候,Spring会从容器里面寻找需要的类型Bean,@Resource用在类上的时候,Spring会把当前类作为资源放入Spring容器。

@Autowired注解表示让Spring容器自动注入这个bean,@Resource表示把当前注解的类交给Spring容器管理,你不使用@Resource注解,Spring容器就不会加载这个类型的bean,需要注入的时候自然就报错了

你把@Resource用在interface上面也是有问题的吧 应该放在实现类上面

关注 6 回答 5

认证与成就

  • 获得 16 次点赞
  • 获得 26 枚徽章 获得 1 枚金徽章, 获得 8 枚银徽章, 获得 17 枚铜徽章

擅长技能
编辑

(゚∀゚ )
暂时没有

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2015-04-23
个人主页被 2.9k 人浏览